# Import and create a new SQLContext

In [32]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Read the country CSV file into an RDD

In [46]:
country_lines = sc.textFile('file:///home/cloudera/Downloads/coursera-master/big-data-3/final-project/country-list.csv')
country_lines.take(7)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND',
 'Angola, ANG',
 'Anguilla, AIA']

# Convert each line into a pair of words

In [49]:
country_lines.collect()
country_lines.map(lambda line : line.split(",")).take(7)

[['Afghanistan', ' AFG'],
 ['Albania', ' ALB'],
 ['Algeria', ' ALG'],
 ['American Samoa', ' ASA'],
 ['Andorra', ' AND'],
 ['Angola', ' ANG'],
 ['Anguilla', ' AIA']]

# Convert each pair of words into a tuple

In [52]:
country_tuples = country_lines.map(lambda line : (line.split(",")[0], 
                                                  line.split(",")[1]))
country_tuples.take(7)

[('Afghanistan', ' AFG'),
 ('Albania', ' ALB'),
 ('Algeria', ' ALG'),
 ('American Samoa', ' ASA'),
 ('Andorra', ' AND'),
 ('Angola', ' ANG'),
 ('Anguilla', ' AIA')]

# Create the DataFrame, look at schema and contents

In [54]:
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

# Read tweets CSV file into RDD of lines

In [55]:
tweets = sc.textFile('file:///home/cloudera/Downloads/coursera-master/big-data-3/final-project/tweets.csv')
tweets.count()

13994

# Clean the data: Remove the empty tweets using filter() 

In [56]:
filtered_tweets = tweets.filter(lambda a : len(a) > 0)
filtered_tweets.count()

13390

# Perform WordCount on the cleaned tweet texts.

In [66]:
word_counts = filtered_tweets.flatMap(lambda line : line.split(" ")) \
    .map(lambda word : (word, 1)) \
    .reduceByKey(lambda a, b : a + b)
word_counts.take(7)

[('', 3292),
 ('mobile', 1),
 ('one😁😍⚽⚾🏀🚵❄"', 1),
 ('https://t.co/Y7yrAbst7W"', 1),
 ('circle', 7),
 ('#thfc', 1),
 ('reinstated', 4)]

# Create the DataFrame of tweet word counts

In [64]:
tweetsDF = sqlContext.createDataFrame(word_counts, ["word", "count"])
tweetsDF.printSchema()
tweetsDF.take(7)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='', count=3292),
 Row(word='mobile', count=1),
 Row(word='one😁😍⚽⚾🏀🚵❄"', count=1),
 Row(word='https://t.co/Y7yrAbst7W"', count=1),
 Row(word='circle', count=7),
 Row(word='#thfc', count=1),
 Row(word='reinstated', count=4)]

# Join the country and tweet DataFrames

In [75]:
joinedDF = countryDF.join(tweetsDF, countryDF.country == tweetsDF.word)
joinedDF.take(7)

[Row(country='Thailand', code=' THA', word='Thailand', count=1),
 Row(country='Iceland', code=' ISL', word='Iceland', count=2),
 Row(country='Mexico', code=' MEX', word='Mexico', count=1),
 Row(country='Wales', code=' WAL', word='Wales', count=19),
 Row(country='Denmark', code=' DEN', word='Denmark', count=1),
 Row(country='India', code=' IND', word='India', count=4),
 Row(country='Portugal', code=' POR', word='Portugal', count=8)]

# Question 1: number of distinct countries mentioned

In [76]:
distinct_countries = joinedDF.select("country").distinct()
distinct_countries.count()

42

# Question 2: number of countries mentioned in tweets.

In [77]:
from pyspark.sql.functions import sum

joinedDF.agg(sum("count")).first()

Row(sum(count)=360)

In [78]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
descSorted = joinedDF.sort(desc("count"))
descSorted.show(5)

+--------+----+--------+-----+
| country|code|    word|count|
+--------+----+--------+-----+
| Nigeria| NGA| Nigeria|   48|
|  France| FRA|  France|   38|
|Slovakia| SVK|Slovakia|   30|
|  Norway| NOR|  Norway|   28|
| England| ENG| England|   25|
+--------+----+--------+-----+
only showing top 5 rows



In [81]:
# Table 2: counts for Wales, Iceland, and Japan.
from pyspark.sql.functions import col

selected = joinedDF.where((col("country")=="Wales") | 
                          (col("country")=="Iceland") |
                          (col("country")=="Japan"))
selected.show()

+-------+----+-------+-----+
|country|code|   word|count|
+-------+----+-------+-----+
|Iceland| ISL|Iceland|    2|
|  Wales| WAL|  Wales|   19|
|  Japan| JPN|  Japan|    5|
+-------+----+-------+-----+

