In [2]:
import os

""" # Set Spark version
spark_version = 'spark-3.5.2'
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-23"
os.environ["SPARK_HOME"] = "C:\\spark-3.5.3-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()
"""
# Import necessary libraries
import pandas as pd
import numpy as np
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.multioutput import MultiOutputRegressor
from sklearn.metrics import mean_squared_error

In [3]:
# Start a Spark session
spark = SparkSession.builder.appName("CoffeeRatings").getOrCreate()

In [6]:
# Load the dataset into a Spark DataFrame
file_name = '/content/drive/MyDrive/Coffee practice/coffee_lat_lon_updated_cleaned(latest2).csv'  # The name should match the uploaded file
df_spark = spark.read.csv(file_name, header=True, inferSchema=True)

# Show the first few rows to confirm the DataFrame is loaded
df_spark.show(5)



+--------------------+--------------------+------+--------------------+----------------+--------------------+------------+---------+-----------+-----+----+----+------+----------+---------+--------------------+--------------------+--------------------+-----------------+------------------+---------------+----------------+-------------------+--------------------+
|                name|                slug|rating|             roaster|        location|              origin|       roast|cost_12oz|review_date|aroma|acid|body|flavor|aftertaste|with_milk|              desc_1|              desc_2|              desc_3|Location_Latitude|Location_Longitude|Origin_Latitude|Origin_Longitude|Blend/Single Origin|      desc_1_cleaned|
+--------------------+--------------------+------+--------------------+----------------+--------------------+------------+---------+-----------+-----+----+----+------+----------+---------+--------------------+--------------------+--------------------+-----------------+-----

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [7]:
# Drop the 'with_milk' column
df_spark = df_spark.drop('with_milk')

# Drop rows with missing values in the target columns
df_spark = df_spark.dropna(subset=['aroma', 'acid', 'aftertaste'])

# Drop unnecessary columns: 'desc_2', 'desc_3'
df_spark = df_spark.drop('desc_2', 'desc_3')

# Show remaining missing values
df_spark.select([df_spark.columns[i] for i in range(len(df_spark.columns)) if df_spark.agg({df_spark.columns[i]: "sum"}).collect()[0][0] is None]).show()


+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+
|                name|                slug|             roaster|            location|              origin|       roast|              desc_1|
+--------------------+--------------------+--------------------+--------------------+--------------------+------------+--------------------+
|GW01 Finca Sophia...|https://www.coffe...|           GK Coffee|       Yilan, Taiwan|Nueva Suiza, Chir...|Medium-Light|Graceful, polishe...|
|Panama Carmen Gei...|https://www.coffe...|           GK Coffee|       Yilan, Taiwan|  Paso Ancho, Panama|       Light|Multi-layered, su...|
|Ninety Plus Panam...|https://www.coffe...|Plat Coffee Roastery|    Hong Kong, China|Chiriqui Province...|Medium-Light|Elegantly fruit-t...|
|Panama Mokkita Na...|https://www.coffe...|   Paradise Roasters|        Hilo, Hawaii|     Boquete, Panama|       Light|Richly floral, fr...|
|   Mama Cata

In [8]:
# Check data types of the relevant columns
df_spark.printSchema()

# Check for null values in the relevant columns
df_spark.select([col for col in ['cost_12oz', 'rating', 'roaster', 'origin', 'roast']]).show()

# Verify that all relevant columns are numeric and have no nulls
df_spark.select([col("cost_12oz").isNull(), col("rating").isNull()]).show()


root
 |-- name: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- roaster: string (nullable = true)
 |-- location: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- roast: string (nullable = true)
 |-- cost_12oz: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- aroma: string (nullable = true)
 |-- acid: string (nullable = true)
 |-- body: string (nullable = true)
 |-- flavor: string (nullable = true)
 |-- aftertaste: string (nullable = true)
 |-- desc_1: string (nullable = true)
 |-- Location_Latitude: string (nullable = true)
 |-- Location_Longitude: string (nullable = true)
 |-- Origin_Latitude: string (nullable = true)
 |-- Origin_Longitude: string (nullable = true)
 |-- Blend/Single Origin: string (nullable = true)
 |-- desc_1_cleaned: string (nullable = true)

+---------+------+--------------------+--------------------+------------+
|cost_12oz|rating|             roaster|              o

In [9]:
from pyspark.sql.functions import col

# Convert relevant columns to double (numeric type)
columns_to_convert = ['aroma', 'acid', 'body', 'flavor', 'aftertaste']

for column in columns_to_convert:
    df_spark = df_spark.withColumn(column, col(column).cast("double"))

# Verify that the conversion was successful
df_spark.printSchema()


root
 |-- name: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- roaster: string (nullable = true)
 |-- location: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- roast: string (nullable = true)
 |-- cost_12oz: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- aroma: double (nullable = true)
 |-- acid: double (nullable = true)
 |-- body: double (nullable = true)
 |-- flavor: double (nullable = true)
 |-- aftertaste: double (nullable = true)
 |-- desc_1: string (nullable = true)
 |-- Location_Latitude: string (nullable = true)
 |-- Location_Longitude: string (nullable = true)
 |-- Origin_Latitude: string (nullable = true)
 |-- Origin_Longitude: string (nullable = true)
 |-- Blend/Single Origin: string (nullable = true)
 |-- desc_1_cleaned: string (nullable = true)



In [10]:
# Check and drop rows with null values in the original columns
# Assuming df_spark is your main DataFrame
train_data, test_data = df_spark.randomSplit([0.8, 0.2], seed=42)
columns_to_check = ['cost_12oz', 'aroma', 'acid', 'body', 'flavor', 'aftertaste', 'roaster', 'origin', 'roast']
train_data = train_data.na.drop(subset=columns_to_check)
test_data = test_data.na.drop(subset=columns_to_check)



In [11]:
from pyspark.sql.functions import col

# Convert 'cost_12oz' to double
train_data = train_data.withColumn("cost_12oz", col("cost_12oz").cast("double"))
test_data = test_data.withColumn("cost_12oz", col("cost_12oz").cast("double"))

# Verify the schema to ensure the conversion
train_data.printSchema()


root
 |-- name: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- roaster: string (nullable = true)
 |-- location: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- roast: string (nullable = true)
 |-- cost_12oz: double (nullable = true)
 |-- review_date: string (nullable = true)
 |-- aroma: double (nullable = true)
 |-- acid: double (nullable = true)
 |-- body: double (nullable = true)
 |-- flavor: double (nullable = true)
 |-- aftertaste: double (nullable = true)
 |-- desc_1: string (nullable = true)
 |-- Location_Latitude: string (nullable = true)
 |-- Location_Longitude: string (nullable = true)
 |-- Origin_Latitude: string (nullable = true)
 |-- Origin_Longitude: string (nullable = true)
 |-- Blend/Single Origin: string (nullable = true)
 |-- desc_1_cleaned: string (nullable = true)



In [12]:
from pyspark.sql.functions import col

# Convert 'rating' to double in both train and test data
train_data = train_data.withColumn("rating", col("rating").cast("double"))
test_data = test_data.withColumn("rating", col("rating").cast("double"))

# Verify the schema to ensure the conversion
train_data.printSchema()
test_data.printSchema()

root
 |-- name: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- roaster: string (nullable = true)
 |-- location: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- roast: string (nullable = true)
 |-- cost_12oz: double (nullable = true)
 |-- review_date: string (nullable = true)
 |-- aroma: double (nullable = true)
 |-- acid: double (nullable = true)
 |-- body: double (nullable = true)
 |-- flavor: double (nullable = true)
 |-- aftertaste: double (nullable = true)
 |-- desc_1: string (nullable = true)
 |-- Location_Latitude: string (nullable = true)
 |-- Location_Longitude: string (nullable = true)
 |-- Origin_Latitude: string (nullable = true)
 |-- Origin_Longitude: string (nullable = true)
 |-- Blend/Single Origin: string (nullable = true)
 |-- desc_1_cleaned: string (nullable = true)

root
 |-- name: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- roaster:

In [13]:
# Check for non-numeric values in 'rating' column
train_data.filter(col("rating").cast("double").isNull()).select("rating").show()


+------+
|rating|
+------+
+------+



In [16]:
# Modify StringIndexer to handle unseen labels
from pyspark.ml.feature import StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="skip").fit(train_data) for column in ['roaster', 'origin', 'roast']]


In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor

# defining the stages for the pipeline
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(train_data) for column in ['roaster', 'origin', 'roast']]
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_vec") for column in ['roaster', 'origin', 'roast']]

# Assembling the features
assembler = VectorAssembler(inputCols=['cost_12oz', 'aroma', 'acid', 'body', 'flavor', 'aftertaste'] + [col+"_vec" for col in ['roaster', 'origin', 'roast']], outputCol="features")

# Define the RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# Create the pipeline
pipeline_simple = Pipeline(stages=indexers + encoders + [assembler, rf])

# Now you can fit the model
model_simple = pipeline_simple.fit(train_data)



In [18]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Convert 'rating' to double in both train and test data
train_data = train_data.withColumn("rating", col("rating").cast("double"))
test_data = test_data.withColumn("rating", col("rating").cast("double"))

# Ensure that 'cost_12oz' is also of type double
train_data = train_data.withColumn("cost_12oz", col("cost_12oz").cast("double"))
test_data = test_data.withColumn("cost_12oz", col("cost_12oz").cast("double"))

# Modify StringIndexer to handle unseen labels
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="skip").fit(train_data) for column in ['roaster', 'origin', 'roast']]
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_vec") for column in ['roaster', 'origin', 'roast']]
assembler = VectorAssembler(inputCols=['cost_12oz', 'aroma', 'acid', 'body', 'flavor', 'aftertaste'] + [col+"_vec" for col in ['roaster', 'origin', 'roast']], outputCol="features")

# Define the RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# Create the pipeline
pipeline_simple = Pipeline(stages=indexers + encoders + [assembler, rf])

# Fit the simplified model
model_simple = pipeline_simple.fit(train_data)

# Make predictions
predictions_simple = model_simple.transform(test_data)

# Define the evaluator for regression
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")

# Evaluate the simplified model
rmse_simple = evaluator.evaluate(predictions_simple)

print(f"RMSE of simplified model: {rmse_simple}")


RMSE of simplified model: 0.5133443318676517


In [19]:
# Fit the simplified model
model_simple = pipeline_simple.fit(train_data)

# Check if the simplified pipeline works
predictions_simple = model_simple.transform(test_data)

# Evaluate the simplified model
rmse_simple = evaluator.evaluate(predictions_simple)

print(f"RMSE of simplified model: {rmse_simple}")


RMSE of simplified model: 0.5133443318676517


In [20]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

# Simplified pipeline
assembler = VectorAssembler(inputCols=['cost_12oz'], outputCol="features")

# Random Forest Regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="rating", numTrees=10, maxDepth=5)

# Simple pipeline with only one feature
pipeline_simple = Pipeline(stages=[assembler, rf])

# Fit the simplified model
model_simple = pipeline_simple.fit(train_data)

# Check if the simplified pipeline works
predictions_simple = model_simple.transform(test_data)
predictions_simple.show()


+--------------------+--------------------+------+--------------------+--------------------+--------------------+------------+---------+-----------+-----+----+----+------+----------+--------------------+-----------------+------------------+---------------+----------------+-------------------+--------------------+--------+-----------------+
|                name|                slug|rating|             roaster|            location|              origin|       roast|cost_12oz|review_date|aroma|acid|body|flavor|aftertaste|              desc_1|Location_Latitude|Location_Longitude|Origin_Latitude|Origin_Longitude|Blend/Single Origin|      desc_1_cleaned|features|       prediction|
+--------------------+--------------------+------+--------------------+--------------------+--------------------+------------+---------+-----------+-----+----+----+------+----------+--------------------+-----------------+------------------+---------------+----------------+-------------------+--------------------+--

In [21]:
# Add more features
assembler = VectorAssembler(inputCols=['cost_12oz', 'aroma', 'acid'], outputCol="features")

# Update the pipeline
pipeline_simple = Pipeline(stages=[assembler, rf])

# Fit the model
model_simple = pipeline_simple.fit(train_data)

# Check predictions
predictions_simple = model_simple.transform(test_data)
predictions_simple.show()


+--------------------+--------------------+------+--------------------+--------------------+--------------------+------------+---------+-----------+-----+----+----+------+----------+--------------------+-----------------+------------------+---------------+----------------+-------------------+--------------------+------------------+-----------------+
|                name|                slug|rating|             roaster|            location|              origin|       roast|cost_12oz|review_date|aroma|acid|body|flavor|aftertaste|              desc_1|Location_Latitude|Location_Longitude|Origin_Latitude|Origin_Longitude|Blend/Single Origin|      desc_1_cleaned|          features|       prediction|
+--------------------+--------------------+------+--------------------+--------------------+--------------------+------------+---------+-----------+-----+----+----+------+----------+--------------------+-----------------+------------------+---------------+----------------+-------------------+---

In [22]:
assembler = VectorAssembler(inputCols=['cost_12oz', 'aroma', 'acid', 'body', 'flavor', 'aftertaste'], outputCol="features", handleInvalid='skip')


In [23]:
# List of columns to check for nulls and remove them
columns_to_check = [
    'cost_12oz',
    'aroma',
    'acid',
    'body',
    'flavor',
    'aftertaste',
    'roaster',
    'origin',
    'roast'
]

# Drop rows with null values in any of these columns
train_data = train_data.na.drop(subset=columns_to_check)
test_data = test_data.na.drop(subset=columns_to_check)

# Verify that no null values remain in these columns
train_data.select([col for col in columns_to_check]).show()
test_data.select([col for col in columns_to_check]).show()


+---------+-----+----+----+------+----------+--------------------+--------------------+------------+
|cost_12oz|aroma|acid|body|flavor|aftertaste|             roaster|              origin|       roast|
+---------+-----+----+----+------+----------+--------------------+--------------------+------------+
|    47.04| 18.0|16.0|18.0|  18.0|      16.0|Pacific Coffee Re...|Ka‘ū growing regi...|       Light|
|     43.5| 18.0|16.0|16.0|  18.0|      16.0|   Kona Hills Coffee|Kona growing dist...|Medium-Light|
|    47.91| 18.0|18.0|18.0|  18.0|      16.0|    Kona Farm Direct|Kona growing regi...|Medium-Light|
|     19.5| 16.0|16.0|18.0|  18.0|      14.0|Colibrije Special...|Chiapas State, Me...|      Medium|
|    15.38| 16.0|16.0|16.0|  16.0|      16.0|    Coffee by Design|            Ethiopia|Medium-Light|
|     4.81| 18.0|16.0|16.0|  18.0|      16.0|     Mr. Chao Coffee|               Kenya|Medium-Light|
|    13.99| 18.0|16.0|16.0|  18.0|      16.0|        Coffee Hound| Costa Rica; Sumatra|Medi

In [24]:
# Check the schema of the DataFrame to confirm data types
train_data.printSchema()

# Check for null values in each column
for column in columns_to_check:
    null_count = train_data.filter(train_data[column].isNull()).count()
    print(f"Null values in {column}: {null_count}")


root
 |-- name: string (nullable = true)
 |-- slug: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- roaster: string (nullable = true)
 |-- location: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- roast: string (nullable = true)
 |-- cost_12oz: double (nullable = true)
 |-- review_date: string (nullable = true)
 |-- aroma: double (nullable = true)
 |-- acid: double (nullable = true)
 |-- body: double (nullable = true)
 |-- flavor: double (nullable = true)
 |-- aftertaste: double (nullable = true)
 |-- desc_1: string (nullable = true)
 |-- Location_Latitude: string (nullable = true)
 |-- Location_Longitude: string (nullable = true)
 |-- Origin_Latitude: string (nullable = true)
 |-- Origin_Longitude: string (nullable = true)
 |-- Blend/Single Origin: string (nullable = true)
 |-- desc_1_cleaned: string (nullable = true)

Null values in cost_12oz: 0
Null values in aroma: 0
Null values in acid: 0
Null values in body: 0
Null values in flavor: 0
Null 

In [25]:
# Drop rows with null values in the relevant columns
columns_to_check = [
    'cost_12oz',
    'aroma',
    'flavor',
    'aftertaste'
]

# Drop rows with nulls in these columns
train_data_clean = train_data.na.drop(subset=columns_to_check)
test_data_clean = test_data.na.drop(subset=columns_to_check)

# Verify no null values remain
for column in columns_to_check:
    null_count = train_data_clean.filter(train_data_clean[column].isNull()).count()
    print(f"Null values in {column} after cleaning: {null_count}")


Null values in cost_12oz after cleaning: 0
Null values in aroma after cleaning: 0
Null values in flavor after cleaning: 0
Null values in aftertaste after cleaning: 0


In [27]:
import csv
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

# Step 1: Encoding Categorical Variables with handleInvalid='skip'
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='skip').fit(train_data_clean) for column in ['roaster', 'origin', 'roast']]

# One-Hot Encoding the indexed columns
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_vec") for column in ['roaster', 'origin', 'roast']]

# Step 2: Assembling the Features
assembler = VectorAssembler(inputCols=['cost_12oz', 'aroma', 'acid', 'body', 'flavor', 'aftertaste', 'roaster_vec', 'origin_vec', 'roast_vec'], outputCol="features")

# Step 3: Define the RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# Creating a Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Initialize a list to store the results
results = []

# Step 4: Define the parameter grid
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 150])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .addGrid(rf.maxBins, [32, 64])
             .addGrid(rf.minInstancesPerNode, [1, 2])
             .build())

# Step 5: Define the cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse"),
                          numFolds=3)  # Use 3 folds for cross-validation

# Step 6: Run cross-validation to tune hyperparameters
cvModel = crossval.fit(train_data_clean)

# Step 7: Extract hyperparameters and RMSE for each model
for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
    result = {
        "numTrees": params[rf.numTrees],
        "maxDepth": params[rf.maxDepth],
        "maxBins": params[rf.maxBins],
        "minInstancesPerNode": params[rf.minInstancesPerNode],
        "RMSE": metric
    }
    results.append(result)

# Step 8: Save the results to a CSV file
csv_file = 'model_tuning_results.csv'
with open(csv_file, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=["numTrees", "maxDepth", "maxBins", "minInstancesPerNode", "RMSE"])
    writer.writeheader()
    writer.writerows(results)

# Step 9: Evaluate the best model
bestModel = cvModel.bestModel
predictions = bestModel.transform(test_data_clean)
rmse = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse").evaluate(predictions)

print(f"Best RMSE on test data: {rmse}")
print(f"Best Model Params: {bestModel.stages[-1].extractParamMap()}")  # Extract best model parameters


Best RMSE on test data: 0.24428841078670538
Best Model Params: {Param(parent='RandomForestRegressor_378c601dd7d0', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestRegressor_378c601dd7d0', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestRegressor_378c601dd7d0', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestRegressor_378c601dd7d0', name='featureSubsetStrategy', doc="The number of

In [28]:
from pyspark.sql.functions import col

# Create interaction features
train_data_clean = train_data_clean.withColumn('aroma_acid', col('aroma') * col('acid'))
train_data_clean = train_data_clean.withColumn('aroma_body', col('aroma') * col('body'))
train_data_clean = train_data_clean.withColumn('acid_body', col('acid') * col('body'))

test_data_clean = test_data_clean.withColumn('aroma_acid', col('aroma') * col('acid'))
test_data_clean = test_data_clean.withColumn('aroma_body', col('aroma') * col('body'))
test_data_clean = test_data_clean.withColumn('acid_body', col('acid') * col('body'))

In [29]:
# Update the assembler to include the new interaction features
assembler = VectorAssembler(inputCols=[
    'cost_12oz',
    'aroma',
    'acid',
    'body',
    'flavor',
    'aftertaste',
    'roaster_vec',
    'origin_vec',
    'roast_vec',
    'aroma_acid',
    'aroma_body',
    'acid_body'
], outputCol="features")

In [30]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

# Define the RandomForestRegressor with the new feature set
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# Creating a Pipeline that includes the new feature assembler
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Fit the model using the training data
model = pipeline.fit(train_data_clean)

# Make predictions on the test data
predictions = model.transform(test_data_clean)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE) on test data after adding interaction features: {rmse}")

Root Mean Squared Error (RMSE) on test data after adding interaction features: 0.37963322628434437


In [31]:
import csv

# Initialize a list to store the results
results = []

# Document the results
results.append({
    "Interaction_Features_Used": "Yes",
    "numTrees": rf.getOrDefault("numTrees"),
    "maxDepth": rf.getOrDefault("maxDepth"),
    "maxBins": rf.getOrDefault("maxBins"),
    "minInstancesPerNode": rf.getOrDefault("minInstancesPerNode"),
    "RMSE": rmse
})

# Save the results to a CSV file
csv_file = 'model_tuning_results_with_interactions.csv'
with open(csv_file, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=["Interaction_Features_Used", "numTrees", "maxDepth", "maxBins", "minInstancesPerNode", "RMSE"])
    writer.writeheader()
    writer.writerows(results)

print(f"Results have been saved to {csv_file}")

Results have been saved to model_tuning_results_with_interactions.csv


In [32]:
from pyspark.ml.feature import PolynomialExpansion

# Create polynomial features for numerical columns
poly_expansion = PolynomialExpansion(inputCol="features", outputCol="poly_features", degree=2)

# Update the assembler to use polynomial features instead of the original features
assembler = VectorAssembler(inputCols=[
    'log_cost_12oz',
    'aroma',
    'acid',
    'body',
    'flavor',
    'aftertaste',
    'roaster_vec',
    'origin_vec',
    'roast_vec',
    'aroma_acid',
    'aroma_body',
    'acid_body'
], outputCol="features")

# Create a pipeline including polynomial expansion
pipeline = Pipeline(stages=indexers + encoders + [assembler, poly_expansion, rf])

In [33]:
from pyspark.ml.feature import StandardScaler

# Scale the features
scaler = StandardScaler(inputCol="poly_features", outputCol="scaled_features")

# Update the pipeline to include scaling
pipeline = Pipeline(stages=indexers + encoders + [assembler, poly_expansion, scaler, rf])

In [34]:
from pyspark.sql.functions import log

# Apply log transformation to 'cost_12oz' and create 'log_cost_12oz'
train_data_clean = train_data_clean.withColumn('log_cost_12oz', log(col('cost_12oz') + 1))
test_data_clean = test_data_clean.withColumn('log_cost_12oz', log(col('cost_12oz') + 1))

# Check if the column exists
train_data_clean.select("log_cost_12oz").show(5)

+------------------+
|     log_cost_12oz|
+------------------+
|3.8720339972117825|
|3.7954891891721947|
|3.8899818745512658|
|3.0204248861443626|
|2.7960610784249234|
+------------------+
only showing top 5 rows



In [35]:
assembler = VectorAssembler(inputCols=[
    'log_cost_12oz',
    'aroma',
    'acid',
    'body',
    'flavor',
    'aftertaste',
    'roaster_vec',
    'origin_vec',
    'roast_vec',
    'aroma_acid',
    'aroma_body',
    'acid_body'
], outputCol="features")

In [36]:
# Define the RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

# Create the pipeline with the new feature set
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Run cross-validation
cvModel = crossval.fit(train_data_clean)

# Evaluate the best model
bestModel = cvModel.bestModel
predictions = bestModel.transform(test_data_clean)
rmse = evaluator.evaluate(predictions)

print(f"Best RMSE on test data with advanced feature engineering and cross-validation: {rmse}")

Best RMSE on test data with advanced feature engineering and cross-validation: 0.24428841078670538


In [37]:
# Ensure accessing the correct stage
rf_model_stage = bestModel.stages[-1]

# Extract the parameter map from the best model stage
best_params = rf_model_stage.extractParamMap()

# access parameters using .get() to avoid KeyErrors
num_trees = best_params.get(rf.numTrees, "Not Set")
max_depth = best_params.get(rf.maxDepth, "Not Set")
max_bins = best_params.get(rf.maxBins, "Not Set")
min_instances = best_params.get(rf.minInstancesPerNode, "Not Set")

# Prepare the result dictionary
result = {
    "numTrees": num_trees,
    "maxDepth": max_depth,
    "maxBins": max_bins,
    "minInstancesPerNode": min_instances,
    "RMSE": rmse_simple,
    "Features": "cost_12oz, aroma, acid, body, flavor, aftertaste, roaster_vec, origin_vec, roast_vec"
}

# Convert to DataFrame for logging
result_df = pd.DataFrame([result])

# Append or save the result to the CSV
result_df.to_csv("model_results.csv", mode='a', header=not os.path.exists("model_results.csv"), index=False)

print("Results logged successfully.")


Results logged successfully.


In [38]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Define the parameter grid again
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 150])
             .addGrid(rf.maxDepth, [5, 10, 15])
             .addGrid(rf.maxBins, [32, 64])
             .addGrid(rf.minInstancesPerNode, [1, 2])
             .build())

# Set up the cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse"),
                          numFolds=3)  # 3-fold cross-validation

# Run cross-validation
cvModel = crossval.fit(train_data_clean)

# Evaluate the best model
bestModel = cvModel.bestModel
predictions = bestModel.transform(test_data_clean)
rmse = evaluator.evaluate(predictions)

print(f"Best RMSE on test data with advanced feature engineering and cross-validation: {rmse}")


Best RMSE on test data with advanced feature engineering and cross-validation: 0.21023411898273872


In [39]:
# After the model evaluation
import csv

# Log the results to a CSV file
csv_file = 'model_tuning_results.csv'
with open(csv_file, mode='a', newline='') as file:  # 'a' for append mode
    writer = csv.DictWriter(file, fieldnames=["numTrees", "maxDepth", "maxBins", "minInstancesPerNode", "RMSE"])

    # Check if the file is empty, write the header
    if file.tell() == 0:
        writer.writeheader()

    # Extract best model hyperparameters
    best_params = bestModel.stages[-1].extractParamMap()
    result = {
        "numTrees": best_params[rf.numTrees],
        "maxDepth": best_params[rf.maxDepth],
        "maxBins": best_params[rf.maxBins],
        "minInstancesPerNode": best_params[rf.minInstancesPerNode],
        "RMSE": rmse
    }
    writer.writerow(result)

print(f"Results have been saved to {csv_file}")

Results have been saved to model_tuning_results.csv
