# **Initiate and Configure Spark**

In [1]:
!pip3 install pyspark

Collecting pyspark
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m0:00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.4-py2.py3-none-any.whl size=317849765 sha256=b19bea3382a1fb764932370fa63b583a908b719a60defa66ba5cc2703171d7f1
  Stored in directory: /root/.cache/pip/wheels/d9/1c/98/31e395a42d1735d18d42124971ecbbade844b50bb9845b6f4a
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.4


In [2]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) a Spark session named "movieDataset" that runs locally on all
# available cores, with the memory / core / columnar-compression settings below.
spark = (
    SparkSession.builder
    .appName("movieDataset")
    .master("local[*]")
    .config("spark.executor.memory", "10g")
    .config("spark.driver.memory", "10g")
    .config("spark.executor.cores", "2")
    .config("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .getOrCreate()
)
# Display the session summary as the cell output
spark

In [3]:
# Mount Google Drive at /content/drive so the dataset stored there can be read.
# NOTE(review): the saved output of this cell is a NotImplementedError
# ("Mounting drive is unsupported in this environment"), so every later cell that
# reads from /content/drive was not executed in this saved run.
from google.colab import drive
drive.mount('/content/drive')

NotImplementedError: Mounting drive is unsupported in this environment. Use PyDrive instead. See examples at https://colab.research.google.com/notebooks/io.ipynb#scrollTo=7taylj9wpsA2.

---
# **Task 1 - Data Loading and Preprocessing**
---

In [None]:
# Read the movies CSV from the mounted Google Drive folder.
# The first row supplies the column names and Spark infers each column's type.

movies_csv_path = "/content/drive/MyDrive/ML&Bigdata/movies.csv"
df = spark.read.csv(movies_csv_path, header=True, inferSchema=True)

In [None]:
# Preview the dataset (show() with no arguments prints the first 20 rows and truncates long values)

df.show()

In [None]:
# View the shape of the data frame as (number of rows, number of columns).
# Note: df.count() counts rows, including rows that contain nulls - it is not a count of non-null values.

print((df.count(), len(df.columns)))

In [None]:
# List the column names of the data frame (displayed as the cell output)

df.columns

In [None]:
# Print each column's name and data type (types were inferred when the CSV was read)

df.printSchema()

In [None]:
# Count the rows for each distinct 'genre' value and show 10 of them without truncation.
# orderBy('count') sorts ascending, so these are the 10 LEAST frequent genre values.
df.groupBy('genre').count().orderBy('count').show(10,False)

Missing Values

In [None]:
# Count the null entries in every column.
# Per the original analysis: out of a total of 2593384, roughly 2450 have missing values.
# That is a small share (less than 2%), so deleting them should not largely impact the findings.

# NOTE: this import replaces Python's built-in sum() with Spark's column aggregate
# for the rest of the notebook; a later cell relies on it.
from pyspark.sql.functions import col, sum

# One aggregate per column: cast the isNull flag to 0/1 and add it up, keeping the column's name
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df.columns
]
missing_values = df.select(null_count_exprs)

missing_values.show()

In [None]:
# Remove every row that contains at least one null, then report how many rows remain

df_cleaned = df.na.drop()
df_cleaned.count()

In [None]:
# Re-run the per-column null count on the cleaned frame; every count should now be 0.
# (`sum` and `col` here are the Spark functions imported in the earlier missing-values cell.)

remaining_null_exprs = []
for column_name in df_cleaned.columns:
    remaining_null_exprs.append(sum(col(column_name).isNull().cast("int")).alias(column_name))
missing_values = df_cleaned.select(remaining_null_exprs)

missing_values.show()

In [None]:
# Drop unnecessary columns: directors_name and stars_name represent the same thing as
# directors_id and stars_id, so only the id columns are kept. The 'id' column is dropped as well.

redundant_columns = ['id', 'directors_name', 'stars_name']
df2 = df_cleaned.drop(*redundant_columns)
df2.show(n=10)

In [None]:
# Add numeric (double) versions of 'rating', 'votes' and 'gross_income' as the new
# columns 'ranking', 'public_vote' and 'income'; the source columns are left in place.

from pyspark.sql.functions import col

df2 = (
    df2
    .withColumn("ranking", col("rating").cast("double"))
    .withColumn("public_vote", col("votes").cast("double"))
    .withColumn("income", col("gross_income").cast("double"))
)

In [None]:
# Print the schema to confirm that the new columns ('ranking', 'public_vote', 'income')
# exist and have the double data type

df2.printSchema()

In [None]:
# Keep the text, categorical and id columns together with the numeric columns created above
kept_columns = [
    'name', 'description', 'year', 'certificate', 'duration', 'genre',
    'directors_id', 'stars_id', 'income', 'public_vote', 'ranking',
]
df3 = df2.select(*kept_columns)
df3.show()

String Indexer

In [None]:
# Convert all categorical data columns into numerical format by string-indexing them.
# Each (input column, output column) pair is fitted and applied in turn.
# NOTE(review): 'duration' and 'year' are indexed as categories here, so their numeric order is not preserved - confirm this is intended.

from pyspark.ml.feature import StringIndexer

index_column_pairs = [
    ('certificate', 'certified'),
    ('genre', 'movie_genre'),
    ('duration', 'movie_length'),
    ('directors_id', 'directors'),
    ('stars_id', 'stars'),
    ('year', 'date'),
]
for source_col, indexed_col in index_column_pairs:
    df3 = StringIndexer(inputCol=source_col, outputCol=indexed_col).fit(df3).transform(df3)
df3.show(10)

In [None]:
# Build a new dataframe from the two text columns, the string-indexed columns and the numeric columns

indexed_columns = [
    'name', 'description', 'date', 'certified', 'movie_length', 'movie_genre',
    'directors', 'stars', 'income', 'public_vote', 'ranking',
]
df4 = df3.select(*indexed_columns)
df4.show()

In [None]:
# Tokenize the two text columns: 'name' is split into the token column 'title'
# and 'description' into the token column 'summary'

from pyspark.ml.feature import Tokenizer

tokenization = Tokenizer(inputCol='name', outputCol='title')
tokenization2 = Tokenizer(inputCol='description', outputCol='summary')

# Apply the title tokenizer first, then the summary tokenizer on its result
tokenized_df2 = tokenization2.transform(tokenization.transform(df4))
tokenized_df2.show(4, False)

In [None]:
# Remove stop words (such as "the", "and", "in") from the 'title' and 'summary' token
# columns, writing the results to the new columns 'refined_title' and 'refined_summary'

from pyspark.ml.feature import StopWordsRemover

stopword_removal = StopWordsRemover(inputCol='title', outputCol='refined_title')
stopword_removal2 = StopWordsRemover(inputCol='summary', outputCol='refined_summary')

# Filter the titles first, then the summaries on the resulting frame
refined_df2 = stopword_removal2.transform(stopword_removal.transform(tokenized_df2))

refined_df2.show(10, False)

In [None]:
# Download the WordNet corpus for NLTK; the WordNetLemmatizer used in the next cell depends on it
import nltk
nltk.download('wordnet')

In [None]:
# Tidy up the text by using lemmatizer to reduce words to their base form

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from nltk.stem import WordNetLemmatizer


def lemmatize_text(tokens):
    """Reduce each token to its WordNet base form.

    Args:
        tokens: list of word strings, or None (Spark passes None to a UDF
            when the array cell is null).

    Returns:
        A new list with every token lemmatized, in the same order, or None
        when `tokens` is None.
    """
    # Guard against null array cells so one null row cannot fail the whole Spark job
    if tokens is None:
        return None
    lemmatizer = WordNetLemmatizer()
    return [lemmatizer.lemmatize(token) for token in tokens]


# Wrap the lemmatizer as a Spark User Defined Function that returns an array of strings
lemmatize_udf = udf(lemmatize_text, ArrayType(StringType()))

# Lemmatize the stop-word-filtered title and summary tokens into two new columns
refined_df2 = (
    refined_df2
    .withColumn("lemmatized_title", lemmatize_udf("refined_title"))
    .withColumn("lemmatized_summary", lemmatize_udf("refined_summary"))
)

# Display the resulting dataframe without truncating cell contents
refined_df2.show(truncate=False)



In [None]:
# Turn the lemmatized tokens into term-frequency vectors with HashingTF.
# NOTE(review): IDF is imported but never applied, so these are plain hashed term
# frequencies, not TF-IDF weights - confirm whether an IDF step was intended.

from pyspark.ml.feature import HashingTF,IDF

hashing_vec2 = HashingTF(inputCol='lemmatized_title', outputCol='title_features')
hashing_vec3 = HashingTF(inputCol='lemmatized_summary', outputCol='summary_features')

# Hash the titles first, then the summaries on the resulting frame
hashing_df2 = hashing_vec3.transform(hashing_vec2.transform(refined_df2))

preview_columns = ['lemmatized_title', 'title_features', 'lemmatized_summary', 'summary_features']
hashing_df2.select(preview_columns).show(4, False)

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Columns that make up the model input: the two hashed text vectors plus the
# indexed categorical columns and the numeric columns
input_cols = [
    'title_features',
    'summary_features',
    'date',
    'public_vote',
    'movie_genre',
    'directors',
    'stars',
    'movie_length',
    'certified',
    'income',
]

# Assemble all features into a single 'features' vector column so they can be
# fed into the machine learning model

vec_assembler = VectorAssembler(inputCols=input_cols, outputCol='features')
final_data = vec_assembler.transform(hashing_df2)
final_data.show()

In [None]:
# Keep only what the model needs: the assembled 'features' vector and the label column 'ranking'

final_data3=final_data.select('features','ranking')
final_data3.show()

In [None]:
# Scale every feature to unit standard deviation to limit bias from differing feature scales.
# StandardScaler is used with its defaults here (withStd=True, withMean=False), so the
# features are NOT centred around zero - only scaled.
# NOTE(review): the scaler is fitted on the full dataset before the train/test split,
# so test-set statistics leak into training; consider fitting on the training split only.

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol = 'features', outputCol = 'scaledFeatures')


# Learn the scaling statistics from the data, then apply them to add 'scaledFeatures'
scaler_model = scaler.fit(final_data3)
final_data3 = scaler_model.transform(final_data3)

# Keep only the scaled features and the label
final_data3 = final_data3.select('scaledFeatures', 'ranking')
final_data3.show(5, truncate = False)

Training data, test data split

In [None]:
# Randomly split the data 70% / 30% into training and test sets (fixed seed for repeatability)

train_data, test_data = final_data3.randomSplit(weights=[0.7, 0.3], seed=42)
train_data.show(n=5, truncate=False)

---
# **Task 2 - Model Selection and Implementation**
---

In [None]:
# Train linear regression model

# LinearRegression is not imported by any earlier cell; import it here so this
# cell does not fail with a NameError on a fresh kernel.
from pyspark.ml.regression import LinearRegression

# Baseline (unregularised) model: predict 'ranking' from the scaled feature vector
lr = LinearRegression(labelCol = 'ranking', featuresCol = 'scaledFeatures', predictionCol = 'prediction')

lr_model = lr.fit(train_data)

In [None]:
# Apply the fitted linear regression model to the test set; the result gains a 'prediction' column
lr_predictions = lr_model.transform(test_data)

In [None]:
# Show the first 10 predictions of the linear regression model next to the actual 'ranking' values

lr_predictions.select('prediction', 'ranking').show(10, truncate = False)

In [None]:
# Read the fitted coefficients and intercept from the linear regression model.
# They are only stored in variables here; nothing in this cell displays them.

coefficients = lr_model.coefficients
intercept = lr_model.intercept

---
# **Task 3 - Model Parameter Tuning**
---

In [None]:
# LinearRegression is never imported elsewhere in the notebook; import it here so
# this cell does not fail with a NameError.
from pyspark.ml.regression import LinearRegression

# Settings shared by both regularised models; only elasticNetParam differs between them
regularised_params = dict(labelCol = 'ranking', featuresCol = 'scaledFeatures',
                          predictionCol = 'prediction', regParam = 0.15)

# Train the Lasso model (elasticNetParam = 1.0 selects a pure L1 penalty)
lasso = LinearRegression(elasticNetParam = 1.0, **regularised_params)
lasso_model = lasso.fit(train_data)
lasso_predictions = lasso_model.transform(test_data)

# Train the Ridge model (elasticNetParam = 0.0 selects a pure L2 penalty)
ridge = LinearRegression(elasticNetParam = 0.0, **regularised_params)
ridge_model = ridge.fit(train_data)
ridge_predictions = ridge_model.transform(test_data)

In [None]:
# Show the first 10 Lasso predictions next to the actual 'ranking' values

lasso_predictions.select('prediction', 'ranking').show(10, truncate = False)

---
# **Task 4 - Model Evaluation and Accuracy Calculation**
---

In [None]:
# Mean squared error (MSE): the average squared difference between predicted and actual values

from pyspark.ml.evaluation import RegressionEvaluator

evaluator_mse = RegressionEvaluator(labelCol='ranking', predictionCol='prediction', metricName='mse')

# Evaluate the three models in order: linear regression, Lasso, Ridge
mse1, mse2, mse3 = (
    evaluator_mse.evaluate(model_predictions)
    for model_predictions in (lr_predictions, lasso_predictions, ridge_predictions)
)


In [None]:
# Root mean squared error (RMSE): the square root of the MSE.
# RegressionEvaluator is the class imported in the MSE cell.

evaluator_rmse = RegressionEvaluator(labelCol='ranking', predictionCol='prediction', metricName='rmse')

# Evaluate the three models in order: linear regression, Lasso, Ridge
rmse1, rmse2, rmse3 = (
    evaluator_rmse.evaluate(model_predictions)
    for model_predictions in (lr_predictions, lasso_predictions, ridge_predictions)
)

# R-squared: how well the independent variables predict the actual data

evaluator_r2 = RegressionEvaluator(labelCol='ranking', predictionCol='prediction', metricName='r2')

r2_score1, r2_score2, r2_score3 = (
    evaluator_r2.evaluate(model_predictions)
    for model_predictions in (lr_predictions, lasso_predictions, ridge_predictions)
)

In [None]:
# Print MSE, RMSE and R^2 for each of the three models, one line per model
for model_label, model_mse, model_rmse, model_r2 in (
    ('Regression', mse1, rmse1, r2_score1),
    ('Lasso', mse2, rmse2, r2_score2),
    ('Ridge', mse3, rmse3, r2_score3),
):
    print(model_label + ' - MSE: ', model_mse, ', RMSE: ', model_rmse, ', R^2: ', model_r2)

---
# **Task 5 - Results Visualization or Printing**
---

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Metric values per model, in the order: linear regression, Lasso, Ridge
mse = [mse1, mse2, mse3]
rmse = [rmse1, rmse2, rmse3]
r2_score = [r2_score1, r2_score2, r2_score3]

# One group of three bars per model, centred on integer x positions
positions = np.arange(len(mse))
bar_width = 0.2

fig, ax = plt.subplots()
ax.bar(positions - bar_width, mse, width=bar_width, label='MSE')
ax.bar(positions, rmse, width=bar_width, label='RMSE')
ax.bar(positions + bar_width, r2_score, width=bar_width, label='R2_Score')

# Axis labels and title
ax.set_xlabel('Model')
ax.set_ylabel('scores')
ax.set_title('Comparison of Regression Metrics')

# Legend and one tick label per model
ax.legend()
ax.set_xticks(positions)
ax.set_xticklabels(['Regression', 'Lasso', 'Ridge'])
plt.show()

**Regression model:**

- The regression model has a high MSE of 3.162624008531466 and an RMSE of 1.7783767903713392, meaning its predictions typically deviate from the actual ratings by roughly 1.78.

- R-squared at 0.0376 suggests that only a small percentage, roughly 4% of the variability in movie ratings is explained by the features. This indicates that the linear regression model may not be capturing the complex relationships between the features and movie ratings well.

- This suggests that the features used have a very weak relationship with the target variable (movie ratings).


**Lasso model:**

- The Lasso model has a lower MSE (2.3875812451389478) and RMSE (1.5451800041221566), which shows that this model's predictions are closer to the actual values. This improvement suggests that Lasso's feature selection helps eliminate irrelevant or less impactful features, making the model more effective at explaining the variance in movie ratings.

**Ridge model:**

- The Ridge model also has a lower MSE (2.4118655262705526) and RMSE (1.5530181989502096) than the baseline. This indicates that both regularised models are effective in addressing overfitting and improving prediction accuracy compared to the baseline linear regression.


