## Download data source

* Download the data needed for this Jupyter notebook from Kaggle and store it in a new folder (the-movies-dataset) in the current directory.

* When this cell runs, it prompts for your Kaggle username and key, both of which are found in a freshly generated API token.

* Instructions for getting an API token to authenticate the data request (note: a Kaggle account is required):
    1. Sign in to Kaggle.
    2. Go to the 'Account' tab of your user profile and select 'Create New Token'.
    3. This triggers the download of kaggle.json, a file containing your API credentials.

* If the folder has already been created and the files are already in it, then this cell does nothing and requires no credentials.

* Data Source Information: https://www.kaggle.com/datasets/rounakbanik/the-movies-dataset?select=movies_metadata.csv
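
The skip-if-present behaviour described above can be approximated with a small check before the download is requested. This is a minimal sketch and not part of opendatasets: `dataset_is_present` and `EXPECTED_FILES` are hypothetical names, and the file list is assumed from the CSV files that the later cells of this notebook read.

```python
import os

# Assumption: these are the files the later cells read from the dataset folder.
EXPECTED_FILES = ("movies_metadata.csv", "ratings.csv", "keywords.csv", "credits.csv")


def dataset_is_present(folder, expected_files=EXPECTED_FILES):
    """Return True if every expected file exists in the folder (hypothetical helper)."""
    return all(os.path.isfile(os.path.join(folder, name)) for name in expected_files)
```

A caller could run the download cell only when `dataset_is_present("./the-movies-dataset")` returns `False`.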

In [None]:
import opendatasets as od

od.download("https://www.kaggle.com/datasets/rounakbanik/the-movies-dataset")

## Combine Raw Data

Combine selected columns from the required CSV files into a single dataframe (complete_df).

* Rows are removed from each dataframe when a column value is missing or is an empty list.
* This removal happens before the merge, which repeats each movie's data across many rows, to save time and memory.
* Iterating over the rows of a dataframe is inefficient compared to iterating over a list.
* For this reason each dataframe is converted to a list for filtering and then back to a dataframe, so that the merge function can combine the data into a single dataframe (complete_df).
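
The empty-list filter described above can be sketched in plain Python. This is an illustration on made-up rows, not the notebook's code: `has_entries` is a hypothetical helper, and it assumes each cell holds the string form of a list, so an empty entry ends in `[]`.

```python
def has_entries(cell):
    """Return True unless the cell is the string form of an empty list."""
    return not cell.endswith("[]")


# Made-up rows in the column order (genres, id, title).
rows = [
    ["[{'id': 1, 'name': 'Example Genre'}]", "100", "Example Movie A"],
    ["[]", "101", "Example Movie B"],
]

# Keep only the rows whose genres column is not an empty list.
kept = [row for row in rows if has_entries(row[0])]
print(len(kept))
```

`str.endswith` performs the same comparison as slicing off the last two characters, which is what the cell below does.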

In [2]:
import pandas as pd
import time

start_time = time.time()


pd.set_option('display.max_colwidth', None)

movies_df = pd.read_csv('./the-movies-dataset/movies_metadata.csv',usecols=("genres","id" ,"title","tagline", "overview","production_companies"),
                          dtype={'genres':"string","id":"string","title": "string", "tagline": "string","overview":"string",
                                    "production_companies" :"string"})[["genres","id" ,"title","tagline", "overview","production_companies"]]
movies_df.dropna(inplace = True)
movies_lst = [row for row in movies_df.values.tolist() if not (row[0][len(row[0])  - 2:] == "[]" or row[5][len(row[5]) - 2:] == "[]")]
movies_df = pd.DataFrame(movies_lst, columns = ("genres","id" ,"title","tagline", "overview","production_companies"), dtype = str)



ratings_df = pd.read_csv('./the-movies-dataset/ratings.csv', usecols = ("userId", "movieId", "rating"),
                       dtype={"userId": "string","movieId": "string","rating": "string"})[["userId", "movieId", "rating"]]
ratings_df.rename(columns={"movieId": "id"}, inplace = True)
ratings_df.dropna(inplace = True)


# Question: What if duplicate movie ids per user were removed here instead of in the cell below?
# Answer: The duplicate removal could be run here,
# but complete_list in the cell below can also be iterated over at similar cost to remove duplicates.
# The iteration in the next cell also populates the gaps list,
# which must be built directly before the code that determines the bounds of each user's rated movies.
# So, deferring the duplicate removal to the next cell avoids a redundant iteration.


# Question: What if the train and test rating bounds were enforced here instead of in the cell below?
# Answer: The merges below need to run before the train and test users are determined, because a merge removes rows (and therefore ratings) from users.
# Enforcing the bounds after the merges ensures that the final users are within the set train or test bounds.


keywords_df = pd.read_csv('./the-movies-dataset/keywords.csv', usecols = ("id", "keywords"), dtype={"id": "string","keywords":"string"})[["id", "keywords"]]
keywords_df.dropna(inplace = True)
keywords_lst = [row for row in keywords_df.values.tolist() if not (row[1][len(row[1])  - 2:] == "[]")]
keywords_df = pd.DataFrame(keywords_lst, columns = ("id", "keywords"), dtype = str)


credits_df = pd.read_csv("./the-movies-dataset/credits.csv", usecols = ("cast", "id"), dtype={"cast": "string", "id": "string"})[["cast", "id"]]
credits_df.dropna(inplace = True)
credits_lst = [row for row in credits_df.values.tolist() if (not row[0][len(row[0])  - 2:] == "[]")]
credits_df = pd.DataFrame(credits_lst, columns = ("cast", "id"), dtype = str)


# The default merge is an inner join: only movies whose id exists in both dataframes are kept.
complete_df =  pd.merge(movies_df, ratings_df, on ="id")
complete_df =  pd.merge(complete_df,keywords_df, on ="id")
complete_df  = pd.merge(complete_df,credits_df, on ="id")


complete_df.sort_values(by = 'userId', inplace = True)


# Master dataframe: For each (user id, movie id) row combination there is the combined movie data from movies_df, ratings_df, keywords_df, and credits_df for the movie id in question.
# The columns are reordered.
complete_df  = complete_df.loc[:,['userId','id','rating',"title", "genres","production_companies","keywords", "cast", "tagline", "overview" ]]

# For testing:
print("Minutes taken:", (time.time()-start_time)/60)

# Notice: With the movies, keywords, and credits dataframes, list conversion happens before dropping empty entries
# Tested on personal machine:
# tested without list conversion (old code): 1 minute and 5.7 seconds
# tested with list conversion (current code): 37.1 seconds

Minutes taken: 0.6067105770111084


## User Selection and Data Extraction

1. Remove duplicate movies rated by the same user.
2. Randomly choose users whose number of ratings falls within the bounds for an SVD user, a train user, or a test user.
3. Extract the data from those users and structure it into a list to be written to a CSV file.
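
The first two steps can be sketched on a toy list. This is a simplified illustration under stated assumptions, not the cell's implementation: `dedup_and_bounds` and `select_users` are hypothetical names, the rows are assumed to be sorted by user id, and the standard library's `random.Random` stands in for the NumPy generator used below.

```python
import random


def dedup_and_bounds(sorted_rows):
    """Drop repeated (user, movie) pairs and record each user's [start, end) slice.

    Assumes sorted_rows is sorted by user id (column 0); column 1 is the movie id.
    """
    deduped, bounds, seen = [], [], set()
    current_user = None
    for row in sorted_rows:
        user, movie = row[0], row[1]
        if user != current_user:
            if current_user is not None:
                bounds[-1][1] = len(deduped)  # close the previous user's slice
            bounds.append([len(deduped), len(deduped)])
            seen = set()
            current_user = user
        if movie not in seen:
            seen.add(movie)
            deduped.append(row)
    if bounds:
        bounds[-1][1] = len(deduped)  # close the last user's slice
    return deduped, bounds


def select_users(bounds, lower, upper, limit, rng):
    """Shuffle a copy of bounds and keep up to `limit` users with lower <= ratings <= upper."""
    shuffled = list(bounds)
    rng.shuffle(shuffled)
    chosen = [b for b in shuffled if lower <= b[1] - b[0] <= upper]
    return chosen[:limit]


rows = [["1", "10", "4.0"], ["1", "10", "3.0"], ["1", "11", "5.0"], ["2", "10", "2.0"]]
deduped, bounds = dedup_and_bounds(rows)
print(bounds)  # [[0, 2], [2, 3]]
```

Selecting slices instead of rows keeps the shuffled structure small; the rows themselves are only copied once the users have been chosen.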

In [3]:
import ast
import time
from numpy.random import Generator, PCG64

def populate_names(item):
    """Extract names from the syntax of certain data entries:"""
    string  = item[1:-1]
    jsons = string.split("}, ")   
    names = ""
    index = 0
    for item in jsons:
        if(index == len(jsons)-1):
            temp_dict = ast.literal_eval(item)
            names+=str(temp_dict["name"])
        else:
            temp_dict = ast.literal_eval(item+"}")
            names+=str(str(temp_dict["name"])+" ")
        index += 1
    return names


def provide_data(row):
    """Extract data from row of complete_list:"""
    movie_data = []
    movie_data.append(int(row[0]))
    movie_data.append(int(row[1]))
    movie_data.append(float(row[2]))
    movie_data.append(row[3])  

    movie_data.append(populate_names(row[4]))
    movie_data.append(populate_names(row[5]))
    movie_data.append(populate_names(row[6]))
    movie_data.append(populate_names(row[7]))

    movie_data.append(str(row[8]))
    movie_data.append(str(row[9]))
    return movie_data
    


# main:
start_time = time.time()

SEED_INT = 42
outer_gen = Generator(PCG64(SEED_INT))
# The list of rows with users id, the users rating for the movie, and metadata for the movie:
# Note: It is sorted by user_id.
complete_list = complete_df.values.tolist()

print("Complete number of users:", len(list(complete_df["userId"].unique()))) # 260788

# The same as complete_list where data is omitted for movies that have already been rated by the user in a previous row
complete_list_no_dups = []

# Distinguish the user the row belongs to:
last_id = complete_list[0][0]

# The set of movies that a user has rated:
# It is used to omit later ratings of a movie that the user has already rated.
movie_set = set()

# The number of rows of movie data a single user takes up for each user:
gaps = []

# Appended to gaps when all of a users rows of movie data have been counted:
gap_len = 0


# Populate gaps and complete_list_no_dups, omitting movies that the same user has already rated:
# Note: This loop is faster than the dataframe alternative,
# e.g. filtering the data by user and then removing duplicate movie ids for each user.
# That alternative avoids slow dataframe iteration, but the filter method is also slow.
for row in complete_list:
    if last_id != row[0]:
        movie_set= set()
        complete_list_no_dups.append(row)
        movie_set.add(row[1])
        gaps.append(gap_len)
        gap_len = 1
    else:
        if row[1] not in movie_set:
            complete_list_no_dups.append(row)
            gap_len+=1
            movie_set.add(row[1])
    last_id = row[0]

# Add the last gap_len:
gaps.append(gap_len)



# Bounds represents the first index and last index(non inclusive) of the range of ratings for a user in the sorted complete_list_no_dups
full_index = 0 
bounds = [] 

for user_index in range(len(gaps)):
    bounds.append([full_index, full_index+gaps[user_index]])
    full_index+=gaps[user_index]    
 



# These set the rating requirements for svd, train, and test users:
SVD_USER_RATING_LB = 20
SVD_USER_RATING_UB = 30
USER_RATING_LB = 5
USER_RATING_UB = 10

# Makes selection of user bounds random:
outer_gen.shuffle(bounds)

NOF_SVD_USERS = 10000
NOF_TRAIN_USERS = 10000
NOF_TEST_USERS = 10000


bounds_svd_users = []
bounds_train_users = []
bounds_test_users = []

# Each scan below resumes right after the last user taken by the previous scan,
# so no user can be assigned to more than one group.
index = -1
for index in range(len(bounds)):
    item = bounds[index]
    if SVD_USER_RATING_LB <= item[1]-item[0] <= SVD_USER_RATING_UB:
        bounds_svd_users.append(item)
        if len(bounds_svd_users) == NOF_SVD_USERS:
            print("nof svd users met")
            break

train_start = index+1
for index in range(train_start, len(bounds)):
    item = bounds[index]
    if USER_RATING_LB <= item[1]-item[0] <= USER_RATING_UB:
        bounds_train_users.append(item)
        if len(bounds_train_users) == NOF_TRAIN_USERS:
            print("nof train users met")
            break

test_start = index+1
for item in bounds[test_start:]:
    if USER_RATING_LB <= item[1]-item[0] <= USER_RATING_UB:
        bounds_test_users.append(item)
        if len(bounds_test_users) == NOF_TEST_USERS:
            print("nof test users met")
            break



# Sample the data from complete_list_no_dups once the bounds (low memory) have been randomly selected:
sampled_data = []


for bound in bounds_svd_users:
    for movie in complete_list_no_dups[bound[0]:bound[1]]:
        movie_data = provide_data(movie)
        sampled_data.append(movie_data)


for bound in bounds_train_users:
    for movie in complete_list_no_dups[bound[0]:bound[1]]:
        movie_data = provide_data(movie)
        sampled_data.append(movie_data)



for bound in bounds_test_users:
    for movie in complete_list_no_dups[bound[0]:bound[1]]:
        movie_data = provide_data(movie)
        sampled_data.append(movie_data)



print("Minutes taken:", (time.time()-start_time)/60)


Complete number of users: 260788
nof svd users met
nof train users met
nof test users met
Minutes taken: 6.106799916426341
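
The `populate_names` helper in the cell above splits each field on `}, ` and parses the pieces one at a time. An alternative sketch parses the whole field in one call to `ast.literal_eval`. `names_from_field` is a hypothetical name, and the sketch assumes, as the helper above does, that each field is the string form of a Python list of dicts with a `name` key.

```python
import ast


def names_from_field(field):
    """Join the 'name' values of a stringified list of dicts with single spaces."""
    entries = ast.literal_eval(field)
    return " ".join(str(entry["name"]) for entry in entries)


print(names_from_field("[{'id': 1, 'name': 'Drama'}, {'id': 2, 'name': 'Comedy'}]"))
```

Parsing the field as a whole also handles a name that happens to contain `}, `, which would confuse a split on that separator.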


## Write Data

Save the selected data to constructed_data.csv so that the cells below can run without rerunning this cell or those above it.
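
A minimal round-trip sketch of the write step and the later read step, using a temporary file and made-up rows (the file name `example.csv` is only for illustration). It shows that every value comes back from `csv.reader` as a string, which is why the later cells convert ids and ratings with `int` and `float`.

```python
import csv
import os
import tempfile

header = ["userId", "id", "rating"]
rows = [[1, 10, 4.5], [1, 11, 3.0]]  # made-up example rows

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "example.csv")

    # Write the header followed by the data rows.
    with open(path, "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)

    # Read everything back and skip the header row.
    with open(path, "r", encoding="utf-8", newline="") as f:
        loaded = list(csv.reader(f))[1:]

print(loaded)  # [['1', '10', '4.5'], ['1', '11', '3.0']]
```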


In [4]:
import csv
import os

# Create the output folder in the current directory if it does not exist yet:
output_dir = "constructed_data"
os.makedirs(output_dir, exist_ok=True)

output_path = os.path.join(output_dir, "constructed_data.csv")

with open(output_path, "w", encoding="utf-8", newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['userId','id','rating',"title", "genres","production_companies","keywords", "cast", "tagline", "overview"])
    writer.writerows(sampled_data)

## Read Data
Start from this cell if the data has already been saved to constructed_data.csv.

In [1]:
import csv
import os

data_list =[]


input_path = os.path.join("constructed_data", "constructed_data.csv")

with open(input_path, 'r', encoding="utf-8", newline='') as f:
    csv_reader = csv.reader(f)
    data_list = list(csv_reader)

# Drop the header row:
data_list = data_list[1:]

## Organize Data

Extract data from data_list into user_to_data_svd, user_to_data_train, and user_to_data_test.

Each of these can be thought of as a list of users, and each user can be thought of as a list of that user's movie ratings together with the data for each rated movie.
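
The grouping described above can be sketched with `itertools.groupby`. This is an illustrative alternative under stated assumptions, not the cell's code: `group_by_user` and `split_users` are hypothetical names, and the rows are assumed to be stored with each user's rows contiguous and in the order SVD users, train users, test users.

```python
from itertools import groupby


def group_by_user(rows):
    """Group consecutive rows that share a user id (column 0) into per-user lists."""
    return [list(group) for _, group in groupby(rows, key=lambda row: row[0])]


def split_users(users, nof_svd, nof_train):
    """Split the per-user lists into SVD, train, and test groups by position."""
    svd = users[:nof_svd]
    train = users[nof_svd:nof_svd + nof_train]
    test = users[nof_svd + nof_train:]
    return svd, train, test


rows = [["7", "1", "4.0"], ["7", "2", "3.0"], ["3", "1", "5.0"], ["9", "4", "2.0"]]
users = group_by_user(rows)
svd, train, test = split_users(users, nof_svd=1, nof_train=1)
print(len(svd), len(train), len(test))  # 1 1 1
```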

In [2]:
NOF_SVD_USERS = 10000
NOF_TRAIN_USERS = 10000
NOF_TEST_USERS = 10000


user_to_data_svd = []
user_to_data_train= []
user_to_data_test = []

user_id = data_list[0][0]
ratings = []
user_index = 0



for row in data_list:
    if (row[0]!=user_id):
        if(user_index<NOF_SVD_USERS and user_index>=0):
            user_to_data_svd.append(ratings)
        elif(user_index<NOF_SVD_USERS+NOF_TRAIN_USERS and user_index>=NOF_SVD_USERS):
            user_to_data_train.append(ratings)
        else:
            user_to_data_test.append(ratings)         
        user_id = row[0]
        ratings = [row]
        user_index+=1
    else:
        ratings.append(row)



user_to_data_test.append(ratings)

## Bayesian Optimization
* The following cell searches for the best set of hyperparameters for the SVD function within the given bounds.
* Afterwards, the SVD function with the best-performing hyperparameters is used in the full_model notebook, where the actual model is implemented.
* Since the bounds are not exhaustive and the number of gp_minimize calls is limited, a deeper search could likely improve the result.
* Each set of hyperparameters is tested over a large number of iterations on varied samples of users before it is evaluated, to reduce the contribution of noise to the measured performance.
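
For reference, the prediction and the per-rating update performed by the `epoch` function in the following cell can be written as below. The symbols are notation introduced here, not names from the code: $\mu$ is `overall_average`, $b_u$ and $b_i$ are `b1[u]` and `b2[i]`, $p_u$ and $q_i$ are the latent vectors `p[u]` and `q[i]`, $\gamma$ is `lr`, and $\lambda$ is `rt`.

$$
\hat{r}_{ui} = \mu + b_u + b_i + p_u \cdot q_i, \qquad e_{ui} = r_{ui} - \hat{r}_{ui}
$$

$$
\begin{aligned}
b_u &\leftarrow b_u + \gamma\,(e_{ui} - \lambda\, b_u) \\
b_i &\leftarrow b_i + \gamma\,(e_{ui} - \lambda\, b_i) \\
p_u &\leftarrow p_u + \gamma\,(e_{ui}\, q_i - \lambda\, p_u) \\
q_i &\leftarrow q_i + \gamma\,(e_{ui}\, p_u - \lambda\, q_i)
\end{aligned}
$$

The last two updates both use the values of $p_u$ and $q_i$ from before the step, which is why the code stores the change to $p_u$ in `temp` before it modifies `q[i]`.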

In [3]:
from sklearn.metrics import mean_squared_error
from numba import njit
import copy
import time
import numpy as np
from numpy.random import Generator, PCG64
from skopt import gp_minimize
from skopt.space import Real
from skopt.space import Integer
from dask.distributed import Client, LocalCluster
import dask.array as da
import os





@njit
def epoch(list, b1, b2, p, q, overall_average, lr, rt):
    """
    Update the parameters (b1, b2, q, and p) for each row in the list using stochastic gradient descent.
    """
    for row in list:
        u = int(row[0])
        i = int(row[1])
        r = row[2]

        pred = overall_average+b1[u]+b2[i]+np.dot(p[u],q[i])
        error = r-pred
        b1[u] += lr*(error- rt*b1[u])
        b2[i] += lr*(error- rt*b2[i])
        temp = lr*(error*q[i] -rt*p[u])
        q[i] += lr*(error*p[u] -rt*q[i])
        p[u] += temp





def svd_iterative(gen_input, list, n, epochs, rt, lr, overall_average, nof_users, nof_movies):
    """
    An iterative SVD method that has been shown to outperform non-iterative SVD methods.
    """
    
    q = gen_input.normal(0, .1, (nof_movies, n))
    p = gen_input.normal(0, .1, (nof_users, n))


    b1 = np.zeros(nof_users)
    b2 = np.zeros(nof_movies)

    np_array = np.array(list)

    for _ in range(epochs):
        epoch(np_array, b1, b2, p, q, overall_average, lr, rt)

    return b1, b2, p, q


def rmse_sum(block):
    """
    This function computes the rmse for each row in the block then returns the sum of them.
    """
    total_sum = 0 

    for row in block:
        seed_input, user_to_data_svd_temp, user_to_data_test_temp, nof_latent_features, epochs, rt, lr = row

        gen = Generator(PCG64(seed_input))


        # re-index the user ids and the movie ids in the order of their occurrence:
        old_to_new_svd  = dict()
        last_index_svd = 0
        svd_cnt = 0

        for user in user_to_data_svd_temp:
            for movie in user: 
                if(movie[1] in old_to_new_svd.keys()):
                    movie[1] = old_to_new_svd[movie[1]]
                else:
                    old_to_new_svd[movie[1]] = last_index_svd
                    movie[1] = last_index_svd
                    last_index_svd+=1      
                movie[0] = svd_cnt
            svd_cnt+=1

        old_to_new_test = copy.deepcopy(old_to_new_svd)
        last_index_test = last_index_svd
        test_cnt = svd_cnt

        for user in user_to_data_test_temp:
            for movie in user: 
                if(movie[1] in old_to_new_test.keys()):
                    movie[1] = old_to_new_test[movie[1]]
                else:
                    old_to_new_test[movie[1]] = last_index_test
                    movie[1] = last_index_test
                    last_index_test+=1      
                movie[0] = test_cnt
            test_cnt+=1


        # Populate the variables that are needed for the svd method to make predictions:
        target_rating_test = []
        test_list = []

        movies_order_svd = set()
        overall_average_svd = 0 
        cnt_svd = 0

        for user in user_to_data_svd_temp:
            for movie in user:
                movies_order_svd.add(movie[1])
                test_list.append([int(movie[0]), int(movie[1]), float(movie[2])])
                overall_average_svd+=float(movie[2])
                cnt_svd += 1

        movies_order_test = copy.deepcopy(movies_order_svd)
        overall_average_test = overall_average_svd 
        cnt_test = cnt_svd
        test_rating_to_predict = []

        for user in user_to_data_test_temp:
            rand_num  = gen.integers(0, len(user))
            index = 0
            for movie in user:
                movies_order_test.add(movie[1])
                if(index == rand_num):
                    test_rating_to_predict.append([int(movie[0]), int(movie[1])])
                    target_rating_test.append(float(movie[2]))
                else:
                    overall_average_test+=float(movie[2])
                    cnt_test += 1
                    test_list.append([int(movie[0]), int(movie[1]), float(movie[2])])
                index+=1

        overall_average_test = overall_average_test/cnt_test

        gen.shuffle(test_list)


        # Make predictions with the svd method and add the rmse to total_sum
        b1, b2, p, q = svd_iterative(gen, test_list, nof_latent_features, epochs, rt, lr,
                                    overall_average_test, len(user_to_data_svd_temp)+len(user_to_data_test_temp), len(movies_order_test))

        prediction = [overall_average_test + b1[pair[0]]+b2[pair[1]]
                                    +np.dot(p[pair[0]],q[pair[1]]) for pair in test_rating_to_predict]
    
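        # Take the square root explicitly: the `squared` argument is not available in recent scikit-learn releases.
        total_sum+=np.sqrt(mean_squared_error(target_rating_test, prediction))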

    return (np.array([[total_sum]], dtype="float32"))





def objective_function(vars):
    """
    Evaluate one set of hyperparameters over many iterations, each on a fresh sample of users,
    to reduce the noise of the RMSE metric.
    This helps ensure that the best hyperparameters are best because they are effective, not by chance.
    """
    nof_svd_users, nof_test_users,nof_latent_features, epochs, rt, lr = vars

    mse_sum = 0

    ITERATIONS = 320

    parameters_list = []

    # Note:
    # This loop is costly because it samples a new set of users every iteration.
    # However, reusing one sample of users for several iterations before switching would add noise.
    # The more variety of users, the less noise. The cost is also low relative to the rest of the operations.

    # Note:
    # user_to_data_svd and user_to_data_test are converted from lists to numpy arrays of objects, sampled, and then converted back to lists.
    # This avoids using more than one random number generator type and suppresses the warning about ragged nested sequences.

    for _ in range(ITERATIONS):
        parameters_list.append([outer_gen.integers(0,100000),
                                list(copy.deepcopy(outer_gen.choice(np.array(user_to_data_svd, dtype='object'), nof_svd_users, replace = False))),
                                list(copy.deepcopy(outer_gen.choice(np.array(user_to_data_test, dtype = "object"), nof_test_users, replace = False))),
                                nof_latent_features, epochs, rt, lr])


    parameters_arr = np.array(parameters_list, dtype="object")
    dask_array = da.from_array(parameters_arr, chunks=(int(ITERATIONS/8),7))
    results = dask_array.map_blocks(rmse_sum, chunks = (1,1), dtype="float32").compute()


    for block in results:
        mse_sum+= block[0]

    return mse_sum/ITERATIONS

# main:
start = time.time()

cluster = LocalCluster(n_workers=os.cpu_count())

client = Client(cluster)

SEED_INT = 5
outer_gen = Generator(PCG64(SEED_INT))

mid_points = [(300, 500),(100, 200),(100,300),(100,400),(.01, .075),(.001, .05)]

# Note: This works because the midpoints between the integer bounds are integers.
mid_points = [(lambda pair : int((pair[0]+pair[1])/2) if((pair[0]+pair[1])/2==int((pair[0]+pair[1])/2)) else (pair[0]+pair[1])/2)(item) for item in mid_points]


bounds = [Integer(300, 500, name = 'nof_svd_users'),Integer(100, 200, name = 'nof_test_users'),
          Integer(100,300, name = 'nof_latent_features'),Integer(100,400, name = 'epochs'),
          Real(.01, .075, name = 'rt'),Real(.001, .05, name = 'lr')]


res = gp_minimize(objective_function,                 
                  bounds,      
                  n_calls=30,    
                  n_initial_points = 10, 
                  x0 = mid_points,    
                  random_state= SEED_INT,
                  n_points = 10000,
                  )


client.close()
cluster.close()


print("Solution: x", res.x)
print("Result: y", res.fun)
print("Minutes taken:", (time.time()-start)/60)





This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Solution: x [311, 183, 173, 394, 0.015838367452944445, 0.020440093655358895]
Result: y 1.0476970911026
Minutes taken: 2.2690266489982607
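
The re-indexing step inside `rmse_sum` in the cell above maps the original ids to consecutive integers in order of first occurrence, so that they can index the rows of `p`, `q`, `b1`, and `b2`. A minimal sketch of that idea follows. `reindex` is a hypothetical helper, not a function from the notebook.

```python
def reindex(ids, mapping=None):
    """Map ids to consecutive integers in order of first occurrence.

    Passing an existing mapping continues its numbering instead of restarting at 0.
    """
    mapping = {} if mapping is None else mapping
    new_ids = []
    for old_id in ids:
        if old_id not in mapping:
            mapping[old_id] = len(mapping)
        new_ids.append(mapping[old_id])
    return new_ids, mapping


svd_ids, mapping = reindex(["862", "31", "862", "5"])
print(svd_ids)  # [0, 1, 0, 2]

# Ids from the test users continue the same numbering.
test_ids, mapping = reindex(["5", "99"], mapping)
print(test_ids)  # [2, 3]
```

Continuing the same mapping for the test users mirrors how the cell above starts the test mapping from a copy of the SVD mapping, so a movie seen in both groups keeps a single index.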


## Test hyperparameters
* The following cell is a method to test the optimized hyperparameters found in the cell above.
* These tests check that the hyperparameters generalize well to new data.

* If the same optimized hyperparameters perform worse on average in this cell's output, then sensitivity to noise contributed to their selection. In other words, some of the performance was simply due to chance.

* To mitigate this noise, a large number of tests are run in the cell above.
* A large number of tests are also run in the following cell to reduce the noise of this test itself.
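
Each test hides one randomly chosen rating per test user, trains on the remaining ratings, and scores the predictions for the hidden ratings with RMSE. The sketch below illustrates only the split and the metric, under stated assumptions: `hold_out_one` and `rmse` are hypothetical helpers, and `random.Random` from the standard library stands in for the NumPy generator used in the cell.

```python
import math
import random


def hold_out_one(users, rng):
    """Hide one randomly chosen rating per user; return (training rows, held-out rows)."""
    train_rows, held_out = [], []
    for ratings in users:
        hidden = rng.randrange(len(ratings))
        for index, row in enumerate(ratings):
            if index == hidden:
                held_out.append(row)
            else:
                train_rows.append(row)
    return train_rows, held_out


def rmse(targets, predictions):
    """Root mean squared error of two equally long sequences."""
    squared = [(t - p) ** 2 for t, p in zip(targets, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# Made-up rows in the form (user index, movie index, rating).
users = [[(0, 0, 4.0), (0, 1, 3.0)], [(1, 0, 5.0), (1, 2, 2.0), (1, 3, 1.0)]]
train_rows, held_out = hold_out_one(users, random.Random(0))
print(len(train_rows), len(held_out))  # 3 2
```

Averaging this score over many independently sampled user sets is what reduces the noise of the reported value.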

In [6]:
import numpy as np
from sklearn.metrics import mean_squared_error
from numba import njit
import copy
from numpy.random import Generator, PCG64
from dask.distributed import Client, LocalCluster
import dask.array as da
import time
import os


@njit
def epoch(list, b1, b2, p, q, overall_average, lr, rt):
    """
    Update the parameters (b1, b2, q, and p) for each row in the list using stochastic gradient descent.
    """
    for row in list:
        u = int(row[0])
        i = int(row[1])
        r = row[2]

        pred = overall_average+b1[u]+b2[i]+np.dot(p[u],q[i])
        error = r-pred
        b1[u] += lr*(error- rt*b1[u])
        b2[i] += lr*(error- rt*b2[i])
        temp = lr*(error*q[i] -rt*p[u])
        q[i] += lr*(error*p[u] -rt*q[i])
        p[u] += temp





def svd_iterative(gen_input, list, n, epochs, rt, lr, overall_average, nof_users, nof_movies):
    """
    An iterative SVD method that has been shown to outperform non-iterative SVD methods.
    """
    
    q = gen_input.normal(0, .1, (nof_movies, n))
    p = gen_input.normal(0, .1, (nof_users, n))


    b1 = np.zeros(nof_users)
    b2 = np.zeros(nof_movies)

    np_array = np.array(list)

    for _ in range(epochs):
        epoch(np_array, b1, b2, p, q, overall_average, lr, rt)

    return b1, b2, p, q



def rmse_sum(block):
    """
    This function computes the rmse for each row in the block then returns the sum of them.
    """
    total_sum = 0 

    for row in block:
        seed_input, user_to_data_svd_temp, user_to_data_test_temp, nof_latent_features, epochs, rt, lr = row

        gen = Generator(PCG64(seed_input))


        # re-index the user ids and the movie ids in the order of their occurrence:
        old_to_new_svd  = dict()
        last_index_svd = 0
        svd_cnt = 0

        for user in user_to_data_svd_temp:
            for movie in user: 
                if(movie[1] in old_to_new_svd.keys()):
                    movie[1] = old_to_new_svd[movie[1]]
                else:
                    old_to_new_svd[movie[1]] = last_index_svd
                    movie[1] = last_index_svd
                    last_index_svd+=1      
                movie[0] = svd_cnt
            svd_cnt+=1

        old_to_new_test = copy.deepcopy(old_to_new_svd)
        last_index_test = last_index_svd
        test_cnt = svd_cnt

        for user in user_to_data_test_temp:
            for movie in user: 
                if(movie[1] in old_to_new_test.keys()):
                    movie[1] = old_to_new_test[movie[1]]
                else:
                    old_to_new_test[movie[1]] = last_index_test
                    movie[1] = last_index_test
                    last_index_test+=1      
                movie[0] = test_cnt
            test_cnt+=1


        # Populate the variables that are needed for the svd method to make predictions:
        target_rating_test = []
        test_list = []

        movies_order_svd = set()
        overall_average_svd = 0 
        cnt_svd = 0

        for user in user_to_data_svd_temp:
            for movie in user:
                movies_order_svd.add(movie[1])
                test_list.append([int(movie[0]), int(movie[1]), float(movie[2])])
                overall_average_svd+=float(movie[2])
                cnt_svd += 1

        movies_order_test = copy.deepcopy(movies_order_svd)
        overall_average_test = overall_average_svd 
        cnt_test = cnt_svd
        test_rating_to_predict = []

        for user in user_to_data_test_temp:
            rand_num  = gen.integers(0, len(user))
            index = 0
            for movie in user:
                movies_order_test.add(movie[1])
                if(index == rand_num):
                    test_rating_to_predict.append([int(movie[0]), int(movie[1])])
                    target_rating_test.append(float(movie[2]))
                else:
                    overall_average_test+=float(movie[2])
                    cnt_test += 1
                    test_list.append([int(movie[0]), int(movie[1]), float(movie[2])])
                index+=1

        overall_average_test = overall_average_test/cnt_test

        gen.shuffle(test_list)


        # Make predictions with the svd method and add the rmse to total_sum:
        b1, b2, p, q = svd_iterative(gen, test_list, nof_latent_features, epochs, rt, lr,
                                    overall_average_test, len(user_to_data_svd_temp)+len(user_to_data_test_temp), len(movies_order_test))

        prediction = [overall_average_test + b1[pair[0]]+b2[pair[1]]
                                    +np.dot(p[pair[0]],q[pair[1]]) for pair in test_rating_to_predict]
    
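        # Take the square root explicitly: the `squared` argument is not available in recent scikit-learn releases.
        total_sum+=np.sqrt(mean_squared_error(target_rating_test, prediction))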

    return (np.array([[total_sum]], dtype="float32"))

# main:
start_time = time.time()

cluster = LocalCluster(n_workers=os.cpu_count())
client = Client(cluster)

mse_sum =0 
ITERATIONS = 320

# Note: Set these to the best hyperparameters found by the Bayesian optimization process in the cell above.
nof_svd_users, nof_test_users, nof_latent_features, epochs, rt, lr = (467, 189, 292, 268, 0.01588566520994121, 0.04980345674001443)


SEED_INT = 5
outer_gen = Generator(PCG64(SEED_INT))

parameters_list = []

# Note:
# This loop is costly because it samples a new set of users every iteration.
# However, reusing one sample of users for several iterations before switching would add noise.
# The more variety of users, the less noise. The cost is also low relative to the rest of the operations.

# Note:
# user_to_data_svd and user_to_data_test are converted from lists to numpy arrays of objects, sampled, and then converted back to lists.
# This avoids using more than one random number generator type and suppresses the warning about ragged nested sequences.

for _ in range(ITERATIONS):
    parameters_list.append([outer_gen.integers(0,100000),
                            list(copy.deepcopy(outer_gen.choice(np.array(user_to_data_svd, dtype='object'), nof_svd_users, replace = False))),
                            list(copy.deepcopy(outer_gen.choice(np.array(user_to_data_test, dtype = "object"), nof_test_users, replace = False))),
                            nof_latent_features, epochs, rt, lr])

parameters_arr = np.array(parameters_list, dtype="object")
dask_array = da.from_array(parameters_arr, chunks=(int(ITERATIONS/8),7))
results = dask_array.map_blocks(rmse_sum, chunks = (1,1), dtype="float32").compute()

for row in results:
    mse_sum+= row[0]

print("Average RMSE score:",mse_sum/ITERATIONS)

client.close()
cluster.close()

print("Minutes taken:", (time.time()- start_time)/60)


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


Fixed Parameters rmse score: 1.0430915474891662
Time Taken 8.625545132160187
