In [28]:
from pyspark.sql import SparkSession
from pyspark import SQLContext
import json
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from datetime import datetime
import pincode2latlong
import gmaps
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
import pandas

import os

# Google Maps API key used by every `gmaps.configure(...)` call in this notebook.
# It is read from the GMAPS_API_KEY environment variable so the secret is not
# stored in the notebook; the key that used to be hardcoded on this line should
# be treated as leaked and rotated.
API_KEY = os.environ.get("GMAPS_API_KEY", "")
if not API_KEY:
    # Do not abort here: the Spark cells do not need the key, only the maps do.
    print("WARNING: GMAPS_API_KEY is not set; the Google Maps figures will not load.")

# Payment modes kept when the raw orders are filtered; also the options of the
# payment-mode dropdown.
PAYMENT_MODES = [
    'COD',
    'RAZOR_PAY_EMI',
    'RENTOMOJO',
    'JUSPAY',
    'ZEST_MONEY',
]

# Product-id prefix -> category label (looked up by get_category_from_pid).
PRODUCT_ID_MAPPING = {
    "98": 'MOBILE',
    "5": 'MATTRESS',
    "40": 'TABLE',
    "92": 'WASHING_MACHINE',
    "10": 'BED',
    "45": 'OFFICE',
    "50": 'CHAIR',
    "20": 'SOFA',
    "94": 'AC',
    "90": 'TV',
    "91": 'MICROWAVE',
    "60": 'STORAGE',
    "93": 'REFRIGERATOR',
    "30": 'DINING_TABLE_SET',
    "VP": "VIRTUAL",
    "00": "VIRTUAL",
}

# Raw delivery-status code (as a string) -> readable status label.
DELIVERY_STATUS_MAPPING = {
    "0": "UNSCHEDULED",
    "1": "DELIVERED",
    "2": "CANCELLED",
    "3": "RETURNED",
    "4": "OUT_FOR_DELIVERY",
    "5": "FAKE_ORDER",
    "6": "RETURN_REQUESTED",
    "7": "SPOT_RETURN",
    "8": "SCHEDULED",
    "9": "EMI_PAYMENT_AWAITED",
    "10": "VERIFICATION_PENDING",
}

# Options of the state dropdowns.
STATES = [
    'Karnataka', 'Haryana', 'Punjab', 'Goa', 'Kerala',
    'Bihar', 'Tamil Nadu', 'Chandigarh', 'Jharkhand', 'Meghalaya',
    'Delhi', 'Assam', 'Madhya Pradesh', 'Lakshadweep', 'Manipur',
    'Rajasthan', 'Sikkim', 'West Bengal', 'Telangana', 'Andhra Pradesh',
    'Daman & Diu', 'Himachal Pradesh', 'Andaman & Nicobar Islands', 'Nagaland', 'Gujarat',
    'Arunachal Pradesh', 'Maharashtra', 'Tripura', 'Uttarakhand', 'Pondicherry',
    'Jammu & Kashmir', 'Mizoram', 'Odisha', 'Chattisgarh', 'Uttar Pradesh',
]

# Options of the category dropdowns.
# NOTE(review): get_category_from_pid can also return "OTHERS", which is not
# listed here - confirm whether that is intentional.
CATEGORIES = [
    'MOBILE', 'MATTRESS', 'TABLE', 'WASHING_MACHINE', 'BED',
    'OFFICE', 'CHAIR', 'SOFA', 'AC', 'TV',
    'MICROWAVE', 'STORAGE', 'REFRIGERATOR', 'DINING_TABLE_SET', "VIRTUAL",
]
# Create the SparkSession for this notebook, or reuse the active one.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting -
# confirm whether it is needed.
spark = (
    SparkSession.builder
    .appName("cohort")
    .config("spark.some.config.option", "delivery")
    .getOrCreate()
)

import os

# Glob of the delivery CSV exports to load. The default is the original
# machine-specific location; set DELIVERY_CSV_GLOB to point the notebook at
# another location without editing it.
file_path = os.environ.get("DELIVERY_CSV_GLOB", "/home/marino/Desktop/deliv*.csv")

def get_month_from_datetime(datetime_str):
    """Return the three-letter month abbreviation of an order timestamp.

    Args:
        datetime_str: Timestamp formatted as ``'%Y-%m-%d %H:%M:%S'``, or
            ``None`` (what a Spark UDF receives for a null column value).

    Returns:
        The first three characters of the full month name (e.g. ``'Jun'``),
        or ``None`` when ``datetime_str`` is ``None``.

    Raises:
        ValueError: If ``datetime_str`` does not match the expected format.
    """
    if datetime_str is None:
        # Propagate nulls instead of letting strptime raise TypeError, which
        # would abort the whole Spark job for one missing timestamp.
        return None
    parsed = datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')
    return parsed.strftime("%B")[:3]

def get_category_from_pid(pid):
    """Map a warehouse product id to its category label.

    The first two characters of ``pid`` are looked up in
    ``PRODUCT_ID_MAPPING``. If they are not a key, an id starting with
    ``"5"`` is classified as ``"MATTRESS"``; anything else is ``"OTHERS"``.

    Args:
        pid: Product id string, or ``None`` (what a Spark UDF receives for a
            null column value).

    Returns:
        The category label. ``None`` is treated like the empty string and
        yields ``"OTHERS"``.
    """
    if not pid:
        # A null id used to raise TypeError on slicing; the empty string
        # already fell through to "OTHERS", so null now does the same.
        return "OTHERS"
    prefix = pid[:2]
    if prefix in PRODUCT_ID_MAPPING:
        return PRODUCT_ID_MAPPING[prefix]
    if pid[:1] == "5":
        return "MATTRESS"
    return "OTHERS"

def get_delivery_status_from_id(id):
    """Translate a raw delivery-status code into its label.

    Codes that are not keys of ``DELIVERY_STATUS_MAPPING`` yield ``"UNKNOWN"``.
    """
    if id in DELIVERY_STATUS_MAPPING:
        return DELIVERY_STATUS_MAPPING[id]
    return "UNKNOWN"

# Spark UDF wrappers around the helper functions above; each yields a string
# column. The helpers are passed directly - wrapping them in `lambda z: f(z)`
# added nothing but an extra call per row.
date_to_month_udf = udf(get_month_from_datetime, StringType())
get_category_from_pid_udf = udf(get_category_from_pid, StringType())
get_delivery_status_udf = udf(get_delivery_status_from_id, StringType())

# Load every CSV file matched by `file_path`, taking column names from the
# header row.
source_df = (
    spark.read
    .format("csv")
    .options(header="true")
    .load(file_path)
)

In [21]:
# Keep only the columns this analysis uses.
source_df = source_df.select("whProductId", "orderDateTime", "payment", "pinCode", "city", "deliveryStatus")

# Orders paid with one of the supported payment modes.
paid_orders = source_df.filter(source_df.payment.isin(PAYMENT_MODES))

# Add the derived columns: product category, readable delivery status
# (replacing the raw code) and the order month.
all_orders = (
    paid_orders
    .withColumn("category", get_category_from_pid_udf(paid_orders.whProductId))
    .withColumn("deliveryStatus", get_delivery_status_udf(paid_orders.deliveryStatus))
    .withColumn("month", date_to_month_udf(paid_orders.orderDateTime))
)
# Cache AFTER the derived columns are added. Caching the frame before the
# UDF columns existed meant the three Python UDFs were re-evaluated for every
# downstream action (each filter/collect/count below and in later cells).
all_orders.cache()

fake_orders = all_orders.filter(all_orders.deliveryStatus == "FAKE_ORDER")
virtual_products = all_orders.filter(all_orders.category == "VIRTUAL")
# Delivered orders, excluding virtual products.
delivered_orders = all_orders.filter(
    (all_orders.deliveryStatus == "DELIVERED") & (all_orders.category != "VIRTUAL")
)

# Pin codes are pulled to the driver as plain Python lists for geocoding.
pincodes = delivered_orders.select("pinCode").rdd.map(lambda x: x["pinCode"]).collect()
fake_order_pincodes = fake_orders.select("pinCode").rdd.map(lambda x: x["pinCode"]).collect()

In [12]:
# Resolve the collected pin codes to map locations for the heatmaps below.
# NOTE(review): the 2nd/3rd arguments of findLatLong look like a state filter
# and a list of states to leave out (compare the other calls in this
# notebook) - confirm against pincode2latlong.
locations = pincode2latlong.findLatLong(
    pincodes,
    None,
    [],
)
fake_order_locations = pincode2latlong.findLatLong(
    fake_order_pincodes,
    None,
    [],
)


# All India

In [44]:
# Heatmap of all delivered-order locations at the default map view.
gmaps.configure(api_key=API_KEY)
fig = gmaps.figure(
    layout={'height': '700px', 'width': '600px'},
    display_errors=True,
)
map_layer = gmaps.heatmap_layer(
    locations,
    dissipating=True,
    point_radius=5,
    max_intensity=6,
)
fig.add_layer(map_layer)
fig

Figure(layout=FigureLayout(height=u'700px', width=u'600px'))

# Bangalore

In [7]:
# Same delivered-order locations, with the map centred on the coordinates
# below at zoom level 10.
gmaps.configure(api_key=API_KEY)
fig = gmaps.figure(
    center=(12.908136, 77.647606),
    zoom_level=10,
    layout={'height': '600px', 'width': '600px'},
    display_errors=True,
)
map_layer = gmaps.heatmap_layer(
    locations,
    point_radius=10,
    max_intensity=10,
)
fig.add_layer(map_layer)
fig


Figure(layout=FigureLayout(height=u'600px', width=u'600px'))

# Interactive maps

In [22]:
def drawMap(state):
    """Display a heatmap of the delivered-order pin codes for one state.

    ``state`` is forwarded as the second argument of
    ``pincode2latlong.findLatLong`` together with the global ``pincodes``.
    """
    state_locations = pincode2latlong.findLatLong(pincodes, state, [])
    gmaps.configure(api_key=API_KEY)
    state_fig = gmaps.figure(
        layout={'height': '600px', 'width': '600px'},
        display_errors=True,
    )
    state_fig.add_layer(
        gmaps.heatmap_layer(state_locations, point_radius=3, max_intensity=5)
    )
    display(state_fig)


# Dropdown over STATES; the map is redrawn whenever the selection changes.
heatmap = interactive(drawMap, state=STATES)
heatmap

aW50ZXJhY3RpdmUoY2hpbGRyZW49KERyb3Bkb3duKGRlc2NyaXB0aW9uPXUnc3RhdGUnLCBvcHRpb25zPShOb25lLCAnS2FybmF0YWthJywgJ0hhcnlhbmEnLCAnUHVuamFiJywgJ0dvYScsICfigKY=


# Rest of India

In [29]:
# Heatmap for the "Rest of India" view.
# NOTE(review): the four states passed as the third argument are presumably
# the ones findLatLong leaves out - confirm against pincode2latlong.
roi_locations = pincode2latlong.findLatLong(
    pincodes,
    None,
    ["Karnataka", "Delhi", "Maharashtra", "Telangana"],
)
gmaps.configure(api_key=API_KEY)
fig = gmaps.figure(
    layout={'height': '600px', 'width': '600px'},
    display_errors=True,
)
map_layer = gmaps.heatmap_layer(
    roi_locations,
    point_radius=3,
    max_intensity=5,
)
fig.add_layer(map_layer)
fig


set(['Pondicherry', 'Haryana', 'Punjab', 'Goa', 'Kerala', 'Dadra & Nagar Haveli', 'Bihar', 'Tamil Nadu', 'Chandigarh', 'Jharkhand', 'Meghalaya', 'Delhi', 'Assam', 'Madhya Pradesh', 'Lakshadweep', 'Manipur', 'Rajasthan', 'Sikkim', 'West Bengal', 'Telangana', 'Andhra Pradesh', 'Daman & Diu', 'Himachal Pradesh', 'Andaman & Nicobar Islands', 'Nagaland', 'Gujarat', 'Arunachal Pradesh', 'Maharashtra', 'Tripura', 'Uttarakhand', 'Karnataka', 'Jammu & Kashmir', 'Mizoram', 'Odisha', 'Chattisgarh', 'Uttar Pradesh'])


Figure(layout=FigureLayout(height=u'600px', width=u'600px'))

In [19]:
def drawMap(category):
    """Display a heatmap of delivered orders belonging to one category.

    Filters ``delivered_orders`` on its ``category`` column, collects the
    matching pin codes to the driver and plots the resolved locations.
    """
    in_category = delivered_orders.filter(delivered_orders.category == category)
    category_pincodes = (
        in_category
        .select("pinCode")
        .rdd
        .map(lambda row: row["pinCode"])
        .collect()
    )
    category_locations = pincode2latlong.findLatLong(category_pincodes, None, [])
    gmaps.configure(api_key=API_KEY)
    category_fig = gmaps.figure(
        layout={'height': '600px', 'width': '600px'},
        display_errors=True,
    )
    category_fig.add_layer(
        gmaps.heatmap_layer(category_locations, point_radius=8, max_intensity=10)
    )
    display(category_fig)


# Dropdown over CATEGORIES; each selection runs a new Spark job.
heatmap = interactive(drawMap, category=CATEGORIES)
heatmap

aW50ZXJhY3RpdmUoY2hpbGRyZW49KERyb3Bkb3duKGRlc2NyaXB0aW9uPXUnY2F0ZWdvcnknLCBvcHRpb25zPShOb25lLCAnTU9CSUxFJywgJ01BVFRSRVNTJywgJ1RBQkxFJywgJ1dBU0hJTkfigKY=


# Fake Orders

In [7]:
def drawMap(state):
    """Display a heatmap of fake-order pin codes for one state.

    ``state`` is forwarded as the second argument of
    ``pincode2latlong.findLatLong`` together with the global
    ``fake_order_pincodes``.
    """
    fake_locations = pincode2latlong.findLatLong(fake_order_pincodes, state, [])
    gmaps.configure(api_key=API_KEY)
    fake_fig = gmaps.figure(
        layout={'height': '600px', 'width': '600px'},
        display_errors=True,
    )
    fake_fig.add_layer(
        gmaps.heatmap_layer(fake_locations, point_radius=3, max_intensity=5)
    )
    display(fake_fig)


# Dropdown over STATES; the map is redrawn whenever the selection changes.
heatmap = interactive(drawMap, state=STATES)
heatmap

aW50ZXJhY3RpdmUoY2hpbGRyZW49KERyb3Bkb3duKGRlc2NyaXB0aW9uPXUnc3RhdGUnLCBvcHRpb25zPShOb25lLCAnS2FybmF0YWthJywgJ0hhcnlhbmEnLCAnUHVuamFiJywgJ0dvYScsICfigKY=


In [15]:
# Preview five delivered orders. The limit is applied in Spark so only five
# rows are sent to the driver, instead of converting the whole frame to
# pandas and then discarding all but the head.
display(delivered_orders.limit(5).toPandas())

Unnamed: 0,whProductId,orderDateTime,payment,pinCode,city,deliveryStatus,category,month
0,10-240518-014,2018-06-01 08:07:49,COD,560102,BLR,DELIVERED,BED,Jun
1,60-IARBA_190518-001,2018-06-01 08:25:52,ZEST_MONEY,110059,DELHI,DELIVERED,STORAGE,Jun
2,10-ANCJW_210418-001,2018-06-01 08:30:55,RAZOR_PAY_EMI,400063,MUM,DELIVERED,BED,Jun
3,10-240518-010,2018-06-01 09:09:26,COD,560008,BLR,DELIVERED,BED,Jun
4,10-100518-014,2018-06-01 10:15:06,COD,560016,BLR,DELIVERED,BED,Jun


In [17]:
# Number of orders whose payment mode is in PAYMENT_MODES (runs a Spark job).
all_orders.count()

226168

In [26]:
# Preview ten raw rows whose city is "BLR". The limit is applied in Spark so
# only ten rows are sent to the driver, instead of collecting every matching
# row into pandas first.
display(source_df.filter(source_df.city == "BLR").limit(10).toPandas())

Unnamed: 0,whProductId,orderDateTime,payment,pinCode,city,deliveryStatus
0,98-BMQHV_180418-001,2018-06-01 06:26:24,ZEST_MONEY,560037,BLR,2
1,00-extended_buyback_mob_3-00,2018-06-01 06:26:24,ZEST_MONEY,560037,BLR,2
2,10-240518-014,2018-06-01 08:07:49,COD,560102,BLR,1
3,VP_CFA3BE5CF42EF6EDDEB5E1A5DF4407CB,2018-06-01 08:07:49,COD,560102,BLR,1
4,00-bangalore-00,2018-06-01 08:07:49,COD,560102,BLR,1
5,30-180518-004,2018-06-01 08:54:52,RAZOR_PAY,560016,BLR,1
6,VP_FA3A25272F531921884D3574CB936723,2018-06-01 08:54:52,RAZOR_PAY,560016,BLR,1
7,60-HDZTB_170518-001,2018-06-01 08:55:56,COD,560102,BLR,2
8,60-220518-015,2018-06-01 08:55:56,COD,560102,BLR,2
9,10-240518-010,2018-06-01 09:09:26,COD,560008,BLR,1


In [55]:
# Delivered-order counts per payment mode (at most 20 groups shown).
display(
    delivered_orders
    .groupBy("payment")
    .count()
    .take(20)
)

[Row(payment=u'COD', count=55891),
 Row(payment=u'RAZOR_PAY_EMI', count=3245),
 Row(payment=u'RENTOMOJO', count=2181),
 Row(payment=u'JUSPAY', count=4134),
 Row(payment=u'ZEST_MONEY', count=2540)]

In [29]:
def drawMap(payment_mode, category, state):
    """Display a heatmap of delivered orders for a payment mode, category and state.

    Args:
        payment_mode: Value matched against the ``payment`` column.
        category: Value matched against the ``category`` column.
        state: Forwarded as the second argument of
            ``pincode2latlong.findLatLong``.
    """
    # `delivered_orders` has no `state` column (only whProductId,
    # orderDateTime, payment, pinCode, city, deliveryStatus, category and
    # month), so filtering on `delivered_orders.state` raised AttributeError
    # on every call. The state is applied through findLatLong instead, as in
    # the other state-driven maps in this notebook.
    matching_orders = delivered_orders.filter(
        (delivered_orders.category == category)
        & (delivered_orders.payment == payment_mode)
    )
    pincodes = (
        matching_orders
        .select("pinCode")
        .rdd
        .map(lambda x: x["pinCode"])
        .collect()
    )
    locations = pincode2latlong.findLatLong(pincodes, state, [])
    gmaps.configure(api_key=API_KEY)
    fig = gmaps.figure(display_errors=True, layout={'width': '600px', 'height': '600px'})
    map_layer = gmaps.heatmap_layer(locations, max_intensity=7, point_radius=5)
    fig.add_layer(map_layer)
    display(fig)


# One dropdown per argument; each change runs a new Spark job.
heatmap = interactive(drawMap, payment_mode=PAYMENT_MODES, category=CATEGORIES, state=STATES)
heatmap

aW50ZXJhY3RpdmUoY2hpbGRyZW49KERyb3Bkb3duKGRlc2NyaXB0aW9uPXUncGF5bWVudF9tb2RlJywgb3B0aW9ucz0oJ0NPRCcsICdSQVpPUl9QQVlfRU1JJywgJ1JFTlRPTU9KTycsICdKVVPigKY=


In [17]:
# Show the first delivered order as a Row, which also lists the frame's columns.
delivered_orders.head()

Row(whProductId=u'10-240518-014', orderDateTime=u'2018-06-01 08:07:49', payment=u'COD', pinCode=u'560102', city=u'BLR', deliveryStatus=u'DELIVERED', category=u'BED', month=u'Jun')