You can use this notebook to develop your answers. Make sure to look at intermediate results using `take()` for debugging.

In [1]:
import json
import re
import csv
from io import StringIO

## Load data into RDDs
# The .json files hold one JSON document per line, so each line is decoded on
# its own; the .txt files are kept as raw text lines.
def _json_lines(path):
    """Return an RDD of the objects decoded (json.loads) from each line of `path`."""
    return sc.textFile(path).map(json.loads)

usersRDD = _json_lines("datafiles/se_users.json")
postsRDD = _json_lines("datafiles/se_posts.json")
nobelRDD = _json_lines("datafiles/prize.json")

playRDD = sc.textFile("datafiles/play.txt")
logsRDD = sc.textFile("datafiles/NASA_logs_sample.txt")


In [2]:
first_users = usersRDD.take(10)
for user in first_users:
    print(user)

[Stage 0:>                                                          (0 + 1) / 1]

{'id': -1, 'reputation': 1, 'creationdate': '2011-01-03', 'displayname': 'Community', 'views': 863, 'upvotes': 12299, 'downvotes': 8651}
{'id': 2, 'reputation': 101, 'creationdate': '2011-01-03', 'displayname': 'Geoff Dalgas', 'views': 64, 'upvotes': 10, 'downvotes': 0}
{'id': 3, 'reputation': 101, 'creationdate': '2011-01-03', 'displayname': 'balpha', 'views': 35, 'upvotes': 4, 'downvotes': 0}
{'id': 4, 'reputation': 212, 'creationdate': '2011-01-03', 'displayname': 'Nick Craver', 'views': 61, 'upvotes': 11, 'downvotes': 1}
{'id': 5, 'reputation': 101, 'creationdate': '2011-01-03', 'displayname': 'Emmett', 'views': 32, 'upvotes': 2, 'downvotes': 0}
{'id': 6, 'reputation': 100, 'creationdate': '2011-01-03', 'displayname': 'Robert Cartaino', 'views': 36, 'upvotes': 0, 'downvotes': 2}
{'id': 7, 'reputation': 1128, 'creationdate': '2011-01-03', 'displayname': 'Toby', 'views': 69, 'upvotes': 18, 'downvotes': 0}
{'id': 8, 'reputation': 2949, 'creationdate': '2011-01-03', 'displayname': 'ilh

                                                                                

In [3]:
    def parse_csv(line):
        """Parse a single CSV-formatted line into its list of field strings.

        The csv module honours quoting, so a field such as
        "American President, The (1995)" keeps its embedded comma.
        Raises StopIteration for an empty line (csv yields no row for it).
        """
        return next(csv.reader(StringIO(line)))

    # Build an RDD of parsed movie rows from the MovieLens movies file.
    csvData1 = sc.textFile("datafiles/ml-latest-small/movies.csv")

    # zipWithIndex numbers every line across all partitions; dropping index 0
    # removes the header row, and keys() keeps only the line text.
    moviesRDD = (csvData1.zipWithIndex()
                 .filter(lambda numbered: numbered[1] > 0)
                 .keys()
                 .map(parse_csv))

    for movie in moviesRDD.take(11):
        print(movie)

['1', 'Toy Story (1995)', 'Adventure|Animation|Children|Comedy|Fantasy']
['2', 'Jumanji (1995)', 'Adventure|Children|Fantasy']
['3', 'Grumpier Old Men (1995)', 'Comedy|Romance']
['4', 'Waiting to Exhale (1995)', 'Comedy|Drama|Romance']
['5', 'Father of the Bride Part II (1995)', 'Comedy']
['6', 'Heat (1995)', 'Action|Crime|Thriller']
['7', 'Sabrina (1995)', 'Comedy|Romance']
['8', 'Tom and Huck (1995)', 'Adventure|Children']
['9', 'Sudden Death (1995)', 'Action']
['10', 'GoldenEye (1995)', 'Action|Adventure|Thriller']
['11', 'American President, The (1995)', 'Comedy|Drama|Romance']


In [4]:
# Ratings rows are plain comma-separated values; keep only the first three
# fields and drop the trailing timestamp for simplicity.
csvData2 = sc.textFile("datafiles/ml-latest-small/ratings.csv")
ratingsRDD = (csvData2.zipWithIndex()
              .filter(lambda numbered: numbered[1] > 0)  # index 0 is the header row
              .keys()
              .map(lambda row: row.split(",")[:3]))

for rating in ratingsRDD.take(10):
    print(rating)

['1', '1', '4.0']
['1', '3', '4.0']
['1', '6', '4.0']
['1', '47', '5.0']
['1', '50', '5.0']
['1', '70', '3.0']
['1', '101', '5.0']
['1', '110', '4.0']
['1', '151', '5.0']
['1', '157', '5.0']


In [5]:
sample_posts = postsRDD.take(1)
print(sample_posts)

[{'id': 2, 'posttypeid': 1, 'title': 'How can a group track database schema changes?', 'acceptedanswerid': 4, 'parentid': None, 'creationdate': '2011-01-03', 'score': 68, 'viewcount': 11533, 'owneruserid': 7, 'lasteditoruserid': 97, 'tags': '<mysql><version-control><schema>'}]


In [7]:
def _is_popular_owned_post(post):
    """True for posts that have an owner and at least 10000 views."""
    if post['owneruserid'] is None:
        return False
    if post['viewcount'] is None:
        return False
    return post['viewcount'] >= 10000

t1 = (postsRDD
      .filter(_is_popular_owned_post)
      .map(lambda post: (post['id'], post['title'], post['owneruserid'], post['viewcount'])))
print(t1.takeOrdered(3))

[(2, 'How can a group track database schema changes?', 7, 11533), (5, 'What are the differences between NoSQL and a traditional RDBMS?', 24, 42961), (20, 'How can I optimize a mysqldump of a large database?', 43, 266614)]


In [None]:
def _movie_genre_pairs(movie):
    """Return one (movieId, genre) pair per '|'-separated genre of a movie row."""
    genres = movie[2].split('|')
    return [(movie[0], genre) for genre in genres]

t2 = moviesRDD.flatMap(_movie_genre_pairs)
print(t2.take(10))

In [None]:
# Count movies per (release year, genre). The year is the parenthesised
# four-digit number in the title, as in 'Toy Story (1995)'; titles without one
# are skipped. `re` is imported in the first cell.
YEAR_PATTERN = re.compile(r'\((\d{4})\)')

def _year_genre_ones(movie):
    """Return [((year, genre), 1), ...] for a parsed movie row, or [] if no year.

    The title regex is evaluated once per row. When a title contains several
    (YYYY) groups the last one is used, because the release year is the
    trailing parenthetical in titles such as 'Toy Story (1995)'.
    """
    years = YEAR_PATTERN.findall(movie[1])
    if not years:
        return []
    year = years[-1]
    return [((year, genre), 1) for genre in movie[2].split('|')]

t3 = moviesRDD.flatMap(_year_genre_ones).reduceByKey(lambda a, b: a + b)
print(t3.take(10))

In [None]:
# For every user, the two alphabetically-first distinct genres among the movies
# they rated. Ratings rows are [userId, movieId, rating] and movie rows are
# [movieId, title, genres], so the join must be keyed by movieId (t[1]), not
# by userId.
temp_t4 = moviesRDD.map(lambda t: (t[0], t[2]))
t4 = (ratingsRDD.map(lambda t: (t[1], t[0]))          # (movieId, userId)
      .join(temp_t4)                                  # (movieId, (userId, genres))
      # Normalise every value up front: reduceByKey never calls its function
      # for a key with a single value, so without this a user with one rating
      # would keep that movie's raw, unsorted genre list.
      .map(lambda t: (t[1][0], sorted(set(t[1][1].split('|')))[:2]))
      .reduceByKey(lambda a, b: sorted(set(a + b))[:2]))
print(t4.take(10))

In [None]:
def f(t):
    """Pair every title word with every genre of movie row `t`, each counted once.

    `t` is a parsed movies row [movieId, title, genres]. The characters
    , ( ) : are stripped from the title before splitting on whitespace.
    Returns a list of ((word, genre), 1) tuples, word-major order.
    """
    pairs = []
    for word in re.sub(r'[,\(\):]', '', t[1]).split():
        for genre in t[2].split('|'):
            pairs.append(((word, genre), 1))
    return pairs

# Occurrences of each (title word, genre) pair across all movies.
word_genre_ones = moviesRDD.flatMap(f)
t5 = word_genre_ones.reduceByKey(lambda left, right: left + right)
print(t5.take(10))

In [None]:
# Count, per (owner, tag), how many of that owner's posts carry the tag.
# Tags are stored as '<a><b><c>'. Posts whose tags are None OR the empty
# string are skipped: ''[1:-1].split('><') would otherwise yield a bogus '' tag.
t6 = (postsRDD
      .filter(lambda t: bool(t['tags']) and t['owneruserid'] is not None)
      .flatMap(lambda t: [((t['owneruserid'], x), 1) for x in t['tags'][1:-1].split('><')])
      .reduceByKey(lambda a, b: a + b))
print(t6.filter(lambda t: t[0][0] == 7).take(10))

In [None]:
# Average rating per user: fold each user's ratings into (sum, count), merge
# the partial pairs across partitions, then divide.
def _add_rating(acc, rating):
    return (acc[0] + rating, acc[1] + 1)

def _merge_sums(left, right):
    return (left[0] + right[0], left[1] + right[1])

rating_by_user = ratingsRDD.map(lambda row: (row[0], float(row[2])))
t7 = (rating_by_user
      .aggregateByKey((0, 0), _add_rating, _merge_sums)
      .mapValues(lambda acc: acc[0] / acc[1]))
print(t7.take(10))

In [None]:
def computemode(arr):
    """Return the most frequent rating string in `arr` (any iterable of strings).

    Every distinct value is a candidate, not just the whole-number strings
    '1.0'..'5.0', so values such as '3.5' or '0.5' can be returned. Ties are
    broken in favour of the numerically higher rating. Returns None when
    `arr` is empty.
    """
    from collections import Counter

    counts = Counter(arr)
    if not counts:
        return None
    # Highest count wins; among equal counts the larger numeric rating wins.
    return max(counts, key=lambda rating: (counts[rating], float(rating)))
# Mode of the ratings each movie received (keyed by movieId).
ratings_by_movie = ratingsRDD.map(lambda row: (row[1], row[2])).groupByKey()
t8 = ratings_by_movie.mapValues(computemode)
print(t8.take(10))

In [None]:
# Requests per hour of day. The pattern expects a bracketed timestamp of the
# form [DD/Mon/YYYY:HH:MM:SS +ZZZZ or -ZZZZ]; group 1 captures the hour.
pattern = r'\[(?:\d{2})/.../\d{4}:(\d{2}):\d{2}:\d{2} [+-]\d{4}\]'

def _request_hour(line):
    """Return the integer hour of the line's timestamp, or None if none matches.

    Lines without a recognisable timestamp are reported as None and filtered
    out below, instead of raising AttributeError on `.group` and failing the job.
    """
    match = re.search(pattern, line)
    return int(match.group(1)) if match else None

t9 = (logsRDD.map(_request_hour)
      .filter(lambda hour: hour is not None)
      .map(lambda hour: (hour, 1))
      .reduceByKey(lambda x, y: x + y))
print(t9.take(10))

In [None]:
def task10_flatmap(line):
    """Split one line of the play into normalised words.

    Steps: lowercase the line; expand the contractions "is't", "'twere" and
    "'tis" (in that order, by plain substring replacement); turn every run of
    non-word characters into a single space; split on whitespace; drop the
    stop words 'is', 'the', 'in' and 's'.
    """
    ignored = frozenset(('is', 'the', 'in', 's'))
    contractions = (("is't", "is it"), ("'twere", "it were"), ("'tis", "it is"))

    text = line.lower()
    for short_form, long_form in contractions:
        text = text.replace(short_form, long_form)

    tokens = re.sub(r'\W+', ' ', text).split()
    return [token for token in tokens if token not in ignored]

# Tokenise every line of the play into normalised words.
t10 = playRDD.flatMap(task10_flatmap)
first_words = t10.take(15)
print(first_words)

In [None]:
# Per owner: (ownerId, post count, average score) for owners with at least 50
# posts whose total score is below 10 points per post.
def _add_pairs(left, right):
    return (left[0] + right[0], left[1] + right[1])

def _is_prolific_low_scorer(entry):
    post_count, score_total = entry[1]
    return post_count >= 50 and score_total < post_count * 10

t11 = (postsRDD
       .filter(lambda post: post['owneruserid'] is not None)
       .map(lambda post: (post['owneruserid'], (1, post['score'])))
       .reduceByKey(_add_pairs)
       .filter(_is_prolific_low_scorer)
       .map(lambda entry: (entry[0], entry[1][0], entry[1][1] / entry[1][0])))

print(t11.take(3))

In [None]:
def jaccard(set_a, set_b):
    """Return the Jaccard coefficient |A ∩ B| / |A ∪ B| of two sets.

    `set_b` may be any iterable (set methods are used, not operators).
    Returns 0 when the union is empty.
    """
    union_size = len(set_a.union(set_b))
    if union_size == 0:
        return 0
    return len(set_a.intersection(set_b)) / union_size

# For users 1..100: (userId, [movieIds rated]).
t15_1 = ratingsRDD.filter(lambda t: int(t[0]) <= 100).map(lambda t: (t[0], t[1])).groupByKey().map(lambda l: (l[0], list(l[1])))
# Every ordered pair of distinct users -> (user, (other_user, jaccard of their movie sets)).
t15_2 = t15_1.cartesian(t15_1).filter(lambda t: t[0][0] != t[1][0]).map(lambda t: (t[0][0], (t[1][0], jaccard(set(t[0][1]), set(t[1][1])))))

def _more_similar(a, b):
    """Pick the (other_user, score) pair with the higher score.

    Equal scores go to the numerically smaller user id (ids are strings, so
    they are compared as ints). This makes the reducer commutative and
    associative, as reduceByKey requires, so the result no longer depends on
    partition order.
    """
    return max(a, b, key=lambda pair: (pair[1], -int(pair[0])))

t15_3 = t15_2.reduceByKey(_more_similar)

print(t15_3.take(100))