In [1]:
# Import and create a new SQLContext 
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)

In [4]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('big-data-3/country-list.csv')
country_lines.take(3)

['Afghanistan, AFG', 'Albania, ALB', 'Algeria, ALG']

In [8]:
# Convert each line into a pair of words
words = country_lines.map(lambda l:l.split(', '))
words.take(3)

[['Afghanistan', 'AFG'], ['Albania', 'ALB'], ['Algeria', 'ALG']]

In [9]:
# Convert each pair of words into a tuple
tuples = words.map(tuple)
tuples.take(3)

[('Afghanistan', 'AFG'), ('Albania', 'ALB'), ('Algeria', 'ALG')]

In [10]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [15]:
# Read tweets CSV file into RDD of lines
tt = sqlContext.read.csv('big-data-3/tweets.csv',
                        header='true',
                        inferSchema='true')
tt.show(3)
tt.printSchema()

+------------------+---------------------+-------------------+-------------------+------------------------+--------------------+--------------------+--------------------+-------------+
|          tweet_id|in_reply_to_status_id|in_reply_to_user_id|retweeted_status_id|retweeted_status_user_id|           timestamp|              source|                text|expanded_urls|
+------------------+---------------------+-------------------+-------------------+------------------------+--------------------+--------------------+--------------------+-------------+
|322185112684994561|   322137634161971200|          222854927|               null|                    null|2013-04-11 03:11:...|"<a href=""http:/...|@Bill_Porter nice...|         null|
|321279208158552064|   321275690811011072|           15105039|               null|                    null|2013-04-08 15:11:...|"<a href=""http:/...|@sudhamshu after ...|         null|
|321155708324311040|   321148055212679168|           52959201|             

In [20]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
print(tt.count())
tt2 = tt.na.drop(subset='text')
print(tt2.count())
tt2.show(1)

3563
3545
+------------------+---------------------+-------------------+-------------------+------------------------+--------------------+--------------------+--------------------+-------------+
|          tweet_id|in_reply_to_status_id|in_reply_to_user_id|retweeted_status_id|retweeted_status_user_id|           timestamp|              source|                text|expanded_urls|
+------------------+---------------------+-------------------+-------------------+------------------------+--------------------+--------------------+--------------------+-------------+
|322185112684994561|   322137634161971200|          222854927|               null|                    null|2013-04-11 03:11:...|"<a href=""http:/...|@Bill_Porter nice...|         null|
+------------------+---------------------+-------------------+-------------------+------------------------+--------------------+--------------------+--------------------+-------------+
only showing top 1 row



In [39]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
texts = tt2.select('text')
texts.show(1)
words2 = texts.rdd.flatMap(lambda l:l[0].split())
counts = words2.map(lambda w:(w,1))
word_counts = counts.reduceByKey(lambda a,b:a+b)
word_counts.take(5)

+--------------------+
|                text|
+--------------------+
|@Bill_Porter nice...|
+--------------------+
only showing top 1 row



[('@Bill_Porter', 1), ('nice', 46), ('to', 1744), ('know', 161), ('that', 304)]

In [40]:
# Create the DataFrame of tweet word counts
wcDF = sqlContext.createDataFrame(word_counts, ["word", "count"])
wcDF.show(2)

+------------+-----+
|        word|count|
+------------+-----+
|@Bill_Porter|    1|
|        nice|   46|
+------------+-----+
only showing top 2 rows



In [42]:
# Join the country and tweet data frames (on the appropriate column)
dfj = countryDF.join(wcDF,countryDF.country==wcDF.word )
dfj.show()

+---------+----+---------+-----+
|  country|code|     word|count|
+---------+----+---------+-----+
| Malaysia| MAS| Malaysia|    7|
|    India| IND|    India|   27|
|   Norway| NOR|   Norway|    1|
|   Bhutan| BHU|   Bhutan|   13|
|Indonesia| IDN|Indonesia|    2|
|Australia| AUS|Australia|    2|
| Pakistan| PAK| Pakistan|    1|
+---------+----+---------+-----+



In [43]:
# Question 1: number of distinct countries mentioned
dfj.count()

7

In [64]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
dfj.agg(sum('count')).show()

+----------+
|sum(count)|
+----------+
|        53|
+----------+



In [68]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
dfj.sort(desc('count')).show(3)


+--------+----+--------+-----+
| country|code|    word|count|
+--------+----+--------+-----+
|   India| IND|   India|   27|
|  Bhutan| BHU|  Bhutan|   13|
|Malaysia| MAS|Malaysia|    7|
+--------+----+--------+-----+
only showing top 3 rows



In [84]:
# Table 2: counts for Wales, Iceland, and Japan.
dfj.filter(dfj['country'] == 'Wales').show()
dfj.filter(dfj['country'] == 'Iceland').show()
dfj.filter(dfj['country'] == 'Japan').show()

+-------+----+----+-----+
|country|code|word|count|
+-------+----+----+-----+
+-------+----+----+-----+

+-------+----+----+-----+
|country|code|word|count|
+-------+----+----+-----+
+-------+----+----+-----+

+-------+----+----+-----+
|country|code|word|count|
+-------+----+----+-----+
+-------+----+----+-----+

