In [None]:
# Jayson Francis - Analyzing Tweets about Countries

In [257]:
# Set up the SQLContext that later cells use to build DataFrames from RDDs.
# NOTE(review): `sc` is not defined in this notebook; presumably it is the
# SparkContext pre-created by the PySpark kernel -- confirm.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sparkContext=sc)

In [258]:
# Load the country list as an RDD holding one string per line of the CSV file.
country_csv_uri = 'file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(country_csv_uri)

In [358]:
# Preview the first three raw lines of the country file.
country_lines.take(3)

['Afghanistan, AFG', 'Albania, ALB', 'Algeria, ALG']

In [261]:
# Break every line of the country file into its fields.
def split_country_line(line):
    """Split one line of the country file on the ", " separator."""
    return line.split(", ")

country_words = country_lines.map(split_country_line)

In [359]:
# Preview the first three split lines (each one is a list of fields).
country_words.take(3)

[['Afghanistan', 'AFG'], ['Albania', 'ALB'], ['Algeria', 'ALG']]

In [263]:
# Turn each list of fields into a 2-tuple built from its first two entries.
def as_country_tuple(fields):
    """Return the first two fields of a split line as a tuple."""
    return fields[0], fields[1]

country_tuples = country_words.map(as_country_tuple)

In [264]:
# Preview the first three tuples.
country_tuples.take(3)

[('Afghanistan', 'AFG'), ('Albania', 'ALB'), ('Algeria', 'ALG')]

In [265]:
# Build the country DataFrame from the tuples, naming the two columns,
# then print its schema and display the first three rows.
country_columns = ["country", "code"]
countryDF = sqlContext.createDataFrame(country_tuples, schema=country_columns)
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [362]:
# Load the exported tweets as an RDD holding one string per line of the file.
tweets_csv_uri = 'file:///home/cloudera/Downloads/big-data-3/final-project/mongoexport2.csv'
tweet_lines = sc.textFile(tweets_csv_uri)

In [363]:
# Data cleaning: some lines in the export are empty, so keep only the lines
# that contain at least one character.
def is_nonempty(line):
    """Return True when the line has a length other than zero."""
    return len(line) != 0

tweet_lines_filtered = tweet_lines.filter(is_nonempty)

In [364]:
# Compare the number of lines before and after removing the empty ones.
tweet_lines.count(), tweet_lines_filtered.count()

(13995, 13391)

In [323]:
# Word count over the cleaned tweets, in three steps:
#   1. split every line on single spaces (consecutive spaces therefore
#      produce empty-string tokens, which are counted like any other word),
#   2. pair each token with an initial count of 1,
#   3. add up the counts per token.
words = tweet_lines_filtered.flatMap(lambda tweet: tweet.split(" "))
tuples = words.map(lambda token: (token, 1))
counts = tuples.reduceByKey(lambda left, right: left + right)

In [333]:
# Turn the (word, count) pairs into a DataFrame. The word column is named
# "country" so that it shares a column name with the country DataFrame.
tweet_columns = ["country", "count"]
tweetDF = sqlContext.createDataFrame(counts, schema=tweet_columns)
tweetDF.printSchema()
tweetDF.take(3)

root
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(country='', count=3292),
 Row(country='mobile', count=1),
 Row(country='#FridayNightTouchdown', count=1)]

In [334]:
# Combine word counts with the country list by joining on the shared
# "country" column; only words that match a country name are kept.
merge = tweetDF.join(countryDF, on='country')

In [340]:
# Question 1: number of distinct countries mentioned.
# Count distinct values of the "country" column only, so that the answer does
# not depend on the other columns of the joined rows (count, code).
merge.select('country').distinct().count()

44

In [342]:
# Question 2: total number of country mentions in the tweets
# (the sum of the per-country counts).
# The functions module is imported under an alias: `from ... import sum`
# would replace Python's built-in sum() for every cell run afterwards.
from pyspark.sql import functions as F
merge.select(F.sum('count')).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [369]:
# Table 1: top three countries and their counts.
# take(3) returns only the first three rows of the sorted result, instead of
# collecting every row to the driver and slicing the list afterwards.
from pyspark.sql.functions import desc
merge.orderBy(desc("count")).take(3)

[Row(country='Norway', count=52, code='NOR'),
 Row(country='Nigeria', count=49, code='NGA'),
 Row(country='France', count=42, code='FRA')]

In [354]:
# Table 2: mention counts for Wales, Iceland, and Japan.
# Build one boolean column expression per country and OR them together.
is_wales = merge.country == 'Wales'
is_iceland = merge.country == 'Iceland'
is_japan = merge.country == 'Japan'
merge.filter(is_wales | is_iceland | is_japan).collect()

[Row(country='Iceland', count=2, code='ISL'),
 Row(country='Wales', count=19, code='WAL'),
 Row(country='Japan', count=5, code='JPN')]

In [380]:
# Quiz Question 5 - Which country was mentioned most: Kenya, Wales or Netherlands?
# Rows are sorted by descending count so the most-mentioned country is always
# the first row. The column is accessed as merge['count'] because merge.count
# is the DataFrame's count() method. No pyspark.sql.functions import is needed
# here (importing its `max` would shadow Python's built-in max()).
quiz_rows = merge.filter(
    (merge.country == 'Kenya')
    | (merge.country == 'Wales')
    | (merge.country == 'Netherlands')
)
quiz_rows.orderBy(merge['count'].desc()).collect()

[Row(country='Wales', count=19, code='WAL'),
 Row(country='Netherlands', count=13, code='NED'),
 Row(country='Kenya', count=3, code='KEN')]

In [357]:
# Quiz Question 6 - What is the average number of times a country is mentioned?
# Aggregate the whole joined DataFrame down to the mean of its "count" column.
from pyspark.sql.functions import mean
average_mentions = merge.agg(mean('count'))
average_mentions.show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+



In [None]:
# End