In [3]:
import json
from pyspark import SparkContext
from itertools import islice, izip
from collections import Counter
import re

class DisplayRDD:
    """Render the contents of an RDD, partition by partition, as an HTML table.

    The wrapped object only needs ``mapPartitionsWithIndex`` and ``collect``.
    ``_repr_html_`` follows the naming of IPython's rich-display hook;
    ``repr`` returns the same markup for callers that want it explicitly.
    """

    def __init__(self, rdd):
        self.rdd = rdd

    def _repr_html_(self):
        """Return one table column per partition, listing its elements.

        Every element is pulled to the driver with ``collect``, so this is
        meant for small RDDs only. Elements are inserted with ``str`` and
        are not HTML-escaped.
        """
        # One (partition index, [elements]) pair per partition.
        partitions = self.rdd.mapPartitionsWithIndex(
            lambda index, rows: [(index, list(rows))]).collect()

        header = "".join(
            "<th>Partition {}".format(index) for index, _ in partitions)
        cells = [
            "<ul><li>{}</ul>".format("<li>".join(str(row) for row in rows))
            for _, rows in partitions
        ]
        return (
            "<table><tr>{}</tr><tr><td>".format(header)
            + '</td><td valign="bottom">'.join(cells)
            + "</td></table>"
        )

    def repr(self):
        """Return the same HTML as ``_repr_html_``."""
        # Previously a line-for-line copy of _repr_html_; delegate instead so
        # the two can never drift apart.
        return self._repr_html_()

# A hack to avoid having to pass 'sc' around: a module-level RDD that is
# set once via setDefaultAnswer(). Nothing in the visible code reads it back.
dummyrdd = None
def setDefaultAnswer(rdd): 
	"""Store `rdd` in the module-level `dummyrdd` global, replacing any previous value."""
	global dummyrdd
	dummyrdd = rdd


## `sc` must already exist when this runs (it is not created in this file);
## uncomment the next line to create a local SparkContext instead.
##sc = SparkContext("local", "Simple App")
# Seed the module-level default RDD with a one-element RDD.
setDefaultAnswer(sc.parallelize([0]))

## Load data into RDDs (each RDD element is one line of text from the file)
playRDD = sc.textFile("datafiles/play.txt")
sh = sc.textFile("datafiles/playshort.txt")
logsRDD = sc.textFile("datafiles/NASA_logs_sample.txt")
amazonInputRDD = sc.textFile("datafiles/amazon-ratings.txt")
# Read as plain text; the task functions parse each line with json.loads.
nobelRDD = sc.textFile("datafiles/prize.json")

## The following converts the amazonInputRDD into distinct 2-tuples made of the
## first two space-separated fields of each line. The fields are kept as
## strings (no integer conversion is performed).
amazonBipartiteRDD = amazonInputRDD.map(lambda x: x.split(" ")).map(lambda x: (x[0], x[1])).distinct()

def task1(playRDD):
    """Key every line of more than 10 words by its first word.

    Args:
        playRDD: RDD of text lines.

    Returns:
        RDD of ``(first_word, (line, word_count))`` where ``line`` is the
        whitespace-split words re-joined with single spaces and
        ``word_count > 10``.
    """
    tokenized = playRDD.map(lambda line: line.split())
    # Filter before touching words[0]: blank lines split to an empty list,
    # and indexing it would raise IndexError even though such lines can
    # never pass the length test.
    long_lines = tokenized.filter(lambda words: len(words) > 10)
    return long_lines.map(
        lambda words: (words[0], (' '.join(words), len(words))))

def task2_flatmap(x):
    """Return the ``surname`` of each entry in ``x["laureates"]``, in order.

    ``x`` is one parsed prize record (a dict). A missing ``"laureates"`` or
    ``"surname"`` key raises KeyError.
    """
    return [laureate["surname"] for laureate in x["laureates"]]

def task3helper(line):
    """Turn one parsed prize record into ``(category, [surnames])``.

    ``line`` is a dict with a ``"category"`` value and a ``"laureates"`` list
    whose entries each carry a ``"surname"``.
    """
    names = [laureate["surname"] for laureate in line["laureates"]]
    return (line["category"], names)

def task3(nobelRDD):
    """Collect laureate surnames per prize category.

    Each element of ``nobelRDD`` is parsed with ``json.loads``, reduced to
    ``(category, [surnames])`` by ``task3helper``, and the surname lists of
    records sharing a category are concatenated.

    Returns:
        Pair RDD of ``(category, [surname, ...])``.
    """
    prizes = nobelRDD.map(json.loads)
    by_category = prizes.map(task3helper)
    return by_category.reduceByKey(lambda left, right: left + right)


def task4helper(x, l):
    """Return True if any string in ``l`` occurs as a substring of ``x``.

    Returns False when ``l`` is empty.
    """
    return any(date in x for date in l)

def task4(logsRDD, l):
    """Return the hosts that appear in the log on every date listed in ``l``.

    Args:
        logsRDD: RDD of log lines. The text before the first ``":"`` is
            expected to look like ``"<host> - - [<date>"``.
        l: list of date strings to look for, e.g. ``['01/Jul/1995']``.
            Assumes each entry matches exactly one calendar day as it
            appears in the log prefix.

    Returns:
        RDD of host strings.
    """
    # One (host, date) pair contributes 1 to the host's count, so a host seen
    # on every requested date ends up with exactly this many. Derived from
    # `l` rather than hard-coded to 2 so lists of any length work; duplicates
    # in `l` are collapsed because they cannot add distinct days.
    required_days = len(set(l))

    # "<host> - - [<date>" prefixes, restricted to the requested dates and
    # de-duplicated so repeated requests on one day count once.
    host_days = (logsRDD
                 .map(lambda line: line.split(":")[0])
                 .filter(lambda prefix: task4helper(prefix, l))
                 .distinct())

    return (host_days
            .map(lambda prefix: (prefix.split(" - -")[0], 1))
            .reduceByKey(lambda left, right: left + right)
            .filter(lambda host_count: host_count[1] == required_days)
            .map(lambda host_count: host_count[0]))
    
def task5(bipartiteGraphRDD):
    """Compute the degree distribution of the left-hand nodes of the graph.

    Args:
        bipartiteGraphRDD: RDD of ``(left_node, right_node)`` edges.

    Returns:
        Pair RDD of ``(degree, number_of_left_nodes_with_that_degree)``.
    """
    # Pairs are indexed instead of unpacked in the lambda signature: tuple
    # parameters (`lambda (x, y): ...`) are a SyntaxError on Python 3.
    degree_per_node = (bipartiteGraphRDD
                       .map(lambda edge: (edge[0], 1))
                       .reduceByKey(lambda left, right: left + right))
    return (degree_per_node
            .map(lambda node_degree: (node_degree[1], 1))
            .reduceByKey(lambda left, right: left + right))

def task6helper(x, day):
    """Return 1 if ``day`` occurs anywhere in the string ``x``, otherwise 0."""
    return 1 if x.find(day) != -1 else 0

def task6helper2(x):
    """Split a log line into ``(host, requested_path)``.

    The host is the text before the first ``' - -'``. The path is the text
    between ``'GET '`` and ``' HTTP'``. The path is ``None`` unless the line
    contains exactly one ``' - -'`` separator and exactly one ``'GET '``.
    """
    pieces = x.split(' - -')
    host = pieces[0]
    if len(pieces) != 2:
        return (host, None)

    around_get = pieces[1].split('GET ')
    if len(around_get) != 2:
        return (host, None)

    # Keep only what precedes the protocol marker, e.g. "/index.html".
    return (host, around_get[1].split(' HTTP')[0])
        
    
def task6(logsRDD, day1, day2):
    """Find hosts present in the log on both days, with what they requested.

    Args:
        logsRDD: RDD of log lines.
        day1, day2: date strings; a line belongs to a day when the string
            occurs anywhere in it (see ``task6helper``).

    Returns:
        RDD of ``(host, (paths_on_day1, paths_on_day2))`` where both lists
        are non-empty. Paths come from ``task6helper2`` and may be ``None``
        for lines it could not parse as a single GET request.
    """
    first_day = (logsRDD
                 .filter(lambda line: task6helper(line, day1))
                 .map(task6helper2))
    second_day = (logsRDD
                  .filter(lambda line: task6helper(line, day2))
                  .map(task6helper2))

    # Pairs are indexed instead of unpacked in the lambda signature: tuple
    # parameters (`lambda (m, n): ...`) are a SyntaxError on Python 3.
    grouped = first_day.cogroup(second_day).map(
        lambda host_groups: (
            host_groups[0],
            # cogroup yields one iterable per input RDD; materialise both.
            tuple(list(group) for group in host_groups[1]),
        ))
    return grouped.filter(
        lambda host_lists: len(host_lists[1][0]) != 0
        and len(host_lists[1][1]) != 0)


#words = re.findall("\w+",  "the quick person did not realize his speed and the quick person bumped")
#print tuple(izip(words, islice(words, 1, None)))
#
def task7(nobelRDD):
    """Count, for every word bigram, how many motivations contain it.

    Args:
        nobelRDD: RDD of JSON strings, each a prize record with a
            ``"laureates"`` list whose entries carry a ``"motivation"``.

    Returns:
        Pair RDD of ``((word, next_word), count)``. A bigram repeated inside
        a single motivation is counted once for that motivation.
    """
    def distinct_bigrams(motivation):
        # Split once and pair each word with its successor. Plain zip over a
        # slice behaves the same on Python 2 and 3, unlike itertools.izip,
        # which does not exist on Python 3.
        words = motivation.split()
        return set(zip(words, words[1:]))

    motivations = nobelRDD.map(json.loads).flatMap(
        lambda prize: [laureate["motivation"]
                       for laureate in prize["laureates"]])
    return (motivations
            .flatMap(distinct_bigrams)
            .map(lambda bigram: (bigram, 1))
            .reduceByKey(lambda left, right: left + right))


    

def task8(bipartiteGraphRDD, currentMatching):
    """Find edges that can be added to a matching in one greedy round.

    Args:
        bipartiteGraphRDD: RDD of ``(user, product)`` edges.
        currentMatching: RDD of ``(user, product)`` edges already matched.

    Returns:
        RDD of ``(user, product)`` edges in which no user or product appears
        twice and none is already present in ``currentMatching``.
    """
    def swap(pair):
        # Flip (a, b) -> (b, a) so the by-key operations can work on the
        # other endpoint. Indexing is used instead of a tuple parameter
        # (`lambda (x, y): ...`), which is a SyntaxError on Python 3.
        return (pair[1], pair[0])

    matched_by_product = currentMatching.map(swap)

    # Edges whose user AND product are both still unmatched.
    free_edges = (bipartiteGraphRDD
                  .subtractByKey(currentMatching)
                  .map(swap)
                  .subtractByKey(matched_by_product)
                  .map(swap))

    # Each free user proposes its smallest product; each proposed product
    # then keeps its smallest user, so the result is itself a matching.
    return (free_edges
            .reduceByKey(min)
            .map(swap)
            .reduceByKey(min)
            .map(swap))
    

# Driver: run every task against the RDDs loaded above and print a sample of
# each result. Most samples use takeOrdered(10), i.e. the 10 smallest elements.

# Preview of the raw play text before any task runs.
for x in playRDD.takeOrdered(10):
    print x
### Task 1: lines with more than 10 words, keyed by their first word
print "=========================== Task 1"
task1_result = task1(playRDD)
for x in task1_result.takeOrdered(10):
	print x

### Task 2: distinct laureate surnames across all prize records
print "=========================== Task 2"
task2_result = nobelRDD.map(json.loads).flatMap(task2_flatmap).distinct()
print task2_result.takeOrdered(10)

#### Task 3: laureate surnames grouped by prize category
print "=========================== Task 3"
task3_result = task3(nobelRDD)
for x in task3_result.takeOrdered(10):
	print x

#### Task 4: hosts seen in the logs on both listed dates
print "=========================== Task 4"
task4_result = task4(logsRDD, ['01/Jul/1995', '02/Jul/1995'])
for x in task4_result.takeOrdered(10):
	print x

#### Task 5: (degree, count) pairs; the full result is collected and printed
print "=========================== Task 5"
task5_result = task5(amazonBipartiteRDD)
print task5_result.collect()

#### Task 6: per host, the paths requested on each of the two days
print "=========================== Task 6"
task6_result = task6(logsRDD, '01/Jul/1995', '02/Jul/1995')
# print task6_result.collect()
for x in task6_result.takeOrdered(10):
	print x

### Task 7: bigram counts over the laureate motivations
print "=========================== Task 7"
task7_result = task7(nobelRDD)
for x in task7_result.takeOrdered(10):
	print x

#### Task 8 -- we will start with a non-empty currentMatching and do a few iterations
# Each round asks task8 for new edges and unions them into currentMatching
# before the next round, so later rounds see the earlier additions.
print "=========================== Task 8"
currentMatching = sc.parallelize([('user1', 'product8')])
res1 = task8(amazonBipartiteRDD, currentMatching)
print "Found {} edges to add to the matching".format(res1.count())
print res1.takeOrdered(100)
currentMatching = currentMatching.union(res1)
res2 = task8(amazonBipartiteRDD, currentMatching)
print "Found {} edges to add to the matching".format(res2.count())
print res2.takeOrdered(100)
currentMatching = currentMatching.union(res2)


'Here you may see Benedick the married man.'
'Nay,' said I, 'a good wit:' 'Just,' said she, 'it
'Nay,' said I, 'he hath the tongues:' 'That I
'Then' is spoken; fare you well now: and yet, ere
'Tis almost five o'clock, cousin; tis time you were
'Tis certain so; the prince wooes for himself.
'Tis even so. Hero and Margaret have by this
'Tis no such matter. Then you do not love me?
'Tis true, indeed; so your daughter says: 'Shall
'Tis very true.
(u'--O', (u'--O God, that I were a man! I would eat his heart', 12))
(u'/Enter', (u'/Enter DOGBERRY, VERGES, and Sexton, in gowns; and the Watch, with', 11))
(u'A', (u'A bird of my tongue is better than a beast of yours.', 12))
(u'A', (u"A commodity in question, I warrant you. Come, we'll obey you.", 11))
(u'A', (u'A good old man, sir; he will be talking: as they', 11))
(u'A', (u'A lord to a lord, a man to a man; stuffed with all', 13))
(u'Against', (u'Against my will I am sent to bid you come in to dinner.', 13))
(u'Against', (u'Against that powe

((u'"author', u'of'), 1)
((u'"because,', u'through'), 1)
((u'"for', u'ground-breaking'), 2)
((u'"for', u'groundbreaking'), 3)
((u'"for', u'having'), 6)
((u'"for', u'her'), 5)
((u'"for', u'his'), 16)
((u'"for', u'its'), 2)
((u'"for', u'mechanistic'), 3)
((u'"for', u'over'), 1)
Found 146 edges to add to the matching
[(u'user10', u'product229'), (u'user100', u'product130'), (u'user101', u'product151'), (u'user102', u'product164'), (u'user103', u'product174'), (u'user104', u'product192'), (u'user105', u'product206'), (u'user106', u'product0'), (u'user107', u'product119'), (u'user108', u'product133'), (u'user109', u'product157'), (u'user11', u'product251'), (u'user110', u'product167'), (u'user111', u'product182'), (u'user113', u'product19'), (u'user114', u'product71'), (u'user115', u'product100'), (u'user116', u'product104'), (u'user117', u'product117'), (u'user118', u'product127'), (u'user119', u'product147'), (u'user120', u'product162'), (u'user121', u'product126'), (u'user122', u'product