>>> #### Big Data Project: Analyzing Public Perception and Topics Surrounding Elon Musk on Twitter

> <strong> Introduction
>>
      In today's digital age, social media platforms serve as rich sources of data for understanding public sentiment and opinion on various topics.
      This project focuses on analyzing Twitter data related to Elon Musk, a prominent figure in the tech industry, collected on November 21 and 22, 2022.
      The objective is to gain insights into public sentiment towards Elon Musk during this period and extract meaningful topics from the tweets.

> <strong> Objectives
>>
      . Collect Twitter data containing mentions of Elon Musk
      . Clean and preprocess the collected data
      . Analyze sentiment and derive topics from the tweets
      . Train and evaluate machine learning models to predict sentiment
      . Create a dashboard to visualize insights from the data

> <strong> Methodology
>>
      - Data Collection: the Twitter API is integrated with AWS Data Firehose and an S3 bucket via an EC2 instance
      - Data Cleaning: the dataset is securely transferred from the S3 bucket to MS Azure Databricks for wrangling
      - Sentiment Analysis: for the NLP, a pretrained model from the Hugging Face Transformers library is employed
      - Topic Modeling: topic modeling algorithms are applied to identify prevalent themes and discussions
      - Feature Transformation and Model Training: the tweets are transformed into tokens and feature vectors suitable for training and validating ML algorithms
      - Pipeline: pipelines are created for the trained models, and the resulting datasets are exported to a private S3 bucket for further analysis
      - Visualization and Interpretation: tables are generated from the stored dataset using AWS Athena. The curated data is then visualized in a QuickSight dashboard

> <strong> Outcomes
>>
      . Identification of dominant sentiment clusters and key themes emerging from the Twitter data
      . Insights into public sentiment towards Elon Musk were gained, revealing a majority of negative sentiments during the specified time period
      . Visualization of sentiment trends and topic distributions to facilitate understanding and interpretation of the data

> <strong> Conclusions
>>    
      This project successfully demonstrated the process of collecting, processing, and analyzing Twitter data related to Elon Musk.
      By leveraging ML and visualization tools, valuable insights were obtained, highlighting the importance of understanding public sentiment in the digital age.

#### 1.  INITIALIZE SPARK SESSION AND INSTALL LIBRARIES
> <strong> steps
>>
      - install libraries to support Hugging Face Transformer
      - initialize the Spark session and Spark context
      - import required libraries

In [None]:
# install required libraries

%pip install torch
%pip install transformers
%pip install --upgrade numpy
%pip install bertopic
dbutils.library.restartPython()

In [None]:
# initialize the Spark session and Spark context

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName('BigDataProject') \
        .getOrCreate()
print('Session created')
sc = spark.sparkContext

In [None]:
# load libraries

from pyspark.sql.types import *
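# Row is used later when building the topic DataFrame; it is not exported by `from pyspark.sql.types import *`
from pyspark.sql import Row, functions as F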
from transformers import pipeline
import torch
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF, Tokenizer, NGram, ChiSqSelector, VectorAssembler, CountVectorizer
from pyspark.ml.classification import LogisticRegression, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from bertopic import BERTopic

#### 2.  MOUNT AND LOAD S3 BUCKET DIRECTORY
> <strong> steps and purpose
>>
      - define a function for mounting an s3 bucket, which will be used as a reference to the s3 directories
      - mount the s3 directory where the scraped data from Firehose is stored
      - mount my s3 buckets, which are used to store and retrieve data for cleaning, preprocessing, and analyzing
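
The mount function in this section embeds the secret key in the `s3a://` URI and escapes only the `/` character. As a minimal sketch, assuming a key could also contain other reserved characters, the standard library can percent-encode all of them. The helper name `encode_secret` and the sample key are hypothetical and not part of the notebook.

```python
from urllib.parse import quote

def encode_secret(secret_key: str) -> str:
    """Percent-encode every reserved character so the key is safe inside a URI."""
    # safe="" means even "/" is encoded (by default quote() leaves "/" untouched)
    return quote(secret_key, safe="")

# made-up key, for illustration only
sample_key = "abc/def+ghi"
print(encode_secret(sample_key))
```

For keys whose only special character is `/`, this gives the same result as the `replace("/", "%2F")` call used in the notebook.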

In [None]:
# function to mount s3 buckets

def mount_s3_bucket(access_key, secret_key, bucket_name, mount_folder):
  ACCESS_KEY_ID = access_key
  SECRET_ACCESS_KEY = secret_key
  ENCODED_SECRET_KEY = SECRET_ACCESS_KEY.replace("/", "%2F")
  print("Mounting", bucket_name)
  try:
    # unmount the bucket in case it was already mounted
    dbutils.fs.unmount("/mnt/%s" % mount_folder)
  except Exception:
    # if unmounting fails, the bucket most likely wasn't mounted in the first place
    print("Directory not unmounted:", mount_folder)
  # lastly, mount the bucket
  dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY_ID, ENCODED_SECRET_KEY, bucket_name), "/mnt/%s" % mount_folder)
  print("The bucket", bucket_name, "was mounted to", "/mnt/%s" % mount_folder, "\n")

In [None]:
# set AWS programmatic access credentials

ACCESS_KEY = dbutils.secrets.get(scope="aws_key", key="AccesskeyID")
SECRET_ACCESS_KEY = dbutils.secrets.get(scope="aws_key", key="Secretaccesskey")

In [None]:
# mount the Amazon Data Firehose S3 bucket directory

mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'weclouddata/twitter', 'wcd_twt')

In [None]:
# mount my own AWS s3 bucket directories

mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'bigdatapro/Project/raw_dataset/', 'raw_data')
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'bigdatapro/Project/clean_dataset/', 'clean_data')
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'bigdatapro/Project/prediction_dataset/', 'pred_data')
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'bigdatapro/Project/plot_output/', 'figures')
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'bigdatapro/Project/topic_dataset1/', 'topic_data1')
mount_s3_bucket(ACCESS_KEY, SECRET_ACCESS_KEY, 'bigdatapro/Project/topic_dataset2/', 'topic_data2')

In [None]:
%fs ls /mnt/wcd_twt

In [None]:
%fs ls /mnt/wcd_twt/ElonMusk/

In [None]:
# define the dataset schema for the scraped Elon Musk tweets

elonmusk_schema = StructType([
    StructField('id', StringType(), True),
    StructField('name', StringType(), True),
    StructField('username', StringType(), True),
    StructField('tweet', StringType(), True),
    StructField('followers', IntegerType(), True),
    StructField('location', StringType(), True),
    StructField('geo', StringType(), True),
    StructField('when', StringType(), True),
    StructField('others', StringType(), True)])

In [None]:
# read the dataset with given schema

wcd_file_path = '/mnt/wcd_twt/ElonMusk/*/*/*/*/*'
elonmusk_raw = (spark.read
       .option("header", "false")
       .option("delimiter", "\t")
       .schema(elonmusk_schema)
       .csv(wcd_file_path))
display(elonmusk_raw)

In [None]:
%fs ls /mnt/raw_data

In [None]:
# write the raw constructed df above to my s3 bucket

raw_filepath = "/mnt/raw_data"
(elonmusk_raw.write
  .option("delimiter", "\t")  
  .option("header", "false")
  .mode("overwrite")
  .csv(raw_filepath))

In [None]:
%fs ls /mnt/raw_data

#### 3. CLEAN THE DATASET
> <strong> cleaning steps
>>
        - select all columns needed for sentiment analysis
        - convert the date_time column to a timestamp
        - remove URLs from the 'tweet' column (this and each following text step overwrites 'tweet' with the cleaned text)
        - replace any non-alphabetic characters in the 'tweet' column with a space
        - replace consecutive spaces in the 'tweet' column with a single space
        - convert all characters in the 'tweet' column to lowercase
        - trim leading and trailing spaces from the 'tweet' column
        - remove the rows where the 'tweet' column is null
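
To make the effect of these text steps concrete, here is a minimal sketch that applies the same sequence to a single string with Python's `re` module instead of Spark column functions. The function name `clean_tweet` and the sample tweet are made up for illustration.

```python
import re

def clean_tweet(text: str) -> str:
    """Apply the text-cleaning steps listed above to a single string."""
    text = re.sub(r"http\S+", "", text)      # remove URLs
    text = re.sub(r"[^a-zA-Z]", " ", text)   # non-alphabetic characters become spaces
    text = re.sub(r"\s+", " ", text)         # collapse runs of whitespace
    return text.lower().strip()              # lowercase, then trim both ends

print(clean_tweet("Elon Musk bought Twitter!! https://t.co/xyz #news_2022"))
```

Note that digits, hashtags symbols, and underscores are all treated as non-alphabetic here, so only plain words survive.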

In [None]:
# clean the dataset
# use the legacy time parser so the 'EEE MMM dd HH:mm:ss Z yyyy' pattern below is accepted
spark.sql('set spark.sql.legacy.timeParserPolicy=LEGACY')

# select columns for sentiment analysis
elonmusk_senti = elonmusk_raw.select('tweet', 'followers', 'location', F.col('when').alias('date_time'))

# clean the 'tweet' and date_time columns
elonmusk_clean = elonmusk_senti.withColumn('date_time', F.to_timestamp(F.col('date_time'), 'EEE MMM dd HH:mm:ss Z yyyy')) \
    .withColumn('tweet', F.regexp_replace('tweet', r"http\S+", "")) \
    .withColumn('tweet', F.regexp_replace('tweet', r"[^a-zA-Z]", " ")) \
    .withColumn('tweet', F.regexp_replace('tweet', r"\s+", " ")) \
    .withColumn('tweet', F.lower('tweet')) \
    .withColumn('tweet', F.trim('tweet')) \
    .na.drop(subset=['tweet'])
display(elonmusk_clean)

#### 4. CREATE THE SENTIMENT COLUMN USING PRETRAINED MODEL

> <strong> steps to create sentiment column for each tweet using pretrained model from hugging face transformer

>>
      - import all necessary libraries
      - initialize the model for sentiment analysis
      - use a user-defined function to apply the model to the 'tweet' column and create a new column for the sentiment
      - replace the sentiments with numerical equivalent
      - drop unused column
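
The label replacement step can be pictured as a plain dictionary lookup. This is a minimal sketch outside Spark, assuming the classifier returns the label strings `LABEL_0`, `LABEL_1`, and `LABEL_2` as in the code cell of this section; the helper `to_numeric_label` and the sample outputs are hypothetical.

```python
from typing import Optional

# negative, neutral, positive, following the mapping used in this section
LABEL_TO_INT = {"LABEL_0": 0, "LABEL_1": 1, "LABEL_2": 2}

def to_numeric_label(label: str) -> Optional[int]:
    """Return the numeric sentiment, or None for an unexpected label."""
    return LABEL_TO_INT.get(label)

# made-up classifier outputs, for illustration only
sample_outputs = [{"label": "LABEL_2", "score": 0.91}, {"label": "LABEL_0", "score": 0.78}]
print([to_numeric_label(output["label"]) for output in sample_outputs])
```

Returning `None` for unknown labels mirrors the `.otherwise(None)` branch in the Spark version.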

In [None]:
# generate sentiment/label column

# load the RoBERTa classifier model from hugging face transformer
device = 0 if torch.cuda.is_available() else -1
roberta_classifier = pipeline(task='sentiment-analysis', model='cardiffnlp/twitter-roberta-base-sentiment', max_length=512, device=device, truncation=True)

# define a UDF to apply the RoBERTa classifier for sentiment analysis
def predict_sentiment(tweet):
    result = roberta_classifier.predict(tweet)[0]
    return (result['label'], float(result['score']))

# apply the UDF to create a new column 'roberta_sentiment'
schema = StructType([StructField('label', StringType(), True), StructField('score', FloatType(), True)])
predict_sentiment_udf = F.udf(predict_sentiment, schema)
elonmusk_clean = elonmusk_clean.withColumn('roberta_sentiment', predict_sentiment_udf(F.col('tweet')))

# extract the label from the struct and create new column called roberta_label
elonmusk_clean = elonmusk_clean.withColumn('roberta_label', F.col('roberta_sentiment').getField('label'))

# replace the label values with numerical equivalents (Positive/LABEL_2: 2, Neutral/LABEL_1: 1, Negative/LABEL_0: 0)
elonmusk_clean = elonmusk_clean.withColumn('roberta_label', 
                                         F.when(F.col('roberta_label') == 'LABEL_2', 2)
                                         .when(F.col('roberta_label') == 'LABEL_0', 0)
                                         .when(F.col('roberta_label') == 'LABEL_1', 1)
                                         .otherwise(None))

# drop roberta_sentiment column
elonmusk_clean = elonmusk_clean.drop('roberta_sentiment')

In [None]:
# list all existing columns
print(elonmusk_clean.columns)

In [None]:
display(elonmusk_clean)

#### 5. USE S3 BUCKET AS CACHE STORAGE
> <strong> required steps
>>
      - write the cleaned sentiment table into my aws s3 bucket
      - define the schema to read back the dataset from s3 bucket
      - read the dataset from s3 bucket for easy recall
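
Because the files are written without a header row, the column names and types have to be supplied again when reading them back. A minimal sketch of that round trip using only Python's `csv` module follows; the sample rows and the three-column schema are made up for illustration and are not the notebook's data.

```python
import csv
import io

# write tab-delimited rows without a header
rows = [["great launch", "120", "2"], ["bad takeover", "45", "0"]]
buffer = io.StringIO()
csv.writer(buffer, delimiter="\t", lineterminator="\n").writerows(rows)

# read back: names and types come from an explicit schema, not from the file
schema = [("tweet", str), ("followers", int), ("label", int)]
buffer.seek(0)
records = [
    {name: cast(value) for (name, cast), value in zip(schema, line)}
    for line in csv.reader(buffer, delimiter="\t")
]
print(records[0])
```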

In [None]:
# write the clean dataset to my s3 bucket for easy reference

clean_filepath = "/mnt/clean_data"
(elonmusk_clean.write
  .option("delimiter", "\t")  
  .option("header", "false")
  .mode("overwrite")
  .csv(clean_filepath))

In [None]:
# read the clean dataset from my s3 bucket for faster compute
re_schema = StructType([
    StructField('tweet', StringType(), True),
    StructField('followers', IntegerType(), True),
    StructField('location', StringType(), True),
    StructField('date_time', TimestampType(), True),
    StructField('label', IntegerType(), True)])

re_elonmusk = (spark.read
       .option("header", "false")
       .option("delimiter", "\t")
       .schema(re_schema)
       .csv("/mnt/clean_data"))
re_elonmusk.show(3)

#### 6. TRANSFORM FEATURES
> <strong> transformation steps include
>>
      - tokenization
      - stopword removal
      - count vectorization
      - term frequency - inverse document frequency vectorization
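
As a rough illustration of the last two steps, the sketch below computes count-based TF-IDF weights in plain Python. It assumes the smoothed formula `log((N + 1) / (df + 1))` that Spark ML documents for its `IDF` estimator and ignores the `minDocFreq` filter; the function name `tf_idf` and the toy documents are hypothetical.

```python
import math
from collections import Counter

def tf_idf(docs):
    """Return one {term: weight} dict per tokenized document."""
    n_docs = len(docs)
    # document frequency: number of documents that contain each term
    doc_freq = Counter(term for doc in docs for term in set(doc))
    weights = []
    for doc in docs:
        term_counts = Counter(doc)  # raw counts, like a count vector
        weights.append({
            term: count * math.log((n_docs + 1) / (doc_freq[term] + 1))
            for term, count in term_counts.items()
        })
    return weights

docs = [["musk", "twitter"], ["musk", "tesla"], ["musk", "twitter", "twitter"]]
result = tf_idf(docs)
print(result)
```

A term that appears in every document, such as "musk" here, gets a weight of zero, while rarer terms are weighted up.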

In [None]:
# break down the strings in tweet to individual words/tokens

tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
tweets_tokenized = tokenizer.transform(re_elonmusk)
tweets_tokenized.show(3)

In [None]:
# filter out stopwords

stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
tweets_stopword = stopword_remover.transform(tweets_tokenized)
tweets_stopword.show(3)

In [None]:
# build a vocabulary index of the words and count how many times each occurs in a tweet

cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
cv_model = cv.fit(tweets_stopword)
tweets_cv = cv_model.transform(tweets_stopword)
tweets_cv.show(3)

In [None]:
# down-weight words that appear in many tweets and give more weight to less frequent words

idf = IDF(inputCol='cv', outputCol="features", minDocFreq=5)
idf_model = idf.fit(tweets_cv)
tweets_idf = idf_model.transform(tweets_cv)
tweets_idf.show(3)

#### 7. TRAIN AND EVALUATE MODEL
> <strong> steps required
>>
      - perform train test split using 80% and 20% respectively
      - train and test using two different ML algorithms
      - evaluate the performance of the model using accuracy and the weighted F1 score (the default metric of MulticlassClassificationEvaluator)
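
Spark's `MulticlassClassificationEvaluator` reports a weighted F1 score by default rather than ROC-AUC. The sketch below shows, in plain Python, how accuracy and a support-weighted F1 can be computed for three sentiment classes; the function names and the toy labels are made up for illustration.

```python
from collections import Counter

def accuracy(labels, preds):
    """Share of predictions that match the true label."""
    return sum(y == p for y, p in zip(labels, preds)) / len(labels)

def weighted_f1(labels, preds):
    """Per-class F1, averaged with weights equal to each class's share of the true labels."""
    support = Counter(labels)
    total = 0.0
    for cls, n_true in support.items():
        tp = sum(y == cls and p == cls for y, p in zip(labels, preds))
        fp = sum(y != cls and p == cls for y, p in zip(labels, preds))
        fn = n_true - tp
        denom = 2 * tp + fp + fn
        f1 = 2 * tp / denom if denom else 0.0
        total += f1 * n_true / len(labels)
    return total

labels = [0, 0, 1, 2, 2, 2]
preds = [0, 1, 1, 2, 2, 0]
print(round(accuracy(labels, preds), 4), round(weighted_f1(labels, preds), 4))
```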

In [None]:
# perform train and test splitting

train, test = tweets_idf.randomSplit([0.8, 0.2], seed=20240531)
print(f'train_count: {train.count()}, test_count: {test.count()}')

In [None]:
# Model_1: Logistic Regression Classifier

lr = LogisticRegression(maxIter=100)
lr_model = lr.fit(train)
lr_pred = lr_model.transform(test)
display(lr_pred)

In [None]:
# Model_1 Evaluation

# the default metric of MulticlassClassificationEvaluator is weighted F1, not ROC-AUC
lr_eval = MulticlassClassificationEvaluator(metricName="f1")
lr_f1 = lr_eval.evaluate(lr_pred)
lr_accuracy = lr_pred.filter(lr_pred.label == lr_pred.prediction).count() / float(lr_pred.count())

print("Accuracy Score: {0:.4f}".format(lr_accuracy))
print("F1 Score: {0:.4f}".format(lr_f1))

In [None]:
# Model_2: NaiveBayes

nb = NaiveBayes(modelType='multinomial')
nb_model = nb.fit(train)
nb_pred = nb_model.transform(test)
nb_pred.show(3)

In [None]:
# Model_2 Evaluation

# the default metric of MulticlassClassificationEvaluator is weighted F1, not ROC-AUC
nb_eval = MulticlassClassificationEvaluator(metricName="f1")
nb_f1 = nb_eval.evaluate(nb_pred)
nb_accuracy = nb_pred.filter(nb_pred.label == nb_pred.prediction).count() / float(nb_pred.count())

print("Accuracy Score: {0:.4f}".format(nb_accuracy))
print("F1 Score: {0:.4f}".format(nb_f1))

#### 8. CREATE PIPELINE AND SAVE PREDICTIONS
> <STRONG> steps include
>> 
      - add all the cleaning and preprocessing steps to a pipeline
      - include n-gram feature transformation in the pipeline
      - based on the best ML algorithm, train and predict the sentiment label using the pipeline
      - run the pipeline and save the prediction into s3 bucket for analysis and dashboard
      - create pipeline for model 2 
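
The n-gram step adds word pairs as extra features next to the single words. A minimal sketch of what a bigram transformation produces from a token list, written in plain Python rather than with Spark's `NGram`; the function name `bigrams` and the sample tokens are made up for illustration.

```python
def bigrams(tokens):
    """Join each pair of consecutive tokens with a space (n-grams with n=2)."""
    return [" ".join(pair) for pair in zip(tokens, tokens[1:])]

print(bigrams(["elon", "musk", "bought", "twitter"]))
```

A list with fewer than two tokens yields no bigrams, so very short tweets contribute only single-word features.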


In [None]:
# best model pipeline (model1)

# perform train test split
X_train, y_test = re_elonmusk.randomSplit([0.8, 0.2], seed=20240531)

# create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5)
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 

# assemble all text features
assembler = VectorAssembler(inputCols=["1gram_idf", "2gram_idf"], outputCol="rawFeatures")

# Chi-square variable selection
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# regression model estimator
lr = LogisticRegression(maxIter=100)

# build the pipeline
pipeline = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector, lr])

# pipeline model fitting
pipeline_model = pipeline.fit(X_train)
y_pred = pipeline_model.transform(y_test)

# the default metric of MulticlassClassificationEvaluator is weighted F1, not ROC-AUC
evaluator = MulticlassClassificationEvaluator(metricName="f1")
accuracy = y_pred.filter(y_pred.label == y_pred.prediction).count() / float(y_test.count())
f1_score = evaluator.evaluate(y_pred)

print("Accuracy Score: {0:.4f}".format(accuracy))
print("F1 Score: {0:.4f}".format(f1_score))

In [None]:
# select the pipeline's prediction column and the other columns needed for analysis and the dashboard
save_prediction = y_pred.select('tweet', 'followers', 'location', 'date_time', 'label', 'prediction')

# save the prediction table as .csv file into s3 bucket
pred_filepath = "/mnt/pred_data"
(save_prediction.write
  .option("delimiter", "\t")  
  .option("header", "false")
  .mode("overwrite")
  .csv(pred_filepath))

# # to read the predicted dataset from my s3 bucket, use this schema
# pred_schema = StructType([
#     StructField('tweet', StringType(), True),
#     StructField('followers', IntegerType(), True),
#     StructField('location', StringType(), True),
#     StructField('date_time', TimestampType(), True),
#     StructField('label', IntegerType(), True),
#     StructField('prediction', DoubleType(), True)])

# readme = (spark.read
#        .option("header", "false")
#        .option("delimiter", "\t")
#        .schema(pred_schema)
#        .csv("/mnt/pred_data"))
# readme.show(4)

In [None]:
# pipeline for model 2

# perform train test split
X_train, y_test = re_elonmusk.randomSplit([0.8, 0.2], seed=20240531)

# create transformers for the ML pipeline
tokenizer = Tokenizer(inputCol="tweet", outputCol="tokens")
stopword_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
cv = CountVectorizer(vocabSize=2**16, inputCol="filtered", outputCol='cv')
idf = IDF(inputCol='cv', outputCol="1gram_idf", minDocFreq=5)
ngram = NGram(n=2, inputCol="filtered", outputCol="2gram")
ngram_hashingtf = HashingTF(inputCol="2gram", outputCol="2gram_tf", numFeatures=20000)
ngram_idf = IDF(inputCol='2gram_tf', outputCol="2gram_idf", minDocFreq=5) 

# assemble all text features
assembler = VectorAssembler(inputCols=["1gram_idf", "2gram_idf"], outputCol="rawFeatures")

# Chi-square variable selection
selector = ChiSqSelector(numTopFeatures=2**14,featuresCol='rawFeatures', outputCol="features")

# NaiveBayes model estimator
nb = NaiveBayes(modelType='multinomial')

# build the pipeline
pipeline_1 = Pipeline(stages=[tokenizer, stopword_remover, cv, idf, ngram, ngram_hashingtf, ngram_idf, assembler, selector, nb])

# pipeline model fitting
nb_pipeline = pipeline_1.fit(X_train)
y_pred = nb_pipeline.transform(y_test)

# the default metric of MulticlassClassificationEvaluator is weighted F1, not ROC-AUC
evaluator_1 = MulticlassClassificationEvaluator(metricName="f1")
nb_accuracy = y_pred.filter(y_pred.label == y_pred.prediction).count() / float(y_test.count())
nb_f1 = evaluator_1.evaluate(y_pred)

print("NB Accuracy Score: {0:.4f}".format(nb_accuracy))
print("NB F1 Score: {0:.4f}".format(nb_f1))

#### 9. TOPIC MODELING WITH HUGGING FACE
> <STRONG> steps include
>> 
      - load required libraries
      - extract the tweet column and convert it to a list of text
      - remove stopwords
      - initialize, train and transform BERTopic model
      - perform analysis
      - save the results
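
The analysis step relies on the per-document topic ids returned by the model, where `-1` marks documents treated as outliers. As a minimal sketch with made-up topic ids (not output from the actual model), topic frequencies and the number of real topics can be tallied like this:

```python
from collections import Counter

# hypothetical topic ids, one per document; -1 marks an outlier
topics = [0, 1, -1, 0, 2, 0, -1, 1]

# (topic, count) pairs, most frequent first
topic_freq = Counter(topics).most_common()

# number of distinct topics, not counting the outlier bucket
n_topics = len({topic for topic in topics if topic != -1})

print(topic_freq)
print("Topics excluding outliers:", n_topics)
```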


In [None]:
# STEP_1:

from pyspark.ml.feature import Tokenizer

# select the needed columns for topic analysis
topic_data_df = re_elonmusk.select("tweet", "label", "followers")

# tokenize the 'tweet' column
tweet_token = Tokenizer(inputCol="tweet", outputCol="tokenized_tweet")
topic_data_df = tweet_token.transform(topic_data_df)

# remove stopwords from the 'tokenized_tweet' column
stopword_remover = StopWordsRemover(inputCol="tokenized_tweet", outputCol="clean_tweet")
topic_data_df = stopword_remover.transform(topic_data_df)

# extract the 'clean_tweet' column as a list of documents
documents = topic_data_df.select("clean_tweet").rdd.flatMap(lambda row: [' '.join(row.clean_tweet)]).collect()
print(documents[:5])

In [None]:
# STEP_2
## train and transform the model

# initialize BERTopic model
model = BERTopic(nr_topics=21, verbose=True)

# fit and transform the model on the list
topics, _ = model.fit_transform(documents)

In [None]:
# STEP_3a
## assign topic to each tweet/row

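# convert the list of topics to a Spark DataFrame keyed by row position
topics_df = spark.createDataFrame([(index, int(topic)) for index, topic in enumerate(topics)], ["row_index", "topic"])

# add a consecutive row index to the original DataFrame for joining;
# monotonically_increasing_id() is not consecutive across partitions, so zipWithIndex() is used instead
indexed_schema = StructType(topic_data_df.schema.fields + [StructField("row_index", LongType(), False)])
indexed_rdd = topic_data_df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
topic_data_df = spark.createDataFrame(indexed_rdd, indexed_schema)

# assign the topics to the original DataFrame
topic_data_df = topic_data_df.join(topics_df, on="row_index", how="inner").drop("row_index")
topic_data_df.show(4)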

In [None]:
# STEP_3b
## save the updated topic dataframe to my s3 bucket

# rebuild the document text from 'clean_tweet' (the same space-joined tokens as in step 1);
# this avoids relying on row order, which is not guaranteed after the join
topic_data_with_documents = topic_data_df.withColumn("documents", F.concat_ws(" ", F.col("clean_tweet")))

# drop some columns before saving
topic_data_with_documents = topic_data_with_documents.drop("tokenized_tweet", "clean_tweet")

# save the updated dataframe as parquet file into s3 bucket
topic_filepath1 = "/mnt/topic_data1"
(topic_data_with_documents.write
    .mode("overwrite")
    .option('compression', 'snappy')
    .parquet(topic_filepath1))
topic_data_with_documents.limit(3).show(truncate=True)

# # read the Parquet file into a DataFrame
# reading_parquet = spark.read.parquet("/mnt/topic_data1")
# display(reading_parquet)

In [None]:
# STEP_4

# display the topic frequencies identified in the dataset
## topic -1 holds outliers, e.g. empty documents, low-confidence assignments, and duplicate documents
most_frequent_topics = model.get_topic_freq()
display(most_frequent_topics)
# get_topic_freq() returns a pandas DataFrame, so len() gives the number of topics (including -1)
print("Total number of topics:", len(most_frequent_topics))

In [None]:
# STEP_5

# extract the top 10 words for each topic
top_words_lists = []
for topic_index in range(-1,20):
    top_words = model.get_topic(topic_index)
    top_words = [item[0] for item in top_words]
    top_words_lists.append((topic_index, top_words))

# create a list of Row objects to represent each topic with its top words
unique_topic_rows = [Row(topic_index=topic[0], top_words=topic[1]) for topic in top_words_lists]

# Create a DataFrame from the list of Row objects with the specified schema
unique_topic_df = spark.createDataFrame(unique_topic_rows, schema=["topic_index", "top_words"])

# save the unique topic dataframe as parquet file into s3 bucket
topic_filepath2 = "/mnt/topic_data2"
(unique_topic_df.write
    .mode("overwrite")
    .option('compression', 'snappy')
    .parquet(topic_filepath2))
unique_topic_df.limit(3).show(truncate=False)

# read the Parquet file into a DataFrame
# reading_parquet = spark.read.parquet("/mnt/topic_data2")

In [None]:
# STEP_6a

# plot the Intertopic Distance Map to see how the topics relate to each other
model.visualize_topics()

In [None]:
# STEP_6b

# visualize the top words of each topic with their c-TF-IDF scores
model.visualize_barchart()


In [None]:
# STEP_6c

# visualize topic similarity with the heatmap
model.visualize_heatmap()

In [None]:
# STEP_7

# save the model for reuse
model.save("new_model")
model = BERTopic.load("new_model")