In [2]:
from pyspark import SparkConf, SparkContext

# Name the Spark application, then attach to the running SparkContext via getOrCreate
# (an already-active context is reused instead of a second one being constructed).
spark_conf = SparkConf().setAppName("11-iall9376-ismail-allana")
sc = SparkContext.getOrCreate(spark_conf)

In [3]:
from datetime import datetime

In [4]:
## Read the input file line by line into an RDD of strings (one CSV record per element)
videosRDD = sc.textFile(name='AllVideos.csv')

In [5]:
videosRDD.take(num=5)  # peek at the first five raw lines to confirm the column layout

['SbOwzAl9ZfQ,17.14.11,Entertainment,310130,4182,361,MX',
 'klOV6Xh-DnI,17.14.11,People & Blogs,104972,271,174,MX',
 '6L2ZF7Qzsbk,17.14.11,News & Politics,136064,10105,266,MX',
 'hcY52MFWMDM,17.14.11,News & Politics,96153,378,171,MX',
 '_OXDcGPVAa4,17.14.11,Howto & Style,499965,57781,681,MX']

## Workload 1 (Controversial Videos)

### User defined functions for mapping

In [7]:
def extractRecords(record):
    '''Parse one CSV line into a ((country, videoID), (date, likes, dislikes)) key-value pair.

    likes and dislikes are converted to int; category and views are discarded.
    Raises ValueError if the line does not split into exactly 7 comma-separated
    fields, or if likes/dislikes are not integer strings.
    '''
    video, date, _category, _views, raw_likes, raw_dislikes, country = record.split(',')
    appearance = (date, int(raw_likes), int(raw_dislikes))
    return ((country, video), appearance)

def sortDates(record):
    '''Reduce a (key, [appearance, ...]) group to (key, (first_appearance, last_appearance)).

    Each appearance is a (date, likes, dislikes) tuple whose date string is in
    'yy.dd.mm' form (e.g. '17.14.11'). Appearances are ordered by the parsed date,
    not the raw string, so day/month boundaries sort correctly; ties keep input order.
    A group with a single appearance yields that appearance as both first and last.
    '''
    key = record[0]
    appearances = record[1]

    def _trending_date(entry):
        # The date is stored as year.day.month, hence '%y.%d.%m'
        return datetime.strptime(entry[0], '%y.%d.%m')

    chronological = sorted(appearances, key=_trending_date)
    return (key, (chronological[0], chronological[-1]))

### Computation Graph

In [13]:
## Parse the dataset into an RDD of key value pairs (country, videoID): (date, likes, dislikes)
formattedRDD = videosRDD.map(extractRecords)
## Group every appearance of each (country, videoID) pair into a list
groupedRDD = formattedRDD.groupByKey().mapValues(list)
## Keep only the first and last appearance (by trending date) of each video
filteredRDD = groupedRDD.map(sortDates)
## Filter out videos with a single appearance (first and last appearance are the same record)
filteredRDD2 = filteredRDD.filter(lambda x: x[1][0] != x[1][1])
## Ungroup the filtered data: one (key, appearance) row per first/last appearance.
## Kept for inspection only; the computation below works on filteredRDD2 directly.
ungroupRDD = filteredRDD2.flatMapValues(lambda x: x)
## Growth of likes and dislikes between first and last appearance: (last - first) for each.
## Computed from the explicit (first, last) pair instead of reduceByKey: reduceByKey requires an
## associative and commutative function, and subtraction is neither, so the sign of the result
## would depend on the order in which Spark happens to merge the two values.
differenceRDD = filteredRDD2.mapValues(
    lambda first_last: (first_last[1][1] - first_last[0][1],
                        first_last[1][2] - first_last[0][2]))
## Controversy score: growth in dislikes minus growth in likes
finalRDD = differenceRDD.mapValues(lambda x: x[1] - x[0])

### The final result

In [9]:
## Ten most controversial (videoID, score, country) rows, highest score first
(finalRDD
    .map(lambda kv: (kv[0][1], kv[1], kv[0][0]))
    .takeOrdered(10, key=lambda row: -row[1]))

[('QwZT7T-TXT0', 579119, 'GB'),
 ('QwZT7T-TXT0', 478100, 'US'),
 ('BEePFpC9qG8', 365862, 'DE'),
 ('RmZ3DPJQo2k', 334390, 'KR'),
 ('q8v9MvManKE', 299044, 'IN'),
 ('pOHQdIDds6s', 160365, 'CA'),
 ('ZGEoqPpJQLE', 151913, 'RU'),
 ('84LBjXaeKk4', 134836, 'FR'),
 ('84LBjXaeKk4', 134834, 'DE'),
 ('84LBjXaeKk4', 121240, 'RU')]

## Workload 2 (Country Number of each category)

### User defined functions for mapping

In [10]:
def pairCategoryVideos(record):
    '''Parse one CSV line into a (category, videoID) pair.

    Raises ValueError if the line does not split into exactly 7 comma-separated fields.
    '''
    video, _date, category, _views, _likes, _dislikes, _country = record.split(',')
    pair = (category, video)
    return pair

def pairCategoryCountry(record):
    '''Parse one CSV line into a ((category, videoID), country) key-value pair.

    Raises ValueError if the line does not split into exactly 7 comma-separated fields.
    '''
    video, _date, category, _views, _likes, _dislikes, country = record.split(',')
    key = (category, video)
    return (key, country)

### Computation Graph

In [11]:
## --- Denominator: number of distinct videos in each category ---
## One (category, videoID) pair per row; the same pair recurs across dates/countries, so de-duplicate
categoriesRDD = videosRDD.map(pairCategoryVideos)
distinctRDD = categoriesRDD.distinct()
## Gather each category's distinct video IDs, then count them
groupedRDD3 = distinctRDD.groupByKey().mapValues(list)
countRDD = groupedRDD3.map(lambda cat_videos: (cat_videos[0], len(cat_videos[1])))

## --- Numerator: distinct (video, country) appearances per category ---
## One ((category, videoID), country) pair per row, de-duplicated so each country counts once per video
countriesRDD = videosRDD.map(pairCategoryCountry)
distinctRDD2 = countriesRDD.distinct()
## Number of distinct countries per video, re-keyed by category alone (video ID dropped)
groupedRDD4 = distinctRDD2.groupByKey().mapValues(list)
groupedRDD5 = groupedRDD4.map(lambda pair_countries: (pair_countries[0][0], len(pair_countries[1])))
## Sum those per-video country counts within each category
countRDD2 = groupedRDD5.reduceByKey(lambda total, n: total + n)

## --- Country number: average number of distinct countries per video in the category ---
joinedRDD = countRDD2.join(countRDD)
finalRDD = joinedRDD.mapValues(lambda counts: counts[0] / counts[1])

### The final result

In [12]:
## Every category, ascending by country number. No hard-coded category count (previously 18),
## so categories are never silently dropped on a different dataset. finalRDD holds one row per
## category (it is keyed by category after reduceByKey/join), so collecting it is cheap.
sorted(finalRDD.collect(), key=lambda kv: kv[1])

[('Trailers', 1.0),
 ('Autos & Vehicles', 1.0190448285965426),
 ('News & Politics', 1.0527098256521152),
 ('Nonprofits & Activism', 1.057344064386318),
 ('Education', 1.0628976994615762),
 ('People & Blogs', 1.063884131133748),
 ('Pets & Animals', 1.0703560703560704),
 ('Howto & Style', 1.0875230863944183),
 ('Travel & Events', 1.0929411764705883),
 ('Gaming', 1.09443748882132),
 ('Sports', 1.1421507122296848),
 ('Entertainment', 1.1446024282935856),
 ('Science & Technology', 1.1626835588828102),
 ('Film & Animation', 1.1677314564158094),
 ('Comedy', 1.2142258635136394),
 ('Movies', 1.25),
 ('Music', 1.3105183216252136),
 ('Shows', 1.555045871559633)]