                                                                       TEMUULEN Bulgan - 2022427

### CCT College Dublin Continuous Assessment No.2
# AN ANALYSIS OF INDIAN FARMERS' PROTEST TWEETS.

#### Brief Introduction of the project:

1. For my second continuous assessment, I chose CSV-format data of Indian farmers' protest tweets. The file contains over 1 million English-language tweets posted between November 1st, 2020 and November 21st, 2021 with the hashtag #FarmersProtest. It was downloaded from the Kaggle website under the CC0: Public Domain license.
(https://kaggle.com/datasets/prathamsharma123/farmers-protest-tweets-dataset-csv)

2. I divided my project into four primary sections (every step in data processing and analysis is discussed in the subsections of each primary section):
    1. big data storage and processing
    2. sentiment analysis 
    3. EDA and forecast
    4. databases and comparison

3. I used Git for daily code tracking and GitHub for archiving, monitoring and sharing. To view the whole project on GitHub, click on the following link 

#### Libraries and modules used for this project:

In [None]:
import pandas as pd
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import spacy

## 1. Big Data Storage and Processing

For my second continuous assessment project I chose the Kaggle dataset called 'Farmers Protest Tweets'. It was collected using the hashtag #FarmersProtest and is released under the CC0: Public Domain license, which means anyone may copy, modify, distribute and perform the work, even for commercial purposes, all without asking permission. All the tweets are in English and were collected from Twitter.com between November 1st, 2020 and November 21st, 2021. The main subject of these tweets is the large protest against the farm laws that took place at the borders of the Indian national capital, New Delhi, organized by a coalition of over 40 farmer unions from across the country. The dataset was extracted by Pratham Sharma, a Kaggle Datasets Expert, who used the Twitter API and the snscrape Python library for collection. The tweet data consists of two separate CSV files of 1.7 GB and 81.2 MB.

In [None]:
# Checking the file size of the file.
file_size = os.path.getsize('/home/hduser/Desktop/ca/tweets.csv')/(1024*1024*1024)
print(f'The size of the tweets file is: {file_size:.2f} GB.')
file_size = os.path.getsize('/home/hduser/Desktop/ca/users.csv')/(1024*1024)
print(f'The size of the users file is: {file_size:.2f} MB.')

### 1.1. *Preprocessing in Python.*

Before processing the dataset on a distributed file system platform, I decided to examine each CSV file using Pandas in Jupyter Notebook.

#### *tweets.csv*

In [None]:
# Creating a DataFrame.
df_tweets = pd.read_csv('/home/hduser/Desktop/ca/tweets.csv')

# Checking the DataFrame.
df_tweets.head(2)

In [None]:
# Printing information about the DataFrame
df_tweets.info()

#### *users.csv*

In [None]:
# Creating a DataFrame.
df_users = pd.read_csv('/home/hduser/Desktop/ca/users.csv')

# Checking the DataFrame.
df_users.head(2)

In [None]:
# Printing information about the DataFrame
df_users.info()

As we can see from the first two code cells above, the tweets dataset consists of 13 distinct columns, some of which are not very important for further processing. Columns such as 'tweetUrl', 'tweetId', 'source', 'media', 'retweetedTweet', 'quotedTweet', 'mentionedUsers' and 'userId' do not include important information for the analysis ('userId' is only needed as the key for merging).

On the other hand, the last two code cells show that the users dataset consists of 18 distinct columns, of which I can only use the 'displayname' column. So next I'm going to remove the unnecessary columns from each DataFrame and merge them into one.

In [None]:
# Deleting columns from the DataFrame of tweets.
df_tweets = df_tweets.drop(labels=['tweetUrl', 'tweetId', 'source', 'media', 'retweetedTweet', 'quotedTweet', \
                                   'mentionedUsers'], axis=1)

# Checking the changes.
df_tweets.head(2)

In [None]:
# Keeping only the needed columns in the DataFrame of users.
df_users = df_users[['displayname', 'userId']]

# Checking the changes.
df_users.head(2)

In [None]:
# Performing left merging on two DataFrames.
df_final = pd.merge(df_tweets, df_users, on='userId', how='left')

# Deleting the column 'userId'.
df_final = df_final.drop(labels=['userId'], axis=1)

# Checking the changes.
df_final.head(10)
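
To make the effect of the left merge concrete, here is a minimal sketch on two toy DataFrames. The user IDs, names and tweet texts are invented for illustration and are not taken from the dataset; only the column names and the `pd.merge` call mirror the cell above.

```python
import pandas as pd

# Toy data (hypothetical values, not from the real files).
tweets = pd.DataFrame({
    'userId': [1, 2, 3],
    'renderedContent': ['tweet a', 'tweet b', 'tweet c'],
})
users = pd.DataFrame({
    'userId': [1, 1, 2],                      # userId 1 appears twice on purpose
    'displayname': ['Asha', 'Asha', 'Ravi'],
})

# A left merge keeps every tweet, matched or not; the key column is dropped afterwards.
merged = pd.merge(tweets, users, on='userId', how='left').drop(columns=['userId'])

rows_after_merge = len(merged)                           # repeated keys add rows
missing_names = int(merged['displayname'].isna().sum())  # userId 3 has no match
rows_after_dedup = len(merged.drop_duplicates())

print(rows_after_merge, missing_names, rows_after_dedup)
```

In this toy case the tweet whose `userId` has no match keeps a missing `displayname`, and the repeated `userId` on the right-hand side duplicates its tweet. Both effects are worth checking whenever the row count grows after a merge.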

I merged the two DataFrames and removed the unnecessary columns. Now I'm going to save the result as a CSV file on my Ubuntu VM.

In [None]:
# Saving the DataFrame as 'new_tweets.csv' on my VM with the utf-8 Encoding.
df_final.to_csv('/home/hduser/Desktop/ca/new_tweets.csv', encoding='utf-8', index=False)

In [None]:
# Checking the file size.
new_file_size = os.path.getsize('/home/hduser/Desktop/ca/new_tweets.csv')/(1024*1024)
print(f'The size of new tweet file is: {new_file_size:.2f} MB.')

# 'shape' is an attribute, not a method.
print(df_final.shape)

The file size is reduced from 1.7 GB to 670.2 MB, and the file still has the important information about the tweets for further processing.

### 1.2. *Data cleaning in PySpark.*

I decided to store my newly created Twitter dataset file in HDFS. Before doing EDA and sentiment analysis, I'll do more thorough preprocessing and cleaning using Apache Spark tools.

In [None]:
# Starting a new SparkSession for data import from HDFS.
spark = SparkSession.builder \
        .appName('Sentiment Analysis') \
        .getOrCreate()

In [None]:
# Reading the file.
df_spark = spark.read.option('header', 'true') \
                        .option('multiline', 'true') \
                        .option('quote', "\"") \
                        .option('escape', "\"") \
                        .csv('/ca2/new_tweets.csv')

Using 'SparkSession.builder' I created a new SparkSession for interacting with Spark functionality. Then I created a new PySpark DataFrame by importing the 'new_tweets.csv' file which is stored in HDFS's 'ca2' directory. 
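
The `multiline`, `quote` and `escape` options in the reader above are presumably needed because tweet text can contain line breaks, commas and double quotes, which Pandas writes as quoted fields with doubled quotes. The sketch below illustrates the idea with Python's standard `csv` module rather than Spark; the sample row is invented for illustration.

```python
import csv
import io

# One header row and ONE data row whose text field spans two physical lines
# and contains a doubled quote (hypothetical content).
raw = 'date,renderedContent\n2021-01-01,"Line one\nline two, with a ""quoted"" word"\n'

# Splitting on line breaks alone wrongly sees three lines.
naive_lines = raw.strip().split('\n')

# A quote-aware parser keeps the multi-line field together as a single record.
rows = list(csv.reader(io.StringIO(raw)))

print(len(naive_lines), len(rows))
print(repr(rows[1][1]))
```

A reader that ignores quoting would split this record in two and shift its columns, which is the kind of damage the reader options are meant to avoid.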

#### *columns*

The first thing I would like to do is check all the columns and their data types.

In [None]:
# Checking the contents of the DataFrame.
df_spark.show(10)

In [None]:
# Checking the columns of the DataFrame.
print('Columns of the DataFrame are:', ', '.join(df_spark.columns))

In [None]:
# Checking column dtypes.
df_spark.printSchema()

I examined the DataFrame and its columns and found that, because it was imported from a CSV file, all 7 columns contain string values. I'll rename the columns to make them easier to use and change the dtypes of the column values.

In [None]:
# Renaming the columns.
df_spark = df_spark.select(col('date').alias('date'),
                           col('displayname').alias('user'),
                           col('renderedContent').alias('tweet'),
                           col('replyCount').alias('replied'),
                           col('retweetCount').alias('retweeted'),
                           col('likeCount').alias('liked'),
                           col('quoteCount').alias('quoted'))

In [None]:
# Changing dtypes.
df_spark = df_spark.withColumn('date',to_timestamp(col('date').cast(TimestampType())))
df_spark = df_spark.withColumn('tweet',col('tweet').cast(StringType()))
df_spark = df_spark.withColumn('user',col('user').cast(StringType()))
df_spark = df_spark.withColumn('replied',col('replied').cast('integer'))
df_spark = df_spark.withColumn('retweeted',col('retweeted').cast('integer'))
df_spark = df_spark.withColumn('liked',col('liked').cast('integer'))
df_spark = df_spark.withColumn('quoted',col('quoted').cast('integer'))

In [None]:
# Checking changes.
df_spark.show(2)

In [None]:
# Checking changes.
df_spark.printSchema()

Now all the column values have the appropriate data type, and the column names are short and clear.

#### *null values and duplicates*

The second cleaning step is removing the duplicates and null values.

In [None]:
# Examining the shape of the DataFrame
print('The DataFrame consists of', len(df_spark.columns), 'columns and', df_spark.count(), 'rows.')

The DataFrame consists of 2,953,850 rows and 7 columns. As can be seen from the output of the 'show()' method above, there are many duplicates in it, so I would like to remove them.

Moreover, there are some null values in the merged column 'displayname' (renamed to 'user' above). It seems that some users who tweeted do not have a display name. This might indicate that they are no longer users of the platform, that they were blocked for trolling or some other reason, or that the account was deleted. To prevent bias and to avoid wasting memory on extra processing, I have decided to drop the rows that have no display name or that contain duplicate tweet entries.

After these changes I'll order the table contents by date.

In [None]:
# Dropping the duplicates and rows with null values then ordering the rows.
df_spark = df_spark.drop_duplicates()
df_spark = df_spark.filter(col('user').isNotNull())  # 'displayname' was renamed to 'user' above
df_spark = df_spark.orderBy('date')
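
As a small-scale illustration of the same three steps (dropping duplicates, dropping rows without a user name, ordering by date), here is a sketch on a toy Pandas DataFrame. It uses Pandas rather than PySpark so it can be run without a cluster, and all values are invented.

```python
import pandas as pd

# Toy rows: one exact duplicate and one row with a missing user (hypothetical data).
rows = pd.DataFrame({
    'date': pd.to_datetime(['2021-01-03', '2021-01-01', '2021-01-01', '2021-01-02']),
    'user': ['B', 'A', 'A', None],
    'tweet': ['third', 'first', 'first', 'second'],
})

cleaned = (
    rows.drop_duplicates()            # remove exact duplicate rows
        .dropna(subset=['user'])      # remove rows without a display name
        .sort_values('date')          # order by date
        .reset_index(drop=True)
)

print(cleaned)
```

Only two rows survive here: the duplicate of the first tweet and the tweet without a user are gone, and the remainder is in date order.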

In [None]:
# Checking the changes.
df_spark.show(10)

In [None]:
# Checking the changes.
print('The DataFrame consists of', len(df_spark.columns), 'columns and', df_spark.count(), 'rows.')

In [None]:
# Checking null values on entire DataFrame.
df_spark.filter(col('date').isNull() | col('user').isNull() | col('tweet').isNull() | col('replied').isNull() | \
                col('retweeted').isNull() | col('liked').isNull() | col('quoted').isNull()).show()

The cells executed above show that the DataFrame now has 1,066,380 rows and 7 columns. There are no duplicates and no null values, so it is clean. However, since I'll do sentiment analysis for this project, I would also like to clean the entries of the column 'tweet'.

#### *the column 'tweet'*

Firstly, I will remove tags, hashtags, emails and links from the tweets because I would like to focus on the text itself rather than on the additional context. This extra context does not directly contribute to the sentiment of the tweet.

In [None]:
# Removing tags, hashtags, emails, and website links from the values of the column 'tweet'.
df_spark = df_spark.withColumn('tweet', regexp_replace('tweet', r'@\w+|#\S+|\S+@\S+|http\S+|www\S+|\S+/\S+', ''))
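
To see what this pattern removes, the sketch below applies the same regular expression with Python's standard `re` module to a made-up tweet. The sample text, the handle and the e-mail address are invented; only the pattern is copied from the cell above.

```python
import re

# Same alternation as in the PySpark cell: mentions, hashtags, e-mails,
# http/www links, and any token that contains a slash.
PATTERN = re.compile(r'@\w+|#\S+|\S+@\S+|http\S+|www\S+|\S+/\S+')

sample = 'Support farmers @user1 #FarmersProtest https://t.co/abc mail a.b@mail.com now'
cleaned = PATTERN.sub('', sample)

# Splitting on whitespace hides the gaps left behind by the removed tokens.
print(cleaned.split())

# The last alternative also removes ordinary tokens that contain a slash.
side_effect = PATTERN.sub('', 'open 24/7')
print(repr(side_effect))
```

Note that the final `\S+/\S+` alternative is broad: it also deletes tokens such as `24/7`, so some regular text may be lost along with the links.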

In [None]:
# Checking changes.
df_spark.select('tweet').orderBy('tweet', ascending=False).limit(10).show(10, False)

Emails, links, tags and hashtags were successfully removed from the 'tweet' column values. However, from the above cell I can see that the 'tweet' column still needs some cleaning. Next, I will remove the '&amp' character reference for an ampersand, punctuation marks and non-English text from the string values of the column. I'll also replace new lines and tabs with ' ', replace two or more consecutive whitespaces with a single whitespace, and strip leading and trailing whitespace. Lastly, I'll lowercase the entire string for each column value.

In [None]:
# Extra cleaning for the 'tweet' column values (raw strings avoid invalid escape warnings).
# Replacing the '&amp' reference, punctuation marks, new lines and tabs with spaces.
df_spark = df_spark.withColumn('tweet', regexp_replace('tweet', '&amp', ' '))
df_spark = df_spark.withColumn('tweet', regexp_replace('tweet', r'[|,.;:?!_+-]', ' '))
df_spark = df_spark.withColumn('tweet', regexp_replace('tweet', r'\n|\t', ' '))
# Removing every character outside the allowed English/ASCII set.
df_spark = df_spark.withColumn('tweet', regexp_replace('tweet', r"[^a-zA-Z0-9!@#$%^&*()_+\-={}\[\]|;:'\",.<>/?~` ]", ''))
# Collapsing and trimming whitespace last, so the spaces added above are cleaned up too.
df_spark = df_spark.withColumn('tweet', regexp_replace('tweet', r'\s{2,}', ' '))
df_spark = df_spark.withColumn('tweet', trim(df_spark.tweet))
df_spark = df_spark.withColumn('tweet', lower(df_spark.tweet))
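
The same normalisation can be sketched in plain Python to check the effect on a single string. This is an approximation, not the notebook's exact pipeline: the helper name `normalise` is hypothetical, and non-English text is dropped with a simple non-ASCII filter instead of the longer whitelist used above.

```python
import re

def normalise(text):
    """Approximate, single-string version of the tweet clean-up steps."""
    text = text.replace('&amp', ' ')                # '&amp' character reference
    text = re.sub(r'[|,.;:?!_+-]', ' ', text)       # punctuation marks
    text = re.sub(r'[^\x00-\x7F]', '', text)        # non-ASCII (simplified filter)
    text = re.sub(r'\s+', ' ', text)                # new lines, tabs, repeated spaces
    return text.strip().lower()                     # trim last, then lowercase

sample = '  Farmers &amp; workers,\nUNITED!! किसान '
result = normalise(sample)
print(result)
```

Trimming and collapsing whitespace at the end matters because the earlier replacements introduce new spaces of their own.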

In [None]:
# Checking changes.
df_spark.select('tweet').show(10, False)

The cleaning is done and the result looks fine. I'll save this DataFrame to HDFS as a CSV file.

In [None]:
# Saving the DataFrame to HDFS.
df_spark.write.format('csv').save('hdfs://localhost:9000/ca2/tweets')

## 2. Sentiment Analysis

Since I'm using a tweet dataset about a protest, most tweets are likely to have a negative tone. Instead of focusing solely on positive or negative sentiment, I decided to use a lexicon-based emotion analysis approach, which gives a more nuanced analysis at the sense level. I chose NRC-Emotion-Lexicon-Senselevel-v0.92, which associates emotions with specific word senses (meanings). It includes emotions such as anger, anticipation, disgust, fear, joy, sadness, surprise and trust.
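
The basic idea of lexicon-based emotion analysis can be sketched in a few lines of plain Python. The lexicon below is a made-up toy dictionary, not actual NRC entries, and it works at word level only; the function name `emotion_counts` is hypothetical.

```python
from collections import Counter

# Toy lexicon (invented associations, NOT taken from the NRC file).
TOY_LEXICON = {
    'protest': {'anger', 'anticipation'},
    'support': {'trust'},
    'fear': {'fear'},
    'victory': {'joy', 'anticipation'},
}

def emotion_counts(tokens, lexicon):
    """Count how often each emotion is triggered by the tokens of one tweet."""
    counts = Counter()
    for token in tokens:
        # Words missing from the lexicon contribute nothing.
        counts.update(lexicon.get(token, ()))
    return counts

tokens = ['farmers', 'protest', 'support', 'victory']
counts = emotion_counts(tokens, TOY_LEXICON)
print(counts.most_common())
```

Each tweet ends up with a small vector of emotion counts instead of a single positive or negative label, which is what makes the approach more fine-grained.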

### 2.1 *Lexicon File*

I downloaded the lexicon file from the website mentioned in the reference part of the report and uploaded it into HDFS. Now, using PySpark's read.csv function, I'm going to create a PySpark DataFrame from the emotion lexicon file.

In [None]:
# Importing csv file from HDFS.
df_lexicon = spark.read.csv('/ca2/NRC-Emotion-Lexicon-Senselevel-v0.92.txt', sep='\t', header=False, inferSchema=True)

# Checking the newly created DataFrame.
df_lexicon.show(3)

In [None]:
# Examining the shape of the DataFrame
print('The DataFrame consists of', len(df_lexicon.columns), 'columns and', df_lexicon.count(), 'rows.')

I now have a lexicon DataFrame which consists of 3 columns and 241,590 rows. The next step is to create a separate DataFrame with the column of tweets, since 'tweet' is the only column that I'm going to use for the sentiment analysis. However, I'm not using this column alone: because of PySpark's parallel processing and data partitioning, the row order may change and I would not be able to do the forecasting on the result. To keep the order of the rows I will also keep the column 'date'.

In [None]:
# Creating a new DataFrame for sentiment analysis.
df_for_sentiment = df_spark.select('date', 'tweet')

# Checking the DataFrame.
df_for_sentiment.show(2)

In [None]:
# Examining the shape of the DataFrame.
print('The DataFrame consists of', len(df_for_sentiment.columns), 'columns and', df_for_sentiment.count(), 'rows.')

### 2.2 *Tokenization*

I've created a new DataFrame called 'df_for_sentiment', which includes the columns 'date' and 'tweet'. The next step is tokenizing with the default 'Tokenizer' class from the 'pyspark.ml.feature' module, which splits text into individual tokens on whitespace.

In [None]:
# Splitting tweet strings into tokenized text.
tokenizer = Tokenizer(inputCol='tweet', outputCol='token')
df_for_sentiment = tokenizer.transform(df_for_sentiment)

In [None]:
# Checking the changes.
df_for_sentiment.show(2)

### 2.3 *Stop word removal*

I now have a new column 'token' with the tokenized text in it, and when I look closer at the text there are many stop words that are not very important for further processing. Since they do not carry significant meaning or contribute to the sentiment of a text, removing them will reduce noise and let me focus on the more meaningful words.

In [None]:
# Removing stop words from token text.
stopwords = StopWordsRemover.loadDefaultStopWords('english')
remover = StopWordsRemover(inputCol='token', outputCol='clean_token', stopWords=stopwords)
df_for_sentiment = remover.transform(df_for_sentiment)
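
The two steps above, tokenizing and stop word removal, can be illustrated in plain Python on one sentence. The stop-word set here is a small stand-in chosen for the example, not PySpark's default English list, and the helper names are hypothetical.

```python
# Small stand-in stop-word set (NOT the full PySpark default list).
STOP_WORDS = {'the', 'are', 'at', 'of', 'and', 'is', 'a', 'to'}

def tokenize(text):
    """Lowercase the text and split it on whitespace."""
    return text.lower().split()

def remove_stop_words(tokens, stop_words):
    """Keep only the tokens that are not stop words."""
    return [token for token in tokens if token not in stop_words]

tokens = tokenize('The farmers are protesting at the borders of Delhi')
clean_tokens = remove_stop_words(tokens, STOP_WORDS)
print(clean_tokens)
```

The sentence shrinks from nine tokens to four, and the remaining words are the ones most likely to appear in an emotion lexicon.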

In [None]:
# Checking the changes.
df_for_sentiment.select('clean_token').show(5, False)

In [None]:
# Checking removed stop words list for PySpark.ML.
print(stopwords)

### 2.4 *Lemmatization*

As can be seen from the above subsection, I have removed the default stop words from the token text. Now I would like to do lemmatization to reduce words to their base or root form. It will help me normalize the words and reduce the dimensionality of each row.

I tried many libraries that offer a lemmatization tool, but none of them worked with PySpark. Some raised execution errors related to Javascript, some failed on import, and some are no longer developed. I therefore decided to convert my PySpark DataFrame to a Pandas DataFrame and use NLTK for lemmatization.

In [None]:
# Checking the DataFrame
df_for_sentiment.show(2)

In [None]:
# Creating new DataFrame for Pandas
df_for_pandas = df_for_sentiment.select('date', 'clean_token')

# Converting the values of the 'date' column to strings.
df_for_pandas = df_for_pandas.withColumn('date_string', col('date').cast('string'))

# Using 'clean_token' and 'date_string' for Pandas.
df_for_pandas = df_for_pandas.select('clean_token', 'date_string')

# Conversion from Pyspark to Pandas
df_pandas = df_for_pandas.toPandas()

# Checking the changes
df_pandas.head()

I'm keeping the column with dates together with the column of tweets because if I process the tweets separately they will lose the order that I need for further processing. Now we have a Pandas DataFrame; I'll do the lemmatization here and then convert it back to a PySpark DataFrame, since I chose to do the sentiment analysis in PySpark.
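
A minimal sketch of this step is shown below: each row keeps its date string while its tokens are passed through a lemmatizer function. The lookup table and the helper names are hypothetical stand-ins so the example runs without extra downloads; with NLTK installed and its WordNet data available, `WordNetLemmatizer().lemmatize` could be passed in instead.

```python
# Toy lemma table (stand-in for a real lemmatizer such as NLTK's WordNetLemmatizer).
TOY_LEMMAS = {'farmers': 'farmer', 'laws': 'law', 'borders': 'border'}

def toy_lemmatize(word):
    """Return the base form if it is known, otherwise the word unchanged."""
    return TOY_LEMMAS.get(word, word)

def lemmatize_rows(rows, lemmatize):
    """Lemmatize the tokens of every row while keeping its date alongside."""
    return [
        {'date_string': row['date_string'],
         'lemmas': [lemmatize(token) for token in row['clean_token']]}
        for row in rows
    ]

rows = [
    {'date_string': '2020-11-01 10:00:00', 'clean_token': ['farmers', 'reject', 'laws']},
    {'date_string': '2020-11-02 09:30:00', 'clean_token': ['borders', 'blocked']},
]
result = lemmatize_rows(rows, toy_lemmatize)
print(result)
```

Because the date travels with every row, the result can be sorted again after it is converted back to PySpark.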

### 2.5 *Loading the lexicon*

## 3. EDA

Since the new tweets dataset is not too large and Pandas provides a more user-friendly and interactive environment for Exploratory Data Analysis, I decided to do the EDA on a Pandas DataFrame.

In [None]:
# Converting my PySpark DataFrame to a Pandas DataFrame.
df = df_spark.toPandas()

In [None]:
# Checking the size of dataset in terms of memory usage.
memory_usage = df.memory_usage(deep=True).sum()/(1024*1024)
print(f'The Pandas DataFrame size is: {memory_usage:.2f} MB.')

In [None]:
df.info()

The dataset was successfully converted to Pandas and its memory usage is 306.74 MB. Everything seems okay for analysis; the only thing left to do is change the dtype of the column 'date' to 

In [None]:
df.head(3)

In [None]:
df.describe()

In [None]:
df_spark.groupBy('user').count().sort('count', ascending=False).show()

In [None]:
# Reading the second field of the second row without collecting the whole DataFrame.
df_spark.take(2)[1][1]

In [None]:
print(type(df_spark.count()))

In [None]:
df_spark.createOrReplaceTempView('tweet')  # registering the view that the query reads
spark.sql('select * from tweet').show(4)

In [None]:
spark.stop()