In [1]:
#Import all required packages
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession,SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Row
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql.window import Window
from pyspark.ml.feature import StopWordsRemover, RegexTokenizer,HashingTF
from pyspark.ml import Pipeline
import re
from nltk.stem import PorterStemmer
from pyspark.ml.linalg import Vectors
from pyspark.mllib.clustering import LDA
from pyspark.ml.feature import IDF

In [2]:
# Initialise Spark Session
spark = SparkSession.builder.appName("Experiment4")\
                            .config("spark.sql.broadcastTimeout", "36000")\
                            .config("spark.driver.memory", "20g")\
                            .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

21/08/04 22:58:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
#PREPARING DATAFRAMES FOR DATSETS

#Authors Dataframe
#df_authors = spark.read.csv("Datasets/authors.csv", sep = ",", header = True, quote = '"')

#PaperCsv dataframe
papersCsvSchema = StructType([
    StructField("paper_id",StringType(),False),
    StructField("type",StringType(),False),
    StructField("journal",StringType(),False),
    StructField("book_title",StringType(),False),
    StructField("series",StringType(),False),
    StructField("publisher",StringType(),False),
    StructField("pages",StringType(),False),
    StructField("volume",StringType(),False),
    StructField("number",StringType(),False),
    StructField("year",StringType(),False),
    StructField("month",StringType(),False),
    StructField("postedat",StringType(),False),
    StructField("address",StringType(),False),
    StructField("title",StringType(),False),
    StructField("abstract",StringType(),False),
])
df_paperCsv = spark.read.csv("Datasets/papers.csv", sep = ",", header = False, schema = papersCsvSchema, quote = '"')

#UserLibrary dataframe
userLibrarySchema = StructType([
    StructField("user_hash_id",StringType(),False),
    StructField("user_library",StringType(),False)
])
df_userLibrary = spark.read.csv("Datasets/users_libraries.txt", sep = ";", header = False, schema = userLibrarySchema)
df_userLibrary = df_userLibrary.selectExpr("user_hash_id","split(user_library,',') AS user_library")

In [4]:
#Cleaning and Tokenizing the data
def phraseTokenization(x):
    rawPhrase = x[13] + " " + x[14] #concatenating title and abstract
    rawPhrase = rawPhrase.replace("-","") #removing - from phrase
    rawPhrase = rawPhrase.replace("_","") #removing _ from phrase
    rawPhrase = rawPhrase.strip() #removing any trailing or leading whitespaces
    
    #spliting phrase based on non-alphaNumeric characters
    phraseArray = re.split('[^a-zA-Z0-9]+',rawPhrase) 
    
    #remove words with less than 3 char
    phraseArrayFilteredWords = [i for i in phraseArray if len(i) >= 3]
    
    return (x[0],list(phraseArrayFilteredWords))


df_tokenize = df_paperCsv.na.fill(value="").rdd.map(phraseTokenization).toDF()

#Removing StopWords using ML
swRemover = StopWordsRemover(inputCol="_2", outputCol="cleaned_terms")
df_cleanedData = swRemover.transform(df_tokenize)
df_cleanedData = df_cleanedData.selectExpr("_1 AS paper_id","cleaned_terms")

#Stemming using Porter stemmer Algo
ps =  PorterStemmer()

def stemmingTerms(x):
    stemmedWords = []
    for word in x:
        rootWord = ps.stem(word)
        stemmedWords.append(rootWord)
    return stemmedWords

df_cleanedData = df_cleanedData.rdd.mapValues(stemmingTerms).toDF().selectExpr("_1 AS paper_id","_2 AS cleaned_terms")

                                                                                

In [5]:
# Find the count of papers in which the term is present
df_paperCount = df_cleanedData.selectExpr("paper_id","explode(cleaned_terms) AS terms").distinct().groupBy("terms").count().withColumnRenamed("count", "paper_count")

#10 percent of total papers present in file
noOfDistinctPapers_df = int(df_cleanedData.select(countDistinct("paper_id")).collect()[0][0])
tenPercentOfTotalPapers = int(noOfDistinctPapers_df/10)

# remove words appear in more than 10% of the papers and keep only the words that appear in at least 20 papers 
df_filterdTerms = df_paperCount.filter((df_paperCount["paper_count"]<=tenPercentOfTotalPapers) & (df_paperCount["paper_count"]>=20))

#Fetch top 1000 terms 
top1000Terms = df_filterdTerms.orderBy(col("paper_count").desc()).limit(1000)

                                                                                

In [6]:
#associate unique integer values to each term
df_termsWithUniqueIndex = top1000Terms.withColumn("unique_index",row_number().over(Window.orderBy("paper_count"))).selectExpr("terms","unique_index-1 AS unique_index")

#Collect all terms in a list
terms_collection =  [row.terms for row in df_termsWithUniqueIndex.collect()]

# Generating Termfrequency Vector for each paper
df_cleanedDataExplode = df_cleanedData.selectExpr("paper_id","explode(cleaned_terms) AS terms")

#Getting the Unique_index of term 
df_cleanedDataJoinIndex = df_cleanedDataExplode.join(df_termsWithUniqueIndex,df_cleanedDataExplode.terms == df_termsWithUniqueIndex.terms , how = "inner").select(df_cleanedDataExplode.paper_id,df_cleanedDataExplode.terms,df_termsWithUniqueIndex.unique_index)

#Getting the term_frequency in each paper
df_cleanedDataJoinIndex = df_cleanedDataJoinIndex.groupBy("paper_id","unique_index").count().withColumnRenamed("count", "term_frquency")

#Creating a sparseVector respresentation for each paper
rdd_CleanedDataReducedByPaperId = df_cleanedDataJoinIndex.rdd.map(lambda x: (x[0], [(x[1], x[2])])).reduceByKey(lambda a, b: a + b)
rdd_CleanedDataReducedByPaperId = rdd_CleanedDataReducedByPaperId.map(lambda x: (x[0],Vectors.sparse(1000,x[1])))

df_CleanedDataSparseVector = rdd_CleanedDataReducedByPaperId.toDF().selectExpr("_1 AS paper_id","_2 AS term_frequency_vector")

21/08/04 22:59:34 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
21/08/04 23:00:09 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [7]:
df_CleanedDataSparseVector.take(1)

[Row(paper_id='498902', term_frequency_vector=SparseVector(1000, {33: 3.0, 47: 1.0, 79: 1.0, 97: 1.0, 138: 1.0, 170: 6.0, 354: 1.0, 368: 1.0, 394: 1.0, 482: 2.0, 491: 1.0, 541: 1.0, 550: 1.0, 566: 1.0, 581: 1.0, 596: 1.0, 622: 1.0, 632: 1.0, 663: 1.0, 670: 1.0, 720: 2.0, 723: 1.0, 762: 1.0, 764: 1.0, 773: 1.0, 797: 1.0, 820: 1.0, 826: 1.0, 837: 2.0, 843: 1.0, 879: 1.0, 881: 1.0, 890: 1.0, 892: 1.0, 894: 1.0, 928: 1.0, 937: 1.0, 946: 2.0, 949: 3.0, 952: 1.0, 962: 1.0, 965: 1.0, 984: 1.0, 985: 4.0, 990: 1.0, 992: 1.0}))]

In [8]:
#Ex 3.2

#TF-IDF Representation for each paper

idf = IDF(inputCol="term_frequency_vector", outputCol="tf_idf_vector")
tf_idf_model = idf.fit(df_CleanedDataSparseVector)
df_rescaledCleanedData = tf_idf_model.transform(df_CleanedDataSparseVector)
df_rescaledCleanedData = df_rescaledCleanedData.select("paper_id", "tf_idf_vector")

                                                                                

In [9]:
#TF-IDF Vector for papers
df_rescaledCleanedData.take(1)

[Row(paper_id='498902', tf_idf_vector=SparseVector(1000, {33: 13.8439, 47: 4.596, 79: 4.5494, 97: 4.5325, 138: 4.4866, 170: 26.6474, 354: 4.1522, 368: 4.1302, 394: 4.0846, 482: 7.8051, 491: 3.8889, 541: 3.7983, 550: 3.7834, 566: 3.7571, 581: 3.7152, 596: 3.6986, 622: 3.6484, 632: 3.6256, 663: 3.5658, 670: 3.5395, 720: 6.7871, 723: 3.3875, 762: 3.2929, 764: 3.2917, 773: 3.2717, 797: 3.2131, 820: 3.1488, 826: 3.1306, 837: 6.153, 843: 3.0651, 879: 2.9588, 881: 2.9516, 890: 2.9076, 892: 2.9043, 894: 2.8955, 928: 2.7393, 937: 2.6945, 946: 5.2459, 949: 7.8223, 952: 2.5913, 962: 2.5231, 965: 2.5148, 984: 2.3717, 985: 9.4584, 990: 2.328, 992: 2.3177}))]

In [10]:
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.ml.clustering import LDA

# Latent Direchlet Allocation

num_topics = 40

# Transform data into LDA supported format
df_lda_format = df_CleanedDataSparseVector.selectExpr("paper_id AS id","term_frequency_vector AS features")

lda = LDA(k=40)
lda_model = lda.fit(df_lda_format)
df_lda_paper_topic_model = lda_model.transform(df_lda_format)

21/08/04 23:01:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/08/04 23:01:34 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [11]:
#LDA Vector for Papers
df_lda_paper_topic_model.show()

+-------+--------------------+--------------------+
|     id|            features|   topicDistribution|
+-------+--------------------+--------------------+
| 498902|(1000,[33,47,79,9...|[0.03661775127919...|
| 201593|(1000,[38,90,104,...|[2.65645710653724...|
|1727709|(1000,[24,55,80,1...|[4.49222917643613...|
| 383220|(1000,[93,104,114...|[0.05777618935500...|
|  23055|(1000,[24,56,225,...|[3.98494353942264...|
|1287740|(1000,[79,238,260...|[5.14751023148192...|
|2090908|(1000,[177,280,44...|[3.08819633694069...|
|9106608|(1000,[8,27,70,89...|[2.05869430618133...|
|1857331|(1000,[61,62,169,...|[4.57543715045570...|
|2707871|(1000,[0,1,2,4,7,...|[2.08809005148650...|
|2798913|(1000,[1,55,56,65...|[3.98494353942264...|
| 460407|(1000,[45,101,139...|[2.60052460791407...|
| 423550|(1000,[32,54,232,...|[3.92167838156203...|
|6500865|(1000,[31,140,194...|[5.37137359436407...|
|2945717|(1000,[217,229,24...|[0.05825001748362...|
| 674581|(1000,[3,26,154,1...|[4.84464404059742...|
| 560037|(10

In [12]:
#User Profiling

#1)  produces a user profile for each user as the summation of the TF-IDF vectors of the papers that appear in the user’s library

df_userLibrary_explode = df_userLibrary.selectExpr("user_hash_id","explode(user_library) AS paper_id")
df_userJoined_TfIdf = df_userLibrary_explode.join(df_rescaledCleanedData,df_userLibrary_explode.paper_id == df_rescaledCleanedData.paper_id, how="inner").select(df_userLibrary_explode.user_hash_id,df_userLibrary_explode.paper_id,df_rescaledCleanedData.tf_idf_vector)

In [13]:
import collections

# Adding 2 sparse vectors
def addSparseVectors(v1, v2):
    values = collections.defaultdict(float) # Initialize Dictionary with default value 0.0
    
    # Add values from v1 SparseVector
    for i in range(v1.indices.size):
        values[v1.indices[i]] += v1.values[i]
    # Add values from v2 SParseVector
    for i in range(v2.indices.size):
        values[v2.indices[i]] += v2.values[i]
    return Vectors.sparse(v1.size, dict(values))

#final Df : summation of TF-IDF vector for each userlibrary
df_userprofile_tfidf = df_userJoined_TfIdf.selectExpr("user_hash_id","tf_idf_vector").rdd.reduceByKey(lambda x,y: addSparseVectors(x,y)).toDF().selectExpr("_1 AS user_hash_id","_2 AS sum_tf_idf_vector")

                                                                                

In [14]:
df_userprofile_tfidf.take(1)

                                                                                

[Row(user_hash_id='6931f7f79678cf72aae416ff7cb43bb1', sum_tf_idf_vector=SparseVector(1000, {0: 13.9931, 1: 23.287, 3: 9.3123, 4: 37.249, 5: 4.6555, 6: 4.6536, 7: 32.5708, 10: 9.2996, 11: 4.6492, 12: 23.2428, 13: 4.6467, 14: 41.8145, 18: 9.2734, 20: 4.633, 22: 4.6305, 23: 41.6692, 24: 4.6299, 26: 97.1893, 27: 9.2561, 28: 37.0147, 29: 4.6256, 31: 4.6238, 33: 4.6146, 34: 13.8384, 35: 4.6122, 36: 13.8293, 37: 23.0429, 38: 13.8257, 39: 9.2147, 41: 4.6044, 42: 9.2087, 47: 18.3839, 48: 9.192, 51: 13.7737, 52: 4.5889, 53: 4.5889, 55: 13.756, 56: 4.583, 57: 9.1601, 60: 13.7331, 62: 13.7243, 65: 4.5707, 67: 9.1298, 68: 4.5643, 70: 13.6809, 72: 173.1171, 74: 95.6219, 75: 36.4183, 76: 9.1034, 77: 4.5511, 78: 4.55, 79: 13.6483, 80: 9.0989, 81: 9.0977, 84: 4.5466, 86: 40.8787, 87: 18.166, 88: 4.5415, 89: 4.5409, 92: 27.2389, 93: 72.5921, 94: 31.7512, 95: 4.5353, 99: 18.1279, 100: 9.0628, 102: 9.0484, 104: 4.5236, 105: 9.045, 107: 9.0362, 110: 4.5153, 111: 13.5444, 113: 4.5148, 115: 9.023, 116: 9.023

In [15]:
#2  LDA-based profiles for each user as the summation of the paper-topics vectors of the papers 

df_userJoined_LDA = df_userLibrary_explode.join(df_lda_paper_topic_model,df_userLibrary_explode.paper_id == df_lda_paper_topic_model.id, how="inner").select(df_userLibrary_explode.user_hash_id,df_userLibrary_explode.paper_id,df_lda_paper_topic_model.topicDistribution)

In [16]:
#function to add 2 LDA dense vectors
def addLDAPaperTopicVectors(v1,v2):
    return (v1+v2)
    
df_userprofile_lda = df_userJoined_LDA.selectExpr("user_hash_id","topicDistribution").rdd.reduceByKey(lambda x,y: addLDAPaperTopicVectors(x,y)).toDF().selectExpr("_1 AS user_hash_id","_2 AS sum_lda_vector")

                                                                                

In [17]:
df_userprofile_lda.take(1)

[Row(user_hash_id='6931f7f79678cf72aae416ff7cb43bb1', sum_lda_vector=DenseVector([0.3177, 2.7137, 0.1215, 0.2662, 0.3415, 0.3474, 0.1214, 0.2329, 0.4597, 1.4139, 0.2767, 0.5888, 0.1218, 18.0877, 0.3621, 0.1308, 0.513, 0.1927, 0.3893, 0.239, 0.9663, 0.5952, 1.1469, 0.3024, 0.1182, 0.1218, 0.122, 0.8721, 0.3173, 3.1643, 0.4668, 0.1197, 0.2334, 0.5449, 0.2284, 0.2457, 1.2261, 15.3653, 4.7057, 9.8997]))]

In [26]:
#Ex 4.2 Funtion to calculate cosine similarity

from sklearn.metrics.pairwise import cosine_similarity

# UDF for calculating cosine similarity

#def cos_similarity(a,b):
#    cosSimValue = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
#    return cosSimValue

def calculateCosineSimilarity(userVector,paperVector):
    cosineSimilarityValue = float(cosine_similarity([userVector],[paperVector])[0,0])
    return cosineSimilarityValue


In [27]:
#Ex 4.3 

#Broadcast variable containing master set of all the paperId

PaperIds = list(df_paperCsv.selectExpr("paper_id").toPandas()['paper_id'])
PaperIdsBroadcast = sc.broadcast(PaperIds)

In [28]:
import numpy as np

#Find Delta library for the user: Fetching the Papers excluding the papers which are already present in user Library
def findDeltaLibrary(userLib):
    deltaUserLibrary = np.setdiff1d(PaperIdsBroadcast.value,list(userLib),assume_unique=True).tolist()    
    return deltaUserLibrary

FindDeltaLibraryUDF = udf(findDeltaLibrary, ArrayType(StringType()))


In [67]:

#a)

#Function for Content Based Recommendation on TF-IDF User Profile
def tf_idf_CBRS(df_UserRecord,numberOfRecommendation):
    

    #Fetching the Papers along with their TF-IDF excluding the papers which are already present in user Library
    df_UserRecord_DeltaLibrary = df_UserRecord.withColumn('delta_UserLibrary', explode(FindDeltaLibraryUDF(df_UserRecord.user_library)))
    df_DeltaPapers = df_UserRecord_DeltaLibrary.join(df_rescaledCleanedData, df_UserRecord_DeltaLibrary.delta_UserLibrary == df_rescaledCleanedData.paper_id,how="inner")
    df_DeltaPapers = df_DeltaPapers.selectExpr("user_hash_id","sum_tf_idf_vector AS user_tf_idf_vector","paper_id","tf_idf_vector")
    
    #Repartioning the data
    df_DeltaPapers = df_DeltaPapers.repartition(40)
    
    #Calculate the Cosine Similarity
    df_cosine_similarity_result = df_DeltaPapers.rdd.map(lambda x: (x['user_hash_id'],x['paper_id'],calculateCosineSimilarity(x['user_tf_idf_vector'],x['tf_idf_vector']))).toDF(schema=['user_hash_id', 'paper_id', 'cosine_similarity_value'])
    
    #Repartioning the data
    df_cosine_similarity_result = df_cosine_similarity_result.repartition(40)
    
    #Fetch the Top K recommendations for the User
    windowFn = Window.partitionBy(df_cosine_similarity_result['user_hash_id']).orderBy(df_cosine_similarity_result['cosine_similarity_value'].desc())
     
    df_top_k_recommendations = df_cosine_similarity_result.select('*', row_number().over(windowFn).alias('row_number')).filter(col('row_number') <= numberOfRecommendation)
    df_top_k_recommendations = df_top_k_recommendations.groupBy("user_hash_id").agg(collect_list("paper_id").alias("recommended_library"))
    
    return df_top_k_recommendations

In [70]:
#Ex4.3 b)

#Function for Content Based Recommendation on LDA User Profile
def lda_CBRS(df_UserRecord,numberOfRecommendation):
    
    #Fetching the Papers along with their LDA excluding the papers which are already present in user Library
    df_UserRecord_DeltaLibrary = df_UserRecord.withColumn('delta_UserLibrary', explode(FindDeltaLibraryUDF(df_UserRecord.user_library)))
    df_DeltaPapers = df_UserRecord_DeltaLibrary.join(df_lda_paper_topic_model, df_UserRecord_DeltaLibrary.delta_UserLibrary == df_lda_paper_topic_model.id,how="inner")
    df_DeltaPapers = df_DeltaPapers.selectExpr("user_hash_id","sum_lda_vector AS user_lda_vector","id AS paper_id","topicDistribution")
    
    #Repartioning the data
    df_DeltaPapers = df_DeltaPapers.repartition(40)
    
    #Calculate the Cosine Similarity
    df_cosine_similarity_result = df_DeltaPapers.rdd.map(lambda x: (x['user_hash_id'],x['paper_id'],calculateCosineSimilarity(x['user_lda_vector'],x['topicDistribution']))).toDF(schema=['user_hash_id', 'paper_id', 'cosine_similarity_value'])
    
    #Repartioning the data
    df_cosine_similarity_result = df_cosine_similarity_result.repartition(40)
    
    #Fetch the Top K recommendations for the User
    windowFn = Window.partitionBy(df_cosine_similarity_result['user_hash_id']).orderBy(df_cosine_similarity_result['cosine_similarity_value'].desc())
    
    df_top_k_recommendations = df_cosine_similarity_result.select('*', row_number().over(windowFn).alias('row_number')).filter(col('row_number') <= numberOfRecommendation)
    df_top_k_recommendations = df_top_k_recommendations.groupBy("user_hash_id").agg(collect_list("paper_id").alias("recommended_library"))
    
    return df_top_k_recommendations

In [68]:
#Ex 4.3 c) Top K recommendations for User = 1eac022a97d683eace8815545ce3153f

user_hash_id = '1eac022a97d683eace8815545ce3153f'
k = 10 #No Of Recommendation

#Recommendation using TF-IDF Vector

#Fetch the records for the Particular User along with User Library
df_UserRecord = df_userLibrary.filter((df_userLibrary.user_hash_id.isin(user_hash_id)))
    
#Fetching the User TF-IDF 
df_UserRecord = df_UserRecord.join(df_userprofile_tfidf, df_UserRecord.user_hash_id == df_userprofile_tfidf.user_hash_id, how="inner").select(df_UserRecord.user_hash_id,df_userprofile_tfidf.sum_tf_idf_vector,df_UserRecord.user_library)

#inputs((userHashId,Vector,UserLibrary),NoOfRecommendation)
recommendation_with_tf_idf=tf_idf_CBRS(df_UserRecord,k)

                                                                                

# Top 10 Recommendation for User:1eac022a97d683eace8815545ce3153f with TF-IDF 

In [69]:
#Top 10 Recommendation for User:1eac022a97d683eace8815545ce3153f with TF-IDF 
recommendation_with_tf_idf.show(truncate=False)

                                                                                

+--------------------------------+------------------------------------------------------------------------------------------+
|user_hash_id                    |recommended_library                                                                       |
+--------------------------------+------------------------------------------------------------------------------------------+
|1eac022a97d683eace8815545ce3153f|[854469, 2838456, 4209212, 11692301, 10100718, 6306064, 7326487, 2284574, 3374934, 940716]|
+--------------------------------+------------------------------------------------------------------------------------------+



In [71]:
## Recommendation using LDA vector

#Fetch the records for the Particular User along with User Library
df_UserRecord = df_userLibrary.filter((df_userLibrary.user_hash_id.isin(user_hash_id)))
    
#Fetching the User LDA Vector 
df_UserRecord = df_UserRecord.join(df_userprofile_lda, df_UserRecord.user_hash_id == df_userprofile_lda.user_hash_id, how="inner").select(df_UserRecord.user_hash_id,df_userprofile_lda.sum_lda_vector,df_UserRecord.user_library)

#inputs((userHashId,Vector,UserLibrary),NoOfRecommendation)
recommendation_with_lda=lda_CBRS(df_UserRecord,k)

                                                                                

# Top 10 Recommendation for User:1eac022a97d683eace8815545ce3153f with LDA

In [87]:
#Top 10 Recommendation for User:1eac022a97d683eace8815545ce3153f with LDA
recommendation_with_lda.show(truncate=False)

                                                                                

+--------------------------------+---------------------------------------------------------------------------------------+
|user_hash_id                    |recommended_library                                                                    |
+--------------------------------+---------------------------------------------------------------------------------------+
|1eac022a97d683eace8815545ce3153f|[361044, 1141545, 697335, 4892261, 2783973, 1522926, 118767, 10314804, 781559, 4285207]|
+--------------------------------+---------------------------------------------------------------------------------------+



In [221]:
#Ex 4.4 Offline evaluation metrics

import pyspark.sql.functions as f
import numpy as np

#Find the common elements between two list: to find the Hit elements list
def hitElements(list1,list2):
    commonElements = np.intersect1d(list1,list2)
    return list(commonElements)

#df_UserMetricsEvaluation Schema --> user_hash_id, test_set_user_library, top_recommendations
def calculatePrecisionAndRecallMetrics(df_UserMetricsEvaluation,k):
    
    #If list of top-k recommendations is larger than k,then only the first k elements must be taken into account
    df_firstKElements = df_UserMetricsEvaluation.withColumn("new_top_recommendations",
                                                            when(size(col("top_recommendations")) > k,
                                                                 f.slice("top_recommendations",start=1,length=k))
                                                            .otherwise(col("top_recommendations")))
    
    #Find the hitElementsList and Count the Number of Hits
    HitElementsUDF = udf(hitElements, ArrayType(StringType()))
    
    df_HitElements = df_firstKElements.withColumn("HitsList",HitElementsUDF(df_firstKElements.new_top_recommendations,df_firstKElements.test_set_user_library))\
                     .withColumn("NoOfHits",size(col("HitsList")))
    
    #Calculate the precision for each user
    df_precision = df_HitElements.withColumn("Precision_Value",col("NoOfHits")/k)
    
    #Calculate the recall for each user
    df_recall = df_precision.withColumn("Recall_Value",col("NoOfHits")/when(size(col("test_set_user_library")) == 0,1)
                                                                        .otherwise(size(col("test_set_user_library"))))
    
    df_finalMetrics = df_recall.select("user_hash_id","test_set_user_library","top_recommendations","new_top_recommendations AS recommendations_considered","NoOfHits","Precision_Value","Recall_Value")
  
    return df_finalMetrics


#Calculate the MRR value
def calculateMRR(recommendationList,HitList):
    
    mrrvalue = 0
    
    if(len(HitList) == 0):
        mrrvalue = 0    
    else:
        firstElement = HitList[0]
        position = recommendationList.index(firstElement)
        mrrvalue = position + 1
        
    return mrrvalue
         

[Stage 3707:(24 + 16) / 40][Stage 3713:> (0 + 0) / 4][Stage 3714:>(0 + 0) / 40]

In [168]:
#Ex 3.5 
# 1: Randomly selects n users
fraction_of_users = 20/(df_userLibrary.count())
df_sampleUsers = df_userLibrary.sample(withReplacement=False, fraction=fraction_of_users)


In [169]:
#Ex3.5
#2 Divide sampled data into raining set and test set for each user
import math
import random

def divideList(masterList,trainingSetFraction):
    size_masterList = len(masterList)
    size_TrainingList = int(math.ceil(trainingSetFraction * size_masterList))
    trainingList = masterList[:size_TrainingList]
    testList = masterList[size_TrainingList:]
    return [trainingList,testList]

df_divide_train_test_data = df_sampleUsers.rdd.map(lambda x: (x[0],divideList(x[1],0.8))).toDF()
df_divide_train_test_data = df_divide_train_test_data.select(df_divide_train_test_data._1.alias("user_hash_id"),df_divide_train_test_data._2[0].alias("training_data"),df_divide_train_test_data._2[1].alias("test_data"))

df_training_data = df_divide_train_test_data.selectExpr("user_hash_id","training_data")
df_test_data = df_divide_train_test_data.selectExpr("user_hash_id","test_data")


In [116]:
df_training_data.show()

+--------------------+--------------------+
|        user_hash_id|       training_data|
+--------------------+--------------------+
|488fb15e8c77f8054...|[1523301, 5281566...|
|39dc28d81c6f4d6e6...|[130743, 130683, ...|
|c4328f856a7c676ff...|[257231, 225187, ...|
|eebe1b4e5663deee3...|[9077591, 774343,...|
|fb65d75b2051b110a...|[685336, 556249, ...|
|610d9dcb3071cf289...|[1276165, 3835963...|
|3f6288d7b3e733e18...|[2602918, 2941832...|
|26b1c5cbd64854ffc...|[1041371, 261696,...|
|d8224267d1409bbcf...|  [7659888, 8951552]|
|1b0f9f2a96f40cef5...|[963530, 780088, ...|
|1f9cb9801e3f9372b...|[6527806, 261292,...|
|be833c5784aed1c75...|[13510470, 895012...|
|cf6f250eac956fe6f...|           [1226827]|
|9ed3bf65a07ac3007...|[553783, 515205, ...|
|27994f3f86658586a...|[543594, 531781, ...|
|e5e6be374e14a8252...|[5999093, 1254633...|
|655b753a309dbe4dc...|[941245, 316837, ...|
|73b3c17934cf71742...|[6041009, 8185963...|
|9b20a80145e4c8c3d...|[144027, 4165285,...|
+--------------------+----------

In [117]:
df_test_data.show()

+--------------------+--------------------+
|        user_hash_id|           test_data|
+--------------------+--------------------+
|488fb15e8c77f8054...|            [352724]|
|39dc28d81c6f4d6e6...|  [3190640, 1037773]|
|c4328f856a7c676ff...|    [220081, 208311]|
|eebe1b4e5663deee3...|                  []|
|fb65d75b2051b110a...|[197329, 556247, ...|
|610d9dcb3071cf289...|           [3718551]|
|3f6288d7b3e733e18...|            [691779]|
|26b1c5cbd64854ffc...|           [2283761]|
|d8224267d1409bbcf...|                  []|
|1b0f9f2a96f40cef5...|    [967051, 102131]|
|1f9cb9801e3f9372b...|[261290, 1058754,...|
|be833c5784aed1c75...|                  []|
|cf6f250eac956fe6f...|                  []|
|9ed3bf65a07ac3007...|[3914589, 6593246...|
|27994f3f86658586a...|[540907, 613662, ...|
|e5e6be374e14a8252...|[6609221, 1347966...|
|655b753a309dbe4dc...|            [975289]|
|73b3c17934cf71742...|[6350815, 3456526...|
|9b20a80145e4c8c3d...|[305879, 5826636,...|
+--------------------+----------

In [170]:
#Ex 3.5

#Creating User profiles for Training data set
df_training_data_explode = df_training_data.selectExpr("user_hash_id","explode(training_data) AS training_data_paper_id")

#a- user profile using sampled users data over Tf-Idf vector

df_training_data_TfIdf = df_training_data_explode.join(df_rescaledCleanedData,df_training_data_explode.training_data_paper_id == df_rescaledCleanedData.paper_id, how="inner").select(df_training_data_explode.user_hash_id,df_training_data_explode.training_data_paper_id,df_rescaledCleanedData.tf_idf_vector)
df_training_data_userprofile_tfidf = df_training_data_TfIdf.selectExpr("user_hash_id","tf_idf_vector").rdd.reduceByKey(lambda x,y: addSparseVectors(x,y)).toDF().selectExpr("_1 AS user_hash_id","_2 AS sum_tf_idf_vector")

                                                                                

In [119]:
df_training_data_userprofile_tfidf.take(1)

[Row(user_hash_id='1b0f9f2a96f40cef5384836ee74bf7b9', sum_tf_idf_vector=SparseVector(1000, {25: 18.5147, 32: 4.6189, 49: 4.5936, 67: 4.5649, 78: 4.55, 164: 4.4469, 188: 4.424, 191: 4.4156, 233: 4.3358, 237: 4.3289, 259: 4.305, 261: 4.3024, 269: 29.9781, 284: 4.2687, 297: 4.24, 312: 4.2177, 318: 29.4049, 321: 4.1943, 333: 4.1836, 335: 4.1804, 342: 4.1633, 352: 20.7761, 356: 4.1506, 372: 4.1161, 380: 4.1073, 386: 4.0986, 389: 8.1849, 433: 4.0083, 437: 4.0, 443: 3.987, 449: 7.9514, 457: 15.857, 493: 7.7656, 504: 3.864, 520: 3.8363, 523: 7.671, 524: 7.6704, 547: 3.7922, 555: 3.7808, 559: 3.7685, 562: 3.762, 569: 3.7517, 574: 3.7352, 577: 3.7259, 587: 3.7105, 593: 3.7074, 598: 7.3967, 612: 7.3468, 622: 3.6484, 633: 14.5013, 636: 3.6155, 643: 7.1968, 651: 3.5814, 652: 10.741, 654: 10.732, 656: 14.2972, 657: 3.5743, 661: 7.1388, 677: 3.514, 679: 3.5134, 682: 28.0236, 685: 3.4994, 700: 3.4499, 703: 3.4418, 705: 3.4393, 706: 3.4354, 711: 6.8338, 713: 6.8283, 715: 3.4122, 716: 3.4063, 717: 3.404

In [171]:
#b)- User profile using sampled users data over LDA vector

df_training_data_LDA = df_training_data_explode.join(df_lda_paper_topic_model,df_training_data_explode.training_data_paper_id == df_lda_paper_topic_model.id, how="inner").select(df_training_data_explode.user_hash_id,df_training_data_explode.training_data_paper_id,df_lda_paper_topic_model.topicDistribution)
    
df_training_data_userprofile_lda = df_training_data_LDA.selectExpr("user_hash_id","topicDistribution").rdd.reduceByKey(lambda x,y: addLDAPaperTopicVectors(x,y)).toDF().selectExpr("_1 AS user_hash_id","_2 AS sum_lda_vector")

                                                                                

In [121]:
df_training_data_userprofile_lda.take(1)

[Row(user_hash_id='1b0f9f2a96f40cef5384836ee74bf7b9', sum_lda_vector=DenseVector([1.5708, 0.0401, 0.3668, 3.8009, 0.4158, 0.0404, 0.041, 0.04, 0.0405, 0.1624, 0.0404, 0.0402, 0.0412, 0.0418, 1.0043, 0.1534, 0.3845, 0.0407, 0.3614, 0.0408, 0.0402, 0.0401, 0.0405, 0.0407, 0.0399, 0.4334, 0.8312, 0.5911, 0.0404, 0.1925, 0.0409, 0.3165, 0.4003, 0.0404, 0.0409, 0.0406, 0.0404, 0.0407, 0.0408, 0.0412]))]

In [172]:
#Ex4.5 Off-line evaluation

##########  LDA  ########

#Caluclating recommendation for Sampled users training data

#Making the dataframe format compatible with CBRS function
df_UserRecord = df_training_data_userprofile_lda.join(df_training_data,df_training_data_userprofile_lda.user_hash_id == df_training_data.user_hash_id , how="inner")\
                .select(df_training_data.user_hash_id,df_training_data_userprofile_lda.sum_lda_vector,df_training_data.training_data.alias("user_library"))


In [208]:
#fetching top 50 recommendations
recommendation_with_lda=lda_CBRS(df_UserRecord,50)

                                                                                0]

In [134]:
recommendation_with_lda.show()

                                                                                

+--------------------+--------------------+
|        user_hash_id| recommended_library|
+--------------------+--------------------+
|c4328f856a7c676ff...|[10722036, 917926...|
|488fb15e8c77f8054...|[955708, 2645008,...|
|39dc28d81c6f4d6e6...|[1429875, 845770,...|
|73b3c17934cf71742...|[854077, 13113325...|
|27994f3f86658586a...|[708229, 2710660,...|
|610d9dcb3071cf289...|[4058315, 1259066...|
|655b753a309dbe4dc...|[1014193, 1979432...|
|cf6f250eac956fe6f...|[10522254, 658448...|
|e5e6be374e14a8252...|[5969584, 2880302...|
|1b0f9f2a96f40cef5...|[2645008, 787476,...|
|26b1c5cbd64854ffc...|[3197951, 567792,...|
|fb65d75b2051b110a...|[469589, 401403, ...|
|9ed3bf65a07ac3007...|[562067, 1188019,...|
|eebe1b4e5663deee3...|[6286182, 2945990...|
|3f6288d7b3e733e18...|[3454708, 340017,...|
|d8224267d1409bbcf...|[1053002, 5806383...|
|1f9cb9801e3f9372b...|[1169705, 2392088...|
|be833c5784aed1c75...|[3843892, 2226990...|
|9b20a80145e4c8c3d...|[1340708, 5996736...|
+--------------------+----------

In [209]:
#Formating dataframe to the format accepted by function CalculatePreceisonAndRecall
df_lda_evaluationMetrics = recommendation_with_lda.join(df_test_data,recommendation_with_lda.user_hash_id==df_test_data.user_hash_id,how="inner")\
.select(df_test_data.user_hash_id,df_test_data.test_data.alias("test_set_user_library"),recommendation_with_lda.recommended_library.alias("top_recommendations"))


In [210]:
#When K=5
df_lda_evaluation_Kis5 = calculatePrecisionAndRecallMetrics(df_lda_evaluationMetrics,5)
df_lda_evaluation_Kis5.show()



+--------------------+---------------------+--------------------+--------+---------------+------------+
|        user_hash_id|test_set_user_library| top_recommendations|NoOfHits|Precision_Value|Recall_Value|
+--------------------+---------------------+--------------------+--------+---------------+------------+
|6ce6873d8f198e310...| [1096185, 4371317...|[2908449, 4028922...|       0|            0.0|         0.0|
|e49265f34f9326761...|                   []|[9866432, 2306778...|       0|            0.0|         0.0|
|b117b747c51aeab14...|    [6080985, 698109]|[4977, 2399933, 2...|       0|            0.0|         0.0|
|ea2e1d4f415235bf5...|   [2562165, 1714788]|[7055148, 6776666...|       0|            0.0|         0.0|
|cc00a76d4e82fcc19...| [635565, 635575, ...|[2819775, 2753445...|       0|            0.0|         0.0|
|44f5e138abead2399...| [893420, 708641, ...|[364201, 10595765...|       0|            0.0|         0.0|
|aa7828a01be2c0d45...|                   []|[2828338, 669566,...

                                                                                

In [211]:
#When K=10
df_lda_evaluation_Kis10 = calculatePrecisionAndRecallMetrics(df_lda_evaluationMetrics,10)
df_lda_evaluation_Kis10.show()



+--------------------+---------------------+--------------------+--------+---------------+------------+
|        user_hash_id|test_set_user_library| top_recommendations|NoOfHits|Precision_Value|Recall_Value|
+--------------------+---------------------+--------------------+--------+---------------+------------+
|6ce6873d8f198e310...| [1096185, 4371317...|[2908449, 4028922...|       0|            0.0|         0.0|
|e49265f34f9326761...|                   []|[9866432, 2306778...|       0|            0.0|         0.0|
|b117b747c51aeab14...|    [6080985, 698109]|[4977, 2399933, 2...|       0|            0.0|         0.0|
|ea2e1d4f415235bf5...|   [2562165, 1714788]|[7055148, 6776666...|       0|            0.0|         0.0|
|cc00a76d4e82fcc19...| [635565, 635575, ...|[2819775, 2753445...|       0|            0.0|         0.0|
|44f5e138abead2399...| [893420, 708641, ...|[364201, 10595765...|       0|            0.0|         0.0|
|aa7828a01be2c0d45...|                   []|[2828338, 669566,...

                                                                                

In [212]:
#When K=30
df_lda_evaluation_Kis30 = calculatePrecisionAndRecallMetrics(df_lda_evaluationMetrics,30)
df_lda_evaluation_Kis30.show()

21/08/06 10:11:47 ERROR Executor: Exception in task 54.0 in stage 3680.0 (TID 37093)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorF

Py4JJavaError: An error occurred while calling o4316.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 3680.0 failed 1 times, most recent failure: Lost task 54.0 in stage 3680.0 (TID 37093) (d451ade18897 executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at jdk.internal.reflect.GeneratedMethodAccessor231.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
	at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
	at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:773)
	at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:213)
	at net.razorvine.pickle.Unpickler.load(Unpickler.java:123)
	at net.razorvine.pickle.Unpickler.loads(Unpickler.java:136)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.$anonfun$evaluate$6(BatchEvalPythonExec.scala:94)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage8.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


21/08/06 10:11:48 WARN TaskSetManager: Lost task 36.0 in stage 3680.0 (TID 37091) (d451ade18897 executor driver): TaskKilled (Stage cancelled)


In [213]:
#Ex4.5 Off-line evaluation

##########  TF-IDF  ########

#Caluclating recommendation for Sampled users training data

#Making the dataframe format compatible with CBRS function
df_UserRecord = df_training_data_userprofile_tfidf.join(df_training_data,df_training_data_userprofile_tfidf.user_hash_id == df_training_data.user_hash_id , how="inner")\
                .select(df_training_data.user_hash_id,df_training_data_userprofile_tfidf.sum_tf_idf_vector,df_training_data.training_data.alias("user_library"))


In [214]:
#fetching top 50 recommendations
recommendation_with_tf_idf=tf_idf_CBRS(df_UserRecord,50)

                                                                                ]

In [215]:
#Formating dataframe to the format accepted by function CalculatePreceisonAndRecall
df_tfidf_evaluationMetrics = recommendation_with_tf_idf.join(df_test_data,recommendation_with_tf_idf.user_hash_id==df_test_data.user_hash_id,how="inner")\
.select(df_test_data.user_hash_id,df_test_data.test_data.alias("test_set_user_library"),recommendation_with_tf_idf.recommended_library.alias("top_recommendations"))


In [218]:
#When K=5
df_tfidf_evaluation_Kis5 = calculatePrecisionAndRecallMetrics(df_tfidf_evaluationMetrics,5)
df_tfidf_evaluation_Kis5.show()

[Stage 3707:(8 + 16) / 40][Stage 3713:> (0 + 0) / 4][Stage 3714:>(0 + 0) / 40]

KeyboardInterrupt: 

In [None]:
#When K=10
df_tfidf_evaluation_Kis10 = calculatePrecisionAndRecallMetrics(df_tfidf_evaluationMetrics,10)
df_tfidf_evaluation_Kis10.show()

In [None]:
#When K=30
df_tfidf_evaluation_Kis30 = calculatePrecisionAndRecallMetrics(df_tfidf_evaluationMetrics,30)
df_tfidf_evaluation_Kis30.show()

In [220]:
len(list([1,2]))

2

[Stage 3707:(24 + 16) / 40][Stage 3713:> (0 + 0) / 4][Stage 3714:>(0 + 0) / 40]