In [181]:
!pip install pyspark



In [182]:
import pyspark

In [183]:
from pyspark.sql import SparkSession

In [184]:
spark=SparkSession.builder.appName("regression").getOrCreate()

In [185]:
spark

In [186]:
bftrain=spark.read.csv("/train.csv",inferSchema=True,header=True)

In [187]:
bftest=spark.read.csv("/test.csv",inferSchema=True,header=True)

In [188]:
bftest.count()

233599

In [189]:
bftrain.count()

550068

In [190]:
len(bftest.columns)

11

In [191]:
len(bftrain.columns)

12

In [192]:
bftrain.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [193]:
from pyspark.sql.functions import col,isnull,when,count

In [194]:
bftrain.select([count(when(isnull(c),c)).alias(c) for c in bftrain.columns]).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|            173638|            383247|       0|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



In [195]:
bftest.select([count(when(isnull(c),c)).alias(c) for c in bftest.columns]).show()

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|      0|         0|     0|  0|         0|            0|                         0|             0|                 0|             72344|            162562|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+



In [196]:
bftrain.groupBy("Product_Category_2").count().show()

+------------------+------+
|Product_Category_2| count|
+------------------+------+
|                12|  5528|
|              null|173638|
|                13| 10531|
|                 6| 16466|
|                16| 43255|
|                 3|  2884|
|                 5| 26235|
|                15| 37855|
|                 9|  5693|
|                17| 13320|
|                 4| 25677|
|                 8| 64088|
|                 7|   626|
|                10|  3043|
|                11| 14134|
|                14| 55108|
|                 2| 49217|
|                18|  2770|
+------------------+------+



In [197]:
bftrain=bftrain.na.fill(value=888,subset='Product_Category_2')

In [198]:
bftest=bftest.na.fill(value=888,subset='Product_Category_2')

In [199]:
bftrain=bftrain.na.fill(value=999,subset='Product_Category_3')

In [200]:
bftest=bftest.na.fill(value=999,subset='Product_Category_3')

In [201]:
bftrain.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|               888|               999|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|               888|               999|    1422|
|100

In [202]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [203]:
indexer=[StringIndexer(inputCol=col, outputCol=col+"_index").fit(bftrain) for col in list(set(bftrain.columns)-set (["purchase","User_ID","Product_ID"]))]

In [204]:
pipeline=Pipeline(stages=indexer)
bftraindf=pipeline.fit(bftrain).transform(bftrain)

In [205]:
bftraindf.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- Product_Category_3_index: double (nullable = false)
 |-- Age_index: double (nullable = false)
 |-- Purchase_index: double (nullable = false)
 |-- City_Category_index: double (nullable = false)
 |-- Gender_index: double (nullable = false)
 |-- Stay_In_Current_City_Years_index: double (nullable = false)
 |-- Product_Category_1_index: double (nullable = false)
 |-- Marital_Status_index: double (nullable = false)
 |-- Occupation_index: double (null

In [206]:
bftestdf=pipeline.fit(bftest).transform(bftest)

In [207]:
bftestdf.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Product_Category_3_index: double (nullable = false)
 |-- Age_index: double (nullable = false)
 |-- City_Category_index: double (nullable = false)
 |-- Gender_index: double (nullable = false)
 |-- Stay_In_Current_City_Years_index: double (nullable = false)
 |-- Product_Category_1_index: double (nullable = false)
 |-- Marital_Status_index: double (nullable = false)
 |-- Occupation_index: double (nullable = false)
 |-- Product_Category_2_index: double (nullable = false)



In [208]:
bftraindf.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Product_Category_3_index',
 'Age_index',
 'Purchase_index',
 'City_Category_index',
 'Gender_index',
 'Stay_In_Current_City_Years_index',
 'Product_Category_1_index',
 'Marital_Status_index',
 'Occupation_index',
 'Product_Category_2_index']

In [209]:
columns_todrp=['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3','Purchase_index']

In [210]:
bftraindf=bftraindf.drop(*columns_todrp)

In [211]:
bftraindf.columns

['Purchase',
 'Product_Category_3_index',
 'Age_index',
 'City_Category_index',
 'Gender_index',
 'Stay_In_Current_City_Years_index',
 'Product_Category_1_index',
 'Marital_Status_index',
 'Occupation_index',
 'Product_Category_2_index']

In [212]:
from pyspark.ml.feature import RFormula

In [213]:
formula=RFormula(formula='Purchase~.',featuresCol='features',labelCol='label')

In [214]:
output=formula.fit(bftraindf).transform(bftraindf)

In [215]:
output.show(5)

+--------+------------------------+---------+-------------------+------------+--------------------------------+------------------------+--------------------+----------------+------------------------+--------------------+-------+
|Purchase|Product_Category_3_index|Age_index|City_Category_index|Gender_index|Stay_In_Current_City_Years_index|Product_Category_1_index|Marital_Status_index|Occupation_index|Product_Category_2_index|            features|  label|
+--------+------------------------+---------+-------------------+------------+--------------------------------+------------------------+--------------------+----------------+------------------------+--------------------+-------+
|    8370|                     0.0|      6.0|                2.0|         1.0|                             1.0|                     6.0|                 0.0|            12.0|                     0.0|[0.0,6.0,2.0,1.0,...| 8370.0|
|   15200|                     3.0|      6.0|                2.0|         1.0|      

In [216]:
from pyspark.ml.regression import LinearRegression

In [217]:
reg=LinearRegression(featuresCol='features', labelCol='label')

In [218]:
regmodel=reg.fit(output)

In [219]:
regmodel.summary.r2adj

0.06356300117930425

In [220]:
regmodel.summary.rootMeanSquaredError

4860.759869813398

In [221]:
bftestdf=bftestdf.drop(*columns_todrp)

In [222]:

bftestdf.columns

['Product_Category_3_index',
 'Age_index',
 'City_Category_index',
 'Gender_index',
 'Stay_In_Current_City_Years_index',
 'Product_Category_1_index',
 'Marital_Status_index',
 'Occupation_index',
 'Product_Category_2_index']

In [223]:
from pyspark.ml.feature import VectorAssembler

In [224]:
assembler=VectorAssembler(inputCols=['Product_Category_3_index',
 'Age_index',
 'City_Category_index',
 'Gender_index',
 'Stay_In_Current_City_Years_index',
 'Product_Category_1_index',
 'Marital_Status_index',
 'Occupation_index',
 'Product_Category_2_index'], outputCol='features')

In [225]:
bftestdf=assembler.transform(bftestdf)

In [226]:
bftestdf=regmodel.transform(bftestdf)

In [227]:
from pyspark.ml.regression import DecisionTreeRegressor

In [228]:
tree=DecisionTreeRegressor(featuresCol='features',labelCol='label', maxBins=40)

In [229]:
treemodel=tree.fit(output)

In [230]:

treemodel.featureImportances

SparseVector(9, {0: 0.0069, 1: 0.0001, 2: 0.0001, 3: 0.0, 5: 0.9877, 8: 0.0053})

In [231]:
bftestdf=bftestdf.drop('prediction')

In [232]:
bftestdf=treemodel.transform(bftestdf)

In [237]:
bftestdf.select('prediction').show(5)

+------------------+
|        prediction|
+------------------+
|13374.789151742352|
|11629.629884169884|
| 6146.131335728846|
| 2125.686569280953|
|2646.6090788893785|
+------------------+
only showing top 5 rows



In [238]:
bftestdf.toPandas().to_csv('tree.csv')