In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
import time

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics 
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [2]:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql import Row
from pyspark.sql.types import IntegerType

#A class used to preprocess data and to return train/val/test splits
class DataPreprocessor():
    def __init__(self, spark, file_path) -> None:
        self.spark = spark                              #Spark Driver
        self.file_path = file_path                      #File Path to Read in Data


    #Main Method - Call this in partition_data.py to get train/val/test splits returned
    def preprocess(self, sanity_checker=False):
        """
        Goal: Save train/val/test splits to netID/scratch - all using self methods
        Step 1: self.clean_data: clean the data, format timestamp to date, and remove duplicate movie titles
        Step 2: self.create_train_val_test_splits: reformats data, drops nans, and returns train,val and test splits
        input:
        -----
        sanity_checker: boolean - Flag that decides if we call self.sanity_check()
        -----
        output: 
        train: RDD of Training Set Data
        val: RDD of Validation Set Data
        test: RDD of Validation Set Data
        """
        #Format Date Time and Deduplicate Data
        clean_data = self.clean_data()                                                  #No args need to be passed, returns RDD of joined data (movies,ratings), without duplicates
        #Get Utility Matrix
        train, val, test = self.create_train_val_test_splits(clean_data)                #Needs clean_data to run, returns train/val/test splits
        
        #Check if we should perform sanity check
        if sanity_checker:
            flag = self.sanity_check(train,val,test)
            #If flag == True we're good
            if flag:
                print("The val and test splits are disjoint!")
            #Otherwise raise exception
            else:
                raise Exception("The Validation and Test sets are not disjoint!")

        #Return train val test sets
        return train, val, test
    
    #preprocess calls this function
    def clean_data(self):
        """
        goal: for movie titles with multiple movieIDs, in the movies dataset,
        remove the duplicate IDs with the least ratings for each movie. 
        Additionally, remove those IDs from the ratings dataset, so we get a 1:1 mapping
        between movie title and movie ID

        inputs: None, however - self.file_path -> this should link to your hfs/netid/
        outputs: all_data - a RDD of joined data (movies,reviews) - deduplicated of titles that appear more than once
                this loses only 6 records (reviews from users) for small
        """

        #Import the movies data + add to schema so it can be used by SQL + header=True because there's a header
        movies = self.spark.read.csv(self.file_path + 'movies.csv', header=True, \
                                    schema='movieId INT, title STRING, genres STRING')
    
        #Same for ratings - TIMESTAMP MUST BE STRING
        ratings = self.spark.read.csv(self.file_path + 'ratings.csv', header=True, \
                    schema='userId INT, movieId INT, rating FLOAT, timestamp STRING') 
        
        #Get the MM-dd-yyyy format for timestamp values producing new column, Date
        ratings = ratings.withColumn("date",from_unixtime(col("timestamp"),"MM-dd-yyyy"))
        ratings = ratings.drop("timestamp") #Drop timestamp, we now have date

        #Join Dfs - Join Movies with Ratings on movieId, LEFT JOIN used, select only rating, userId, movieId, title and date
        joined = ratings.join(movies, ratings.movieId==movies.movieId, how='left').select(\
                            ratings.rating,ratings.userId,\
                            ratings.movieId,ratings.date,movies.title)

        #Find Movie Titles that map to multiple IDs
        dupes = joined.groupby("title").agg(countDistinct("movieId").alias("countD")).filter(col("countD")>1)

        #Isolate non-dupes into a df
        non_dupes = joined.join(dupes, joined.title==dupes.title, how='leftanti')
    
        #Get all of the dupes data - ratings, userId, ect - again from Joined
        dupes = dupes.join(joined, joined.title==dupes.title, how='inner').select(\
                                        joined.movieId,joined.rating,\
                                        joined.date,dupes.title,joined.userId)
    
        #Clean the dupes accordingly
        #Step 1: Aggregate by title/movie Id, then count userId - give alias
        #Step 2: Create a window to partition by - we iterate over titles ranking by 
        #countD (count distinct of userId) - movieId forces a deterministic ranking based off movieId
        #Step 3: Filter max_dupes so we only grab top ranking movieIds
        windowSpec = Window.partitionBy("title").orderBy("countD","movieId")
        max_dupes = dupes.groupBy(["title","movieId"]).agg(countDistinct("userId").alias("countD"))
        max_dupes = max_dupes.withColumn("dense_rank",dense_rank().over(windowSpec))
        max_dupes = max_dupes.filter(max_dupes.dense_rank=="2")
        max_dupes = max_dupes.drop("countD","dense_rank")
        
        #Get a list of movie ids ~len(5) for small - which are the ones we want to keep
        ids = list(max_dupes.toPandas()['movieId'])
        cleaned_dupes = dupes.where(dupes.movieId.isin(ids))
        
        #Reorder Columns so union works
        cleaned_dupes = cleaned_dupes.select('rating', 'userId', 'movieId', 'date', 'title')

        
        #Get the union of the non_dupes and cleaned_dupes
        clean_data = non_dupes.union(cleaned_dupes)

        #Subtract 2.5 from each review to create negative reviews
        clean_data = clean_data.withColumn("rating",col("rating")-2.5)
        
        #For testing purposes should be 100,830 for small dataset
        #print(f"The length of the combined and de-deduped joined data-set is: {len(clean_data.collect())}")

        #Repartition for efficiency:
        clean_data = clean_data.repartition(30)

        #Return clean_data -> Type: Spark RDD Ready for more computation
        return clean_data

    #Create Train Test Val Splits - .preprocess() calls this function
    def create_train_val_test_splits(self, clean_data):
        """
        Procedure: 
        Create two columns - the first will measure the specific row count for a specific user
        the other will be static fixed at the total number of reviews for that user. The row count
        is sorted by date ascending, so the first row is the oldest review.
        
        Then, subset training to be where row_count <= .6 *length, grabbing the oldest 60% of reviews, for
        all users.
        
        We then subset the remaining data into a hold out, with the goal of creating two disjoint validation
        and test data sets when looking at userId (meaning they should not have any shared userId values), 
        but still have roughly the same amount of data, or whatever percentage we want to achieve
        
        To obtain approximate equality and disjoint userId membership, for the remiaining data
        sort userId by user_review_count descending, then alternate values in that list, assigning
        half to test and half to validation.
        -----
        input: RDD created by joining ratings.csv and movies.csv - cleaned of duplicates and formatted accordingly
        -----
        -----
        output: training 60%, val 20%, test 20% splits with colums cast to integer type and na's dropped
        -----
        """
        #Type Cast the cols to numeric
        ratings = clean_data.withColumn('movieId',col('movieId').cast(IntegerType())).withColumn("userId",col("userId").cast(IntegerType()))
        #Drop nulls
        ratings = clean_data
        ratings = ratings.na.drop("any")
    
        #strategy, partition by userId, and userId order by date, 
        #take the first 60% of reviews for all users
        w1 = Window.partitionBy("userId")
        w2 = Window.partitionBy("userId").orderBy("date")
        ratings = (ratings.withColumn("row_num", row_number().over(w2))
                       .withColumn('length', count('userId').over(w1))
                  )

        #store in training RDD by 
        #selecting all rows where the row_count for that user <= 60% total reviews for that user
        
        training = ratings.filter("row_num <=.6*length")
        #now for validation and test set, we want those to have no users in common, but for them to
        #be approximately equal size. 
        holdout_df = ratings.filter("row_num >.6*length")
        
        #strategy, of the data not in my train set, group users by number of movies they have seen
        #sort descending
        holdout_split = holdout_df.groupBy("userId").count().orderBy("count", ascending=False).toPandas()
        
        #store the list of userIds sorted by descending total movie count
        holdout_split = list(holdout_split.userId)
        
        #partition list of userIds by taking every other index and putting it in the validation set
        val_users = holdout_split[::2]
        
        #create a validation and test set by filtering holdout data based on whether movieId isin val_users
        val = holdout_df.filter(holdout_df.userId.isin(val_users))
        test = holdout_df.filter(~holdout_df.userId.isin(val_users))

        #Repartition for efficiency
        training = training.coalesce(1)
        val = val.coalesce(1)
        test = test.coalesce(1)
        #Return train/val/test splits
        return training, val, test

    #TO DO?? Should we enforce min_review cutoff to make sure no cold-start for any prediction?
    def enforce_min_review(self):
        pass
    
    def train_leakage_check(self,train,val):
        """

        returnFlag: boolean - True means test and val splits are disjoint on userId
        """

        #Get observatio counts for training, val, and test sets
        training_obs = train.count()
        val_obs = val.count()

        #Print them out
        print(f"Training Data Len: {training_obs} Val Len: {val_obs}")
        #Check if there are any overlapping_ids in the sets
        cond = [train.userId == val.userId, train.movieId == val.movieId]
        overllaping_ids = train.join(val, cond,how='inner').count()
        if overllaping_ids != 0:
            overlap = train.join(val, cond,how='inner').select(val.userId,val.movieId).collect()
            overlap = [(x[0],x[1]) for x in overlap]
            print(f"Overlapping movieIds: {overlap}")
        #Return True if they're disjoint, False if there's overlap
        return overllaping_ids == 0

    #Check to train/val/test splits to make sure approx 60/20/20 split is achieved
    def sanity_check(self,train,val,test):
        """
        Method to print out the shape of train/val/test splits, and a check to make sure that
        val and test splits are disjoint (no distinct userId appears in both)
        input:
        -----
        train: RDD - Training data split created from .create_train_val_test_splits
        val: RDD - Validation data split created from .create_train_val_test_splits
        test: RDD - Testing data split created from .create_train_val_test_splits
        -----
        output:
        -----
        returnFlag: boolean - True means test and val splits are disjoint on userId
        """

        #Get observatio counts for training, val, and test sets
        training_obs = train.count()
        val_obs = val.count()
        test_obs = test.count()
        
        print(f"Train/Val Leakage test result: (True is good) {self.train_leakage_check(train,val)}")
        #Print them out
        print(f"Training Data Len: {training_obs} Val Len: {val_obs}, Test Len: {test_obs}")
        print(f"Partitions, Train: {train.rdd.getNumPartitions()}, Val: {val.rdd.getNumPartitions()}, Test: {test.rdd.getNumPartitions()}")
        #Check if there are any overlapping_ids in the sets
        overllaping_ids = val.join(test, test.userId==val.userId,how='inner').count()
        
        #Return True if they're disjoint, False if there's overlap
        return overllaping_ids == 0

In [35]:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.mllib.evaluation import RankingMetrics
import time
from datetime import datetime
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Row
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import *
from pyspark.sql.window import Window


class Model():
    """
    Abstract Model class that will contain various methods to deploy collaborative filtering.
    Model Parameters that need to be passed thorugh:
    ### For ALS Model ###
    -----
    rank: int - Rank of latent factors used in decomposition
    maxIter: int - represents number of iterations to run algorithm
    regParam: float - Regularization Parameter
    model_save: boolean - Flag to determine if we should save the model progress or not
    -----
    ### For baseline Model ###
    -----
    min_ratings: int - Minimum number of reviews to qualify for baseline (Greater Than or Equal to be included)
    -----
    ### No Input Necessary ###
    -----
    model_size: str - Either "large" or "small" used to demarcate which dataset we are running on
    model_type: str - Which model type we intent to run, i.e. ALS or baseline
    evaluation_data_name: str - Dummy variable used to keep track of which dataset we are making predictions on, either "Val" or "Test"
    time_when_ran: datetime - Time when model was run
    time_to_fit: datetime - Time it took to fit the model
    time_to_predict: datetime - Time it took to make predictions
    metrics: dict - Dictionary used to store the various metrics calculated in self.record_metrics()
    -----
    ### Misc ###
    -----
    num_recs: int - Top X number of reccomendations to return - default set to 100
    -----
    ### Model Methods ###
    -----
    run_model: Runs the corresponding method that was passed to self.model_type
    alternatingLeastSquares: Latent Factor model which uses the Alternating Least Squares Pyspark Class to fit and predict.
    baseline: uses a baseline popularity model that returns the top X most popular movies (decided by avg rating per movie)
    record_metrics: Calculates metrics for prediction,label pairs
    save_model: Used for advanced models like ALS or extensions where we may want to save the model itself
    -----
    """

    # Constructor for Model
    def __init__(self, model_size=None, model_type=None, rank=None, maxIter=None, regParam=None,
                 model_save=False, num_recs=100, min_ratings=0, positive_rating_threshold = 0):
        # Model Attributes
        # NO Arg needed to be passed thorugh
        # Dictionary to access variable methods
        self.methods = {"als": self.alternatingLeastSquares,
                        "baseline": self.baseline}
        # Top X number of reccomendations to return - set to 100, probably won't change
        self.num_recs = num_recs

        # Passed through by user
        self.model_size = model_size
        self.model_type = model_type
        self.positive_rating_threshold = positive_rating_threshold

        # For ALS
        self.rank = rank  # Rank of latent factors used in decomposition
        self.maxIter = maxIter  # Number of iterations to run algorithm, recommended 5-20
        self.regParam = regParam  # Regularization Parameter
        # (Optional) Flag used to determine whether or not we should save our model somewhere
        self.model_save = model_save

        # For baseline
        # Minimum number of reviews to qualify for baseline (Greater Than or Equal to be included)
        self.min_ratings = min_ratings

        # Add the attributes we're gonna compute when we fit and predict
        self.evaluation_data_name = None
        self.time_when_ran = None
        self.time_to_fit = None
        self.time_to_predict = None
        self.metrics = {}

    def run_model(self, train, val=None, test=None):
        """
        Run_model is what is called to fit, run, and record the metrics for respective model types.
        Function behavior is dependent on the argument passed to self.model_type.
        -----
        inputs:
        -----
        train: RDD - Training data set
        val: RDD - Validation data set
        test: RDD - Test set
        -----
        outputs:
        -----
        model_output: Variable Type - Output of whichever model ran -> check self.model_type
        -----
        """
        # Get when model was ran
        self.time_when_ran = datetime.now().strftime("%m/%d/%Y, %H:%M:%S")

        # Identify if we're predicting on the Validation Set or the Test Set
        if val:
            self.evaluation_data_name = "Val"
            evaluation_data = val
        elif test:
            self.evaluation_data_name = "Test"
            evaluation_data = test

        # Grab method for whichever model corresponds to self.model_type
        model = self.methods[self.model_type]
        # Run model on training / evaluation data
        model_output = model(train, evaluation_data)
        # Return model output
        return model_output

    # This method uses the Alternating Least Squares Pyspark Class to fit and run a model
    def alternatingLeastSquares(self, training, evaluation_data):
        """
        Builds and fits a PySpark alternatingLeastSquares latent factor model. Calls self.record_metrics(precitions,labels)
        to record the results. Some dummy variables are made to record whether or not we are using the validation set
        or the testing set. This will help us record our results accurately. Training and predicting are also timed. 
        -----
        Input: 
        training: RDD - Training data set
        evaluation_data: RDD - Either Validation data set, or Training data set
        -----
        Output: [userRecs, movieRecs] - list containing two lists, each of length == self.numrecs 
        -----
        """

        # Time the function start to finish
        start = time.time()
        # Create the model with certain params - coldStartStrategy="drop" means that we'll have no nulls in val / test set
        als = ALS(maxIter=self.maxIter, rank=self.rank, regParam=self.regParam,
                  nonnegative=False, seed=10, userCol="userId",
                  itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

        # Fit the model
        model = als.fit(training)
        # End time and calculate delta
        end = time.time()
        self.time_to_fit = end - start

        # Time predictions as well
        start = time.time()
        regression_predictions = model.transform(evaluation_data)
        #Generate 100 Top Movies for All Users
        userRecs = model.recommendForAllUsers(self.num_recs)
        #Unpack userRecs, go from userId, list({movieId:predicted_rating}) -> userId, movieId
        ranking_predictions = userRecs.select("userId",explode("recommendations.movieId"))
        end = time.time()
        self.time_to_predict = end - start

        # Use self.record_metrics to evaluate model on Precision at K, Mean Precision, and NDGC
        self.ranking_metrics(predictions=ranking_predictions, labels=evaluation_data)
        #Use self.non_ranking_metrics to compute RMSE, R^2, and ROC of Top 100 Predictions - No special Filtering ATM
        self.non_ranking_metrics(regression_predictions)        
        

        # Save model if we need to
        if self.model_save:
            self.save_model(model_type=self.model_type, model=als)

        # Return top self.num_recs movie recs for each user
        return userRecs

    # Baseline model that returns top X most popular items (highest avg rating)
    def baseline(self, training, evaluation_data):
        """
        Baseline model for recommendation system. No personalization, just recommend the Top 100 movies by avg(rating)
        A movie must have at least self.min_ratings to be considered
        input:
        -----
        training: RDD - training set data
        evaluation_data: RDD - Validation set or Test set data
        self.min_ratings: int - how many ratings a movie must have in order to be considered in top 100
        -----
        output: RDD of Top 100 movieIds by avg(rating)
        """
        #Make sure the right params have been passed to Model()
        if self.min_ratings is None:
            raise Exception("Must pass through a value for self.min_ratings for baselien to compute")

        # Time model Fit
        start = time.time()
        # Get Top 100 Most Popular Movies - Avg(rating) becomes prediction
        temp = training
        top_100_movies = temp.groupBy("movieId").agg(avg("rating").alias("prediction"),count("movieId").alias("movie_count"))
        top_100_movies = top_100_movies.where(col("movie_count")>=self.min_ratings)
        top_100_movies = top_100_movies.select("movieId").orderBy("prediction", ascending=False).limit(100)
        
        # Grab Distinct User Ids
        temp2 = evaluation_data
        ids = temp2.select("userId").distinct()
        # Cross Join Distinct userIds with Top 100 Most Popular Movies
        predictions = ids.crossJoin(top_100_movies)
        # Record end time after RDD operations
        end = time.time()
        self.time_to_fit = end - start

        # Time predictions as well
        self.time_to_predict = 0  # Recommends in constant time
        returned_df = predictions.alias("returned_df")
        predictions.unpersist()
        return returned_df
        # Use self.record_metrics to evaluate model on RMSE, R^2, Precision at K, Mean Precision, and NDGC
        return self.ranking_metrics(predictions=predictions, labels=evaluation_data)

        self.metrics['precision'] = precision
        self.metrics['MAP'] = MAP

        # Return The top 100 most popular movies above self.min_ratings threshold
        return top_100_movies

    #Non-Ranking Metrics Calculated Here
    def non_ranking_metrics(self,predictions):
        ##Evaluate Predictions for Regression Task##
        evaluator = RegressionEvaluator(labelCol="rating", predictionCol="prediction")
        # Calculate RMSE and r_2 metrics and append to metrics
        self.metrics["rmse"] = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
        self.metrics["r2"] = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})

        ##ROC Metric Evaluation##
        # Make predictions Binary
        binary_predicts = predictions.withColumn("prediction", when(predictions.rating > 0, 1).otherwise(0).cast("double"))
        evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='rating', metricName='areaUnderROC')
        # Append ROC to our Metrics list
        self.metrics["ROC"] = evaluator.evaluate(binary_predicts)

    def ranking_metrics(self, predictions, labels):
        """
        Method that will contain all the code to evaluate model on metrics: RMSE, R^2, ROC, Precistion At K, Mean Precision, and NDGC
        input:
        -----
        predictions: RDD - PySpark Dataframe containing the following columns at the minimum: [userId,movieId,prediction] - if not baseline model must include rating column
        labels: RDD - PySpark Dataframe containing the following columns at the minimum: [userId,movieId,rating, date]
        -----
        returns: 
        None - Writes the results to self.metrics dictionary
        """
        
        #Join labels and predictions on [userId,movieId]
        label_inner_predictions = labels.join(predictions, ['userId', 'movieId'], how ='inner').select('userId', 'movieId', "rating")

        #Collect ratings by userId where the rating is above some self.review_score_threshold -> userId, [movie1,...movieN]
        pos_label_inner_prediction = label_inner_predictions.where(f"rating>{self.positive_rating_threshold}"\
                                            ).groupBy('userId').agg(expr('collect_list(movieId) as movieId'))
        label_inner_predictions = label_inner_predictions.groupBy('userId').agg(expr('collect_list(movieId) as movieId'))
        
        ranking_metrics_data = label_inner_predictions.join(
                pos_label_inner_prediction, 'userId').rdd.map(lambda row: (row[1], row[2]))
        
        ranking_metrics_data = ranking_metrics_data.coalesce(1)
        
        return ranking_metrics_data
        #Get RankingMetrics object
        metrics = RankingMetrics(ranking_metrics_data)
        #Calculate MAP
        # self.metrics['Precision - Intersection'] = metrics.recallAt(self.num_recs)
        # self.metrics['MAP - Intersection'] = metrics.meanAveragePrecision
        precision = metrics.recallAt(self.num_recs)
        MAP = metrics.meanAveragePrecision

        return precision,MAP
    # Method to save model to const.MODEL_SAVE_FILE_PATH
    def save_model(self, model_type=None, model=None):
        """
        Inputs:
        -----
        model_type: str - string designating what type of model is being saved
        model: obj - model object that has .save method
        -----
        """
        # Make sure a non-null object was passed
        if model and model_type:
            model.save(const.MODEL_SAVE_FILE_PATH + model_type)
        # Otherwise throw error
        else:
            raise Exception("Model and or Model_type not passed through")


In [36]:
def tuple_checker(rdd):
    rdd = rdd.collect()
    for row in rdd:
        for val in row[2]:
            if val not in row[1]:
                raise Exception(f"Error raised at {row[1]} movieId:{val}")

def prediction_checker(predicts,val,min_reviews):
    """
    predicts: RDD - Tuples of 
    (userId, [intersected predicted positive labels],[ground truth intersected positive labels])
    Check that precision score is correct by 
    For everything in the predicted set that shows up in RDD, verify that the user liked it in val
    """
    #Make correct format
    predicts = predicts.collect()
    val = val.where(col("rating")>=min_reviews)
    df = val.toPandas()
    #Dummy Variables
    correct, not_correct = 0,0
    #Iterate and ensure precision
    for row in predicts:
        for movie in row[1]:
            testy = df[(df['movieId']==movie)&(df['userId']==row[0])]
            if len(testy) > 0:
                correct+=1
            else:
                print(f"User: {row[0]} Movie: {movie}")
                not_correct+=1
    #Return Precision TP / (TP+FP)
    return (correct / (correct+not_correct))

def baseline_prediction_check(preds):
    x = preds.groupBy("movieId").count().collect()
    y = np.array([int(y[1]) for y in x])
    if y.sum()/len(y) != 305:
        raise Exception(f"Baseline Predicts are Wrong, movieId count = {y.sum()}, len:{len(y)}")
    x = preds.groupBy("userId").count().collect()
    y = np.array([int(y[1]) for y in x])
    if y.sum()/len(y) != 100:
        raise Exception(f"Baseline predictions are wrong, userId count = {y.sum()}, len:{len(y)}")
    print("Passed Baseline Prediction Check")
    

def train_leakage_check(train,val):
    """

    returnFlag: boolean - True means test and val splits are disjoint on userId
    """

    #Get observatio counts for training, val, and test sets
    training_obs = train.count()
    val_obs = val.count()

    #Print them out
    print(f"Training Data Len: {training_obs} Val Len: {val_obs}")
    #Check if there are any overlapping_ids in the sets
    cond = [train.userId == val.userId, train.movieId == val.movieId]
    overllaping_ids = train.join(val, cond,how='inner').count()
    if overllaping_ids != 0:
        overlap = train.join(val, cond,how='inner').select(val.userId,val.movieId).collect()
        overlap = [(x[0],x[1]) for x in overlap]
        print(f"Overlapping movieIds: {overlap}")
    #Return True if they're disjoint, False if there's overlap
    return overllaping_ids == 0

def dupe_checker(dataFrame):

    a=dataFrame.select(col("userId"),col("movieId")).collect()
    seen = dict()

    for row in a:

        if (row[0],row[1]) not in seen:
            seen[(row[0],row[1])] =1

        else:
            print(f"Dupe found at: userId:{row[0]},movieId:{row[1]}")

    print(f"len of keys: {len(list(seen.keys()))}")

In [23]:
folder_path = "../../ml-latest-small/"
spark = SparkSession.builder.appName('Spark_Session_Name').getOrCreate()

In [7]:
train, val, test = DataPreprocessor(spark,folder_path).preprocess(sanity_checker=True)

                                                                                

Training Data Len: 60250 Val Len: 20510


                                                                                

Train/Val Leakage test result: (True is good) True
Training Data Len: 60250 Val Len: 20510, Test Len: 20070
Partitions, Train: 1, Val: 1, Test: 1




The val and test splits are disjoint!


                                                                                

In [37]:
m = Model(model_type='baseline', min_ratings=0)

In [38]:
temp = m.baseline(train,val)
baseline_prediction_check(temp)



Passed Baseline Prediction Check


                                                                                

In [64]:
movie_recs = predictions.select("movieId").distinct().rdd.map(lambda x: x.movieId).collect()

                                                                                

[496,
 3475,
 8911,
 113829,
 152711,
 5513,
 4294,
 84273,
 633,
 5222,
 6732,
 876,
 27373,
 7587,
 6460,
 2007,
 114265,
 3795,
 626,
 167064,
 25906,
 5328,
 127096,
 1564,
 124851,
 190,
 86237,
 3073,
 27523,
 68486,
 187717,
 998,
 40,
 3925,
 120130,
 1349,
 71268,
 6086,
 3473,
 26147,
 299,
 69860,
 140627,
 4441,
 1151,
 3086,
 5088,
 1310,
 184245,
 86721,
 82744,
 38388,
 79274,
 90943,
 4495,
 4444,
 495,
 100556,
 97866,
 84414,
 6408,
 120635,
 467,
 109687,
 4142,
 2512,
 102084,
 3266,
 4402,
 3787,
 108795,
 5059,
 3851,
 3531,
 102194,
 3678,
 95149,
 179135,
 110501,
 3303,
 26249,
 70946,
 100083,
 7071,
 117531,
 2151,
 64499,
 3223,
 26366,
 5746,
 26073,
 102217,
 99,
 55167,
 5537,
 7756,
 5477,
 128914,
 5833,
 6306]

In [26]:
#Tests start here
positive_rating_threshold = 0.0
eval_data = spark.createDataFrame(val.collect()).alias("eval_data")
predictions = spark.createDataFrame(temp.collect()).alias("predictions")

eval_data = eval_data.withColumn("e_concat",concat_ws('-','userId','movieId'))
predictions = predictions.withColumn("p_concat",concat_ws('-','userId','movieId'))
# predictions = labels.withColumn("userId",col("userId").cast("long")).withColumn("movieId",col("movieId").cast("long"))
#Condition Joins

# cond = [eval_data.userId == predictions.userId, eval_data.movieId == predictions.movieId]
#Join labels and predictions on [userId,movieId]
# inter = eval_data.join(predictions, (predictions.userId == eval_data.userId)&(predictions.movieId == eval_data.movieId), how='inner')\
# .select(predictions.userId, eval_data.rating,predictions.movieId)
inter = eval_data.join(predictions, col('p_concat') == col('e_concat'), how='inner')\
.select(predictions.userId, eval_data.rating,predictions.movieId)#.drop(eval_data.movieId,eval_data.userId,eval_data.concat)

inter = spark.createDataFrame(inter.collect())
temp = inter
inter = inter.select(col("userId"),col("movieId"),col("rating"))#.where(col("userId")==603)

pos = inter.select(col("userId"),col("movieId"),col("rating")).where(col("rating")>0.0)#

pos = pos.select("userId","movieId").withColumn("movieId",col("movieId").cast("double"))
pos = pos.groupBy(col("userId")).agg(collect_list(col('movieId')).alias('movieId'))

#Repeat double casting + collapse procedure for labels
inter = inter.withColumn("movieId",col("movieId").cast("double"))
inter = inter.groupBy('userId').agg(collect_list(col("movieId")).alias("movieId"))

#Join
ranking_metrics_data = inter.join(pos, inter.userId == pos.userId,how='inner')\
.select(inter.userId, inter.movieId,pos.movieId).rdd.map(lambda row: (row[1], row[2]))


#print(ranking_metrics_data.collect())

metrics = RankingMetrics(ranking_metrics_data)

precision = metrics.recallAt(100)
precision2 = metrics.precisionAt(10)
MAP = metrics.meanAveragePrecision
print(f"Precision {precision}, Precision function 2 {precision2} MAP {MAP}")



Precision 1.0, Precision function 2 0.12857142857142864 MAP 0.9811507936507937


                                                                                

In [30]:
train_leakage_check(train, temp)

                                                                                

Training Data Len: 60250 Val Len: 44


[Stage 471:>                                                        (0 + 1) / 1]

Overlapping movieIds: [(79, 3266), (286, 6086), (567, 3266), (398, 26147), (96, 299), (585, 27373), (191, 496), (603, 299), (191, 99), (195, 190), (377, 4294), (544, 626)]


                                                                                

False

In [56]:
temp.show()

[Stage 388:>                                                        (0 + 1) / 1]

+------+-------+
|userId|movieId|
+------+-------+
|   148|  71268|
|   148|   6306|
|   148|   6732|
|   148|   3795|
|   148|  55167|
|   148| 120130|
|   148|  26366|
|   148|   7071|
|   148|    876|
|   148| 184245|
|   148|   4441|
|   148|  84273|
|   148|   4142|
|   148|  26073|
|   148|  25906|
|   148|  86237|
|   148|  26249|
|   148| 117531|
|   148|  64499|
|   148| 179135|
+------+-------+
only showing top 20 rows



                                                                                

In [None]:
inter.show()

In [None]:
# ranking_metrics_data = label_inner_predictions.join(pos_label_inner_prediction, label_inner_predictions.userId == pos_label_inner_prediction.userId,how='inner')\
# .select(label_inner_predictions.userId, label_inner_predictions.movieId,pos_label_inner_prediction.movieId).rdd.map(lambda row: (row[0],row[1], row[2]))
# a= prediction_checker(ranking_metrics_data,val,0)
# tuple_checker(ranking_metrics_data)

In [65]:
def baseline_OTB_ranking_metrics(preds,labels,k):
    perUserPredictedItemsDF = preds \
        .select('userId', 'movieId')\
        .groupBy('userId') \
        .agg(expr('collect_list(movieId) as movieId'))

    windowSpec = Window.partitionBy('userId').orderBy(col('date').desc())
    perUserActualItemsDF = labels \
        .select('userId', 'movieId', 'date', rank().over(windowSpec).alias('rank')) \
        .where('rank <= {0}'.format(k)) \
        .groupBy('userId') \
        .agg(expr('collect_list(movieId) as movieId'))

    perUserItemsRDD = perUserPredictedItemsDF.join(broadcast(perUserActualItemsDF), 'userId', 'inner') \
        .rdd \
        .map(lambda row: (row[1], row[2]))
    rankingMetrics = RankingMetrics(perUserItemsRDD)
    return rankingMetrics.meanAveragePrecision, rankingMetrics.ndcgAt(k), rankingMetrics.ndcgAt(k)

In [66]:
baseline_OTB_ranking_metrics(temp,val,100)

                                                                                

(3.073334984382854e-05, 0.0010301271058742066, 0.0010301271058742057)

In [147]:
def baseline_CUSTOM_ranking_metrics(preds,labels):
    """
    Take in predictions and labels PySpark DataFrames
    Step 1) Collect movie_recs into a Python set
    Step 2) Collapse labels into a list of seen movies (orderBy to preserve ordering)
    Step 3) Collapse labels that have been filtered by positive ratings into list of seen movies (also OrderBy)
    Step 4) Iterate over the sets and see if there's an intersection between label movies sets and movie recs sets
    Step 5) If there is an intersection, calculate the precision
    Step 6) Precision is calculated by taking the length of intersection of positive ratings label sets and movie 
            rec set, divided by the length of the label seen movies set intersected with movie rec set
    """
    #Dummy var
    precision_arr = []   
    #Collect movie_recs into set
    movie_recs = set(preds.select("movieId").distinct().rdd.map(lambda x: x.movieId).collect())

    #Collect val labels into list
    label_set = labels \
    .select('userId', 'movieId')\
    .groupBy('userId') \
    .agg(expr('collect_list(movieId) as movieId'))\
    .orderBy("userId")\
    .rdd.map(lambda x: x.movieId).collect()
    
    #Collect val labels into list
    pos_set = labels \
    .select('userId', 'movieId')\
    .where(col("rating")>0)\
    .groupBy('userId') \
    .agg(expr('collect_list(movieId) as movieId'))\
    .orderBy("userId")\
    .rdd.map(lambda x: x.movieId).collect()
    
    #Iterate for every userId
    for i in range(len(label_set)):
        #See if there's an intersection between seen movies and recs
        temp_label_set = set(label_set[i]) #Create set of watched movies for user i
        #get the length of intersection between seen movies and reccomended
        intersection = len(temp_label_set.intersection(movie_recs)) 
        #If there's an intersection continue
        if intersection > 0:
            #Get positive set of seen movies for user i
            temp_pos_set = set(pos_set[i])
            #Get intersection between positively rated and movie_recs for user i
            positive_intersection = len(temp_pos_set.intersection(movie_recs))
            precision_arr.append(positive_intersection/intersection)
                
    precision_arr = np.array(precision_arr)
    return precision_arr

In [144]:
precision_arr = baseline_CUSTOM_ranking_metrics(temp,val)

                                                                                

In [146]:
print(precision_arr,precision_arr.mean())

[1.         1.         0.         1.         1.         1.
 0.         1.         0.66666667 0.5        1.         1.
 1.         1.         1.         1.         1.         0.
 1.         1.         0.75       0.         0.         0.
 0.         0.         0.         1.         0.        ] 0.6178160919540229


In [None]:
perUserActualItemsDF = val \
.select('userId', 'movieId')\
.groupBy('userId') \
.agg(expr('collect_list(movieId) as movieId')).orderBy("userId").rdd.map(lambda x: x.movieId).collect()

In [116]:
pos = val \
.select('userId', 'movieId')\
.where(col("rating")>0)\
.groupBy('userId') \
.agg(expr('collect_list(movieId) as movieId')).orderBy("userId").rdd.map(lambda x: x.movieId).collect()

                                                                                

In [135]:
list(set(pos[0]).intersection(set(perUserActualItemsDF[0])))

[318,
 115713,
 86345,
 80906,
 131724,
 333,
 77455,
 3578,
 79132,
 68157,
 106782,
 58559]

In [126]:
len(perUserActualItemsDF)

304

In [127]:
movie_recs = set(temp.select("movieId").distinct().rdd.map(lambda x: x.movieId).collect())

                                                                                

In [106]:
count 

29

In [124]:
for i in range(2):
    print(i)

0
1
