In [1]:
# Import spark session
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("BlackFriday") \
      .getOrCreate() 

#### Load dataset

In [12]:
df_train = spark.read.csv("data/train.csv", header=True, inferSchema=True)

In [15]:
df_test = spark.read.csv("data/test.csv", header=True, inferSchema=True)

In [16]:
df_train.count(), df_test.count()

(550068, 233599)

In [17]:
df_train.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [18]:
type(df_train)

pyspark.sql.dataframe.DataFrame

In [19]:
df_train.head(10)

[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370),
 Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),
 Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422),
 Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057),
 Row(User_ID=1000002, Product_ID='P0

#### Missing values

In [20]:
df_train.na.drop().count(),df_test.na.drop('any').count()


(166821, 71037)

In [21]:
df_train = df_train.fillna(-1)
df_test = df_test.fillna(-1)

### EDA

#### Summary

In [23]:
df_train.describe().show()

+-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|        Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|            550068|       550068|                    550068|             550068|            550068|            550068|            550068|           550068|
|   mean|1003028.8424013031|      null|  null|  null| 8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 6.419769919

In [24]:
df_train.select('User_ID', 'Gender').show()

+-------+------+
|User_ID|Gender|
+-------+------+
|1000001|     F|
|1000001|     F|
|1000001|     F|
|1000001|     F|
|1000002|     M|
|1000003|     M|
|1000004|     M|
|1000004|     M|
|1000004|     M|
|1000005|     M|
|1000005|     M|
|1000005|     M|
|1000005|     M|
|1000005|     M|
|1000006|     F|
|1000006|     F|
|1000006|     F|
|1000006|     F|
|1000007|     M|
|1000008|     M|
+-------+------+
only showing top 20 rows



##### Categorical variables

In [27]:
df_train.select('Product_ID').distinct().count(), df_test.select('Product_ID').distinct().count()


(3631, 3491)

In [30]:
diff_cat_in_train_test=df_test.select('Product_ID').subtract(df_train.select('Product_ID'))
diff_cat_in_train_test.count()# For distict count

46

In [32]:
# Encode categorical variables

In [31]:
from pyspark.ml.feature import StringIndexer


In [33]:
plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_ID')
labeller = plan_indexer.fit(df_train)

In [34]:
train = labeller.transform(df_train)
test = labeller.transform(df_test)

In [35]:
train.show()

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001|     765.0|     F| 0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|
|1000001|     183.0|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001|    1496.0|     F| 0-17|        10|            A|                         2|             0|                12|                -1|                -1|    1422

#### Feature selection

In [36]:
from pyspark.ml.feature import RFormula

In [37]:
formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",featuresCol="features",labelCol="label")


In [38]:
t1 = formula.fit(train)
train1 = t1.transform(train)
test1 = t1.transform(test)

In [39]:
train1.show()


+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------------+-------+
|User_ID|product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|            features|  label|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+--------------------+-------+
|1000001|     765.0|     F| 0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|(16,[6,10,13,14],...| 8370.0|
|1000001|     183.0|     F| 0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|(16,[6,10,13,14],...|15200.0|
|1000001|    1496.0|

In [41]:
train1.select('features', 'label').show()


+--------------------+-------+
|            features|  label|
+--------------------+-------+
|(16,[6,10,13,14],...| 8370.0|
|(16,[6,10,13,14],...|15200.0|
|(16,[6,10,13,14],...| 1422.0|
|(16,[6,10,13,14],...| 1057.0|
|(16,[5,6,8,12,13,...| 7969.0|
|(16,[0,6,11,13,14...|15227.0|
|(16,[3,6,7,10,13,...|19215.0|
|(16,[3,6,7,10,13,...|15854.0|
|(16,[3,6,7,10,13,...|15686.0|
|(16,[0,6,9,13,14,...| 7871.0|
|(16,[0,6,9,13,14,...| 5254.0|
|(16,[0,6,9,13,14,...| 3957.0|
|(16,[0,6,9,13,14,...| 6073.0|
|(16,[0,6,9,13,14,...|15665.0|
|(16,[4,6,9,13,14]...| 5378.0|
|(16,[4,6,9,13,14]...| 2079.0|
|(16,[4,6,9,13,14]...|13055.0|
|(16,[4,6,9,13,14]...| 8851.0|
|(16,[1,6,7,9,13,1...|11788.0|
|(16,[0,6,8,12,13,...|19614.0|
+--------------------+-------+
only showing top 20 rows



#### Build ML model

In [42]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor()

##### Train test split

In [44]:
train_cv, test_cv = train1.randomSplit([0.7, 0.3])


In [46]:
model1 = rf.fit(train_cv)
predictions = model1.transform(test_cv)

In [47]:
predictions.select('prediction').show()

+------------------+
|        prediction|
+------------------+
|12990.598964606765|
|11106.191438608055|
| 7983.877639625988|
| 6558.700996441343|
| 7604.204878710871|
| 9338.022160582595|
| 7604.204878710871|
| 7604.204878710871|
| 7604.204878710871|
| 7025.959420126334|
|13859.521730006094|
|14069.372472914634|
| 12354.85518161917|
| 8085.672692503707|
|13859.521730006094|
| 8263.133751775735|
|   6778.5717360544|
| 8085.672692503707|
|13859.521730006094|
|14043.098398061402|
+------------------+
only showing top 20 rows



#### Model evaluation

In [49]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()
mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })


In [50]:
import numpy as np
np.sqrt(mse), mse

(3767.337701059926, 14192833.35382749)

#### Model Prediction

In [51]:
model = rf.fit(train1)
predictions1 = model.transform(test1)

In [52]:
### Select required columns
df = predictions1.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase')


In [53]:
### Write to csv
df.toPandas().to_csv('submission.csv')


Py4JJavaError: An error occurred while calling o558.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 121.0 failed 1 times, most recent failure: Lost task 0.0 in stage 121.0 (TID 93) (192.168.1.33 executor driver): org.apache.spark.SparkException: Failed to execute user defined function (StringIndexerModel$$Lambda$3682/1643174960: (string) => double)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Unseen label: P00168242. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:406)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 17 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (StringIndexerModel$$Lambda$3682/1643174960: (string) => double)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:136)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Unseen label: P00168242. To handle unseen labels, set Param handleInvalid to keep.
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1(StringIndexer.scala:406)
	at org.apache.spark.ml.feature.StringIndexerModel.$anonfun$getIndexer$1$adapted(StringIndexer.scala:391)
	... 17 more
