# Assignment 2

#### First Name: Madison 
#### Last Name: Chester


## 1. Load Data from JSON

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .getOrCreate()

In [0]:
df_twitter = spark.read.json("/FileStore/tables/corona_tweet_new.json")

In [0]:
df_twitter.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- id: string (nullable = true)
 |-- in_reply_to_status_id: string (nullable = true)
 |-- in_reply_to_user_id_str: string (nullable = true)
 |-- location: string (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- source: string (nullable = true)
 |-- user: struct (nullable = true)
 |    |-- contributors_enabled: boolean (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- default_profile: boolean (nullable = true)
 |    |-- default_profile_image: boolean (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- favourites_count: long (nullable = true)
 |    |-- follow_request_sent: string (nullable = true)
 |    |-- followers_count: long (nullable = true)
 |    |-- following: string (nullable = true)
 |    |-- frie

In [0]:
### from the user nestec col, select the following cols only: id_str, followers_count, friends_count and created_at (2 points)
from pyspark.sql.functions import col 

df_twitter = df_twitter.select(
  col('source'),
  col('hashtags'),
  col('retweet_count'),
  col('id'),
  col('location'),
  col('user.id_str').alias('user_id_str'),  # select 'id_str' and change the column name
  col('user.followers_count').alias('user_followers_count'),  # select 'followers_count' and change the column name
  col('user.friends_count').alias('user_friends_count'),  # select 'friends_count' and change the column name
  col('user.created_at').alias('user_created_at')  # select 'created_at' and change the column name
)

In [0]:
### print the total count of number of records in df_twitter (1 point)
df_twitter.count()

Out[5]: 15894

In [0]:
### extract the source lable from source col by droping the anchor tab, and save it as another col named extracted_source (4 points)
# for example <a href="https://mobile.twitter.com" rel="nofollow">Twitter Web App</a> => Twitter Web App
# you can use "<a [^>]+>([^<]+)" as the regualr expresion, and the group would be 1 for this regular expression
from pyspark.sql.functions import regexp_extract, col

# apply regexp_extract to extract the label from the 'source' column
df_twitter = df_twitter.withColumn('extracted_source', regexp_extract(col('source'), '<a[^>]*>(.*?)</a>', 1))
df_twitter.select(col('extracted_source'), col('source')).show()

+-------------------+--------------------+
|   extracted_source|              source|
+-------------------+--------------------+
|    Twitter Web App|<a href="https://...|
|Twitter for Android|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
|    Twitter Web App|<a href="https://...|
| Twitter Web Client|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
| Twitter for iPhone|<a href="http://t...|
| Twitter for iPhone|<a href="http://t...|
|    Twitter Web App|<a href="https://...|
|Twitter for Android|<a href="http://t...|
|    Twitter Web App|<a href="https://...|
| Twitter for iPhone|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
| Twitter for iPhone|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
| Twitter for iPhone|<a href="http://t...|
|Twitter for Android|<a href="http://t...|
+----------

In [0]:
# convert the DataFrame into RDD
rdd_twitter = df_twitter.rdd.map(tuple)

In [0]:
### create a temporary table in memory with the name as twitter (1 point)
df_twitter.createOrReplaceTempView('twitter')

## 2. Analyze Data

#### You will be writing code to find the answer to the questions listed below using RDD/spark SQL.

- Analyze using RDD 
- Analyze using Dataframe without temporary table 
- Analyze using spark.sql with temporary table


#### 2.1 Get total number of unique users (1 point each)

In [0]:
# using RDD
user_id_index = df_twitter.columns.index('user_id_str')
user_id_rdd = rdd_twitter.map(lambda row: row[user_id_index])
unique_users = user_id_rdd.distinct().count()
print(unique_users)

14094


In [0]:
# using DataFrame
df_twitter.select(col('user_id_str')).distinct().count()

Out[10]: 14094

In [0]:
# using spark.sql and the temporary table
spark.sql('select distinct user_id_str from twitter').count()

Out[11]: 14094

#### 2.2 Get count of users who have more than 1 tweet in the data (2 points each)

In [0]:
# using RDD
user_id_index = df_twitter.columns.index('user_id_str')
user_id_rdd = rdd_twitter.map(lambda row: row[user_id_index])
user_count_rdd = user_id_rdd.map(lambda user_id: (user_id, 1)).reduceByKey(lambda x, y: x + y)
user_count_rdd.filter(lambda x: x[1] > 1).count()

Out[12]: 1016

In [0]:
# using DataFrame
df_twitter.groupBy('user_id_str').count().filter(col('count') > 1).count()

Out[13]: 1016

In [0]:
# using spark.sql and the temporary table
query = """
    SELECT user_id_str, COUNT(*) AS count
    FROM twitter
    GROUP BY user_id_str
    HAVING COUNT(*) > 1
"""
spark.sql(query).count()

Out[14]: 1016

#### 2.3 Get total number unique extracted_source (1 point each)

In [0]:
# using RDD
extracted_source_index = df_twitter.columns.index('extracted_source')
extracted_source_rdd = rdd_twitter.map(lambda row: row[extracted_source_index])
extracted_source_rdd.distinct().count()

Out[15]: 133

In [0]:
# using DataFrame
df_twitter.select(col('extracted_source')).distinct().count()

Out[16]: 133

In [0]:
# using spark.sql and the temporary table
query = f"select distinct {'extracted_source'} from twitter"
spark.sql(query).count()

Out[17]: 133

#### 2.4 Get top 5 most used extracted_source

In [0]:
# using RDD (5 points)
extracted_source_index = df_twitter.columns.index('extracted_source')
extracted_source_rdd = rdd_twitter.map(lambda row: row[extracted_source_index])
extracted_source_count_rdd = extracted_source_rdd.map(lambda extracted_source: (extracted_source, 1)).reduceByKey(lambda x, y: x + y)
extracted_source_count_rdd \
    .sortBy(lambda x: x[1], ascending=False) \
    .take(5)


Out[18]: [('Twitter for Android', 6262),
 ('Twitter for iPhone', 5698),
 ('Twitter Web App', 2878),
 ('Twitter for iPad', 428),
 ('Twitter Web Client', 136)]

In [0]:
# using DataFrame (2 points)
df_twitter.groupBy('extracted_source').count().orderBy(col('count').desc()).show(5)

+-------------------+-----+
|   extracted_source|count|
+-------------------+-----+
|Twitter for Android| 6262|
| Twitter for iPhone| 5698|
|    Twitter Web App| 2878|
|   Twitter for iPad|  428|
| Twitter Web Client|  136|
+-------------------+-----+
only showing top 5 rows



In [0]:
# using spark.sql and the temporary table (2 points)
query = """
    SELECT extracted_source, COUNT(*) AS count
    FROM twitter
    GROUP BY extracted_source
    ORDER BY count DESC
    limit 5
"""
spark.sql(query).show()

+-------------------+-----+
|   extracted_source|count|
+-------------------+-----+
|Twitter for Android| 6262|
| Twitter for iPhone| 5698|
|    Twitter Web App| 2878|
|   Twitter for iPad|  428|
| Twitter Web Client|  136|
+-------------------+-----+



#### 2.5 Get count of distinct hastags used (5 points each) 

In [0]:
# using RDD
# need to flatmap the result since hastags is a nested column
hashtags_index = df_twitter.columns.index('hashtags')
hashtags_rdd = rdd_twitter.flatMap(lambda row: row[hashtags_index])
hashtags_rdd.filter(lambda hashtags: hashtags is not None and len(hashtags) > 0).distinct().count()

Out[21]: 1215

In [0]:
# using DataFrame
from pyspark.sql.functions import explode

hashtags = df_twitter.select(explode(col('hashtags')).alias('hashtag'))
hashtags.filter(col('hashtag') != "").select(col('hashtag')).distinct().count()

Out[22]: 1215

In [0]:
# using spark.sql and the temporary table

spark.sql("""
    CREATE OR REPLACE TEMP VIEW hashtags_explode AS
    SELECT explode(hashtags) AS hashtags
    FROM twitter
""")

hashtags_unique_count = spark.sql("""
    SELECT COUNT(DISTINCT hashtags) AS hashtags_unique_count
    FROM hashtags_explode
    WHERE hashtags != ''
""").collect()[0][0]

print(hashtags_unique_count)

1215


#### 2.6 Get top 5 hashtags

In [0]:
# using RDD (4 points)
hashtags_rdd.map(lambda hashtags: (hashtags, 1)).reduceByKey(lambda x, y: x + y).sortBy(lambda x: x[1], ascending=False).take(5)

Out[24]: [('طبق_القدرات_للثانويه_ياريس', 385),
 ('Corona', 319),
 ('OilPrice', 251),
 ('COVID19', 125),
 ('corona', 123)]

In [0]:
# using DataFrame (2 points)
hashtags.groupBy('hashtag').count().orderBy(col('count').desc()).show(5)

+--------------------+-----+
|             hashtag|count|
+--------------------+-----+
|طبق_القدرات_للثان...|  385|
|              Corona|  319|
|            OilPrice|  251|
|             COVID19|  125|
|              corona|  123|
+--------------------+-----+
only showing top 5 rows



In [0]:
# using spark.sql and the temporary table (2 points)
top_hashtags = spark.sql("""
    SELECT hashtags, COUNT(*) AS count
    FROM hashtags_explode
    WHERE hashtags != ''
    GROUP BY hashtags
    ORDER BY count DESC
    LIMIT 5
""").show()

+--------------------+-----+
|            hashtags|count|
+--------------------+-----+
|طبق_القدرات_للثان...|  385|
|              Corona|  319|
|            OilPrice|  251|
|             COVID19|  125|
|              corona|  123|
+--------------------+-----+



#### 2.7 Get total number of tweets which are retweeted more than 100 times

In [0]:
# using RDD
retweet_count_index = df_twitter.columns.index('retweet_count')
rdd_twitter.filter(lambda row: row[retweet_count_index] > 100).count()

Out[27]: 15753

In [0]:
# using DataFrame
df_twitter.filter(col('retweet_count') > 100).count()

Out[28]: 15753

In [0]:
# using spark.sql and the temporary table
query = """
  SELECT * 
  FROM twitter
  WHERE retweet_count > 100
"""
spark.sql(query).count()

Out[29]: 15753

#### 2.8 Get top 3 most retweeted tweets per country (8 points each)

In [0]:
# using RDD
location_index = df_twitter.columns.index('location')
id_index = df_twitter.columns.index('id')
retweet_count_index = df_twitter.columns.index('retweet_count')
location_rdd = rdd_twitter.map(lambda row: (row[location_index], (row[id_index], row[retweet_count_index]))).groupByKey()
location_sorted_descending = location_rdd.mapValues(lambda tweet: sorted(tweet, key=lambda x: x[1], reverse=True)[:3])
location_sorted_descending.take(5)

Out[30]: [('India',
  [('1252332114948874240', 9988),
   ('1252252336921206787', 9976),
   ('1252254519116746754', 9973)]),
 ('Pakistan',
  [('1252334264248606720', 9988),
   ('1252251912084357121', 9975),
   ('1252252126694309888', 9973)]),
 ('USA',
  [('1252331777806524416', 9994),
   ('1252254239805579264', 9987),
   ('1252335464750735362', 9982)]),
 ('Italy',
  [('1252252106750377996', 9994),
   ('1252251206027816960', 9984),
   ('1252330500670337024', 9971)]),
 ('Canada',
  [('1252335430323888128', 9997),
   ('1252254877939531776', 9992),
   ('1252252082825986051', 9987)])]

In [0]:
# using DataFrame
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

# window to partition and order by retweet_count in each location
window = Window.partitionBy('location').orderBy(desc('retweet_count'))
# row number column to rank tweets in each location by retweet_count
ranked = df_twitter.withColumn('rank', row_number().over(window))
# filter for top 3 most retweeted tweets per country
top_tweets = ranked.filter(col('rank') <= 3)
# show top 3 most retweeted tweets per country
top_tweets.select(col('location'), col('id'), col('retweet_count')).show(truncate=False)

+--------+-------------------+-------------+
|location|id                 |retweet_count|
+--------+-------------------+-------------+
|Canada  |1252335430323888128|9997         |
|Canada  |1252254877939531776|9992         |
|Canada  |1252252082825986051|9987         |
|Chile   |1252253612140490759|9988         |
|Chile   |1252334891951427585|9984         |
|Chile   |1252253710182481920|9978         |
|China   |1252335780707684352|9998         |
|China   |1252253596516843520|9993         |
|China   |1252255562525560832|9984         |
|Germany |1252334028092399622|9999         |
|Germany |1252330902325248000|9997         |
|Germany |1252252295510855682|9990         |
|India   |1252332114948874240|9988         |
|India   |1252252336921206787|9976         |
|India   |1252254519116746754|9973         |
|Italy   |1252252106750377996|9994         |
|Italy   |1252251206027816960|9984         |
|Italy   |1252330500670337024|9971         |
|Mexico  |1252253843145912320|9998         |
|Mexico  |

In [0]:
# using spark.sql and the temporary table
query = """
    SELECT *
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY location ORDER BY retweet_count DESC) AS rank
        FROM twitter
    ) ranked
    WHERE rank <= 3
"""
spark.sql(query).select(col('location'), col('id'), col('retweet_count')).show()

+--------+-------------------+-------------+
|location|                 id|retweet_count|
+--------+-------------------+-------------+
|  Canada|1252335430323888128|         9997|
|  Canada|1252254877939531776|         9992|
|  Canada|1252252082825986051|         9987|
|   Chile|1252253612140490759|         9988|
|   Chile|1252334891951427585|         9984|
|   Chile|1252253710182481920|         9978|
|   China|1252335780707684352|         9998|
|   China|1252253596516843520|         9993|
|   China|1252255562525560832|         9984|
| Germany|1252334028092399622|         9999|
| Germany|1252330902325248000|         9997|
| Germany|1252252295510855682|         9990|
|   India|1252332114948874240|         9988|
|   India|1252252336921206787|         9976|
|   India|1252254519116746754|         9973|
|   Italy|1252252106750377996|         9994|
|   Italy|1252251206027816960|         9984|
|   Italy|1252330500670337024|         9971|
|  Mexico|1252253843145912320|         9998|
|  Mexico|

#### 2.9 Total number of tweets per country

In [0]:
# using RDD (3 points)
rdd_twitter.map(lambda row: row[location_index]).map(lambda location: (location, 1)).reduceByKey(lambda x, y: x + y).collect()

Out[33]: [('India', 1480),
 ('Pakistan', 1470),
 ('USA', 1539),
 ('Italy', 1422),
 ('Canada', 1441),
 ('China', 1457),
 ('Chile', 1410),
 ('UK', 1376),
 ('Mexico', 1409),
 ('Spain', 1464),
 ('Germany', 1426)]

In [0]:
# using DataFrame (2 points)
df_twitter.groupby(col('location')).count().show()

+--------+-----+
|location|count|
+--------+-----+
| Germany| 1426|
|   India| 1480|
|   China| 1457|
|   Chile| 1410|
|   Italy| 1422|
|   Spain| 1464|
|     USA| 1539|
|  Mexico| 1409|
|      UK| 1376|
|  Canada| 1441|
|Pakistan| 1470|
+--------+-----+



In [0]:
# using spark.sql and the temporary table (1 point)
query = """
    SELECT location AS location, COUNT(*) AS count
    FROM twitter
    WHERE location IS NOT NULL
    GROUP BY location
    ORDER BY count DESC
"""
spark.sql(query).show()

+--------+-----+
|location|count|
+--------+-----+
|     USA| 1539|
|   India| 1480|
|Pakistan| 1470|
|   Spain| 1464|
|   China| 1457|
|  Canada| 1441|
| Germany| 1426|
|   Italy| 1422|
|   Chile| 1410|
|  Mexico| 1409|
|      UK| 1376|
+--------+-----+



## 3. Save Data

#### 3.1 Save the data such that you have seperate folder per country (2 points)

In [0]:
# using DataFrame
from pyspark.sql.functions import concat_ws

output_path = "/FileStore/tables/" 
locations = df_twitter.select('location').distinct().collect()

for location in locations:
    country = location['location']
    df_country = df_twitter.filter(col('location') == country)
    # convert hashtags array to a comma-separated string
    df_country = df_country.withColumn('hashtags', concat_ws(',', col('hashtags')))
    output_directory = f"{output_path}/{country.replace(' ', '_')}"  # replace spaces with underscores for folder names
    df_country.write.mode('overwrite').csv(output_directory)

#### 3.2 Save the data as parquet files (1 point)

In [0]:
# using DataFrame
output_directory = "/FileStore/tables/" 
df_twitter.write.partitionBy('location').mode('overwrite').parquet(output_directory)