In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

In [2]:
## Create SparkContext, SparkSession
from os.path import expanduser, join, abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = 'hdfs:///apps/hive/warehouse/'

spark = SparkSession \
    .builder \
    .appName("Machine Learning Example using Spark ML") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

In [4]:
##Verify Spark Session:
spark

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [6]:
import numpy as np
import StringIO
import pandas as pd
import warnings

In [7]:
# Plotting libraries

import matplotlib.pyplot as plt
import seaborn as sns

import plotly
plotly.tools.set_credentials_file(username='shubhkotal', api_key='V8cbLCj4d7YTNhqYM5CL')

import plotly.figure_factory as ff
import plotly.plotly as py
import plotly.offline as pyoff
import plotly.graph_objs as go

# Initializing some settings
sns.set_style('whitegrid')
sns.set(color_codes=True)
warnings.filterwarnings('ignore')
pyoff.init_notebook_mode(connected=True)
get_ipython().magic('matplotlib inline')

In [8]:
# Reading and create from local files
data=spark.read.format("csv")\
        .option("header", "true")\
        .option("inferSchema", "true")\
        .load("file:///home/2386B49/CustomerData.csv")

In [9]:
data.printSchema()

root
 |-- CustomerID: integer (nullable = true)
 |-- City: integer (nullable = true)
 |-- NoOfChildren: integer (nullable = true)
 |-- MinAgeOfChild: integer (nullable = true)
 |-- MaxAgeOfChild: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- FrquncyOfPurchase: integer (nullable = true)
 |-- NoOfUnitsPurchased: integer (nullable = true)
 |-- FrequencyOFPlay: integer (nullable = true)
 |-- NoOfGamesPlayed: integer (nullable = true)
 |-- NoOfGamesBought: integer (nullable = true)
 |-- FavoriteChannelOfTransaction: string (nullable = true)
 |-- FavoriteGame: string (nullable = true)
 |-- TotalRevenueGenerated: double (nullable = true)



In [10]:
data.dtypes

[('CustomerID', 'int'),
 ('City', 'int'),
 ('NoOfChildren', 'int'),
 ('MinAgeOfChild', 'int'),
 ('MaxAgeOfChild', 'int'),
 ('Tenure', 'int'),
 ('FrquncyOfPurchase', 'int'),
 ('NoOfUnitsPurchased', 'int'),
 ('FrequencyOFPlay', 'int'),
 ('NoOfGamesPlayed', 'int'),
 ('NoOfGamesBought', 'int'),
 ('FavoriteChannelOfTransaction', 'string'),
 ('FavoriteGame', 'string'),
 ('TotalRevenueGenerated', 'double')]

In [11]:
data.take(3)

[Row(CustomerID=1001, City=1, NoOfChildren=2, MinAgeOfChild=3, MaxAgeOfChild=8, Tenure=210, FrquncyOfPurchase=11, NoOfUnitsPurchased=11, FrequencyOFPlay=2344, NoOfGamesPlayed=108, NoOfGamesBought=10, FavoriteChannelOfTransaction=u'Uniform', FavoriteGame=u'Uniform', TotalRevenueGenerated=107.51),
 Row(CustomerID=1002, City=1, NoOfChildren=2, MinAgeOfChild=3, MaxAgeOfChild=6, Tenure=442, FrquncyOfPurchase=20, NoOfUnitsPurchased=20, FrequencyOFPlay=245, NoOfGamesPlayed=22, NoOfGamesBought=7, FavoriteChannelOfTransaction=u'Favorite', FavoriteGame=u'Uniform', TotalRevenueGenerated=382.4),
 Row(CustomerID=1003, City=1, NoOfChildren=4, MinAgeOfChild=3, MaxAgeOfChild=5, Tenure=424, FrquncyOfPurchase=18, NoOfUnitsPurchased=18, FrequencyOFPlay=1059, NoOfGamesPlayed=130, NoOfGamesBought=18, FavoriteChannelOfTransaction=u'Favorite', FavoriteGame=u'Uniform', TotalRevenueGenerated=135.01)]

In [12]:
data.show(3)

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|      1001|   1|           2|            3|            8|   210|               11|                11|           2344|            108|             10|                     Uniform|     Uniform|               107.51|
|      1002|   1|           2|            3|            6|   442|               20|                20|            245|             22|      

In [13]:
data.cache()

DataFrame[CustomerID: int, City: int, NoOfChildren: int, MinAgeOfChild: int, MaxAgeOfChild: int, Tenure: int, FrquncyOfPurchase: int, NoOfUnitsPurchased: int, FrequencyOFPlay: int, NoOfGamesPlayed: int, NoOfGamesBought: int, FavoriteChannelOfTransaction: string, FavoriteGame: string, TotalRevenueGenerated: double]

In [15]:
# Creating a list of categorical and numerical features:
cols = data.columns

categorical_Var_Names = ['City', 'FavoriteChannelOfTransaction', 'FavoriteGame']

numerical_Var_Names =  list(set(cols) - set(categorical_Var_Names))

for col in categorical_Var_Names:
    data = data.withColumn(col, data[col].cast("string"))

for col in numerical_Var_Names:
    data = data.withColumn(col, data[col].cast("double"))

In [16]:
## To Count the number of rows in DataFrame
print('Total records count is {}'.format(data.count()))
## Columns count and column names
print("Total Columns count is {}".format(len(data.columns)))
print("\n\nColumns are: {} \n".format(data.columns))

Total records count is 3209
Total Columns count is 14


Columns are: ['CustomerID', 'City', 'NoOfChildren', 'MinAgeOfChild', 'MaxAgeOfChild', 'Tenure', 'FrquncyOfPurchase', 'NoOfUnitsPurchased', 'FrequencyOFPlay', 'NoOfGamesPlayed', 'NoOfGamesBought', 'FavoriteChannelOfTransaction', 'FavoriteGame', 'TotalRevenueGenerated'] 



In [17]:
##Checking Summary:
data.describe().show()

+-------+----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+----------------------------+------------+---------------------+
|summary|      CustomerID|              City|      NoOfChildren|     MinAgeOfChild|    MaxAgeOfChild|           Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|   FrequencyOFPlay|  NoOfGamesPlayed|   NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+-------+----------------+------------------+------------------+------------------+-----------------+-----------------+-----------------+------------------+------------------+-----------------+------------------+----------------------------+------------+---------------------+
|  count|            3209|              3209|              3209|              3209|             3209|             3209|             3209|              3209|             

In [18]:
data.dtypes

[('CustomerID', 'double'),
 ('City', 'string'),
 ('NoOfChildren', 'double'),
 ('MinAgeOfChild', 'double'),
 ('MaxAgeOfChild', 'double'),
 ('Tenure', 'double'),
 ('FrquncyOfPurchase', 'double'),
 ('NoOfUnitsPurchased', 'double'),
 ('FrequencyOFPlay', 'double'),
 ('NoOfGamesPlayed', 'double'),
 ('NoOfGamesBought', 'double'),
 ('FavoriteChannelOfTransaction', 'string'),
 ('FavoriteGame', 'string'),
 ('TotalRevenueGenerated', 'double')]

In [None]:
data=data.drop("CustomerID")

In [19]:
data.dtypes

[('CustomerID', 'double'),
 ('City', 'string'),
 ('NoOfChildren', 'double'),
 ('MinAgeOfChild', 'double'),
 ('MaxAgeOfChild', 'double'),
 ('Tenure', 'double'),
 ('FrquncyOfPurchase', 'double'),
 ('NoOfUnitsPurchased', 'double'),
 ('FrequencyOFPlay', 'double'),
 ('NoOfGamesPlayed', 'double'),
 ('NoOfGamesBought', 'double'),
 ('FavoriteChannelOfTransaction', 'string'),
 ('FavoriteGame', 'string'),
 ('TotalRevenueGenerated', 'double')]

In [20]:
data = data.na.drop( how = 'any' )
print('Before Dropping Null Values', data.count())
print('After Dropping Null Values', data.count())

('Before Dropping Null Values', 3209)
('After Dropping Null Values', 3209)


In [21]:
data.dtypes

[('CustomerID', 'double'),
 ('City', 'string'),
 ('NoOfChildren', 'double'),
 ('MinAgeOfChild', 'double'),
 ('MaxAgeOfChild', 'double'),
 ('Tenure', 'double'),
 ('FrquncyOfPurchase', 'double'),
 ('NoOfUnitsPurchased', 'double'),
 ('FrequencyOFPlay', 'double'),
 ('NoOfGamesPlayed', 'double'),
 ('NoOfGamesBought', 'double'),
 ('FavoriteChannelOfTransaction', 'string'),
 ('FavoriteGame', 'string'),
 ('TotalRevenueGenerated', 'double')]

In [22]:
data.dtypes

[('CustomerID', 'double'),
 ('City', 'string'),
 ('NoOfChildren', 'double'),
 ('MinAgeOfChild', 'double'),
 ('MaxAgeOfChild', 'double'),
 ('Tenure', 'double'),
 ('FrquncyOfPurchase', 'double'),
 ('NoOfUnitsPurchased', 'double'),
 ('FrequencyOFPlay', 'double'),
 ('NoOfGamesPlayed', 'double'),
 ('NoOfGamesBought', 'double'),
 ('FavoriteChannelOfTransaction', 'string'),
 ('FavoriteGame', 'string'),
 ('TotalRevenueGenerated', 'double')]

In [23]:
data.where((data['MinAgeOfChild']==113) | (data['MaxAgeOfChild']==113)).show()

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|    1571.0|   1|         2.0|          4.0|        113.0| 205.0|             17.0|              17.0|          158.0|           51.0|            8.0|                    Favorite|     Uniform|               218.85|
|    1585.0|   1|         2.0|          3.0|        113.0| 379.0|              6.0|               6.0|          242.0|           32.0|      

In [24]:
from pyspark.sql.functions import isnan, when, count, col, countDistinct
data = data.where((data['MinAgeOfChild'] != 113) & (data['MaxAgeOfChild'] != 113))

In [25]:
data.count()

3189

In [26]:
# Checking for null values at each column
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+
|         0|   0|           0|            0|            0|     0|                0|                 0|              0|              0|              0|                           0|           0|                    0|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+------

In [27]:
##Converting NA values to NUll values:

from pyspark.sql.functions import when 

for col in cols:
    data = data.withColumn(col, when(data[col]== "NA", None).otherwise(data[col]))

In [28]:
##Splitting Train and Test:
(trainingData, testData) = data.randomSplit([0.7, 0.3])

In [29]:
numerical_Var_Names = list(set(numerical_Var_Names) - set(["TotalRevenueGenerated"]))

In [30]:
from pyspark.ml.feature import VectorAssembler

assembler_Num = VectorAssembler(inputCols=numerical_Var_Names, outputCol="num_features")

In [31]:

from pyspark.ml.feature import MinMaxScaler

min_Max_Scalar = MinMaxScaler(inputCol="num_features", outputCol="scaled_num_features")

In [34]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

indexers_Cat = [StringIndexer(inputCol=categorical_Var_Name, outputCol="{0}_index".format(categorical_Var_Name)) for categorical_Var_Name in categorical_Var_Names ]
encoders_Cat = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol="{0}_vec".format(indexer.getInputCol())) for indexer in indexers_Cat]
assembler_Cat = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders_Cat], outputCol="cat_features")

assembler = VectorAssembler(inputCols=["scaled_num_features","cat_features"], outputCol="features")

In [35]:
preprocessiong_Stages = [assembler_Num]+[min_Max_Scalar]+indexers_Cat+encoders_Cat+[assembler_Cat]+[assembler]

In [36]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(maxIter=100,labelCol="TotalRevenueGenerated", featuresCol="features")

In [37]:
# Adding Linear regression model to pipeline
from pyspark.ml import Pipeline

lr_Pipeline = Pipeline(stages=preprocessiong_Stages+[lr]) 

lr_Pipeline_model = lr_Pipeline.fit(trainingData)

In [38]:
print("Coefficients: " + str(lr_Pipeline_model.stages[-1].coefficients))
print("Intercept: " + str(lr_Pipeline_model.stages[-1].intercept))

Coefficients: [47.260443571136896,1002.2960041685042,57.49359752683924,1147.0727842455606,-31.549372062580392,34.498688131273184,-5.362432588019725,-1244.0090126426219,-1.8638342244960162,-16.027801800045975,-6.235764974451041,14.048519428357205,-10.54139549235645]
Intercept: 48.0417149475


In [39]:
train_predictions_lr = lr_Pipeline_model.transform(trainingData)

test_predictions_lr = lr_Pipeline_model.transform(testData)

In [41]:
test_predictions_lr.show(3)

+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+---------------+---------------+----------------------------+------------+---------------------+--------------------+--------------------+----------+----------------------------------+------------------+-------------+--------------------------------+----------------+-------------+--------------------+------------------+
|CustomerID|City|NoOfChildren|MinAgeOfChild|MaxAgeOfChild|Tenure|FrquncyOfPurchase|NoOfUnitsPurchased|FrequencyOFPlay|NoOfGamesPlayed|NoOfGamesBought|FavoriteChannelOfTransaction|FavoriteGame|TotalRevenueGenerated|        num_features| scaled_num_features|City_index|FavoriteChannelOfTransaction_index|FavoriteGame_index|     City_vec|FavoriteChannelOfTransaction_vec|FavoriteGame_vec| cat_features|            features|        prediction|
+----------+----+------------+-------------+-------------+------+-----------------+------------------+---------------+--

In [42]:
# Find the error metric - RMSE
from pyspark.ml.evaluation import RegressionEvaluator

eval = RegressionEvaluator(labelCol="TotalRevenueGenerated",
                            predictionCol="prediction",
                            metricName="rmse" )

In [45]:
##Checking the error metric:
lm_Train_rmse = eval.evaluate(train_predictions_lr)
print('RMSE value on Train is', lm_Train_rmse)

lm_Test_rmse = eval.evaluate(test_predictions_lr)
print('RMSE value on Test is', lm_Test_rmse)

('RMSE value on Train is', 41.36907526786541)
('RMSE value on Test is', 47.568862455797806)
