In [1]:
# Load the text file as an RDD with one element per line.
textfile = sc.textFile('/FileStore/tables/steve.txt')

# Break every line into words. str.split() without an argument splits on any
# run of whitespace and never returns empty strings, so blank lines and
# repeated spaces are not counted as an empty "word" (line.split(' ') would
# emit '' for each of them).
words = textfile.flatMap(lambda line: line.split())

# Pair every word with an initial count of 1.
pairs = words.map(lambda s: (s, 1))

# Add up the counts of identical words.
counts = pairs.reduceByKey(lambda a, b: a + b)

# Print every (word, count) pair. collect() pulls the complete result onto the
# driver, which is acceptable here because there is one row per distinct word.
for (word, count) in counts.collect():
  print('%s : %i' % (word, count))

In [2]:
# Flip each (word, count) pair into (count, word) so that the count becomes
# the key, then order the RDD by that key from highest to lowest.
inverse_counts = (
  counts
  .map(lambda kv: (kv[1], kv[0]))
  .sortByKey(ascending=False)
)

# Show the five most frequent words.
for count, word in inverse_counts.take(5):
  print('%s : %i' % (word, count))

In [3]:
def has_at_least_five_chars(pair):
  """Return True when the word of a (count, word) pair has 5 or more characters."""
  _, text = pair
  return len(text) >= 5

# Drop the short words; the remaining elements keep their existing order.
inverse_counts = inverse_counts.filter(has_at_least_five_chars)

# Show the five most frequent words among those with at least 5 characters.
for count, word in inverse_counts.take(5):
  print('%s : %i' % (word, count))

In [4]:
# Load the edge list as an RDD with one element per line. The first token of a
# line is the sending node, every following token is a receiving node.
edgelist = sc.textFile('/FileStore/tables/edgelist.txt')

# Keep only the receiving nodes, one RDD element per node. Slicing the token
# list with [1:] returns an empty list for a line that holds just a sender (or
# nothing at all), whereas indexing line.split(' ', 1)[1] raised IndexError on
# such lines. split() without an argument also ignores repeated or trailing
# whitespace, so no empty-string "node" is produced.
receiving_nodes = edgelist.flatMap(lambda line: line.split()[1:])

# Map every receiving node to a (node, 1) pair.
receiving_node_pairs = receiving_nodes.map(lambda node: (node, 1))

# Count the occurrences of every receiving node: (node, in_degree).
in_degree_counts = receiving_node_pairs.reduceByKey(lambda a, b: a + b)

# Load the id/label list and turn every "<id> <name>" line into an (id, name)
# pair. Only the first space is used as separator, so anything after it stays
# in the name. Each line is split once, and lines without a separator are
# skipped instead of failing with IndexError.
idslabels = (
  sc.textFile('/FileStore/tables/idslabels.txt')
  .map(lambda line: line.split(' ', 1))
  .filter(lambda parts: len(parts) == 2)
  .map(lambda parts: (parts[0], parts[1]))
)

# Attach the name to every counted node: (id, (in_degree, name)).
# join() is an inner join, so nodes without a label are dropped.
in_degrees = in_degree_counts.join(idslabels)

# Sort in descending order by the (in_degree, name) value. This gives the same
# ordering as swapping key and value, calling sortByKey and swapping back, but
# in a single step that leaves the records as (id, (in_degree, name)).
in_degrees = in_degrees.sortBy(lambda entry: entry[1], ascending=False)

# Print the ten nodes with the highest in-degree. The loop variable is called
# node_id rather than id so that the builtin id() is not shadowed in the
# notebook namespace.
for (node_id, (in_degree, name)) in in_degrees.take(10):
  print(name, ':', in_degree)