In [1]:
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.functions import col

from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline 
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import RandomForestRegressor

from pyspark.ml.evaluation import RegressionEvaluator

In [2]:
# initiating spark

spark = SparkSession.builder.master("local").appName('A2_Spark').getOrCreate()
sc = spark.sparkContext

sqlContext = SQLContext(spark.sparkContext)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/10 10:23:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.csv('white_wine_corr.csv', inferSchema=True, header=True)
df = df.drop('_c0')

X = df.drop('quality')
y = df.select(col('quality'))

In [4]:
X_Pandas = X.toPandas()
y_Pandas = y.toPandas()

In [5]:
X_train, X_test, y_train, y_test = train_test_split(X_Pandas, y_Pandas, test_size=0.2, random_state=0)

In [6]:
ss = {3: 10000, 4: 10000, 5: 10000, 6: 10000, 7: 10000, 8: 10000, 5: 10000}
sm = SMOTE(sampling_strategy=ss, random_state=12, k_neighbors=4)
X_train, y_train = sm.fit_resample(X_train, y_train)



In [7]:
df_1 = pd.DataFrame(X_train, columns=['fixed_acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides',
                                      'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates',
                                      'alcohol'])
df_2 = pd.DataFrame(y_train, columns=['quality'])

In [8]:
df_new = df_1.combine_first(df_2)
df_new = spark.createDataFrame(df_new)

In [9]:
print('Total number of rows increased to :  ', df_new.count())

21/12/10 10:23:58 WARN TaskSetManager: Stage 4 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.


Total number of rows increased to :   60005


                                                                                

In [10]:
assembler = VectorAssembler(inputCols=['fixed_acidity', 'volatile acidity', 'citric acid', 'residual sugar',
                                       'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH',
                                       'sulphates', 'alcohol'], outputCol='features')

df_output = assembler.transform(df_new)

In [11]:
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaled_df = standardScaler.fit(df_output).transform(df_output)

21/12/10 10:23:59 WARN TaskSetManager: Stage 7 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.


In [12]:
lr = (LinearRegression(featuresCol='features', labelCol='quality', predictionCol='pred_quality', standardization=True))
linearModel = lr.fit(scaled_df)

21/12/10 10:23:59 WARN Instrumentation: [3af2792c] regParam is zero, which might cause numerical instability and overfitting.
21/12/10 10:23:59 WARN TaskSetManager: Stage 10 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 10:24:00 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
21/12/10 10:24:00 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
21/12/10 10:24:00 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
21/12/10 10:24:00 WARN TaskSetManager: Stage 11 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.


In [13]:
scaled_df.columns

['alcohol',
 'chlorides',
 'citric acid',
 'density',
 'fixed_acidity',
 'free sulfur dioxide',
 'pH',
 'quality',
 'residual sugar',
 'sulphates',
 'total sulfur dioxide',
 'volatile acidity',
 'features',
 'features_scaled']

In [14]:
#train_data, test_data = scaled_df.randomSplit([.8, .2])

In [15]:
predictions = linearModel.transform(scaled_df)
predictions.select("pred_quality", "quality").show()

+------------------+-------+
|      pred_quality|quality|
+------------------+-------+
| 5.290416290080088|    5.0|
| 7.105420183050455|    6.0|
|  6.09691215251064|    7.0|
| 5.399626625489759|    5.0|
| 5.072889605542571|    5.0|
|  4.58458363257796|    5.0|
| 6.238527589759769|    5.0|
|6.3802143524383155|    5.0|
| 6.634884344408249|    6.0|
|  5.11326284572209|    6.0|
|5.2482599082741785|    4.0|
| 5.771741249214358|    5.0|
| 5.493506856016836|    6.0|
| 6.345636734530444|    5.0|
|5.5333422652802255|    6.0|
| 5.423304425489675|    6.0|
| 4.278330957751791|    7.0|
| 4.235551574586964|    5.0|
| 6.646556983794937|    6.0|
| 5.169984350961954|    5.0|
+------------------+-------+
only showing top 20 rows



21/12/10 10:24:00 WARN TaskSetManager: Stage 12 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.


In [16]:
X_test = X_test.combine_first(y_test)

In [17]:
X_test = spark.createDataFrame(X_test)

In [18]:
X_test = assembler.transform(X_test)

In [19]:
predictions = linearModel.transform(X_test)
predictions.select("pred_quality", "quality").show()

+------------------+-------+
|      pred_quality|quality|
+------------------+-------+
| 4.837158182826784|    5.0|
|  4.83759594903421|    6.0|
| 6.135279418568189|    7.0|
| 7.911884514788767|    8.0|
|6.1472170158438075|    5.0|
|1.0768872981565778|    4.0|
| 6.058923521664667|    6.0|
| 6.036249470177893|    5.0|
| 5.333225867046394|    7.0|
| 5.952383595652805|    5.0|
| 5.340760250564074|    6.0|
| 5.461908815039806|    7.0|
|  4.48027923383296|    5.0|
|5.4886468893681695|    5.0|
|6.0828574809864335|    6.0|
| 5.120784224783847|    5.0|
| 6.289322525549551|    6.0|
| 5.133417258957309|    5.0|
|7.6939665742767716|    6.0|
| 6.397771460782337|    6.0|
+------------------+-------+
only showing top 20 rows



In [20]:
evaluator = RegressionEvaluator(labelCol="quality", predictionCol="pred_quality", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(" RMSE on test data for LINEAR REGRESSION = %g" % rmse)

 RMSE on test data for LINEAR REGRESSION = 0.997106


In [57]:
# Hyper-parameter Tuning for Linear Regression

pipeline = Pipeline(stages=[lr])

paramGrid = ParamGridBuilder().addGrid(lr.aggregationDepth, [2,3]).addGrid(lr.elasticNetParam, [0.05,0.1])     \
    .addGrid(lr.maxIter, [2,7]).addGrid(lr.epsilon, [1.5]).addGrid(lr.regParam, [0.6]).build()

cross_val = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator().setLabelCol("quality").setPredictionCol("pred_quality"),
                           numFolds=2)

cvModel = cross_val.fit(scaled_df)
print("Best model parameters", cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)])

predictions = cvModel.transform(X_test)
predictions.select("pred_quality", "quality").show()

rmse = evaluator.evaluate(predictions)
print(" RMSE on test data for LINEAR REGRESSION_with_hyper-parameter = %g" % rmse)

21/12/10 11:06:11 WARN TaskSetManager: Stage 23591 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:11 WARN TaskSetManager: Stage 23592 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:11 WARN TaskSetManager: Stage 23593 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:11 WARN TaskSetManager: Stage 23594 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:11 WARN TaskSetManager: Stage 23595 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:11 WARN TaskSetManager: Stage 23596 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:11 WARN TaskSetManager: Stage 23597 contains a task of very large size (6510 KiB). The maximum recommended task siz

Best model parameters {Param(parent='LinearRegression_83c6b98549f2', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2, Param(parent='LinearRegression_83c6b98549f2', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.1, Param(parent='LinearRegression_83c6b98549f2', name='maxIter', doc='max number of iterations (>= 0).'): 2, Param(parent='LinearRegression_83c6b98549f2', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.5, Param(parent='LinearRegression_83c6b98549f2', name='regParam', doc='regularization parameter (>= 0).'): 0.6}
+------------------+-------+
|      pred_quality|quality|
+------------------+-------+
| 5.169794940936285|    5.0|
| 5.000719954455107|    6.0|
| 5.929909376704543|    7.0|
| 7.285155260837669|    8.0|
| 5.801508096350403|    5.0|
|  2.4104819322

In [58]:
# Random Forest Regressor

rf = RandomForestRegressor(featuresCol='features', labelCol='quality', predictionCol='pred_quality')
pipeline = Pipeline(stages=[rf])

rf_model = pipeline.fit(scaled_df)

predictions = rf_model.transform(X_test)
predictions.select("pred_quality", "quality").show()

rmse = evaluator.evaluate(predictions)
print(" RMSE on test data for RANDOM FOREST REGRESSION= %g" % rmse)

21/12/10 11:06:55 WARN TaskSetManager: Stage 23643 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:55 WARN TaskSetManager: Stage 23644 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:55 WARN TaskSetManager: Stage 23645 contains a task of very large size (6511 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:56 WARN TaskSetManager: Stage 23647 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:56 WARN TaskSetManager: Stage 23649 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:56 WARN TaskSetManager: Stage 23651 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:06:56 WARN TaskSetManager: Stage 23653 contains a task of very large size (6510 KiB). The maximum recommended task siz

+------------------+-------+
|      pred_quality|quality|
+------------------+-------+
| 4.429857231975658|    5.0|
| 4.704756931744831|    6.0|
|  6.45314167717846|    7.0|
| 7.317587999521299|    8.0|
|5.0627470048435725|    5.0|
| 4.124706117883991|    4.0|
| 6.781250904546292|    6.0|
| 4.915555541538952|    5.0|
| 5.691483645054998|    7.0|
| 5.033754855118982|    5.0|
| 5.230658870104921|    6.0|
| 6.176869049507982|    7.0|
| 4.372078832925473|    5.0|
| 5.197271019465137|    5.0|
|7.1577418849606005|    6.0|
| 5.661930238800854|    5.0|
| 6.889381291399258|    6.0|
|  4.97429722307871|    5.0|
| 7.164613089059119|    6.0|
| 6.363281919002826|    6.0|
+------------------+-------+
only showing top 20 rows

 RMSE on test data for RANDOM FOREST REGRESSION= 0.882006


In [72]:
# Hyper-parameter Tuning for Random Forest Regressor

pipeline = Pipeline(stages=[rf])

paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [5,30]).build()

cross_val = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator().setLabelCol("quality").setPredictionCol("pred_quality"),
                           numFolds=2)

cvModel = cross_val.fit(scaled_df)
print(cvModel.getEstimatorParamMaps()[np.argmax(cvModel.avgMetrics)])

predictions = cvModel.transform(X_test)
predictions.select("pred_quality", "quality").show()

rmse = evaluator.evaluate(predictions)
print(" RMSE on test data for RANDOM FOREST REGRESSION with hyper-parameter tuning = %g" % rmse)

21/12/10 11:25:20 WARN TaskSetManager: Stage 24561 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:25:21 WARN TaskSetManager: Stage 24562 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:25:21 WARN TaskSetManager: Stage 24563 contains a task of very large size (6511 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:25:21 WARN TaskSetManager: Stage 24565 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:25:21 WARN TaskSetManager: Stage 24567 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:25:21 WARN TaskSetManager: Stage 24569 contains a task of very large size (6510 KiB). The maximum recommended task size is 1000 KiB.
21/12/10 11:25:21 WARN TaskSetManager: Stage 24571 contains a task of very large size (6510 KiB). The maximum recommended task siz

{Param(parent='RandomForestRegressor_5e78954ba92d', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 30}
+------------+-------+
|pred_quality|quality|
+------------+-------+
|        5.05|    5.0|
|        5.85|    6.0|
|        6.05|    7.0|
|        6.85|    8.0|
|        5.75|    5.0|
|        5.05|    4.0|
|         6.0|    6.0|
|         6.2|    5.0|
|        5.55|    7.0|
|        5.45|    5.0|
|         5.9|    6.0|
|         7.0|    7.0|
|         5.0|    5.0|
|        5.55|    5.0|
|        6.95|    6.0|
|        6.05|    5.0|
|        6.05|    6.0|
|         5.0|    5.0|
|        6.95|    6.0|
|         6.3|    6.0|
+------------+-------+
only showing top 20 rows

 RMSE on test data for RANDOM FOREST REGRESSION = 0.686309
