# Workload Summary 

Find the top 10 videos with the fastest growth in dislikes between their first and
second trending appearances. Dislike growth is measured as the increase in dislikes
minus the increase in likes between the first two trending appearances in the same country.

For instance, the dislike growth of video QwZT7T-TXT0 in the US is computed as follows:

```
(1065772 - 629120) - (1082422 - 835378) = 189608
```

Here the first term is the increase in dislikes and the second term is
the increase in likes between the first and second trending appearances.
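The same arithmetic can be written as a small Python function. This is only an illustrative sketch: the name `dislike_growth` and the `(likes, dislikes)` argument layout are assumptions, not part of the notebook's helpers.

```python
def dislike_growth(first, second):
    """Dislike growth between two trending appearances.

    `first` and `second` are (likes, dislikes) pairs, ordered by trending date
    (hypothetical layout). Returns (increase in dislikes) - (increase in likes).
    """
    first_likes, first_dislikes = first
    second_likes, second_dislikes = second
    return (second_dislikes - first_dislikes) - (second_likes - first_likes)


# Worked example from the text: video QwZT7T-TXT0 in the US
print(dislike_growth((835378, 629120), (1082422, 1065772)))  # 189608
```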

The result of this workload should show a few details for each of the top 10 videos: the
video ID, dislike growth value, category and country code. Below are a few sample results:

```
"BEePFpC9qG8", 366556, "Film & Animation", "DE"
"RmZ3DPJQo2k", 334594, "Music", "KR"
"1Aoc-cd9eYs", 192222, "Entertainment", "GB"
"QwZT7T-TXT0", 189608, "Entertainment", "US"
"QwZT7T-TXT0", 189605, "Entertainment", "GB"
```

If a video has changed its category name over time, the category of its first appearance
can be used. The same video may appear multiple times in the top 10 list if it has large
dislike growth in multiple countries; video QwZT7T-TXT0 is such an example.
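Because growth is computed per (video, country) pair, the grouping key has to include the country; that is what allows one video to show up once per country. Below is a plain-Python sketch of that grouping step. It is a hypothetical stand-in for the notebook's `mapFirstTwoRecords` (whose source is not shown) and assumes each record is a `(video_id, country, trending_date, category, likes, dislikes)` tuple whose `trending_date` sorts chronologically.

```python
from collections import defaultdict


def first_two_appearances(records):
    """Group records by (video_id, country) and keep the first two by trending date.

    Assumed record layout (hypothetical):
    (video_id, country, trending_date, category, likes, dislikes)
    """
    groups = defaultdict(list)
    for video_id, country, date, category, likes, dislikes in records:
        groups[(video_id, country)].append((date, category, likes, dislikes))
    # Sort each group chronologically and keep at most its first two appearances
    return {key: sorted(rows, key=lambda r: r[0])[:2] for key, rows in groups.items()}


# Toy data: the same video trending in two countries gives two separate keys
records = [
    ("vid_a", "US", "2018-01-03", "Music", 30, 9),
    ("vid_a", "US", "2018-01-01", "Music", 10, 2),
    ("vid_a", "US", "2018-01-02", "Music", 20, 5),
    ("vid_a", "GB", "2018-01-01", "Comedy", 7, 1),
]
print(sorted(first_two_appearances(records)))  # [('vid_a', 'GB'), ('vid_a', 'US')]
```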

## Required Imports 
Import the necessary packages and expose PySpark to the Jupyter notebook.

In [1]:
# all required imports; findspark.init() must run before pyspark is imported
import sys
from importlib import reload
import findspark
import filterHelpers as helpers
findspark.init()
from pyspark import SparkContext

In [2]:
reload(helpers)

<module 'filterHelpers' from '/Users/nagibshah/dev/COMP5349_MRandSpark/spark/filterHelpers.py'>

## Create spark context and set paths 
Create the Spark context and set the input/output paths.

In [3]:
sc = SparkContext(appName="Top 10 Fastest Video Dislikes Trend")

# Change the input path to point to your own HDFS directory if needed.
# If Spark can read the Hadoop configuration, a relative path can be used.
input_path = '../data/' # local file system relative path 

# Relative HDFS paths resolve against your HDFS home directory: /user/<yourUserName>
# (the paths used here are local file-system relative paths)
output_path = '../data/top10VideosOut'

## Load the dataset

In [4]:
videoData = sc.textFile(input_path + "AllVideos_short.csv")
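`sc.textFile` yields raw lines, so each line still has to be parsed. Splitting on `","` breaks as soon as a quoted field (such as a title) contains a comma; Python's `csv` module handles quoting. The sketch below is a hypothetical parser in the spirit of `helpers.extractMovieRecord`, whose source is not shown. The column order is an assumption to adjust to the real header, and the `[category, likes, dislikes]` value layout is inferred from the sample output further down. Note that fields containing embedded newlines would still be split across lines by `textFile`.

```python
import csv


def parse_line(line):
    """Parse one CSV line into ((video_id, country), [category, likes, dislikes]).

    Assumed (hypothetical) column order:
    video_id, trending_date, title, category, likes, dislikes, country
    Returns None for the header row or malformed lines, so results can be
    dropped with `.filter(lambda x: x)` as in the notebook.
    """
    try:
        fields = next(csv.reader([line]))  # respects quoted fields containing commas
        video_id, _date, _title, category, likes, dislikes, country = fields
        return ((video_id, country), [category, int(likes), int(dislikes)])
    except (ValueError, StopIteration):
        # ValueError: wrong field count or non-numeric likes/dislikes (e.g. the header)
        return None


print(parse_line('abc,2018-01-01,"Hello, world",Music,10,2,US'))
# (('abc', 'US'), ['Music', 10, 2])
```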

## Run the filtering and processing 

In [5]:
videoStats = videoData.map(helpers.extractMovieRecord).filter(lambda x: x)

# group records by (video_id, country) and keep the first two trending records of each group
groupedTrends = videoStats.groupByKey().mapValues(helpers.mapFirstTwoRecords)
# unpack the grouped values (K, [V, V]) -> (K, V) and sort by key
videoTrends = groupedTrends.flatMap(lambda x: [(x[0], y) for y in x[1]]).sortByKey()
# reduce each key's records to the likes/dislikes growth, then reshape to (video_id, growth, category, country)
videoTrendsDislikes = videoTrends.reduceByKey(helpers.reduceLikesDislikesGrowth).map(helpers.mapDislikesTrend)
# sort by the growth value in descending order
videoTrendsSorted = videoTrendsDislikes.sortBy(lambda x: x[1], ascending=False)
print(type(videoTrendsSorted))


<class 'pyspark.rdd.PipelinedRDD'>
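Spark documents `reduceByKey` as requiring an associative and commutative function. A "second record minus first record" difference is neither, so if `reduceLikesDislikesGrowth` works that way (its source is not shown), the result relies on record order surviving `sortByKey` and the shuffle. A sketch that avoids this is to compute growth directly on each grouped pair, where the order produced by the grouping step is still explicit. The function `to_result` below is hypothetical and assumes each grouped value holds up to two `[category, likes, dislikes]` records in trending order.

```python
def to_result(item):
    """Turn ((video_id, country), records) into (video_id, growth, category, country).

    `records` is assumed to hold up to two [category, likes, dislikes] lists in
    trending order. Keys with fewer than two appearances return None.
    """
    (video_id, country), records = item
    records = list(records)  # grouped values may be an iterable rather than a list
    if len(records) < 2:
        return None
    (category, likes1, dislikes1), (_, likes2, dislikes2) = records[0], records[1]
    growth = (dislikes2 - dislikes1) - (likes2 - likes1)
    # Report the category of the first appearance, as the workload allows
    return (video_id, growth, category, country)


sample = (('SbOwzAl9ZfQ', 'MX'), [['Entertainment', 4182, 361], ['Entertainment', 5891, 553]])
print(to_result(sample))  # ('SbOwzAl9ZfQ', -1517, 'Entertainment', 'MX')
```

In the notebook, something like `groupedTrends.map(to_result).filter(lambda x: x)` could then feed the descending `sortBy`, replacing the `flatMap`/`sortByKey`/`reduceByKey` chain.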


In [6]:
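# note: sys.getsizeof measures only the collected list object itself (shallow size),
# not the records it references; collect() also pulls the whole RDD to the driver
print("initial map size: {}".format(sys.getsizeof(videoStats.collect())))
print("post grouping and map - 2 records per video per region: {}"
      .format(sys.getsizeof(groupedTrends.collect())))

print("Total number of records ungrouped: {}".format(videoStats.count()))
# groupedTrends is keyed by (video_id, country), so this counts unique video/country pairs
print("Total number of unique video id (post grouping): {}".format(groupedTrends.count()))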

initial map size: 3215232
post grouping and map - 2 records per video per region: 1784056
Total number of records ungrouped: 373623
Total number of unique video id (post grouping): 207142
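The sizes above work out to under 9 bytes per record (3,215,232 / 373,623 and 1,784,056 / 207,142), which is consistent with `sys.getsizeof` counting roughly one 8-byte pointer per list element on a 64-bit build rather than the records themselves. A rough recursive estimate is sketched below; `deep_getsizeof` is a hypothetical helper that only walks lists, tuples, sets and dicts and counts each shared object once.

```python
import sys


def deep_getsizeof(obj, seen=None):
    """Rough recursive size estimate in bytes for nested lists/tuples/sets/dicts."""
    if seen is None:
        seen = set()
    if id(obj) in seen:  # count shared objects only once
        return 0
    seen.add(id(obj))
    size = sys.getsizeof(obj)
    if isinstance(obj, dict):
        size += sum(deep_getsizeof(k, seen) + deep_getsizeof(v, seen) for k, v in obj.items())
    elif isinstance(obj, (list, tuple, set, frozenset)):
        size += sum(deep_getsizeof(item, seen) for item in obj)
    return size


big = "x" * 1000
print(sys.getsizeof([big]) < 1000)   # True: the shallow size ignores the string
print(deep_getsizeof([big]) > 1000)  # True: the recursive estimate includes it
```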


In [7]:
# print samples 
print(groupedTrends.take(10))
print("\n")
print(videoTrends.take(10))

[(('SbOwzAl9ZfQ', 'MX'), [['Entertainment', 4182, 361], ['Entertainment', 5891, 553]]), (('_OXDcGPVAa4', 'MX'), [['Howto & Style', 57781, 681], ['Howto & Style', 93269, 1792]]), (('Q9kK6NWZR1U', 'MX'), [['Music', 506, 67]]), (('rZZEeeAVgog', 'MX'), [['Comedy', 23279, 270]]), (('kTT472QeJGg', 'MX'), [['People & Blogs', 17070, 7718]]), (('yhdI98_O-Xc', 'MX'), [['People & Blogs', 13293, 216]]), (('7jmJtdqI6YE', 'MX'), [['Entertainment', 194, 41], ['Entertainment', 347, 78]]), (('OFXU_vrye9w', 'MX'), [['Comedy', 705, 199]]), (('WflHonz04Uc', 'MX'), [['Entertainment', 20, 53]]), (('d1oYTRYmNHs', 'MX'), [['Comedy', 28782, 1770]])]



[(('--1skHapGUc', 'MX'), ['Entertainment', 483, 77]), (('--2K8l6BWfw', 'FR'), ['Pets & Animals', 694, 4]), (('--45ws7CEN0', 'CA'), ['Gaming', 3837, 516]), (('--45ws7CEN0', 'MX'), ['Gaming', 3837, 516]), (('--45ws7CEN0', 'RU'), ['Gaming', 3837, 516]), (('--6vcer7XYQ', 'MX'), ['Entertainment', 148, 12]), (('--6vcer7XYQ', 'MX'), ['Entertainment', 1626, 218]), (('--

In [8]:
# print the top 10
top10ControversialVideos = sc.parallelize(videoTrendsSorted.take(10))
print(top10ControversialVideos.take(10))

[('BEePFpC9qG8', 366556, 'Film & Animation', 'DE'), ('RmZ3DPJQo2k', 334594, 'Music', 'KR'), ('1Aoc-cd9eYs', 192222, 'Entertainment', 'GB'), ('QwZT7T-TXT0', 189608, 'Entertainment', 'US'), ('QwZT7T-TXT0', 189605, 'Entertainment', 'GB'), ('PfLCyR6Efvw', 106418, 'Music', 'GB'), ('8d_202l55LU', 105839, 'News & Politics', 'DE'), ('ZGEoqPpJQLE', 98934, 'Music', 'DE'), ('ZGEoqPpJQLE', 98930, 'Music', 'RU'), ('84LBjXaeKk4', 93961, 'Entertainment', 'FR')]
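`sortBy` sorts every (video, country) result only to keep 10 of them. PySpark's `RDD.takeOrdered(num, key=None)` returns the first `num` elements by key to the driver, so `videoTrendsDislikes.takeOrdered(10, key=lambda x: -x[1])` should select the same top 10 without a full sort (ties may come out in a different order). The pure-Python sketch below shows the same top-N selection with `heapq.nlargest` on a few rows copied from the output above; `top_n` is a hypothetical helper.

```python
import heapq

# A few (video_id, growth, category, country) rows copied from the output above
rows = [
    ("QwZT7T-TXT0", 189608, "Entertainment", "US"),
    ("BEePFpC9qG8", 366556, "Film & Animation", "DE"),
    ("8d_202l55LU", 105839, "News & Politics", "DE"),
    ("RmZ3DPJQo2k", 334594, "Music", "KR"),
    ("QwZT7T-TXT0", 189605, "Entertainment", "GB"),
]


def top_n(rows, n):
    """Return the n rows with the largest growth value (index 1), largest first."""
    return heapq.nlargest(n, rows, key=lambda row: row[1])


for row in top_n(rows, 3):
    print(row)
```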


In [9]:
# save the output 
# coalesce to create a single partition out of the top 10 entries
# (saveAsTextFile fails if output_path already exists, so remove it before re-running)
top10ControversialVideos.coalesce(1, False).saveAsTextFile(output_path)
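`saveAsTextFile` writes `str()` of each element, so the part file will contain Python tuple reprs such as `('BEePFpC9qG8', 366556, 'Film & Animation', 'DE')` rather than the double-quoted format shown in the workload sample. If that format is wanted, each tuple can be formatted first, e.g. `top10ControversialVideos.map(format_row).coalesce(1).saveAsTextFile(output_path)`. The helper `format_row` is hypothetical.

```python
def format_row(row):
    """Format (video_id, growth, category, country) like the workload's sample output."""
    video_id, growth, category, country = row
    return '"{}", {}, "{}", "{}"'.format(video_id, growth, category, country)


print(format_row(("BEePFpC9qG8", 366556, "Film & Animation", "DE")))
# "BEePFpC9qG8", 366556, "Film & Animation", "DE"
```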

## Final step - safely stop the spark context

In [11]:
# stop the spark context 
sc.stop()