In [1]:
from pyspark.sql import SparkSession


from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorIndexer,Imputer,VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col,isnan, when, count
from pyspark.ml.regression import LinearRegression,RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator,BinaryClassificationEvaluator

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator




In [2]:
spark = (SparkSession.builder
    .master("local")
    .appName("ml_tutorial")
    .getOrCreate())
spark

25/04/14 15:12:22 WARN Utils: Your hostname, MacBook-Pro-de-de.local resolves to a loopback address: 127.0.0.1; using 192.168.1.17 instead (on interface en0)
25/04/14 15:12:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/14 15:12:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
schema="number_courses int,time_study float,Marks float"

In [4]:
df_marks_students = spark.read.format("csv").option("header","true").schema(schema).load("/Users/derib/data_architecture/tp_spark/Student_Marks.csv")
df_marks_students.show()
df_marks_students.printSchema()

+--------------+----------+------+
|number_courses|time_study| Marks|
+--------------+----------+------+
|             3|     4.508|19.202|
|             4|     0.096| 7.734|
|             4|     3.133|13.811|
|             6|     7.909|53.018|
|             8|     7.811|55.299|
|             6|     3.211|17.822|
|             3|     6.063|29.889|
|             5|     3.413|17.264|
|             4|      4.41|20.348|
|             3|     6.173|30.862|
|             3|     7.353|42.036|
|             7|     0.423|12.132|
|             7|     4.218|24.318|
|             3|     4.274|17.672|
|             3|     2.908|11.397|
|             4|      4.26|19.466|
|             5|     5.719|30.548|
|             8|      6.08| 38.49|
|             6|     7.711|50.986|
|             8|     3.977|25.133|
+--------------+----------+------+
only showing top 20 rows

root
 |-- number_courses: integer (nullable = true)
 |-- time_study: float (nullable = true)
 |-- Marks: float (nullable = true)



In [5]:
#Display the number of missing values in each column
df_marks_students.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_marks_students.columns]
   ).show()

+--------------+----------+-----+
|number_courses|time_study|Marks|
+--------------+----------+-----+
|             0|         0|    0|
+--------------+----------+-----+



In [6]:
#Describe the data
df_marks_students.describe().show()

+-------+------------------+------------------+------------------+
|summary|    number_courses|        time_study|             Marks|
+-------+------------------+------------------+------------------+
|  count|               100|               100|               100|
|   mean|              5.29| 4.077140005081892|24.417689995765684|
| stddev|1.7995229446391696|2.3729141462032586|14.326198467356926|
|    min|                 3|             0.096|             5.609|
|    max|                 8|             7.957|            55.299|
+-------+------------------+------------------+------------------+



In [7]:
df_marks_students.printSchema()

root
 |-- number_courses: integer (nullable = true)
 |-- time_study: float (nullable = true)
 |-- Marks: float (nullable = true)



## Transformers
A transformer is an object that able to transform DataFrame => different types of transformers
- SQLTransformer = using SQL to select and transform the columns
- VectorAssembler
...


In [8]:
#Create a oneshot encoder for the categorical features
encoder = OneHotEncoder(inputCols=["number_courses"], outputCols=["number_courses_ohe"])

In [9]:
#Create a vector assembler to combine the features into a single vector
assembler = VectorAssembler(inputCols=["number_courses_ohe", "time_study"], outputCol="features")
#Create a standard scaler to normalize the features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=False)

In [10]:
train, test = df_marks_students.randomSplit([0.8, 0.2], 42)

In [11]:
train.printSchema()

root
 |-- number_courses: integer (nullable = true)
 |-- time_study: float (nullable = true)
 |-- Marks: float (nullable = true)



In [12]:
#create the regression linear model
lr = LinearRegression(featuresCol="scaled_features", labelCol="Marks", maxIter=10, regParam=0.3, elasticNetParam=0.8)

In [None]:
#create the pipeline
pipeline = Pipeline(stages=[encoder, assembler, scaler,lr])
model = pipeline.fit(train)


25/04/14 15:12:26 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/04/14 15:12:26 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [14]:
predictions = model.transform(test)
predictions.select("number_courses", "time_study", "Marks", "scaled_features", "prediction").show(10)

+--------------+----------+------+--------------------+------------------+
|number_courses|time_study| Marks|     scaled_features|        prediction|
+--------------+----------+------+--------------------+------------------+
|             3|     0.803| 6.217|(9,[3,8],[2.46291...| 3.274524640283653|
|             3|     1.629| 7.014|(9,[3,8],[2.46291...| 7.777745088381882|
|             3|     2.061| 8.924|(9,[3,8],[2.46291...|10.132941230598021|
|             3|     4.633|20.398|(9,[3,8],[2.46291...| 24.15507487078258|
|             3|     6.335|32.357|(9,[3,8],[2.46291...| 33.43410885182787|
|             4|      0.14| 7.336|(9,[4,8],[2.41186...|0.5602217137003125|
|             4|     2.966|13.119|(9,[4,8],[2.41186...|15.967124530178156|
|             4|      4.41|20.348|(9,[4,8],[2.41186...| 23.83958152094306|
|             5|     1.803|11.253|(9,[5,8],[4.17747...|12.556170686166167|
|             5|     2.051|12.209|(9,[5,8],[4.17747...|13.908228019620653|
+--------------+---------

In [22]:

#use a regressionEvaluater to evaluate the model
evaluator = RegressionEvaluator(labelCol="Marks", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
evaluator = RegressionEvaluator(labelCol="Marks", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2 on test data = %g" % r2)

Root Mean Squared Error (RMSE) on test data = 3.3746
R2 on test data = 0.861876


In [None]:
#create a cross validation for the linear regression model
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01])
             .addGrid(lr.fitIntercept, [False, True])\
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())
crossval = CrossValidator(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)  # use 3+ folds in practice
# Fit the model 
cvModel = crossval.fit(train)
# Make predictions on test data
predictions = cvModel.transform(test)
predictions.select("number_courses", "time_study", "Marks", "scaled_features", "prediction").show(10)
#use a regressionEvaluater to evaluate the model
evaluator = RegressionEvaluator(labelCol="Marks", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
#use a regressionEvaluater to evaluate the model
evaluator = RegressionEvaluator(labelCol="Marks", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("R2 on test data = %g" % r2)

25/04/14 15:12:27 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


+--------------+----------+------+--------------------+-------------------+
|number_courses|time_study| Marks|     scaled_features|         prediction|
+--------------+----------+------+--------------------+-------------------+
|             3|     0.803| 6.217|(9,[3,8],[2.46291...| 2.8141891327271855|
|             3|     1.629| 7.014|(9,[3,8],[2.46291...|   7.36169673242334|
|             3|     2.061| 8.924|(9,[3,8],[2.46291...|  9.740055171443586|
|             3|     4.633|20.398|(9,[3,8],[2.46291...| 23.900090201401657|
|             3|     6.335|32.357|(9,[3,8],[2.46291...|  33.27037931626444|
|             4|      0.14| 7.336|(9,[4,8],[2.41186...|0.08178843236385624|
|             4|     2.966|13.119|(9,[4,8],[2.41186...| 15.640211221837028|
|             4|      4.41|20.348|(9,[4,8],[2.41186...| 23.590090293136633|
|             5|     1.803|11.253|(9,[5,8],[4.17747...| 11.927241511963397|
|             5|     2.051|12.209|(9,[5,8],[4.17747...| 13.292595722349828|
+-----------

## RandomForestRegressor

In [17]:
# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="scaled_features",labelCol="Marks", numTrees=30, maxDepth=20, seed=42)
pipeline_2 = Pipeline(stages=[encoder, assembler, scaler, rf])


# Train model.  This also runs the indexer.
model_2 = pipeline_2.fit(train)

# Make predictions.
predictions_2 = model_2.transform(test)

# Select example rows to display.
predictions_2.select("prediction", "Marks", "scaled_features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="Marks", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions_2)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

evaluator = RegressionEvaluator(labelCol="Marks",\
                                predictionCol="prediction", metricName="r2")
print("R Squared (R2) on test data =", evaluator.evaluate(predictions_2))

rfModel = model_2.stages
print(rfModel)  # summary only

+------------------+------+--------------------+
|        prediction| Marks|     scaled_features|
+------------------+------+--------------------+
|15.267259167507481| 6.217|(9,[3,8],[2.46291...|
|15.396839756642969| 7.014|(9,[3,8],[2.46291...|
| 17.33859478600342| 8.924|(9,[3,8],[2.46291...|
| 19.59041194126907|20.398|(9,[3,8],[2.46291...|
|  32.7994070784218|32.357|(9,[3,8],[2.46291...|
+------------------+------+--------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 5.59788
R Squared (R2) on test data = 0.6199226389618218
[OneHotEncoderModel: uid=OneHotEncoder_7e9b21f4a816, dropLast=true, handleInvalid=error, numInputCols=1, numOutputCols=1, VectorAssembler_dfc375bd2218, StandardScalerModel: uid=StandardScaler_23373989f9c8, numFeatures=9, withMean=false, withStd=true, RandomForestRegressionModel: uid=RandomForestRegressor_fe7d3c1f5e47, numTrees=30, numFeatures=9]


In [18]:
#create a cross validation for the random forest model
#define the parameter grid for the cross validatation of the random forest model
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 20, 30])
             .addGrid(rf.maxDepth, [5, 10,15])
             .addGrid(rf.maxBins, [10, 20,30,40])
             .build())
# Create 5-fold CrossValidator
crossval = CrossValidator(estimator=pipeline_2,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           numFolds=5)  # use 3+ folds in practice
# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)
# Make predictions.
predictions_cv = cvModel.transform(test)
# Select example rows to display.
predictions_cv.select("prediction", "Marks", "scaled_features").show(5)

+------------------+------+--------------------+
|        prediction| Marks|     scaled_features|
+------------------+------+--------------------+
|14.246069966035174| 6.217|(9,[3,8],[2.46291...|
|16.599491428185022| 7.014|(9,[3,8],[2.46291...|
|17.967762246577458| 8.924|(9,[3,8],[2.46291...|
| 20.49776913071525|20.398|(9,[3,8],[2.46291...|
| 30.32154104742755|32.357|(9,[3,8],[2.46291...|
+------------------+------+--------------------+
only showing top 5 rows



In [19]:
# Evaluate the model
rmse = evaluator.evaluate(predictions_cv)
r2 = RegressionEvaluator(predictionCol="prediction", labelCol="Marks", metricName="r2").evaluate(predictions)

print("Root Mean Squared Error (RMSE):", rmse)
print("Coefficient of Determination (R2):", r2)

Root Mean Squared Error (RMSE): 0.5766536603887017
Coefficient of Determination (R2): 0.8618755536682241


In [20]:
lasso_model = cvModel.bestModel

In [21]:
lasso_model.stages[3].extractParamMap()

{Param(parent='RandomForestRegressor_fe7d3c1f5e47', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True,
 Param(parent='RandomForestRegressor_fe7d3c1f5e47', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False,
 Param(parent='RandomForestRegressor_fe7d3c1f5e47', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10,
 Param(parent='RandomForestRegressor_fe7d3c1f5e47', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supporte