In [1]:
spark

In [2]:
from google.cloud import storage
from pyspark.sql.functions import col
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline


In [3]:
sdf = spark.read.parquet("gs://imdbreviews-bucket/features/transformed_data_with_features.parquet")
sdf.show(10, truncate=False)

                                                                                

+--------------------------+------+-----------+-----------+------------------------+-----------------------+-----------+------------------+-------------------------------------------------------------------+
|movie                     |rating|review_date|spoiler_tag|review_summary_sentiment|review_detail_sentiment|movie_index|movie_vector      |features                                                           |
+--------------------------+------+-----------+-----------+------------------------+-----------------------+-----------+------------------+-------------------------------------------------------------------+
|After Life (2019– )       |9.0   |3 May 2020 |0.0        |0.5633333333333334      |0.2727777777777778     |147.0      |(1001,[147],[1.0])|(1004,[148,1002,1003],[1.0,0.5633333333333334,0.2727777777777778]) |
|Special OPS (2020– )      |7.0   |3 May 2020 |0.0        |0.35714285714285715     |0.178125               |984.0      |(1001,[984],[1.0])|(1004,[985,1002,1003],[1.0,0.

In [4]:
# Check schema to confirm the features column exists
sdf.printSchema()

root
 |-- movie: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_date: string (nullable = true)
 |-- spoiler_tag: double (nullable = true)
 |-- review_summary_sentiment: double (nullable = true)
 |-- review_detail_sentiment: double (nullable = true)
 |-- movie_index: double (nullable = true)
 |-- movie_vector: vector (nullable = true)
 |-- features: vector (nullable = true)



In [5]:
#RandomForestRegressor
rf = RandomForestRegressor(featuresCol="features", labelCol="rating")

#split data into training and test sets
train_data, test_data = sdf.randomSplit([0.7, 0.3], seed=42)

#set up cross-validation with hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()


#evaluate the model
evaluator_rmse = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="rating", predictionCol="prediction", metricName="r2")

cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator_rmse,  # Evaluator for RMSE
                    numFolds=3)

#train the model
rf_model = cv.fit(train_data)

#make predictions on the test data
predictions = rf_model.transform(test_data)


rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R-squared (R²): {r2}")

#sample predictions
predictions.select("movie","rating", "prediction").show(10, truncate=False)

24/12/03 01:56:34 WARN DAGScheduler: Broadcasting large task binary with size 1108.5 KiB
24/12/03 01:56:38 WARN DAGScheduler: Broadcasting large task binary with size 1451.6 KiB
24/12/03 01:56:43 WARN DAGScheduler: Broadcasting large task binary with size 1809.6 KiB
24/12/03 01:57:13 WARN DAGScheduler: Broadcasting large task binary with size 1108.5 KiB
24/12/03 01:57:17 WARN DAGScheduler: Broadcasting large task binary with size 1451.6 KiB
24/12/03 01:57:21 WARN DAGScheduler: Broadcasting large task binary with size 1809.6 KiB
24/12/03 01:57:26 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
24/12/03 01:57:32 WARN DAGScheduler: Broadcasting large task binary with size 2.7 MiB
24/12/03 01:57:38 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
24/12/03 01:57:44 WARN DAGScheduler: Broadcasting large task binary with size 3.7 MiB
24/12/03 01:57:51 WARN DAGScheduler: Broadcasting large task binary with size 4.3 MiB
24/12/03 01:59:08 WARN DAGScheduler:

24/12/03 02:19:14 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
24/12/03 02:20:08 WARN DAGScheduler: Broadcasting large task binary with size 1304.3 KiB
24/12/03 02:20:13 WARN DAGScheduler: Broadcasting large task binary with size 1824.3 KiB
24/12/03 02:20:19 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/12/03 02:20:25 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
24/12/03 02:20:58 WARN DAGScheduler: Broadcasting large task binary with size 1304.3 KiB
24/12/03 02:21:04 WARN DAGScheduler: Broadcasting large task binary with size 1824.3 KiB
24/12/03 02:21:09 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/12/03 02:21:16 WARN DAGScheduler: Broadcasting large task binary with size 3.2 MiB
24/12/03 02:21:23 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB
24/12/03 02:21:30 WARN DAGScheduler: Broadcasting large task binary with size 5.1 MiB
24/12/03 02:21:39 WARN DAGScheduler: Broad

Root Mean Squared Error (RMSE): 2.375414639919392
Mean Absolute Error (MAE): 1.8340007179369966
R-squared (R²): 0.446477903625073


[Stage 750:>                                                        (0 + 1) / 1]

+--------------------------+------+------------------+
|movie                     |rating|prediction        |
+--------------------------+------+------------------+
|10 Cloverfield Lane (2016)|1.0   |4.190179660484369 |
|10 Cloverfield Lane (2016)|1.0   |3.4142547503966587|
|10 Cloverfield Lane (2016)|1.0   |3.7461223948824096|
|10 Cloverfield Lane (2016)|1.0   |2.739110083093564 |
|10 Cloverfield Lane (2016)|1.0   |3.7166856662901853|
|10 Cloverfield Lane (2016)|1.0   |3.516396463802042 |
|10 Cloverfield Lane (2016)|1.0   |3.9018494821458334|
|10 Cloverfield Lane (2016)|1.0   |4.747633098753778 |
|10 Cloverfield Lane (2016)|1.0   |5.550629172790054 |
|10 Cloverfield Lane (2016)|1.0   |5.418605507464043 |
+--------------------------+------+------------------+
only showing top 10 rows



                                                                                

In [9]:
best_model = rf_model.bestModel  # Best model after cross-validation

#Extract feature importances
feature_importances = best_model.featureImportances

#Print the feature importances
print("Feature Importances: ")
for feature, importance in zip(sdf, feature_importances):
    print(f"{feature}: {importance}")

#Save the trained model to a location 
best_model.save("gs://imdbreviews-bucket/models/imdb_model")

Feature Importances: 
Column<'movie'>: 0.006237457785416196
Column<'rating'>: 4.669557785406666e-05
Column<'review_date'>: 0.005325841866644272
Column<'spoiler_tag'>: 0.0019276701435910317
Column<'review_summary_sentiment'>: 0.008825961383685627
Column<'review_detail_sentiment'>: 3.704670377742539e-05
Column<'movie_index'>: 0.0022863716768154667
Column<'movie_vector'>: 0.0006293659283374953
Column<'features'>: 0.002423454561324252


24/12/03 03:16:56 WARN TaskSetManager: Stage 762 contains a task of very large size (1100 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [6]:
#Save the predictions to models folder
predictions.select("movie","rating", "prediction").write.parquet("gs://imdbreviews-bucket/models/rating_predictions")

AnalysisException: [PATH_ALREADY_EXISTS] Path gs://imdbreviews-bucket/models/rating_predictions already exists. Set mode as "overwrite" to overwrite the existing path.