Title: identifying most popular country 

Author: Ivan Zheng

Date: 10/06

In [1]:
# Import and create a new SQLContext 
import pyspark
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

### Step 1: Transforming the Country Data to pyspark SQL formate

In [2]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('data/country-list.csv')

In [3]:
country_lines.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [4]:
# Convert each line into a pair of words
country_words = country_lines.map(lambda line : line.split(", "))

In [5]:
country_words.take(5)

[['Afghanistan', 'AFG'],
 ['Albania', 'ALB'],
 ['Algeria', 'ALG'],
 ['American Samoa', 'ASA'],
 ['Andorra', 'AND']]

In [6]:
# Convert each pair of words into a tuple
country_tuples = country_words.map(lambda x: (x[0], x[1]))

In [7]:
country_tuples.take(5)

[('Afghanistan', 'AFG'),
 ('Albania', 'ALB'),
 ('Algeria', 'ALG'),
 ('American Samoa', 'ASA'),
 ('Andorra', 'AND')]

In [8]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

###### Transformed the Country Date to pyspark SQL formate

### Step 2: Calculating the most popular country in tweet text

In [9]:
# Read tweets CSV file into RDD of lines
tweet_lines = sc.textFile('data/tweet_output.csv')

In [10]:
tweet_lines.take(3)

['tweet_text',
 'RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL',
 'RT @NiallOfficial: @Louis_Tomlinson @socceraid when I retired from playing because of my knee . I went and did my uefa A badges in Dublin']

In [11]:
tweet_lines.count() #before cleaning

13995

In [12]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
tweet_lines = tweet_lines.filter(lambda x: x is not ' ')

In [13]:
tweet_lines.count() #before removing null

13989

##### Perform WordCount on the cleaned tweet texts.

In [14]:
tweet_words = tweet_lines.flatMap(lambda line : line.split(" "))

In [15]:
tweet_tuples = tweet_words.map(lambda word : (word, 1))

In [16]:
tweet_counts = tweet_tuples.reduceByKey(lambda a, b: (a + b))

In [17]:
tweet_counts.take(5)

[('tweet_text', 1), ('beat', 51), ('them', 70), ('10', 115), ('hours', 59)]

In [18]:
# Create the DataFrame of tweet word counts
tweetDF = sqlContext.createDataFrame(tweet_counts, ["country", "counts"])
tweetDF.printSchema()
tweetDF.take(3)

root
 |-- country: string (nullable = true)
 |-- counts: long (nullable = true)



[Row(country='tweet_text', counts=1),
 Row(country='beat', counts=51),
 Row(country='them', counts=70)]

In [19]:
# Join the country and tweet data frames (on the appropriate column)
countryDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



In [20]:
tweetDF.show(10)

+--------------------+------+
|             country|counts|
+--------------------+------+
|          tweet_text|     1|
|                beat|    51|
|                them|    70|
|                  10|   115|
|               hours|    59|
|         #FIFA16KING|    27|
|                    |  3884|
|https://t.co/BFnV...|    27|
|    @Louis_Tomlinson|     3|
|          @socceraid|     3|
+--------------------+------+
only showing top 10 rows



In [21]:
countryDF.show(10)

+-------------------+----+
|            country|code|
+-------------------+----+
|        Afghanistan| AFG|
|            Albania| ALB|
|            Algeria| ALG|
|     American Samoa| ASA|
|            Andorra| AND|
|             Angola| ANG|
|           Anguilla| AIA|
|Antigua and Barbuda| ATG|
|          Argentina| ARG|
|            Armenia| ARM|
+-------------------+----+
only showing top 10 rows



In [22]:
mergeDF = countryDF.join(tweetDF, 'country')

In [23]:
mergeDF.show(5)

+-------+----+------+
|country|code|counts|
+-------+----+------+
|   Chad| CHA|     9|
| Russia| RUS|    15|
|   Iraq| IRQ|     6|
|Germany| GER|    20|
| Jordan| JOR|     6|
+-------+----+------+
only showing top 5 rows



In [24]:
# Number of distinct countries mentioned
mergeDF.filter(mergeDF["counts"] > 0).count()

44

In [25]:
#Number of countries mentioned in tweets.
from pyspark.sql.functions import sum
mergeDF.select(sum('counts')).show()

+-----------+
|sum(counts)|
+-----------+
|        397|
+-----------+



In [26]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
mergeDF.sort(mergeDF.counts.desc()).show(5)

+--------+----+------+
| country|code|counts|
+--------+----+------+
|  Norway| NOR|    52|
| Nigeria| NGA|    49|
|  France| FRA|    42|
|Slovakia| SVK|    30|
| England| ENG|    25|
+--------+----+------+
only showing top 5 rows



In [27]:
# Table 2: counts for Wales, Iceland, and Japan.
mergeDF.filter(mergeDF["country"] == 'Wales').show()

+-------+----+------+
|country|code|counts|
+-------+----+------+
|  Wales| WAL|    19|
+-------+----+------+



In [28]:
mergeDF.filter(mergeDF["country"] == 'Kenya').show()

+-------+----+------+
|country|code|counts|
+-------+----+------+
|  Kenya| KEN|     3|
+-------+----+------+



In [29]:
mergeDF.filter(mergeDF["country"] == 'Netherlands').show()

+-----------+----+------+
|    country|code|counts|
+-----------+----+------+
|Netherlands| NED|    13|
+-----------+----+------+



In [30]:
mergeDF.filter(mergeDF["country"] == 'Iceland').show()

+-------+----+------+
|country|code|counts|
+-------+----+------+
|Iceland| ISL|     2|
+-------+----+------+



In [31]:
mergeDF.filter(mergeDF["country"] == 'Japan').show()

+-------+----+------+
|country|code|counts|
+-------+----+------+
|  Japan| JPN|     5|
+-------+----+------+



Summary, a short spark exercise using PySpark SQL and generate output using MonogDB.