# Setting up the environment

In [2]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 74kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 44.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=f1551111abb28d077b49ef28994668e643455eaea3c9bf6367c7ebfc25ab1657
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
The 

In [3]:
# Import and create a new SQLContext 
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as f
from pyspark.sql.window import Window

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
conf = SparkConf().set("spark.ui.port", "4050")
sc = SparkContext.getOrCreate(conf = conf)
sqlContext = SQLContext(sc)

In [6]:
spark = SparkSession.builder.getOrCreate()

# Reading files

In [77]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('.../country-list.csv')
country_lines.count()

211

In [80]:
# Read tweets JSON file
tweets = spark.read.json(".../users.json")
tweets.printSchema()
tweets.count()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- coordinates: string (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- source: string (nullable = true)
 |-- tweet_ID: string (nullable = true)
 |-- tweet_followers_count: long (nullable = true)
 |-- tweet_mentioned_count: long (nullable = true)
 |-- tweet_text: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- CreatedAt: struct (nullable = true)
 |    |    |-- $date: string (nullable = true)
 |    |-- FavouritesCount: long (nullable = true)
 |    |-- FollowersCount: long (nullable = true)
 |    |-- FriendsCount: long (nullable = true)
 |    |-- Location: string (nullable = true)
 |    |-- UserId: long (nullable = true)
 |-- user_name: string (nullable = true)



11188

# Data Preprocessing





In [78]:
# Convert each pair of words into a tuple
country_tuples = country_lines.map(lambda x: (x.split(",")[0], x.split(",")[1]))
country_tuples.take(5)

[('Afghanistan', ' AFG'),
 ('Albania', ' ALB'),
 ('Algeria', ' ALG'),
 ('American Samoa', ' ASA'),
 ('Andorra', ' AND')]

In [79]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
countryDF.printSchema()
countryDF.show()

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)

+-------------------+----+
|            country|code|
+-------------------+----+
|        Afghanistan| AFG|
|            Albania| ALB|
|            Algeria| ALG|
|     American Samoa| ASA|
|            Andorra| AND|
|             Angola| ANG|
|           Anguilla| AIA|
|Antigua and Barbuda| ATG|
|          Argentina| ARG|
|            Armenia| ARM|
|              Aruba| ARU|
|          Australia| AUS|
|            Austria| AUT|
|         Azerbaijan| AZE|
|            Bahamas| BAH|
|            Bahrain| BHR|
|         Bangladesh| BAN|
|           Barbados| BRB|
|            Belarus| BLR|
|            Belgium| BEL|
+-------------------+----+
only showing top 20 rows



# Cleaning the tweets for special characters

In [141]:
#cleaning tweets by removing special characters
tweets = tweets.withColumn("clean_tweets", f.regexp_replace('tweet_text', ',|-|_|:|#|!', ' '))

In [166]:
# Lines for checking if a specific country exists in the tweets
# tweets.select("tweet_text").filter(tweets.tweet_text.contains("Maldives")).collect()
# tweets.select("clean_tweets").filter(tweets.tweet_text.contains("Maldives")).collect()

# Word Count for tweets

In [142]:
# Perform WordCount on the cleaned tweet texts. (note: this is several lines.)
word_count = tweets.withColumn('country', f.explode(f.split(f.col('clean_tweets'), ' ')))\
    .groupBy('country')\
    .count()\
    .sort('count', ascending=False)

In [143]:
word_count.show()

+---------+-----+
|  country|count|
+---------+-----+
|         |21172|
|    https| 9759|
|       RT| 5927|
|       to| 5842|
|      the| 4247|
|     when| 3906|
|     FIFA| 3508|
|      you| 2759|
| football| 2539|
|     from| 2331|
|        a| 2125|
|       he| 2038|
|    could| 1923|
|     away| 1887|
|      run| 1882|
|Throwback| 1835|
|    tried| 1831|
|     book| 1824|
|      ref| 1821|
|     you.| 1821|
+---------+-----+
only showing top 20 rows



In [144]:
# Join the country and tweet DataFrames (on the appropriate column)
word_count = word_count.withColumn("country",f.lower(f.col("country")))
countryDF = countryDF.withColumn("country",f.lower(f.col("country")))

# Joining Dataframes

In [145]:
final_df = word_count.join(countryDF, on=["country"])

In [146]:
final_df = final_df.groupBy('country').sum()
final_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- sum(count): long (nullable = true)



In [161]:
final_df.sort(asc("country")).collect()
# final_df.count()

[Row(country='albania', sum(count)=2),
 Row(country='argentina', sum(count)=3),
 Row(country='australia', sum(count)=2),
 Row(country='austria', sum(count)=7),
 Row(country='bahamas', sum(count)=1),
 Row(country='belgium', sum(count)=1),
 Row(country='brazil', sum(count)=16),
 Row(country='canada', sum(count)=12),
 Row(country='chad', sum(count)=9),
 Row(country='chile', sum(count)=1),
 Row(country='colombia', sum(count)=2),
 Row(country='denmark', sum(count)=1),
 Row(country='england', sum(count)=35),
 Row(country='finland', sum(count)=1),
 Row(country='france', sum(count)=54),
 Row(country='gambia', sum(count)=1),
 Row(country='georgia', sum(count)=6),
 Row(country='germany', sum(count)=17),
 Row(country='ghana', sum(count)=3),
 Row(country='greece', sum(count)=1),
 Row(country='guinea', sum(count)=8),
 Row(country='hungary', sum(count)=1),
 Row(country='iceland', sum(count)=2),
 Row(country='india', sum(count)=4),
 Row(country='iran', sum(count)=1),
 Row(country='iraq', sum(count)=6

In [148]:
final_df.sort("sum(count)", ascending = False).show()

+-----------+----------+
|    country|sum(count)|
+-----------+----------+
|    nigeria|        66|
|     norway|        54|
|     france|        54|
|    england|        35|
|   slovakia|        30|
|     russia|        24|
|      wales|        19|
|    germany|        17|
|      spain|        16|
|     brazil|        16|
|netherlands|        13|
|     canada|        12|
|   portugal|        10|
|       chad|         9|
|      japan|         8|
|     guinea|         8|
|switzerland|         7|
|     jordan|         7|
|    austria|         7|
|       iraq|         6|
+-----------+----------+
only showing top 20 rows



# Answers to the questions

In [149]:
# Question 1: number of distinct countries mentioned
print(final_df.count(), " distinct are mentioned in tweets")

55  distinct are mentioned in tweets


In [150]:
# Question 2: number of countries mentioned in tweets.
from pyspark.sql.functions import sum
final_df.select(f.sum("sum(count)")).collect()[0][0]

485

In [165]:
# Question 3: How many times was France mentioned? 
final_df.filter(col("country").isin(["france"])).show()

+-------+----------+
|country|sum(count)|
+-------+----------+
| france|        54|
+-------+----------+



In [164]:
# Question 4: Which country has the most mentions: Kenya, Wales, or Netherlands?
final_df.filter(col("country").isin(["wales","netherland","kenya"])).show()

+-------+----------+
|country|sum(count)|
+-------+----------+
|  kenya|         3|
|  wales|        19|
+-------+----------+



In [153]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
final_df.sort("sum(count)", ascending = False).show()

+-----------+----------+
|    country|sum(count)|
+-----------+----------+
|    nigeria|        66|
|     france|        54|
|     norway|        54|
|    england|        35|
|   slovakia|        30|
|     russia|        24|
|      wales|        19|
|    germany|        17|
|      spain|        16|
|     brazil|        16|
|netherlands|        13|
|     canada|        12|
|   portugal|        10|
|       chad|         9|
|      japan|         8|
|     guinea|         8|
|switzerland|         7|
|     jordan|         7|
|    austria|         7|
|       iraq|         6|
+-----------+----------+
only showing top 20 rows



In [163]:
# Table 2: counts for Wales, Iceland, and Japan.
final_df.filter(col("country").isin(["wales","iceland","japan"])).show()

+-------+----------+
|country|sum(count)|
+-------+----------+
|  japan|         8|
|iceland|         2|
|  wales|        19|
+-------+----------+

