In [1]:
# Import SQLContext and create one on top of the existing SparkContext.
# `sc` is not defined anywhere in this notebook; it is assumed to be the
# SparkContext pre-created by the PySpark notebook environment — TODO confirm.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [2]:
# Load the country list as an RDD holding one element per line of the file.
country_csv_path = 'file:///home/cloudera/workspace/coursera/big-data-3/final-project/country-list.csv'
country_lines = sc.textFile(country_csv_path)

In [3]:
# Split each "name, code" line into a two-element list of fields.
# map() (rather than flatMap() over a one-element wrapper list) is used
# because every input line yields exactly one output record.
words = country_lines.map(lambda line: line.split(", "))
words.take(2)

[['Afghanistan', 'AFG'], ['Albania', 'ALB']]

In [4]:
def _to_country_tuple(fields):
    """Return a (country name, country code) tuple from a split CSV line."""
    return (fields[0], fields[1])

# Turn every [name, code] list into a (name, code) tuple.
country_tuples = words.map(_to_country_tuple)
country_tuples.take(2)

[('Afghanistan', 'AFG'), ('Albania', 'ALB')]

In [5]:
# Build a DataFrame from the (country, code) tuples with named columns,
# then inspect its schema and the first three rows.
country_columns = ["country", "code"]
countryDF = sqlContext.createDataFrame(country_tuples, country_columns)
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [28]:
# Read the tweets CSV file, then break every line on commas so that
# tweet_lines holds one element per comma-separated fragment.
# NOTE(review): commas that occur inside a tweet's text are split as well —
# confirm this is acceptable for the word count.
tweet_csv_path = 'file:///home/cloudera/workspace/coursera/big-data-3/mongodb/sample_user_tweet_text.csv'
raw_tweet_lines = sc.textFile(tweet_csv_path)
tweet_lines = raw_tweet_lines.flatMap(lambda raw_line: raw_line.split(","))
tweet_lines.take(2)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL']

In [29]:
# Clean the data: discard zero-length entries so only non-empty tweet text
# remains in tweet_lines.
tweet_lines = tweet_lines.filter(lambda text: len(text) != 0)

In [30]:
# Preview the first three non-empty entries; in the recorded output the first
# one is the CSV header 'tweet_text'.
tweet_lines.take(3)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin']

In [33]:
# Word count, step 1: split the tweet text on single spaces, drop the empty
# strings that result, and pair every remaining word with a count of 1.
tweet_tokens = tweet_lines.flatMap(lambda text: text.split(" "))
words = tweet_tokens.filter(lambda token: len(token) > 0)
tuples = words.map(lambda token: (token, 1))
tuples.take(15)

[('tweet_text', 1),
 ('RT', 1),
 ('@ochocinco:', 1),
 ('I', 1),
 ('beat', 1),
 ('them', 1),
 ('all', 1),
 ('for', 1),
 ('10', 1),
 ('straight', 1),
 ('hours', 1),
 ('#FIFA16KING', 1),
 ('https://t.co/BFnV6jfkBL', 1),
 ('RT', 1),
 ('@NiallOfficial:', 1)]

In [34]:
# Word count, step 2: add up the 1s for each word, then write the result to
# HDFS after coalescing the RDD down to a single partition.
# NOTE(review): saveAsTextFile presumably fails when the output directory
# already exists, so re-running this cell may need a fresh path — confirm.
wordcount_output_dir = 'hdfs:/user/cloudera/wordcount/outputDir4'
tweet_counts = tuples.reduceByKey(lambda total, increment: total + increment)
tweet_counts.coalesce(1).saveAsTextFile(wordcount_output_dir)

In [35]:
# Number of (word, count) pairs, i.e. how many distinct words were counted.
tweet_counts.count()

26121

In [37]:
# Peek at five of the (word, count) pairs.
tweet_counts.take(5)

[('https://t.co/fQftAwGAad', 1),
 ('nite', 2),
 ('mobile', 1),
 ('#FridayNightTouchdown', 1),
 ('circle', 7)]

In [40]:
# Create the DataFrame of tweet word counts from the reduced (word, count)
# pairs. The RDD produced by reduceByKey is named tweet_counts; the name
# `counts` is never defined in this notebook and would raise NameError when
# the notebook is run top to bottom on a fresh kernel.
tweetsDF = sqlContext.createDataFrame(tweet_counts, ["word", "count"])
tweetsDF.take(3)

[Row(word='https://t.co/fQftAwGAad', count=1),
 Row(word='nite', count=2),
 Row(word='mobile', count=1)]

In [43]:
# Inner-join the country list with the word counts, keeping only rows where
# a tweet word is exactly equal to a country name.
country_matches_word = countryDF.country == tweetsDF.word
merge = countryDF.join(tweetsDF, country_matches_word, 'inner')
merge.show()

+-----------+----+-----------+-----+
|    country|code|       word|count|
+-----------+----+-----------+-----+
|   Thailand| THA|   Thailand|    1|
|    Iceland| ISL|    Iceland|    2|
|     Mexico| MEX|     Mexico|    1|
|      Wales| WAL|      Wales|   19|
|    Denmark| DEN|    Denmark|    1|
|      India| IND|      India|    4|
|   Portugal| POR|   Portugal|    9|
|     Poland| POL|     Poland|    1|
|     Norway| NOR|     Norway|   52|
|     Guinea| GUI|     Guinea|    8|
|   Slovakia| SVK|   Slovakia|   30|
|     Canada| CAN|     Canada|   12|
|    Bahamas| BAH|    Bahamas|    1|
|Netherlands| NED|Netherlands|   13|
|    Belgium| BEL|    Belgium|    1|
|      Kenya| KEN|      Kenya|    3|
|       Oman| OMA|       Oman|    1|
|      Qatar| QAT|      Qatar|    4|
|     Brazil| BRA|     Brazil|   13|
|    England| ENG|    England|   30|
+-----------+----+-----------+-----+
only showing top 20 rows



In [44]:
# Question 1: number of distinct countries mentioned.
# Each row of `merge` pairs one country with the single word-count row whose
# word equals its name, so the row count is the number of countries mentioned
# (assumes country names are unique in the country list — TODO confirm).
merge.count()

46

In [46]:
# Question 2: total number of country mentions in the tweets, i.e. the sum of
# the per-country word counts (not the number of distinct countries).
# The Spark aggregate is imported under an alias so that it does not shadow
# Python's built-in sum() for the rest of the notebook.
from pyspark.sql.functions import sum as spark_sum
merge.agg(spark_sum('count')).show()

+----------+
|sum(count)|
+----------+
|       420|
+----------+



In [47]:
# Table 1: countries ranked by mention count, highest first. The top three
# countries are the first three rows of the displayed table.
from pyspark.sql.functions import desc
merge.orderBy(desc('count')).show()

+-----------+----+-----------+-----+
|    country|code|       word|count|
+-----------+----+-----------+-----+
|    Nigeria| NGA|    Nigeria|   55|
|     Norway| NOR|     Norway|   52|
|     France| FRA|     France|   45|
|   Slovakia| SVK|   Slovakia|   30|
|    England| ENG|    England|   30|
|    Germany| GER|    Germany|   20|
|      Wales| WAL|      Wales|   19|
|     Russia| RUS|     Russia|   16|
|     Brazil| BRA|     Brazil|   13|
|Netherlands| NED|Netherlands|   13|
|     Canada| CAN|     Canada|   12|
|      Spain| ESP|      Spain|   11|
|Switzerland| SUI|Switzerland|   10|
|       Chad| CHA|       Chad|    9|
|   Portugal| POR|   Portugal|    9|
|     Guinea| GUI|     Guinea|    8|
|     Jordan| JOR|     Jordan|    6|
|       Iraq| IRQ|       Iraq|    6|
|      Japan| JPN|      Japan|    5|
|    Austria| AUT|    Austria|    5|
+-----------+----+-----------+-----+
only showing top 20 rows



In [48]:
# Table 2: mention counts restricted to Wales, Iceland, and Japan.
li = ["Wales", "Iceland", "Japan"]
is_selected_country = merge['country'].isin(li)
merge.where(is_selected_country).show()

+-------+----+-------+-----+
|country|code|   word|count|
+-------+----+-------+-----+
|Iceland| ISL|Iceland|    2|
|  Wales| WAL|  Wales|   19|
|  Japan| JPN|  Japan|    5|
+-------+----+-------+-----+

