## 1. Mounting Google Drive

In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive so the CSV inputs and saved models
# under /content/drive/MyDrive/ColabDrive are reachable from this runtime.
drive.mount(mountpoint='/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=afe5db4d8b0d94864bebe18585d22e62ae6f90a22e5ec0c013d5c4ebc4633d73
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## 2. Install pyspark to the current runtime session

In [None]:
!pip install pyspark



## 3. Import libraries and declare transform functions.

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.evaluation import RegressionMetrics

# Transform each RDD item to LabeledPoint
def row_to_labeled_point(row, feature_columns, labelCol):
  '''
    Build one LabeledPoint from a DataFrame row.

    Args:
      row: a Row (anything indexable by column name).
      feature_columns: ordered column names whose values form the feature vector.
      labelCol: name of the column holding the label value.

    Returns:
      LabeledPoint whose label is row[labelCol] and whose features are the
      values of feature_columns, in order.
  '''
  label_value = row[labelCol]
  feature_values = list(map(row.__getitem__, feature_columns))
  return LabeledPoint(label_value, feature_values)

# transform data RDD to LabeledPoints RDD
def transform_data_to_labeledpoints_rdd(data, labelCol, fit_data=None):
  '''
    Transform a DataFrame into an RDD of LabeledPoint.

    The categorical columns cat1..cat116 are label-encoded into cat<i>_index
    columns, then combined with the continuous columns cont1..cont14 (in that
    order) to build each feature vector.

    Args:
      data: input DataFrame containing cat1..cat116, cont1..cont14 and labelCol.
      labelCol: column stored as the LabeledPoint label (e.g. 'label' for
        training data, 'id' for the test set).
      fit_data: optional DataFrame to fit the category encoding on, e.g. the
        training DataFrame when transforming the test set, so both share one
        encoding. Defaults to `data` itself.

    Returns:
      RDD of LabeledPoint(row[labelCol], [cat1_index..cat116_index, cont1..cont14]).

    Note: a model trained with a different categorical encoding must be
    retrained before it is used with features produced here.
  '''
  #*****************************************
  print("-> log the schema to verify the csv")
  #*****************************************
  data.printSchema()

  #*****************************************
  print('-> Handle categorical features - convert categorical columns to numerical values')
  #*****************************************
  categorical_columns = [f"cat{i}" for i in range(1, 117)]
  indexed_categorical_columns = [f"{column}_index" for column in categorical_columns]
  continuous_columns = [f"cont{i}" for i in range(1, 15)]

  # A single multi-column indexer fits all 116 columns in one job instead of
  # 116 separate fit/transform passes.
  # 'alphabetAsc' makes the category -> index mapping depend only on the set of
  # distinct categories, not on their frequencies. With the default
  # 'frequencyDesc', re-fitting on the test set produced a different mapping
  # than the one the model was trained on. Passing fit_data guarantees an
  # identical mapping.
  # 'keep' maps categories absent from the fitting data to an extra index
  # instead of failing.
  indexer_model = StringIndexer(
    inputCols=categorical_columns,
    outputCols=indexed_categorical_columns,
    stringOrderType="alphabetAsc",
    handleInvalid="keep",
  ).fit(data if fit_data is None else fit_data)

  # Encode, then drop the original string columns.
  data = indexer_model.transform(data).drop(*categorical_columns)

  feature_columns = indexed_categorical_columns + continuous_columns
  return data.rdd.map(lambda x: row_to_labeled_point(x, feature_columns, labelCol))

## 4. Create a Random Forest regression tuner that is reused to evaluate each parameter set.

In [None]:
def rfm_tuner(params):
  '''
    Random Forest Regression model training and evaluation tuner.

    Steps:
      1. Load the training CSV and rename its 'loss' column to 'label'.
      2. Convert the rows to LabeledPoints.
      3. Hold out 30% of the rows for validation.
      4. Train an MLlib RandomForest regressor and save it to params['modelPath'].
      5. Print validation metrics.

    Args:
      params: dict with required keys 'trainInput', 'modelPath', 'numTrees',
        'maxDepth' and 'maxBins'. The optional key 'seed' (default 42) fixes
        both the train/validation split and the forest's randomness, so
        different parameter sets are scored on the same validation rows.
        Pass seed=None for unseeded behaviour.

    Returns:
      dict of validation metrics: 'mse', 'rmse', 'r2', 'mae', 'explainedVariance'.

    Raises:
      ValueError: if any required parameter is missing or None.
  '''
  required_keys = ('trainInput', 'modelPath', 'numTrees', 'maxDepth', 'maxBins')
  # params.get() so an absent key is reported as ValueError rather than KeyError.
  missing = [key for key in required_keys if params.get(key) is None]
  if missing:
    raise ValueError(f"Missing tuner parameters {missing}. Please have a double-check")
  seed = params.get('seed', 42)

  spark = SparkSession \
    .builder \
    .appName("assignment2_AllstateClaimsSeverityRandomForestRegressor") \
    .getOrCreate()

  try:
    #****************************
    print('-> BGD301x_o37 - Loading training data from ', params['trainInput'])
    #****************************
    data = spark.read.format('csv') \
      .option('header', 'true') \
      .option('inferschema', 'true') \
      .load(params['trainInput']) \
      .withColumnRenamed("loss", "label") \
      .cache()

    #****************************
    print('-> BGD301x_o38 - Convert data to LabeledPoint RDD')
    #****************************
    rdd = transform_data_to_labeledpoints_rdd(data, 'label')

    #*****************************************
    print("-> Preparing data - Split the data into training and test sets, part 1 - 70% data is for training, remain 30% is testing")
    #*****************************************
    (trainingData, valData) = rdd.randomSplit([0.7, 0.3], seed=seed)
    trainingData.cache()
    valData.cache()

    #*****************************************
    print("-> Train RandomForest regressor")
    #*****************************************
    model = RandomForest.trainRegressor(
        trainingData,
        categoricalFeaturesInfo={},
        numTrees=params['numTrees'],
        featureSubsetStrategy="auto",
        impurity="variance",
        maxDepth=params['maxDepth'],
        maxBins=params['maxBins'],
        seed=seed
    )

    #*****************************************
    print("-> Complete training Random Forest model, Saving model for the next use")
    #*****************************************
    model.save(spark.sparkContext, params['modelPath'])

    #*****************************************
    print("-> Evaluating model on validation data and calculating RMSE")
    #*****************************************
    predictions = model.predict(valData.map(lambda x: x.features))

    # RegressionMetrics expects (prediction, observation) pairs.
    prediction_and_label = predictions.zip(valData.map(lambda lp: lp.label))
    metrics = RegressionMetrics(prediction_and_label)

    results = {
      'mse': metrics.meanSquaredError,
      'rmse': metrics.rootMeanSquaredError,
      'r2': metrics.r2,
      'mae': metrics.meanAbsoluteError,
      'explainedVariance': metrics.explainedVariance,
    }
    print("- Root Mean Squared Error (RMSE) = ", results['rmse'])
    print("- Validation data MSE = ", results['mse'])
    print("- Validation data RMSE = ", results['rmse'])
    print("- Validation data R-squared = ", results['r2'])
    print("- Validation data MAE = ", results['mae'])
    print("- Validation data Explained variance = ", results['explainedVariance'])
    return results
  finally:
    # Stop the Spark session even when loading or training fails.
    spark.stop()

## 5. Create a function that runs the best model on the test data and exports the submission file.

In [None]:
def process_test_data(params):
  '''
    Score the test CSV with a saved RandomForest model and write a submission CSV.

    Args:
      params: dict with required keys:
        - 'testInput': path of the test CSV.
        - 'modelPath': directory written by RandomForestModel.save.
        - 'outputFile': output directory for the CSV with columns id, loss.

    Raises:
      ValueError: if any required parameter is missing or None.
  '''
  required_keys = ('testInput', 'modelPath', 'outputFile')
  missing = [key for key in required_keys if params.get(key) is None]
  if missing:
    raise ValueError(f"Missing test parameters {missing}. Please have a double-check")

  spark = SparkSession \
    .builder \
    .appName("assignment2_AllstateClaimsSeverityRandomForestRegressor") \
    .getOrCreate()

  try:
    testInput = spark.read.format('csv') \
      .option('header', 'true') \
      .option('inferschema', 'true') \
      .load(params['testInput']) \
      .cache()

    # The 'id' column is carried as the LabeledPoint label so each prediction
    # can be paired back with its row id.
    # NOTE(review): the categorical encoding is computed by
    # transform_data_to_labeledpoints_rdd. It must match the encoding used when
    # the model at params['modelPath'] was trained — confirm before submitting.
    testRdd = transform_data_to_labeledpoints_rdd(testInput, "id")
    testRdd.cache()

    model = RandomForestModel.load(spark.sparkContext, params['modelPath'])

    #*****************************************
    print("-> Run the model on data test.")
    #*****************************************
    predictions = model.predict(testRdd.map(lambda x: x.features))

    print("-> Zip row ids with predictions")
    # Both sides are derived from the same cached RDD, which is the pattern
    # RDD.zip needs (same partitions, same element counts).
    id_and_prediction = testRdd.map(lambda lp: int(lp.label)).zip(predictions)

    results = id_and_prediction.toDF(["id", "loss"])

    #*****************************************
    print("-> Save the results to a CSV file.")
    #*****************************************
    # coalesce(1) produces a single part file, as a submission needs one CSV.
    results.coalesce(1).write.csv(params['outputFile'], header=True)
  finally:
    spark.stop()

## 6. Train Random Forest with different parameter sets to find the best configuration.

### 6.1. 10 - 4 - 32 (numTrees - maxDepth - maxBins)

In [None]:
# 10 trees, depth 4, 32 bins.
params = dict(
  trainInput='/content/drive/MyDrive/ColabDrive/allstates/resources/train.csv',
  numTrees=10,
  maxDepth=4,
  maxBins=32,
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf10_4_32',
)

rfm_tuner(params)

BGD301x_o37 - Loading training data from  /content/drive/MyDrive/ColabDrive/allstates/resources/train.csv
log the schema to verify the csv
root
 |-- id: integer (nullable = true)
 |-- cat1: string (nullable = true)
 |-- cat2: string (nullable = true)
 |-- cat3: string (nullable = true)
 |-- cat4: string (nullable = true)
 |-- cat5: string (nullable = true)
 |-- cat6: string (nullable = true)
 |-- cat7: string (nullable = true)
 |-- cat8: string (nullable = true)
 |-- cat9: string (nullable = true)
 |-- cat10: string (nullable = true)
 |-- cat11: string (nullable = true)
 |-- cat12: string (nullable = true)
 |-- cat13: string (nullable = true)
 |-- cat14: string (nullable = true)
 |-- cat15: string (nullable = true)
 |-- cat16: string (nullable = true)
 |-- cat17: string (nullable = true)
 |-- cat18: string (nullable = true)
 |-- cat19: string (nullable = true)
 |-- cat20: string (nullable = true)
 |-- cat21: string (nullable = true)
 |-- cat22: string (nullable = true)
 |-- cat23: stri



Root Mean Squared Error (RMSE) =  2296.5633006758185
Validation data MSE =  5274202.99401101
Validation data RMSE =  2296.5633006758185
Validation data R-squared =  0.3780186003664311
Validation data MAE =  1454.8678000676305
Validation data Explained variance =  2455157.8613324366


### 6.2. 3 - 4 - 32

In [None]:
# 3 trees, depth 4, 32 bins.
params = dict(
  trainInput='/content/drive/MyDrive/ColabDrive/allstates/resources/train.csv',
  numTrees=3,
  maxDepth=4,
  maxBins=32,
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf3_4_32',
)

rfm_tuner(params)

-> BGD301x_o37 - Loading training data from  /content/drive/MyDrive/ColabDrive/allstates/resources/train.csv
-> BGD301x_o38 - Convert data to LabeledPoint RDD
-> log the schema to verify the csv
root
 |-- id: integer (nullable = true)
 |-- cat1: string (nullable = true)
 |-- cat2: string (nullable = true)
 |-- cat3: string (nullable = true)
 |-- cat4: string (nullable = true)
 |-- cat5: string (nullable = true)
 |-- cat6: string (nullable = true)
 |-- cat7: string (nullable = true)
 |-- cat8: string (nullable = true)
 |-- cat9: string (nullable = true)
 |-- cat10: string (nullable = true)
 |-- cat11: string (nullable = true)
 |-- cat12: string (nullable = true)
 |-- cat13: string (nullable = true)
 |-- cat14: string (nullable = true)
 |-- cat15: string (nullable = true)
 |-- cat16: string (nullable = true)
 |-- cat17: string (nullable = true)
 |-- cat18: string (nullable = true)
 |-- cat19: string (nullable = true)
 |-- cat20: string (nullable = true)
 |-- cat21: string (nullable = tru



- Root Mean Squared Error (RMSE) =  2240.199896086758
- Validation data MSE =  5018495.574427121
- Validation data RMSE =  2240.199896086758
- Validation data R-squared =  0.38127144897412935
- Validation data MAE =  1478.7279755396378
- Validation data Explained variance =  2466765.4558192524


### 6.3. 15 - 5 - 32

In [None]:
# 15 trees, depth 5, 32 bins.
params = dict(
  trainInput='/content/drive/MyDrive/ColabDrive/allstates/resources/train.csv',
  numTrees=15,
  maxDepth=5,
  maxBins=32,
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf15_5_32',
)

rfm_tuner(params)

-> BGD301x_o37 - Loading training data from  /content/drive/MyDrive/ColabDrive/allstates/resources/train.csv
-> log the schema to verify the csv
root
 |-- id: integer (nullable = true)
 |-- cat1: string (nullable = true)
 |-- cat2: string (nullable = true)
 |-- cat3: string (nullable = true)
 |-- cat4: string (nullable = true)
 |-- cat5: string (nullable = true)
 |-- cat6: string (nullable = true)
 |-- cat7: string (nullable = true)
 |-- cat8: string (nullable = true)
 |-- cat9: string (nullable = true)
 |-- cat10: string (nullable = true)
 |-- cat11: string (nullable = true)
 |-- cat12: string (nullable = true)
 |-- cat13: string (nullable = true)
 |-- cat14: string (nullable = true)
 |-- cat15: string (nullable = true)
 |-- cat16: string (nullable = true)
 |-- cat17: string (nullable = true)
 |-- cat18: string (nullable = true)
 |-- cat19: string (nullable = true)
 |-- cat20: string (nullable = true)
 |-- cat21: string (nullable = true)
 |-- cat22: string (nullable = true)
 |-- cat23

### 6.4. 20 - 7 - 32

In [None]:
# 20 trees, depth 7, 32 bins.
params = dict(
  trainInput='/content/drive/MyDrive/ColabDrive/allstates/resources/train.csv',
  numTrees=20,
  maxDepth=7,
  maxBins=32,
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf20_7_32',
)

rfm_tuner(params)

-> BGD301x_o37 - Loading training data from  /content/drive/MyDrive/ColabDrive/allstates/resources/train.csv
-> BGD301x_o38 - Convert data to LabeledPoint RDD
-> log the schema to verify the csv
root
 |-- id: integer (nullable = true)
 |-- cat1: string (nullable = true)
 |-- cat2: string (nullable = true)
 |-- cat3: string (nullable = true)
 |-- cat4: string (nullable = true)
 |-- cat5: string (nullable = true)
 |-- cat6: string (nullable = true)
 |-- cat7: string (nullable = true)
 |-- cat8: string (nullable = true)
 |-- cat9: string (nullable = true)
 |-- cat10: string (nullable = true)
 |-- cat11: string (nullable = true)
 |-- cat12: string (nullable = true)
 |-- cat13: string (nullable = true)
 |-- cat14: string (nullable = true)
 |-- cat15: string (nullable = true)
 |-- cat16: string (nullable = true)
 |-- cat17: string (nullable = true)
 |-- cat18: string (nullable = true)
 |-- cat19: string (nullable = true)
 |-- cat20: string (nullable = true)
 |-- cat21: string (nullable = tru



- Root Mean Squared Error (RMSE) =  2203.432733950088
- Validation data MSE =  4855115.813042758
- Validation data RMSE =  2203.432733950088
- Validation data R-squared =  0.45726522185584695
- Validation data MAE =  1355.3167200906312
- Validation data Explained variance =  3091417.2843252826


### 6.5. 40 - 9 - 32

In [None]:
# 40 trees, depth 9, 32 bins.
params = dict(
  trainInput='/content/drive/MyDrive/ColabDrive/allstates/resources/train.csv',
  numTrees=40,
  maxDepth=9,
  maxBins=32,
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf40_9_32',
)

rfm_tuner(params)

-> BGD301x_o37 - Loading training data from  /content/drive/MyDrive/ColabDrive/allstates/resources/train.csv
-> BGD301x_o38 - Convert data to LabeledPoint RDD
-> log the schema to verify the csv
root
 |-- id: integer (nullable = true)
 |-- cat1: string (nullable = true)
 |-- cat2: string (nullable = true)
 |-- cat3: string (nullable = true)
 |-- cat4: string (nullable = true)
 |-- cat5: string (nullable = true)
 |-- cat6: string (nullable = true)
 |-- cat7: string (nullable = true)
 |-- cat8: string (nullable = true)
 |-- cat9: string (nullable = true)
 |-- cat10: string (nullable = true)
 |-- cat11: string (nullable = true)
 |-- cat12: string (nullable = true)
 |-- cat13: string (nullable = true)
 |-- cat14: string (nullable = true)
 |-- cat15: string (nullable = true)
 |-- cat16: string (nullable = true)
 |-- cat17: string (nullable = true)
 |-- cat18: string (nullable = true)
 |-- cat19: string (nullable = true)
 |-- cat20: string (nullable = true)
 |-- cat21: string (nullable = tru

### 6.6. 60 - 10 - 48

In [None]:
# 60 trees, depth 10, 48 bins. The model directory name must match these
# values; it previously pointed at 'allstates_rf100_15_64'.
params = {
  'trainInput': '/content/drive/MyDrive/ColabDrive/allstates/resources/train.csv',
  'numTrees': 60,
  'maxDepth': 10,
  'maxBins': 48,
  'modelPath': '/content/drive/MyDrive/ColabDrive/models/allstates_rf60_10_48'
}

rfm_tuner(params)

ConnectionRefusedError: [Errno 111] Connection refused

## 7. Run Random Forest Regressor on the test data and export submission file.

In [None]:
# Score the test set with the 40-tree / depth-9 / 32-bin model.
params = dict(
  testInput='/content/drive/MyDrive/ColabDrive/allstates/resources/test.csv',
  outputFile='/content/drive/MyDrive/ColabDrive/allstates/submission_rf40_9_32_1',
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf40_9_32',
)

process_test_data(params)

-> log the schema to verify the csv
root
 |-- id: integer (nullable = true)
 |-- cat1: string (nullable = true)
 |-- cat2: string (nullable = true)
 |-- cat3: string (nullable = true)
 |-- cat4: string (nullable = true)
 |-- cat5: string (nullable = true)
 |-- cat6: string (nullable = true)
 |-- cat7: string (nullable = true)
 |-- cat8: string (nullable = true)
 |-- cat9: string (nullable = true)
 |-- cat10: string (nullable = true)
 |-- cat11: string (nullable = true)
 |-- cat12: string (nullable = true)
 |-- cat13: string (nullable = true)
 |-- cat14: string (nullable = true)
 |-- cat15: string (nullable = true)
 |-- cat16: string (nullable = true)
 |-- cat17: string (nullable = true)
 |-- cat18: string (nullable = true)
 |-- cat19: string (nullable = true)
 |-- cat20: string (nullable = true)
 |-- cat21: string (nullable = true)
 |-- cat22: string (nullable = true)
 |-- cat23: string (nullable = true)
 |-- cat24: string (nullable = true)
 |-- cat25: string (nullable = true)
 |-- cat

# Final full source code

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.evaluation import RegressionMetrics

#===============================================================================
# Transform each RDD item to LabeledPoint
#===============================================================================
def row_to_labeled_point(row, feature_columns, labelCol):
  '''
    Build one LabeledPoint from a DataFrame row.

    Args:
      row: a Row (anything indexable by column name).
      feature_columns: ordered column names whose values form the feature vector.
      labelCol: name of the column holding the label value.

    Returns:
      LabeledPoint whose label is row[labelCol] and whose features are the
      values of feature_columns, in order.
  '''
  label_value = row[labelCol]
  feature_values = list(map(row.__getitem__, feature_columns))
  return LabeledPoint(label_value, feature_values)

#===============================================================================
# Transform data RDD to LabeledPoints RDD.
#===============================================================================
def transform_data_to_labeledpoints_rdd(data, labelCol, fit_data=None):
  '''
    Transform a DataFrame into an RDD of LabeledPoint.

    The categorical columns cat1..cat116 are label-encoded into cat<i>_index
    columns, then combined with the continuous columns cont1..cont14 (in that
    order) to build each feature vector.

    Args:
      data: input DataFrame containing cat1..cat116, cont1..cont14 and labelCol.
      labelCol: column stored as the LabeledPoint label (e.g. 'label' for
        training data, 'id' for the test set).
      fit_data: optional DataFrame to fit the category encoding on, e.g. the
        training DataFrame when transforming the test set, so both share one
        encoding. Defaults to `data` itself.

    Returns:
      RDD of LabeledPoint(row[labelCol], [cat1_index..cat116_index, cont1..cont14]).

    Note: a model trained with a different categorical encoding must be
    retrained before it is used with features produced here.
  '''
  #*****************************************
  print("-> log the schema to verify the csv")
  #*****************************************
  data.printSchema()

  #*****************************************
  print('-> Handle categorical features - convert categorical columns to numerical values')
  #*****************************************
  categorical_columns = [f"cat{i}" for i in range(1, 117)]
  indexed_categorical_columns = [f"{column}_index" for column in categorical_columns]
  continuous_columns = [f"cont{i}" for i in range(1, 15)]

  # A single multi-column indexer fits all 116 columns in one job instead of
  # 116 separate fit/transform passes.
  # 'alphabetAsc' makes the category -> index mapping depend only on the set of
  # distinct categories, not on their frequencies. With the default
  # 'frequencyDesc', re-fitting on the test set produced a different mapping
  # than the one the model was trained on. Passing fit_data guarantees an
  # identical mapping.
  # 'keep' maps categories absent from the fitting data to an extra index
  # instead of failing.
  indexer_model = StringIndexer(
    inputCols=categorical_columns,
    outputCols=indexed_categorical_columns,
    stringOrderType="alphabetAsc",
    handleInvalid="keep",
  ).fit(data if fit_data is None else fit_data)

  # Encode, then drop the original string columns.
  data = indexer_model.transform(data).drop(*categorical_columns)

  feature_columns = indexed_categorical_columns + continuous_columns
  return data.rdd.map(lambda x: row_to_labeled_point(x, feature_columns, labelCol))

#===============================================================================
# Train Random Forest Regression model tuner.
#===============================================================================
def rfm_tuner(params):
  '''
    Random Forest Regression model training and evaluation tuner.

    Steps:
      1. Load the training CSV and rename its 'loss' column to 'label'.
      2. Convert the rows to LabeledPoints.
      3. Hold out 30% of the rows for validation.
      4. Train an MLlib RandomForest regressor and save it to params['modelPath'].
      5. Print validation metrics.

    Args:
      params: dict with required keys 'trainInput', 'modelPath', 'numTrees',
        'maxDepth' and 'maxBins'. The optional key 'seed' (default 42) fixes
        both the train/validation split and the forest's randomness, so
        different parameter sets are scored on the same validation rows.
        Pass seed=None for unseeded behaviour.

    Returns:
      dict of validation metrics: 'mse', 'rmse', 'r2', 'mae', 'explainedVariance'.

    Raises:
      ValueError: if any required parameter is missing or None.
  '''
  required_keys = ('trainInput', 'modelPath', 'numTrees', 'maxDepth', 'maxBins')
  # params.get() so an absent key is reported as ValueError rather than KeyError.
  missing = [key for key in required_keys if params.get(key) is None]
  if missing:
    raise ValueError(f"Missing tuner parameters {missing}. Please have a double-check")
  seed = params.get('seed', 42)

  spark = SparkSession \
    .builder \
    .appName("assignment2_AllstateClaimsSeverityRandomForestRegressor") \
    .getOrCreate()

  try:
    #****************************
    print('-> BGD301x_o37 - Loading training data from ', params['trainInput'])
    #****************************
    data = spark.read.format('csv') \
      .option('header', 'true') \
      .option('inferschema', 'true') \
      .load(params['trainInput']) \
      .withColumnRenamed("loss", "label") \
      .cache()

    #****************************
    print('-> BGD301x_o38 - Convert data to LabeledPoint RDD')
    #****************************
    rdd = transform_data_to_labeledpoints_rdd(data, 'label')

    #*****************************************
    print("-> Preparing data - Split the data into training and test sets, part 1 - 70% data is for training, remain 30% is testing")
    #*****************************************
    (trainingData, valData) = rdd.randomSplit([0.7, 0.3], seed=seed)
    trainingData.cache()
    valData.cache()

    #*****************************************
    print("-> Train RandomForest regressor")
    #*****************************************
    model = RandomForest.trainRegressor(
        trainingData,
        categoricalFeaturesInfo={},
        numTrees=params['numTrees'],
        featureSubsetStrategy="auto",
        impurity="variance",
        maxDepth=params['maxDepth'],
        maxBins=params['maxBins'],
        seed=seed
    )

    #*****************************************
    print("-> Complete training Random Forest model, Saving model for the next use")
    #*****************************************
    model.save(spark.sparkContext, params['modelPath'])

    #*****************************************
    print("-> Evaluating model on validation data and calculating RMSE")
    #*****************************************
    predictions = model.predict(valData.map(lambda x: x.features))

    # RegressionMetrics expects (prediction, observation) pairs.
    prediction_and_label = predictions.zip(valData.map(lambda lp: lp.label))
    metrics = RegressionMetrics(prediction_and_label)

    results = {
      'mse': metrics.meanSquaredError,
      'rmse': metrics.rootMeanSquaredError,
      'r2': metrics.r2,
      'mae': metrics.meanAbsoluteError,
      'explainedVariance': metrics.explainedVariance,
    }
    print("- Root Mean Squared Error (RMSE) = ", results['rmse'])
    print("- Validation data MSE = ", results['mse'])
    print("- Validation data RMSE = ", results['rmse'])
    print("- Validation data R-squared = ", results['r2'])
    print("- Validation data MAE = ", results['mae'])
    print("- Validation data Explained variance = ", results['explainedVariance'])
    return results
  finally:
    # Stop the Spark session even when loading or training fails.
    spark.stop()

#===============================================================================
# Run Random Forest Regressor on the test data and export submission file.
#===============================================================================
def process_test_data(params):
  '''
    Score the test CSV with a saved RandomForest model and write a submission CSV.

    Args:
      params: dict with required keys:
        - 'testInput': path of the test CSV.
        - 'modelPath': directory written by RandomForestModel.save.
        - 'outputFile': output directory for the CSV with columns id, loss.

    Raises:
      ValueError: if any required parameter is missing or None.
  '''
  required_keys = ('testInput', 'modelPath', 'outputFile')
  missing = [key for key in required_keys if params.get(key) is None]
  if missing:
    raise ValueError(f"Missing test parameters {missing}. Please have a double-check")

  spark = SparkSession \
    .builder \
    .appName("assignment2_AllstateClaimsSeverityRandomForestRegressor") \
    .getOrCreate()

  try:
    testInput = spark.read.format('csv') \
      .option('header', 'true') \
      .option('inferschema', 'true') \
      .load(params['testInput']) \
      .cache()

    # The 'id' column is carried as the LabeledPoint label so each prediction
    # can be paired back with its row id.
    # NOTE(review): the categorical encoding is computed by
    # transform_data_to_labeledpoints_rdd. It must match the encoding used when
    # the model at params['modelPath'] was trained — confirm before submitting.
    testRdd = transform_data_to_labeledpoints_rdd(testInput, "id")
    testRdd.cache()

    model = RandomForestModel.load(spark.sparkContext, params['modelPath'])

    #*****************************************
    print("-> Run the model on data test.")
    #*****************************************
    predictions = model.predict(testRdd.map(lambda x: x.features))

    print("-> Zip row ids with predictions")
    # Both sides are derived from the same cached RDD, which is the pattern
    # RDD.zip needs (same partitions, same element counts).
    id_and_prediction = testRdd.map(lambda lp: int(lp.label)).zip(predictions)

    results = id_and_prediction.toDF(["id", "loss"])

    #*****************************************
    print("-> Save the results to a CSV file.")
    #*****************************************
    # coalesce(1) produces a single part file, as a submission needs one CSV.
    results.coalesce(1).write.csv(params['outputFile'], header=True)
  finally:
    spark.stop()

#===============================================================================
# Train Random Forest with different param sets to determine the best params
# configuration.
#
# Tested with the following values:
# - numTrees: 3, 10, 15, 20, 40
# - maxDepth: 4, 5, 7, 9
# - maxBins: 32, 48
#===============================================================================
# Retrain the selected configuration: 40 trees, depth 9, 32 bins.
params = dict(
  trainInput='/content/drive/MyDrive/ColabDrive/allstates/resources/train.csv',
  numTrees=40,
  maxDepth=9,
  maxBins=32,
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf40_9_32',
)

rfm_tuner(params)

#===============================================================================
# Run best model on the test data and export submission file.
#===============================================================================
# Score the test set with the model trained above and write the submission.
params = dict(
  testInput='/content/drive/MyDrive/ColabDrive/allstates/resources/test.csv',
  outputFile='/content/drive/MyDrive/ColabDrive/allstates/submission_rf40_9_32',
  modelPath='/content/drive/MyDrive/ColabDrive/models/allstates_rf40_9_32',
)

process_test_data(params)