In [1]:
# install necessary libraries
!pip install numpy scipy dask distributed




In [2]:
####################################################################################################
## NOTE : the comments in this code were generated with the help of AI , the code itself was not. ##
####################################################################################################

# import necessary libraries

import os
import random
import sys
import time
import urllib.request

import dask.bag as db
import numpy as np
from dask.distributed import Client, LocalCluster
from scipy.spatial.distance import cosine

def run_model(UserIDofInterest, problemSize, numberOfWorkers, k, numberOfNeighbours=5):
    """
    Runs a collaborative filtering recommendation model to suggest streamers to a user.

    Pipeline (built on Dask Bags, executed on the module-level ``cluster``/``client``):
      1. Download ``100k_a.csv`` if it is not already on disk, parse each row into
         ``[userID, streamerName, watchTime]`` and take ``problemSize`` rows.
      2. Map streamer names to dense integer IDs and sum watch time per (user, streamer).
      3. Hide a few of the target user's streamers so they can serve as ground truth.
      4. Turn every user into a watch-share vector, pick the ``numberOfNeighbours`` users
         most cosine-similar to the target user, and score each streamer by the
         similarity-weighted watch share of those neighbours.
      5. Print the predictions, the data the model saw, the top-``k`` unseen
         recommendations, the hidden ground truth and Recall@K.

    Args:
        UserIDofInterest (int): The ID of the user for whom to generate recommendations.
        problemSize (int): The number of records to process from the dataset.
        numberOfWorkers (int): The number of Dask workers to use for parallel computation.
        k (int): The number of top recommendations to display (also used for Recall@K).
        numberOfNeighbours (int): How many of the most similar users are used to score
            streamers. The target user is normally one of them, because its similarity
            to itself is 1. Defaults to 5, the value the model has always used.

    Raises:
        ValueError: If ``UserIDofInterest`` does not appear in the processed records.

    Please ensure that UserIDofInterest < problemSize / 30. This is the rule of thumb used
    to keep the target user inside the problem set.

    NOTE: expects module-level names ``cluster`` (a ``LocalCluster``) and ``client``
    (a ``Client`` connected to it) to exist before this function is called.
    """

    # download data set (only if it is not already present, so repeated runs, e.g. with
    # different worker counts, reuse the local copy instead of fetching it again)
    filename = '100k_a.csv'
    if not os.path.exists(filename):
        urllib.request.urlretrieve('https://mcauleylab.ucsd.edu/public_datasets/gdrive/twitch/100k_a.csv', filename)

    cluster.scale(numberOfWorkers)  # Scales the Dask cluster to the specified number of workers.
    client.wait_for_workers(numberOfWorkers)  # Waits until all specified workers are ready.

    # ---------------------------------------------------------------- load and preprocess
    def processIntoLists(line):
        """Strips the trailing newline from one CSV line and splits it on commas."""
        return line.rstrip('\n').split(',')

    def doWatchTimeprocessing(line):
        """Replaces the last field (end time) with end time minus start time (field -2)."""
        line[-1] = int(line[-1]) - int(line[-2])
        return line

    def discardStreamID(line):
        """
        Drops the fields that are no longer needed once the watch time is known.

        For a row ``[userID, streamID, streamerName, startTime, watchTime]`` the first
        ``del`` removes the stream ID (index 1). The list then shifts left, so the second
        ``del`` (index 2) removes the start time, leaving
        ``[userID, streamerName, watchTime]``.
        """
        del line[1]
        del line[2]
        return line

    def preProcessing(inputBag):
        """Applies parsing, watch-time computation and field pruning to every line."""
        inputBag = inputBag.map(processIntoLists)
        inputBag = inputBag.map(doWatchTimeprocessing)
        inputBag = inputBag.map(discardStreamID)
        return inputBag

    bag = db.read_text(filename)  # One bag element per CSV line.
    bag = preProcessing(bag)
    bag = bag.persist()  # Keep the parsed rows in worker memory.
    # Lazily take problemSize rows (dask's take reads from the first partition by default).
    bag = bag.take(problemSize, compute=False)

    # ------------------------------------------------------------ streamer name -> ID table
    def createLookupTable(inputBag):
        """
        Maps every distinct streamer name (field 1) to a unique integer ID 0..n-1,
        in the order dask returns the distinct names.
        """
        distinctBag = inputBag.distinct(key=lambda x: x[1])
        streamerNamesList = list(distinctBag.map(lambda x: x[1]))  # Computes the names locally.
        return {streamerName: i for i, streamerName in enumerate(streamerNamesList)}

    streamerIDlookUpDict = createLookupTable(bag)
    # IDs are dense (0 .. numberOfStreamers-1). Sizing per-user vectors by this count keeps
    # every ID in range and avoids allocating unused zeros. A fixed size would fail with
    # more distinct streamers than it allows.
    numberOfStreamers = len(streamerIDlookUpDict)

    def mapStreamerNametoStreamerID(line):
        """
        Returns a copy of ``line`` with the streamer name replaced by its integer ID.

        A new list is built instead of assigning into ``line``. The rows come from a
        persisted bag, and rewriting them in place would corrupt that data: any
        recomputation would then look up an int in the name dictionary.
        """
        return [line[0], streamerIDlookUpDict[line[1]]] + line[2:]

    transformedBag = bag.map(mapStreamerNametoStreamerID)

    def combinedKey(x):
        """Returns the (UserID, StreamerID) pair used as the grouping key."""
        return x[0], x[1]

    # Sum the watch times (field 2) of every unique (UserID, StreamerID) pair.
    folded = transformedBag.foldby(key=combinedKey, binop=lambda acc, x: acc + x[2], initial=0, combine=lambda x, y: x + y)

    def reformatData(x):
        """
        Flattens a folded ``((userID, streamerID), watchTime)`` item into
        ``(int(userID), streamerID, watchTime)``. The user ID is still a string here,
        because it came from splitting the CSV line.
        """
        (userID, StreamerID), watchTime = x
        return (int(userID), StreamerID, watchTime)

    folded2 = folded.map(reformatData)

    # Group by userID -> (userID, [(userID, streamerID, watchTime), ...]).
    foldedGrouped = folded2.groupby(lambda x: x[0]).persist()

    # The target user's complete watch records (before omission), kept as ground truth.
    targetRecords = foldedGrouped.filter(lambda x: x[0] == UserIDofInterest).pluck(1).compute()
    if not targetRecords:
        raise ValueError(
            f"User {UserIDofInterest} does not appear in the {problemSize} processed records; "
            "choose a smaller UserIDofInterest or a larger problemSize."
        )
    fullDataSet = targetRecords[0]

    # ------------------------------------------------------------- hide some ground truth
    def predicitonTestingFunc(line, UserIDofInterest):
        """
        Removes a few of the target user's watch entries so the model cannot see them.
        They are later used to measure whether the model recovers them.

        The deletions are applied one after another, so each later index refers to the
        already-shortened list. Together they remove the entries originally at positions
        3, 7, 12 and 16 (when the list is that long). The list is copied first so that
        the persisted ``foldedGrouped`` data is not modified.
        """
        userID, data = line[0], line[1]
        if userID == UserIDofInterest:
            data = list(data)
            del data[3:4]
            del data[6:7]
            del data[10:11]
            del data[13:14]
        return (userID, data)

    foldedGroupedOmitted = foldedGrouped.map(predicitonTestingFunc, UserIDofInterest).persist()

    # --------------------------------------------------------------- per-user vectors
    def mappingFunct2(line):
        """
        Converts a user's watch records into a dense NumPy vector indexed by StreamerID.
        Each entry is the user's watch time for that streamer (0 if never watched).
        """
        userID, data = line[0], line[1]
        v = np.zeros(numberOfStreamers)
        for element in data:
            v[element[1]] = element[2]
        return (userID, v)

    data = foldedGroupedOmitted.map(mappingFunct2)

    def datanormalisationfunct(line):
        """Divides a user's vector by its sum, so entries are shares of total watch time."""
        userID, array = line[0], line[1]
        return (userID, array / array.sum())

    datanormalisation = data.map(datanormalisationfunct)

    def downCastArrayDtype(line):
        """Casts a user's vector to float16 to reduce memory usage."""
        userID, array = line[0], line[1]
        return userID, array.astype(np.float16)

    datanormalisation = datanormalisation.map(downCastArrayDtype).persist()

    def getUserInputArray(UserIDofInterest):
        """Returns the target user's normalised watch-share vector as a 1-D array."""
        return datanormalisation.filter(lambda x: x[0] == UserIDofInterest).take(1)[0][1].flatten()

    # ---------------------------------------------------------------- nearest neighbours
    def cosineSimilaritys2(givenUser, targetArray):
        """
        Returns ``(userID, cosine similarity)`` between ``targetArray`` (the target user's
        vector) and the given user's vector.
        """
        userID, givenUserMatrix = givenUser[0], givenUser[1]
        givenUserArray = np.array(givenUserMatrix).flatten()
        return (userID, 1 - cosine(targetArray, givenUserArray))  # similarity = 1 - distance

    def computeCosineSimilarities(inputUserArray, datanormalisation):
        """
        Computes every user's cosine similarity to ``inputUserArray``. The array is passed
        to ``map`` explicitly, so it is broadcast to every call.
        """
        return datanormalisation.map(cosineSimilaritys2, inputUserArray)

    inputUserArray = getUserInputArray(UserIDofInterest)
    cosineSimilairtys = computeCosineSimilarities(inputUserArray, datanormalisation)

    # Keep the numberOfNeighbours users with the highest cosine similarity to the target user.
    topknearestneighbours = cosineSimilairtys.topk(numberOfNeighbours, key=lambda x: x[1]).persist()

    totalCosineSimilarity = topknearestneighbours.pluck(1).sum()  # Lazy dask Item.

    def normaliseCosineSimilarityValues(line, totalCosineSimilarity):
        """
        Divides a neighbour's similarity by the neighbours' total, so the weights sum to 1.
        float() works for both NumPy scalars and plain Python numbers, whichever ``cosine``
        returned.
        """
        userID, value = line[0], line[1]
        return (userID, float(value) / float(totalCosineSimilarity))

    topknearestneighboursNormlaised = topknearestneighbours.map(normaliseCosineSimilarityValues, totalCosineSimilarity).persist()

    # ------------------------------------------------------------------------- scoring
    def normaliseWatchTimes(line):
        """
        Expresses each of a user's watch times as a share of that user's total watch time.
        Returns ``(UserID, [(userID, streamerID, share), ...])``.
        """
        UserID, data = line
        totalWatchTime = sum(element[2] for element in data)
        return UserID, [(element[0], element[1], element[2] / totalWatchTime) for element in data]

    normalisedfoldedGroupedOmitted = foldedGroupedOmitted.map(normaliseWatchTimes)

    # Keep only the neighbours' records, paired with their normalised similarity.
    # Each join result is ((userID, similarity), (userID, normalisedData)).
    onlyNearestNeighbours = normalisedfoldedGroupedOmitted.join(topknearestneighboursNormlaised, lambda x: x[0])

    def weightByCosineSimilarity(line):
        """
        Multiplies each of a neighbour's watch shares by that neighbour's normalised
        similarity, so more similar neighbours contribute more to the scores.
        """
        userID, cosineSimilarity = line[0]
        _, data = line[1]
        weighted_data = [(element[0], element[1], element[2] * cosineSimilarity) for element in data]
        return (userID, weighted_data)

    weightedOnlyNearestNeighbours = onlyNearestNeighbours.map(weightByCosineSimilarity)

    def reformating(line):
        """Drops the user ID from each entry, giving [(StreamerID, weightedShare), ...]."""
        return [tuple(element[1:]) for element in line[1]]

    refrommatedWeightedOnlyNearestNeighbours = weightedOnlyNearestNeighbours.map(reformating).flatten()

    # Sum the weighted shares per StreamerID; the sum is that streamer's prediction score.
    # combine merges two per-partition accumulators (plain numbers). Without it, dask reuses
    # binop, which would index into a number and fail once the bag has several partitions.
    groupedStreamerWatchTime = refrommatedWeightedOnlyNearestNeighbours.foldby(
        key=lambda x: x[0],
        binop=lambda acc, x: acc + x[1],
        initial=0,
        combine=lambda x, y: x + y,
    )

    groupedStreamerWatchTime = groupedStreamerWatchTime.repartition(numberOfWorkers * 2).persist()

    # Drop negligible scores, then keep the 75 highest-scoring streamers.
    groupedStreamerWatchTime = groupedStreamerWatchTime.filter(lambda x: x[1] > 0.0005).persist()
    predictionsBag = groupedStreamerWatchTime.topk(75, key=lambda x: x[1])
    predictions = predictionsBag.compute()

    # ------------------------------------------------------------------------- reporting
    # map StreamerID back to Streamer name and print predictions as (Streamer Name, prediction value)
    reverseDicitonary = {value: key for key, value in streamerIDlookUpDict.items()}

    print("\n" + "="*40 + "\n     Predictions\n     (Streamer Name , prediction ranking value) \n" + "="*40 + "\n")
    for streamerID, value in predictions:
        print(reverseDicitonary[streamerID], value)

    # print the input data for the target user that the model "saw", by descending watch time
    print("\n" + "="*40 + "\n     Input Data (ie Data Seen By Model)\n     (Streamer Name , total watch time for input user)\n" + "="*40 + "\n")
    InputData = foldedGroupedOmitted.filter(lambda x: x[0] == UserIDofInterest).pluck(1).compute()[0]
    InputData = sorted(InputData, key=lambda x: x[2])[::-1]

    inputData2 = []
    for element in InputData:
        streamerName, watchTime = reverseDicitonary[element[1]], element[2]
        inputData2.append(streamerName)
        print(streamerName, watchTime)
    seenStreamers = set(inputData2)

    # print the final recommendations: predicted streamers that are NOT in the input data,
    # because we don't want to recommend streamers the target user already watches
    print("\n" + "="*40 + "\n     Recomendations\n" + "="*40 + "\n")
    recomendations = [
        (reverseDicitonary[streamerID], value)
        for streamerID, value in predictions
        if reverseDicitonary[streamerID] not in seenStreamers
    ][:k]
    for streamerName, value in recomendations:
        print(streamerName, value)

    # print the ground truths that were omitted from the data the model "saw"
    print("\n" + "="*40 + "\n     Omitted Data\n" + "="*40 + "\n")
    fullDataSetNames = [reverseDicitonary[element[1]] for element in fullDataSet]
    ommitedData = [name for name in fullDataSetNames if name not in seenStreamers]
    print(ommitedData)

    # print the recovered ground truths, i.e. omitted streamers the model recommended
    print("\n" + "="*20 + "\n     Correct Recomendations\n" + "="*20 + "\n")
    omittedSet = set(ommitedData)
    correctRecomendations = []
    for streamer, _ in recomendations:
        if streamer in omittedSet:
            correctRecomendations.append(streamer)
            print(streamer)

    print()
    if ommitedData:
        print("Recall at K : ", len(correctRecomendations) / len(ommitedData))
    else:
        print("Recall at K : 0")  # Nothing was omitted, so Recall@K is reported as 0.


if __name__ == '__main__':

    # Start a local Dask cluster with 1 worker and 1 thread (run_model scales it to the
    # requested worker count) plus a client connected to it. run_model reads the
    # module-level names `cluster` and `client`, which this `with` statement binds at
    # module scope. Using both as context managers shuts the client, scheduler and workers
    # down when the run ends, instead of leaking a cluster each time this cell is re-run.
    with LocalCluster(n_workers=1, threads_per_worker=1) as cluster, cluster.get_client() as client:
        # UserIDofInterest = 1, problemSize = 25000, numberOfWorkers = 8, k = 20.
        # User 1 was chosen for this test because it has a good number of nearest neighbours.
        run_model(1, 25000, 8, 20)
                                # i choose these arguments for the test because user 1 has a good number of nearest neighbours

INFO:distributed.http.proxy:To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:41343
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:8787/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:44163'
INFO:distributed.scheduler:Register worker addr: tcp://127.0.0.1:39589 name: 0
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:39589
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:33168
INFO:distributed.scheduler:Receive client connection: Client-4cf5ee48-3f61-11f0-8596-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:56814
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:45491'
INFO:distributed.nanny:        Start Nanny at: 't


     Predictions
     (Streamer Name , prediction ranking value) 

kendinemuzisyen 0.38217882779290085
jahrein 0.0903926596543872
zeon 0.0803713280777501
wtcn 0.05443016584179818
zeusidiouss 0.05238674605885096
jrokezftw 0.046176582873830585
jtgtv 0.037216710825417776
mithrain 0.027529210755798957
ogrencievi 0.022496141632371514
elraenn 0.022386812243139056
raufbaba25 0.019306643744068014
unlostv 0.015498847223790968
towshun 0.015498847223790968
grimnax 0.015498847223790968
pqueen 0.01287509659069292
elwind 0.012147468569486918
teasy 0.009906982670744139
alptv 0.008113237622498486
berkriptepe 0.007385609601292482
pintipanda 0.006548652190854026
esl_csgo 0.005821024169648023
tecone 0.00509339614844202
zade 0.00509339614844202
beatpug 0.00509339614844202
xantarescn 0.00509339614844202
imorr 0.004365768127236017
h3x_tv 0.002461869867097494
videoyun 0.0021828840636180084
educatedeartw 0.0021828840636180084
glaxycsgo 0.0021828840636180084
lurzbob 0.0014552560424120058
s1mple 0.001455256042