In [1]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
from operator import add
import time
import csv
import re
from tempfile import NamedTemporaryFile


In [2]:
spark = SparkSession \
.builder \
.appName("cite") \
.config("spark.memory.fraction", 0.8) \
.config("spark.executor.memory", "14g") \
.config("spark.driver.memory", "12g")\
.config("spark.sql.shuffle.partitions" , "800") \
.getOrCreate()

sc = spark.sparkContext
execution_time_file = open('execution_time_file.txt','a')

In [3]:
# Parse a line from the file papers_vocab.txt
# comma to separate ID and vocab, space to separate vocabularies.
def parse_papers_count(line):
    if not line:
        return dict()
    papersCountRaw = line.split(' ')
    papersCount = dict()
    for pcRaw in papersCountRaw:
        paper, count = pcRaw.split(':')
        papersCount[paper] = int(count)
    return papersCount

In [4]:
# Parse a line from the file users_libraries.txt
# semi-colon to separate user hash id with library, comma to separate the IDs in the library.
def parse_users_libraries(line):
    if not line:
        return

    userHash, libraryRAW = line.split(';')
    library = [int(paper_id) for paper_id in libraryRAW.split(',')]
    return userHash, library

In [5]:
# Parse a line from the file papers.csv. Comma seperated. Each line has 15 fields.
# The first is paper_id, the last is 'abstract' of a paper.
def parse_papers(line):
    if not line:
        return
    papersInfo = csv.reader([line.replace("\0", "")], delimiter=',', quoting=csv.QUOTE_MINIMAL)
    papersInfoList = next(papersInfo)
    # paper_id, abstract (the last element in the list)
    return papersInfoList[0], papersInfoList[-1]


### Loading the data in users_libraries.txt into an RDD

In [6]:
start_time = time.time()
users_libraries_path = 'users_libraries.txt'
userRatingsRDD = sc.textFile(users_libraries_path)
# key - user_hash id, values - list of paper_ids
userRatingsRDD = userRatingsRDD.map(parse_users_libraries)

execution_time_file.write('Loading of userRatingsRDD: --- %s seconds --- \n' % (time.time() - start_time))

63

### Loading the data in papers.csv into an RDD

In [7]:
start_time = time.time()
papers_path = 'papers.csv'
# load paper terms 
paperTermsRDD = sc.textFile(papers_path)
# key - paper_id, value - abstract
allPaperTermsRDD = paperTermsRDD.map(parse_papers)

# filter empty abstracts
paperTermsRDD = allPaperTermsRDD.filter(lambda x: x[1] != '')

# key - paper_id, values - list of words 
paperTermsRDD = paperTermsRDD.map(lambda x: (int(x[0]), x[1].lower().split(' ')))
paperTermsRDD.take(10)

execution_time_file.write('Loading of paperTermsRDD: --- %s seconds --- \n' % (time.time() - start_time))

61

We will now generate the top-10 most frequent words in the favorite papers for each user. 
We need to join the paperTermsRDD and userRatingsRDD such that the user hashes are linked directly with the words.
After which we calculate the top words for each user.
RDD's do not allow nested opperations and join is only possible by key on key. 
Therefore, we need to interchange the key and values of both usersLibraries RDD and PaperTerms RDD to paper_ids.

In [8]:
# Create a broadcast variable of stopwords
# the variable is stored across all nodes
# we need to remove stopwords from paper terms parallely in multiple nodes
stopwordsBroadcast = sc.broadcast([word.rstrip('\n') for word in open('stopwords_en.txt')])

In [9]:
# separate the values from an array to separate rows
def flatten(value):
    return value

# remove 
def filterStopWords(value):
    return value[1] not in stopwordsBroadcast.value

start_time = time.time()

# First we flatten the paperTermsRDD and filter out the stop words [paper_id: word]
flattenedPaperTermsRDD = paperTermsRDD.flatMapValues(flatten).filter(filterStopWords)

# Same needs to be done for paper terms as well [user_hash: paper_id]
flattenedUserRatingsRDD = userRatingsRDD.flatMapValues(flatten)
# Now, we need to interchange the keys and values 
# we want to join the paperids from user ratings and paper terms RDD's [paper_id: user_hash]
paperUsersRDD = flattenedUserRatingsRDD.map(lambda x: (x[1], x[0]))

# since nested RDD's are not supported, we need to create a unique userhash_word to word count mapping 
userWordsRDD = paperUsersRDD.join(flattenedPaperTermsRDD).map(lambda x: (x[1][0]+'_'+x[1][1], 1))
# We now reduce the RDD by keys such that [userhash_word: count]
userWordsRDD = userWordsRDD.reduceByKey(add)

# retrieve the original user hashes as keys
def split_hash_word(x):
    userHash, word = x[0].split('_')
    return (userHash, (word, x[1]))

userWordCountsRDD = userWordsRDD.map(split_hash_word)

# sort the word by their occurances and take the first 10
sortedUserCountsRDD = userWordCountsRDD.groupByKey().mapValues(lambda x: sorted(list(x), key=lambda y: -y[1])[:10])

#sortedUserCountsRDD.saveAsTextFile(tempFile.name)
execution_time_file.write('Joining RDDs and retriveing top 10 words: --- %s seconds --- \n' % (time.time() - start_time)) 




79

We will now perform some data analysis on the RDDs

### Number of (distinct) user, number of (distinct) items, and number of ratings

In [10]:
start_time = time.time()

n_users = userRatingsRDD.keys().distinct().count()
print('number of users (RDD) %d\n' % (n_users))

# we take into account even papers with empty abstracts to calculate the list of items
n_items = allPaperTermsRDD.keys().distinct().count()
print('number of items (RDD) %d\n' % n_items)

n_ratings = flattenedUserRatingsRDD.keys().distinct().count()
print('number of ratings (RDD) %d\n' %n_ratings)

number of users (RDD) 28416

number of items (RDD) 172079

number of ratings (RDD) 28416



### Min, Max, average and standard deviation of number of ratings a user has given
We create an RDD of user hash with the number of ratings given by user. MapValues is used to map the values (list of ratings) to the number of ratings (length of the list of ratings).

In [11]:

userRatingNumberRDD = userRatingsRDD.mapValues(len).values()

min_user_ratings = userRatingNumberRDD.min()
print('minimum number of ratings (RDD) given by a user %d\n' % min_user_ratings)

max_user_ratings = userRatingNumberRDD.max()
print('maximum number of ratings (RDD) given by a user %d\n' % max_user_ratings)

avg_user_ratings = userRatingNumberRDD.mean()
print('average number of ratings (RDD) given by a user %d\n' % avg_user_ratings)

std_dev_user_ratings = userRatingNumberRDD.stdev()
print('standard deviation of number of ratings (RDD) given by a user %d\n' % std_dev_user_ratings)


minimum number of ratings (RDD) given by a user 1

maximum number of ratings (RDD) given by a user 1922

average number of ratings (RDD) given by a user 29

standard deviation of number of ratings (RDD) given by a user 81



### Mean, max, average and standard deviation of ratings an item has recieved
We create a grouped RDD on the individual papers (items). The ratings corresponding to each distinct paper_id are grouped together. We again use mapValues to map the list of ratings to their length. 

In [12]:
paperRatingNumberRDD = paperUsersRDD.groupByKey().mapValues(len).values()

min_paper_ratings = paperRatingNumberRDD.min()
print('minimum number of ratings (RDD) recieved by the item (paper) %d\n' % min_paper_ratings)

max_paper_ratings = paperRatingNumberRDD.max()
print('maximum number of ratings (RDD) recieved by the item (paper) %d\n' %max_paper_ratings)

avg_paper_ratings = paperRatingNumberRDD.mean()
print('average number of ratings (RDD) recieved by the item (paper) %d\n' %avg_paper_ratings)

std_dev_paper_ratings = paperRatingNumberRDD.stdev()
print('standard deviation of a number of ratings (RDD) recieved by the item (paper) %d\n' %std_dev_paper_ratings)

execution_time_file.write('Data Analysis over RDDs : --- %s seconds --- \n' % (time.time() - start_time)) 


minimum number of ratings (RDD) recieved by the item (paper) 3

maximum number of ratings (RDD) recieved by the item (paper) 924

average number of ratings (RDD) recieved by the item (paper) 4

standard deviation of a number of ratings (RDD) recieved by the item (paper) 5



62

## Loading the Data onto Dataframes

In [15]:
start_time = time.time()

# loading the schema as mentioned in the README 
papersSchema = StructType([
    StructField("paper_id", IntegerType(), False),
    StructField("type", StringType(), True),
    StructField("journal", StringType(), True),
    StructField("book_title", StringType(), True),
    StructField("series", StringType(), True),
    StructField("publisher", StringType(), True),
    StructField("pages", IntegerType(), True),
    StructField("volume", IntegerType(), True),
    StructField("number", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("postedat", IntegerType(), True),
    StructField("address", StringType(), True),
    StructField("title", StringType(), True),
    StructField("abstract", StringType(), True)
])
# load csv data onto the RDD
allPapersDf = spark.read.csv(papers_path, header = False, schema = papersSchema).select('paper_id', 'abstract')
# drop rows with empty abstract 
papersAbstractDf = allPapersDf.dropna()

# loading the users usersLibraries schema as mentioned in the README
usersLibrariesSchema = StructType([
    StructField("user_hash_id", StringType(), False),
    StructField("user_library", StringType(), False)
])
# load data into dataframe
usersLibrariesDf = spark.read.csv(users_libraries_path, sep = ";", header = False, schema = usersLibrariesSchema)
execution_time_file.write('Loading Data into dataframes : --- %s seconds --- \n' % (time.time() - start_time)) 


68

We will now use dataframes to generate the top-10 most frequent words in the favorite papers for each user. 
This task is simpler to accomplish with Dataframes than with RDD's as we can join on any column and hence don't have to reverse the key-value pairs. Dataframes also provide several functions that can be operated on different values. The two functions relevant for our result are split and explode.  

In [16]:
start_time = time.time()

# split the usersLibraries as an array and store in column with name user_library_arr [user_hash_id, user_library_arr]
usersLibrariesDf = usersLibrariesDf.select(usersLibrariesDf.user_hash_id,\ 
                                           F.split(usersLibrariesDf.user_library,',').alias('user_library_arr'))

# explode is used to separate the paper_ids from the user_library_arr list into separate rows (opposite of groupBy) [user_hash_id, paper_id]
usersPapersDf = usersLibrariesDf.select(usersLibrariesDf.user_hash_id,\ 
                                        F.explode(usersLibrariesDf.user_library_arr).alias('paper_id'))

# split the words in the abstract of the papersAbstractDf as an array in the column abstract_words [paper_id, abstract_words]
papersAbstractDf = papersAbstractDf.select(papersAbstractDf.paper_id,\ 
                                           F.split(papersAbstractDf.abstract, ' ').alias('abstract_words'))

# join the usersPapersDf and papersAbstractDf on the column 'paper_id' [user_hash_id, abstract_words]
usersWordsDf = usersPapersDf.join(papersAbstractDf, usersPapersDf.paper_id == papersAbstractDf.paper_id)\
                            .select(usersPapersDf.user_hash_id, papersAbstractDf.abstract_words)

# using explode, separate the words of the abstract into separate words [user_hash_id, words]
usersWordsDf = usersWordsDf.select(usersWordsDf.user_hash_id, \
                                   F.explode(usersWordsDf.abstract_words).alias('word'))

# The words need to be transformed into lower case before filtering out the stopwords
usersWordsDf = usersWordsDf.select(usersWordsDf.user_hash_id, \
                                   F.lower(F.col('word')).alias('word'))
# Filter out stopwords 
usersWordsDf = usersWordsDf.filter(~usersWordsDf.word.isin(stopwordsBroadcast.value))

# using aggregate, calcuate the count of all the rows in the group formed by user_hash_id and word combined.
usersWordCountsDf = usersWordsDf.groupBy(usersWordsDf.user_hash_id, usersWordsDf.word)\
                                .agg(F.count('*').alias('word_count'))

# create partions formed by different user_hash_ids and inside the partitions we order by the word count in descending order
usersWordCountsWindow = Window.partitionBy(usersWordCountsDf.user_hash_id)\
                                .orderBy(usersWordCountsDf.word_count.desc())

# using the rank window function, create a column top_word_rank containing the rank of the word wrt its word count in the wondow (partition of hash_id)
topUsersWordsDf = usersWordCountsDf.withColumn('top_word_rank', \
                                               F.rank().over(usersWordCountsWindow))\
                                               .filter(F.col('top_word_rank') <= 10)

# The result is generated by aggregating the remaining top words into a list of top_words
topUsersWordsDf = topUsersWordsDf.groupBy(usersWordCountsDf.user_hash_id)\
                                .agg(F.collect_list(topUsersWordsDf.word)\
                                .alias('top_words'))

execution_time_file.write('Calculating top words in dataframes : --- %s seconds --- \n' % (time.time() - start_time)) 


74

### Data Analysis on top of DataFrames
#### Number of distinct users, items and ratings

In [17]:
start_time = time.time()

n_users = usersLibrariesDf.distinct().count()
print('number of users (dataframe) %d\n' %n_users)

n_items = allPapersDf.distinct().count()
print('number of items (paper_ids) (dataframe) %d\n' %n_items)

n_ratings = usersPapersDf.count()
print('number of ratings (dataframe) %d\n' %n_ratings)

number of users (dataframe) 28416

number of items (paper_ids) (dataframe) 172079

number of ratings (dataframe) 828481



#### Min, Max, average and standard deviation of number of ratings a user has given


In [19]:
# we create a column n_ratings with count of ratings = length of the user_library_arr list
nRatingsDf = usersLibrariesDf.withColumn('n_ratings', F.size(usersLibrariesDf.user_library_arr))

# aggregator function to calculate the statistics on number of ratings user has given
nRatingsDf.agg(F.min('n_ratings').alias('min_user_ratings'), 
               F.max('n_ratings').alias('max_user_ratings'), 
               F.mean('n_ratings').alias('avg_user_ratings'),  
               F.stddev('n_ratings').alias('stddev_user_ratings')).show()

+----------------+----------------+------------------+-------------------+
|min_user_ratings|max_user_ratings|  avg_user_ratings|stddev_user_ratings|
+----------------+----------------+------------------+-------------------+
|               1|            1922|29.155440596846848|  81.17660451011598|
+----------------+----------------+------------------+-------------------+



#### Mean, max, average and standard deviation of ratings an item has recieved

In [21]:
# the counts of each group containing distinct paper_ids
nPaperRatingsDf = usersPapersDf.groupBy('paper_id').count()

# aggregator function to calculate the statistics on the number of ratings an item has recieved
nPaperRatingsDf.agg(F.min('count').alias('min_item_ratings'), 
                    F.max('count').alias('mav_item_ratings'), 
                    F.mean('count').alias('avg_item_ratings'),
                    F.stddev('count').alias('stddev_item_ratings')).show()

execution_time_file.write('Data Analysis over Dataframes : --- %s seconds --- \n' % (time.time() - start_time))

+----------------+----------------+----------------+-------------------+
|min_item_ratings|mav_item_ratings|avg_item_ratings|stddev_item_ratings|
+----------------+----------------+----------------+-------------------+
|               3|             924|4.81453867119172|  5.477818208917296|
+----------------+----------------+----------------+-------------------+



67

In [None]:
execution_time_file.close()