In [None]:
# Build the SQLContext used for every DataFrame operation in this notebook.
# It wraps `sc`, the SparkContext that must already exist in the session.
from pyspark.sql import SQLContext

sqlContext = SQLContext(sparkContext=sc)

In [None]:
# Read the country CSV file into an RDD with one element per line of text.
# textFile() is a SparkContext method (SQLContext does not provide it), so the
# file is loaded through `sc`.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [None]:
# Split every element of country_lines on newline characters and flatten the
# pieces into one RDD, so each element of pair_words is a single line of text.
pair_words = country_lines.flatMap(
    lambda row: row.split("\n")
)
pair_words.take(5)  # preview the first five elements; collect() would return all of them

In [None]:
# Convert each comma-separated line into a (country, code) tuple.
# The line is split a single time on "," and both fields are stripped of
# surrounding whitespace, so rows written as "name,code" and "name, code"
# are handled alike.
def _to_country_tuple(line):
    """Return (country, code) from the first two comma-separated fields of `line`.

    Raises IndexError when `line` contains no comma.
    """
    fields = line.split(",")
    return (fields[0].strip(), fields[1].strip())

country_tuples = pair_words.map(_to_country_tuple)
country_tuples.take(5)

In [None]:
# Create the country DataFrame from the (country, code) tuples, then inspect
# its schema and contents.
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.show()

In [None]:
# Convert the Spark DataFrame into a pandas DataFrame and preview its first rows.
pandas_countryDF = countryDF.toPandas()
pandas_countryDF.head(n=5)

In [None]:
# Read the tweets CSV file into an RDD of lines and count the lines.
tweet_texts = sc.textFile("file:///home/cloudera/Downloads/big-data-3/final-project/Tweets.csv")
tweet_texts.count()

In [None]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter(),
# keeping only lines whose length is non-zero, then count what remains.
clean_tweet_texts = tweet_texts.filter(
    lambda text: len(text) != 0
)
clean_tweet_texts.count()

In [None]:
# Word count over the cleaned tweet texts:
#   1. split every line on single spaces into individual words,
#   2. pair each word with the number 1,
#   3. add up the 1s for each distinct word.
tweet_words = clean_tweet_texts.flatMap(
    lambda text: text.split(" ")
)
tweet_tuples = tweet_words.map(
    lambda token: (token, 1)
)
word_counts = tweet_tuples.reduceByKey(
    lambda left, right: left + right
)
word_counts.take(5)

In [None]:
# Turn the (word, count) pairs into a DataFrame with columns "word" and
# "count", then inspect the schema and the first five rows.
tweetDF = sqlContext.createDataFrame(
    word_counts,
    schema=["word", "count"],
)
tweetDF.printSchema()
tweetDF.take(5)

In [None]:
# Join the country and tweet DataFrames where the country name equals the word.
# The tweet word count ('t.count') is kept in the result because the later
# cells aggregate and sort on the "count" column.
from pyspark.sql.functions import col
joinedDF = (
    countryDF.alias('c')
    .join(tweetDF.alias('t'), col('c.country') == col('t.word'))
    .select(col('c.code'), col('c.country'), col('t.word'), col('t.count'))
)
joinedDF.printSchema()
joinedDF.take(5)

In [None]:
# Question 1: how many distinct countries are mentioned?
# Displays a pair: (number of rows in joinedDF, number of distinct values in
# its "code" column).
(
    joinedDF.count(),
    joinedDF.select('code').distinct().count(),
)

In [None]:
# Question 2: total number of country mentions in the tweets, computed as the
# sum of the "count" column of joinedDF.
# The Spark aggregate is imported under an alias so that Python's builtin
# sum() is not shadowed for the rest of the notebook.
from pyspark.sql.functions import sum as spark_sum
joinedDF.agg(spark_sum("count")).first()

In [None]:
# Table 1: Top three countries and their counts.
# Rows are sorted by "count" in descending order; five rows are displayed, so
# the top three are the first three rows of the output.
from pyspark.sql.functions import desc
descSorted = joinedDF.sort(desc("count"))
descSorted.show(5)

In [None]:
# Table 2: Counts for Wales, Iceland and Japan, highest count first.
# The row filter is built separately so the three-way OR stays readable.
is_selected_country = (
    (col("country") == "Wales")
    | (col("country") == "Iceland")
    | (col("country") == "Japan")
)
selectedDF = joinedDF.where(is_selected_country).sort(desc("count"))
selectedDF.show()

In [None]:
# Average of the "count" column over all rows of joinedDF.
from pyspark.sql.functions import avg

average_count_df = joinedDF.agg(avg("count"))
average_count_df.first()

In [None]:
# Wite counts to text file in HDFS
word_counts.coalesce(1).saveAsTextFile('hdfs:/user/cloudera/wordcount/outputDir') # coalese() method combines all the RDD partition into a single partition