# Example Machine Learning Using Spark

Big Data, 9/8/16

In [1]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
train = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('train.csv')
test = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('test.csv')

In [3]:
# Ananlyze Data

train.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [4]:
train.head(10)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

In [5]:
train.count()

550068

In [6]:
# Impute Missing Values

train.na.drop().count(),test.na.drop('any').count()

(166821, 71037)

In [7]:
train = train.fillna(-1)
test = test.fillna(-1)

In [8]:
# Take a look at numerical variables

train.describe().show()

+-------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+
|summary|           User_ID|       Occupation|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|          Purchase|
+-------+------------------+-----------------+-------------------+------------------+------------------+------------------+------------------+
|  count|            550068|           550068|             550068|            550068|            550068|            550068|            550068|
|   mean|1003028.8424013031|8.076706879876669|0.40965298835780306| 5.404270017525106| 6.419769919355425| 3.145214773446192| 9263.968712959126|
| stddev|1727.5915855308265|6.522660487341778| 0.4917701263173273|3.9362113692014082| 6.565109781181287| 6.681038828257762|5023.0653938206015|
|    min|           1000001|                0|                  0|                 1|                -1|                -1|                12|

In [9]:
# Sub Setting

train.select('User_ID').show()

+-------+
|User_ID|
+-------+
|1000001|
|1000001|
|1000001|
|1000001|
|1000002|
|1000003|
|1000004|
|1000004|
|1000004|
|1000005|
|1000005|
|1000005|
|1000005|
|1000005|
|1000006|
|1000006|
|1000006|
|1000006|
|1000007|
|1000008|
+-------+
only showing top 20 rows



In [10]:
# Take a look at categorical variables

train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count()

(3631, 3491)

In [11]:
# Let's check what are the categories for Product_ID, which are in test but not in train by applying subtract method.

diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))
diff_cat_in_train_test.distinct().count()# For distict count

46

In [12]:
# String Indexerhttps://spark.apache.org/docs/latest/ml-features.html#stringindexer
# Build labeller

from pyspark.ml.feature import StringIndexer
plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_ID')
labeller = plan_indexer.fit(train)

In [13]:
# Transform our train and test Dataframe with the help of labeller

Train1 = labeller.transform(train)
Test1 = labeller.transform(test)

In [14]:
Train1.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_ID|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|     766.0|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|     183.0|
|1000001| P00087842|     F| 0-17|        10|            A|                         2|             0|          

In [15]:
# Build model

from pyspark.ml.feature import RFormula
formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",featuresCol="features",labelCol="label")

In [16]:
# Pass the data through the model

t1 = formula.fit(Train1)
train1 = t1.transform(Train1)
test1 = t1.transform(Test1)

In [17]:
train1.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+--------------------+-------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_ID|            features|  label|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------+--------------------+-------+
|1000001| P00069042|     F| 0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|     766.0|(16,[6,10,13,14],...| 8370.0|
|1000001| P00248942|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|     1

In [18]:
# Look at the 2 extra columns we created with Rformula, they are transformed to numerical 

train1.select('features').show()
train1.select('label').show()

+--------------------+
|            features|
+--------------------+
|(16,[6,10,13,14],...|
|(16,[6,10,13,14],...|
|(16,[6,10,13,14],...|
|(16,[6,10,13,14],...|
|(16,[5,6,8,12,13,...|
|(16,[0,6,11,13,14...|
|(16,[3,6,7,10,13,...|
|(16,[3,6,7,10,13,...|
|(16,[3,6,7,10,13,...|
|(16,[0,6,9,13,14,...|
|(16,[0,6,9,13,14,...|
|(16,[0,6,9,13,14,...|
|(16,[0,6,9,13,14,...|
|(16,[0,6,9,13,14,...|
|(16,[4,6,9,13,14]...|
|(16,[4,6,9,13,14]...|
|(16,[4,6,9,13,14]...|
|(16,[4,6,9,13,14]...|
|(16,[1,6,7,9,13,1...|
|(16,[0,6,8,12,13,...|
+--------------------+
only showing top 20 rows

+-------+
|  label|
+-------+
| 8370.0|
|15200.0|
| 1422.0|
| 1057.0|
| 7969.0|
|15227.0|
|19215.0|
|15854.0|
|15686.0|
| 7871.0|
| 5254.0|
| 3957.0|
| 6073.0|
|15665.0|
| 5378.0|
| 2079.0|
|13055.0|
| 8851.0|
|11788.0|
|19614.0|
+-------+
only showing top 20 rows



In [19]:
# ML model, random forest regressor

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor()

In [20]:
# Dividing datframe in 70% for train and 30% for test

(train_cv, test_cv) = train1.randomSplit([0.7, 0.3])

In [21]:
# Build model and make predictions

model1 = rf.fit(train_cv)
predictions = model1.transform(test_cv)

In [22]:
# Check for predictions it has prediction result for test_cv

predictions.head(5)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=-1, Product_Category_3=-1, Purchase=8370, product_ID=766.0, features=SparseVector(16, {6: 10.0, 10: 1.0, 13: 3.0, 14: -1.0}), label=8370.0, prediction=8839.176689079284),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200, product_ID=183.0, features=SparseVector(16, {6: 10.0, 10: 1.0, 13: 1.0, 14: 6.0}), label=15200.0, prediction=13251.89007989439),
 Row(User_ID=1000004, Product_ID='P00184942', Gender='M', Age='46-50', Occupation=7, City_Category='B', Stay_In_Current_City_Years='2', Marital_Status=1, Product_Category_1=1, Product_Category_2=8, Product_Category_3=17, Purchase=19215, product_ID=5.0, features=Sparse

In [23]:
# Evaluate model with mean squared error, 3811.4226156703771

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()
mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })
import numpy as np
np.sqrt(mse), mse

(3829.194166912273, 14662727.967914976)

In [24]:
# Now make prediction on entire training set

model = rf.fit(train1)
predictions1 = model.transform(test1)

In [25]:
# Make data frame for submission

df = predictions1.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase')

In [26]:
# Write file 

df.toPandas().to_csv('submission.csv')