In [1]:
"""
Student Name :
Student ID   :
"""
from __future__ import print_function
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from book import Book
from user import User
from bookrating import BookRating
import shutil
import os, errno
import time

def silentremove(filedir): #helper function to clear rdd folders and their content
    try:
        shutil.rmtree(filedir)
    except FileNotFoundError: 
        pass
    

class ExerciseSet2(object):
    """
    Big Data Frameworks Exercises
    https://www.cs.helsinki.fi/courses/582740/2017/k/k/1
    """
        
    def __init__(self):
        """
        Initializing Spark Conf and Spark Context here
        Some Global variables can also be initialized
        """
        self.conf = (SparkConf().setMaster("local").
                     setAppName("exercise_set_2").
                     set("spark.executor.memory", "2g"))
        self.spark_context = SparkContext(conf=self.conf)
        
        # Have global variables here if you wish
        # self.global_variable = None

    def exercise_1(self,partitions=False):
        """
        INTRSUCTIONS: Download the Book-Crossing data set in CSV format from:
        https://moodle.helsinki.fi/pluginfile.php/915616/mod_forum/attachment/1239670/BX-dataset-fixe
        d.tar.gz
        Read users, books and ratings as RDDs using the textFile function. You should use separate
        serializable case classes for each csv files. The members of the classes are the columns of the
        csv files. Finally you will have three RDDs of three different object types and save these object
        RDDs (Hint: use booksrdd .saveAsObjectFile for Scala, and use booksrdd. saveAsPickleFile
        for Python).
        ● BX-Books has isbn, title, author, year, publisher, url1, url2, url3 (Hint: Inside map
        function skip these urls. Your case class for books should have isbn, title, author, year,
        publisher fields.)
        ● BX-Book-Ratings has user_id, isbn, rating (Your case class for ratings should have
        user_id, isbn, rating fields.)
        ● BX-Users has user_id, location, age (Your case class for users should have user_id,
        location, age fields.)
        You should use two partition settings; without specifying partitions, and specifying 5 partitions
        inside the textFile function and discuss the performance gain in total execution time.
        """
        print("Exercise 1")
        
        # remove previous instances of saved RDD objects in case they exist
        silentremove("users")
        silentremove("books")
        silentremove("ratings")
        
        start_time = time.time()
        
        sc = self.spark_context #for convenience abbreviate spark context variable
        
        if partitions: # use 5 partitions
            print('with 5 partitions')
            u = sc.textFile("BX-Users.csv", 5) #reads csv file
            b = sc.textFile("BX-Books.csv", 5)
            r = sc.textFile("BX-Book-Ratings.csv", 5)
        else:
            print('without partitions')
            u = sc.textFile("BX-Users.csv") #reads csv file
            b = sc.textFile("BX-Books.csv")
            r = sc.textFile("BX-Book-Ratings.csv")
        
        #clean csv file
        umap = u.map( lambda line : line.replace("\"", "").split(";")) # takes out new line and quotes characters \" out 
        # from csv format and splits the data with semicolon ; separator
        bmap = b.map( lambda line : line.replace("\"", "").split(";"))
        rmap = r.map( lambda line : line.replace("\"", "").split(";"))
        
        usersRDD = umap.map(lambda x : User(x[0],x[1],x[2])) # map items from clean csv file to their corresponding class fields
        booksRDD = bmap.map(lambda x : Book(x[0],x[1],x[2],x[3],x[4]))
        ratingsRDD = rmap.map(lambda x : BookRating(x[0],x[1],x[2]))
        
        # save new rdd objects
        usersRDD.saveAsPickleFile("users")
        booksRDD.saveAsPickleFile("books")
        ratingsRDD.saveAsPickleFile("ratings")
        print("--- %s seconds ---" % (time.time() - start_time))
        print('done')
        return None

    def exercise_2(self):
        """
        Read users, books and ratings object RDDs from the disk, as you saved in the first exercise.
        Join all the RDDs so that your result is only one RDD where each element is a book with its
        corresponding reviews. The final structure could be something like RDD[(ISBN, title, author,
        year, publisher, Set(userID, location, age, rating))].
        Hint: Reading ObjectFile or PickleFile can be done using the SparkContext object.
        Be sure to choose correct data types for each field, e.g. ratings are Integers. You can use case
        classes to help your work. Bear in mind that although the value for age is an integer, some
        entries have a “NULL” value which cannot be parsed with the toInt() method. Use whatever
        placeholder value you deem appropriate instead, like -1.
        Hint: Remember that you can rearrange fields within an RDD using map() and you can use the
        join() method to merge elements of the form (key, value) which have the same key.
        """
        sc = self.spark_context
        silentremove("rated_books")
        
        print("Exercise 2")
        
        #load rdds from saved pickle files
        usersRDD = sc.pickleFile("users")
        booksRDD = sc.pickleFile("books")
        ratingsRDD = sc.pickleFile("ratings")
        
        
        uratings = usersRDD.map(lambda u : (u.user_id,(u.location,u.age))).join(ratingsRDD.map(lambda r : (r.user_id,(r.isbn,r.rating)))) 
        # map usersRDD with key,value tuple e.g. (u.user_id,(u.location,u.age))) where key=u.user_id and value=(u.location,u.age)
        #Then, map ratingsRDD with key,value tuple where key is also user_id, so that they can be joined with join transformation
        
        bookmap = booksRDD.map(lambda b: (b.isbn,(b.title,b.author,b.year,b.publisher))) # map booksRD with ISBN as key
        
        uratingsmap = uratings.map(lambda x : (x[1][1][0],(x[0],x[1][0][0],x[1][0][1],x[1][1][1]))) # map uratings with ISBN 
        #as key, where we now use the tuple indeces instead of the class names
        bookjoin = bookmap.join(uratingsmap) #join both key-value RDDs by isbn key
        #print(bookjoin.take(1)) #display the content of one RDD
        bookjoin.saveAsPickleFile("rated_books")
        print('done')
        return None

    def exercise_3(self):
        """
        Write a function that computes how many reviews there are for books published between two
        given years. Apply the function to the resulting RDD from exercise 2 and find out the number of
        reviews for books published between 1992 and 1998.
        Hint: You can use filter() to omit elements with other publication dates.
        """
        
        def reviewsCount(obj,startyear,endyear):
            yearfilt = obj.filter(lambda x : x[1][0][2] <= endyear and x[1][0][2] >= startyear)
            reviewscount = yearfilt.groupByKey().count()
            return reviewscount
        
        sc = self.spark_context
        
        print("Exercise 3")
        
        #load rdds from saved pickle files
        rated_books = sc.pickleFile("rated_books")
        reviewscount = reviewsCount(rated_books,1992,1998)
        print(reviewscount)
        print("done")
        return None

    def exercise_4(self):
        """
        Write a function that takes the RDD created in Exercise 1, and returns an RDD of the 20 authors
        with the highest average age among their reviewers. Filter out the entries with a “NULL” value.
        Use only operations over RDDs (count, sort, etc.); the use of for loops is not allowed.
        """
        print("Exercise 4")
        
        sc = self.spark_context
        usersRDD = sc.pickleFile("users")
        booksRDD = sc.pickleFile("books")
        ratingsRDD = sc.pickleFile("ratings")   
        
        uratings = usersRDD.map(lambda u : (u.user_id,u.age)).join(ratingsRDD.map(lambda r : (r.user_id,(r.isbn,r.rating))))         
        bookmap = booksRDD.map(lambda b: (b.isbn,b.author)) # map booksRD as ISBN, author
        
        uratingsmap = uratings.map(lambda x : (x[1][1][0],x[1][0])) # map uratings as ISBN, rater's age
        authorsjoin = bookmap.join(uratingsmap)
        authorsmap = authorsjoin.map(lambda x : x[1]).filter(lambda a : a[1] > -1)
        
        zval = (0,0)
        aggregate_authors = authorsmap.aggregateByKey(zval, lambda a,b: (a[0] + b,    a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))
        # average with aggregate method. zval is our initial values tuple. first value will be the sum accumulator and the second value the counter accumulator
        # first lambda is w.r.t. a -> our initial tuple zval, and b -> escalar, next value from another k,v pair with same key
        #second lambda is w.r.t. a -> current tuple of zval, and b -> following tuple of zval with same key as the current
        #therefore in first lambda we increase our sum accumulator with the initial age=0 and the next age, and the initial counter=0 with +1
        #then in second lambda we add the current accumulator tuple with the next accumulator tuple
        # See https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth
        
        average_authors = aggregate_authors.mapValues(lambda v: v[0]/v[1]) #then we calculate the average v[0] is the resulting accumulator sum
        #and v[1] is the resulting accumulator counter
        top_authors = average_authors.takeOrdered(20,key=lambda x : -x[1]) #finally we use takeOrdered to sort. negative sign is for descending order 
        #and 20 is to take the top 20 values
        
        print(top_authors) #print results
        print("done")
            
        return None

    def exercise_5(self):
        """
        Repeat exercise one with dataframes. You will save three DataFrames for three textfiles. You
        do not need to construct the case class objects. Apply filtering to find out the number of reviews
        for books published between 1992 and 1998.
        """
        
        print("Exercise 5")
        
        # remove previous instances of saved RDD objects in case they exist
        silentremove("users")
        silentremove("books")
        silentremove("ratings")
        
        #remove previous DFs
        silentremove("usersDF")
        silentremove("booksDF")
        silentremove("ratingsDF")
        
        sc = self.spark_context #for convenience abbreviate spark context variable
        sqlContext = SQLContext(sc) #SQL context to create dataframes and be able to perform SQL operations
        
        #load data as dataframes
        u = sqlContext.read.csv('BX-Users.csv',sep=";")
        b = sqlContext.read.csv('BX-Books.csv',sep=";")
        r = sqlContext.read.csv('BX-Book-Ratings.csv',sep=";")
        
        #convert existing dataframes to new dataframes with specified column names
        uDF = u.toDF("user_id", "location", "age")
        bDF = b.toDF("isbn", "title","author","year","publisher","url1","url2","url3")
        rDF = r.toDF("user_id", "isbn", "rating")
        
        #drop unused columns
        bDF = bDF.drop("url1","url2","url3")
        
        #cast specific columns to IntegerType
        uDF = uDF.withColumn("age",uDF["age"].cast(IntegerType()))
        bDF = bDF.withColumn("year",bDF["year"].cast(IntegerType()))
        rDF = rDF.withColumn("rating",rDF["rating"].cast(IntegerType()))
        
        uDF.write.save("usersDF")
        bDF.write.save("booksDF")
        rDF.write.save("ratingsDF")
        
        #replace None with -1 for integer type data
        uDF = uDF.na.fill({"age" : -1})
        bDF = bDF.na.fill({"year" : -1})
        rDF = rDF.na.fill({"rating" : -1})
        
        rbDF = bDF.join(rDF,"isbn") #join books DF and ratings DF by isbn column
        
        #option a) filter rated books with dataframe operations
        filtered_books = rbDF.filter("year >= 1992 and year <= 1998").groupby("isbn").agg({"isbn" : "count"})
        print(filtered_books.count())
        
        #option b) filter rated books with SQL operations
        rbDF.registerTempTable("rbtable") #save as temporary table for SQL queries
        sqlDF = sqlContext.sql("""SELECT isbn, COUNT(*) FROM rbtable WHERE year >= 1992 AND year <= 1998 GROUP BY isbn""")
        print(sqlDF.count())
        
        return None


if __name__ == "__main__":
    EXERCISESET2 = ExerciseSet2()
    EXERCISESET2.exercise_1()
    EXERCISESET2.exercise_1(True)
    EXERCISESET2.exercise_2()
    EXERCISESET2.exercise_3()
    EXERCISESET2.exercise_4()
    EXERCISESET2.exercise_5()
    EXERCISESET2.spark_context.stop()

Exercise 1
without partitions
--- 16.295982122421265 seconds ---
done
Exercise 1
with 5 partitions
--- 15.735426187515259 seconds ---
done
Exercise 2
done
Exercise 3
90164
done
Exercise 4
[('Robin Bovey', 239.0), ('Walter Gretzky', 239.0), ('Chris C. Fisher', 239.0), ('Charles Gill', 228.0), ('Neil R Carlson', 228.0), ('Alison Bell', 228.0), ('Ed Smith', 228.0), ('Ronald Comer', 228.0), ("Old Farmer's Almanac", 228.0), ('Carlton Fredericks', 228.0), ('James W. Kalat', 228.0), ('V. Wayne Klemin', 228.0), ('Robert H. Ramsey', 228.0), ('Mark Pfetzer', 219.0), ('Christian R. Hirsch', 204.0), ('Rink Van Der Velde', 204.0), ('Don Gerrard', 204.0), ('Alan Neibauer', 204.0), ('Gerard Egan', 201.0), ('Adrian Goldsworthy', 201.0)]
done
Exercise 5
90164
90164
