In [1]:
# Configure access to the Azure Blob Storage account used by this notebook.
# The account key is read from the environment rather than embedded in the
# notebook: a key stored in a notebook is exposed to everyone who can read the
# file or its history. Any key that was previously committed here should be
# rotated in the storage account.
import os  # local import: this cell runs before the notebook's import cell

STORAGE_ACCOUNT_KEY_CONF = "fs.azure.account.key.sankalpstoragesapient.blob.core.windows.net"

storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_key:
    raise RuntimeError(
        "AZURE_STORAGE_ACCOUNT_KEY is not set; export the storage account key "
        "in the driver environment before running this notebook."
    )

# Same key registered in both places the original cell configured:
# the Hadoop configuration of the SparkContext and the Spark session conf.
sc._jsc.hadoopConfiguration().set(STORAGE_ACCOUNT_KEY_CONF, storage_account_key)
spark.conf.set(STORAGE_ACCOUNT_KEY_CONF, storage_account_key)
 

In [2]:
#import required libraries
import numpy as np
import matplotlib.pyplot as plt
import pyspark
import re
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from datetime import datetime
import os
from IPython.display import Image
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier

In [3]:
from pyspark.sql import SparkSession
# Get (or create) the SparkSession for this notebook under the application
# name 'page_imp_italy'.
# NOTE(review): the first cell already references `spark` and `sc`, so the
# hosting environment presumably predefines them — confirm; on a plain kernel
# that cell would fail before this one runs.
spark=SparkSession.builder.appName('page_imp_italy').getOrCreate()

In [4]:
# Load the tab-separated dataset for 2018-09-10 (every file in the folder)
# into a Spark DataFrame; the first row supplies the column names.
df = spark.read.csv(
    "wasbs://hashed-italy@sankalpstoragesapient.blob.core.windows.net/seven/2018-09-10/*",
    sep='\t',
    header=True,
)

In [5]:
# Load the page-name -> category mapping file (CSV with a header row).
page_mapping_uri = spark.read.csv(
    "wasbs://hashed-italy@sankalpstoragesapient.blob.core.windows.net/pagename_category.csv",
    header=True,
)

In [6]:
# Collect the mapping file to the driver and build a plain Python dict
# {PageName: Category} that the page-category UDF can look up.
_mapping_rows = page_mapping_uri.select('PageName', 'Category').collect()
dict_pagename_mapping = {
    mapping_row['PageName']: mapping_row['Category']
    for mapping_row in _mapping_rows
}


In [7]:
def udf_wrapper(returntype):
    """Decorator factory that turns a Python function into a Spark UDF.

    `returntype` is passed to `udf` as the UDF's return type. The returned
    decorator takes the function to wrap and returns `udf(func, returnType=returntype)`.
    """
    return lambda func: udf(func, returnType=returntype)

  
@udf_wrapper(StringType())
def get_page_category(page_type):
    '''
    Look up the page category for a page name.

    Returns None when page_type is None, is the literal string 'null',
    or has no entry in dict_pagename_mapping.
    '''
    if page_type is None or page_type == 'null':
        return None
    # dict.get yields None for unmapped page names
    return dict_pagename_mapping.get(page_type)

In [8]:
# Add a 'page_category' column by passing each row's 'post_pagename' through
# the get_page_category UDF; page names without a mapping get a null category.
df=df.withColumn('page_category', get_page_category(col('post_pagename')))

In [9]:
# Number of rows per page category, largest category first.
page_category_count=df.groupBy('page_category').count().orderBy('count',ascending=False)

In [10]:
# Write the per-category counts back to blob storage as CSV with a header row.
# coalesce(1) reduces the result to a single partition before writing.
(
    page_category_count
    .coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .save("wasbs://hashed-italy@sankalpstoragesapient.blob.core.windows.net/pagecategory_count.csv")
)

In [11]:
# Total number of records (rows) in the loaded data.
df.count()

In [12]:
# Number of unique visitors (distinct masked_mcvisid values).
df.select('masked_mcvisid').distinct().count()

In [13]:
# Number of unique visitors with at least one record in the 'test_drive' page category.
df.filter(col('page_category')=='test_drive').select('masked_mcvisid').distinct().count()

In [14]:
# Number of unique visitors with at least one record in the 'your_vehicle'
# page category (the category the conversion flag is based on later on).
df.filter(col('page_category')=='your_vehicle').select('masked_mcvisid').distinct().count()

In [15]:
# Number of records per visitor, most active visitors first.
df_agg=df.groupBy('masked_mcvisid').count().orderBy('count',ascending=False)

In [16]:
# Drop outlier visitors: keep only those with more than 2 and fewer than
# 10000 records.
df_filter = df_agg.filter(
    (df_agg['count'] > 2) & (df_agg['count'] < 10000)
)

In [17]:
# Number of visitors remaining after the activity filter.
df_filter.count()

In [18]:
# Keep only the records of visitors that passed the activity filter.
# The inner join also brings along df_filter's per-visitor 'count' column.
df_processed = df.join(df_filter, on='masked_mcvisid', how='inner')

In [19]:
# Data cleaning: keep a row only when hit_source is '1', exclude_hit is '0'
# and the page category is not the empty string.
df_processed = df_processed.filter(
    (col('hit_source') == '1')
    & (col('exclude_hit') == '0')
    & (col('page_category') != '')
)

In [20]:
# Sessionization: a gap of more than TIME_OUT minutes between two consecutive
# hits of the same visitor starts a new session.
TIME_OUT = 30 # 30 minutes of timeout


@udf_wrapper(IntegerType())
def get_event_boundary(time_diff):
    '''
    Return 1 when time_diff (minutes since the previous hit) exceeds TIME_OUT,
    otherwise 0. A missing time_diff (no previous hit) also yields 0.
    '''
    if time_diff is not None and time_diff > TIME_OUT:
        return 1
    return 0

@udf_wrapper(FloatType())
def set_default_page_time(time_spent):
    '''
    Return the time spent on a page in minutes, substituting a default of
    0.08 minutes (about 5 seconds) when no value is available.
    '''
    return 0.08 if time_spent is None else time_spent

In [21]:
def conversion(page_category):
    """Return 1 when the page category is 'your_vehicle', otherwise 0."""
    return 1 if page_category == 'your_vehicle' else 0
    
# Spark UDF around conversion(); produces an integer column (1 or 0).
conversion_udf = udf(conversion, IntegerType())

In [22]:
# Pattern used to parse the raw 'date_time' strings.
ts_pattern_1 = 'yyyy-MM-dd HH:mm:ss'
# Date-only pattern; not referenced anywhere else in the visible notebook.
date_pattern_1 = 'yyyy-MM-dd'

# Add two columns:
#   date_time_ts      - 'date_time' parsed with ts_pattern_1 and cast to timestamp
#   conversion_status - 1 when the row's page_category is 'your_vehicle', else 0
df_processed = df_processed \
                     .withColumn('date_time_ts',unix_timestamp(col('date_time'), ts_pattern_1).cast('timestamp'))\
                     .withColumn('conversion_status', conversion_udf('page_category'))

In [23]:
    
# Sessionization
# Window definitions used below:
#   w0 - all rows of a visitor (unordered)
#   w1 - rows of a visitor ordered by hit timestamp
#   w2 - rows of a visitor's session, newest hit first
#   w3 - rows of a visitor's session, oldest hit first
# w2 and w3 reference 'session_id', which only exists after the first
# transformation below; they must not be applied before it.
w0 = Window.partitionBy(col('masked_mcvisid'))

w1 = Window.partitionBy(col('masked_mcvisid')).orderBy(col('date_time_ts'))

w2 = Window.partitionBy(col('masked_mcvisid'),col('session_id')).orderBy(col('date_time_ts').desc())

w3 = Window.partitionBy(col('masked_mcvisid'),col('session_id')).orderBy(col('date_time_ts'))

# Step 1 - visitor-level label and session ids:
#   is_converted       - max of conversion_status over all of the visitor's rows,
#                        i.e. 1 if any of the visitor's hits was a conversion hit
#   prev               - timestamp of the visitor's previous hit (null for the first hit)
#   time_diff          - gap to the previous hit in minutes (seconds / 60)
#   new_event_boundary - 1 when that gap exceeds TIME_OUT, else 0
#   session_id         - running sum of the boundaries in timestamp order, so it
#                        starts at 0 and increases by 1 at every session break
# The helper columns are dropped again at the end.
# `max` and `sum` here are the Spark column functions brought in by
# `from pyspark.sql.functions import *`, not the Python builtins.
df_processed  = df_processed \
                   .withColumn('is_converted', max(col('conversion_status')).over(w0)) \
                   .withColumn('prev', lag(col('date_time_ts'),1).over(w1)) \
                   .withColumn('time_diff',((col('date_time_ts').cast('long') - col('prev').cast('long'))/60.0)) \
                   .withColumn('new_event_boundary',get_event_boundary(col('time_diff'))) \
                   .withColumn('session_id', sum(col('new_event_boundary')).over(w1)) \
                   .drop('prev','time_diff','new_event_boundary')


# hit order: dense rank within the session counted from the newest hit
# (rank 1 = last hit of the session; hits with equal timestamps share a rank)
df_processed = df_processed.withColumn('hit_rank_reversed', dense_rank().over(w2))



# Time spent: minutes until the next hit in the same session. The last hit of a
# session has no next hit, so set_default_page_time substitutes its default.
df_processed = df_processed\
                    .withColumn('next', lead(col('date_time_ts'),1).over(w3)) \
                    .withColumn('time_diff',((col('next').cast('long') - col('date_time_ts').cast('long'))/60.0)) \
                    .withColumn('time_spent_in_mins',set_default_page_time(col('time_diff'))) \
                    .drop('next','time_diff')


In [24]:
# Keep only the columns needed for the propensity model.
df_final=df_processed.select('masked_mcvisid','page_category','time_spent_in_mins','is_converted')

In [25]:
# Register the modelling columns as temp view 'df_hits_view', mark that view
# for caching, and read it back as df_hits for the steps that follow.
df_final.createOrReplaceTempView('df_hits_view')
spark.catalog.cacheTable('df_hits_view')
df_hits = spark.table("df_hits_view")

In [26]:
# Number of distinct (visitor, is_converted) pairs.
df_hits.select('masked_mcvisid','is_converted').distinct().count()

In [27]:
# Keep only rows whose is_converted flag equals '1' or '0'
# (the flag is compared against the string forms of the two labels).
data_final=df_hits.filter((df_hits['is_converted']=='1') |  (df_hits['is_converted']=='0'))

In [28]:
# Drop rows whose page_category is the literal text 'null'.
# NOTE(review): this compares against the string 'null', not SQL NULL; real
# nulls were presumably already removed by the earlier page_category != ''
# filter — confirm.
data_final=data_final.filter(data_final['page_category'] !='null')

In [29]:
# Distinct (visitor, is_converted) pairs; joined back onto the per-visitor
# features later to supply the label. Despite the name, this holds every
# visitor in data_final, not only those with is_converted = 1.
pos_users=data_final.select(['masked_mcvisid','is_converted']).distinct()

In [30]:
# Distinct page categories present in the modelling data (list of Rows).
page_list = data_final.select('page_category').distinct().collect()

# Sequential ids 1..N, one for each distinct page category.
page_id = list(range(1, len(page_list) + 1))

# String form of the ids; these become column names after the pivot.
page_id_str = [str(pid) for pid in page_id]

# Mapping page category -> page id (string).
page_dict = {
    category_row[0]: pid
    for category_row, pid in zip(page_list, page_id_str)
}

In [31]:
# Python lookup from a page category to its page id (see page_dict).
def page_map(page):
    return page_dict[page]

# Wrap the lookup as a Spark UDF and add the page id as column 'page_status'.
page_udf = udf(page_map, StringType())
finaldata = data_final.withColumn('page_status', page_udf(col('page_category')))

# Aggregated dataset for the machine learning model: number of hits per
# (visitor, page id) combination, stored in column 'page_count'.
aggregations = [count(col('page_status')).alias('page_count')]

df_test = (
    finaldata
    .groupBy(['masked_mcvisid', 'page_status'])
    .agg(*aggregations)
)

In [32]:
# Pivot to one row per visitor with one column per page id holding that
# visitor's hit count; page ids the visitor never hit are filled with 0.
data_reshaped=df_test.groupBy('masked_mcvisid').pivot('page_status').sum('page_count').fillna(0)

In [33]:
# Preview the first 5 rows of the pivoted per-visitor table.
data_reshaped.show(5)

In [34]:
# Attach each visitor's is_converted label to the pivoted page-count features.
final_model_data = data_reshaped.join(pos_users, on='masked_mcvisid', how='inner')

In [35]:
# Display the page category -> page id mapping.
page_dict

In [36]:
# Page id (and therefore pivot column name) assigned to the 'your_vehicle' category.
vehicle_dict_value=page_dict['your_vehicle']

In [37]:
# Columns that must not be used as model features: the visitor id, the label,
# and the 'your_vehicle' page column (the label is derived from hits on that
# category, so keeping it would leak the label into the features).
excluded_columns=['masked_mcvisid','is_converted',vehicle_dict_value]

In [38]:
# Feature columns: every column of final_model_data that is not excluded.
# The loop variable is deliberately not called `col`, which is the name of the
# pyspark column function used throughout this notebook.
model_inputs = [
    column_name
    for column_name in final_model_data.columns
    if column_name not in excluded_columns
]

In [39]:
#Machine Learning Model function

def page_imp_model(model_inputs,data):
    """Rank page categories by random-forest feature importance.

    Parameters
    ----------
    model_inputs : list of str
        Names of the feature columns in `data` (the page ids used as pivot
        column names).
    data : pyspark.sql.DataFrame
        Table holding the `model_inputs` columns and an 'is_converted'
        label column.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns 'page_category' and 'importance' (importance stored as a
        string), built from a list ordered by descending importance.

    Notes
    -----
    Relies on the notebook-level `page_dict` (page category -> page id) to
    translate feature names back to page categories, and on the notebook-level
    `spark` session to build the result. No seed is passed to the classifier,
    so importances can differ between runs.
    """
    # Assemble the input columns into a single 'features' vector column.
    assembler = VectorAssembler(inputCols=model_inputs, outputCol='features')

    # Transform the DataFrame that was passed in. The previous version
    # transformed the global final_model_data here and ignored `data`.
    training_data = assembler.transform(data).select('features', 'is_converted')

    # Build and fit the random forest on the assembled data.
    rfc = RandomForestClassifier(labelCol='is_converted', featuresCol='features')
    rf_model = rfc.fit(training_data)

    # Pair every feature (page id) with its importance, most important first.
    ranked = sorted(
        zip(model_inputs, rf_model.featureImportances),
        key=lambda pair: pair[1],
        reverse=True,
    )

    # Map the page ids back to page category names.
    page_dict_rev = {page_id: category for category, page_id in page_dict.items()}

    # Create the dataframe with page category and importance.
    final_results_page = [
        (page_dict_rev[page_id], str(importance))
        for page_id, importance in ranked
    ]
    final_results_page_df = spark.createDataFrame(
        final_results_page, ['page_category', 'importance']
    )
    return final_results_page_df



In [40]:
# Run the model on the per-visitor feature table.
page_results=page_imp_model(model_inputs,final_model_data)

In [41]:
# Show up to 100 rows of the importance table without truncating values.
page_results.show(100,False)

In [42]:
# Number of rows per page category in the modelling data, smallest first.
page_counts=data_final.groupBy('page_category').count().orderBy('count',ascending=True)

In [43]:
# Total time spent (in minutes) per page category.
# The dict-style aggregation names its result column 'sum(time_spent_in_mins)'.
# DataFrame.alias() only names the DataFrame itself, so the earlier
# .alias("total_time_spent") never renamed that column; rename it explicitly
# so the column (and the CSV header written later) is 'total_time_spent'.
page_time_spent = (
    data_final
    .groupBy('page_category')
    .agg({"time_spent_in_mins": "sum"})
    .withColumnRenamed('sum(time_spent_in_mins)', 'total_time_spent')
)

In [44]:
# Merge the page importance results with the total time spent per page category.
df_merge_1 = page_results.join(page_time_spent, on='page_category', how='inner')

In [45]:
# Add the per-category row counts to the merged importance / time-spent table.
df_merge=df_merge_1.join(page_counts ,on='page_category', how='inner')

In [46]:
# Preview up to 50 rows of the merged results without truncating values.
df_merge.show(50,False)

In [47]:
# Write the merged results back to blob storage as CSV with a header row.
# coalesce(1) reduces the result to a single partition before writing.
(
    df_merge
    .coalesce(1)
    .write
    .format("csv")
    .option("header", "true")
    .save("wasbs://hashed-italy@sankalpstoragesapient.blob.core.windows.net/page_importance_configurator.csv")
)