In [1]:
from pyspark import SparkConf, SparkContext

# Spark runs in Spark-on-YARN mode here, hence the "yarn" master.
conf = SparkConf()
conf.setMaster("yarn")
conf.setAppName("Lab 2")

# Only one SparkContext may be active at a time, so reuse the running one
# if it exists instead of creating a new one.
# NOTE(review): the TypeError recorded below ("an integer is required (got type bytes)")
# looks like a PySpark/Python version mismatch rather than a problem in this cell - confirm.
sc = SparkContext.getOrCreate(conf=conf)

TypeError: an integer is required (got type bytes)

In [3]:
# Display the active SparkContext's information (the cell's last expression is rendered as output).
sc

In [None]:
# Remove special characters from a string: only the ASCII letters a-z and A-Z are kept.
# "Hey!!!" --> "Hey"

def deleteSpecialCharacter(string):
    """Return ``string`` with every character that is not an ASCII letter removed.

    Only ``a``-``z`` and ``A``-``Z`` are kept; digits, punctuation, whitespace and
    non-ASCII characters are dropped, e.g. ``"Hey!!!"`` becomes ``"Hey"``.
    """
    return ''.join(
        ch for ch in string
        if 'a' <= ch <= 'z' or 'A' <= ch <= 'Z'
    )

In [None]:
# Transform a line of text into a list of lowercase words.
# "Hello World!! Love you." --> ['hello', 'world', 'love', 'you']

def toWordList(line_of_words):
    """Split a line of text into lowercase words made of ASCII letters only.

    The line is lower-cased and split on any run of whitespace (spaces, tabs,
    newlines), so words separated by something other than a single space are
    not glued together. Each token is then passed through
    ``deleteSpecialCharacter``; tokens that end up empty (e.g. ``"1946)"``)
    are dropped.

    Example: ``"Hello World!! Love you."`` -> ``['hello', 'world', 'love', 'you']``

    :param line_of_words: the text to tokenize (a ``str``).
    :return: list of cleaned, non-empty lowercase words, in their original order.
    """
    cleaned_words = (
        deleteSpecialCharacter(token)
        for token in line_of_words.lower().split()
    )
    # Filter in a single pass instead of repeatedly calling list.remove(''),
    # which rescans the list for every empty entry.
    return [word for word in cleaned_words if word]

In [7]:
# Load data.txt from Hadoop; each element of the resulting dataset is one line of the file.

data_from_hadoop_system = sc.textFile("/user/vagrant/data.txt")

In [113]:
# Display the contents of data.txt.
# collect() gathers every line from the cluster, which is acceptable for a small file like this one.

data_from_hadoop_system.collect()

['Donald John Trump (born June 14, 1946) is the 45th and current president of the United States. Before entering politics, he was a businessman and television personality.',
 "Trump was born and raised in Queens, a borough of New York City, and received a bachelor's degree in economics from the Wharton School. He took charge of his family's real-estate business in 1971, renamed it The Trump Organization, and expanded its operations from Queens and Brooklyn into Manhattan. The company built or renovated skyscrapers, hotels, casinos, and golf courses. Trump later started various side ventures, mostly by licensing his name. He owned the Miss Universe and Miss USA beauty pageants from 1996 to 2015 and produced and hosted The Apprentice, a reality television show, from 2003 to 2015. Forbes estimates his net worth to be $3.1 billion.[a]",
 'Trump entered the 2016 presidential race as a Republican and defeated 16 other candidates in the primaries. His political positions have been described a

In [127]:
# Use flatMap rather than map: flatMap may emit several output values per input element.
# Example: input ["hello world", "duc loc"] with lambda string: string.split(" ")
#
# With map, each input line yields exactly one element (the list of its words):
#                   [ ['hello', 'world'], ['duc', 'loc'] ] --> only 2 results
#
# With flatMap, those per-line lists are flattened into a single collection:
#                   ['hello', 'world', 'duc', 'loc'] --> 4 results
#
# So the lines of the dataset become one collection of individual words.


words = data_from_hadoop_system.flatMap(toWordList)

In [128]:
# Spark transformations are lazily evaluated: at this point no line of the dataset
# has actually been converted into a list of words yet.
# collect() forces Spark to run the computation and gather all results from the cluster.


words.collect()

['donald',
 'john',
 'trump',
 'born',
 'june',
 'is',
 'the',
 'th',
 'and',
 'current',
 'president',
 'of',
 'the',
 'united',
 'states',
 'before',
 'entering',
 'politics',
 'he',
 'was',
 'a',
 'businessman',
 'and',
 'television',
 'personality',
 'trump',
 'was',
 'born',
 'and',
 'raised',
 'in',
 'queens',
 'a',
 'borough',
 'of',
 'new',
 'york',
 'city',
 'and',
 'received',
 'a',
 'bachelors',
 'degree',
 'in',
 'economics',
 'from',
 'the',
 'wharton',
 'school',
 'he',
 'took',
 'charge',
 'of',
 'his',
 'familys',
 'realestate',
 'business',
 'in',
 'renamed',
 'it',
 'the',
 'trump',
 'organization',
 'and',
 'expanded',
 'its',
 'operations',
 'from',
 'queens',
 'and',
 'brooklyn',
 'into',
 'manhattan',
 'the',
 'company',
 'built',
 'or',
 'renovated',
 'skyscrapers',
 'hotels',
 'casinos',
 'and',
 'golf',
 'courses',
 'trump',
 'later',
 'started',
 'various',
 'side',
 'ventures',
 'mostly',
 'by',
 'licensing',
 'his',
 'name',
 'he',
 'owned',
 'the',
 'miss',

In [116]:
# Start by giving every word an initial frequency of 1 with a map transformation:
# ['hello', 'world'] --> [('hello', 1), ('world', 1)]


rdd = words.map(lambda token: (token, 1))
rdd.collect()

[('donald', 1),
 ('john', 1),
 ('trump', 1),
 ('born', 1),
 ('june', 1),
 ('is', 1),
 ('the', 1),
 ('th', 1),
 ('and', 1),
 ('current', 1),
 ('president', 1),
 ('of', 1),
 ('the', 1),
 ('united', 1),
 ('states', 1),
 ('before', 1),
 ('entering', 1),
 ('politics', 1),
 ('he', 1),
 ('was', 1),
 ('a', 1),
 ('businessman', 1),
 ('and', 1),
 ('television', 1),
 ('personality', 1),
 ('trump', 1),
 ('was', 1),
 ('born', 1),
 ('and', 1),
 ('raised', 1),
 ('in', 1),
 ('queens', 1),
 ('a', 1),
 ('borough', 1),
 ('of', 1),
 ('new', 1),
 ('york', 1),
 ('city', 1),
 ('and', 1),
 ('received', 1),
 ('a', 1),
 ('bachelors', 1),
 ('degree', 1),
 ('in', 1),
 ('economics', 1),
 ('from', 1),
 ('the', 1),
 ('wharton', 1),
 ('school', 1),
 ('he', 1),
 ('took', 1),
 ('charge', 1),
 ('of', 1),
 ('his', 1),
 ('familys', 1),
 ('realestate', 1),
 ('business', 1),
 ('in', 1),
 ('renamed', 1),
 ('it', 1),
 ('the', 1),
 ('trump', 1),
 ('organization', 1),
 ('and', 1),
 ('expanded', 1),
 ('its', 1),
 ('operations', 

In [117]:
# reduceByKey is a lazy transformation that aggregates the values of all pairs sharing a key.
# Suppose the input has the form [('hello', 1), ('world', 1), ('hello', 2)]:
# the first entry of each tuple ('hello', 'world') is treated as the key.
# With sum as the aggregation (as in this exercise), the result is [('hello', 1+2), ('world', 1)].

rdd = rdd.reduceByKey(lambda total, count: total + count)
rdd.collect()

[('john', 1),
 ('june', 1),
 ('is', 2),
 ('current', 1),
 ('of', 11),
 ('united', 1),
 ('before', 1),
 ('entering', 1),
 ('politics', 2),
 ('he', 11),
 ('was', 5),
 ('television', 2),
 ('personality', 1),
 ('raised', 1),
 ('in', 9),
 ('queens', 2),
 ('new', 1),
 ('york', 1),
 ('bachelors', 1),
 ('economics', 1),
 ('wharton', 1),
 ('school', 1),
 ('took', 1),
 ('charge', 1),
 ('his', 11),
 ('realestate', 1),
 ('business', 1),
 ('renamed', 1),
 ('operations', 1),
 ('brooklyn', 1),
 ('into', 1),
 ('renovated', 1),
 ('started', 1),
 ('ventures', 1),
 ('name', 1),
 ('universe', 1),
 ('usa', 1),
 ('beauty', 1),
 ('produced', 1),
 ('apprentice', 1),
 ('forbes', 1),
 ('estimates', 1),
 ('worth', 1),
 ('billiona', 1),
 ('entered', 1),
 ('presidential', 3),
 ('race', 1),
 ('as', 5),
 ('defeated', 1),
 ('other', 1),
 ('candidates', 1),
 ('political', 1),
 ('positions', 1),
 ('have', 5),
 ('populist', 1),
 ('protectionist', 1),
 ('despite', 1),
 ('democratic', 1),
 ('hillary', 1),
 ('clinton', 1),

In [123]:
# Spark provides sortByKey, which orders pairs by their key.
# The counts are currently (word, frequency) pairs, so swap each pair to
# (frequency, word) and sort by frequency, highest first.
# The sorted dataset is the final word-count result.

result = (
    rdd
    .map(lambda pair: (pair[1], pair[0]))
    .sortByKey(ascending=False)
)

In [124]:
# Run the job and show the (frequency, word) pairs, most frequent first.
result.collect()

[(33, 'the'),
 (27, 'and'),
 (13, 'trump'),
 (11, 'of'),
 (11, 'he'),
 (11, 'his'),
 (10, 'a'),
 (9, 'in'),
 (7, 'to'),
 (7, 'from'),
 (7, 'la'),
 (6, 'on'),
 (5, 'by'),
 (5, 'or'),
 (5, 'roi'),
 (5, 'was'),
 (5, 'as'),
 (5, 'have'),
 (4, 'that'),
 (4, 'with'),
 (4, 'us'),
 (4, 'vang'),
 (3, 'election'),
 (3, 'be'),
 (3, 'for'),
 (3, 'mau'),
 (3, 'been'),
 (3, 'presidential'),
 (3, 'foreign'),
 (3, 'toi'),
 (3, 'dau'),
 (2, 'mueller'),
 (2, 'found'),
 (2, 'campaign'),
 (2, 'interference'),
 (2, 'it'),
 (2, 'not'),
 (2, 'also'),
 (2, 'obstruction'),
 (2, 'impeachment'),
 (2, 'impeached'),
 (2, 'president'),
 (2, 'lan'),
 (2, 'quanh'),
 (2, 'dung'),
 (2, 'born'),
 (2, 'its'),
 (2, 'miss'),
 (2, 'described'),
 (2, 'first'),
 (2, 'has'),
 (2, 'many'),
 (2, 'statements'),
 (2, 'presidency'),
 (2, 'supreme'),
 (2, 'court'),
 (2, 'is'),
 (2, 'politics'),
 (2, 'television'),
 (2, 'queens'),
 (2, 'during'),
 (2, 'third'),
 (2, 'trade'),
 (2, 'negotiations'),
 (2, 'charges'),
 (2, 'house'),
 (2,