# Tweets During the 2020 US Elections
### Data Engineering Capstone Project

#### Project Summary
In this project, I analyse tweets posted during the 2020 US elections that carried Trump or Biden hashtags.


#### Project Scope
I try to determine when the tweets were posted, and use the user details to find out which states produce more tweets for each presidential candidate. I also bring in the population of each state to compare the number of tweets with the number of residents of each state.
The datasets (Twitter data collected by hashtag) come from Kaggle, and the population dataset comes from data.world.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [32]:
# Import libraries
import pandas as pd
import time
from datetime import datetime
from pyspark.sql import SparkSession  # required by the Spark session cell below
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType, FloatType
from pyspark.sql.functions import lit, udf, col, to_timestamp, to_date, monotonically_increasing_id

In [10]:
# Creating (or reusing) the Spark session with Hive support
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
    .enableHiveSupport()
    .getOrCreate()
)

## Exploring and Assessing the Data
I use Spark as the big data framework, along with its libraries, for data extraction and transformation.
Exploring and assessing the data involves several steps: fetching the data correctly, assigning the proper data type to each column, dropping duplicates, removing null values, and grouping the data to build the data model.
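To make the assessment step concrete, here is a minimal plain-Python sketch (not the Spark code used in this notebook) that profiles a few records: it counts missing values per column and exact duplicate rows. In Spark the corresponding clean-up calls are `na.drop()` and `dropDuplicates()`, which appear later in this notebook. The `profile_rows` helper and the sample records are hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def profile_rows(rows: List[Dict[str, Any]], columns: List[str]) -> Tuple[Dict[str, int], int]:
    """Return (missing-value count per column, number of exact duplicate rows)."""
    # A value counts as missing if it is None or an empty string.
    nulls = {c: sum(1 for r in rows if r.get(c) in (None, "")) for c in columns}
    # Rows with identical values in every listed column are exact duplicates;
    # each group of n identical rows contributes n - 1 duplicates.
    groups = Counter(tuple(r.get(c) for c in columns) for r in rows)
    duplicates = sum(n - 1 for n in groups.values())
    return nulls, duplicates


# Hypothetical records shaped like a few of the tweet columns.
sample = [
    {"tweet_id": 1.0, "user_id": 10.0, "created_at": "2020-10-15 00:00:01"},
    {"tweet_id": 1.0, "user_id": 10.0, "created_at": "2020-10-15 00:00:01"},
    {"tweet_id": 2.0, "user_id": None, "created_at": "2020-10-15 00:00:02"},
]
nulls, dups = profile_rows(sample, ["tweet_id", "user_id", "created_at"])
print(nulls)  # {'tweet_id': 0, 'user_id': 1, 'created_at': 0}
print(dups)   # 1
```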


## Extracting the Data from the Sources into Spark DataFrames for Loading and Transformation
Steps followed:
1. Created a schema to use when extracting the data from the datasets
2. Loaded the dataset with Trump hashtags
3. Loaded the dataset with Biden hashtags
4. Cleaned the datasets and combined them into a single dataset with all the tweets
5. Loaded the population dataset

In [15]:
# Explicit schema shared by both hashtag CSV files
my_schema = StructType([
    StructField("created_at", StringType(), True),
    StructField("tweet_id", DoubleType(), False),
    StructField("tweet", StringType(), True),
    StructField("likes", FloatType(), True),
    StructField("retweet_count", FloatType(), True),
    StructField("source", StringType(), True),
    StructField("user_id", DoubleType(), True),
    StructField("user_name", StringType(), True),
    StructField("user_screen_name", StringType(), True),
    StructField("user_description", StringType(), True),
    StructField("user_join_date", StringType(), True),
    StructField("user_followers_count", DoubleType(), True),
    StructField("user_location", StringType(), True),
    StructField("lat", FloatType(), True),
    StructField("long", FloatType(), True),
    StructField("city", StringType(), True),
    StructField("country", StringType(), True),
    StructField("continent", StringType(), True),
    StructField("state", StringType(), True),
    StructField("state_code", StringType(), True),
])
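The schema reads `tweet_id` and `user_id` as `DoubleType`. A double has a 53-bit significand, so above 2**53 (about 9.0e15) not every integer is representable, and 64-bit Twitter IDs from 2020 are around 1.3e18. The plain-Python check below illustrates the rounding; the ID is a made-up value of the right magnitude, not one taken from the dataset. If the CSV stores the IDs as plain integers, `LongType` (already imported) would keep them exact; if the file already holds them in floating-point notation, the precision was lost upstream and changing the column type cannot recover it.

```python
# Hypothetical 64-bit ID with the same magnitude as 2020 tweet IDs (~1.3e18).
tweet_id = 1316529221557252097

# Doubles have a 53-bit significand: integers up to 2**53 are exact,
# but not every integer above that is.
assert float(2**53) == float(2**53 + 1)

as_double = float(tweet_id)      # what a DoubleType column would hold
round_trip = int(as_double)      # convert back to an integer
print(round_trip == tweet_id)    # False: the ID was rounded
print(abs(round_trip - tweet_id))  # rounding error, at most 128 at this magnitude
```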

In [16]:
#Extracting trump data
file_path_trump= 'Twitter_Data/hashtag_donaldtrump.csv'
df_trump= spark.read.format("csv").option("header", True).option('delimiter', ",").schema(my_schema).option('multiLine', True).load(file_path_trump)

In [17]:
#Extracting biden data
file_path_biden= 'Twitter_Data/hashtag_joebiden.csv'
df_biden= spark.read.format("csv").option("header", True).option('delimiter', ",").schema(my_schema).option('multiLine', True).load(file_path_biden)

In [18]:
#Adding a hashtag column for both the datasets
df_trump= df_trump.withColumn("hashtag_type",lit("Trump"))
df_trump.printSchema()
df_biden= df_biden.withColumn("hashtag_type",lit("Biden"))
df_biden.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- tweet_id: double (nullable = true)
 |-- tweet: string (nullable = true)
 |-- likes: float (nullable = true)
 |-- retweet_count: float (nullable = true)
 |-- source: string (nullable = true)
 |-- user_id: double (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_join_date: string (nullable = true)
 |-- user_followers_count: double (nullable = true)
 |-- user_location: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- long: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- hashtag_type: string (nullable = false)


In [19]:
#combining both the dataframes
df_tweets= df_biden.union(df_trump)

In [20]:
# Parsing the created_at string into a proper timestamp column
df_tweets= df_tweets.withColumn("tweet_staging_time", to_timestamp(col('created_at'), "yyyy-MM-dd HH:mm:ss"))

In [21]:
# Dropping rows with a null tweet_id, user_id or tweet timestamp
df_tweets= df_tweets.na.drop(subset=['tweet_id','user_id','tweet_staging_time'])
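The two cells above parse `created_at` into a timestamp and then drop incomplete rows. A plain-Python sketch of the same logic follows, assuming `created_at` strings follow the `yyyy-MM-dd HH:mm:ss` pattern used in the notebook. Unparseable strings become `None`, much as Spark's `to_timestamp` returns null for them when ANSI mode is off. The `parse_ts` and `clean` helpers and the sample rows are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional

# Python strptime equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss".
TS_FORMAT = "%Y-%m-%d %H:%M:%S"
REQUIRED = ("tweet_id", "user_id", "tweet_staging_time")


def parse_ts(value: Optional[str]) -> Optional[datetime]:
    """Parse a timestamp string; return None if it is missing or malformed."""
    if not value:
        return None
    try:
        return datetime.strptime(value, TS_FORMAT)
    except ValueError:
        return None


def clean(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Add the parsed timestamp, then keep only rows with every REQUIRED field set."""
    cleaned = []
    for row in rows:
        staged = {**row, "tweet_staging_time": parse_ts(row.get("created_at"))}
        if all(staged.get(k) is not None for k in REQUIRED):
            cleaned.append(staged)
    return cleaned


rows = [
    {"tweet_id": 1.0, "user_id": 10.0, "created_at": "2020-10-15 00:00:01"},
    {"tweet_id": 2.0, "user_id": None, "created_at": "2020-10-15 00:00:02"},
    {"tweet_id": 3.0, "user_id": 30.0, "created_at": "not a date"},
]
print([r["tweet_id"] for r in clean(rows)])  # [1.0]
```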

In [22]:
df_tweets.printSchema()
df_tweets.count()

root
 |-- created_at: string (nullable = true)
 |-- tweet_id: double (nullable = true)
 |-- tweet: string (nullable = true)
 |-- likes: float (nullable = true)
 |-- retweet_count: float (nullable = true)
 |-- source: string (nullable = true)
 |-- user_id: double (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_join_date: string (nullable = true)
 |-- user_followers_count: double (nullable = true)
 |-- user_location: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- long: float (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- state: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- hashtag_type: string (nullable = false)
 |-- tweet_staging_time: timestamp (nullable = true)



1682732

In [23]:
# Reading the JSON file with US state population data, to be used later
spark_pop= spark.read.json('us_population.json')

# Defining the Data Model

I define a data model with five dimension tables and one fact table:

* Dimension table 1, user_details: user information such as user_id, user_name, user_description, user_screen_name and user_followers_count.
* Dimension table 2, twitter_details: tweet information such as tweet_id, likes, retweet_count, the tweet text and the tweet source.
* Dimension table 3, time_details: the time of each tweet, with tweet_time, hour, day, week, month and day of the week.
* Dimension table 4, location_details: the location of each tweet, with location, latitude, longitude, state and country.
* Dimension table 5, population_details: the population of each US state, with state name and population.
* Fact table, tweet_details: user_id, tweet_id, tweet, hashtag, tweet_time, tweet_location and the week of the tweet.


In [24]:
#Creation of user table
df_users= df_tweets.select(["user_id","user_name","user_screen_name","user_description","user_followers_count"]).dropDuplicates()

In [25]:
df_users.printSchema()

root
 |-- user_id: double (nullable = true)
 |-- user_name: string (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- user_description: string (nullable = true)
 |-- user_followers_count: double (nullable = true)



In [26]:
#Creation of twitter details table
df_twitter= df_tweets.select(["tweet_id","tweet","likes","retweet_count","source"]).dropDuplicates()
df_twitter.printSchema()

root
 |-- tweet_id: double (nullable = true)
 |-- tweet: string (nullable = true)
 |-- likes: float (nullable = true)
 |-- retweet_count: float (nullable = true)
 |-- source: string (nullable = true)



In [27]:
#Creation of time table
df_time= df_tweets.select(["created_at"])
df_time.printSchema()

root
 |-- created_at: string (nullable = true)



In [28]:
#Creating a timestamp and making a final time table
df_time= df_time.withColumn("tweet_time", to_timestamp(col('created_at'), "yyyy-MM-dd HH:mm:ss"))
df_time= df_time.withColumn("Date",to_date(col('tweet_time')))

In [30]:
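# hour() must read the full timestamp: the Date column is truncated to
# midnight, so hour(Date) would always return 0
df_time= df_time.selectExpr(['tweet_time as tweet_time',
                            'hour(tweet_time) as hour',
                            'dayofmonth(Date) as day',
                            'weekofyear(Date) as week',
                            'month(Date) as month',
                            'dayofweek(Date) as weekday',
                            ]).dropDuplicates()  # one row per distinct tweet_time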
df_time.printSchema()

root
 |-- tweet_time: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- weekday: integer (nullable = true)
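For reference, the time attributes above can be reproduced in plain Python for a single timestamp. This sketch assumes Spark's documented semantics: `weekofyear` returns the ISO-8601 week number, and `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday). The hour is read from the full timestamp rather than from a date truncated to midnight. The `time_row` helper is hypothetical.

```python
from datetime import datetime


def time_row(ts: datetime) -> dict:
    """Derive the time-dimension attributes for one timestamp."""
    return {
        "tweet_time": ts,
        "hour": ts.hour,
        "day": ts.day,
        "week": ts.isocalendar()[1],  # ISO-8601 week number, like weekofyear
        "month": ts.month,
        # isoweekday() is Monday=1 .. Sunday=7; shift to Sunday=1 .. Saturday=7
        "weekday": ts.isoweekday() % 7 + 1,
    }


row = time_row(datetime(2020, 11, 3, 13, 45, 0))  # Election Day 2020, a Tuesday
print(row["hour"], row["day"], row["week"], row["month"], row["weekday"])  # 13 3 45 11 3
```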



In [31]:
#Creating Location table 
df_location= df_tweets.select(['user_location','lat','long','country','state'])
df_location= df_location.na.drop(subset=['user_location','state'])
df_location.printSchema()

root
 |-- user_location: string (nullable = true)
 |-- lat: float (nullable = true)
 |-- long: float (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)



# Data Pipeline and Fact Table Creation
After defining the data model above, I used the staged data in the Spark DataFrames and loaded it into the corresponding tables for further analysis.

The fact table is created from the extracted data according to the model above.
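In Spark, building the fact table amounts to a `select` over `df_tweets`, with `weekofyear` for the week. The plain-Python sketch below only illustrates the column mapping, under hypothetical choices: `hashtag` comes from `hashtag_type`, `tweet_location` from `user_location`, and `week` is the ISO week of `tweet_staging_time`. Because the dataset was built by unioning two hashtag files, a tweet that appears in both files yields one fact row per hashtag, as the sample shows. The `build_fact_rows` helper and the sample records are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List


def build_fact_rows(staged: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Project staged tweet records onto the fact-table columns.

    Hypothetical mapping: hashtag <- hashtag_type, tweet_location <- user_location,
    week <- ISO week of tweet_staging_time.
    """
    facts = []
    for r in staged:
        ts = r["tweet_staging_time"]
        facts.append({
            "user_id": r["user_id"],
            "tweet_id": r["tweet_id"],
            "tweet": r["tweet"],
            "hashtag": r["hashtag_type"],
            "tweet_time": ts,
            "tweet_location": r.get("user_location"),
            "week": ts.isocalendar()[1],
        })
    return facts


# The same hypothetical tweet, once from each hashtag file.
staged = [
    {"user_id": 10.0, "tweet_id": 1.0, "tweet": "example text", "hashtag_type": "Biden",
     "user_location": "Texas", "tweet_staging_time": datetime(2020, 11, 3, 13, 45)},
    {"user_id": 10.0, "tweet_id": 1.0, "tweet": "example text", "hashtag_type": "Trump",
     "user_location": "Texas", "tweet_staging_time": datetime(2020, 11, 3, 13, 45)},
]
facts = build_fact_rows(staged)
print(len(facts), facts[0]["week"])  # 2 45
```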


In [5]:
spark_pop.printSchema()
spark_pop= spark_pop.withColumn("population",spark_pop['population'].cast('double'))
spark_pop.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- nyt_population: string (nullable = true)
 |-- population: string (nullable = true)
 |-- region: string (nullable = true)
 |-- subregion: string (nullable = true)
 |-- us_county_fips: string (nullable = true)
 |-- us_state_fips: string (nullable = true)

root
 |-- _corrupt_record: string (nullable = true)
 |-- nyt_population: string (nullable = true)
 |-- population: double (nullable = true)
 |-- region: string (nullable = true)
 |-- subregion: string (nullable = true)
 |-- us_county_fips: string (nullable = true)
 |-- us_state_fips: string (nullable = true)



Grouping by region (the state name) to calculate the population of each state, and creating a new Spark DataFrame that contains only the required columns:

In [8]:
population_df= spark_pop.groupBy('region').sum('population')
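The aggregation above sums the per-county figures into state totals. A plain-Python sketch of the same idea follows, assuming (from the schema fields `region`, `subregion` and `us_county_fips`) that each JSON record describes one county and that `region` holds the state name. Rows without a region, such as records that only populate `_corrupt_record`, are skipped, matching the later `na.drop(subset=["region"])`; missing populations are ignored, as Spark's `sum` ignores nulls. The `state_totals` helper and the county rows are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List


def state_totals(records: List[Dict[str, Any]]) -> Dict[str, float]:
    """Sum county populations per region, skipping rows without a region or population."""
    totals: Dict[str, float] = defaultdict(float)
    for rec in records:
        region, population = rec.get("region"), rec.get("population")
        if region is None or population is None:
            continue  # e.g. rows that only carry _corrupt_record
        totals[region] += float(population)  # population is read as a string, like the raw JSON
    return dict(totals)


counties = [
    {"region": "Alaska", "subregion": "County A", "population": "100"},
    {"region": "Alaska", "subregion": "County B", "population": "250"},
    {"region": None, "_corrupt_record": "["},
]
print(state_totals(counties))  # {'Alaska': 350.0}
```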

In [9]:
population_df= population_df.withColumnRenamed("sum(population)", "Population").na.drop(subset=["region"])
population_df.printSchema()
population_df.orderBy('region').limit(10).toPandas()

root
 |-- region: string (nullable = true)
 |-- Population: double (nullable = true)



| | region | Population |
|---|---|---|
| 0 | Alabama | 4864680.0 |
| 1 | Alaska | 738516.0 |
| 2 | Arizona | 6946685.0 |
| 3 | Arkansas | 2990671.0 |
| 4 | California | 39148760.0 |
| 5 | Colorado | 5531141.0 |
| 6 | Connecticut | 3581504.0 |
| 7 | Delaware | 949495.0 |
| 8 | District of Columbia | 684498.0 |
| 9 | Florida | 20598139.0 |
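With state populations available, the ratio mentioned in the project scope can be expressed as tweets per 100,000 residents. The plain-Python sketch below assumes that the tweets' `state` values match the population table's `region` names exactly (for example "District of Columbia"); it can be run once per hashtag to compare the two candidates. The `tweets_per_100k` helper and the toy numbers are hypothetical and are not results from this dataset.

```python
from typing import Dict


def tweets_per_100k(tweet_counts: Dict[str, int], population: Dict[str, float]) -> Dict[str, float]:
    """Tweets per 100,000 residents for each state present in both inputs."""
    return {
        state: round(count / population[state] * 100_000, 2)
        for state, count in tweet_counts.items()
        if population.get(state)  # skip states with a missing or zero population
    }


# Toy numbers for illustration only.
counts = {"State A": 50, "State B": 30}
pop = {"State A": 200_000.0, "State B": 600_000.0}
print(tweets_per_100k(counts, pop))  # {'State A': 25.0, 'State B': 5.0}
```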


# Reading Tweets Containing Trump as the Hashtag
This step also removes None values, drops duplicates, creates a proper schema for importing the data, and stores the result in the Trump DataFrame.