In [1]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [14]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/cloudera/Downloads/big-data-3/final-project/country-list.csv')

In [15]:
!head country-list.csv

Afghanistan, AFG
Albania, ALB
Algeria, ALG
American Samoa, ASA
Andorra, AND
Angola, ANG
Anguilla, AIA
Antigua and Barbuda, ATG
Argentina, ARG
Armenia, ARM


In [19]:
# Convert each line into a pair of words
words = country_lines.flatMap(lambda line: line.split(","))

In [21]:
tuples = country_lines.map(lambda line: line.split(","))

In [10]:
# Convert each pair of words into a tuple


In [22]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(tuples, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code=' AFG'),
 Row(country='Albania', code=' ALB'),
 Row(country='Algeria', code=' ALG')]

In [23]:
# Read tweets CSV file into RDD of lines
export_csv = sc.textFile('file:///home/cloudera/Downloads/big-data-3/mongodb/export_twitter.csv')

In [40]:
export_csv.count()

13995

In [30]:
# Clean the data: some tweets are empty. Remove the empty tweets using filter() 
#tweets_text_clean=export_csv.filter(lambda x:len(x)>0)

In [42]:
tweets_text_clean=export_csv.filter(lambda x:len(x.strip())>0)

In [43]:
tweets_text_clean.count()

13385

In [56]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
tweets_words = tweets_text_clean.flatMap(lambda line : line.split(" "))

In [57]:
tweets_words.count()

183304

In [58]:
tuples= tweets_words.map(lambda word:(word,1))

In [59]:
tweets_counts=tuples.reduceByKey(lambda a,b:(a + b))

In [60]:
tweets_counts.count()

26634

In [80]:
# Create the DataFrame of tweet word counts
tweets_countsDF=sqlContext.createDataFrame(tweets_counts, ["country", "count"])
tweets_countsDF.printSchema()
tweets_countsDF.take(10)

root
 |-- country: string (nullable = true)
 |-- count: long (nullable = true)



[Row(country='', count=3280),
 Row(country='mobile', count=1),
 Row(country='#FridayNightTouchdown', count=1),
 Row(country='Just', count=44),
 Row(country='BONUSES,', count=1),
 Row(country='like?"', count=1),
 Row(country='recieve', count=1),
 Row(country='Bellow', count=1),
 Row(country='now"', count=1),
 Row(country='https://t.co/W4QluWGyeq', count=1)]

In [81]:
tweets_countsDF.show()

+--------------------+-----+
|             country|count|
+--------------------+-----+
|                    | 3280|
|              mobile|    1|
|#FridayNightTouch...|    1|
|                Just|   44|
|            BONUSES,|    1|
|              like?"|    1|
|             recieve|    1|
|              Bellow|    1|
|                now"|    1|
|https://t.co/W4Ql...|    1|
|https://t.co/Jii7...|    1|
|              review|    1|
|                 Can|   37|
|https://t.co/UBBj...|    1|
|        @MattHDGamer|    1|
|https://t.co/k1oj...|    1|
|                ago"|    1|
|             GERMANY|    1|
|       Revolutionary|    1|
|https://t.co/1zml...|    1|
+--------------------+-----+
only showing top 20 rows



In [82]:
# Join the country and tweet data frames (on the appropriate column)
tweets_countsDF.filter(tweets_countsDF['country'] =='GERMANY').show()

+-------+-----+
|country|count|
+-------+-----+
|GERMANY|    1|
+-------+-----+



In [83]:
merge = tweets_countsDF.join(countryDF, 'country')

In [89]:
merge.select("country").distinct().count()

44

In [88]:
# Question 1: number of distinct countries mentioned
merge.groupBy('country').count().show(100)

+-----------+-----+
|    country|count|
+-----------+-----+
|   Thailand|    1|
|    Iceland|    1|
|     Mexico|    1|
|      Wales|    1|
|    Denmark|    1|
|      India|    1|
|   Portugal|    1|
|     Poland|    1|
|     Norway|    1|
|     Guinea|    1|
|   Slovakia|    1|
|     Canada|    1|
|Netherlands|    1|
|      Kenya|    1|
|       Oman|    1|
|      Qatar|    1|
|     Brazil|    1|
|    England|    1|
|    Albania|    1|
|  Argentina|    1|
|   Scotland|    1|
|      Ghana|    1|
|       Iran|    1|
|    Nigeria|    1|
|       Iraq|    1|
|    Georgia|    1|
|     Kosovo|    1|
|    Somalia|    1|
|     Israel|    1|
|     France|    1|
|     Russia|    1|
|      Sudan|    1|
|    Germany|    1|
|  Australia|    1|
|      Spain|    1|
|       Chad|    1|
|      Japan|    1|
|     Jordan|    1|
|     Gambia|    1|
|    Austria|    1|
|Switzerland|    1|
|      Italy|    1|
|    Jamaica|    1|
|      Nepal|    1|
+-----------+-----+



In [91]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
merge.select(sum('count')).show()

+----------+
|sum(count)|
+----------+
|       397|
+----------+



In [96]:
merge.registerAsTable("merge")



In [101]:
test = sqlContext.sql("select * from merge order by count desc")

In [102]:
test.show()

+-----------+-----+----+
|    country|count|code|
+-----------+-----+----+
|     Norway|   52| NOR|
|    Nigeria|   49| NGA|
|     France|   42| FRA|
|   Slovakia|   30| SVK|
|    England|   25| ENG|
|    Germany|   20| GER|
|      Wales|   19| WAL|
|     Russia|   15| RUS|
|     Brazil|   13| BRA|
|Netherlands|   13| NED|
|     Canada|   11| CAN|
|Switzerland|   10| SUI|
|       Chad|    9| CHA|
|     Guinea|    8| GUI|
|      Spain|    8| ESP|
|   Portugal|    8| POR|
|       Iraq|    6| IRQ|
|     Jordan|    6| JOR|
|      Japan|    5| JPN|
|    Austria|    5| AUT|
+-----------+-----+----+
only showing top 20 rows



In [103]:
test = sqlContext.sql("select * from merge where country in ('Wales', 'Iceland', 'Japan')")




In [104]:
test.show()

+-------+-----+----+
|country|count|code|
+-------+-----+----+
|Iceland|    2| ISL|
|  Wales|   19| WAL|
|  Japan|    5| JPN|
+-------+-----+----+



In [94]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
tweets_countsDF.desc('count').show()

AttributeError: 'DataFrame' object has no attribute 'desc'

In [None]:
# Table 2: counts for Wales, Iceland, and Japan.
