In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [6]:
# Convert each line into a pair of words
#words_pair = country_lines.flatMap(lambda line: line.split(", "))

In [3]:
# Convert each pair of words into a tuple
#country_tuples = words_pair.map(lambda pair: tuple(pair))
country_tuples = country_lines.map(lambda line: tuple(line.split(", ")))

In [4]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [6]:
# Read tweets CSV file into RDD of 
tweet_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/tweet_text.csv')

In [14]:
type(tweet_lines)

pyspark.rdd.RDD

In [15]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
def not_null(line):
    return line != ''
tweets = tweet_lines.filter(not_null)#.collect()

In [16]:
type(tweets)

pyspark.rdd.PipelinedRDD

In [13]:
#tweets[:10]

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin',
 'RT @GameSeek: Follow & Retweet for your chance to win a copy of FIFA 17 Deluxe Edition (platform of your choice) in our #giveaway! https://…',
 '@CIVARAGI ...I was putting ffs but it autocorrected it too FIFA',
 "RT @GeniusFootball: You know it's FIFA... https://t.co/tLK6sTnPaM",
 '"RT @WeahsCousin: ""Pogba isn\'t worth £100million.""',
 'Thanks Dean, the £500 you\'ve spent on FIFA Ultimate Team probably wasn\'t worth it either."',
 '"RT @WeahsCousin: ""Pogba isn\'t worth £100million.""',
 'Thanks Dean, the £500 you\'ve spent on FIFA Ultimate Team probably wasn\'t worth it either."']

In [17]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
words = tweets.flatMap(lambda line : line.split(" "))
tuples = words.map(lambda word : (word, 1))
counts = tuples.reduceByKey(lambda a, b: (a + b))

In [100]:
# Create the DataFrame of tweet word counts
wordcount_DF = sqlContext.createDataFrame(counts, ["word", "count"])
wordcount_DF.printSchema()
wordcount_DF.take(3)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='', count=3292),
 Row(word='https://t.co/fQftAwGAad', count=1),
 Row(word='mobile', count=1)]

In [101]:
wordcount_DF.show(5)

+--------------------+-----+
|                word|count|
+--------------------+-----+
|                    | 3292|
|https://t.co/fQft...|    1|
|              mobile|    1|
|#FridayNightTouch...|    1|
|              circle|    7|
+--------------------+-----+
only showing top 5 rows



In [102]:
countryDF.show(5)

+--------------+----+
|       country|code|
+--------------+----+
|   Afghanistan| AFG|
|       Albania| ALB|
|       Algeria| ALG|
|American Samoa| ASA|
|       Andorra| AND|
+--------------+----+
only showing top 5 rows



In [103]:
# Join the country and tweet data frames (on the appropriate column)
merge = countryDF.join(wordcount_DF, countryDF.country == wordcount_DF.word, 'left_outer')
merge.show(5)

+-------------+----+--------+-----+
|      country|code|    word|count|
+-------------+----+--------+-----+
|       Cyprus| CYP|    null| null|
|   Luxembourg| LUX|    null| null|
|     Bulgaria| BUL|    null| null|
|Guinea-Bissau| GNB|    null| null|
|     Thailand| THA|Thailand|    1|
+-------------+----+--------+-----+
only showing top 5 rows



In [104]:
# Question 1: number of distinct countries mentioned
mentioned = merge.filter(merge["count"] > 0)
mentioned.show(5)

+--------+----+--------+-----+
| country|code|    word|count|
+--------+----+--------+-----+
|Thailand| THA|Thailand|    1|
| Iceland| ISL| Iceland|    2|
|  Mexico| MEX|  Mexico|    1|
|   Wales| WAL|   Wales|   19|
| Denmark| DEN| Denmark|    1|
+--------+----+--------+-----+
only showing top 5 rows



In [105]:
mentioned.count()

44

In [106]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
mentioned.select(sum('count')).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [107]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc

mentioned.sort(desc("count")).show(3)

+-------+----+-------+-----+
|country|code|   word|count|
+-------+----+-------+-----+
| Norway| NOR| Norway|   52|
|Nigeria| NGA|Nigeria|   49|
| France| FRA| France|   42|
+-------+----+-------+-----+
only showing top 3 rows



In [108]:
#mentioned.sort(mentioned.cnt.desc()).show(3) 
#also works if column 'count' is named 'cnt' instead

In [109]:
# Table 2: counts for Wales, Iceland, and Japan.
mentioned.filter(mentioned["country"].isin(['Wales', 'Kenya', 'Netherlands'])).show()

+-----------+----+-----------+-----+
|    country|code|       word|count|
+-----------+----+-----------+-----+
|      Wales| WAL|      Wales|   19|
|Netherlands| NED|Netherlands|   13|
|      Kenya| KEN|      Kenya|    3|
+-----------+----+-----------+-----+



In [110]:
from pyspark.sql.functions import *
mentioned.select(mean('count')).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+

