In [None]:
# setting up spark, java and hadoop environments
import os
# Raw strings keep the Windows backslashes literal; the plain literals only
# worked because "\s", "\P", "\J", "\j" and "\h" are invalid escape sequences,
# which newer Python versions warn about.
os.environ['SPARK_HOME'] = r'C:\spark\spark-3.2.0-bin-hadoop3.2'
os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_301'
os.environ['HADOOP_HOME'] = r'C:\hadoop'

In [None]:
import findspark
# NOTE(review): presumably makes the pyspark package importable by locating the
# Spark installation (SPARK_HOME is set in the first cell) — confirm against the
# findspark documentation. Must run before the pyspark imports below.
findspark.init()

In [None]:
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql import DataFrameWriter
from textblob import TextBlob

In [None]:
def preprocessing(lines):
    """Split the raw text stream into single tweets and strip Twitter noise.

    Parameters
    ----------
    lines : pyspark.sql.DataFrame
        DataFrame with a string column named ``value``; several tweets may be
        packed into one value, separated by the literal marker ``t_end``.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame with a single column ``word`` holding one cleaned tweet per row.
    """
    # one row per tweet: split every value on the "t_end" marker
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))

    # empty strings are turned into nulls so one drop removes both
    words = words.na.replace('', None)
    words = words.na.drop()

    # patterns are raw strings so the regex escapes reach Spark unchanged
    noise_patterns = [
        r'http\S+',   # links
        r'@\w+',      # user mentions
        r'#',         # hashtag sign (the tag text itself is kept)
        r'\bRT\b',    # retweet marker as a whole word, so e.g. "ARTIST" is left intact
        r':',         # colons
    ]
    for pattern in noise_patterns:
        words = words.withColumn('word', F.regexp_replace('word', pattern, ''))
    return words

In [None]:
# text classification
def polarity_detection(text):
    """Return the TextBlob sentiment polarity of ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.polarity

def subjectivity_detection(text):
    """Return the TextBlob sentiment subjectivity of ``text``."""
    sentiment = TextBlob(text).sentiment
    return sentiment.subjectivity

def text_classification(words):
    """Add TextBlob ``polarity`` and ``subjectivity`` columns to ``words``.

    Parameters
    ----------
    words : pyspark.sql.DataFrame
        DataFrame with a string column ``word`` (one tweet per row).

    Returns
    -------
    pyspark.sql.DataFrame
        ``words`` with two extra numeric columns, ``polarity`` and
        ``subjectivity``.
    """
    # The scoring functions return Python floats, so the UDFs are declared as
    # doubles; declaring them as strings stored the scores as text.
    polarity_detection_udf = udf(polarity_detection, DoubleType())
    words = words.withColumn("polarity", polarity_detection_udf("word"))

    subjectivity_detection_udf = udf(subjectivity_detection, DoubleType())
    words = words.withColumn("subjectivity", subjectivity_detection_udf("word"))
    return words

In [None]:
if __name__ == "__main__":
    # Spark session used by the streaming job
    spark = (
        SparkSession.builder
        .appName("TwitterSentimentAnalysis")
        .getOrCreate()
    )
    # tweets arrive as text on a local TCP socket
    lines = (
        spark.readStream
        .format("socket")
        .option("host", "127.0.0.1")
        .option("port", 5555)
        .load()
    )
    # clean the tweets, then attach polarity and subjectivity
    words = preprocessing(lines)
    words = text_classification(words)
    # a single partition is written per trigger
    words = words.repartition(1)
    # append the scored tweets to ./parc as parquet once a minute
    query = (
        words.writeStream
        .queryName("all_tweets")
        .outputMode("append")
        .format("parquet")
        .option("path", "./parc")
        .option("checkpointLocation", "./check")
        .trigger(processingTime='60 seconds')
        .start()
    )
    # keeps the cell running until the streaming query ends
    query.awaitTermination()

In [None]:
# %pip (unlike !pip) installs into the environment of the running kernel
%pip install pyarrow
%pip install fastparquet

In [None]:
from pathlib import Path
import pandas as pd

# the parquet files are read into a single dataframe for analysis
data_dir = Path('parc')
# sorted so the row order is the same on every run
parquet_files = sorted(data_dir.glob('*.parquet'))
if not parquet_files:
    raise FileNotFoundError('no parquet files found in {}'.format(data_dir.resolve()))
# Each file is read with its own index, so row labels would repeat across
# files; ignore_index gives every tweet a unique label, which the label-based
# (.loc) assignments further down rely on.
tweets_df = pd.concat(
    (pd.read_parquet(parquet_file) for parquet_file in parquet_files),
    ignore_index=True,
)

In [None]:
# preview the first rows of the combined tweets frame
tweets_df.head()

In [None]:
# column dtypes of the frame loaded from the parquet files
tweets_df.dtypes

In [None]:
# info must be called; the bare attribute only echoes the bound method
tweets_df.info()

In [None]:
#Calculating Negative, Positive, Neutral and Compound values
from nltk.sentiment.vader import SentimentIntensityAnalyzer

# the analyzer is built once and reused for every tweet
sentiment_analyzer = SentimentIntensityAnalyzer()
vader_scores = [sentiment_analyzer.polarity_scores(text) for text in tweets_df['word']]


def _sentiment_label(score):
    """Map a VADER score dict to 'negative', 'positive' or 'neutral'."""
    if score['neg'] > score['pos']:
        return "negative"
    if score['pos'] > score['neg']:
        return "positive"
    return "neutral"


# Whole-column assignment is positional, so it stays correct even when the
# frame's index contains repeated labels.
tweets_df['sentiment'] = [_sentiment_label(score) for score in vader_scores]
# the raw scores are stored for every tweet, not only for the neutral ones
for score_name in ('neg', 'neu', 'pos', 'compound'):
    tweets_df[score_name] = [score[score_name] for score in vader_scores]

tweets_df.head(10)

In [None]:
# last rows of the tweets frame
tweets_df.tail()

In [None]:
def count_values_in_column(data,feature):
    """Tabulate how often each value occurs in one column.

    Returns a DataFrame indexed by the distinct values of ``data[feature]``
    (missing values included) with two columns: ``Total`` (absolute count)
    and ``Percentage`` (share in percent, rounded to two decimals).
    """
    column = data.loc[:, feature]
    absolute = column.value_counts(dropna=False)
    relative = column.value_counts(dropna=False, normalize=True)
    percent = (relative * 100).round(2)
    return pd.concat([absolute, percent], axis=1, keys=['Total', 'Percentage'])

# total and percentage of tweets per sentiment label
count_values_in_column(tweets_df,"sentiment")

In [None]:
from matplotlib import pyplot as plt
# create data for Pie Chart
pichart = count_values_in_column(tweets_df,"sentiment")
names = pichart.index
size = pichart["Percentage"]

# Colours are looked up per label: the counts come back ordered by frequency,
# so a fixed colour list would not stay attached to the same sentiment.
sentiment_colors = {'positive': 'green', 'neutral': 'blue', 'negative': 'red'}
colors = [sentiment_colors.get(name, 'gray') for name in names]

# Create a circle for the center of the plot
my_circle = plt.Circle((0, 0), 0.7, color='white')
plt.pie(size, labels=names, colors=colors)
p = plt.gcf()
p.gca().add_artist(my_circle)
plt.show()

In [None]:
#Function to Create Wordcloud
from wordcloud import WordCloud, STOPWORDS
from PIL import Image

def create_wordcloud(text):
    """Build a word cloud from ``text``, save it as ``wc.png`` and display it.

    Parameters
    ----------
    text : str or iterable of str
        A single string, or a collection of tweets (e.g. a NumPy array or a
        pandas Series) that is joined with spaces before the cloud is built.
    """
    # str() of a long NumPy array is abbreviated with "..." and carries
    # brackets and quotes, so the items are joined explicitly instead.
    if isinstance(text, str):
        corpus = text
    else:
        try:
            corpus = " ".join(str(item) for item in text)
        except TypeError:
            # not iterable: fall back to the plain string form
            corpus = str(text)

    stopwords = set(STOPWORDS)
    wc = WordCloud(background_color="white",
                   mask=None,
                   max_words=3000,
                   stopwords=stopwords,
                   repeat=True)
    wc.generate(corpus)

    path = "wc.png"
    wc.to_file(path)

    print("Word Cloud Saved Successfully")

    # display() is the notebook's rich-output helper
    display(Image.open(path))

In [None]:
# make sure subjectivity is numeric so the per-sentiment means below can be
# computed; errors='raise' surfaces any value that cannot be converted
tweets_df['subjectivity'] = tweets_df['subjectivity'].astype(float, errors = 'raise')

In [None]:
# Word cloud built from every tweet, regardless of sentiment
create_wordcloud(tweets_df["word"].values)

In [None]:
# Word cloud and mean subjectivity for the tweets labelled positive
is_positive = tweets_df['sentiment'] == 'positive'
positive_tweets = tweets_df[is_positive]
positive_subjectivity = positive_tweets['subjectivity'].mean()
print('Average people subjectivity for positive tweets: ', positive_subjectivity)
create_wordcloud(positive_tweets["word"].values)

In [None]:
# Word cloud and mean subjectivity for the tweets labelled negative
is_negative = tweets_df['sentiment'] == 'negative'
negative_tweets = tweets_df[is_negative]
negative_subjectivity = negative_tweets['subjectivity'].mean()
print('Average people subjectivity for negative tweets: ', negative_subjectivity)
create_wordcloud(negative_tweets["word"].values)

In [None]:
#Creating wordcloud for neutral tweets
neutral_tweets = tweets_df[tweets_df['sentiment']=='neutral']
# the label names the sentiment, like the positive and negative cells do
print('Average people subjectivity for neutral tweets: ', neutral_tweets['subjectivity'].mean())
create_wordcloud(neutral_tweets["word"].values)

In [None]:
# Character length and word count per tweet, then mean length per sentiment
def _word_count(value):
    """Number of whitespace-separated words in the string form of ``value``."""
    return len(str(value).split())

tweets_df['text_len'] = tweets_df['word'].astype(str).map(len)
tweets_df['text_word_count'] = tweets_df['word'].map(_word_count)
tweets_df.groupby("sentiment")["text_len"].mean().to_frame().round(2)

In [None]:
# mean number of words per tweet for each sentiment label
tweets_df.groupby("sentiment")["text_word_count"].mean().to_frame().round(2)

In [None]:
import string
import re
import nltk 

#Removing Punctuation
def remove_punct(text):
    """Return ``text`` without ASCII punctuation characters and without digits."""
    text = "".join([char for char in text if char not in string.punctuation])
    # The range needs an ASCII hyphen; written with an en dash the class only
    # matched the characters '0', '9' and the dash itself.
    text = re.sub(r'[0-9]+', '', text)
    return text

# punctuation- and digit-free copy of every tweet
tweets_df['punct'] = tweets_df['word'].apply(remove_punct)

#Applying tokenization
def tokenization(text):
    """Split ``text`` on runs of non-word characters and return the pieces.

    As with ``re.split``, a separator at the very start or end of ``text``
    yields an empty string at that end of the list.
    """
    # raw string so that \W reaches the regex engine as written
    return re.split(r'\W+', text)

def _tokenize_lowercased(tweet):
    """Lower-case ``tweet`` and split it into tokens."""
    return tokenization(tweet.lower())

tweets_df['tokenized'] = tweets_df['punct'].apply(_tokenize_lowercased)

# English stop words from the NLTK corpus
stopword = nltk.corpus.stopwords.words('english')
def remove_stopwords(text):
    """Return the tokens of ``text`` that are not in ``stopword``, in order."""
    kept = []
    for token in text:
        if token in stopword:
            continue
        kept.append(token)
    return kept
    
# token lists with the stop words filtered out
tweets_df['nonstop'] = tweets_df['tokenized'].apply(remove_stopwords)

# Porter stemmer shared by stemming() and clean_text()
ps = nltk.PorterStemmer()
def stemming(text):
    """Return a list with the Porter stem of every token in ``text``."""
    stems = []
    for token in text:
        stems.append(ps.stem(token))
    return stems

# stemmed version of the stop-word-free token lists
tweets_df['stemmed'] = tweets_df['nonstop'].apply(stemming)

#Cleaning Text
def clean_text(text):
    """Turn a raw tweet into a list of stemmed tokens.

    The steps are: drop ASCII punctuation characters and lower-case the rest,
    remove digits, split on runs of non-word characters, discard tokens found
    in ``stopword`` and stem what remains with ``ps``. Empty tokens produced
    by the split are kept.
    """
    text_lc = "".join([char.lower() for char in text if char not in string.punctuation])  # remove punctuation
    text_rc = re.sub(r'[0-9]+', '', text_lc)  # remove digits
    # raw string so that \W reaches the regex engine as written
    tokens = re.split(r'\W+', text_rc)  # tokenization
    return [ps.stem(word) for word in tokens if word not in stopword]  # remove stopwords and stemming

# preview the frame including the punct / tokenized / nonstop / stemmed columns
tweets_df.head()

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

#Applying Countvectorizer
# clean_text is used as the analyzer, so every tweet is cleaned, tokenized and
# stemmed before the tokens are counted
countVectorizer = CountVectorizer(analyzer=clean_text)
countVector = countVectorizer.fit_transform(tweets_df['word'])
print('{} Number of reviews has {} words'.format(countVector.shape[0], countVector.shape[1]))

# get_feature_names() is gone from recent scikit-learn releases; use its
# replacement when available and fall back on older installations
if hasattr(countVectorizer, 'get_feature_names_out'):
    feature_names = countVectorizer.get_feature_names_out()
else:
    feature_names = countVectorizer.get_feature_names()

count_vect_df = pd.DataFrame(countVector.toarray(), columns=feature_names)
count_vect_df.head()

In [None]:
# Most used words: total count per token, highest first
count = count_vect_df.sum().to_frame()
countdf = count.sort_values(0, ascending=False).head(20)
# NOTE(review): the first row is skipped — presumably because the top entry is
# the empty token produced by the analyzer; confirm before changing the slice
countdf.iloc[1:11]

In [None]:
# n-gram frequency helper
def get_top_n_gram(corpus,ngram_range,n=None):
    """Return the ``n`` most frequent n-grams of ``corpus``.

    ``ngram_range`` is passed to ``CountVectorizer`` (English stop words are
    removed). The result is a list of ``(ngram, count)`` pairs sorted by count
    in descending order; with ``n=None`` every n-gram is returned.
    """
    vectorizer = CountVectorizer(ngram_range=ngram_range, stop_words='english')
    vectorizer.fit(corpus)
    term_matrix = vectorizer.transform(corpus)
    # column sums give the corpus-wide count of each n-gram
    totals = term_matrix.sum(axis=0)
    ranked = []
    for term, column in vectorizer.vocabulary_.items():
        ranked.append((term, totals[0, column]))
    ranked.sort(key=lambda pair: pair[1], reverse=True)
    return ranked[:n]

# 20 most frequent bigrams across all tweets
n2_bigrams = get_top_n_gram(tweets_df['word'], ngram_range=(2, 2), n=20)
n2_bigrams

In [None]:
# 20 most frequent trigrams across all tweets
n3_trigrams = get_top_n_gram(tweets_df['word'], ngram_range=(3, 3), n=20)
n3_trigrams