In [0]:
!pip install pyspark
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Data Preprocessing").getOrCreate()

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 317.0/317.0 MB 2.2 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting py4j==0.10.9.7
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200.5/200.5 kB 26.4 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py): started
  Building wheel for pyspark (setup.py): finished with status 'done'
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488496 sha256=e675eeb8cc9d2106d27c89ef9cc995d73f108b81afa5828891b91c605a62204b
  Stored in directory: /home/spark-298e7b4d-0412-46c2-9d59-f2/.cache/pip/wheels/22/f3/c0/49d7c304ee9ebfd58d8417a140fa93a306ea3d28d19e9af018
Successf

### READING THE DATASETS

In [0]:
trump_df = spark.read.format("csv").option("header", "true").option("inferSchema","true").option("quote", "\"").option("escape", "\"").option("multiline", True).load("/mnt/2024-team19/hashtag_donaldtrump.csv")

In [0]:
biden_df = spark.read.format("csv").option("header", "true").option("inferSchema","true").option("quote", "\"").option("escape", "\"").option("multiline", True).load("/mnt/2024-team19/hashtag_joebiden.csv")

### DATA PREPROCESSING

### 1. CREATING A NEW COLUMN HASHTAG FOR JOE BIDEN AND DONALD TRUMP

In [0]:
from pyspark.sql.functions import lit
# Creating a new column named 'hashtag' with the value 'Trump'
trump_df = trump_df.withColumn("hashtag", lit("Trump"))

# Creating a new column named 'hashtag' with the value 'Biden'
biden_df = biden_df.withColumn("hashtag", lit("Biden"))

### 2. MERGING BOTH THE DATAFRAMES

In [0]:
combined_df = trump_df.union(biden_df)

### 3. FILTERING THE DATA FOR UNITED STATES OF AMERICA

In [0]:
from pyspark.sql.functions import col
filtered_df = combined_df.filter((col("Country") == "United States of America") | (col("Country") == "United States"))

In [0]:
filtered_df.show(5)

+-------------------+--------------------+--------------------+-----+-------------+-------------------+--------------------+--------------------+----------------+--------------------+-------------------+--------------------+--------------------+------------------+-------------------+----------+--------------------+-------------+--------------------+----------+--------------------+-------+
|         created_at|            tweet_id|               tweet|likes|retweet_count|             source|             user_id|           user_name|user_screen_name|    user_description|     user_join_date|user_followers_count|       user_location|               lat|               long|      city|             country|    continent|               state|state_code|        collected_at|hashtag|
+-------------------+--------------------+--------------------+-----+-------------+-------------------+--------------------+--------------------+----------------+--------------------+-------------------+-------------

### 4. DROPPING UNNECESSARY COLUMNS

In [0]:
columns_to_drop =['created_at',
 'tweet_id',
 'retweet_count',
 'likes',
 'source',
 'user_id',
 'user_name',
 'user_screen_name',
 'user_description',
 'user_join_date',
 'user_followers_count',
 'user_location',
 'lat',
 'long',
 'city',
 'country',
 'continent',
 'state',
 'state_code',
 'collected_at',
 'sentiment_score',
 'sentiment_label',
 ]

# Drop the specified columns
filtered_df = filtered_df.drop(*columns_to_drop)

#### 5. CREATING A SENTIMENT COLUMN

In [0]:
pip install vaderSentiment

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting vaderSentiment
  Downloading vaderSentiment-3.3.2-py2.py3-none-any.whl (125 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 126.0/126.0 kB 3.5 MB/s eta 0:00:00
Installing collected packages: vaderSentiment
Successfully installed vaderSentiment-3.3.2
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

def calculate_sentiment(text):
    sid = SentimentIntensityAnalyzer()
    return sid.polarity_scores(text)['compound']

# Register the function as a UDF (User Defined Function)
sentiment_udf = udf(calculate_sentiment, DoubleType())

# Apply the UDF to create a new column with sentiment scores
filtered_df = filtered_df.withColumn("sentiment_score", sentiment_udf(filtered_df["tweet"]))


In [0]:
def score_to_sentiment_label(score):
    if score > 0:
        return "positive"
    else:
        return "negative"


In [0]:
from pyspark.sql.types import StringType

# Register the function as a UDF (User Defined Function)
sentiment_label_udf = udf(score_to_sentiment_label, StringType())

# Apply the UDF to create a new column with sentiment labels
filtered_df = filtered_df.withColumn("sentiment_label", sentiment_label_udf(filtered_df["sentiment_score"]))

In [0]:
filtered_df = filtered_df.drop('sentiment_score')

In [0]:
filtered_df.show(5)

+--------------------+-------+---------------+
|               tweet|hashtag|sentiment_label|
+--------------------+-------+---------------+
|#Elecciones2020 |...|  Trump|       negative|
|#Trump: As a stud...|  Trump|       positive|
|You get a tie! An...|  Trump|       negative|
|@CLady62 Her 15 m...|  Trump|       negative|
|@DeeviousDenise @...|  Trump|       negative|
+--------------------+-------+---------------+
only showing top 5 rows



### TEXT CLEANING

In [0]:
pip install emoji

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting emoji
  Downloading emoji-2.11.1-py2.py3-none-any.whl (433 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 433.8/433.8 kB 10.5 MB/s eta 0:00:00
Installing collected packages: emoji
Successfully installed emoji-2.11.1
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


### 1. Removing Hashtags, usernames and urls from Tweets

In [0]:
import re
import emoji

def get_emoji_regexp():
    # Sort emoji by length to make sure multi-character emojis are matched first
    emojis = sorted(emoji.EMOJI_DATA, key=len, reverse=True)
    pattern = '(' + '|'.join(re.escape(u) for u in emojis) + ')'
    return re.compile(pattern)

exp = get_emoji_regexp()

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

# Define a function to remove URLs, emojis, and special characters
def clean_text(text):
    # Remove URLs
    text = re.sub(r"http\S+|www.\S+", "", text)
    # Remove emojis
    text = re.sub(get_emoji_regexp(), "", text)
    # Remove special characters
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
    return text

# Define a UDF for cleaning text
clean_text_udf = udf(clean_text, StringType())

# Apply cleaning to the "tweet" column
processed_data = filtered_df.withColumn("tweet_cleaned", clean_text_udf("tweet"))

In [0]:
processed_data.show(5)

+--------------------+-------+---------------+--------------------+
|               tweet|hashtag|sentiment_label|       tweet_cleaned|
+--------------------+-------+---------------+--------------------+
|#Elecciones2020 |...|  Trump|       negative|Elecciones2020  E...|
|#Trump: As a stud...|  Trump|       positive|Trump As a studen...|
|You get a tie! An...|  Trump|       negative|You get a tie And...|
|@CLady62 Her 15 m...|  Trump|       negative|CLady62 Her 15 mi...|
|@DeeviousDenise @...|  Trump|       negative|DeeviousDenise re...|
+--------------------+-------+---------------+--------------------+
only showing top 5 rows



### 2. TOKENIZATION

In [0]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF

regexTokenizer = RegexTokenizer(inputCol="tweet_cleaned", outputCol="tokenized", pattern="\\W")
tokenized = regexTokenizer.transform(processed_data)

In [0]:
from pyspark.ml.feature import StopWordsRemover


remover = StopWordsRemover(inputCol="tokenized", outputCol="cleaned")
sw_removed = remover.transform(tokenized)

In [0]:
hashTF = HashingTF(inputCol="cleaned", outputCol="features")
result_df = hashTF.transform(sw_removed)

In [0]:
columns_to_drop = ['tokenized', 'tweet', 'cleaned']
result_df = result_df.drop(*columns_to_drop)

In [0]:
result_df.show(truncate = False)

+-------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|hashtag|sentiment_label|tweet_cleaned                                                                                                                                                                                                                                                                                 |features                                                                    

### CREATING A CUSTOM FUNCTION TO ENCODE HASHTAG COLUMN

In [0]:
from pyspark.sql.functions import when

# Define the conditions for encoding
trump_condition = (col("hashtag") == "Trump")
biden_condition = (col("hashtag") == "Biden")

# Define the values to assign for each condition
trump_value = 1  # You can choose any numeric value
biden_value = 2  #

# Apply the conditions and assign values using when function
result_df = result_df.withColumn("hashtag_encoded",
                                    when(trump_condition, trump_value)
                                    .when(biden_condition, biden_value)
                                    .otherwise(0))  # Assign 0 for other cases

# Show the DataFrame with the new encoded column
result_df.show()


+-------+---------------+--------------------+--------------------+---------------+
|hashtag|sentiment_label|       tweet_cleaned|            features|hashtag_encoded|
+-------+---------------+--------------------+--------------------+---------------+
|  Trump|       negative|Elecciones2020  E...|(262144,[1303,304...|              1|
|  Trump|       positive|Trump As a studen...|(262144,[41129,55...|              1|
|  Trump|       negative|You get a tie And...|(262144,[46479,12...|              1|
|  Trump|       negative|CLady62 Her 15 mi...|(262144,[12716,34...|              1|
|  Trump|       negative|DeeviousDenise re...|(262144,[5381,680...|              1|
|  Trump|       positive|One of the single...|(262144,[1604,218...|              1|
|  Trump|       negative|In 2020 NYPost is...|(262144,[1004,151...|              1|
|  Trump|       positive|Trump PresidentTr...|(262144,[22772,36...|              1|
|  Trump|       negative|cnnbrk Trump owes...|(262144,[51621,86...|         

In [0]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import when

# Defining the conditions for encoding
negative_condition = (col("sentiment_label") == "negative")
positive_condition = (col("sentiment_label") == "positive")

# Applying the conditions and assign values using when function
result_df = result_df.withColumn("sentiment_encoded",
                                    when(negative_condition, 0)
                                    .when(positive_condition,1))

# Shoing the DataFrame with the new encoded column
result_df.show(5)

+-------+---------------+--------------------+--------------------+---------------+-----------------+
|hashtag|sentiment_label|       tweet_cleaned|            features|hashtag_encoded|sentiment_encoded|
+-------+---------------+--------------------+--------------------+---------------+-----------------+
|  Trump|       negative|Elecciones2020  E...|(262144,[1303,304...|              1|                0|
|  Trump|       positive|Trump As a studen...|(262144,[41129,55...|              1|                1|
|  Trump|       negative|You get a tie And...|(262144,[46479,12...|              1|                0|
|  Trump|       negative|CLady62 Her 15 mi...|(262144,[12716,34...|              1|                0|
|  Trump|       negative|DeeviousDenise re...|(262144,[5381,680...|              1|                0|
+-------+---------------+--------------------+--------------------+---------------+-----------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import col

# Filter the data into separate DataFrames based on the hashtag
trump_data = result_df.filter(col("hashtag") == "Trump")
biden_data = result_df.filter(col("hashtag") == "Biden")

# Sample 25,000 rows from each DataFrame
sampled_trump_data = trump_data.sample(False, 25000 / trump_data.count(), seed=42)
sampled_biden_data = biden_data.sample(False, 25000 / biden_data.count(), seed=42)

# Union the sampled subsets together to create the final DataFrame
final_subset_data = sampled_trump_data.union(sampled_biden_data)



### BUILDING THE MODEL

In [0]:
from pyspark.ml.feature import VectorAssembler
featureassembler = VectorAssembler(inputCols= ["features","hashtag_encoded"], outputCol= "Independent Features")

In [0]:
output = featureassembler.transform(final_subset_data)

In [0]:
output.show(5)

+-------+---------------+--------------------+--------------------+---------------+-----------------+--------------------+
|hashtag|sentiment_label|       tweet_cleaned|            features|hashtag_encoded|sentiment_encoded|Independent Features|
+-------+---------------+--------------------+--------------------+---------------+-----------------+--------------------+
|  Trump|       positive|Trump PresidentTr...|(262144,[22772,36...|              1|                1|(262145,[22772,36...|
|  Trump|       negative|ChrisDJackson New...|(262144,[19282,40...|              1|                0|(262145,[19282,40...|
|  Trump|       negative|JimJordan DevinNu...|(262144,[3924,659...|              1|                0|(262145,[3924,659...|
|  Trump|       negative|realDonaldTrump U...|(262144,[12072,18...|              1|                0|(262145,[12072,18...|
|  Trump|       positive|Trump has been ba...|(262144,[93197,95...|              1|                1|(262145,[93197,95...|
+-------+-------

In [0]:
finalized_data = output.select("Independent Features","sentiment_encoded")

In [0]:
finalized_data.show(5)

+--------------------+-----------------+
|Independent Features|sentiment_encoded|
+--------------------+-----------------+
|(262145,[22772,36...|                1|
|(262145,[19282,40...|                0|
|(262145,[3924,659...|                0|
|(262145,[12072,18...|                0|
|(262145,[93197,95...|                1|
+--------------------+-----------------+
only showing top 5 rows



In [0]:
# train test split
train_data, test_data = finalized_data.randomSplit([0.75,0.25])

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Assuming you have already assembled the features and prepared the data

# Define the logistic regression model
log_reg = LogisticRegression(featuresCol="Independent Features", labelCol="sentiment_encoded",maxIter=10,regParam=0.01)

# Fit the logistic regression model
model = log_reg.fit(train_data)

In [0]:
# Make predictions on the test data
predictions = model.transform(test_data)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="sentiment_encoded")
accuracy = evaluator.evaluate(predictions)

# Print the accuracy
print("Logistic Regression Accuracy:", accuracy)

Logistic Regression Accuracy: 0.8534435493071367


In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
# Assuming you have already assembled the features and prepared the data
 
# Define the Random Forest classifier
rf = RandomForestClassifier(featuresCol="Independent Features", labelCol="sentiment_encoded")
 
# Fit the Random Forest model
rf_model = rf.fit(train_data)

In [0]:
# Make predictions on the test data
rf_predictions = rf_model.transform(test_data)
 
# Evaluate the model using BinaryClassificationEvaluator
rf_evaluator = BinaryClassificationEvaluator(labelCol="sentiment_encoded")
 
# Area Under the ROC Curve (AUC) is the default metric for BinaryClassificationEvaluator
rf_accuracy = rf_evaluator.evaluate(rf_predictions)
 
# Print the accuracy
print("Random Forest Accuracy:", rf_accuracy)

Random Forest Accuracy: 0.612232472736824


In [0]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
 
# Define the Linear SVC classifier
svc = LinearSVC(featuresCol="Independent Features", labelCol="sentiment_encoded")
 
# Fit the SVC model
svc_model = svc.fit(train_data)
svc_predictions = svc_model.transform(test_data)
 
# Evaluate the model using BinaryClassificationEvaluator
svc_evaluator = BinaryClassificationEvaluator(labelCol="sentiment_encoded")
 
# Area Under the ROC Curve (AUC) is the default metric for BinaryClassificationEvaluator
svc_accuracy = svc_evaluator.evaluate(svc_predictions)
 
# Print the accuracy
print("SVC Accuracy:", svc_accuracy)

SVC Accuracy: 0.8748350682149089


### RESULTS:
1. Logistic Regression : 85%
2. Random Forest Classifier : 61%
3. Support Vector Machine Classifier : 87%

#### Support Vector Classifier proved to be the most efficient model for Twitter Sentiment Analysis.