In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [4]:
import pandas as pd

In [16]:
df = pd.read_csv("./country-list.csv")
df.shape

(210, 2)

In [3]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [8]:
# Convert each line into a pair of words
words = country_lines.map(lambda l: l.split(','))

In [12]:
# Convert each pair of words into a tuple
country_tuples = words.map(lambda x : tuple(x))

In [13]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [15]:
countryDF.count()

211

In [40]:
# Read tweets CSV file into RDD of lines
lines = sc.textFile("file:///home/cloudera/Downloads/big-data-3/mongodb/mongodb/tweet_user.csv")

In [30]:
# lines.count() # to check the number of rows

13995

In [41]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
lines = lines.filter(lambda l: len(l)!=0)

In [32]:
# lines.count()

13391

In [45]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
words = lines.flatMap(lambda l: l.split(" "))
word_tuples = words.map(lambda w: (w, 1))
word_cnts = word_tuples.reduceByKey(lambda a, b: a+b)

In [46]:
# words.take(5)

['tweet_text', 'RT', '@ochocinco:', 'I', 'beat']

In [47]:
# Create the DataFrame of tweet word counts
word_count_df = sqlContext.createDataFrame(word_cnts, ["word", "count"])

In [48]:
word_count_df.take(5)

[Row(word='', count=3292),
 Row(word='https://t.co/fQftAwGAad', count=1),
 Row(word='mobile', count=1),
 Row(word='#FridayNightTouchdown', count=1),
 Row(word='circle', count=7)]

In [53]:
# Join the country and tweet data frames (on the appropriate column)
merge = countryDF.join(word_count_df, (countryDF.country == word_count_df.word))

**Question 1**: As a Sports Analyst, you are interested in how many different countries are mentioned in the tweets. Use the Spark to calculate this number. Note that regardless of how many times a single country is mentioned, this country only contributes 1 to the total.

In [54]:
# Question 1: number of distinct countries mentioned
merge.count()

44

**Question 2**: Next, compute the total number of times any country is mentioned. This is different from the previous question since in this calculation, if a country is mentioned three times, then it contributes 3 to the total.

In [55]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
merge.select(sum("count")).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



**Question 3**: Your next task is to determine the most popular countries. You can do this by finding the three countries mentioned the most.

In [57]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
merge.orderBy(desc("count")).show(3)

+-------+----+-------+-----+
|country|code|   word|count|
+-------+----+-------+-----+
| Norway| NOR| Norway|   52|
|Nigeria| NGA|Nigeria|   49|
| France| FRA| France|   42|
+-------+----+-------+-----+
only showing top 3 rows



**Question 4**: After exploring the dataset, you are now interested in how many times specific countries are mentioned. For example, how many times was France mentioned?

In [63]:
merge.filter(merge['country']=='France').select('count').show()

+-----+
|count|
+-----+
|   42|
+-----+



**Question 5**: Which country has the most mentions: Kenya, Wales, or Netherlands?

In [60]:
# Table 2: counts for Wales, Iceland, and Japan.
merge.filter(merge["country"].isin(["Wales", "Iceland", "Japan"])).show()

+-------+----+-------+-----+
|country|code|   word|count|
+-------+----+-------+-----+
|Iceland| ISL|Iceland|    2|
|  Wales| WAL|  Wales|   19|
|  Japan| JPN|  Japan|    5|
+-------+----+-------+-----+



In [61]:
merge.filter(merge["country"].isin(["Kenya", "Wales", "Netherlands"])).orderBy(desc("count")).show()

+-----------+----+-----------+-----+
|    country|code|       word|count|
+-----------+----+-----------+-----+
|      Wales| WAL|      Wales|   19|
|Netherlands| NED|Netherlands|   13|
|      Kenya| KEN|      Kenya|    3|
+-----------+----+-----------+-----+



**Question 6**: Finally, what is the average number of times a country is mentioned?

In [62]:
from pyspark.sql.functions import mean
merge.select(mean('count')).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+

