In [37]:
# imports
from pyspark.sql import SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Word2Vec, Tokenizer
from pyspark import keyword_only
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.types import IntegerType
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import numpy as np
import math
import os
from pprint import pprint

# Connect to Hive

In [2]:
# Add here your team number teamx
team = "team21"
db_name = f"{team}_projectdb"
table_name = "car_prices_part_buck"

# location of your Hive database in HDFS
warehouse = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName(f"{team} - spark ML")\
        .master("yarn")\
        .config("hive.metastore.uris", "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", warehouse)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

sc = spark.sparkContext

In [3]:
spark

# list Hive databases

In [4]:
# print(spark.catalog.listDatabases())
spark.sql("SHOW DATABASES;").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|             root_db|
|     team0_projectdb|
|team12_hive_proje...|
|    team13_projectdb|
|    team14_projectdb|
|    team15_projectdb|
|    team16_projectdb|
|    team17_projectdb|
|    team18_projectdb|
|    team19_projectdb|
|     team1_projectdb|
|    team20_projectdb|
|    team21_projectdb|
|    team22_projectdb|
|    team23_projectdb|
|    team24_projectdb|
|    team25_projectdb|
|    team26_projectdb|
|    team27_projectdb|
+--------------------+
only showing top 20 rows



# Specify the input and output features

In [5]:
# We will use the following features
# Excluded 'vin' because it is like an ID of a car
# Excluded 'mmr' because we want to make our own estimate of price without dependence on other estimations
# 'saledate' will be replaced by 'saledate_year', 'saledate_month', 'saledate_day'
features = ['year', 'make', 'model', 'trim', 'body', 'state', 'condition',
            'odometer', 'color', 'interior', 'seller', 'transmission',
            'saledate_year', 'saledate_month', 'saledate_day']

# The output/target of our model
label = 'sellingprice'

# Read hive tables

In [6]:
car_prices = spark.read.format("avro").table(f'{db_name}.{table_name}')

In [7]:
split = F.split(F.to_date("saledate"), "-")
car_prices = car_prices.withColumn("saledate_year", split.getItem(0).cast(IntegerType()))\
    .withColumn("saledate_month", split.getItem(1).cast(IntegerType()))\
    .withColumn("saledate_day", split.getItem(2).cast(IntegerType()))

In [8]:
car_prices.show()

+----+-------------+----------------+-------------+-------------+-----------------+-----+---------+--------+------+--------+--------------------+-----+------------+----------+--------------------+-------------+--------------+------------+
|year|         make|           model|         trim|         body|              vin|state|condition|odometer| color|interior|              seller|  mmr|sellingprice|  saledate|        transmission|saledate_year|saledate_month|saledate_day|
+----+-------------+----------------+-------------+-------------+-----------------+-----+---------+--------+------+--------+--------------------+-----+------------+----------+--------------------+-------------+--------------+------------+
|2012|       Nissan|          Maxima|       3.5 SV|        sedan|1n4aa5ap7cc840019|   pa|       42|    9861| brown|     tan|  nissan-infiniti lt|18650|       19700|2015-07-07|__HIVE_DEFAULT_PA...|         2015|             7|           7|
|2011|     Infiniti|   G Convertible|       

# Feature selection

In [9]:
car_prices = car_prices.select(features + [label]).na.drop()
car_prices = car_prices.withColumnRenamed(label, "label")

car_prices.show()

+----+-------------+----------------+-------------+-------------+-----+---------+--------+------+--------+--------------------+--------------------+-------------+--------------+------------+-----+
|year|         make|           model|         trim|         body|state|condition|odometer| color|interior|              seller|        transmission|saledate_year|saledate_month|saledate_day|label|
+----+-------------+----------------+-------------+-------------+-----+---------+--------+------+--------+--------------------+--------------------+-------------+--------------+------------+-----+
|2012|       Nissan|          Maxima|       3.5 SV|        sedan|   pa|       42|    9861| brown|     tan|  nissan-infiniti lt|__HIVE_DEFAULT_PA...|         2015|             7|           7|19700|
|2011|     Infiniti|   G Convertible|          G37|G Convertible|   mo|       41|   30304| black|     tan|  nissan infiniti lt|__HIVE_DEFAULT_PA...|         2015|             7|           2|23400|
|2014|       Ni

In [10]:
categoricalCols = ['make', 'body', 'state', 'color', 'interior', 'transmission']
textCols = ['model', 'trim', 'seller']
cyclicalCols = [('saledate_month', 12), ('saledate_day', 31)]
others = ['year', 'odometer', 'condition', 'saledate_year']

# Feature extraction

In [11]:
class CyclicalTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    input_col = Param(Params._dummy(), "input_col", "input column name.", typeConverter=TypeConverters.toString)
    output_col = Param(Params._dummy(), "output_col", "output column name.", typeConverter=TypeConverters.toString)

    @keyword_only
    def __init__(self, period: float = 1, input_col: str = "input", output_col: str = "output"):
        super(CyclicalTransformer, self).__init__()
        self._setDefault(input_col=None, output_col=None)
        kwargs = self._input_kwargs
        del(kwargs['period'])
        self.set_params(**kwargs)
        self.period = period

    @keyword_only
    def set_params(self, input_col: str = "input", output_col: str = "output"):
        kwargs = self._input_kwargs
        self._set(**kwargs)

    def get_input_col(self):
        return self.getOrDefault(self.input_col)

    def get_output_col(self):
        return self.getOrDefault(self.output_col)

    def _transform(self, df: DataFrame):
        input_col = self.get_input_col()
        output_col = self.get_output_col()

        sin_feature = F.sin(2 * math.pi / self.period * df[input_col])
        cos_feature = F.cos(2 * math.pi / self.period * df[input_col])
        return df.withColumn(output_col + "_sin", sin_feature)\
            .withColumn(output_col + "_cos", cos_feature)

In [12]:
stages = []
final_features = others.copy()

for textCol in textCols:
    output_col = f"{textCol}_encoded"

    tokenizer = Tokenizer(inputCol=textCol, outputCol=f"{textCol}_tokens")
    word2Vec = Word2Vec(
        vectorSize=5,
        seed=42,
        minCount=1,
        inputCol=tokenizer.getOutputCol(),
        outputCol=output_col
    )

    stages.append(tokenizer)
    stages.append(word2Vec)

    final_features.append(output_col)

for categoricalCol in categoricalCols:
    # Create String indexer to assign index for the string fields
    # String Indexer is required as an input for One-Hot Encoder
    # We set the case as `skip` for any string out of the input strings
    output_col = f"{categoricalCol}_encoded"

    indexer = StringIndexer(
        inputCol=categoricalCol,
        outputCol=f"{categoricalCol}_indexed"
    ).setHandleInvalid("skip")

    # Encode the strings using One Hot encoding
    encoder = OneHotEncoder(
        inputCol=indexer.getOutputCol(),
        outputCol=output_col
    )

    stages.append(indexer)
    stages.append(encoder)

    final_features.append(output_col)

for cyclicalCol, period in cyclicalCols:
    cyclical_transormer = CyclicalTransformer(
        period=period,
        input_col=cyclicalCol,
        output_col=cyclicalCol
    )

    stages.append(cyclical_transormer)

    final_features += [cyclicalCol + "_sin", cyclicalCol + "_cos"]

# This will concatenate the input cols into a single column.
assembler = VectorAssembler(inputCols=final_features, outputCol="features")
stages.append(assembler)

# You can create a pipeline to use only a single fit and transform on the data.
pipeline = Pipeline(stages=stages)


# Fit the pipeline ==> This will call the fit functions for all transformers if exist
model = pipeline.fit(car_prices)
# Fit the pipeline ==> This will call the transform functions for all transformers
data = model.transform(car_prices)

# We delete all features and keep only the features and label columns
data = data.select(["features", "label"])

data.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(232,[0,1,2,3,4,5...|19700|
|(232,[0,1,2,3,4,5...|23400|
|(232,[0,1,2,3,4,5...|25500|
|(232,[0,1,2,3,4,5...|25900|
|(232,[0,1,2,3,4,5...|17900|
|(232,[0,1,2,3,4,5...|10500|
|(232,[0,1,2,3,4,5...|11800|
|(232,[0,1,2,3,4,5...|21600|
|(232,[0,1,2,3,4,5...|13500|
|(232,[0,1,2,3,4,5...|14800|
|(232,[0,1,2,3,4,5...|49750|
|(232,[0,1,2,3,4,5...|26000|
|(232,[0,1,2,3,4,5...|25250|
|(232,[0,1,2,3,4,5...|11000|
|(232,[0,1,2,3,4,5...|26800|
|(232,[0,1,2,3,4,5...|32250|
|(232,[0,1,2,3,4,5...| 8500|
|(232,[0,1,2,3,4,5...|12600|
|(232,[0,1,2,3,4,5...|18000|
|(232,[0,1,2,3,4,5...|19000|
+--------------------+-----+
only showing top 20 rows



# Split the dataset

In [13]:
#  split the data into 70% training and 30% test (it is not stratified)
(train_data, test_data) = data.randomSplit([0.7, 0.3], seed=126)


def run(command):
    return os.popen(command).read()


train_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/train")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/train/*.json > data/train.json")

test_data.select("features", "label")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("json")\
    .save("project/data/test")

# Run it from root directory of the repository
run("hdfs dfs -cat project/data/test/*.json > data/test.json")

''

# First model

## Build a model

In [14]:
# Create Linear Regression Model
lr = LinearRegression()

# Fit the data to the pipeline stages
model_lr = lr.fit(train_data)

## Predict for test data

In [15]:
predictions = model_lr.transform(test_data)
predictions.show()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|(232,[0,1,2,3,4,5...|  500|-5988.128014426678|
|(232,[0,1,2,3,4,5...| 1700| 3170.290679457132|
|(232,[0,1,2,3,4,5...| 9600|11206.983481583185|
|(232,[0,1,2,3,4,5...| 3200|  4213.02116114134|
|(232,[0,1,2,3,4,5...|14000|14225.120232007932|
|(232,[0,1,2,3,4,5...|11100|13085.688207281753|
|(232,[0,1,2,3,4,5...| 7500|12934.137325179763|
|(232,[0,1,2,3,4,5...| 9600|17033.889598370995|
|(232,[0,1,2,3,4,5...|13400|15284.020652809646|
|(232,[0,1,2,3,4,5...| 2200|-2272.230101299938|
|(232,[0,1,2,3,4,5...| 9600| 12728.64963001851|
|(232,[0,1,2,3,4,5...| 8600|12500.740779379848|
|(232,[0,1,2,3,4,5...|19300|15732.625467086677|
|(232,[0,1,2,3,4,5...|12900|12896.748976017348|
|(232,[0,1,2,3,4,5...| 8200|10330.337433189154|
|(232,[0,1,2,3,4,5...| 8200|13174.678719126154|
|(232,[0,1,2,3,4,5...|18200| 15750.10872284323|
|(232,[0,1,2,3,4,5...|14000|14440.448721

## Evaluate the model

In [16]:
# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse = evaluator1_rmse.evaluate(predictions)
r2 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse))
print("R^2 on test data = {}".format(r2))

Root Mean Squared Error (RMSE) on test data = 5298.623134126164
R^2 on test data = 0.698950056980985


## Hyperparameter optimization

In [17]:
model_lr.params

[Param(parent='LinearRegression_4634515bed4d', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='LinearRegression_4634515bed4d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'),
 Param(parent='LinearRegression_4634515bed4d', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'),
 Param(parent='LinearRegression_4634515bed4d', name='featuresCol', doc='features column name.'),
 Param(parent='LinearRegression_4634515bed4d', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='LinearRegression_4634515bed4d', name='labelCol', doc='label column name.'),
 Param(parent='LinearRegression_4634515bed4d', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'),
 Param(parent='LinearRegression_4634515bed4d', name='m

In [18]:
grid = ParamGridBuilder()
grid = grid.addGrid(model_lr.aggregationDepth, [2, 3, 4])\
    .addGrid(model_lr.regParam, np.logspace(1e-3, 1e-1))\
    .build()

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=evaluator1_rmse,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

LinearRegressionModel: uid=LinearRegression_4634515bed4d, numFeatures=232

## Best model 1


In [19]:
model1 = bestModel
pprint(model1.extractParamMap())

{Param(parent='LinearRegression_4634515bed4d', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LinearRegression_4634515bed4d', name='epsilon', doc='The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber'): 1.35,
 Param(parent='LinearRegression_4634515bed4d', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LinearRegression_4634515bed4d', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LinearRegression_4634515bed4d', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 4,
 Param(parent='LinearRegression_4634515bed4d', name='loss', doc='The loss function to be optimized. Supported options: squaredError, huber.'): 'squaredError',
 Param(parent='LinearRegression_4634515bed4d', name='predictionCol', doc='prediction column name.

## Save the model to HDFS

In [20]:
model1.write().overwrite().save("project/models/model1")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model1 models/model1")

''

## Predict for test data using best model1

In [21]:
predictions = model1.transform(test_data)
predictions.show()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|(232,[0,1,2,3,4,5...|  500| -5980.38898344012|
|(232,[0,1,2,3,4,5...| 1700| 3172.332767353393|
|(232,[0,1,2,3,4,5...| 9600|11202.183271531481|
|(232,[0,1,2,3,4,5...| 3200| 4210.415339704137|
|(232,[0,1,2,3,4,5...|14000|14226.190328635741|
|(232,[0,1,2,3,4,5...|11100|13085.886779097375|
|(232,[0,1,2,3,4,5...| 7500| 12933.00871546939|
|(232,[0,1,2,3,4,5...| 9600|17040.248342816718|
|(232,[0,1,2,3,4,5...|13400|15284.608482542913|
|(232,[0,1,2,3,4,5...| 2200|-2263.311521926895|
|(232,[0,1,2,3,4,5...| 9600|12728.291434477549|
|(232,[0,1,2,3,4,5...| 8600|12503.606818498578|
|(232,[0,1,2,3,4,5...|19300|15732.822662615683|
|(232,[0,1,2,3,4,5...|12900|12899.383867735043|
|(232,[0,1,2,3,4,5...| 8200| 10328.42289335467|
|(232,[0,1,2,3,4,5...| 8200| 13179.59306069091|
|(232,[0,1,2,3,4,5...|18200|15754.140016666614|
|(232,[0,1,2,3,4,5...|14000|14441.694837

In [22]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/model1_predictions")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model1_predictions/*.csv > output/model1_predictions.csv")

''

## Evaluate the best model1

In [23]:
# Evaluate the performance of the model
evaluator1_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator1_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse1 = evaluator1_rmse.evaluate(predictions)
r21 = evaluator1_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse1))
print("R^2 on test data = {}".format(r21))

Root Mean Squared Error (RMSE) on test data = 5298.560290247655
R^2 on test data = 0.698957198094203


# Second model

## Build a model

In [24]:
# Create Linear Regression Model
gbt = GBTRegressor()

# Fit the data to the pipeline stages
model_gbt = gbt.fit(train_data)

## Predict for test data

In [25]:
predictions = model_gbt.transform(test_data)
predictions.show()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|(232,[0,1,2,3,4,5...|  500|-95.15970194761485|
|(232,[0,1,2,3,4,5...| 1700| 4526.920370323895|
|(232,[0,1,2,3,4,5...| 9600|11093.793324139378|
|(232,[0,1,2,3,4,5...| 3200| 9534.472139025716|
|(232,[0,1,2,3,4,5...|14000| 11184.35709828305|
|(232,[0,1,2,3,4,5...|11100| 10493.58677912748|
|(232,[0,1,2,3,4,5...| 7500|10179.605760507387|
|(232,[0,1,2,3,4,5...| 9600|14962.896408346336|
|(232,[0,1,2,3,4,5...|13400|13472.832774739198|
|(232,[0,1,2,3,4,5...| 2200| 2805.189880675169|
|(232,[0,1,2,3,4,5...| 9600|10040.339391392394|
|(232,[0,1,2,3,4,5...| 8600|10040.339391392394|
|(232,[0,1,2,3,4,5...|19300|19699.394101970927|
|(232,[0,1,2,3,4,5...|12900|10517.036266751084|
|(232,[0,1,2,3,4,5...| 8200|10171.118288216103|
|(232,[0,1,2,3,4,5...| 8200|10559.661673406099|
|(232,[0,1,2,3,4,5...|18200| 20971.62688544504|
|(232,[0,1,2,3,4,5...|14000|10660.857676

## Evaluate the model

In [26]:
# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 4962.198805456441
R^2 on test data = 0.7359654216515437


## Hyperparameter optimization

In [27]:
model_gbt.params

[Param(parent='GBTRegressor_1dd260ca99a6', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'),
 Param(parent='GBTRegressor_1dd260ca99a6', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'),
 Param(parent='GBTRegressor_1dd260ca99a6', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'a

In [28]:
grid = ParamGridBuilder()
grid = grid.addGrid(model_gbt.maxDepth, [2, 5, 10])\
    .addGrid(model_gbt.lossType, ['squared', 'absolute'])\
    .build()

cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=grid,
                    evaluator=evaluator2_rmse,
                    parallelism=5,
                    numFolds=3)

cvModel = cv.fit(train_data)
bestModel = cvModel.bestModel
bestModel

GBTRegressionModel: uid=GBTRegressor_1dd260ca99a6, numTrees=20, numFeatures=232

## Best model 2


In [29]:
model2 = bestModel
pprint(model2.extractParamMap())

{Param(parent='GBTRegressor_1dd260ca99a6', name='maxMemoryInMB', doc='Maximum memory in MB allocated to histogram aggregation. If too small, then 1 node will be split per iteration, and its aggregates may exceed this size.'): 256,
 Param(parent='GBTRegressor_1dd260ca99a6', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 32,
 Param(parent='GBTRegressor_1dd260ca99a6', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 10,
 Param(parent='GBTRegressor_1dd260ca99a6', name='lossType', doc='Loss function which GBT tries to minimize (case-insensitive). Supported options: squared, absolute'): 'squared',
 Param(parent='GBTRegressor_1dd260ca99a6', name='leafCol', doc='Leaf indices column name. Predicted leaf index of each instance in each tree by preorder.'): '',
 Param(parent='GBTReg

## Save the model to HDFS

In [30]:
model2.write().overwrite().save("project/models/model2")

# Run it from root directory of the repository
run("hdfs dfs -get project/models/model2 models/model2")

''

## Predict for test data using best model2

In [31]:
predictions = model2.transform(test_data)
predictions.show()

+--------------------+-----+------------------+
|            features|label|        prediction|
+--------------------+-----+------------------+
|(232,[0,1,2,3,4,5...|  500| 665.7420907454955|
|(232,[0,1,2,3,4,5...| 1700| 3942.141265518452|
|(232,[0,1,2,3,4,5...| 9600|  8851.78095841201|
|(232,[0,1,2,3,4,5...| 3200| 4525.475454542871|
|(232,[0,1,2,3,4,5...|14000| 12720.70746890064|
|(232,[0,1,2,3,4,5...|11100|11257.131483840887|
|(232,[0,1,2,3,4,5...| 7500|10325.765997052285|
|(232,[0,1,2,3,4,5...| 9600|13436.277833197244|
|(232,[0,1,2,3,4,5...|13400|14386.846501391263|
|(232,[0,1,2,3,4,5...| 2200|442.38172087819305|
|(232,[0,1,2,3,4,5...| 9600| 9564.426569998535|
|(232,[0,1,2,3,4,5...| 8600| 9789.236137954353|
|(232,[0,1,2,3,4,5...|19300|17557.545623933624|
|(232,[0,1,2,3,4,5...|12900|12389.153249749048|
|(232,[0,1,2,3,4,5...| 8200|   9812.8332846262|
|(232,[0,1,2,3,4,5...| 8200|11554.364083046568|
|(232,[0,1,2,3,4,5...|18200| 17780.96220812469|
|(232,[0,1,2,3,4,5...|14000|12792.523157

In [39]:
predictions.select("label", "prediction")\
    .coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/model2_predictions")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/model2_predictions/*.csv > output/model2_predictions.csv")

''

## Evaluate the best model2

In [33]:
# Evaluate the performance of the model
evaluator2_rmse = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
evaluator2_r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")

rmse2 = evaluator2_rmse.evaluate(predictions)
r22 = evaluator2_r2.evaluate(predictions)

print("Root Mean Squared Error (RMSE) on test data = {}".format(rmse2))
print("R^2 on test data = {}".format(r22))

Root Mean Squared Error (RMSE) on test data = 3410.151855938535
R^2 on test data = 0.8753018747211165


# Compare best models

In [34]:
models = [[str(model1), rmse1, r21], [str(model2), rmse2, r22]]

df = spark.createDataFrame(models, ["model", "RMSE", "R2"])
df.show(truncate=False)

+-------------------------------------------------------------------------------+-----------------+------------------+
|model                                                                          |RMSE             |R2                |
+-------------------------------------------------------------------------------+-----------------+------------------+
|LinearRegressionModel: uid=LinearRegression_4634515bed4d, numFeatures=232      |5298.560290247655|0.698957198094203 |
|GBTRegressionModel: uid=GBTRegressor_1dd260ca99a6, numTrees=20, numFeatures=232|3410.151855938535|0.8753018747211165|
+-------------------------------------------------------------------------------+-----------------+------------------+



In [35]:
df.coalesce(1)\
    .write\
    .mode("overwrite")\
    .format("csv")\
    .option("sep", ",")\
    .option("header", "true")\
    .save("project/output/evaluation")

# Run it from root directory of the repository
run("hdfs dfs -cat project/output/evaluation/*.csv > output/evaluation.csv")

''