In [2]:
#set environment
import os
import sys
 
os.environ["SPARK_HOME"] = "/usr/hdp/current/spark2-client"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/local/anaconda/bin/python" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/local/anaconda/bin/python"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.4-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [3]:
#import Sparksession driver
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Regression on BigMart Sales") \
    .getOrCreate()

In [4]:
spark.version

'2.1.1.2.6.2.0-205'

In [5]:
BigMart = spark.read.csv('data/BigMart_copy.csv',header=True,inferSchema=True)
BigMart.show()

+-----------+----------------+--------------------+--------------------+--------+-----------+------------------+
|Item_Weight|Item_Fat_Content|     Item_Visibility|           Item_Type|Item_MRP|Outlet_Type| Item_Outlet_Sales|
+-----------+----------------+--------------------+--------------------+--------+-----------+------------------+
|        9.3|               1|         0.016047301|               Dairy|249.8092|          2|          3735.138|
|       5.92|               2|         0.019278216|         Soft Drinks| 48.2692|          3|          443.4228|
|       17.5|               1|         0.016760075|                Meat| 141.618|          2|           2097.27|
|       19.2|               2| 0.06613202877895127|Fruits and Vegeta...| 182.095|          1|            732.38|
|       8.93|               1| 0.06613202877895127|           Household| 53.8614|          2|          994.7052|
|     10.395|               2| 0.06613202877895127|        Baking Goods| 51.4008|          3|   

In [6]:
#Label encoder
from pyspark.ml.feature import StringIndexer
indexed = BigMart
for col in ["Item_Type"]:
    stringIndexer = StringIndexer(inputCol=col, outputCol=col+"_encoded")
    indexed = stringIndexer.fit(indexed).transform(indexed)
indexed.show()

+-----------+----------------+--------------------+--------------------+--------+-----------+------------------+-----------------+
|Item_Weight|Item_Fat_Content|     Item_Visibility|           Item_Type|Item_MRP|Outlet_Type| Item_Outlet_Sales|Item_Type_encoded|
+-----------+----------------+--------------------+--------------------+--------+-----------+------------------+-----------------+
|        9.3|               1|         0.016047301|               Dairy|249.8092|          2|          3735.138|              4.0|
|       5.92|               2|         0.019278216|         Soft Drinks| 48.2692|          3|          443.4228|              8.0|
|       17.5|               1|         0.016760075|                Meat| 141.618|          2|           2097.27|              9.0|
|       19.2|               2| 0.06613202877895127|Fruits and Vegeta...| 182.095|          1|            732.38|              0.0|
|       8.93|               1| 0.06613202877895127|           Household| 53.8614|  

In [67]:
type(encoded)

pyspark.sql.dataframe.DataFrame

In [7]:
#One hot encoder
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="Item_Type_encoded",outputCol="Item_Type_vec",dropLast=True)
encoded = encoder.transform(indexed)
encoded.show(3)

+-----------+----------------+---------------+-----------+--------+-----------+-----------------+-----------------+--------------+
|Item_Weight|Item_Fat_Content|Item_Visibility|  Item_Type|Item_MRP|Outlet_Type|Item_Outlet_Sales|Item_Type_encoded| Item_Type_vec|
+-----------+----------------+---------------+-----------+--------+-----------+-----------------+-----------------+--------------+
|        9.3|               1|    0.016047301|      Dairy|249.8092|          2|         3735.138|              4.0|(15,[4],[1.0])|
|       5.92|               2|    0.019278216|Soft Drinks| 48.2692|          3|         443.4228|              8.0|(15,[8],[1.0])|
|       17.5|               1|    0.016760075|       Meat| 141.618|          2|          2097.27|              9.0|(15,[9],[1.0])|
+-----------+----------------+---------------+-----------+--------+-----------+-----------------+-----------------+--------------+
only showing top 3 rows



## CHOSSEN ONLY 2 VARIABLES SAME AS SKLEARN (as OLS SELECTED FEATURES ITEM_MRP;OUTLET_TYPE)

In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
assembler = VectorAssembler(inputCols=["Item_MRP","Outlet_Type","Item_Type_vec"], 
                            outputCol="features")
feature_vec=assembler.transform(encoded)
feature_vec.select("features").take(3)

[Row(features=SparseVector(17, {0: 249.8092, 1: 2.0, 6: 1.0})),
 Row(features=SparseVector(17, {0: 48.2692, 1: 3.0, 10: 1.0})),
 Row(features=SparseVector(17, {0: 141.618, 1: 2.0, 11: 1.0}))]

In [9]:
# Split the data into train and test sets
train_data, test_data = feature_vec.randomSplit([.8,.2],seed=1234)

In [10]:
train_data.head(5)

[Row(Item_Weight=4.555, Item_Fat_Content=2, Item_Visibility=0.034410585, Item_Type='Frozen Foods', Item_MRP=111.3544, Outlet_Type=2, Item_Outlet_Sales=1342.2528, Item_Type_encoded=3.0, Item_Type_vec=SparseVector(15, {3: 1.0}), features=SparseVector(17, {0: 111.3544, 1: 2.0, 5: 1.0})),
 Row(Item_Weight=4.555, Item_Fat_Content=2, Item_Visibility=0.060154967999999996, Item_Type='Frozen Foods', Item_MRP=110.1544, Outlet_Type=1, Item_Outlet_Sales=111.8544, Item_Type_encoded=3.0, Item_Type_vec=SparseVector(15, {3: 1.0}), features=SparseVector(17, {0: 110.1544, 1: 1.0, 5: 1.0})),
 Row(Item_Weight=4.59, Item_Fat_Content=1, Item_Visibility=0.06613202877895127, Item_Type='Soft Drinks', Item_MRP=111.186, Outlet_Type=2, Item_Outlet_Sales=1245.046, Item_Type_encoded=8.0, Item_Type_vec=SparseVector(15, {8: 1.0}), features=SparseVector(17, {0: 111.186, 1: 2.0, 10: 1.0})),
 Row(Item_Weight=4.59, Item_Fat_Content=1, Item_Visibility=0.07043780000000001, Item_Type='Soft Drinks', Item_MRP=112.886000000000

In [11]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=True)
scalerModel = scaler.fit(train_data)
scaledData = scalerModel.transform(train_data)
scaledData_test = scalerModel.transform(test_data)
scaledData.select("scaledFeatures").take(3)

[Row(scaledFeatures=DenseVector([-0.4855, -0.2511, -0.4163, -0.4017, -0.346, 3.0436, -0.2957, -0.2903, -0.2877, -0.249, -0.2354, -0.2326, -0.1725, -0.1618, -0.1416, -0.1321, -0.1157])),
 Row(scaledFeatures=DenseVector([-0.5048, -1.5168, -0.4163, -0.4017, -0.346, 3.0436, -0.2957, -0.2903, -0.2877, -0.249, -0.2354, -0.2326, -0.1725, -0.1618, -0.1416, -0.1321, -0.1157])),
 Row(scaledFeatures=DenseVector([-0.4882, -0.2511, -0.4163, -0.4017, -0.346, -0.3285, -0.2957, -0.2903, -0.2877, -0.249, 4.2469, -0.2326, -0.1725, -0.1618, -0.1416, -0.1321, -0.1157]))]

# APPLYING RANDOMFOREST REGRESSOR

In [13]:
from pyspark.ml.regression import RandomForestRegressor

# Create initial LogisticRegression model
Rf = RandomForestRegressor(labelCol="Item_Outlet_Sales", featuresCol="features")

# Train model with Training Data
RfModel = Rf.fit(train_data)
predictions = RfModel.transform(test_data)
predictions.printSchema()

root
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: integer (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Type: integer (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)
 |-- Item_Type_encoded: double (nullable = true)
 |-- Item_Type_vec: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [14]:
from pyspark.ml.evaluation import RegressionEvaluator

RfModel1 = Rf.fit(train_data)
pred = RfModel1.transform(train_data)
pred.printSchema()

root
 |-- Item_Weight: double (nullable = true)
 |-- Item_Fat_Content: integer (nullable = true)
 |-- Item_Visibility: double (nullable = true)
 |-- Item_Type: string (nullable = true)
 |-- Item_MRP: double (nullable = true)
 |-- Outlet_Type: integer (nullable = true)
 |-- Item_Outlet_Sales: double (nullable = true)
 |-- Item_Type_encoded: double (nullable = true)
 |-- Item_Type_vec: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = true)



In [15]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Item_Outlet_Sales', metricName='r2')
evaluator.evaluate(predictions)

0.5124673016905278

In [16]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Item_Outlet_Sales', metricName='r2')
evaluator.evaluate(predictions)

0.5124673016905278

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Item_Outlet_Sales')
print("R2 TEST SCORE ", evaluator.evaluate(predictions,{evaluator.metricName: "r2"}))
print("MAE TEST SCORE ", evaluator.evaluate(predictions,{evaluator.metricName: "mae"}))
print("RMSE TEST SCORE " ,evaluator.evaluate(predictions,{evaluator.metricName: "rmse"}))

R2 TEST SCORE  0.5124673016905278
MAE TEST SCORE  843.5182435361971
RMSE TEST SCORE  1159.6815087665502


In [20]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

paramGrid = (ParamGridBuilder()\
             .addGrid(Rf.numTrees,[10,20])\
             .addGrid(Rf.maxDepth,[4,5,6])\
             .build())

# Create 4-fold CrossValidator
cv = CrossValidator(estimator=Rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

cvModel = cv.fit(train_data)


In [22]:
list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))

[(1276.5033715290494,
  {Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='numTrees', doc='Number of trees to train (>= 1).'): 10,
   Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4}),
 (1226.4313609687597,
  {Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='numTrees', doc='Number of trees to train (>= 1).'): 10,
   Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5}),
 (1183.2062686650086,
  {Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='numTrees', doc='Number of trees to train (>= 1).'): 10,
   Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; d

In [23]:
#Best Model Params
score_params_list = list(zip(cvModel.avgMetrics, cvModel.getEstimatorParamMaps()))
max(score_params_list,key=lambda item:item[0])

(1276.5033715290494,
 {Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='numTrees', doc='Number of trees to train (>= 1).'): 10,
  Param(parent='RandomForestRegressor_4d6a8e29a503c68e2330', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 4})

In [25]:
predictions = cvModel.bestModel.transform(test_data)

##### SCORE AFTER TUNNING

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='Item_Outlet_Sales')
print("R2 TEST SCORE ", evaluator.evaluate(predictions,{evaluator.metricName: "r2"}))
print("MAE TEST SCORE ", evaluator.evaluate(predictions,{evaluator.metricName: "mae"}))
print("RMSE TEST SCORE " ,evaluator.evaluate(predictions,{evaluator.metricName: "rmse"}))

R2 TEST SCORE  0.5458342370654748
MAE TEST SCORE  805.100588718376
RMSE TEST SCORE  1119.2936863547534


# RANDOMFOREST REGRESSION on SKLEARN gave R2 SCORE 60% ,in PYSPARK RESULATED in LITTLE LOW SCORE 54%  AFTER TUNNING 