## 1. Introduction
In natural language processing, the Latent Dirichlet Allocation (LDA) is a generative statistical model that allows sets of observations to be explained by unobserved groups that explain why some parts of the data are similar.
We would like to determine the number of topics on tweets from Egypt collected in 1st Decemeber 2022.

## 2. Exploratory data analysis (EDA)

### 2.1 Initialize Spark

In [None]:
import json 
import findspark
findspark.init()


### 2.2 Create the spark app 

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, udf, lit, col, size, expr

spark = SparkSession.builder.appName('read JSON files').getOrCreate()



### 2.3 Read all the fetched tweets Json files

In [51]:
json_df = spark.read.option("multiline", "true").json(
    "DecTweets/WCTweets.json")

for i in range(2, 724):
    path = "DecTweets/WCTweets"+str(i)+".json"
    print(path)
    temp_df = spark.read.option("multiline", "true").json(path=path)
    json_df = json_df.unionByName(temp_df, allowMissingColumns=True)
    json_df.count()
    temp_df=None


DecTweets/WCTweets365.json


#### 2.3.1 Show the number of files in the dataframe

In [None]:
json_df.count()

#### 2.3.2 View the dataframe schema 

In [None]:
json_df.printSchema()

#### 2.3.3 View the ***data*** attribute schema from the dataframe

In [None]:
json_df.select('data').printSchema()


### 2.4 Create a new dataframe ***data_df*** that contains only the data attribute of each tweet

In [None]:
# Select the data column from the json_df, and adds a new column to the data_df called "data"
# that column has an alias called "data", and each row has the data of only one objects.
data_df=json_df.select('data').withColumn('data', explode('data').alias('data'))

#### 2.4.1 View the new dataframe schema 

In [None]:
data_df.printSchema()


#### 2.4.2 Show the number of tweets in the dataframe

In [None]:
data_df.count()


#### 2.4.3 Selecting only the inner attributes of the ***data*** attribute 

In [None]:
data_df = data_df.select('data.author_id',
                         'data.created_at',
                         'data.geo.place_id',
                         'data.id',
                         'data.public_metrics',
                         'data.text')


#### 2.4.4 Viewing the updated schema

In [None]:
# the schema after selection
data_df.printSchema()


### 2.5 Create a new dataframe ***place_df*** that contains only the *includes.places* attribute of each tweet

In [None]:
# select the place content
place_df=json_df.select('includes.places')

#### 2.5.1 View the place_df Schema

In [None]:
# show the schema fo place
place_df.schema.names

### 2.6 Create a new dataframe ***user_df*** that contains only the *includes.users* attribute of each tweet

In [None]:
# select the user content
user_df=json_df.select('includes.users')

#### 2.6.1 View the *users_df* schema

In [None]:
user_df.printSchema()


### 2.7 Unravel the nested json

In [None]:
import pyspark.sql.types as T
from pyspark.sql.functions import col



#### 2.7.1 Define a function to read the nested json structure

In [None]:
def read_nested_json(df):
    column_list = []
    # Iterate over all the column names in the schema
    for column_name in df.schema.names:
        # Check if the column is of "ArrayType"
        if isinstance(df.schema[column_name].dataType, T.ArrayType):
            # Replace the column of "ArrayType" with an exploded version of that column
            df = df.withColumn(column_name, explode(column_name).alias(column_name))
            # Append the column name to an array of column list array
            column_list.append(column_name)

        # Check if the column is of "StructType"
        elif isinstance(df.schema[column_name].dataType, T.StructType):
            # Iterate over all fields of this struct
            for field in df.schema[column_name].dataType.fields:
                # Append a new column made of the column name and the field name to the column list array 
                column_list.append(col(column_name + "." + field.name).alias(column_name + "_" + field.name))
        
        # If the column is neither an "ArrayType" nor a "StructType" aka "Leaf" attribute
        # eg:"String" , "Long", etc.
        # add this column to the column list array
        else:
            column_list.append(column_name)

    # Selecting columns using column_list from dataframe: df
    df = df.select(column_list)
    return df


#### 2.7.2 Define a function to flatten the nested JSON File

In [None]:
# This is a recursive function to allow the flattening 
# of nested "ArrayTypes" or "StructTypes" 
def flatten_nested_json(df):
    read_nested_json_flag = True
    while read_nested_json_flag:
        df = read_nested_json(df)
        read_nested_json_flag = False
        
        # Check if there is still an "ArrayType" or "StructType" after the 
        # reading itertaion and if there is, iterate over the json 
        # structure again until there are no "ArrayType" or "StructType" 
        # in the dataframe, only primitive types.
        for column_name in df.schema.names:
            if isinstance(df.schema[column_name].dataType, T.ArrayType):
              read_nested_json_flag = True
            elif isinstance(df.schema[column_name].dataType, T.StructType):
              read_nested_json_flag = True
    return df


#### 2.7.3 Flatten the Data df and show the updated schema

In [None]:
data_df=flatten_nested_json(data_df)

data_df.printSchema()

#### 2.7.4 Perform different sql queries on the data dataframe to learn more information 

In [None]:
# check the number of record in data_df dataframe
data_df.createOrReplaceTempView("df_data")
spark.sql("SELECT count(*) FROM df_data").show()


In [None]:
# check that the id is the primary key
spark.sql("SELECT count(DISTINCT id) FROM df_data").show()


#### 2.7.5 Flatten the Place df and show the updated schema

In [None]:
place_df=flatten_nested_json(place_df)
place_df.printSchema()

#### 2.7.6 Perform different sql queries on the dataframe to learn more information 

In [None]:
# check the number of unique place in place_df dataframe
place_df.createOrReplaceTempView("df_place")
spark.sql("SELECT count(*) FROM df_place").show()



In [None]:
# show the tweet location for first 100 records
place_df.show(10, False)


#### 2.7.7 Flatten the Users df and show the updated schema

In [None]:
# flatten the neseted json of user
user_df=flatten_nested_json(user_df)

#### 2.7.8 Perform different sql queries on the dataframe to learn more information 

In [None]:
# check the number of unique user in user_df dataframe
user_df.createOrReplaceTempView("df_user")
spark.sql("SELECT count(DISTINCT users_id) FROM df_user").show()


In [None]:
# show the tweet user of first 100 records
user_df.show(10, False)


In [None]:
spark.sql("SELECT DISTINCT(users_name,users_location) FROM df_user where users_location is not null").show()


## 3. Topic Modeling

### 3.1 Importing Libs

In [None]:
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from gensim.models import LsiModel


###  3.2 Create a new dataframe that contains only the tweet id, creation date and text

In [None]:
tweet_text = data_df.select('id','created_at','text')

#### 3.2.1 View the new dataframe

In [None]:
tweet_text.show(10)


#### 3.2.2 View the new dataframe schema

In [None]:
tweet_text.printSchema()


### 3.3 Data Preprocessing and Cleaning

#### 3.3.1 Importing Libs

In [None]:
# Preprocess steps
import re
from pyspark.sql.functions import to_timestamp
import pyspark.sql.types as T


#### 3.3.2 Change the *created_at* attribute from **String** to **DateType**

In [None]:
tweet_text=tweet_text.withColumn("created_date", tweet_text['created_at'].cast(T.DateType()))

#### 3.3.3 Define the UDFs for the cleaning process

In [None]:
text = re.sub('\[.*?\]', '', text)  # remove square brackets
text = re.sub('[%s]' % re.escape(string.punctuation),
              '', text)  # remove puncitutations marks
text = re.sub('\w*\d\w*', '', text)  # remove words that contain numbers
text = re.sub('[‘’“”…]', '', text)  # remove quotes
text = re.sub('\r', '', text)  # remove \r
text = re.sub('\n', '', text)  # remove \n
text = re.sub('\t', '', text)  # remove \t


In [None]:
PUNCTUATION = '!"$%&\'()*+,-./:;<=>?[\\]^_`{|}~•@â'

def removeLinks(tweet):
    tweet = re.sub(r'http\S+', '', tweet) 
    tweet = re.sub(r'bit.ly/\S+', '', tweet) 
    tweet = tweet.strip('[link]') 
    return tweet
def removeMentions(tweet):
    tweet = re.sub('(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    tweet = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweet) 
    return tweet
def removePunctuation(tweet):
    tweet = re.sub('['+ PUNCTUATION + ']+', ' ', tweet) 
    return tweet
def removeNumbers(tweet):
    tweet = re.sub('([0-9]+)', '', tweet) 
    return tweet

def removeBreakLines(tweet):
    tweet=tweet.strip().replace('\n', '')
    tweet=tweet.strip().replace('\r', '')
    tweet = tweet.strip().replace('\t', '')
    return tweet


def remove_emoji(tweet):
    emoji_pattern = re.compile("["
    u"\U0001F600-\U0001F64F" # emoticons
    u"\U0001F300-\U0001F5FF" # symbols & pictographs
    u"\U0001F680-\U0001F6FF" # transport & map symbols
    u"\U0001F1E0-\U0001F1FF" # flags (iOS)
    u"\U00002702-\U000027B0"
    u"\U000024C2-\U0001F251""]+", flags=re.UNICODE)
    return emoji_pattern.sub(r'', tweet)


In [None]:
# register user defined function
removeLinks = udf(removeLinks)
removeMentions = udf(removeMentions)
removePunctuation=udf(removePunctuation)
removeNumbers=udf(removeNumbers)
removeBreakLines= udf(removeBreakLines)
remove_emoji=udf(remove_emoji)

#### 3.3.4 Remove Links, User Mentions, Punctuation and Numbers from *tweet_text*

In [None]:
tweet_text=tweet_text.withColumn('cleaned_Text', removeLinks(tweet_text['text']))

In [None]:
tweet_text=tweet_text.withColumn('cleaned_Text', removeMentions(tweet_text['cleaned_Text']))

In [None]:
tweet_text=tweet_text.withColumn('cleaned_Text', removePunctuation(tweet_text['cleaned_Text']))

In [None]:
tweet_text=tweet_text.withColumn('cleaned_Text', removeNumbers(tweet_text['cleaned_Text']))

In [None]:
tweet_text=tweet_text.withColumn('cleaned_Text', removeBreakLines(tweet_text['cleaned_Text']))

In [None]:
tweet_text=tweet_text.withColumn('cleaned_Text', remove_emoji(tweet_text['cleaned_Text']))

In [None]:
tweet_text.select('text','cleaned_Text').show(10)

#### 3.3.5 Create a tokenizer 

In [None]:
# A tokenizer that matches any tokens that contains characters,
# with a minimum length of 3 characters, from the cleaned_Text,
# and output the tokens in the tokens output.

tokenizer = RegexTokenizer().setPattern("[\\W_]+").setMinTokenLength(3).setInputCol("cleaned_Text").setOutputCol("tokens")


In [None]:
tokenized_tweets = tokenizer.transform(tweet_text)

In [None]:
# Display the original text and the clenaed text 
# and the tokenized cleaned tweets.
tokenized_tweets.select('text','cleaned_Text','tokens').show(50)

#### 3.3.6 Create a WordNet Lemmatizer

In [None]:
import nltk
from nltk.stem import WordNetLemmatizer
nltk.download('wordnet')

lemmatizer = WordNetLemmatizer()

##### 3.6.6.1 Create a lemmatizing function

In [None]:
# Defining Lemmatizing function that takes each row 
# and lemmatize each word to a verb and return the 
# row containing the lemmatized words.

def lemmatization(row):
    row = [lemmatizer.lemmatize(word,'v') for word in row]
    return row


lemmatize = udf(lemmatization)

##### 3.6.6.2 Create a new column that contains the lemmatized tokens

In [None]:

tokenized_tweets=tokenized_tweets.withColumn('tokens_lemma', lemmatize(tokenized_tweets['tokens']))

In [None]:
tokenized_tweets.select('cleaned_Text','tokens','tokens_lemma').show(50)

#### 3.3.7 Removing Stop words

In [None]:
twitter_stopwords = open('TwitterStopWords.txt', 'r').read().split(",")

##### 3.3.7.1 Create a new stopwords list from twitter stop list and the nltk wordnet list

In [None]:
stopwordList=StopWordsRemover().getStopWords()
stopwordList.extend(twitter_stopwords)
stopwordList = list(set(stopwordList))

In [None]:
# Create a new stopWordsRemover with the stopwords as our 
# extended stopwordsList, the input from the tokens column,
# and output to cleaned_tokens column.
remover = StopWordsRemover(stopWords=stopwordList).setInputCol("tokens").setOutputCol("cleaned_tokens")


In [None]:
cleaned_tweets = remover.transform(tokenized_tweets)

In [None]:
cleaned_tweets.show()

### 3.4 Latent Dirichlet Allocation (LDA)

In [None]:
# create a vector of words that at least appeared in two different tweets, and set maximum vocab size to 20000.
vectorizer = CountVectorizer().setInputCol("cleaned_tokens").setOutputCol("features").setVocabSize(20000).setMinDF(2).fit(cleaned_tweets)
wordVectors = vectorizer.transform(cleaned_tweets)


In [None]:
wordVectors.select("features").show(truncate=False)


In [None]:
# set number of topic
# set the mertic to evaluate model performance
num_topics = range(2, 11)
models = []
log_likeli = []
log_perp = []
for num in num_topics:
    # LDA
    # create Latent Dirichlet Allocation model and run it on our data with 50 iteration and selected topics number
    lda = LDA(k=num, maxIter=50)
    # fit the model on data
    ldaModel = lda.fit(wordVectors)
    models.append(ldaModel)
    ll = ldaModel.logLikelihood(wordVectors)
    lp = ldaModel.logPerplexity(wordVectors)
    log_likeli.append(ll)
    log_perp.append(lp)


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
plot_data=pd.DataFrame(list(zip(num_topics,log_likeli,log_perp)),
            columns=['topics_num','logLikelihood','logPerplexity'])

In [None]:
# use the elbow method to determine the optimal k
plot_data.plot(x='topics_num', y='logLikelihood', kind='line')
plt.show()


In [None]:
# use the elbow method to determine the optimal k
plot_data.plot(x='topics_num', y='logPerplexity', kind='line')
plt.show()


In [None]:
model=models[2]

In [None]:
# LDA
# create Latent Dirichlet Allocation model and run it on our data with 50 iteration and selected topics number
lda = LDA(k=4, maxIter=100)
# fit the model on data
model = lda.fit(wordVectors)

In [None]:
ll = model.logLikelihood(wordVectors)
lp = model.logPerplexity(wordVectors)


In [None]:
print("The lower bound on the log likelihood of the entire corpus: ",ll)
print("The upper bound on perplexity: ",lp)

In [None]:
# extract vocabulary from CountVectorizer

vocab = vectorizer.vocabulary

In [None]:
# create topics based on LDA
lda_topics = model.describeTopics()
lda_topics.show()


In [None]:
lda_topics.select('termWeights').show(10,False)

In [None]:
lda_topics.select('termIndices').show(10,False)

In [None]:
topics_rdd = lda_topics.rdd
topics_words = topics_rdd\
    .map(lambda row: row['termIndices'])\
    .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
    .collect()

for idx, topic in enumerate(topics_words):
    print("topic: {}".format(idx))
    print("*"*25)
    for word in topic:
       print(word)
    print("*"*25)


In [None]:
transformed = model.transform(wordVectors)
transformed.show(10)


In [None]:
transformed.printSchema()


In [None]:
to_array = udf(lambda v: v.toArray().tolist(), T.ArrayType(T.FloatType()))


In [None]:
max_index = udf(lambda x: x.index(__builtin__.max(x))
                if x is not None else None, T.IntegerType())


In [None]:
topic_index = udf(lambda x: 'topic'+str(x), T.StringType())


In [None]:
key_word = udf(lambda x: ', '.join(topics_words[x]), T.StringType())


In [None]:
transformed=transformed.withColumn('topicDistribution_array', to_array(transformed['topicDistribution']))
transformed=transformed.withColumn('dominant_topic_index', max_index(transformed['topicDistribution_array']))

In [None]:
transformed=transformed.withColumn('dominant_topic', topic_index(transformed['dominant_topic_index']))

In [None]:
transformed=transformed.withColumn('dominant_topic_keywords', key_word(transformed['dominant_topic_index']))

In [None]:
transformed.schema


In [None]:
transformed.select('topicDistribution_array','dominant_topic','dominant_topic_keywords').show(20, False)

In [None]:
transformed.groupBy("created_date","dominant_topic","dominant_topic_keywords").count().orderBy("created_date","dominant_topic").show()

In [None]:
transformed.groupBy("created_date","dominant_topic","dominant_topic_keywords").count().toPandas().to_csv('NovemberTopics.csv')

## 4. Sentiment Analysis

### 4.1 Importing Libs

In [None]:
from textblob import TextBlob
import config
import sys
import tweepy
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import os
import nltk
import pycountry
import re
import string
from wordcloud import WordCloud, STOPWORDS
from PIL import Image
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from langdetect import detect
from nltk.stem import SnowballStemmer
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from pyspark.ml.feature import CountVectorizer

from googletrans import Translator

detector = Translator()


### 4.2 Getting Topic from User

In [None]:
topic = input("Enter topic to analyze sentiment over: ")


### 4.3 Fetching Tweets and Analyzing Sentiment

In [None]:
noOfTweets = 38640  # number of tweets to iterate over


positive = 0
negative = 0
neutral = 0
polarity = 0
tweet_list = []
neutral_list = []
negative_list = []
positive_list = []

for tweet in tweet_text.collect():
    dec_lan = detector.detect(tweet["cleaned_Text"])

    if dec_lan.lang == "en" and dec_lan.confidence == 1:
       
    
        print(tweet["cleaned_Text"])
    #     tweet_list.append(tweet.text)
    #     analysis = TextBlob(tweet.text)
    #     score = SentimentIntensityAnalyzer().polarity_scores(tweet.text)
    #     neg = score['neg']
    #     neu = score['neu']
    #     pos = score['pos']
    #     comp = score['compound']
    #     polarity += analysis.sentiment.polarity
    
    #     if (neg > pos):
    #         negative_list.append(tweet.text)
    #         negative += 1
    #     elif (neg < pos):
    #         positive_list.append(tweet.text)
    #         positive += 1
    #     else:
    #         neutral_list.append(tweet.text)
    #         neutral += 1
    else: continue

# positive = percentage(positive, noOfTweets)
# negative = percentage(negative, noOfTweets)
# neutral = percentage(neutral, noOfTweets)
# polarity = percentage(polarity, noOfTweets)
# positive = format(positive, '.1f')
# negative = format(negative, '.1f')
# neutral = format(neutral, '.1f')

In [None]:
#Number of Tweets (Total, Positive, Negative, Neutral)

tweet_list = pd.DataFrame(tweet_list)
neutral_list = pd.DataFrame(neutral_list)
negative_list = pd.DataFrame(negative_list)
positive_list = pd.DataFrame(positive_list)

print("total number: ",len(tweet_list))
print("positive number: ",len(positive_list))
print("negative number: ", len(negative_list))
print("neutral number: ",len(neutral_list))

In [None]:
#Creating PieCart

labels = ['Positive ['+str(positive)+'%]' , 'Neutral ['+str(neutral)+'%]','Negative ['+str(negative)+'%]']
sizes = [positive, neutral, negative]
colors = ['yellowgreen', 'blue','red']
patches, texts = plt.pie(sizes,colors=colors, startangle=90)

plt.style.use('default')
plt.legend(labels)
plt.title("Sentiment Analysis Result for Topic = "+topic+"" )
plt.axis('equal')
plt.show()