In [5]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [6]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file://path/country-list.csv')

In [7]:
# Convert each line into a pair of words
country_words = country_lines.flatMap(lambda line : [line.split(",")])

In [8]:
# Convert each pair of words into a tuple
country_tuples = country_words.map(lambda country : (country[0], country[1].strip()))

In [9]:
country_tuples.take(5)

[('Afghanistan', 'AFG'),
 ('Albania', 'ALB'),
 ('Algeria', 'ALG'),
 ('American Samoa', 'ASA'),
 ('Andorra', 'AND')]

In [10]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [11]:
# Read tweets CSV file into RDD of lines
users_tweets = sc.textFile('file://path/users_tweets.csv')

In [12]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
clean_tweets = users_tweets.filter(lambda x : len(x) > 1)

In [13]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
tweet_words = clean_tweets.flatMap(lambda line : line.split(" "))
tweet_tuples = tweet_words.map(lambda word : (word,1))
tweet_counts = tweet_tuples.reduceByKey(lambda a, b : (a + b))

In [14]:
# Create the DataFrame of tweet word counts
tweetsDF = sqlContext.createDataFrame(tweet_counts, ["tweet", "count"])
tweetsDF.printSchema()
tweetsDF.take(3)

root
 |-- tweet: string (nullable = true)
 |-- count: long (nullable = true)



[Row(tweet='', count=3280),
 Row(tweet='https://t.co/fQftAwGAad', count=1),
 Row(tweet='mobile', count=1)]

In [15]:
# Join the country and tweet data frames (on the appropriate column)
merge = countryDF.join(tweetsDF, (countryDF.country == tweetsDF.tweet))
merge.printSchema()
merge.take(5)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)
 |-- tweet: string (nullable = true)
 |-- count: long (nullable = true)



[Row(country='Thailand', code='THA', tweet='Thailand', count=1),
 Row(country='Iceland', code='ISL', tweet='Iceland', count=2),
 Row(country='Mexico', code='MEX', tweet='Mexico', count=1),
 Row(country='Wales', code='WAL', tweet='Wales', count=19),
 Row(country='Denmark', code='DEN', tweet='Denmark', count=1)]

In [16]:
# Number of distinct countries mentioned
merge.count()

44

In [17]:
# Number of countries mentioned in tweets.
from pyspark.sql.functions import sum

merge.select(sum('count')).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [18]:
# Top three countries and their counts.
from pyspark.sql.functions import desc

merge.select("country", "count").orderBy(desc("count")).show(3)

+-------+-----+
|country|count|
+-------+-----+
| Norway|   52|
|Nigeria|   49|
| France|   42|
+-------+-----+
only showing top 3 rows



In [19]:
# Average number of times a country mentioned
from pyspark.sql.functions import avg

merge.select(avg("count")).show()

+-----------------+
|       avg(count)|
+-----------------+
|9.022727272727273|
+-----------------+



In [20]:
# Counts for Wales, Netherlands, Kenya

merge.filter(merge["country"].isin({'Kenya', 'Wales', 'Netherlands'})).show()


+-----------+----+-----------+-----+
|    country|code|      tweet|count|
+-----------+----+-----------+-----+
|      Wales| WAL|      Wales|   19|
|Netherlands| NED|Netherlands|   13|
|      Kenya| KEN|      Kenya|    3|
+-----------+----+-----------+-----+

