In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [3]:
# Convert each line into a pair of words
country_pairs = country_lines.map(lambda line: line.split(","))

In [4]:
country_lines.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [5]:
country_pairs.take(5)

[['Afghanistan', ' AFG'],
 ['Albania', ' ALB'],
 ['Algeria', ' ALG'],
 ['American Samoa', ' ASA'],
 ['Andorra', ' AND']]

In [6]:
# Convert each pair of words into a tuple
country_tuples = country_pairs.map(lambda array: (array[0], array[1]))

In [7]:
country_tuples.take(5)

[('Afghanistan', ' AFG'),
 ('Albania', ' ALB'),
 ('Algeria', ' ALG'),
 ('American Samoa', ' ASA'),
 ('Andorra', ' AND')]

In [8]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [9]:
# Read tweets CSV file into RDD of lines
tweets_rdd = sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/tweet_textFieldData.csv')

In [10]:
tweets_rdd.count()

13995

In [11]:
tweets_rdd.take(3)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin']

In [12]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
filtered_tweets = tweets_rdd.filter(lambda x: len(x) > 0 )

In [13]:
filtered_tweets.count()

13391

In [14]:
filtered_tweets.take(20)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin',
 'RT @GameSeek: Follow & Retweet for your chance to win a copy of FIFA 17 Deluxe Edition (platform of your choice) in our #giveaway! https://…',
 '@CIVARAGI ...I was putting ffs but it autocorrected it too FIFA',
 "RT @GeniusFootball: You know it's FIFA... https://t.co/tLK6sTnPaM",
 '"RT @WeahsCousin: ""Pogba isn\'t worth £100million.""',
 'Thanks Dean, the £500 you\'ve spent on FIFA Ultimate Team probably wasn\'t worth it either."',
 '"RT @WeahsCousin: ""Pogba isn\'t worth £100million.""',
 'Thanks Dean, the £500 you\'ve spent on FIFA Ultimate Team probably wasn\'t worth it either."',
 'New on eBay! XBOX 360 Game FIFA 16 2016 https://t.co/xicyLOE6aQ https://t.co/uw9OLrie4e',
 'damn basti just ruined karl on fifa again',
 'RT @as_shahid12:

In [15]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
words = filtered_tweets.flatMap(lambda line: line.split(" "))

In [16]:
words.take(30)

['tweet_text',
 'RT',
 '@ochocinco:',
 'I',
 'beat',
 'them',
 'all',
 'for',
 '10',
 'straight',
 'hours',
 '#FIFA16KING',
 '',
 'https://t.co/BFnV6jfkBL',
 'RT',
 '@NiallOfficial:',
 '@Louis_Tomlinson',
 '@socceraid',
 'when',
 'I',
 'retired',
 'from',
 'playing',
 'because',
 'of',
 'my',
 'knee',
 '.',
 'I',
 'went']

In [17]:
word_tuples = words.map(lambda word: (word,1))

In [18]:
word_tuples.take(20)

[('tweet_text', 1),
 ('RT', 1),
 ('@ochocinco:', 1),
 ('I', 1),
 ('beat', 1),
 ('them', 1),
 ('all', 1),
 ('for', 1),
 ('10', 1),
 ('straight', 1),
 ('hours', 1),
 ('#FIFA16KING', 1),
 ('', 1),
 ('https://t.co/BFnV6jfkBL', 1),
 ('RT', 1),
 ('@NiallOfficial:', 1),
 ('@Louis_Tomlinson', 1),
 ('@socceraid', 1),
 ('when', 1),
 ('I', 1)]

In [19]:
word_counts = word_tuples.reduceByKey(lambda a, b: (a + b))

In [20]:
word_counts.take(20)

[('', 3292),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('Just', 44),
 ('BONUSES,', 1),
 ('like?"', 1),
 ('recieve', 1),
 ('Bellow', 1),
 ('now"', 1),
 ('https://t.co/W4QluWGyeq', 1),
 ('https://t.co/Jii7MfUgDj', 1),
 ('review', 1),
 ('Can', 37),
 ('https://t.co/UBBj3iS0cI', 1),
 ('@MattHDGamer', 1),
 ('https://t.co/k1ojzXbzG0', 1),
 ('ago"', 1),
 ('GERMANY', 1),
 ('Revolutionary', 1),
 ('https://t.co/1zmlxb0NuL', 1)]

In [21]:
# Create the DataFrame of tweet word counts look at schema and contents
WordCountsDF = sqlContext.createDataFrame(word_counts, ["country", "counts"])
WordCountsDF.printSchema()
WordCountsDF.take(10)

root
 |-- country: string (nullable = true)
 |-- counts: long (nullable = true)



[Row(country='', counts=3292),
 Row(country='mobile', counts=1),
 Row(country='#FridayNightTouchdown', counts=1),
 Row(country='Just', counts=44),
 Row(country='BONUSES,', counts=1),
 Row(country='like?"', counts=1),
 Row(country='recieve', counts=1),
 Row(country='Bellow', counts=1),
 Row(country='now"', counts=1),
 Row(country='https://t.co/W4QluWGyeq', counts=1)]

In [22]:
# Join the country and tweet data frames (on the appropriate column)
mergedData = WordCountsDF.join(countryDF, 'country')

In [23]:
mergedData.show(10)

+--------+------+----+
| country|counts|code|
+--------+------+----+
|Thailand|     1| THA|
| Iceland|     2| ISL|
|  Mexico|     1| MEX|
|   Wales|    19| WAL|
| Denmark|     1| DEN|
|   India|     4| IND|
|Portugal|     8| POR|
|  Poland|     1| POL|
|  Norway|    52| NOR|
|  Guinea|     8| GUI|
+--------+------+----+
only showing top 10 rows



In [24]:
# Question 1: number of distinct countries mentioned
mergedData.distinct().count()

44

In [25]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
mergedData.select(sum('counts')).show()

+-----------+
|sum(counts)|
+-----------+
|        397|
+-----------+



In [26]:
# Question 3: top three countries and their counts.
from pyspark.sql.functions import desc
mergedData.sort(desc('counts')).show(3)

+-------+------+----+
|country|counts|code|
+-------+------+----+
| Norway|    52| NOR|
|Nigeria|    49| NGA|
| France|    42| FRA|
+-------+------+----+
only showing top 3 rows



In [27]:
# Table 1: counts for Wales, Iceland, and Japan.
mergedData.filter((mergedData["country"] == "Wales") 
                  | (mergedData["country"] == "Iceland")
                  | (mergedData["country"] == "Japan")).show()

+-------+------+----+
|country|counts|code|
+-------+------+----+
|Iceland|     2| ISL|
|  Wales|    19| WAL|
|  Japan|     5| JPN|
+-------+------+----+



In [28]:
# Question 4:how many times was France mentioned?
mergedData.filter(mergedData["country"] == "France").show()

+-------+------+----+
|country|counts|code|
+-------+------+----+
| France|    42| FRA|
+-------+------+----+



In [29]:
#Question 5: Which country has the most mentions: Kenya, Wales, or Netherlands?
mergedData.filter((mergedData["country"] == "Wales") 
                  | (mergedData["country"] == "Kenya")
                  | (mergedData["country"] == "Netherlands")).show()

+-----------+------+----+
|    country|counts|code|
+-----------+------+----+
|      Wales|    19| WAL|
|Netherlands|    13| NED|
|      Kenya|     3| KEN|
+-----------+------+----+



In [30]:
#Question 6: Finally, what is the average number of times a country is mentioned?
from pyspark.sql.functions import mean
mergedData.select(mean('counts')).show()

+-----------------+
|      avg(counts)|
+-----------------+
|9.022727272727273|
+-----------------+

