# Data Cleaning and Preprocessing

- find spark

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-google').getOrCreate()
import pandas as pd

from pyspark.sql.functions import *
from pyspark.sql.types import *

from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

In [2]:
df = spark.read.csv('train_v2.csv',quote=None,escape='"',header=True)

In [3]:
df.printSchema()

root
 |-- channelGrouping: string (nullable = true)
 |-- customDimensions: string (nullable = true)
 |-- date: string (nullable = true)
 |-- device: string (nullable = true)
 |-- fullVisitorId: string (nullable = true)
 |-- geoNetwork: string (nullable = true)
 |-- hits: string (nullable = true)
 |-- socialEngagementType: string (nullable = true)
 |-- totals: string (nullable = true)
 |-- trafficSource: string (nullable = true)
 |-- visitId: string (nullable = true)
 |-- visitNumber: string (nullable = true)
 |-- visitStartTime: string (nullable = true)



In [4]:
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
channelGrouping,Organic Search,Referral,Direct,Organic Search,Organic Search
customDimensions,"[{'index': '4', 'value': 'EMEA'}]","[{'index': '4', 'value': 'North America'}]","[{'index': '4', 'value': 'North America'}]","[{'index': '4', 'value': 'EMEA'}]","[{'index': '4', 'value': 'Central America'}]"
date,20171016,20171016,20171016,20171016,20171016
device,"{""browser"": ""Firefox"", ""browserVersion"": ""not ...","{""browser"": ""Chrome"", ""browserVersion"": ""not a...","{""browser"": ""Chrome"", ""browserVersion"": ""not a...","{""browser"": ""Chrome"", ""browserVersion"": ""not a...","{""browser"": ""Chrome"", ""browserVersion"": ""not a..."
fullVisitorId,3162355547410993243,8934116514970143966,7992466427990357681,9075655783635761930,6960673291025684308
geoNetwork,"{""continent"": ""Europe"", ""subContinent"": ""Weste...","{""continent"": ""Americas"", ""subContinent"": ""Nor...","{""continent"": ""Americas"", ""subContinent"": ""Nor...","{""continent"": ""Asia"", ""subContinent"": ""Western...","{""continent"": ""Americas"", ""subContinent"": ""Cen..."
hits,"[{'hitNumber': '1', 'time': '0', 'hour': '17',...","[{'hitNumber': '1', 'time': '0', 'hour': '10',...","[{'hitNumber': '1', 'time': '0', 'hour': '17',...","[{'hitNumber': '1', 'time': '0', 'hour': '9', ...","[{'hitNumber': '1', 'time': '0', 'hour': '14',..."
socialEngagementType,Not Socially Engaged,Not Socially Engaged,Not Socially Engaged,Not Socially Engaged,Not Socially Engaged
totals,"{""visits"": ""1"", ""hits"": ""1"", ""pageviews"": ""1"",...","{""visits"": ""1"", ""hits"": ""2"", ""pageviews"": ""2"",...","{""visits"": ""1"", ""hits"": ""2"", ""pageviews"": ""2"",...","{""visits"": ""1"", ""hits"": ""2"", ""pageviews"": ""2"",...","{""visits"": ""1"", ""hits"": ""2"", ""pageviews"": ""2"",..."
trafficSource,"{""campaign"": ""(not set)"", ""source"": ""google"", ...","{""referralPath"": ""/a/google.com/transportation...","{""campaign"": ""(not set)"", ""source"": ""(direct)""...","{""campaign"": ""(not set)"", ""source"": ""google"", ...","{""campaign"": ""(not set)"", ""source"": ""google"", ..."


From the preview of head 5 samples, we could find that there are 6 columns include aggregate data. Now our next target is clean and preprocess the data.

In [5]:
df = df.withColumn('customDimensions',regexp_replace('customDimensions',"""([\[]{1})""",''))\
.withColumn('customDimensions',regexp_replace('customDimensions',"(]{1})", ""))\
.withColumn('customDimensions',regexp_replace('customDimensions',"'", "\""))\
.withColumn('hits',regexp_replace('hits',"(^\[*)",''))\
.withColumn('hits',regexp_replace('hits',"(]*$)", ''))\
.withColumn('hits',regexp_replace('hits',"'", "\""))\
.withColumn('hits',regexp_replace('hits','True','"True"'))

In [6]:
## Data in some columns are with .json format and they need transformation
df = df.select('fullVisitorId',
               get_json_object('hits', '$.hitNumber').alias('hitNumber').cast(IntegerType()),
               get_json_object('hits', '$.time').alias('time').cast(IntegerType()),
               get_json_object('hits', '$.hour').alias('hour').cast(IntegerType()),
               get_json_object('hits', '$.minute').alias('minute').cast(IntegerType()),
               get_json_object('hits', '$.isInteraction').alias('isInteraction'),
               get_json_object('hits', '$.isEntrance').alias('isEntrance'),
               get_json_object('hits', '$.isExit').alias('isExit'),
               get_json_object('hits', '$.promotionActionInfo').alias("promotionActionInfo"),
               get_json_object('totals','$.visits').alias('visits').cast(IntegerType()),
               get_json_object('totals','$.hits').alias('hits').cast(IntegerType()),
               get_json_object('totals','$.pageviews').alias('pageviews').cast(IntegerType()),
               get_json_object('totals','$.timeOnSite').alias('timeOnSite').cast(IntegerType()),
               get_json_object('totals','$.newVisits').alias('newVisits').cast(IntegerType()),
               get_json_object('totals','$.transactions').alias("transactions").cast(IntegerType()),
               get_json_object('totals','$.transactionRevenue').alias("transactionRevenue").cast(IntegerType()),
               get_json_object('totals','$.totalTransactionRevenue').alias('totalTransactionRevenue').cast(IntegerType()),
               get_json_object('totals','$.sessionQualityDim').alias("sessionQualityDim").cast(IntegerType()),
               'visitNumber')

In [7]:
df = df.withColumn('promotionActionInfo',  when(df.promotionActionInfo.isNotNull(), 1).otherwise(0))\
.withColumn('transactions', when(df.transactions.isNotNull(), 1).otherwise(0))\
.withColumn('isInteraction', when(df.isInteraction=='True', 1).otherwise(0))\
.withColumn('isEntrance', when(df.isEntrance=='True', 1).otherwise(0))\
.withColumn('isExit', when(df.isExit=='True', 1).otherwise(0))

In [8]:
df = df.na.fill({'fullVisitorId':0, 'hitNumber':0, 'time':0, 'hour':0, 'minute':0, 'visits':0, 'hits':0, 'pageviews':0,'timeOnSite':0, 'newVisits':0,'transactions':0, 'transactionRevenue':0, 'totalTransactionRevenue':0,
                         'sessionQualityDim':0, 'visitNumber':0})

In [9]:
df = df.withColumn('visitNumber', df.visitNumber.cast(IntegerType()))

In [10]:
pd.DataFrame(df.take(5), columns=df.columns).transpose()

Unnamed: 0,0,1,2,3,4
fullVisitorId,3162355547410993243,8934116514970143966,7992466427990357681,9075655783635761930,6960673291025684308
hitNumber,1,1,1,1,1
time,0,0,0,0,0
hour,17,10,17,9,14
minute,0,51,53,4,49
isInteraction,1,1,1,1,1
isEntrance,1,1,1,1,1
isExit,1,0,0,0,0
promotionActionInfo,0,1,1,1,1
visits,1,1,1,1,1


In [11]:
df.printSchema()

root
 |-- fullVisitorId: string (nullable = false)
 |-- hitNumber: integer (nullable = false)
 |-- time: integer (nullable = false)
 |-- hour: integer (nullable = false)
 |-- minute: integer (nullable = false)
 |-- isInteraction: integer (nullable = false)
 |-- isEntrance: integer (nullable = false)
 |-- isExit: integer (nullable = false)
 |-- promotionActionInfo: integer (nullable = false)
 |-- visits: integer (nullable = false)
 |-- hits: integer (nullable = false)
 |-- pageviews: integer (nullable = false)
 |-- timeOnSite: integer (nullable = false)
 |-- newVisits: integer (nullable = false)
 |-- transactions: integer (nullable = false)
 |-- transactionRevenue: integer (nullable = false)
 |-- totalTransactionRevenue: integer (nullable = false)
 |-- sessionQualityDim: integer (nullable = false)
 |-- visitNumber: integer (nullable = true)



In [12]:
df.describe(['totalTransactionRevenue']).show()

+-------+-----------------------+
|summary|totalTransactionRevenue|
+-------+-----------------------+
|  count|                1708337|
|   mean|      1280770.813955326|
| stddev|    2.439825683230373E7|
|    min|                      0|
|    max|             2120880000|
+-------+-----------------------+



Since the mean of total revenue is 1280770, we assume that the customer who created revenue at top 20% will make the revenue above 2049232. 

In [13]:
df = df.withColumn('valuable_customer', when(df.totalTransactionRevenue > 2049232, 1).otherwise(0))

In [14]:
df = df.select('fullVisitorId', 'hitNumber', 
               'time', 'hour', 
               'minute', 'isInteraction', 
               'isEntrance', 'isExit', 
               'promotionActionInfo',
               'visits','hits', 
               'pageviews', 'timeOnSite', 
               'newVisits', 'transactions', 
               'sessionQualityDim', 'visitNumber', 
               'valuable_customer')

# Using Logistic Regression Algorithm for valuable customer/less valuable customer group prediction

In [15]:
sample_df = df.sample(False,0.005, 3)

In [16]:
sample_df.count()

8560

In [17]:
cols = sample_df.columns

In [18]:
categoricalColumns = ['hitNumber', 'isInteraction', 
                         'isEntrance', 'isExit', 'promotionActionInfo','newVisits', 
                         'transactions']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

In [19]:
label_stringIdx = StringIndexer(inputCol = 'valuable_customer', outputCol = 'label')
stages += [label_stringIdx]
numericCols = [ 'time', 'hour', 'minute','visits','hits', 'pageviews', 'timeOnSite','sessionQualityDim', 'visitNumber']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [None]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(sample_df)
sample_df = pipelineModel.transform(sample_df)
selectedCols = ['label', 'features'] + cols
sample_df = sample_df.select(selectedCols)
sample_df.printSchema()

In [None]:
pd.DataFrame(sample_df.take(5), columns=sample_df.columns).transpose()

In [None]:
train, test = sample_df.randomSplit([0.7, 0.3], seed = 2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

In [None]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
beta = np.sort(lrModel.coefficients)
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

In [None]:
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
pr = trainingSummary.pr.toPandas()
plt.plot(pr['recall'],pr['precision'])
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

In [None]:
predictions = lrModel.transform(test)
predictions.show(10)