# Practice PySpark
## Week 5

In [1]:
# Echo the SparkContext object as the cell output.
# NOTE(review): `sc` is not defined by any earlier cell — presumably it is
# injected by a PySpark-enabled kernel/shell; confirm, otherwise this cell
# raises NameError on a fresh plain-Python kernel.
sc

In [2]:
# Load the libraries
import os
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as f
from pyspark.sql.functions import udf, StringType
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer

# Initialize Spark Session

In [3]:
# Reuse the active SparkSession if one exists, otherwise start a new one
# registered under the application name 'deep_learning'.
spark = (
    SparkSession.builder
    .appName('deep_learning')
    .getOrCreate()
)

# Read the Dataset

In [4]:
# Prerequisite: dl_data.csv has to be uploaded beforehand into the 'user1'
# folder of the cluster file system (Hadoop), so that it is reachable at
# /user1/dl_data.csv.
#
# The first row of the file is used as column names and the column types are
# inferred from the data instead of defaulting everything to string.
data = spark.read.csv(
    '/user1/dl_data.csv',
    header=True,
    inferSchema=True,
)

In [5]:
# Print the inferred schema as a tree: one line per column with its name,
# data type and nullability.
data.printSchema()

root
 |-- Visit_Number_Bucket: string (nullable = true)
 |-- Page_Views_Normalized: double (nullable = true)
 |-- Orders_Normalized: integer (nullable = true)
 |-- Internal_Search_Successful_Normalized: double (nullable = true)
 |-- Internal_Search_Null_Normalized: double (nullable = true)
 |-- Email_Signup_Normalized: double (nullable = true)
 |-- Total_Seconds_Spent_Normalized: double (nullable = true)
 |-- Store_Locator_Search_Normalized: double (nullable = true)
 |-- Mapped_Last_Touch_Channel: string (nullable = true)
 |-- Mapped_Mobile_Device_Type: string (nullable = true)
 |-- Mapped_Browser_Type: string (nullable = true)
 |-- Mapped_Entry_Pages: string (nullable = true)
 |-- Mapped_Site_Section: string (nullable = true)
 |-- Mapped_Promo_Code: string (nullable = true)
 |-- Maped_Product_Name: string (nullable = true)
 |-- Mapped_Search_Term: string (nullable = true)
 |-- Mapped_Product_Collection: string (nullable = true)



# Rename Target Column

In [6]:
# Rename the target column to 'label', the column name the classifier and the
# evaluator further down are configured to read.
data = data.withColumnRenamed(
    existing='Orders_Normalized',
    new='label',
)

In [7]:
# Print the schema again to confirm that 'Orders_Normalized' now shows up
# under the name 'label'.
data.printSchema()

root
 |-- Visit_Number_Bucket: string (nullable = true)
 |-- Page_Views_Normalized: double (nullable = true)
 |-- label: integer (nullable = true)
 |-- Internal_Search_Successful_Normalized: double (nullable = true)
 |-- Internal_Search_Null_Normalized: double (nullable = true)
 |-- Email_Signup_Normalized: double (nullable = true)
 |-- Total_Seconds_Spent_Normalized: double (nullable = true)
 |-- Store_Locator_Search_Normalized: double (nullable = true)
 |-- Mapped_Last_Touch_Channel: string (nullable = true)
 |-- Mapped_Mobile_Device_Type: string (nullable = true)
 |-- Mapped_Browser_Type: string (nullable = true)
 |-- Mapped_Entry_Pages: string (nullable = true)
 |-- Mapped_Site_Section: string (nullable = true)
 |-- Mapped_Promo_Code: string (nullable = true)
 |-- Maped_Product_Name: string (nullable = true)
 |-- Mapped_Search_Term: string (nullable = true)
 |-- Mapped_Product_Collection: string (nullable = true)



# Split the dataset into Train, Validation and Test

In [8]:
# Randomly partition the rows into roughly 70% train, 20% validation and
# 10% test. The fixed seed makes the sampling repeatable.
train, validation, test = data.randomSplit(weights=[0.7, 0.2, 0.1], seed=1234)

# Build Pipeline

In [9]:
# Bucket the column names by their Spark type string taken from data.dtypes,
# which yields (column_name, type_string) pairs. Columns of any other type
# (e.g. the integer 'label') land in neither list.
categorical_columns = [name for name, dtype in data.dtypes if dtype.startswith('string')]
numeric_columns = [name for name, dtype in data.dtypes if dtype.startswith('double')]

print(categorical_columns, '\n', numeric_columns)

['Visit_Number_Bucket', 'Mapped_Last_Touch_Channel', 'Mapped_Mobile_Device_Type', 'Mapped_Browser_Type', 'Mapped_Entry_Pages', 'Mapped_Site_Section', 'Mapped_Promo_Code', 'Maped_Product_Name', 'Mapped_Search_Term', 'Mapped_Product_Collection'] 
 ['Page_Views_Normalized', 'Internal_Search_Successful_Normalized', 'Internal_Search_Null_Normalized', 'Email_Signup_Normalized', 'Total_Seconds_Spent_Normalized', 'Store_Locator_Search_Normalized']


In [10]:
# One StringIndexer per categorical column; each writes its numeric index to
# '<column>_index'.
#
# handleInvalid='keep' matters because the pipeline is fitted on the training
# split only and then applied to the validation and test splits: with the
# default ('error') a single category value (or null) that never occurred in
# training aborts the transform. With 'keep' such values are mapped to one
# extra index (the number of labels seen during fit); indices of labels that
# were seen in training are unaffected.
indexers = [
    StringIndexer(
        inputCol=column,
        outputCol='{0}_index'.format(column),
        handleInvalid='keep',
    )
    for column in categorical_columns
]

print(indexers)

[StringIndexer_0dc8d50638f1, StringIndexer_1a5545c803a7, StringIndexer_557944578db9, StringIndexer_bf916dc5675e, StringIndexer_f67fa5ee8203, StringIndexer_a31bfe392f1f, StringIndexer_4ac396b727da, StringIndexer_56e16f93bbe7, StringIndexer_ad85c00c1974, StringIndexer_505fe7cf5da0]


In [11]:
# Assemble the indexed categorical columns followed by the numeric columns
# into a single vector column named "features".
featuresCreator = VectorAssembler(
    inputCols=[stage.getOutputCol() for stage in indexers] + numeric_columns,
    outputCol="features",
)


In [12]:
# Network topology: the input layer has one unit per assembled input column,
# followed by two hidden layers (4 and 2 units) and an output layer of 2 units.
# NOTE(review): an output size of 2 assumes 'label' has exactly two classes —
# confirm against the data.
layers = [len(featuresCreator.getInputCols()), 4, 2, 2]

# Declare the classifier; the fixed seed keeps weight initialisation repeatable.
classifier = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol='label',
    layers=layers,
    maxIter=100,
    blockSize=128,
    seed=1234,
)

In [13]:
# Stage order: all string indexers first, then the feature assembler, and the
# classifier last.
pipeline = Pipeline(stages=[*indexers, featuresCreator, classifier])

# Fit Pipeline

In [14]:
# Fit all pipeline stages on the training split only; the validation and test
# splits are not used for fitting.
model = pipeline.fit(train)

                                                                                

# Get Pipeline Output

In [15]:
# Run the fitted pipeline over each split; every output frame carries the
# model's added columns (including "prediction") next to the input columns.
train_output_df, validation_output_df, test_output_df = (
    model.transform(split) for split in (train, validation, test)
)

# Evaluate the Predictions

In [16]:
# Keep only the two columns the evaluator needs and cache them: every
# evaluate() call below is a separate Spark action, so without caching the
# whole fitted pipeline would be re-executed for each split once per metric.
train_predictionAndLabels = train_output_df.select("prediction", "label").cache()
validation_predictionAndLabels = validation_output_df.select("prediction", "label").cache()
test_predictionAndLabels = test_output_df.select("prediction", "label").cache()

metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']

# (display name, predictions) pairs, in the order they are reported.
prediction_sets = [
    ('Train', train_predictionAndLabels),
    ('Validation', validation_predictionAndLabels),
    ('Test', test_predictionAndLabels),
]

for metric in metrics:
    # Column names are spelled out so the evaluator visibly matches the select() above.
    evaluator = MulticlassClassificationEvaluator(
        labelCol='label',
        predictionCol='prediction',
        metricName=metric,
    )
    for split_name, prediction_df in prediction_sets:
        print(split_name + ' ' + metric + ' = ' + str(evaluator.evaluate(prediction_df)))

                                                                                

Train weightedPrecision = 0.9701627897523112
Validation weightedPrecision = 0.9696512407261686
Test weightedPrecision = 0.9673657827607092




Train weightedRecall = 0.9697517544371221


                                                                                

Validation weightedRecall = 0.9692391452293454
Test weightedRecall = 0.9668117519042437




Train accuracy = 0.9697517544371221
Validation accuracy = 0.9692391452293453
Test accuracy = 0.9668117519042437


## Reference
* Learn PySpark: Build Python-based Machine Learning and Deep Learning Models, Pramod Singh, Apress, 2019.