In [0]:
dataPath = "/FileStore/tables/df_silver.csv"
df0 = spark.read.format("csv")\
  .option("header","true")\
  .option("inferSchema", "true")\
  .load(dataPath)

display(df0)

name,review_count,categories,rating,latitude,price,distance,longitude,postal_code
El Sur,769,Spanish,4.5,40.41105,2.0,1323.9351440693945,-3.6995454,28012
Alhambra,448,Spanish,4.0,40.41587,2.0,847.3790987480967,-3.7016,28012
La Bodega de Los Secretos,31,Spanish,4.5,40.410683,3.0,1375.936331500051,-3.694688,28014
Casa Julio,42,Spanish,4.5,40.42427,2.0,546.9655482728417,-3.70368,28004
Celso y Manolo,182,Spanish,4.5,40.420185,2.0,297.6040361411656,-3.6974769,28004
Taberna El Sur de Huertas,41,Spanish,5.0,40.41366,,1036.183034652163,-3.69943,28014
Malacatín,24,Spanish,4.5,40.41031,,1638.679922735727,-3.70765,28005
Más Al Sur,126,Spanish,4.5,40.41002,2.0,1409.3953170255295,-3.69693,28012
La Casa del Abuelo,161,Spanish,4.0,40.415768,2.0,859.0034856078098,-3.701606,28012
Oink,16,Spanish,4.5,40.42004,1.0,398.07076724778057,-3.700481,28013


In [0]:
from pyspark.sql.functions import col,length

#Here we notice that there are some null values for the "price" column: there are some restaurants that don't have a price feature (so how expensive or cheap they are). Let's see how many are there to decide how to clean them (remove or mean imputation or regression...)
df0.where(length(col("price")) > 1).count()


Out[2]: 45

We notice that there are 45 values which are missing for price. Since price is an important variable as we saw in the EDA, we will not remove it. Plus, we have very few variables, so removing one would not be rational. Also, imputing would bias the data in our opinion, so we will just proceed to remove the rows with these nulls

In [0]:
#Remove rows with nulls
df0 = df0.where(length(col("price")) < 2)

#Also remove columns that we won't use: name (no use), postal code (no use), longitude and latitude (because we have distance)
df0 = df0.drop('name')
df0 = df0.drop('postal_code')
df0 = df0.drop('longitude')
df0 = df0.drop('latitude')
df0.write.csv('Newrestsss.csv', header=True)

In [0]:
# Load the CSV file from the data section of Databricks
df = spark.read.csv('/Newrestsss.csv', header=True)

In [0]:
from pyspark.ml.feature import StringIndexer
# We only have 1 categorical variable, which is "categories", so we just labelEncode it using StringIndexer
indexer = StringIndexer(inputCol='categories', outputCol='categories_index')
df = indexer.fit(df).transform(df)

In [0]:
#We have to make sure that all of the variables are numerical for the ML process
df = df.withColumn("rating", df["rating"].cast('double'))
df = df.withColumn("review_count", df["review_count"].cast('double'))
# df = df.withColumn("latitude", df["latitude"].cast('double'))
df = df.withColumn("price", df["price"].cast('double'))
df = df.withColumn("distance", df["distance"].cast('double'))
# df = df.withColumn("postal_code", df["postal_code"].cast('double'))
df = df.withColumn("rating", df["rating"].cast('double'))
# df = df.withColumn("longitude", df["longitude"].cast('double'))
df = df.withColumn("categories_index", df["categories_index"].cast('double'))
df = df.drop('categories')
display(df)

review_count,rating,price,distance,categories_index
769.0,4.5,2.0,1323.9351440693945,0.0
448.0,4.0,2.0,847.3790987480967,0.0
31.0,4.5,3.0,1375.936331500051,0.0
42.0,4.5,2.0,546.9655482728417,0.0
182.0,4.5,2.0,297.6040361411656,0.0
126.0,4.5,2.0,1409.3953170255295,0.0
161.0,4.0,2.0,859.0034856078098,0.0
16.0,4.5,1.0,398.07076724778057,0.0
94.0,4.5,3.0,1035.445088896326,0.0
33.0,5.0,3.0,1168.8084348720097,0.0


In [0]:
#Let's save this dataframe as a gold and final table: this is the table pre-processed for ML problem
df.write.format("delta").mode("overwrite").saveAsTable("gold_table")

In [0]:
# Split the data into training and test sets
(train_df, test_df) = df.randomSplit([0.8, 0.2])
    
    
# Print the number of records
print(f'There are {train_df.cache().count()} records in the training dataset.')
print(f'There are {test_df.cache().count()} records in the testing dataset.')

There are 124 records in the training dataset.
There are 31 records in the testing dataset.


In [0]:
#Import the necessary libraries
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
import mlflow
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StandardScaler

#We will have one experiment where we will group all of the runs, each containing a different estimator/regressor (linear regression, rf and dt)
experiment_name = "/Users/mviol.exstudents2022@student.ie.edu/our_models_final_project"
experiment_id = mlflow.create_experiment(experiment_name)

with mlflow.start_run(run_name="model_LR", experiment_id=experiment_id) as run:
    vector_assembler = VectorAssembler(inputCols=['review_count', 'categories_index', 'price', 'distance'], outputCol='features')
    # Standardize the columns using StandardScaler
    scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withStd=True, withMean=False)

    

    # Print the number of records
    print(f'There are {train_df.cache().count()} records in the training dataset.')
    print(f'There are {test_df.cache().count()} records in the testing dataset.')


    # Define the regressors
    lr = LinearRegression(featuresCol='scaled_features', labelCol='rating', predictionCol="prediction_lr")
    

    # Define the pipeline
    pipeline = Pipeline(stages=[vector_assembler, scaler, lr])

    # Train the model
    print(train_df)
    model = pipeline.fit(train_df)
    
    
    # Log parameters
    mlflow.log_param("target_variable", "rating")

    # Log the model for this run
    mlflow.spark.log_model(model, "SparkML-linear-regression")

    # Evaluate the model on the test set
    lr_predictions = model.transform(test_df)
    
    
    # Save the prediction as csv
    lr_predictions.toPandas().to_csv('lr_predictions.csv', index=False)
   
    
    # Log the saved prediction as artifact
    mlflow.log_artifact("lr_predictions.csv")

    
    
    # Evaluate predictions for linear regression
    regressionEvaluator_lr = RegressionEvaluator(predictionCol="prediction_lr", labelCol="rating")
    rmse_lr = regressionEvaluator_lr.setMetricName("rmse").evaluate(lr_predictions)
    r2_lr = regressionEvaluator_lr.setMetricName("r2").evaluate(lr_predictions)
    mse_lr = regressionEvaluator_lr.setMetricName("mse").evaluate(lr_predictions)
    mae_lr = regressionEvaluator_lr.setMetricName("mae").evaluate(lr_predictions)
    
    # Log metrics
    mlflow.log_metric("rmse", rmse_lr)  
    mlflow.log_metric("mae", mae_lr)    
    


There are 124 records in the training dataset.
There are 31 records in the testing dataset.
DataFrame[review_count: double, rating: double, price: double, distance: double, categories_index: double]
  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
#For decision trees and random forest scaling is not required, but it doesn't hurt so we didn't remove it.

with mlflow.start_run(run_name="model_dt",  experiment_id=experiment_id) as run:
    vector_assembler = VectorAssembler(inputCols=['review_count', 'categories_index', 'price', 'distance'], outputCol='features')
    # Standardize the columns using StandardScaler
    scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withStd=True, withMean=False)

    

    # Print the number of records
    print(f'There are {train_df.cache().count()} records in the training dataset.')
    print(f'There are {test_df.cache().count()} records in the testing dataset.')


    # Define the regressors
    dt = DecisionTreeRegressor(featuresCol='scaled_features', labelCol='rating', predictionCol="prediction_dt")
    
    
    # Define the pipeline
    pipeline = Pipeline(stages=[vector_assembler, scaler, dt])

    # Train the model
    print(train_df)
    model = pipeline.fit(train_df)
    
    
    # Log parameters
    mlflow.log_param("target_variable", "rating")

    # Log the model for this run
    mlflow.spark.log_model(model, "SparkML-decision-tree")

    # Evaluate the model on the test set
    dt_predictions = model.transform(test_df)
    
    # Save the prediction as csv
    dt_predictions.toPandas().to_csv('dt_predictions.csv', index=False)
    
    # Log the saved prediction as artifact
    mlflow.log_artifact("dt_predictions.csv")
    

    # Evaluate predictions for decision tree
    regressionEvaluator_dt = RegressionEvaluator(predictionCol="prediction_dt", labelCol="rating")
    rmse_dt = regressionEvaluator_dt.setMetricName("rmse").evaluate(dt_predictions)
    r2_dt = regressionEvaluator_dt.setMetricName("r2").evaluate(dt_predictions)
    mse_dt = regressionEvaluator_dt.setMetricName("mse").evaluate(dt_predictions)
    mae_dt = regressionEvaluator_dt.setMetricName("mae").evaluate(dt_predictions)
    
    # Log metrics
    mlflow.log_metric("rmse", rmse_dt)  
    mlflow.log_metric("mae", mae_dt)   

    

There are 124 records in the training dataset.
There are 31 records in the testing dataset.
DataFrame[review_count: double, rating: double, price: double, distance: double, categories_index: double]
  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
#For decision trees and random forest scaling is not required, but it doesn't hurt so we didn't remove it.

with mlflow.start_run(run_name="model_rf", experiment_id=experiment_id) as run:
    vector_assembler = VectorAssembler(inputCols=['review_count', 'categories_index', 'price', 'distance'], outputCol='features')
    # Standardize the columns using StandardScaler
    scaler = StandardScaler(inputCol='features', outputCol='scaled_features', withStd=True, withMean=False)


    # Print the number of records
    print(f'There are {train_df.cache().count()} records in the training dataset.')
    print(f'There are {test_df.cache().count()} records in the testing dataset.')


    # Define the regressors
    rf = RandomForestRegressor(featuresCol='scaled_features', labelCol='rating', predictionCol="prediction_rf")
    
    
    # Define the pipeline
    pipeline = Pipeline(stages=[vector_assembler, scaler, rf])

    # Train the model
    print(train_df)
    model = pipeline.fit(train_df)
    
    
    # Log parameters
    mlflow.log_param("target_variable", "rating")

    # Log the model for this run
    mlflow.spark.log_model(model, "SparkML-random-forest")

    # Evaluate the model on the test set
    rf_predictions = model.transform(test_df)
    
    # Save the prediction as csv
    rf_predictions.toPandas().to_csv('rf_predictions.csv', index=False)
    
    # Log the saved prediction as artifact
    mlflow.log_artifact("rf_predictions.csv")
    
    

    
    # Evaluate predictions for rf
    regressionEvaluator_rf = RegressionEvaluator(predictionCol="prediction_rf", labelCol="rating")
    rmse_rf = regressionEvaluator_rf.setMetricName("rmse").evaluate(rf_predictions)
    r2_rf = regressionEvaluator_rf.setMetricName("r2").evaluate(rf_predictions)
    mse_rf = regressionEvaluator_rf.setMetricName("mse").evaluate(rf_predictions)
    mae_rf = regressionEvaluator_rf.setMetricName("mae").evaluate(rf_predictions)
    
    # Log metrics
    mlflow.log_metric("rmse", rmse_rf)  
    mlflow.log_metric("mae", mae_rf)   


There are 124 records in the training dataset.
There are 31 records in the testing dataset.
DataFrame[review_count: double, rating: double, price: double, distance: double, categories_index: double]
  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
from mlflow.tracking import MlflowClient
MlflowClient().list_experiments()

  MlflowClient().list_experiments()
Out[67]: [<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/221565742489086', creation_time=1671477686020, experiment_id='221565742489086', last_update_time=1671477753556, lifecycle_stage='active', name='/Users/mviol.exstudents2022@student.ie.edu/our_models_final_project', tags={'mlflow.experiment.sourceName': '/Users/mviol.exstudents2022@student.ie.edu/our_models_final_project',
  'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
  'mlflow.ownerEmail': 'mviol.exstudents2022@student.ie.edu',
  'mlflow.ownerId': '6801149048339153'}>,
 <Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/221565742489080', creation_time=1671475981474, experiment_id='221565742489080', last_update_time=1671476050887, lifecycle_stage='active', name='/Users/mviol.exstudents2022@student.ie.edu/best_models_final_project', tags={'mlflow.experiment.sourceName': '/Users/mviol.exstudents2022@student.ie.edu/best_models_final_project',
  'mlflow.experimentTyp

In [0]:
# Get all runs for a given experiment
experiment_id = run.info.experiment_id
runs_df = mlflow.search_runs(experiment_id)

# Display information
runs_df.T

Unnamed: 0,0,1,2
run_id,ee85c71d39c648cdb4e92f229ce9483a,7058e86692a0431e8d609051e1fd9fe6,98a49dd76bea42ef95687a18b3151005
experiment_id,221565742489086,221565742489086,221565742489086
status,FINISHED,FINISHED,FINISHED
artifact_uri,dbfs:/databricks/mlflow-tracking/2215657424890...,dbfs:/databricks/mlflow-tracking/2215657424890...,dbfs:/databricks/mlflow-tracking/2215657424890...
start_time,2022-12-19 19:22:33.556000+00:00,2022-12-19 19:21:59.225000+00:00,2022-12-19 19:21:26.063000+00:00
end_time,2022-12-19 19:23:12.332000+00:00,2022-12-19 19:22:33.347000+00:00,2022-12-19 19:21:59.043000+00:00
metrics.mae,0.271531,0.321453,0.279372
metrics.rmse,0.323026,0.424287,0.329652
params.target_variable,rating,rating,rating
tags.mlflow.databricks.notebookRevisionID,1671477792575,1671477753508,1671477719202


We see that that random forest has the lowest rmse and mae, so it has the best performance between all the tested models. However, when we performed other runs we found that linear regression was a good competitor as well (and we see here it is pretty close)

In [0]:
# Get the the latest run
runs = MlflowClient().search_runs(experiment_id, order_by=["metrics.mae asc"], max_results=1)
 
# Get the metrics from the latest run
runs[0].data.metrics

Out[69]: {'mae': 0.27153090764958965, 'rmse': 0.3230257782717405}

Here we could order by time to get latest results, but to make it a bit different we ordered by the model having the best performance (in this case mae, but could also use rmse). Let's interpret these results and what they mean for us. This means that on this occasion we are making an error in rating prediction by approximately 0.27 (if we consider mae) to 0.32 (if we consider rmse which penalizes big errors more). And since our scale of ratings is from 0 to 5, we believe that this is very good since in practice if a restaurant is rated 4.27 or 4.0, it wouldn't matter that much for the user who is going to the restaurant. We conclude that random forest is the best model on this occasion and that a variation of 0.27 on average is very good considering the scale of the ratings.