# Connect to Hive

In [1]:
from pyspark.sql import SparkSession

# Add here your team number teamx
team = "team4"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName("{} - spark ML".format(team))\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

sc = spark.sparkContext

In [2]:
spark

# list Hive databases

In [3]:
print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES;").show()

[Database(name='default', description='Default Hive database', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/apps/hive/warehouse'), Database(name='root_db', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/root/root_db'), Database(name='team0_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team0/project/hive/warehouse'), Database(name='team12_hive_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team12/project/hive/warehouse'), Database(name='team13_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team13/project/hive/warehouse'), Database(name='team14_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team14/project/hive/warehouse'), Database(name='team15_projectdb', description='', locationUri='hdfs://hadoop-02.uni.innopolis.ru:8020/user/team15/project/hive/warehouse'), Database(name='team16_projectdb', description

# Specify the input and output features

In [4]:
features = ['incident_number', 'primary_situation',
                'ems_units', 'suppression_units', 'action_taken_primary']
label= 'suppression_personnel'

# Read hive tables

In [5]:
inc_part = spark.read.format("avro").table('team4_projectdb.incidents_part')

In [6]:
inc_part.show()

+---------------+---------------+---------+--------------------+--------------------+-----------+--------------------+--------------------+--------------------+-------+---------+------------+----+-----------------+---------------------+---------+-------------+-----------+---------------+-------------------+-----------------------+-----------------------+---------------+-------------+-------------------+-----------------+----------------+--------------------+----------+--------------------+----------------------+------------------+--------------------------+--------------------+-------------------+--------------+-----------------------+-------------------------+-----------+------------------+--------------------------------------+--------------+----------------+--------------------+-----------+---------------+------------------------------------+----------------------------------------+----------------------------------+------------------------------------+-----------------+------------

In [7]:
inc_part.printSchema()

root
 |-- incident_number: decimal(10,0) (nullable = true)
 |-- exposure_number: decimal(10,0) (nullable = true)
 |-- id: decimal(10,0) (nullable = true)
 |-- address: string (nullable = true)
 |-- incident_date: string (nullable = true)
 |-- call_number: decimal(10,0) (nullable = true)
 |-- alarm_dttm: string (nullable = true)
 |-- arrival_dttm: string (nullable = true)
 |-- close_dttm: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- battalion: string (nullable = true)
 |-- station_area: string (nullable = true)
 |-- box: string (nullable = true)
 |-- suppression_units: decimal(10,0) (nullable = true)
 |-- suppression_personnel: decimal(10,0) (nullable = true)
 |-- ems_units: decimal(10,0) (nullable = true)
 |-- ems_personnel: decimal(10,0) (nullable = true)
 |-- other_units: decimal(10,0) (nullable = true)
 |-- other_personnel: decimal(10,0) (nullable = true)
 |-- first_unit_on_scene: string (nullable = true)
 |-- estimated_property_loss: decimal(10,0) (nullable 

# Feature selection

In [8]:
import pyspark.sql.functions as F

df = inc_part.select(features + [label])

for col_name in ['primary_situation', 'action_taken_primary']:
    df = df.withColumn(col_name, F.translate(col_name, "'", ""))

In [9]:
df.show()

+---------------+--------------------+---------+-----------------+--------------------+---------------------+
|incident_number|   primary_situation|ems_units|suppression_units|action_taken_primary|suppression_personnel|
+---------------+--------------------+---------+-----------------+--------------------+---------------------+
|       14031180|531 Smoke or odor...|        0|                2|    45 Remove hazard|                    5|
|       14046224|500 Service Call,...|        0|                2|70 Assistance, other|                    9|
|       14058148|462 Aircraft standby|        0|                3|          92 Standby|                    6|
|       14008067|150 - Outside rub...|        0|                2|     11 - Extinguish|                    9|
|       14009941|745 - Alarm syste...|        0|                3|63 - Restore fire...|                   10|
|       14021837|735 Alarm system ...|        0|                3|      86 Investigate|                   11|
|       14

In [10]:
df = df.withColumn('incident_number', df['incident_number'].cast('string'))
df = df.withColumn('ems_units', df['ems_units'].cast('integer'))
df = df.withColumn('suppression_units', df['suppression_units'].cast('integer'))

In [11]:
df = df.dropna(subset=features + [label])

In [12]:
from pyspark.sql.functions import col


df = df.filter(col(label) > 0)

In [13]:
df.show()

+---------------+--------------------+---------+-----------------+--------------------+---------------------+
|incident_number|   primary_situation|ems_units|suppression_units|action_taken_primary|suppression_personnel|
+---------------+--------------------+---------+-----------------+--------------------+---------------------+
|       14048364|650 Steam, other ...|        0|                2|70 Assistance, other|                    9|
|       14048617|735 Alarm system ...|        0|                3|      86 Investigate|                   10|
|       14052106|322 Motor vehicle...|        2|                1|31 Provide first ...|                    2|
|       14052706|500 Service Call,...|        0|                1|70 Assistance, other|                    5|
|       14070751|671 HazMat releas...|        3|               12|      86 Investigate|                   43|
|       14076628|700 False alarm o...|        0|                2|00 Action taken, ...|                    9|
|       14

# Feature extraction

In [14]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler,VectorIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

indexers = [StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed", handleInvalid="keep") for col_name in ['primary_situation', 'action_taken_primary']]

# Apply One-Hot Encoding to indexed categorical columns
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=f"{indexer.getOutputCol()}_encoded") for indexer in indexers]

# Assemble features into a single vector
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + ['ems_units', 'suppression_units'], outputCol="features")

# Create Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler])

# Fit Pipeline and Transform Data
model = pipeline.fit(df)
transformed_data = model.transform(df)

transformed_data = transformed_data.withColumnRenamed(label, 'label')

# Select Features and Label Columns
transformed_data = transformed_data.select(["features", "label"])

# Automatically Identify Categorical Features and Index Them
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(transformed_data)
transformed_data = featureIndexer.transform(transformed_data)

# Display the Output Spark DataFrame
transformed_data.show()

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|(497,[11,362,496]...|   17|(497,[11,362,496]...|
|(497,[75,397,496]...|    4|(497,[75,397,496]...|
|(497,[38,373,496]...|    8|(497,[38,373,496]...|
|(497,[38,439,496]...|   10|(497,[38,439,496]...|
|(497,[21,375,496]...|    4|(497,[21,375,496]...|
|(497,[22,362,496]...|    4|(497,[22,362,496]...|
|(497,[116,362,496...|    9|(497,[116,362,496...|
|(497,[38,370,496]...|   33|(497,[38,370,496]...|
|(497,[133,393,496...|   12|(497,[133,393,496...|
|(497,[16,375,496]...|    3|(497,[16,375,496]...|
|(497,[149,464,496...|   21|(497,[149,464,496...|
|(497,[25,370,496]...|    4|(497,[25,370,496]...|
|(497,[22,362,496]...|   10|(497,[22,362,496]...|
|(497,[21,362,496]...|    4|(497,[21,362,496]...|
|(497,[11,420,495,...|    4|(497,[11,420,495,...|
|(497,[40,373,496]...|    4|(497,[40,373,496]...|
|(497,[37,362,496]...|    4|(497,[37,362,496]...|


# Split the dataset

In [15]:
(train_data, test_data) = transformed_data.randomSplit([0.6, 0.4], seed=10)

def run(command):
    import os
    return os.popen("cd ..\n" + command).read()

train_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from the root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

test_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from the root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

''

In [16]:
train_data.show()

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|(497,[2,362,496],...|    4|(497,[2,362,496],...|
|(497,[3,362,496],...|   10|(497,[3,362,496],...|
|(497,[7,427,496],...|   11|(497,[7,427,496],...|
|(497,[11,381,495,...|    4|(497,[11,381,495,...|
|(497,[28,362,496]...|    4|(497,[28,362,496]...|
|(497,[38,375,496]...|    4|(497,[38,375,496]...|
|(497,[40,362,496]...|    4|(497,[40,362,496]...|
|(497,[47,382,496]...|    4|(497,[47,382,496]...|
|(497,[47,382,496]...|    9|(497,[47,382,496]...|
|(497,[72,362,496]...|    6|(497,[72,362,496]...|
|(497,[77,375,496]...|    4|(497,[77,375,496]...|
|(497,[77,382,496]...|    4|(497,[77,382,496]...|
|(497,[100,362,496...|   15|(497,[100,362,496...|
|(497,[113,464,496...|    4|(497,[113,464,496...|
|(497,[134,366,496...|    4|(497,[134,366,496...|
|(497,[148,362,496...|    4|(497,[148,362,496...|
|(497,[149,375,496...|   19|(497,[149,375,496...|


# First model

## Build a model

In [17]:
from pyspark.ml.regression import LinearRegression
# Create Linear Regression Model
lr = LinearRegression()

# Fit the data to the pipeline stages
model_lr = lr.fit(train_data)

## Predict for test data

In [18]:
predictions = model_lr.transform(test_data)
predictions.show()

+--------------------+-----+--------------------+------------------+
|            features|label|     indexedFeatures|        prediction|
+--------------------+-----+--------------------+------------------+
|(497,[0,362,496],...|    9|(497,[0,362,496],...|  8.52459533956089|
|(497,[0,362,496],...|    9|(497,[0,362,496],...|  8.52459533956089|
|(497,[0,362,496],...|    9|(497,[0,362,496],...|  8.52459533956089|
|(497,[0,362,496],...|   20|(497,[0,362,496],...|10.018928626098903|
|(497,[0,370,496],...|    9|(497,[0,370,496],...| 8.245704267908641|
|(497,[1,363,495,4...|    9|(497,[1,363,495,4...|11.103798927940547|
|(497,[1,363,495,4...|   31|(497,[1,363,495,4...|14.590576596529246|
|(497,[1,363,495,4...|   32|(497,[1,363,495,4...|15.088687692041919|
|(497,[1,363,495,4...|   32|(497,[1,363,495,4...|15.088687692041919|
|(497,[1,363,496],...|    4|(497,[1,363,496],...|10.016685189107111|
|(497,[1,363,496],...|    4|(497,[1,363,496],...|10.016685189107111|
|(497,[1,363,496],...|    9|(497,[

## Evaluate the model

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator1_rmse.evaluate(predictions)
r2 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))
print("R^2 on test data = {}".format(r2))

Root Mean Squared Error (RMSE) on test data = 20.372364171532542
R^2 on test data = 0.03610669991162829


## Hyperparameter optimization

In [20]:
model_lr.params

[Param(parent='LinearRegression_c65222282cc4', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LinearRegression_c65222282cc4', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LinearRegression_c65222282cc4', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'),
 Param(parent='LinearRegression_c65222282cc4', name='featuresCol', doc='features column name.'),
 Param(parent='LinearRegression_c65222282cc4', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LinearRegression_c65222282cc4', name='labelCol', doc='label column name.'),
 Param(parent='LinearRegression_c65222282cc4', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'),
 Param(parent='LinearRegression_c65222282cc4', name='m

In [21]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np


grid = ParamGridBuilder()
grid = grid.addGrid(
                    model_lr.aggregationDepth, [2, 3, 4])\
                    .addGrid(model_lr.regParam, np.logspace(1e-3,1e-1)
                    )\
                    .build()

cv = CrossValidator(estimator = lr, 
                    estimatorParamMaps = grid, 
                    evaluator = evaluator1_rmse,
                    parallelism = 5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

LinearRegressionModel: uid=LinearRegression_c65222282cc4, numFeatures=497

## Best model 1


In [22]:
from pprint import pprint
model1 = bestModel
pprint(model1.extractParamMap())

{Param(parent='LinearRegression_c65222282cc4', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LinearRegression_c65222282cc4', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
 Param(parent='LinearRegression_c65222282cc4', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredError',
 Param(parent='LinearRegression_c65222282cc4', name='solver', doc='The solver algorithm for optimization. Supported options: auto, normal, l-bfgs.'): 'auto',
 Param(parent='LinearRegression_c65222282cc4', name='maxIter', doc='max number of iterations (>= 0).'): 100,
 Param(parent='LinearRegression_c65222282cc4', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LinearRegression_c65222282cc4', name='predictionCol', doc='pred

## Save the model to HDFS

In [23]:
model1.write().overwrite().save("project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 models/model1")

''

## Predict for test data using best model1

In [24]:
predictions = model1.transform(test_data)
predictions.show()

+--------------------+-----+--------------------+------------------+
|            features|label|     indexedFeatures|        prediction|
+--------------------+-----+--------------------+------------------+
|(497,[1,363,496],...|    9|(497,[1,363,496],...|10.642992690662371|
|(497,[1,363,496],...|   15|(497,[1,363,496],...|10.938201899747733|
|(497,[6,363,495,4...|    4|(497,[6,363,495,4...| 8.761004467435267|
|(497,[6,363,495,4...|    8|(497,[6,363,495,4...| 8.908609071977947|
|(497,[6,363,495,4...|    9|(497,[6,363,495,4...| 8.908609071977947|
|(497,[6,363,495,4...|    4|(497,[6,363,495,4...|10.880602176001172|
|(497,[6,363,495,4...|    4|(497,[6,363,495,4...|13.000199884567078|
|(497,[6,367,496],...|    4|(497,[6,367,496],...| 5.719503177107711|
|(497,[6,369,496],...|    4|(497,[6,369,496],...|5.3802025203291635|
|(497,[8,363,496],...|   14|(497,[8,363,496],...|11.303009122897032|
|(497,[10,363,495,...|    4|(497,[10,363,495,...| 9.762526222360323|
|(497,[10,369,496]...|    4|(497,[

In [25]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model1_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv > output/model1_predictions.csv")

''

## Evaluate the best model1

In [26]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse1 = evaluator1_rmse.evaluate(predictions)
r21 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse1))
print("R^2 on test data = {}".format(r21))

Root Mean Squared Error (RMSE) on test data = 20.516266655753885
R^2 on test data = 0.04268242231368702


# Second model

## Build a model

In [27]:
from pyspark.ml.regression import RandomForestRegressor

# Create Linear Regression Model
rf = RandomForestRegressor()

# Fit the data to the pipeline stages
model_rf = rf.fit(train_data)

## Predict for test data

In [28]:
predictions = model_rf.transform(test_data)
predictions.show()

+--------------------+-----+--------------------+-----------------+
|            features|label|     indexedFeatures|       prediction|
+--------------------+-----+--------------------+-----------------+
|(497,[83,367,495,...|    4|(497,[83,367,495,...|6.352705221534879|
|(497,[2,362,496],...|    2|(497,[2,362,496],...| 7.07338840797094|
|(497,[2,362,496],...|    3|(497,[2,362,496],...| 7.07338840797094|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...|9.018125565483434|
|(497,[2,362,496],...|    5|(497,[2,362,496],...

## Evaluate the model

In [29]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 18.242236066029225
R^2 on test data = 0.15776669348737937


## Hyperparameter optimization

In [30]:
model_rf.params

[Param(parent='RandomForestRegressor_44722a3fe7ea', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'),
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto'

In [31]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator 

import numpy as np

grid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [2, 5]) \
    .addGrid(rf.numTrees, [5, 10]) \
    .build()

cv = CrossValidator(estimator = rf, 
                    estimatorParamMaps = grid, 
                    evaluator = evaluator2_rmse,
                    parallelism = 5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

RandomForestRegressionModel: uid=RandomForestRegressor_44722a3fe7ea, numTrees=10, numFeatures=497

## Best model 2


In [32]:
from pprint import pprint
model2 = bestModel
pprint(model2.extractParamMap())

{Param(parent='RandomForestRegressor_44722a3fe7ea', name='seed', doc='random seed.'): -2270744963801036121,
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='subsamplingRate', doc='Fraction of the training data used for learning each decision tree, in range (0, 1].'): 1.0,
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 5,
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 32,
 Param(parent='RandomForestRegressor_44722a3fe7ea', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0,
 Param(parent='Ra

## Save the model to HDFS

In [33]:
model2.write().overwrite().save("project/models/model2")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

''

## Predict for test data using best model2

In [34]:
predictions = model2.transform(test_data)
predictions.show()

+--------------------+-----+--------------------+------------------+
|            features|label|     indexedFeatures|        prediction|
+--------------------+-----+--------------------+------------------+
|(497,[2,362,496],...|    4|(497,[2,362,496],...| 7.568877859741323|
|(497,[11,375,496]...|    4|(497,[11,375,496]...|5.7578213608067745|
|(497,[14,362,496]...|   11|(497,[14,362,496]...| 9.270415997797958|
|(497,[14,362,496]...|   12|(497,[14,362,496]...| 9.270415997797958|
|(497,[16,362,496]...|   16|(497,[16,362,496]...| 9.104709247332405|
|(497,[16,375,496]...|    3|(497,[16,375,496]...|5.7578213608067745|
|(497,[16,381,495,...|    4|(497,[16,381,495,...| 6.265570125944789|
|(497,[16,382,496]...|    4|(497,[16,382,496]...|5.7578213608067745|
|(497,[16,431,495,...|    4|(497,[16,431,495,...| 6.265570125944789|
|(497,[21,362,496]...|    4|(497,[21,362,496]...| 6.124233606336789|
|(497,[21,362,496]...|   10|(497,[21,362,496]...| 9.104709247332405|
|(497,[21,362,496]...|   11|(497,[

In [35]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/model2_predictions.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv > output/model2_predictions.csv")

''

## Evaluate the best model2

In [36]:
from pyspark.ml.evaluation import RegressionEvaluator 

# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 22.852229176063442
R^2 on test data = 0.18150011377917763


# Compare best models

In [37]:
models = [[str(model1),rmse1, r21], [str(model2),rmse2, r22]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

+-------------------------------------------------------------------------------------------------+------------------+-------------------+
|model                                                                                            |RMSE              |R2                 |
+-------------------------------------------------------------------------------------------------+------------------+-------------------+
|LinearRegressionModel: uid=LinearRegression_c65222282cc4, numFeatures=497                        |20.516266655753885|0.04268242231368702|
|RandomForestRegressionModel: uid=RandomForestRegressor_44722a3fe7ea, numTrees=10, numFeatures=497|22.852229176063442|0.18150011377917763|
+-------------------------------------------------------------------------------------------------+------------------+-------------------+



In [38]:
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header","true")\
    .save("project/output/evaluation.csv")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation.csv/*.csv > output/evaluation.csv")

''