In [76]:
import findspark
findspark.init()
import sys
from pyspark import SparkContext
from datetime import datetime

In [59]:
# Single SparkContext used by every cell in this notebook.
sc = SparkContext(appName="Top Dislike Growth")


#Relative path is used to specify the output directory
#The relative path is always relative to your home directory in HDFS: /user/<yourUserName>
# NOTE(review): the cell that saves results writes to the literal "Output", not to
# this variable -- confirm which directory is intended.
output_path = "topDislikes"
# Raw input: one RDD element per CSV line; the header line is still present here.
videos = sc.textFile("AllVideos_short.csv")


In [77]:
def key_and_country(line):
    """Turn one CSV line into a (key, value) pair of colon-joined strings.

    The line is split on plain commas (no quote handling) and these columns
    are read, each stripped of surrounding whitespace:
    0 = video id, 1 = trending date, 3 = category, 6 = likes,
    7 = dislikes, 11 = country.

    Returns:
        ("<video_id>:<category>:<country>", "<trending_date>:<likes>:<dislikes>")

    Raises:
        IndexError: if the line has fewer than 12 comma-separated fields.
    """
    columns = line.strip().split(",")

    def column(index):
        # Every field is used in stripped form only.
        return columns[index].strip()

    key = ":".join((column(0), column(3), column(11)))
    value = ":".join((column(1), column(6), column(7)))
    return key, value

def get_growth_rate(input):
    """Growth of (dislikes - likes) between the two earliest trending dates of one key.

    Args:
        input: iterable of "<trending_date>:<likes>:<dislikes>" strings, with
            trending_date formatted as "%y.%d.%m" (e.g. "17.14.11"). The name
            shadows the builtin but is kept for interface compatibility.

    Returns:
        int: (likes - dislikes) on the earliest date minus (likes - dislikes)
        on the second-earliest date, which equals the increase in
        (dislikes - likes) from the first to the second date. Returns 0 when
        fewer than two records are supplied.

    Raises:
        ValueError: if a trending date does not match "%y.%d.%m", or if the
            like/dislike fields of the two earliest records are not integers.
    """
    records = list(input)
    if len(records) < 2:
        return 0

    # Grouped values arrive in no guaranteed order, so order them by the parsed
    # date before picking the first two. A plain string sort would be wrong:
    # the day precedes the month in the text.
    records.sort(key=lambda record: datetime.strptime(record.split(":")[0], "%y.%d.%m"))

    def like_minus_dislike(record):
        fields = record.split(":")
        return int(fields[1]) - int(fields[2])

    return like_minus_dislike(records[0]) - like_minus_dislike(records[1])

def order_items(line):
    """Reshape a ("<vid>:<category>:<country>", rate) pair for output.

    Returns:
        (vid, "<rate>, <category>, <country>") where rate is rendered with str().

    Raises:
        IndexError: if the key holds fewer than three colon-separated parts.
    """
    composite_key, growth = line[0], line[1]
    pieces = composite_key.split(":")

    video, genre, region = pieces[0], pieces[1], pieces[2]

    return video, ", ".join([str(growth), genre, region])

In [162]:
# Drop the CSV header line. The filtered RDD gets its own name so `videos` keeps
# pointing at the raw file; rebinding `videos` here would make a second run of this
# cell treat the first data row as the header and silently discard it.
header = videos.first()
videos_no_header = videos.filter(lambda line: line != header)

# (video:category:country, date:likes:dislikes) pairs, one per input row.
vv = videos_no_header.map(key_and_country)
# Preview a sample only; collect() would pull the entire dataset to the driver.
vv.take(20)

[('SbOwzAl9ZfQ:Entertainment:MX', '17.14.11:4182:361'),
 ('klOV6Xh-DnI:People & Blogs:MX', '17.14.11:271:174'),
 ('6L2ZF7Qzsbk:News & Politics:MX', '17.14.11:10105:266'),
 ('hcY52MFWMDM:News & Politics:MX', '17.14.11:378:171'),
 ('_OXDcGPVAa4:Howto & Style:MX', '17.14.11:57781:681'),
 ('Q9kK6NWZR1U:Music:MX', '17.14.11:506:67'),
 ('c9VTD3n_IDs:People & Blogs:MX', '17.14.11:2277:69'),
 ('XzULSsZYMRc:News & Politics:MX', '17.14.11:7745:659'),
 ('uijjYNtl_UM:Entertainment:MX', '17.14.11:20155:912'),
 ('cOJ68MQm2ac:Entertainment:MX', '17.14.11:83582:2194'),
 ('rZZEeeAVgog:Comedy:MX', '17.14.11:23279:270'),
 ('kTT472QeJGg:People & Blogs:MX', '17.14.11:17070:7718'),
 ('yhdI98_O-Xc:People & Blogs:MX', '17.14.11:13293:216'),
 ('7jmJtdqI6YE:Entertainment:MX', '17.14.11:194:41'),
 ('Dhhp8M5K3UI:Entertainment:MX', '17.14.11:194:41'),
 ('M16Usa8oqDg:News & Politics:MX', '17.14.11:399:39'),
 ('OFXU_vrye9w:Comedy:MX', '17.14.11:705:199'),
 ('238T-LaO_IM:Entertainment:MX', '17.14.11:776:143'),
 ('KO-

In [155]:
# Gather every observation of a (video, category, country) key and reduce it to one
# growth figure.
rdd = vv.groupByKey().mapValues(get_growth_rate)
# Rank keys by growth, largest first.
aggregated_result = rdd.sortBy(lambda a: a[1], ascending=False)
result = aggregated_result.map(order_items)
# take(10) returns the first ten rows of the sorted RDD without shipping every row
# to the driver, unlike collect()[:10].
result.take(10)

[('BEePFpC9qG8', '366556, Film & Animation, DE'),
 ('RmZ3DPJQo2k', '334594, Music, KR'),
 ('1Aoc-cd9eYs', '192222, Entertainment, GB'),
 ('QwZT7T-TXT0', '189608, Entertainment, US'),
 ('QwZT7T-TXT0', '189605, Entertainment, GB'),
 ('PfLCyR6Efvw', '106418, Music, GB'),
 ('ZGEoqPpJQLE', '98934, Music, DE'),
 ('ZGEoqPpJQLE', '98930, Music, RU'),
 ('84LBjXaeKk4', '93961, Entertainment, FR'),
 ('84LBjXaeKk4', '93961, Entertainment, RU')]

In [98]:
# Bring the ten top-ranked rows to the driver and write them out as a single part file.
# Building the RDD with one slice keeps the rows in ranked order; repartition(1) would
# send them through a shuffle, which does not guarantee element order.
final = sc.parallelize(result.take(10), numSlices=1)
final.saveAsTextFile("Output")

In [159]:
# Sentinel date meaning "no observation stored in this slot yet".
# The aggregation is seeded per key with [[MAX_DATE, 0], [MAX_DATE, 0]], i.e. two
# [date, dislikes - likes] slots that any real trending date compares earlier than.
MAX_DATE = datetime(9999,1,1)

def SeqOp(diffpair,inputpair):
    """Fold one observation into the running "two earliest dates" accumulator.

    Args:
        diffpair: [[date1, diff1], [date2, diff2]] -- the two slots held so far.
        inputpair: "<trending_date>:<likes>:<dislikes>" with the date in
            "%y.%d.%m" form.

    Returns:
        A new [[date, diff], [date, diff]] list where diff is dislikes - likes.
        An observation strictly earlier than slot one pushes slot one down to
        slot two; otherwise one strictly earlier than slot two replaces slot
        two; otherwise the slots are returned unchanged.
    """
    slot_one, slot_two = diffpair[0], diffpair[1]
    first_date, first_gap = slot_one[0], slot_one[1]
    second_date, second_gap = slot_two[0], slot_two[1]

    fields = inputpair.split(":")
    seen_on = datetime.strptime(fields[0], "%y.%d.%m")
    likes = int(fields[1])
    dislikes = int(fields[2])
    gap = dislikes - likes

    if seen_on < first_date:
        # New earliest date: the previous earliest becomes the runner-up.
        return [[seen_on, gap], [first_date, first_gap]]
    if seen_on < second_date:
        return [[first_date, first_gap], [seen_on, gap]]
    return [[first_date, first_gap], [second_date, second_gap]]

def CombOp(diffpair1, diffpair2):
    """Merge two partial accumulators into one holding the two earliest dates.

    Args:
        diffpair1, diffpair2: [[date, diff], [date, diff]] accumulators for the
            same key, each built from a different subset of its observations.

    Returns:
        list: the two [date, diff] slots with the earliest dates among all four
        input slots, earliest first. Unfilled sentinel slots carry a far-future
        date and therefore sort last.
    """
    # Returning only diffpair1 would discard every observation folded into
    # diffpair2. The sort is stable, so on equal dates diffpair1's slot wins.
    slots = list(diffpair1) + list(diffpair2)
    slots.sort(key=lambda slot: slot[0])
    return slots[:2]
    

def remap(diffpair):
    """Collapse a two-slot accumulator into a single growth figure.

    Args:
        diffpair: [[date1, diff1], [date2, diff2]].

    Returns:
        diff2 - diff1 when both slots hold a real date; 0 when either slot
        still carries the MAX_DATE sentinel.
    """
    slot_one, slot_two = diffpair[0], diffpair[1]
    first_date, first_gap = slot_one[0], slot_one[1]
    second_date, second_gap = slot_two[0], slot_two[1]

    both_filled = first_date != MAX_DATE and second_date != MAX_DATE
    return second_gap - first_gap if both_filled else 0

In [163]:
# Seed each key with two empty [date, diff] slots, fold rows in with SeqOp,
# merge partial results with CombOp, and produce a single output partition.
empty_slots = [[MAX_DATE, 0], [MAX_DATE, 0]]
v1 = vv.aggregateByKey(empty_slots, SeqOp, CombOp, numPartitions=1)

In [164]:
# Inspect a handful of accumulators; collect() would pull every key to the driver.
v1.take(10)

[('SbOwzAl9ZfQ:Entertainment:MX',
  [[datetime.datetime(2017, 11, 14, 0, 0), -3821],
   [datetime.datetime(2017, 11, 15, 0, 0), -5338]]),
 ('klOV6Xh-DnI:People & Blogs:MX',
  [[datetime.datetime(2017, 11, 14, 0, 0), -97],
   [datetime.datetime(9999, 1, 1, 0, 0), 0]]),
 ('6L2ZF7Qzsbk:News & Politics:MX',
  [[datetime.datetime(2017, 11, 14, 0, 0), -9839],
   [datetime.datetime(2017, 11, 15, 0, 0), -17791]]),
 ('hcY52MFWMDM:News & Politics:MX',
  [[datetime.datetime(2017, 11, 14, 0, 0), -207],
   [datetime.datetime(2017, 11, 15, 0, 0), -453]]),
 ('_OXDcGPVAa4:Howto & Style:MX',
  [[datetime.datetime(2017, 11, 14, 0, 0), -57100],
   [datetime.datetime(2017, 11, 15, 0, 0), -91477]]),
 ('Q9kK6NWZR1U:Music:MX',
  [[datetime.datetime(2017, 11, 14, 0, 0), -439],
   [datetime.datetime(9999, 1, 1, 0, 0), 0]]),
 ('c9VTD3n_IDs:People & Blogs:MX',
  [[datetime.datetime(2017, 11, 14, 0, 0), -2208],
   [datetime.datetime(2017, 11, 15, 0, 0), -3671]]),
 ('XzULSsZYMRc:News & Politics:MX',
  [[datetime.d

In [135]:
# Re-distribute the first 267 accumulators as a separate RDD.
# NOTE(review): 267 is an unexplained literal and `v2` is not read by any other
# cell visible in this notebook -- confirm whether this cell is still needed.
v2=sc.parallelize(v1.take(267))

In [52]:
# Scratch check of the sentinel date value. `datetime` is already imported in the
# notebook's first cell, so this import is a duplicate.
from datetime import datetime
datetime(9999,1,1)

datetime.datetime(9999, 1, 1, 0, 0)

In [166]:
# Reduce each accumulator to its growth figure, then list keys from largest to smallest.
growth_by_key = v1.mapValues(remap)
growth_by_key.sortBy(lambda pair: pair[1], ascending=False).collect()

[('BEePFpC9qG8:Film & Animation:DE', 366556),
 ('RmZ3DPJQo2k:Music:KR', 334594),
 ('1Aoc-cd9eYs:Entertainment:GB', 192222),
 ('QwZT7T-TXT0:Entertainment:US', 189608),
 ('QwZT7T-TXT0:Entertainment:GB', 189605),
 ('PfLCyR6Efvw:Music:GB', 106418),
 ('ZGEoqPpJQLE:Music:DE', 98934),
 ('ZGEoqPpJQLE:Music:RU', 98930),
 ('84LBjXaeKk4:Entertainment:RU', 93961),
 ('84LBjXaeKk4:Entertainment:FR', 93961),
 ('84LBjXaeKk4:Entertainment:DE', 93959),
 ('aqoPG-fYcXs:Entertainment:DE', 63985),
 ('5d0lsUiM0zo:People & Blogs:MX', 63912),
 ('Cd0EbzegNH0:Nonprofits & Activism:RU', 58201),
 ('8fVXPaVabxM:People & Blogs:RU', 57465),
 ('8I_NkJ8VTEI:People & Blogs:US', 39219),
 ('pOHQdIDds6s:People & Blogs:DE', 37480),
 ('yof_rbxu8Nc:News & Politics:RU', 36309),
 ('V5cOvyDpWfM:Sports:CA', 35994),
 ('V5cOvyDpWfM:Sports:GB', 35993),
 ('V5cOvyDpWfM:Sports:FR', 33596),
 ('V5cOvyDpWfM:Sports:DE', 33595),
 ('B0ry4CFZwfQ:People & Blogs:RU', 33502),
 ('RA6PXJIlCIk:People & Blogs:RU', 33362),
 ('uJb1zN2pGw4:Nonprofits &

In [58]:
# Shut down the SparkContext; RDDs created from it cannot be used afterwards.
sc.stop()