### Initializing Spark


In [5]:
from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setAppName("Week 5 Lecture Sample Code")
# getOrCreate reuses the kernel's existing SparkContext (if any) instead of starting a second one,
# so re-running this cell is safe.
sc = SparkContext.getOrCreate(conf=spark_conf)


### Word Count Program ###

This is the word count program used in the week 5 lecture to illustrate basic Spark program structure. It reads a text file from local disk and counts the occurrences of words in the text. For simplicity, words are considered to be separated by whitespace only.

**Each run of this cell will create an output directory called 1984_wordcount. To re-run the cell, you need to remove that directory**

You can use the terminal to remove the output directory, or expand the "Files" tab on the left vertical bar to open the file panel and delete it from there.


In [6]:
input_file = '1984-GeorgeOrwell.txt'
output_path = '1984_wordcount'

text_file = sc.textFile(input_file)

### FILE ###
# Alice and Bob are playing in the field
# Jack and Jill are running up the hill
############

# map     --> [["Alice", "and", "Bob", "are", "playing", "in", "the", "field"], ["Jack", "and", "Jill", ...]]   (one list per line)
# flatMap --> ["Alice", "and", "Bob", "are", "playing", "in", "the", "field", "Jack", "and", "Jill", ...]      (one flat list of words)

# str.split() with no argument splits on any run of whitespace and drops empty strings.
# split(" ") would instead yield "" for every blank line and every extra space,
# and "" would then be counted as a word.
counts = text_file.flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
# This fails if output_path already exists: remove the directory before re-running the cell.
counts.saveAsTextFile(output_path)

## map style transformations

 `map` vs. `mapValues`

In [18]:
# (key, distance) pairs a..e -> 1..5; presumably miles, converted to km below (x 1.6)
d = list(zip("abcde", range(1, 6)))
distRDD = sc.parallelize(d)

# Both lines produce the same pairs:
# - `map` receives the whole (key, value) tuple and has to rebuild it;
# - `mapValues` receives only the value and leaves the key untouched.
kvmap = distRDD.map(lambda kv: (kv[0], kv[1] * 1.6)).collect()
kvmapvalues = distRDD.mapValues(lambda value: value * 1.6).collect()

In [19]:
# one result per line: first the `map` version, then the `mapValues` version
print(kvmap, kvmapvalues, sep="\n")

[('a', 1.6), ('b', 3.2), ('c', 4.800000000000001), ('d', 6.4), ('e', 8.0)]
[('a', 1.6), ('b', 3.2), ('c', 4.800000000000001), ('d', 6.4), ('e', 8.0)]


## map style transformation 

`filter`

In [20]:
# keep only the (key, value) pairs whose value exceeds 2
longDist = distRDD.filter(lambda pair: pair[1] > 2)
longDist.collect()

[('c', 3), ('d', 4), ('e', 5)]

### Movie Rating Computing ###

This is a sample notebook showing basic spark RDD operations. The program has two input data sources: *ratings.csv* and *movies.csv*.

The *movies.csv* file contains movie information. Each row represents one movie, and has the following format:

`movieId,title,genres`

The *ratings.csv* file contains rating information. Each row represents one rating of one movie by one user, and has the following format:

`userId,movieId,rating,timestamp`


#### The following cell defines a number of functions to be used in the computation ####

In [5]:
import csv
"""
This module includes a few functions used in computing average rating per genre
"""
def pairMovieToGenre(record):
    """Convert one row of movies.csv into (movieID, genre) pairs.

    A movie can list several genres separated by "|", so this returns a list
    with one (movieID, genre) tuple per genre. It is meant to be used with ``flatMap``.

    Args:
        record (str): A row of CSV file, with three columns separated by comma
            (movieId,title,genres). The row is parsed with ``csv`` so a quoted
            title containing commas is handled.
    Returns:
        list of (movieID, genre) tuples. A row that does not have exactly three
        columns yields an empty list, so ``flatMap`` skips it instead of failing
        on a ``None`` result.
    """
    for row in csv.reader([record]):
        if len(row) != 3:
            # malformed row: emit nothing (previously fell through and returned None)
            return []
        movieID, genreList = row[0], row[2]
        return [(movieID, genre) for genre in genreList.split("|")]
    # csv.reader yielded no row at all: nothing to emit
    return []

def extractRating(record):
    """Convert one row of ratings.csv into a (movieID, rating) pair.

    Args:
        record (str): A row of CSV file, with four columns separated by comma
            (userId,movieId,rating,timestamp)
    Returns:
        The tuple (movieID, rating), with rating converted to float, or an
        empty tuple () if the row does not have exactly four columns or the
        rating is not a number (e.g. a header line). Filter out the () results
        before joining, because a join needs every element to be a (key, value) pair.
    """
    try:
        _userID, movieID, rating, _timestamp = record.split(",")
        return (movieID, float(rating))
    except ValueError:
        # wrong column count (tuple unpacking) or non-numeric rating (float());
        # other errors are real bugs and are not swallowed any more
        return ()

def mapToPair(line):
    """Return a two-item (genre, rating) sequence as the tuple (genre, rating).

    Args:
        line (tuple): A (genre, rating) pair; any iterable of exactly two items is accepted.
    Returns:
        tuple: (genre, rating)
    Raises:
        ValueError: if ``line`` does not contain exactly two items.
    """
    key, value = line
    return key, value

def avg(values):
    """Return the arithmetic mean of ``values``.

    Args:
        values: any iterable of numbers (e.g. the iterable groupByKey produces per key).
    Returns:
        sum of the values divided by how many there are.
    Raises:
        ZeroDivisionError: if ``values`` is empty.
    """
    # materialize first: a one-shot iterator could not be traversed twice
    # (once for the sum, once for the count)
    items = [*values]
    total = sum(items)
    count = len(items)
    return total / count

#### This cell defines the Spark function skeleton (i.e. the computation graph) ####

To facilitate inspection of each intermediate RDD, we write each transformation in a separate statement. This is not necessary in production code. 

In [6]:
output_path = 'ratingOut'

# read the input as lines and convert into an RDD of String (one element per CSV row)
ratingData = sc.textFile("ratings.csv")
movieData = sc.textFile("movies.csv")

# extractRating returns () for rows it cannot parse (e.g. a header line).
# Drop those here: the join below needs every element to be a (key, value) pair.
movieRatings = ratingData.map(extractRating).filter(lambda rec: len(rec) == 2)
# we use flatMap as there are multiple genres per movie;
# `or []` makes a None result from the helper contribute nothing instead of crashing
movieGenre = movieData.flatMap(lambda line: pairMovieToGenre(line) or [])

# join the two RDDs: (mid, genre) with (mid, rating) -> (mid, (genre, rating))
joined = movieGenre.join(movieRatings)
# throw away the movieID, which is useless for subsequent computation -> (genre, rating)
joined_gk = joined.values()
# group ratings by genre -> (genre, iterable of ratings)
grouped = joined_gk.groupByKey()
genreRatingsAvg = grouped.mapValues(avg).collect()

# The shorthand (single-chain) version of the same pipeline:
# genreRatingsAvg = movieGenre \
#     .join(movieRatings) \
#     .values() \
#     .groupByKey() \
#     .mapValues(avg) \
#     .collect()
genreRatingsAvg

[('Drama', 3.6502661839863713),
 ('Romance', 3.544254739708809),
 ('Thriller', 3.4955613220431574),
 ('Sci-Fi', 3.4544805001488537),
 ('Fantasy', 3.5004591789879695),
 ('Musical', 3.57196174480989),
 ('Western', 3.565687121866897),
 ('Comedy', 3.4209959269478385),
 ('Adventure', 3.518027387762177),
 ('War', 3.7832017844886754),
 ('Film-Noir', 3.9136363636363636),
 ('Action', 3.4514500881269026),
 ('Horror', 3.281097331830139),
 ('Children', 3.4394294887626575),
 ('Documentary', 3.6430348258706466),
 ('IMAX', 3.641820580474934),
 ('Crime', 3.6423924334372098),
 ('Mystery', 3.652043269230769),
 ('Animation', 3.6353503184713376),
 ('(no genres listed)', 3.0714285714285716)]

#### Check RDD element ####

In [7]:
# Peek at movieData: every element is still one raw CSV line (a str)
movieData.take(num=2)

['1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
 '2,Jumanji (1995),Adventure|Children|Fantasy']

In [8]:
# Peek at movieRatings: every element is a (movieID: str, rating: float) tuple
movieRatings.take(num=2)

[('16', 4.0), ('24', 1.5)]

In [5]:
# How many elements are there in movieRatings?
# (run the cell that defines movieRatings first, otherwise this raises NameError)
movieRatings.count()

NameError: name 'movieRatings' is not defined

In [10]:
# What does the movieGenre RDD look like?
# Each element is a tuple of (str, str): (movieID, genre)
movieGenre.take(2)

[('1', 'Adventure'), ('1', 'Animation')]

In [11]:
# How many elements are there in movieGenre?
movieGenre.count()

23114

In [12]:
# What does joined look like?
# We are joining (mid, genre) with (mid, rating);
# the result is (mid, (genre, rating))
joined.take(2)

[('4', ('Comedy', 3.5)), ('4', ('Comedy', 3.0))]

In [13]:
# What does joined_gk look like?
# A tuple of (str, float) representing (genre, rating)
joined_gk.take(2)

[('Comedy', 3.5), ('Comedy', 3.0)]

In [14]:
# groupByKey gathers every rating of one genre into a single iterable object,
# so each element of `grouped` is (genre, iterable of ratings)
grouped.take(num=2)

[('Drama', <pyspark.resultiterable.ResultIterable at 0x7f19b2ada8d0>),
 ('Romance', <pyspark.resultiterable.ResultIterable at 0x7f19b2ada190>)]

## Question 1

In [16]:
# Question 1: IDs of the movies that have no genre listed.
# Compare the genres column (the last CSV field) exactly, rather than searching the
# whole line, so a title containing the same text cannot produce a false match.
# Assumes the genres field itself contains no commas ("|"-separated, as in the sample rows).
movieData = sc.textFile("movies.csv")
movieData.filter(lambda line: line.rsplit(",", 1)[-1].strip() == '(no genres listed)') \
         .map(lambda line: line.split(",", 1)[0]) \
         .collect()


['126929', '135460', '138863', '141305', '141472', '143709', '149532']

## Question 2


In [17]:
# Documentary movies as (movieID, 1).
# Check membership in the genres column (last field, "|"-separated) instead of searching
# the whole line, so a movie whose *title* mentions "Documentary" is not picked up.
docMovie = movieData \
    .filter(lambda row: 'Documentary' in row.rsplit(",", 1)[-1].strip().split("|")) \
    .map(lambda row: (row.split(",", 1)[0], 1))

In [18]:
# (movieID, rating) pairs. extractRating returns () for rows it cannot parse;
# drop them so the join below only sees proper (key, value) pairs.
movieRatings = sc.textFile("ratings.csv") \
                .map(extractRating) \
                .filter(lambda rec: len(rec) == 2)

In [19]:
doc_rating = docMovie.join(movieRatings)   # (movieID, (1, rating))

# Number of ratings per documentary, then the five most-rated ones
ratings_per_doc = doc_rating.mapValues(lambda _: 1).reduceByKey(lambda x, y: x + y)
ratings_per_doc.sortBy(lambda kv: kv[1], ascending=False).take(5)

[('5669', 51), ('246', 36), ('2064', 35), ('8464', 33), ('8622', 33)]

In [20]:
import csv
"""
Helper for counting how often two genres are listed together on the same movie
"""
def getGenrePairs(record):
    """Convert one row of movies.csv into ((g1, g2), 1) pairs, one per unordered
    pair of distinct genres listed for that movie.

    Each pair is emitted in alphabetical order (g1 < g2), so ("Drama", "Romance")
    and ("Romance", "Drama") are counted under the same key. Duplicate genre names
    in a row are collapsed first, so a movie never produces a (g, g) self-pair.

    Args:
        record (str): A row of CSV file, with three columns separated by comma
    Returns:
        list of ((g1, g2), 1) tuples; an empty list if the row does not have
        three columns or lists fewer than two distinct genres.
    """
    for row in csv.reader([record]):
        if len(row) != 3:
            return []
        # de-duplicate, then sort alphabetically so each pair has one canonical order
        sorted_glist = sorted(set(row[2].split("|")))
        g = len(sorted_glist)
        if g < 2:  # single genre case
            return []
        # at least two genres: pair each genre with every genre after it
        results = []
        for i in range(g):
            for j in range(i + 1, g):
                results.append(((sorted_glist[i], sorted_glist[j]), 1))
        return results
    # csv.reader yielded no row at all: nothing to pair
    return []

In [21]:
pair_counts = movieData.flatMap(getGenrePairs).reduceByKey(lambda x, y: x + y)
# the five genre pairs with the highest co-occurrence count
pair_counts.sortBy(lambda kv: kv[1], ascending=False).take(5)

[(('Drama', 'Romance'), 1096),
 (('Comedy', 'Drama'), 1039),
 (('Drama', 'Thriller'), 1016),
 (('Comedy', 'Romance'), 892),
 (('Crime', 'Drama'), 841)]