In [None]:
FILENAME = 's3://full-stack-bigdata-datasets/Big_Data/tears_in_rain_not_public.txt'

# Unfortunately, I can't share the access with you.
ACCESS_KEY_ID = "" 
SECRET_ACCESS_KEY = "" 

hadoop_conf = spark._jsc.hadoopConfiguration() # this will set the Spark framework to interact with your S3 DFS
hadoop_conf.set("fs.s3a.access.key", ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")

In [None]:
# Load file into to a PySpark RDD
text_file = sc.textFile(FILENAME)
print(text_file)
print(type(text_file))

s3://full-stack-bigdata-datasets/Big_Data/tears_in_rain_not_public.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
<class 'pyspark.rdd.RDD'>


In [None]:
# Print out the first 3 elements
text_file.take(3)

Out[3]: ["I've seen things you people wouldn't believe. ",
 'Attack ships on fire off the shoulder of Orion. ',
 'I watched C-beams glitter in the dark near the Tannhäuser Gate. ']

In [None]:
# Collect all elements of text_file
text_file.collect()

Out[6]: ["I've seen things you people wouldn't believe. ",
 'Attack ships on fire off the shoulder of Orion. ',
 'I watched C-beams glitter in the dark near the Tannhäuser Gate. ',
 'All those moments will be lost in time, like tears in rain. ',
 'Time to die.']

In [None]:
# Count the number of lines
text_file.count()

Out[7]: 5

In [None]:
# compute the length of each sentence
lineLengths = text_file.map(lambda s: len(s))
lineLengths.take(3)

Out[4]: [46, 48, 64]

In [None]:
# compute the average length
avgLength = lineLengths.mean()
avgLength
#type(avgLength) => Out: float

Out[14]: 46.0

In [None]:
# Compute the sum of all lines
totalLength = lineLengths.sum()
print(totalLength)
#type(totalLength) => Out: Int

230


In [None]:
# split the charater strings into lists of words
tokenized_text = text_file.map(lambda line: line.split(' '))
tokenized_text.take(3)

Out[5]: [["I've", 'seen', 'things', 'you', 'people', "wouldn't", 'believe.', ''],
 ['Attack',
  'ships',
  'on',
  'fire',
  'off',
  'the',
  'shoulder',
  'of',
  'Orion.',
  ''],
 ['I',
  'watched',
  'C-beams',
  'glitter',
  'in',
  'the',
  'dark',
  'near',
  'the',
  'Tannhäuser',
  'Gate.',
  '']]

In [None]:
###
### That's not exactly what we wanted. We wanted a list of tokens, we got a list of list of tokens.  
### That's because, in this case, we need a special version of `.map()` called `flatMap`: it will flatten the list of list of tokens into a list of tokens.

tokens = text_file.flatMap(lambda line: line.split(' '))
tokens.collect()

Out[6]: ["I've",
 'seen',
 'things',
 'you',
 'people',
 "wouldn't",
 'believe.',
 '',
 'Attack',
 'ships',
 'on',
 'fire',
 'off',
 'the',
 'shoulder',
 'of',
 'Orion.',
 '',
 'I',
 'watched',
 'C-beams',
 'glitter',
 'in',
 'the',
 'dark',
 'near',
 'the',
 'Tannhäuser',
 'Gate.',
 '',
 'All',
 'those',
 'moments',
 'will',
 'be',
 'lost',
 'in',
 'time,',
 'like',
 'tears',
 'in',
 'rain.',
 '',
 'Time',
 'to',
 'die.']

In [None]:
# we will associate each word with an initial number, instead of having this.
def token_to_tuple(token):
    return (token, 1)

partial_count = tokens.map(token_to_tuple)
partial_count.take(10)

Out[7]: [("I've", 1),
 ('seen', 1),
 ('things', 1),
 ('you', 1),
 ('people', 1),
 ("wouldn't", 1),
 ('believe.', 1),
 ('', 1),
 ('Attack', 1),
 ('ships', 1)]

In [None]:
################# groupByKey #################
grouped_by_key = partial_count.groupByKey()
grouped_by_key.take(3)



Out[8]: [("I've", <pyspark.resultiterable.ResultIterable at 0x7f28dc6a5ee0>),
 ('seen', <pyspark.resultiterable.ResultIterable at 0x7f28dc6a50d0>),
 ('things', <pyspark.resultiterable.ResultIterable at 0x7f28dc6a5a00>)]

In [None]:
first_item = grouped_by_key.take(1)[0]
first_item
#list(first_item[1]) => Out: [1]


Out[9]: ("I've", <pyspark.resultiterable.ResultIterable at 0x7f28d3bd3dc0>)

In [None]:
def print_item(item_as_tuple):
    token_name, occurences = item_as_tuple
    occurences_as_list = list(occurences)
    print(f"{token_name}: {occurences_as_list}")
    
for item in grouped_by_key.collect():
    print_item(item)

I've: [1]
seen: [1]
things: [1]
believe.: [1]
: [1, 1, 1, 1]
ships: [1]
fire: [1]
shoulder: [1]
of: [1]
Orion.: [1]
C-beams: [1]
in: [1, 1, 1]
dark: [1]
near: [1]
lost: [1]
like: [1]
tears: [1]
rain.: [1]
you: [1]
people: [1]
wouldn't: [1]
Attack: [1]
on: [1]
off: [1]
the: [1, 1, 1]
I: [1]
watched: [1]
glitter: [1]
Tannhäuser: [1]
Gate.: [1]
All: [1]
those: [1]
moments: [1]
will: [1]
be: [1]
time,: [1]
Time: [1]
to: [1]
die.: [1]


In [None]:
# What we want is a list of `Tuple[str, int]` where the second element is the total number of occurrences of the first element.
[(token, sum(list(occurences))) for token, occurences in grouped_by_key.collect()]

Out[11]: [("I've", 1),
 ('seen', 1),
 ('things', 1),
 ('believe.', 1),
 ('', 4),
 ('ships', 1),
 ('fire', 1),
 ('shoulder', 1),
 ('of', 1),
 ('Orion.', 1),
 ('C-beams', 1),
 ('in', 3),
 ('dark', 1),
 ('near', 1),
 ('lost', 1),
 ('like', 1),
 ('tears', 1),
 ('rain.', 1),
 ('you', 1),
 ('people', 1),
 ("wouldn't", 1),
 ('Attack', 1),
 ('on', 1),
 ('off', 1),
 ('the', 3),
 ('I', 1),
 ('watched', 1),
 ('glitter', 1),
 ('Tannhäuser', 1),
 ('Gate.', 1),
 ('All', 1),
 ('those', 1),
 ('moments', 1),
 ('will', 1),
 ('be', 1),
 ('time,', 1),
 ('Time', 1),
 ('to', 1),
 ('die.', 1)]

In [None]:
################# Reduce #################
# Write a reduce function: reduce_function which takes 2 values and return their sum
# EX 16
def reduce_function(a, b):
    return a + b

reduced = partial_count.reduceByKey(reduce_function)
reduced.collect()

Out[14]: [("I've", 1),
 ('seen', 1),
 ('things', 1),
 ('believe.', 1),
 ('', 4),
 ('ships', 1),
 ('fire', 1),
 ('shoulder', 1),
 ('of', 1),
 ('Orion.', 1),
 ('C-beams', 1),
 ('in', 3),
 ('dark', 1),
 ('near', 1),
 ('lost', 1),
 ('like', 1),
 ('tears', 1),
 ('rain.', 1),
 ('you', 1),
 ('people', 1),
 ("wouldn't", 1),
 ('Attack', 1),
 ('on', 1),
 ('off', 1),
 ('the', 3),
 ('I', 1),
 ('watched', 1),
 ('glitter', 1),
 ('Tannhäuser', 1),
 ('Gate.', 1),
 ('All', 1),
 ('those', 1),
 ('moments', 1),
 ('will', 1),
 ('be', 1),
 ('time,', 1),
 ('Time', 1),
 ('to', 1),
 ('die.', 1)]

In [None]:
# Use sorted()
# example
fruits = [('banana', 3), ('orange', 5), ('pineapple', 2)]
sorted(fruits)
# `sorted(fruits)` won't work because by default sorting on tuple take the first element, in our case, it would sort alphabetically on the name of the fruits.

Out[16]: [('banana', 3), ('orange', 5), ('pineapple', 2)]

In [None]:
# Use sorted()
# We can force the `key` parameter to sort on the second item of each tuple.
# Ascending:
sorted(fruits, key=lambda x: x[1])

#Descending:
#sorted(fruits, key=lambda x: x[1], reverse=True) 

Out[17]: [('pineapple', 2), ('banana', 3), ('orange', 5)]

In [None]:
# use .sortBy()
# Ascending 
sorted_counts = reduced.sortBy(lambda t: t[1]) 
sorted_counts.collect()

# Descending
#desc_sorted_counts = reduced.sortBy(lambda t: -t[1])

Out[22]: [("I've", 1),
 ('seen', 1),
 ('things', 1),
 ('believe.', 1),
 ('ships', 1),
 ('fire', 1),
 ('shoulder', 1),
 ('of', 1),
 ('Orion.', 1),
 ('C-beams', 1),
 ('dark', 1),
 ('near', 1),
 ('lost', 1),
 ('like', 1),
 ('tears', 1),
 ('rain.', 1),
 ('you', 1),
 ('people', 1),
 ("wouldn't", 1),
 ('Attack', 1),
 ('on', 1),
 ('off', 1),
 ('I', 1),
 ('watched', 1),
 ('glitter', 1),
 ('Tannhäuser', 1),
 ('Gate.', 1),
 ('All', 1),
 ('those', 1),
 ('moments', 1),
 ('will', 1),
 ('be', 1),
 ('time,', 1),
 ('Time', 1),
 ('to', 1),
 ('die.', 1),
 ('in', 3),
 ('the', 3),
 ('', 4)]

In [None]:
desc_sorted_counts = reduced.sortBy(lambda t: -t[1])
desc_sorted_counts.collect()


Out[23]: [('', 4),
 ('in', 3),
 ('the', 3),
 ("I've", 1),
 ('seen', 1),
 ('things', 1),
 ('believe.', 1),
 ('ships', 1),
 ('fire', 1),
 ('shoulder', 1),
 ('of', 1),
 ('Orion.', 1),
 ('C-beams', 1),
 ('dark', 1),
 ('near', 1),
 ('lost', 1),
 ('like', 1),
 ('tears', 1),
 ('rain.', 1),
 ('you', 1),
 ('people', 1),
 ("wouldn't", 1),
 ('Attack', 1),
 ('on', 1),
 ('off', 1),
 ('I', 1),
 ('watched', 1),
 ('glitter', 1),
 ('Tannhäuser', 1),
 ('Gate.', 1),
 ('All', 1),
 ('those', 1),
 ('moments', 1),
 ('will', 1),
 ('be', 1),
 ('time,', 1),
 ('Time', 1),
 ('to', 1),
 ('die.', 1)]

In [None]:
# Putting it all together
def count_words(filepath):
    return sc.textFile(filepath)\
    .flatMap(lambda line: line.split(' '))\
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda t: -t[1])

rdd = count_words(FILENAME)
rdd.collect()

Out[24]: [('', 4),
 ('in', 3),
 ('the', 3),
 ("I've", 1),
 ('seen', 1),
 ('things', 1),
 ('believe.', 1),
 ('ships', 1),
 ('fire', 1),
 ('shoulder', 1),
 ('of', 1),
 ('Orion.', 1),
 ('C-beams', 1),
 ('dark', 1),
 ('near', 1),
 ('lost', 1),
 ('like', 1),
 ('tears', 1),
 ('rain.', 1),
 ('you', 1),
 ('people', 1),
 ("wouldn't", 1),
 ('Attack', 1),
 ('on', 1),
 ('off', 1),
 ('I', 1),
 ('watched', 1),
 ('glitter', 1),
 ('Tannhäuser', 1),
 ('Gate.', 1),
 ('All', 1),
 ('those', 1),
 ('moments', 1),
 ('will', 1),
 ('be', 1),
 ('time,', 1),
 ('Time', 1),
 ('to', 1),
 ('die.', 1)]