## 1. Experiment: Familiarizing with Tools, Loading Data, and Basic Analysis of Data
Done by Marek Schuster (ms2228) 

## Execution details
All of this work was done using a M1 MBP with 16GB RAM.

In [1]:
# We want to measure the runtime of our code, so we have to import time.
import time

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.functions import concat_ws
from pyspark.sql.window import Window

import pyspark

# use pyspark, create new empty context in a new session.
spark = SparkSession.builder.appName("projectOne").getOrCreate()
sc = pyspark.SparkContext.getOrCreate()

## Exercise 1.2

#### a) *userRatingsRDD*: create a pair RDD from *user_libraries.txt* using the user hash as the key and the liked paper(s) (*paper id*) as the value(s)

To create this RDD we will add users_libraries.txt to the context.  
A line equals a user and the papers he liked in the format:

{user-hash};{paper-id1},{paper-id2},...

With varying numbers of papers.  
So we split by the semicolon (separate user and the papers) giving us the user and split the paper-string by the comma, giving us a list of the papers.

In [2]:
start_time = time.time()
userRatingsRDD_unflat = sc.textFile('users_libraries.txt')
userRatingsRDD_unflat = userRatingsRDD_unflat.map(lambda s: (s.split(';')[0], [int(x) for x in s.split(';')[1].split(',')]))
userRatingsRDD = userRatingsRDD_unflat.flatMapValues(lambda x: x)

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

0.5


#### b) *paperTermsRDD* : create a pair RDD from *papers.csv* using the *paper_id* as the key and the words contained in the abstract as the value(s).

A line of a paper looks as follows:  
{paper-id:integer},{type:string},{journal:string},{book-title:string},{series:string},{publisher:string},{pages:string},{volume},{number},{year},{month:string},{postedat:timestamp},{address:string},{title:string},{abstract:string}

Since we can not load it into any other format we have to manipulate the lines in RDD a bit:  
paper-id (integer) is on the left until the first colon is reached.  
abstract (string) is on the rightmost field of the csv. Due to the encapsulating of strings containing commas, we have to differentiate: If abstract contains a colon, it's encapsulated in double-quotes. If not, it's not. Also we reomve punctuations directly attached to a word because those could mess with the following wordcount excercise (eg. "text," would be counted as a separate word from just "text").

In [3]:
def tidyText(txt):
    # remove ,"" because it can fuck up splitting by ," if neccessary
    abstract = txt.replace(",\"\"", "")
    abstract = abstract.split(',\"')[-1] if txt[-1] == '\"' else abstract.split(',')[-1]
    # remove other symbols we do not need to care about
    # @@@ Replace with a filter!!!
    abstract = abstract.replace(', ', ' ')
    abstract = abstract.replace('. ', ' ')
    abstract = abstract.replace('\"', '')
    #abstract = abstract.replace('-', '')
    #abstract = abstract.replace('{', '')
    #abstract = abstract.replace('}', '')
    #abstract = abstract.replace('\"', '')

    abstract = abstract.split(' ')
    return abstract

paperTermsRDD_unflat = sc.textFile('papers.csv').map(lambda s: (int(s.split(',')[0]),tidyText(s)))
paperTermsRDD = paperTermsRDD_unflat.flatMapValues(lambda x: x)

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

0.6


## Exercise 1.3
#### Create a script that computes for each user the top-10 most frequent words appearing in the papers she likes. Exclude the stop words listed in stopwords en.txt. Store the results into a file which contains in each line the user hash and the list of her retrieved words sorted by frequency (top 1 is the most frequent): user hash, top 1 word, top 2 word, top 3 word,..., top 10 word

To accomplish this goal we first have to flatten our key-value pairs ((K, Seq[V]) -> (K, V1), (K,V2), ...) to comfortably process the data (we already did this in the above code cell). Next we prepare the stopwords as a broadcast variable for later use. After preparation we swap keys and values in the userRatingsRDD so paper_id is now the key, this allows us to later join both RDDs on the paper_id. Before the join we also filter out all stopwords and empty words (as in: "").

After joining we remap our new RDD so it is usable for performing a "wordcount" by reducing, similar to the example from the spark lecture. Atlast we sort the RDD by word occurrences descending and then remap a last time to remove the numbers as we don't need them anymore. No we just iterate over our final RDD and print each user with their respective top 10 papers to a textfile.

In [4]:
# Create broadcast of stopwords
# Read stopword-file and flatten it.
stopwordsRDD = sc.textFile('stopwords_en.txt').flatMap(lambda line: line.split(" "))
# broadcast the stopword-rdd.
stopwordsBC = sc.broadcast(stopwordsRDD.collect())

# Create Swapped userRatingsRDD (paper, user) so we can me8rge it:
userRatingsRDDs = userRatingsRDD.map(lambda s: (s[1], s[0]))

# remove all emtpy words and words that are part of stopwords.
paperTermsRDDf = paperTermsRDD.filter(lambda s: s[1] and s[1] not in stopwordsBC.value)

# Join them together based on paper id using only words that are not blacklisted or empty:
joined = userRatingsRDDs.join(paperTermsRDDf)

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

2.4


In [5]:
# restructure to ((user, word), 1) to do a simple word-count
wcable = joined.map(lambda s: ((s[1][0],s[1][1]), 1))

# word count
wcable = wcable.reduceByKey(lambda x, y: x+y)

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

2.4


In [6]:
# Sort table by occurence of words descending.
wcable_desc = wcable.sortBy(lambda s: -s[1])

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

647.3


In [7]:
# restructure again to (user, (word, number))
wcable_desc2 = wcable_desc.map(lambda s: (s[0][0], (s[0][1], s[1])))

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

647.4


In [8]:
# Group entries by user, so we can get top 10 easily
wcable_desc3 = wcable_desc2.groupByKey()

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

647.4


In [9]:
#Iterate over the RDD writing user_id and their top 10 papers as lines into a textfile
with open("wordcountRDD.txt", "w") as file:
    i = -1
    for x in wcable_desc3.collect():
        if(i >= 0):
            file.write("\n");
        file.write(str(x[0]))
        i = 0
        for y in x[1]:
            if i <= 9:
                file.write("," + str(y[0]))
            else:
                continue;
            i += 1

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

991.0


In [10]:
print(joined.count())
print(wcable.count())

52381857
20984374


## Exercise 1.4
#### a) Number of (distinct) user, number of (distinct) items, and number of ratings

In [11]:
# Calculate users based on the unique keys (unique key = unique user)
# Grouping by key to make sure no user is counted twice.
print("Number of users:", userRatingsRDD_unflat.groupByKey().count())

# Calculate papers based on the unique keys (unique key = unique user)
# Grouping by key to make sure no paper is counted twice.
print("Number of papers:", paperTermsRDD_unflat.groupByKey().count())

# Number of user ratings equals length is userRatingsRDD (each pair is one rating)
print("Number of ratings:", userRatingsRDD.count())

Number of users: 28416
Number of papers: 172079
Number of ratings: 828481


#### b) Min number of ratings a user has given

In [12]:
# Create a pair of (user_id, 1), execute a word count, restructure to flat list of only count of ratings of users
numberOfRaitingsRDD = userRatingsRDD.map(lambda pair: (pair[1], 1)).reduceByKey(lambda x, y: x+y).map(lambda x: x[1])

# Get Minimum value of rating-count list
print("Min. number of ratings by user:", numberOfRaitingsRDD.min())

Min. number of ratings by user: 3


#### c) Max number of ratings a user has given

In [13]:
# Get Maximum value of rating-count list
print("Max. number of ratings by user:", numberOfRaitingsRDD.max())

Max. number of ratings by user: 924


#### d) Average number of ratings of users

In [14]:
# Get mean (average) value of rating-count list
print("Mean. number of ratings by user:", numberOfRaitingsRDD.mean())

Mean. number of ratings by user: 4.814538671191726


#### e) Standard deviation for ratings of users

In [15]:
# Get Stdev value of rating-count list
print("Stdev of number of ratings by user:", numberOfRaitingsRDD.stdev())

Stdev of number of ratings by user: 5.477802292314525


#### f) Min number of ratings an item has received

In [16]:
# Create a pair of (paper_id, 1), execute a word count, restructure to flat list of only count value.
raitingsPerItemRDD = paperTermsRDD.map(lambda pair: (pair[0], 1)).reduceByKey(lambda x, y: x+y).map(lambda x: x[1])

# Get minimum of the flat list of rating-number
print("Min. number of ratings of paper:", raitingsPerItemRDD.min())

Min. number of ratings of paper: 1


#### g) Max number of ratings an item has received

In [17]:
# Get maximum of the flat list of rating-number
print("Max. number of ratings of paper:", raitingsPerItemRDD.max())

Max. number of ratings of paper: 9169


#### h) Average number of ratings of items

In [18]:
# Get mean (average) of the flat list of rating-number
print("Mean. number of ratings of paper:", raitingsPerItemRDD.mean())

Mean. number of ratings of paper: 109.90063866015028


#### i) Standard deviation for ratings of items

In [19]:
# Get stdev (standard deviation) of the flat list of rating-number
print("Stdev of number of ratings of paper:", raitingsPerItemRDD.stdev())

Stdev of number of ratings of paper: 139.0853409861822


## Exercise 1.5
#### In contrast to RDDs, DataFrames allow one to handle structured, distributed data in a table-like representation with named and typed columns. DataFrames are therefore applicable in any instance that requires manipulation of structured data. Load the dataset into DataFrames leveraging the structure of the data. Choose a reasonable schema.

First we create the schemas, this was relativly easy since they are largly the same as the definition given in readme.txt. Then we just had to load the data into a dataframe with respect to the schema by using the read() function. In preparation for the next exercise we also already modified the data accordingly. First we split the perform a split in the abstract and user_library columns to turn them into arrays of words/ids and then perform and explode on them (this is very similar to the flattening in 1.3). Also we perform a filtering on the abstracts words using the where function and stopwordsBC. 

In [20]:
start_time = time.time()

# Create DataFrame scheme for papers
papers_schema = StructType() \
      .add("paper_id",IntegerType(),True) \
      .add("type",StringType(),True) \
      .add("journal",StringType(),True) \
      .add("book_title",StringType(),True) \
      .add("series",StringType(),True) \
      .add("publisher",StringType(),True) \
      .add("pages",IntegerType(),True) \
      .add("volume",IntegerType(),True) \
      .add("number",IntegerType(),True) \
      .add("year",IntegerType(),True) \
      .add("month",IntegerType(),True) \
      .add("postedat",DateType(),True) \
      .add("adress",StringType(),True) \
      .add("title",StringType(),True) \
      .add("abstract",StringType(),True) \

# Load in dataset of papers
papers_df = spark.read.format("csv") \
      .option("header", False) \
      .schema(papers_schema) \
      .load("papers.csv")

# Split abstracts by space and then explode it, so we get paper-word - pairs
actualDF = papers_df.withColumn("abstract", split(col("abstract"), " "))
wordsDF = actualDF.withColumn("abstract", explode(col('abstract')))

# remve all words that are part of the stopwords
filteredDF = wordsDF.where(~wordsDF["abstract"].isin(stopwordsBC.value))

# create DataFrame for user table
users_schema = StructType() \
      .add("user_id",StringType(),True) \
      .add("user_library",StringType(),True)

# load in dataset of users
users_df= spark.read.options(delimiter=";").schema(users_schema).csv("users_libraries.txt")

# split ratings by the ',' and then explode, so we get user-rating-pairs.
splitDF = users_df.withColumn("user_library", split(col("user_library"), ","))
ratingsDF = splitDF.withColumn("user_library", explode(col("user_library")))

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

ratingsDF.show()

3.6
+--------------------+------------+
|             user_id|user_library|
+--------------------+------------+
|28d3f81251d94b097...|     3929762|
|28d3f81251d94b097...|      503574|
|28d3f81251d94b097...|     5819422|
|28d3f81251d94b097...|     4238883|
|28d3f81251d94b097...|     5788061|
|28d3f81251d94b097...|      462949|
|28d3f81251d94b097...|      635215|
|28d3f81251d94b097...|      635216|
|28d3f81251d94b097...|     4810441|
|28d3f81251d94b097...|     3481823|
|28d3f81251d94b097...|     4165233|
|28d3f81251d94b097...|     3366480|
|28d3f81251d94b097...|     5984302|
|28d3f81251d94b097...|     4238942|
|28d3f81251d94b097...|     5490453|
|28d3f81251d94b097...|     4636156|
|28d3f81251d94b097...|     5996865|
|28d3f81251d94b097...|     4194836|
|28d3f81251d94b097...|     5828780|
|28d3f81251d94b097...|     4450195|
+--------------------+------------+
only showing top 20 rows



## Exercise 1.6 a)
Now we perform 1.3 on a dataframe. We take our already prepared data from before and reduce the paper Dataframe to only the relevant data. After this we join the dataframe on the paper_id and a now column with a "1" in each row, similar to the mapping in 1.3. We can now perform the classic reducebykey wordcount trough the groupy, agg and sum methods. After this we sort the data by wordcount and use the windowfunction row_number. Since the data is sorted to have the top word in the top row for each user, the row_number is the same as a "top words rating" so to speak. After this we just select all rows where top row number is <= 10, modify the data for writing and save everything in a correctly formated text file.

In [21]:
# create reduced set with paper-word-pairs
reducedDF = filteredDF.select(col("paper_id"), col("abstract"))

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

4.9


In [22]:
# (inner) join both tables using the paperIds
joinedDF = ratingsDF.join(reducedDF, (ratingsDF["user_library"] == reducedDF["paper_id"]), "inner")
# Add column with all 1 for word counting
joinedDF = joinedDF.withColumn("count", lit(1))

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

5.0


In [23]:
# Reduce/Count by userId-word-pair
wordcountDF = joinedDF.groupBy("user_id", "abstract").agg(sum("count"))

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

5.0


In [24]:
# Evaluate rank of word based on count using window function
windowSpec  = Window.partitionBy("user_id").orderBy(desc("sum(count)"))
rankedDF = wordcountDF.withColumn("row_numbber",row_number().over(windowSpec))

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

5.1


In [25]:
# Create tempView, so we can apply sql queries
rankedDF.createOrReplaceTempView("rankedDF");

# only get the top 10 words of every user in descending order
finalDF = spark.sql("SELECT user_id, abstract, row_numbber FROM rankedDF WHERE row_numbber <= 10")

# Combine single words to one string
finalDF = finalDF.groupBy('user_id').agg(collect_list('abstract').alias('words'))
finalDF.show()

# Put everything in just one cell, so we can easily print it
dataFrameWithOnlyOneColumn = finalDF.select(concat_ws(",", col("user_id"), col('words')).alias('data'))

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

+--------------------+--------------------+
|             user_id|               words|
+--------------------+--------------------+
|00095808cdc611fb5...|[errors, text, si...|
|000ac87bf9c1623ee...|[consciousness, p...|
|001933555c2b77453...|[game, motivation...|
|002bf830c228777bc...|[, detection, dar...|
|002e030d14bbb8058...|[archive, culture...|
|004d7b66452498748...|[visual, recognit...|
|007792b2578fc8df9...|[chromosome, bact...|
|008213f1de217daf6...|[potential, model...|
|0087a177586a5c9eb...|[search, tasks, i...|
|008efe9995cd194f8...|[heating, microwa...|
|00a0508d589584122...|[learner, learnin...|
|00a19396b7bb52e40...|[face, recognitio...|
|00b45845b91a446a8...|[teacher, profess...|
|00bc1bb009b21847c...|[strategic, {erp}...|
|00cec95a2c007b650...|[force, states, s...|
|00cfce7a36abfc91e...|[10.1093/hrlr/4.1.1]|
|00d08b45f5ce47630...|[gene, data, tree...|
|00d145bb7325f62e5...|[graphics, proces...|
|01071eb603bbdb648...|[hydroxyurea, cel...|
|01467ff62bfe31efe...|[protein, 

In [26]:
# Output table to textfile withoud headers.
dataFrameWithOnlyOneColumn.coalesce(1).write.format("text").option("header", "false").save("wordcountDF")

run_time = (time.time() - start_time)
print("%.1f" % (run_time))

255.1


## Exercise 1.6 b

This is effectivly just the same as in 1.4, the comments should be enough of an explanation.

In [27]:
#1.6 b)

#a) Number of (distinct) user, number of (distinct) items, and number of ratings
users_df.select(countDistinct("user_id")).show()
papers_df.select(countDistinct("paper_id")).show()
ratingsDF.select(count("user_library")).show()

#b) Min number of ratings a user has given
users_df.withColumn('wordCount', size(split(col('user_library'), ','))).select(min(col("wordCount"))).show()

#c) Max number of ratings a user has given
users_df.withColumn('wordCount', size(split(col('user_library'), ','))).select(max(col("wordCount"))).show()

#d) Average number of ratings of users
users_df.withColumn('wordCount', size(split(col('user_library'), ','))).select(avg(col("wordCount"))).show()

#e) Standard deviation for ratings of users
users_df.withColumn('wordCount', size(split(col('user_library'), ','))).select(stddev(col("wordCount"))).show()

#f) Min number of ratings an item has received
ratingsDF.groupBy("user_library").count().select(min(col("count"))).show()

#g) Max number of ratings an item has received
ratingsDF.groupBy("user_library").count().select(max(col("count"))).show()

#h) Average number of ratings of items
ratingsDF.groupBy("user_library").count().select(avg(col("count"))).show()

#i) Standard deviation for ratings of items
ratingsDF.groupBy("user_library").count().select(stddev(col("count"))).show()


+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                  28416|
+-----------------------+

+------------------------+
|count(DISTINCT paper_id)|
+------------------------+
|                  172079|
+------------------------+

+-------------------+
|count(user_library)|
+-------------------+
|             828481|
+-------------------+

+--------------+
|min(wordCount)|
+--------------+
|             1|
+--------------+

+--------------+
|max(wordCount)|
+--------------+
|          1922|
+--------------+

+------------------+
|    avg(wordCount)|
+------------------+
|29.155440596846848|
+------------------+

+----------------------+
|stddev_samp(wordCount)|
+----------------------+
|     81.17660451011598|
+----------------------+

+----------+
|min(count)|
+----------+
|         3|
+----------+

+----------+
|max(count)|
+----------+
|       924|
+----------+

+----------------+
|      avg(count)|
+----------------+
|4.81453867119172|
+---------

#### Runtime comparison

It took 991 seconds to do the evaluation using RDD.
DF were significantly faster (factor of nearly 4) with 255.1 seconds.

Since a lot of work that is done in RDDs (map/flatMap/reduce/...) takes python code as a parameter it will run this code on every occasion where data has to be compared. Therefore this is not native code but slow pyhton stuff and it is not optimized, causing long runtimes.  
Additionally DataFrames get compiled for execution while RDDs are code that is running directly in python, giving DataFrames a generally faster execution speed.