In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


# Machine Learning Libraries
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# PART A

## Q1. Integers

In [0]:
spark = SparkSession.builder.appName('MIE1628_A2_PartA').getOrCreate()

file_path_integers = '/FileStore/tables/integer.txt'

feeRDD = spark.sparkContext.textFile(file_path_integers,4)

feeRDD = feeRDD.map(lambda x: int(x))

In [0]:
feeRDD.count()

1010

In [0]:
even = feeRDD.filter(lambda x:x % 2 == 0)
print("The number of even numbers in the integer dataset is:",even.count())

The number of even numbers in the integer dataset is: 514


In [0]:
odd = feeRDD.filter(lambda x:x % 2 != 0)
print("The number of odd numbers in the integer dataset is:",odd.count())

The number of odd numbers in the integer dataset is: 496


## Q2. Calculate the salary sum per department

In [0]:
file_path_salary = '/FileStore/tables/salary.txt'

salaryRDD = spark.sparkContext.textFile(file_path_salary,4)

salaryRDD = salaryRDD.map(lambda x: x.split(" "))
salaryRDD = salaryRDD.map(lambda x: (x[0],float(x[1])))

In [0]:
salaryRDD.collect()

[('Sales', 9136.0),
 ('Research', 13391.0),
 ('Developer', 22220.0),
 ('QA', 31888.0),
 ('Marketing', 22215.0),
 ('Sales', 45567.0),
 ('Research', 4023.0),
 ('Developer', 7262.0),
 ('QA', 5243.0),
 ('Marketing', 11425.0),
 ('Sales', 11956.0),
 ('Research', 24149.0),
 ('Developer', 18258.0),
 ('QA', 22962.0),
 ('Marketing', 28960.0),
 ('Sales', 8766.0),
 ('Research', 18343.0),
 ('Developer', 13686.0),
 ('QA', 27626.0),
 ('Marketing', 32430.0),
 ('Sales', 761.0),
 ('Research', 22690.0),
 ('Developer', 11703.0),
 ('QA', 591.0),
 ('Marketing', 15249.0),
 ('Sales', 15831.0),
 ('Research', 28373.0),
 ('Developer', 31034.0),
 ('QA', 27821.0),
 ('Marketing', 25192.0),
 ('Sales', 12640.0),
 ('Research', 15100.0),
 ('Developer', 2781.0),
 ('QA', 17773.0),
 ('Marketing', 9047.0),
 ('Sales', 20425.0),
 ('Research', 19555.0),
 ('Developer', 10498.0),
 ('QA', 18714.0),
 ('Marketing', 1209.0),
 ('Sales', 16369.0),
 ('Research', 3513.0),
 ('Developer', 3362.0),
 ('QA', 27723.0),
 ('Marketing', 25341.0

In [0]:
sum_Salary = salaryRDD.reduceByKey(lambda x,y:x+y)
sum_Salary.collect()

[('Developer', 3221394.0),
 ('Sales', 3488491.0),
 ('Research', 3328284.0),
 ('QA', 3360624.0),
 ('Marketing', 3158450.0)]

## Q3. Implement MapReduce using Pyspark on file ‘shakespeare.txt’ 
### Show how many times these particular words appear in the document: Shakespeare, why, Lord, Library, GUTENBERG,WILLIAM, COLLEGE and WORLD.

In [0]:
file_path_shakespeare = '/FileStore/tables/shakespeare_1.txt'

shakeRDD = spark.sparkContext.textFile(file_path_shakespeare,4)

shakeRDD = shakeRDD.map(lambda x: x.split(" "))
shakeRDD.collect()

[['This',
  'eBook',
  'is',
  'for',
  'the',
  'use',
  'of',
  'anyone',
  'anywhere',
  'at',
  'no',
  'cost',
  'and',
  'with'],
 ['almost',
  'no',
  'restrictions',
  'whatsoever.',
  '',
  'You',
  'may',
  'copy',
  'it,',
  'give',
  'it',
  'away',
  'or'],
 ['re-use',
  'it',
  'under',
  'the',
  'terms',
  'of',
  'the',
  'Project',
  'Gutenberg',
  'License',
  'included'],
 ['with', 'this', 'eBook', 'or', 'online', 'at', 'www.gutenberg.org'],
 [''],
 ['**',
  'This',
  'is',
  'a',
  'COPYRIGHTED',
  'Project',
  'Gutenberg',
  'eBook,',
  'Details',
  'Below',
  '**'],
 ['**',
  '',
  '',
  '',
  '',
  'Please',
  'follow',
  'the',
  'copyright',
  'guidelines',
  'in',
  'this',
  'file.',
  '',
  '',
  '',
  '',
  '**'],
 [''],
 ['Title:', 'The', 'Complete', 'Works', 'of', 'William', 'Shakespeare'],
 [''],
 ['Author:', 'William', 'Shakespeare'],
 [''],
 ['Posting', 'Date:', 'September', '1,', '2011', '[EBook', '#100]'],
 ['Release', 'Date:', 'January,', '1994'],


In [0]:
list_shakes = ['Shakespeare', 'why', 'Lord', 'Library', 'GUTENBERG', 'WILLIAM', 'COLLEGE', 'WORLD']
count_df = [shakeRDD.map(lambda x: x.count(i)).sum() for i in list_shakes]

# Print the word count for each word in count_df
for i, X in enumerate(count_df):
    print(f"The word count of {list_shakes[i]} in the Shakespeare text is: {X}")


The word count of Shakespeare in the Shakespeare text is: 22
The word count of why in the Shakespeare text is: 91
The word count of Lord in the Shakespeare text is: 341
The word count of Library in the Shakespeare text is: 2
The word count of GUTENBERG in the Shakespeare text is: 99
The word count of WILLIAM in the Shakespeare text is: 115
The word count of COLLEGE in the Shakespeare text is: 98
The word count of WORLD in the Shakespeare text is: 98


## Q4. Calculate top 10 and bottom 10 words using file ‘shakespeare.txt’
###Show 10 words with most count and 10 words with least count. You can limit by 10 in ascending and descending order of count. Show your code and output.

In [0]:
shakeRDD = spark.sparkContext.textFile(file_path_shakespeare,4)
wordcount_df = shakeRDD.map(lambda x: x.split(" "))
wordcount_df = wordcount_df.flatMap(set)

wordcount_df.collect()

['cost',
 'at',
 'anyone',
 'for',
 'This',
 'eBook',
 'is',
 'with',
 'use',
 'the',
 'anywhere',
 'of',
 'no',
 'and',
 '',
 'copy',
 'it,',
 'You',
 'give',
 'away',
 'may',
 'almost',
 'restrictions',
 'no',
 'it',
 'whatsoever.',
 'or',
 'Gutenberg',
 'under',
 're-use',
 'included',
 'License',
 'the',
 'Project',
 'of',
 'terms',
 'it',
 'this',
 'at',
 'www.gutenberg.org',
 'eBook',
 'with',
 'online',
 'or',
 '',
 '**',
 'Gutenberg',
 'Below',
 'a',
 'This',
 'is',
 'Project',
 'Details',
 'COPYRIGHTED',
 'eBook,',
 '',
 '**',
 'Please',
 'this',
 'follow',
 'copyright',
 'guidelines',
 'in',
 'the',
 'file.',
 '',
 'William',
 'Title:',
 'of',
 'Works',
 'Shakespeare',
 'The',
 'Complete',
 '',
 'Author:',
 'William',
 'Shakespeare',
 '',
 'Posting',
 '[EBook',
 '1,',
 'September',
 '2011',
 '#100]',
 'Date:',
 'Release',
 'January,',
 'Date:',
 '1994',
 '',
 'Language:',
 'English',
 '',
 'Character',
 'set',
 'ASCII',
 'encoding:',
 '',
 'GUTENBERG',
 'COMPLETE',
 '***',
 '

In [0]:
words_df = wordcount_df.countByValue()
words_df = spark.createDataFrame(list(words_df.items()),["Word","Count"])

In [0]:
# Order the DataFrame by count in descending order
top_10 = words_df.orderBy(desc("Count")).limit(10)

# Show the top 10 words and their counts
top_10.show()

+----+-----+
|Word|Count|
+----+-----+
|    |55946|
| the|10262|
| and| 8348|
|   I| 7790|
|  of| 7517|
|  to| 7041|
|   a| 5263|
|  my| 4616|
|  in| 4461|
| you| 3766|
+----+-----+



In [0]:
# Filter out rows where the "Word" column has an empty string
filtered_words_df = words_df.filter(words_df["Word"] != "")

# Order the DataFrame by count in descending order
top_10_new = filtered_words_df.orderBy(desc("Count")).limit(10)

# Show the top 10 words and their counts
top_10_new.show()

+----+-----+
|Word|Count|
+----+-----+
| the|10262|
| and| 8348|
|   I| 7790|
|  of| 7517|
|  to| 7041|
|   a| 5263|
|  my| 4616|
|  in| 4461|
| you| 3766|
| And| 3543|
+----+-----+



In [0]:
# Order the DataFrame by count in ascending order
least_10= wordcount_df.map(lambda x: (x,1)).reduceByKey(lambda x,y:x+y).sortBy(lambda x: x[1]).take(10)

# Show the top 10 words and their counts
least_10

[('whatsoever.', 1),
 ('re-use', 1),
 ('eBook,', 1),
 ('file.', 1),
 ('Author:', 1),
 ('Posting', 1),
 ('1,', 1),
 ('Language:', 1),
 ('Character', 1),
 ('***', 1)]

# PART B

In [0]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName('MIE1628_A2_PartA').getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 4)

# Specify the file path
file_path_movies = '/FileStore/tables/movies.csv'

# Read the CSV file into a DataFrame
df = spark.read \
    .format("csv") \
    .option("inferSchema", True) \
    .option("header", True) \
    .option("sep", ',') \
    .option("path", file_path_movies) \
    .load()
df.show()

+-------+------+------+
|movieId|rating|userId|
+-------+------+------+
|      2|     3|     0|
|      3|     1|     0|
|      5|     2|     0|
|      9|     4|     0|
|     11|     1|     0|
|     12|     2|     0|
|     15|     1|     0|
|     17|     1|     0|
|     19|     1|     0|
|     21|     1|     0|
|     23|     1|     0|
|     26|     3|     0|
|     27|     1|     0|
|     28|     1|     0|
|     29|     1|     0|
|     30|     1|     0|
|     31|     1|     0|
|     34|     1|     0|
|     37|     1|     0|
|     41|     2|     0|
+-------+------+------+
only showing top 20 rows



In [0]:
df.count()

1501

In [0]:
summary_df = df.summary()
summary_df.show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|                 1|                 0|
|    25%|                24|                 1|                 7|
|    50%|                50|                 1|                14|
|    75%|                74|                 2|                22|
|    max|                99|                 5|                29|
+-------+------------------+------------------+------------------+



#### The dataset consists of 3 columns/features and 1501 rows.
###### The first feature: movieId: It shows the unqiue ID for each movie starting from 0 to 99. (Hence 100 movies in total)
###### The second feature: rating: It shows the user rating for that particular movie, which ranges from 1 (lowest) to 5 (highest).
###### The third feature: userId: It shows the unique id for each user, starting from 0 to 29 (Hence 30 users have rated movies in this dataset)

# Q1:
##Top 20 movies with highest ratings.
##Top 15 users who provided highest ratings.

In [0]:
# Selecting the top 20 movies based on the Highest Ratings (Average)
top_20_movies = df.groupBy('movieId').agg(mean('rating').alias('avg_rating')).sort(desc('avg_rating')).select('movieId').limit(20).alias('Top 20 Movies')

top_20_movies = top_20_movies.select('movieId').alias('Top 20 Movies with the Highest Ratings')
# Display the top 20 movies
top_20_movies.show()

+-------+
|movieId|
+-------+
|     32|
|     90|
|     30|
|     94|
|     23|
|     49|
|     18|
|     29|
|     52|
|     62|
|     53|
|     92|
|     46|
|     68|
|     87|
|      2|
|     69|
|     27|
|     88|
|     22|
+-------+



#### Calculating the top 15 users based on highest average rating given to different movies

In [0]:
# Group by 'userId', calculate the mean rating, and alias it as 'avg_rating'
grouped_df = df.groupBy('userId').agg(mean('rating').alias('avg_rating'))

# Sort in descending order based on 'avg_rating', select 'userId', and limit to top 15
top_15_users = grouped_df.sort(desc('avg_rating')).select('userId').limit(15)

# Display the top 15 users
top_15_users.show()

+------+
|userId|
+------+
|    11|
|    26|
|    22|
|    23|
|     2|
|    17|
|     8|
|    24|
|    12|
|     3|
|    29|
|    28|
|     9|
|    14|
|    16|
+------+



#### Calculating the top 15 users based on sum of ratings given to different movies

In [0]:
# Group by 'userId', calculate the total sum of ratings, and alias it as 'sum_rating'
grouped_df = df.groupBy('userId').agg(sum('rating').alias('sum_rating'))

# Sort in descending order based on 'sum_rating', select 'userId', and limit to top 15
top_15_users = grouped_df.sort(desc('sum_rating')).select('userId').limit(15)

# Display the top 15 users
top_15_users.show()

+------+
|userId|
+------+
|    11|
|    22|
|    23|
|    26|
|    12|
|    14|
|    24|
|     9|
|     2|
|     8|
|    28|
|    18|
|    17|
|     3|
|     7|
+------+



# Q2: Train Test Splitting and Training ML model  
#####Split dataset into train and test. Try 2 different combinations: 60/40, 70/30, 75/25 and 80/20
#####Train your model and use collaborative for testing


In [0]:
train_ratio_list = [0.6, 0.75]
models = []
predictions_list = []

for train_ratio in train_ratio_list:
    # Split the dataset into train and test
    test_ratio = 1 - train_ratio
    train_data, test_data = df.randomSplit([train_ratio, test_ratio], seed=42)

    # Build the ALS model
    als = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')
    model = als.fit(train_data)

    # Append the tuple (train_ratio, test_ratio, model) to the models list
    models.append((train_ratio, test_ratio, model))

    # Make predictions on the test set
    predictions = model.transform(test_data)
    predictions_list.append(predictions)

    # Extract and print the parameter map
    param_map = model.extractParamMap()
    print(f"Parameter map for the train-test split {train_ratio} : {test_ratio}:")
    for param, value in param_map.items():
        print(f"{param.name}: {value}")

    print(f"The predictions for the train-test split: {train_ratio} : {test_ratio} is:")
    predictions.show(5)
    print('******************************************************************************')
    print('------------------------------------------------------------------------------')


Parameter map for the train-test split 0.6 : 0.4:
blockSize: 4096
predictionCol: prediction
coldStartStrategy: drop
itemCol: movieId
userCol: userId
The predictions for the train-test split: 0.6 : 0.4 is:
+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+------+----------+
|      1|     1|    14|  0.446367|
|      2|     1|    12|  1.281587|
|      4|     1|    12| 1.3095553|
|      5|     1|    14| 1.0203584|
|      7|     1|    18| 1.0273854|
+-------+------+------+----------+
only showing top 5 rows

******************************************************************************
------------------------------------------------------------------------------
Parameter map for the train-test split 0.75 : 0.25:
blockSize: 4096
predictionCol: prediction
coldStartStrategy: drop
itemCol: movieId
userCol: userId
The predictions for the train-test split: 0.75 : 0.25 is:
+-------+------+------+----------+
|movieId|rating|userId|prediction|
+-------+------+--

# Q3: MSE, RMSE and MAE 
#####Explain MSE, RMSE and MAE
#####Compare and evaluate both of the models with evaluation metrics

## Mean Squared Error
####MSE = 1/n ∑(y - yi)^2  (y = actual value, yi = predicted value, n = number of datapoints)
####MSE is a measure of the average squared difference between the actual values and the predicted values in regression.
######MSE is sensitive to the magnitude of errors because it involves squaring them.
######It is commonly used in regression to assess how well a model is performing in terms of prediction accuracy.
######Squaring makes MSE especially useful for identifying and penalizing large errors, making it a suitable metric for situations where large errors are particularly undesirable. 


## Root Mean Squared Error (RMSE)
#### RMSE = sqrt(1/n ∑(y - ŷ)^2)  (y = actual value, ŷ = predicted value, n = number of datapoints)
#### RMSE is the square root of the average squared difference between the actual values and the predicted values in regression.
###### RMSE is similar to MSE but provides a measure of the standard deviation of the residuals.
###### It is commonly used in regression to assess how well a model is performing, considering both the bias and variance.
###### Lower RMSE values indicate better model performance, and it is a popular metric for comparing models with different scales of dependent variables.


## Mean Absolute Error (MAE)
#### MAE = 1/n ∑|y - ŷ|  (y = actual value, ŷ = predicted value, n = number of datapoints)
#### MAE is a measure of the average absolute difference between the actual values and the predicted values in regression.
###### MAE is less sensitive to outliers compared to MSE and RMSE, as it does not involve squaring the errors.
###### It provides a linear measure of the average magnitude of errors, making it suitable when large errors should not be overly penalized.
###### MAE is easy to interpret and is widely used for models where absolute errors are more important than the squared differences.


In [0]:
for idx, predictions in enumerate(predictions_list):
    train_ratio, test_ratio, model = models[idx]

    # Evaluate the model
    eval_mse = RegressionEvaluator(metricName='mse', labelCol='rating', predictionCol='prediction')
    mse = eval_mse.evaluate(predictions)

    eval_rmse = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
    rmse = eval_rmse.evaluate(predictions)

    eval_mae = RegressionEvaluator(metricName='mae', labelCol='rating', predictionCol='prediction')
    mae = eval_mae.evaluate(predictions)

    # Print the metrics along with training_ratio and test_ratio
    print(f"Metrics for Train ratio: {train_ratio}, Test ratio: {test_ratio}")
    print(f"MSE: {mse}")
    print(f"RMSE: {rmse}")
    print(f"MAE: {mae}")
    print('******************************************************************************')
    print('------------------------------------------------------------------------------')


Metrics for Train ratio: 0.6, Test ratio: 0.4
MSE: 1.2017600590659898
RMSE: 1.0962481740308578
MAE: 0.7453300890299371
******************************************************************************
------------------------------------------------------------------------------
Metrics for Train ratio: 0.75, Test ratio: 0.25
MSE: 0.9384949172461176
RMSE: 0.9687594733710311
MAE: 0.6563835651316541
******************************************************************************
------------------------------------------------------------------------------


##### The second model with the Training Testing ratio of 0.75:0.25 works better than the first model with the Training Testing ratio of 0.6:0.4 due to the following reasons:

- **Larger Training Set**: The second model has a larger training set (75% of the data) compared to the first model (60% of the data). A larger training set allows the model to learn more complex patterns and relationships within the data, potentially leading to better performance on the test set.
- **Stability in Performance**: With more data in the training set, the model trained with a 0.75:0.25 ratio is likely to be more stable and less sensitive to variations in the test set. This stability contributes to consistent and reliable performance.
- **Optimal Model Complexity**: The additional data in the training set allows the model to better capture the underlying patterns in the data, avoiding underfitting. This can result in an optimal model complexity that generalizes well to unseen data.
- **Better Representation of User Preferences**: Collaborative filtering models, such as ALS, rely on learning user preferences and item similarities. A larger training set provides a more comprehensive view of user-item interactions, leading to better representation learning.

In summary, the 0.75:0.25 training-testing ratio is favored because it strikes a balance between providing sufficient training data for the model to learn complex patterns and allocating a significant portion for robust evaluation on the test set.


# Q4: Hyper Tuning Parameters
## Tune the parameters of the algorithm to get the best set of parameters
## Explain different parameters of the algorithm which have been used for tuning the algorithms.
## Evaluate all the models again

In [0]:
train_data_1, test_data_1 = df.randomSplit([0.6, 0.4], seed=42)

als_1 = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')

# Define the parameter grid to search through
param_grid_1 = ParamGridBuilder() \
        .addGrid(als_1.maxIter, [3, 5, 7, 10]) \
        .addGrid(als_1.regParam, [0.001, 0.01, 0.1, 0.5]) \
        .addGrid(als_1.alpha, [0.3, 0.5, 1.0, 2.0]) \
        .build()

evaluator_1 = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
cross_validator_1 = CrossValidator(estimator=als_1,estimatorParamMaps=param_grid_1,evaluator=evaluator_1,numFolds=3,seed=42)

# Fit the cross-validator to find the best model
cv_model_1 = cross_validator_1.fit(train_data_1)
# Get the best model from cross-validation
best_model_1 = cv_model_1.bestModel

# Make predictions on the test set
best_predictions_1 = best_model_1.transform(test_data_1)
best_rmse_1 = evaluator_1.evaluate(best_predictions_1)
print(f"RMSE on Test Data: {best_rmse_1}")
print("Best Model Parameters:")

# Get the index of the best model based on avgMetrics
best_model_1_index = np.argmax(cv_model_1.avgMetrics)

# Extract the best parameter map from the best model
best_param_map_1 = cv_model_1.getEstimatorParamMaps()[best_model_1_index]

# Print the best parameter map
for param, value in best_param_map_1.items():
    print(f"{param.name}: {value}")

RMSE on Test Data: 1.0962481740308583
Best Model Parameters:
maxIter: 3
regParam: 0.001
alpha: 0.5


In [0]:
# Split the dataset into train and test
train_data_2, test_data_2 = df.randomSplit([0.75, 0.25], seed=42)

# Define the ALS model
als_2 = ALS(userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')

# Define the parameter grid to search through
param_grid_2 = ParamGridBuilder() \
        .addGrid(als_2.maxIter, [3, 5, 7, 10]) \
        .addGrid(als_2.regParam, [0.001, 0.01, 0.1, 0.5]) \
        .addGrid(als_2.alpha, [0.3, 0.5, 1.0, 2.0]) \
        .build()

evaluator_2 = RegressionEvaluator(metricName='rmse', labelCol='rating', predictionCol='prediction')
cross_validator_2 = CrossValidator(estimator=als_2,estimatorParamMaps=param_grid_2,evaluator=evaluator_2,numFolds=3,seed=42)

cv_model_2 = cross_validator_2.fit(train_data_2)
# Get the best model from cross-validation
best_model_2 = cv_model_2.bestModel
best_predictions_2 = best_model_2.transform(test_data_2)
best_rmse_2 = evaluator_2.evaluate(best_predictions_2)
print(f"RMSE on Test Data: {best_rmse_2}")
print("Best Model Parameters:")
best_model_2_index = np.argmax(cv_model_2.avgMetrics)
best_param_map_2 = cv_model_2.getEstimatorParamMaps()[best_model_2_index]

# Print the best parameter map
for param, value in best_param_map_2.items():
    print(f"{param.name}: {value}")


RMSE on Test Data: 0.9687594733710311
Best Model Parameters:
maxIter: 3
regParam: 0.001
alpha: 0.5


##### Hyperparameters

- **maxIter**: Controls the maximum number of iterations in ALS, influencing model convergence.(defaults to 10)
- **regParam**: A non-negative value regulating the extent of regularization in the ALS model, preventing overfitting.(defaults to 1.0)
- **alpha**: A parameter for implicit preference data in ALS, determining confidence in implicit feedback.(defaults to 1.0)

##### Hyperparameters values used in this test
- **maxIter** 3, 5, 7, 10
- **regParam**  0.001, 0.01, 0.1, 0.5
- **regParam** 0.3, 0.5, 1.0, 2.0

# Q5: Top 15 movies recommendations
######Calculate top 15 movies recommendations for user id 10 and
######user id 14. Show your code and output

In [0]:
# 'best_model_2' is the best trained ALS model for Model 2 (75:25)
user_ids = [10, 14]
user_rows = [Row(userId=user_id) for user_id in user_ids]
user_df = spark.createDataFrame(user_rows)

# Get movie recommendations for the specified users
recommendations = best_model_2.recommendForUserSubset(user_df, 15)

# Show the recommendations
recommendations.display(truncate=False)


userId,recommendations
10,"List(List(92, 3.4194121), List(40, 2.9391577), List(25, 2.7375712), List(62, 2.674789), List(49, 2.6539521), List(9, 2.6319222), List(42, 2.5832038), List(89, 2.5199132), List(12, 2.4898849), List(81, 2.4231887), List(4, 2.3019376), List(2, 2.2492824), List(0, 2.235539), List(26, 2.1538856), List(82, 2.078905))"
14,"List(List(29, 4.8289194), List(52, 4.442275), List(76, 4.3515997), List(63, 4.0404634), List(25, 3.478157), List(72, 3.3571694), List(58, 3.3119564), List(96, 3.2318575), List(85, 3.223844), List(43, 3.2003627), List(2, 2.9597998), List(53, 2.9035501), List(74, 2.8654978), List(70, 2.8334794), List(14, 2.776963))"


In [0]:
# 'best_model_2' is the best trained ALS model for Model 2 (75:25)
user_ids = [10, 14]
user_rows = [Row(userId=user_id) for user_id in user_ids]
user_df = spark.createDataFrame(user_rows)

# Get movie recommendations for the specified users
recommendations = best_model_2.recommendForAllUsers(15)

# Show the recommendations
display(recommendations.where((recommendations.userId==10) |(recommendations.userId==14)),truncate=False)


userId,recommendations
10,"List(List(92, 3.4194121), List(40, 2.9391577), List(25, 2.7375712), List(62, 2.674789), List(49, 2.6539521), List(9, 2.6319222), List(42, 2.5832038), List(89, 2.5199132), List(12, 2.4898849), List(81, 2.4231887), List(4, 2.3019376), List(2, 2.2492824), List(0, 2.235539), List(26, 2.1538856), List(82, 2.078905))"
14,"List(List(29, 4.8289194), List(52, 4.442275), List(76, 4.3515997), List(63, 4.0404634), List(25, 3.478157), List(72, 3.3571694), List(58, 3.3119564), List(96, 3.2318575), List(85, 3.223844), List(43, 3.2003627), List(2, 2.9597998), List(53, 2.9035501), List(74, 2.8654978), List(70, 2.8334794), List(14, 2.776963))"
