## Experiment-04
### Amirreza Fosoul and Bithiah Yuan

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
import string
import csv
import re
import time
spark = SparkSession.builder.appName('ex4').getOrCreate()

from pyspark.sql.functions import *
from pyspark.sql.functions import split, udf, desc, concat, col, lit
import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, FloatType, StringType, IntegerType, DoubleType, StructType, StructField
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.window import Window
from pyspark.ml.linalg import SparseVector, VectorUDT, DenseVector
import scipy.sparse
from pyspark.ml.linalg import Vectors, _convert_to_vector, VectorUDT
import numpy as np
from pyspark.sql import SQLContext
from nltk.stem import PorterStemmer
from nltk.tokenize import sent_tokenize, word_tokenize
import math
import re

sc = spark.sparkContext
sqlContext = SQLContext(sc)

#### We reduced our dataset for both the user_libraries and papers due to out of memory issues

In [31]:
# Read user ratings into Dataframe
#user_df = spark.read.option("delimiter", ";").csv('./users_libraries.txt')
user_df = spark.read.option("delimiter", ";").csv('./example0.txt')
user_df = user_df.select(col("_c0").alias("userID"), col("_c1").alias("paperID"))

# df to be used in 4.4
sampled_users = user_df

user_df_pre = user_df
user_df_pre = user_df_pre.withColumn("paperID", split(col("paperID"), ",").cast(ArrayType(IntegerType())).alias("paperID"))
#user_df_pre.show()

user_df = user_df.select("userID", f.split("paperID", ",").alias("papers"), f.explode(f.split("paperID", ",")).alias("paperID"))
user_df = user_df.drop("papers")

# Get a dataframe of the distinct papers
d_paper = user_df.select("paperID").distinct()

In [50]:
# Read in the stopwords as a list
with open('./stopwords_en.txt') as file:
    stopwordList = file.read().splitlines()

# Read in records of paper information
#w_df = spark.read.csv('./papers.csv')
w_df = spark.read.csv('./paper0.csv')
w_df = w_df.select("_c0", "_c13", "_c14")
w_df = w_df.select(col("_c0").alias("paperID"), col("_c13").alias("title"), col("_c14").alias("abstract"))
w_df = w_df.na.fill({'title': '', 'abstract': ''}) # to replace null values with empty string
# Get text from title and abstract
w_df = w_df.select(col("paperID"), concat(col("title"), lit(" "), col("abstract")).alias("words"))
#w_df.show()

# Transform the distinct paperIDs dataframe to a list
paper_list = list(d_paper.select('paperID').toPandas()['paperID'])
# Map each distinct paper into int
paper_list = list(map(int, paper_list))

In [51]:
# Function to call in udf
def unrated(papers):
    # Transform the list of distinct papers and the list of rated papers of each user to a set
    # Substract the two sets to get the list of unrated papers for each user
    # Transform back to list
    unrated = list(set(paper_list) - set(papers))
    
    return unrated


# udf to get a list of unrated papers with the length of rated papers for each user
get_unrated = udf(lambda x: unrated(x), ArrayType(IntegerType()))

# Add a new column of unrated papers for each user
unrated_df = user_df_pre.withColumn("unrated", get_unrated(user_df_pre.paperID))

unrated_df = unrated_df.drop("paperID")

unrated_df = unrated_df.withColumn("paperID", explode(unrated_df.unrated))

unrated_df = unrated_df.drop("unrated")

#unrated_df.show()

## Exercise 3. 1 Vector representation for the papers

In [52]:
# Extracting words from the papers and keeping "-" and "_"
tokenizer = RegexTokenizer(inputCol="words", outputCol="tokens", pattern="[a-zA-Z-_]+", gaps=False) 
# Built-in tokenizer
tokenized = tokenizer.transform(w_df)
tokenized = tokenized.select("paperID", "tokens")

# udf to remove "-" and "_" from the tokens
remove_hyphen_udf = udf(lambda x: [re.sub('[-|_]', '', word) for word in x], ArrayType(StringType()))
# Apply udf to the tokens
df = tokenized.withColumn('tokens', remove_hyphen_udf(col('tokens')))

# udf to remove words less than 3 letters
remove_short_words = udf(lambda x: [item for item in x if len(item) >= 3], ArrayType(StringType()))
# Apply udf to the tokens
df = df.withColumn('tokens', remove_short_words(col('tokens')))

# Built-in function to remove stopwords from our custom list
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered" , stopWords=stopwordList)
df = remover.transform(df)
df = df.select("paperID", "filtered")

# Apply stemming with NLTK
# Built-in class from NLTK
ps = PorterStemmer()
# udf to apply stemming
stemming = udf(lambda x: [ps.stem(item) for item in x], ArrayType(StringType()))
# apply udf to tokens
df = df.withColumn('stemmed', stemming(col('filtered')))
df = df.select("paperID", "stemmed")

# Create a new df to store the paperID and stemmed tokens
paper_terms = df

# Explode/Split the tokens in the list for each paperID and get the distinct tokens
df = df.select("paperID", f.explode("stemmed").alias("tokens")).distinct().orderBy("paperID")

# Assign count of 1 to each token w.r.t. the paperID since the tokens are distinct
df = df.groupBy("paperID","tokens").count()

# Get the number of distinct papers
num_papers = w_df.select("paperID").distinct().count()

# Get the value of ten percent of the number of papers
ten_percent = math.ceil(num_papers*.1)

# Create a new df with the tokens and count (without paperID)
df2 = df.select("tokens", "count")
# Count the number of papers containing the tokens
df2 = df2.groupBy("tokens").agg(f.collect_list("tokens").alias("duplicated_values"), f.sum("count").alias("count"))
# Filter out tokens that appeared in more than 10 percent of the papers
df2 = df2.drop("duplicated_values").orderBy((col("count")).desc()).filter(col("count") < ten_percent)
# Filter out tokens that appeared in less than 20 papers
# Limit the df to 1000 tokens
df2 = df2.filter(col("count") >= 20).limit(1000)
# Create a new df with terms and count
important_words = df2.select(col("tokens").alias("terms"), col("count"))
# Output the set of important words, T
important_words.show()

+---------+-----+
|    terms|count|
+---------+-----+
|     cell|   31|
|   experi|   31|
|     time|   31|
|   compar|   30|
|   measur|   30|
|     user|   30|
|  problem|   30|
|   requir|   30|
| challeng|   29|
|  pattern|   28|
|   featur|   28|
|    paper|   28|
|    emerg|   28|
|   effect|   28|
|  current|   27|
|  predict|   27|
|technolog|   27|
|      web|   27|
|   import|   26|
|      map|   26|
+---------+-----+
only showing top 20 rows



In [53]:
# Create a new df where each term is replaced by a unique index that takes a value from the range between 0 and |T| − 1
df = important_words.withColumn("row_num", row_number().over(Window.orderBy("count"))-1)

# Create a df to store the indices and the corresponding terms
terms_index_hash = df.select(col("row_num").alias("index"), "terms")

terms_index_hash.show()

+-----+----------+
|index|     terms|
+-----+----------+
|    0| character|
|    1|     addit|
|    2|     level|
|    3|   program|
|    4|  interest|
|    5|      year|
|    6|   resourc|
|    7|    reveal|
|    8|  demonstr|
|    9|experiment|
|   10|       key|
|   11|  properti|
|   12|     learn|
|   13|     field|
|   14|  principl|
|   15|    access|
|   16| distribut|
|   17|     enabl|
|   18|    effici|
|   19|    common|
+-----+----------+
only showing top 20 rows



In [54]:
num_terms = terms_index_hash.select("terms").distinct().count()

print(num_terms)

80


In [55]:
p_terms = paper_terms.select("paperID", f.explode("stemmed").alias("terms"))

# Join p_terms with the terms_index_hash to replace the terms with the indices
joined_df = terms_index_hash.join(p_terms, ["terms"])
joined_df = joined_df.drop("index")

# Create a new df to compute the term frequency vectors
tf_df = joined_df

tf_df = tf_df.groupby("paperID").agg(f.concat_ws(", ", f.collect_list(tf_df.terms)).alias("terms"))
tf_df = tf_df.withColumn("terms_", split(col("terms"), ",\s*").cast(ArrayType(StringType())).alias("terms"))
tf_df = tf_df.drop("terms")
#tf_df.show()


In [56]:
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel

cv = CountVectorizer(inputCol="terms_", outputCol="vectors")
model = cv.fit(tf_df)
vector_df = model.transform(tf_df)
vector_df = vector_df.select("paperID", col("vectors").alias("term_frequency_sparse"))

#vector_df.show(truncate=False)

In [57]:
### TF-IDF with built-in function ###

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

idf = IDF(inputCol="term_frequency_sparse", outputCol="features")
idfModel = idf.fit(vector_df)
rescaledData = idfModel.transform(vector_df)
tf_idf_built_in = rescaledData.select("paperID", "features")

#tf_idf_built_in.show()

In [58]:
# create the user profile using the tf_idf dataframe and the users' library dataframe
user_profile = tf_idf_built_in.join(user_df, ["paperID"]).orderBy("userID").select('userId', 'features')

# convert the dataframe to RDD to sum up the tf_idf vector of each user and then convert back to dataframe
user_profile = user_profile.rdd.mapValues(lambda v: v.toArray()).reduceByKey(lambda x, y: x + y).mapValues(lambda x: DenseVector(x))\
.toDF(["userId", "features"])

user_profile = user_profile.select("userId", col("features").alias("user_profile"))


In [59]:
def to_sparse(x):        
    # store the indices of non-zero elements
    nonzero_indices = np.nonzero(x)[0].tolist()
    # store the value of non-zero elements
    nonzero_counts = [num for num in x if num]
    # combine them to make a sparse vector
    sparse = SparseVector(num_terms, nonzero_indices, nonzero_counts)
    return sparse

to_sparse_udf = udf(lambda x: to_sparse(x), VectorUDT())

user_profile = user_profile.withColumn("user_profile", to_sparse_udf(col("user_profile")))

#user_profile.show()

In [60]:
df = unrated_df.join(tf_idf_built_in, ["paperID"]).join(user_profile, ["userID"])

df = df.select("userID", "paperID", "user_profile", col("features").alias("paper_profile"))

#df.show()

In [62]:
df = unrated_df.join(tf_idf_built_in, ["paperID"]).join(user_profile, ["userID"])

df = df.select("userID", "paperID", "user_profile", col("features").alias("paper_profile"))

# def to_dense(x):
#     return DenseVector(x.toArray())

# to_dense_udf = udf(lambda x: to_dense(x), VectorUDT())

# df = df.withColumn('paper_profile_dense', to_dense_udf(df.paper_profile))
# df = df.drop("paper_profile")

#### CHANGE THE USER HERE!!! ###
df_selected = df.where(df.userID=="589b870a611c25fa99bd3d7295ac0622")
#df_selected = df.where(df.userID=="1eac022a97d683eace8815545ce3153f")

#df_selected.show()

In [63]:
import numpy as np 

def cos_sim(u, p):
    result = (np.dot(u, p))/(np.linalg.norm(u) * np.linalg.norm(p))
    result = result.item()
    return result

compute_sim = udf(cos_sim, FloatType())

def cbrs(u, k):
    sim_df = u.withColumn('Similarity', compute_sim(u.user_profile, u.paper_profile))
    window = Window.partitionBy(col("userID")).orderBy((col("Similarity")).desc())
    sim_df = sim_df.select(col('*'), row_number().over(window).alias('row_number')).where(col('row_number') <= k)
    get_r = sim_df.select("userID", "paperID", col("row_number").alias("rank"))
    cbrs_df = get_r.select("userID", "paperID")
    cbrs_df = cbrs_df.groupby("userID").agg(f.concat_ws(", ", f.collect_list(cbrs_df.paperID)).alias("top_papers"))
    
    return cbrs_df

k = 5

user_rec = cbrs(df_selected, k)

print("The top " + str(k) + " papers for " + str(user_rec.head()[0]) + " are " + str(user_rec.head()[1]))

The top 5 papers for 589b870a611c25fa99bd3d7295ac0622 are 9045137, 115945, 8310458, 3728173, 11733005


In [64]:
### LDA ###
from pyspark.ml.clustering import LDA

# just rename the column to use it with the built-in methods
termFrequencyVector = vector_df.select('paperId', col('term_frequency_sparse').alias('features'))
# Trains a LDA model
# set k=40 to have 40 different topics
lda = LDA(k=40)
model = lda.fit(termFrequencyVector)

# each topic is described by 5 terms
topics = model.describeTopics(5)

# print("The topics described by their top-weighted terms:")
#topics.show(truncate=False)

# Shows the result
# it shows the probabilty of each topic for each paper
transformed = model.transform(termFrequencyVector)
#transformed.show()

In [65]:
# creating a user profile based on the LDA results

lda_user_profile = transformed.join(user_df, ["paperID"]).orderBy("userID").select('userId', col('topicDistribution').alias('features'))

lda_user_profile = lda_user_profile.rdd.mapValues(lambda v: v.toArray()).reduceByKey(lambda x, y: x + y).mapValues(lambda x: DenseVector(x))\
.toDF(["userId", "features"])
lda_user_profile = lda_user_profile.select("userId", col("features").alias("lda_user_profile"))

lda_user_profile = lda_user_profile.withColumn("user_profile", to_sparse_udf(col("lda_user_profile")))

#lda_user_profile.show()

In [66]:
df_lda = unrated_df.join(transformed, ["paperID"]).join(lda_user_profile, ["userID"])

df_lda = df_lda.select("userID", "paperID", "user_profile", col("features").alias("paper_profile"))

#df_lda.show()

df_selected_lda = df_lda.where(df_lda.userID=="1eac022a97d683eace8815545ce3153f")

#df_selected_lda.show()

In [80]:
cbrs(df_selected_lda, 5).show(truncate=False)

+------+----------+
|userID|top_papers|
+------+----------+
+------+----------+



## Exercise 4. 4 Sampling and data preparation

In [200]:
from pyspark.sql.functions import rand 

sampled_users = user_df_pre.orderBy(rand()).limit(2)

get_len_udf = udf(lambda x: len(x), IntegerType())

sampled_users = sampled_users.withColumn("libSize", get_len_udf("paperID"))

get_train = udf(lambda x: int(x*0.8), IntegerType())

sampled_users = sampled_users.withColumn("trainSize", get_train("libSize"))

sampled_users.show()

+--------------------+--------------------+-------+---------+
|              userID|             paperID|libSize|trainSize|
+--------------------+--------------------+-------+---------+
|589b870a611c25fa9...|[1283233, 1305474...|      8|        6|
|90f1a3e6fcdbf9bc5...|[115945, 11733005...|      5|        4|
+--------------------+--------------------+-------+---------+



In [276]:
sampled_exploded = sampled_users.withColumn("paperID", explode(col("paperID")))

window = Window.partitionBy(col("userID")).orderBy(rand())
#.orderBy("userID")

sampled_exploded = sampled_exploded.select(col('*'), row_number().over(window).alias('row_number'))

training_df = sampled_exploded.where(col('row_number') <= col("trainSize"))
training_df = training_df.select("userID", "paperID").orderBy("userID")
training_df.show()

test_df = sampled_exploded.where(col('row_number') > col("trainSize"))
test_df = test_df.select("userID", "paperID").orderBy("userID")
test_df.show()

+--------------------+--------+
|              userID| paperID|
+--------------------+--------+
|589b870a611c25fa9...| 1029499|
|589b870a611c25fa9...| 1305474|
|589b870a611c25fa9...| 1283233|
|589b870a611c25fa9...|  945604|
|589b870a611c25fa9...|  956315|
|589b870a611c25fa9...|  967275|
|90f1a3e6fcdbf9bc5...| 8310458|
|90f1a3e6fcdbf9bc5...| 9045137|
|90f1a3e6fcdbf9bc5...|11733005|
|90f1a3e6fcdbf9bc5...|  115945|
+--------------------+--------+

+--------------------+-------+
|              userID|paperID|
+--------------------+-------+
|589b870a611c25fa9...|1001231|
|589b870a611c25fa9...| 352713|
|90f1a3e6fcdbf9bc5...|3728173|
+--------------------+-------+



In [277]:
# create the user profile using the tf_idf dataframe and the users' library dataframe
training_user_profile = tf_idf_built_in.join(training_df, ["paperID"]).orderBy("userID").select('userId', 'features')

# convert the dataframe to RDD to sum up the tf_idf vector of each user and then convert back to dataframe
training_user_profile = training_user_profile.rdd.mapValues(lambda v: v.toArray()).reduceByKey(lambda x, y: x + y).mapValues(lambda x: DenseVector(x))\
.toDF(["userId", "features"])

training_user_profile = training_user_profile.select("userId", col("features").alias("user_profile"))

training_user_profile = training_user_profile.withColumn("user_profile", to_sparse_udf(col("user_profile")))

train_df = unrated_df.join(tf_idf_built_in, ["paperID"]).join(training_user_profile, ["userID"])

train_df = train_df.select("userID", "paperID", "user_profile", col("features").alias("paper_profile"))

train_df = unrated_df.join(tf_idf_built_in, ["paperID"]).join(training_user_profile, ["userID"])

train_df = train_df.select("userID", "paperID", "user_profile", col("features").alias("paper_profile")).orderBy("userID")

#train_df.show()

In [None]:
# creating a user profile based on the LDA results
training_lda_user_profile = transformed.join(training_df, ["paperID"]).orderBy("userID").select('userId', col('topicDistribution').alias('features'))

training_lda_user_profile = training_lda_user_profile.rdd.mapValues(lambda v: v.toArray()).reduceByKey(lambda x, y: x + y).mapValues(lambda x: DenseVector(x))\
.toDF(["userId", "features"])
training_lda_user_profile = training_lda_user_profile.select("userId", col("features").alias("lda_user_profile"))

training_lda_user_profile = training_lda_user_profile.withColumn("user_profile", to_sparse_udf(col("lda_user_profile")))

train_df_lda = unrated_df.join(transformed, ["paperID"]).join(training_lda_user_profile, ["userID"])

train_df_lda = train_df_lda.select("userID", "paperID", "user_profile", col("features").alias("paper_profile")).orderBy("userID")

#train_df_lda.show()

## Exercise 4. 5 (Off-line evaluation)

In [None]:
# a) Generate 10 recommendations

def castToArray(df, colName):
    dff = df.withColumn(colName, split(col(colName), ", ").cast(ArrayType(IntegerType())))
    return dff

# TF-IDF
print("TF-IDF recommender")
tf_rec = cbrs(train_df, 10).orderBy("userID")
tf_rec  = castToArray(tf_rec, "top_papers")
tf_rec.show(truncate=False)

# print("LDA recommender")
# LDA
lda_rec  = cbrs(train_df_lda, 10).orderBy("userID")
lda_rec  = castToArray(lda_rec, "top_papers")
lda_rec.show(truncate=False)

TF-IDF recommender


In [None]:
test_df_collected = test_df.groupby("userID").agg(f.concat_ws(", ", f.collect_list(test_df.paperID)).alias("paperID"))
test_df_collected = test_df_collected.withColumn("paperID", split(col("paperID"), ",\s*").cast(ArrayType(IntegerType())).alias("paperID")).orderBy("userID")

test_df_collected.show()

In [263]:
joined_tf_test = test_df_collected.join(tf_rec, "userID")

joined_tf_test = joined_tf_test.select("userID", col("paperID").alias("test_set"), col("top_papers").alias("train_set"))

joined_tf_test.show()

+--------------------+-----------------+--------------------+
|              userID|         test_set|           train_set|
+--------------------+-----------------+--------------------+
|589b870a611c25fa9...|[1283233, 956315]|[9045137, 115945,...|
|90f1a3e6fcdbf9bc5...|         [115945]|[1001231, 352713,...|
+--------------------+-----------------+--------------------+



In [264]:
def getHits(train, test):
    return list(set(train).intersection(test))

getHits_udf = udf(getHits, ArrayType(IntegerType()))

tf_hits = joined_tf_test.withColumn('Hits', getHits_udf(joined_tf_test.train_set, joined_tf_test.test_set))

tf_hits.show(truncate=False)

+--------------------------------+-----------------+--------------------------------------------------+----+
|userID                          |test_set         |train_set                                         |Hits|
+--------------------------------+-----------------+--------------------------------------------------+----+
|589b870a611c25fa99bd3d7295ac0622|[1283233, 956315]|[9045137, 115945, 8310458, 3728173, 11733005]     |[]  |
|90f1a3e6fcdbf9bc550e866116bbcea5|[115945]         |[1001231, 352713, 956315, 1305474, 967275, 945604]|[]  |
+--------------------------------+-----------------+--------------------------------------------------+----+

