In [1]:
import pandas as pd
from tensorflow import keras
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.layers import SpatialDropout1D
from tensorflow.keras.layers import Embedding

In [2]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, split
from pyspark.sql import functions as F 
import re

In [3]:
# Tensorflow imports
from tensorflow import keras
from tensorflow.keras.preprocessing.sequence import pad_sequences
import joblib

In [4]:
def anti_join(x, y, on):
    """Return the rows of ``x`` whose ``on`` key(s) do not appear in ``y``.

    Parameters
    ----------
    x : pandas.DataFrame
        Frame to filter; its columns are returned unchanged.
    y : pandas.DataFrame or named pandas.Series
        Frame (or Series) holding the keys to exclude. For a DataFrame only the
        key columns are used, so non-key columns shared with ``x`` do not come
        back as ``_x``/``_y``-suffixed duplicates.
    on : str or list of str
        Join key column name(s), as accepted by :func:`pandas.merge`.

    Returns
    -------
    pandas.DataFrame
        Rows of ``x`` with no matching key in ``y``. The row labels are those of
        the intermediate merge result, so they may have gaps.
    """
    if on is not None and isinstance(y, pd.DataFrame):
        keys = [on] if isinstance(on, str) else list(on)
        # Only the key columns matter for membership; deduplicating them also
        # avoids the left join fanning out on repeated keys.
        y = y[keys].drop_duplicates()
    ans = pd.merge(left=x, right=y, how='left', indicator=True, on=on)
    ans = ans.loc[ans['_merge'] == 'left_only', :].drop(columns='_merge')
    return ans


In [5]:
def _predict_binary_label(tokenizer, model, text, maxlen=200):
    """Classify one piece of text with a binary Keras text model.

    Parameters
    ----------
    tokenizer
        Fitted Keras ``Tokenizer`` used to turn ``text`` into an index sequence.
    model
        Keras model returning one score per input sequence.
    text : str
        Text to classify.
    maxlen : int, default 200
        Length the sequence is padded/truncated to. It must match the input
        length the model was trained with (presumably 200 here).

    Returns
    -------
    int
        The rounded model score (0 or 1 for a sigmoid output).
    """
    sequences = tokenizer.texts_to_sequences([text])
    padded = pad_sequences(sequences, maxlen=maxlen)
    # verbose=0: recent Keras versions otherwise print a progress bar on every
    # call, which floods the notebook when this runs once per tweet.
    score = model.predict(padded, verbose=0)
    # Rounding thresholds the (assumed 0..1) score at 0.5; ``item()`` unwraps
    # the single prediction into a Python scalar.
    return int(score.round().item())


def SentimentPredictor(to_predict_tweet, maxlen=200):
    """Return 1 for a tweet the sentiment model scores positive, else 0."""
    return _predict_binary_label(PN_data_tokenizer, Sentiment_model, to_predict_tweet, maxlen)


def RTPredictor(to_predict_tweet, maxlen=200):
    """Return 1 for a tweet the relaxed/tensed model scores relaxed, else 0."""
    return _predict_binary_label(RT_data_tokenizer, RT_model, to_predict_tweet, maxlen)

In [6]:
# Positive/negative sentiment classifier and its tokenizer.
# NOTE: joblib.load unpickles arbitrary objects -- only load trusted files.
Sentiment_model = keras.models.load_model("PNmodel.h5")
PN_data_tokenizer = joblib.load("PN_data_tokenizer.joblib")

# Relaxed/tensed classifier and its tokenizer.
RT_model = keras.models.load_model("RTmodel.h5")
RT_data_tokenizer = joblib.load("RT_data_tokenizer.joblib")

# text classification
    
def text_classification(words):
    """Attach a model-predicted ``Sentiment`` label to every row of ``words``.

    The sentiment classifier is wrapped in a Python UDF and applied to the
    ``word`` column, replacing any existing ``Sentiment`` column.

    Parameters
    ----------
    words : pyspark.sql.DataFrame
        Must contain a string column named ``word``.

    Returns
    -------
    pyspark.sql.DataFrame
        ``words`` with ``Sentiment`` set to ``"Positive"`` or ``"Negative"``.
    """
    def _label(text):
        # Same 1 -> "Positive" / otherwise "Negative" mapping the pandas
        # post-processing cells apply to SentimentPredictor's output.
        return "Positive" if SentimentPredictor(text) == 1 else "Negative"

    # Previously referenced the misspelled name ``SenitmentPredictor`` and
    # therefore raised NameError.
    # NOTE(review): the UDF closes over the global Keras model/tokenizer, which
    # presumably must be serialisable to the Spark executors -- confirm.
    sentiment_udf = udf(_label, StringType())
    return words.withColumn("Sentiment", sentiment_udf("word"))

In [7]:
def preprocessing(lines):
    """Split the raw socket stream into one row per tweet and strip Twitter markup.

    Each ``value`` is split on the marker ``t_end`` (presumably the producer's
    end-of-tweet separator). Every piece becomes a row in a single ``word``
    column, and empty pieces are dropped. URLs, @mentions, ``#`` signs, the
    retweet marker ``RT`` and colons are then removed.

    Parameters
    ----------
    lines : pyspark.sql.DataFrame
        DataFrame with a string column ``value`` (the socket source's column).

    Returns
    -------
    pyspark.sql.DataFrame
        Single-column DataFrame (``word``) of cleaned tweet text.
    """
    words = lines.select(explode(split(lines.value, "t_end")).alias("word"))
    # Empty pieces (e.g. after a trailing separator) become null and are dropped.
    words = words.na.replace('', None)
    words = words.na.drop()
    # (regex, replacement) pairs applied in order. Raw strings keep the
    # backslash escapes intact for Spark's (Java) regex engine.
    cleaning_rules = (
        (r'http\S+', ''),   # URLs
        (r'@\w+', ''),      # @mentions
        ('#', ''),          # keep hashtag text, drop the '#'
        # Whole-word only: a bare 'RT' also stripped letters from words like 'START'.
        (r'\bRT\b', ''),
        # NOTE: also strips the colons from the embedded "{...}" timestamp.
        (':', ''),
    )
    for pattern, replacement in cleaning_rules:
        words = words.withColumn('word', F.regexp_replace('word', pattern, replacement))
    return words

In [8]:
# Show complete tweet text and every row when a pandas frame is displayed.
pd.set_option('display.max_colwidth', None, 'display.max_rows', None)

In [9]:
if __name__ == "__main__":

    # Spark session for the streaming job.
    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # Unbounded stream of text lines from the local tweet producer socket.
    tweet_df = (
        spark.readStream
        .format("socket")
        .option("host", "127.0.0.1")
        .option("port", 3333)
        .load()
    )

    # One row per tweet, with URLs/mentions/markup stripped.
    tweet_df = preprocessing(tweet_df)

    # Placeholder string columns; the real labels and tags are filled in later,
    # on the pandas side, after the in-memory table has been queried.
    for placeholder_column in ('Sentiment', 'Relaxed/Tensed', 'Location', 'Timestamp'):
        tweet_df = tweet_df.withColumn(placeholder_column, lit("empty").cast(StringType()))

    print("----- streaming is ready -------")

----- streaming is ready -------


In [10]:
# Sink the processed stream into the in-memory table "tweetquery", which later
# cells read through spark.sql. Append mode writes only newly arrived rows on
# each micro-batch, and a micro-batch is triggered every 10 seconds.
# (The old comment claimed "complete" mode and a 2-second trigger.)
writeTweet = (
    tweet_df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("tweetquery")
    .trigger(processingTime='10 seconds')
    .start()
)

print("----- streaming is running -------")

----- streaming is running -------


In [None]:
# Progress report of the streaming query's most recent micro-batch
# (presumably None until the first trigger has completed -- confirm).
print(writeTweet.lastProgress)

In [11]:
# Up to 100 non-empty tweets from the in-memory sink. Without ORDER BY, which
# rows are returned is not specified. (Adds the missing space before LIMIT.)
tweetDF = spark.sql("SELECT * FROM tweetquery where word <> '' LIMIT 100")

In [12]:
# Collect up to 1000 rows of the Spark result to the driver as Row objects.
tweetDF.take(1000)

[Row(word="[Wakefield, England]{Fri May 07 160815 +0000 2021}First vaccine done. I've got to say it was like a military operation,very efficient. Well done all staff at sandal Rugby club ðŸ‘�ðŸ‘�", Sentiment='empty', Relaxed/Tensed='empty', Location='empty', Timestamp='empty'),
 Row(word='[Cheshire,UK]{Fri May 07 160815 +0000 2021}For all the Labour Partyâ€™s introspection, post-mortoming, defenestration, and self immolation the fact remains that the Tories keep beating them by spewing a pack of easily debunkable lies. Whereâ€™s the honour in this victory? Or does no one care about that either?ðŸ¤·â€�â™‚ï¸�', Sentiment='empty', Relaxed/Tensed='empty', Location='empty', Timestamp='empty'),
 Row(word="[Salt Lake City, UT]{Fri May 07 160815 +0000 2021}  post didn't come across that way. Your post came across as not caring about other people. Getting the vaccine is not only about your personal health, but also the health and well-being of the people around you. Because you are immune compr

In [14]:
# Score every tweet on the pandas side. Run both classifiers and pull out the
# "[location]" and "{timestamp}" tags that prefix each tweet's text.
tweetDF = tweetDF.toPandas()

sentiment_labels, mood_labels, locations, timestamps = [], [], [], []
for text in tweetDF['word']:
    # Look the column up by name: positional ``row[0]`` on a Series is deprecated.
    sentiment_labels.append("Positive" if SentimentPredictor(text) == 1 else "Negative")
    mood_labels.append("Relaxed" if RTPredictor(text) == 1 else "Tensed")
    raw_text = str(text)
    # Every "[...]" / "{...}" group, kept as lists (the shape later cells expect).
    locations.append(re.findall(r'\[(.*?)\]', raw_text))
    timestamps.append(re.findall(r'\{(.*?)\}', raw_text))

# Assign whole columns at once. The previous per-cell ``.loc[counter, ...]``
# writes relied on a 0..n-1 index and could fail when storing multi-item lists.
tweetDF['Sentiment'] = sentiment_labels
tweetDF['Relaxed/Tensed'] = mood_labels
tweetDF['Location'] = locations
tweetDF['Timestamp'] = timestamps

In [15]:
# First 1000 scored tweets.
tweetDF.iloc[:1000]

Unnamed: 0,word,Sentiment,Relaxed/Tensed,Location,Timestamp
0,"[Wakefield, England]{Fri May 07 160815 +0000 2021}First vaccine done. I've got to say it was like a military operation,very efficient. Well done all staff at sandal Rugby club ðŸ‘�ðŸ‘�",Negative,Tensed,"[Wakefield, England]",[Fri May 07 160815 +0000 2021]
1,"[Cheshire,UK]{Fri May 07 160815 +0000 2021}For all the Labour Partyâ€™s introspection, post-mortoming, defenestration, and self immolation the fact remains that the Tories keep beating them by spewing a pack of easily debunkable lies. Whereâ€™s the honour in this victory? Or does no one care about that either?ðŸ¤·â€�â™‚ï¸�",Positive,Relaxed,"[Cheshire,UK]",[Fri May 07 160815 +0000 2021]
2,"[Salt Lake City, UT]{Fri May 07 160815 +0000 2021} post didn't come across that way. Your post came across as not caring about other people. Getting the vaccine is not only about your personal health, but also the health and well-being of the people around you. Because you are immune compromised, you and I need as many people as",Positive,Relaxed,"[Salt Lake City, UT]",[Fri May 07 160815 +0000 2021]
3,"[Mumbai, India]{Fri May 07 160815 +0000 2021} Vaccine efficacy is a statistical measure. It's not even the probability of not getting infected. It's not your chance ofâ€¦",Positive,Relaxed,"[Mumbai, India]",[Fri May 07 160815 +0000 2021]
4,"[Maryland, USA]{Fri May 07 160815 +0000 2021} ðŸ—’âœ�ï¸�WORDS OF HOPE| This note on the wall at a Maryland vax site was written by someone who just got the vaccine. Gus wroteâ€¦",Positive,Relaxed,"[Maryland, USA]",[Fri May 07 160815 +0000 2021]
5,"[Ahmadabad City, India]{Fri May 07 160815 +0000 2021} GujaratCoronaUpdate",Negative,Tensed,"[Ahmadabad City, India]",[Fri May 07 160815 +0000 2021]
6,COVID19Dashboard,Negative,Tensed,[],[]
7,"12,064 New cases",Negative,Tensed,[],[]
8,"13,085 Discharged",Positive,Relaxed,[],[]
9,119 Deaths reported,Negative,Tensed,[],[]


In [None]:
# Every non-empty tweet currently in the in-memory sink, as a pandas frame.
df2 = spark.sql("SELECT * FROM tweetquery where word <> ''").toPandas()

In [None]:
# First 1000 rows of the fresh query.
df2.iloc[:1000]

In [None]:
# Keep only the tweet text, as a one-column DataFrame named 'word', so it can
# serve as the set of already-scored tweets for the anti-join below.
# (A Series has no ``.columns``; the old ``tweetDF.columns = ['word']`` only set
# a stray attribute and raised a pandas UserWarning. This form is also safe to
# re-run.)
# NOTE(review): this overwrites the scored tweetDF, so the geocoding cells
# further down (which read tweetDF['Location']) fail on a top-to-bottom run.
tweetDF = tweetDF[['word']]

In [None]:
# df3 = tweets in the fresh query (df2) that are not already in tweetDF.
# If nothing has been scored yet, every row of df2 is new.
if not tweetDF.empty:
    df3 = anti_join(df2, tweetDF, on='word')
else:
    df3 = df2.copy()
# Renumber 0..n-1: anti_join keeps the (gapped) labels of the surviving merge
# rows. The old ``df2.reset_index()`` discarded its result and touched the
# wrong frame.
df3 = df3.reset_index(drop=True)

In [None]:
# Score only the newly arrived tweets. Run both classifiers and extract the
# "[location]" / "{timestamp}" tags, the same way tweetDF is processed.
sentiment_labels, mood_labels, locations, timestamps = [], [], [], []
for text in df3['word']:
    # Look the column up by name: positional ``row[0]`` on a Series is deprecated.
    sentiment_labels.append("Positive" if SentimentPredictor(text) == 1 else "Negative")
    mood_labels.append("Relaxed" if RTPredictor(text) == 1 else "Tensed")
    raw_text = str(text)
    locations.append(re.findall(r'\[(.*?)\]', raw_text))
    timestamps.append(re.findall(r'\{(.*?)\}', raw_text))

# Assign whole columns positionally. The previous ``df3.loc[counter, ...]``
# writes appended new rows whenever df3's index was not exactly 0..n-1, which
# is the case for anti_join output.
df3['Sentiment'] = sentiment_labels
df3['Relaxed/Tensed'] = mood_labels
df3['Location'] = locations
df3['Timestamp'] = timestamps

In [16]:
from geopy.geocoders import Nominatim

# OpenStreetMap Nominatim geocoder, checked against one known city.
# NOTE(review): Nominatim's usage policy asks for a user_agent that identifies
# the application; "myGeocoder" is generic.
locator = Nominatim(user_agent="myGeocoder")
location = locator.geocode(query="Mumbai")

In [17]:
print(f"Latitude = {location.latitude}, Longitude = {location.longitude}")

Latitude = 19.0759899, Longitude = 72.8773928


In [18]:
from geopy.extra.rate_limiter import RateLimiter

# 1 - convenience wrapper that waits at least one second between geocoding
# calls (Nominatim's public usage policy allows at most one request per second).
geocode = RateLimiter(func=locator.geocode, min_delay_seconds=1)


In [20]:
# 2 - geocode each tweet's location tag into a geopy Location (or None).
# ``Location`` holds the list returned by re.findall: geocode its first entry,
# and skip rows without a tag so they don't cost a rate-limited request.
def _geocode_location_tag(tag):
    """Geocode a location tag given as a string or a list of candidate strings."""
    if isinstance(tag, (list, tuple)):
        tag = tag[0] if tag else None
    if not isinstance(tag, str) or not tag.strip():
        return None
    return geocode(tag)


tweetDF['location'] = tweetDF['Location'].apply(_geocode_location_tag)
# 3 - (latitude, longitude, altitude) tuple from each geopy Location, or None.
tweetDF['point'] = tweetDF['location'].apply(lambda loc: tuple(loc.point) if loc else None)
# 4 - split the tuples into three columns. Missing points become NaN, so this
# also works when no row could be geocoded at all.
_missing_point = (float('nan'),) * 3
tweetDF[['latitude', 'longitude', 'altitude']] = pd.DataFrame(
    [point if point is not None else _missing_point for point in tweetDF['point']],
    columns=['latitude', 'longitude', 'altitude'],
    index=tweetDF.index,
)


In [21]:
# First 1000 scored and geocoded tweets.
tweetDF.iloc[:1000]

Unnamed: 0,word,Sentiment,Relaxed/Tensed,Location,Timestamp,location,point,latitude,longitude,altitude
0,"[Wakefield, England]{Fri May 07 160815 +0000 2021}First vaccine done. I've got to say it was like a military operation,very efficient. Well done all staff at sandal Rugby club ðŸ‘�ðŸ‘�",Negative,Tensed,"[Wakefield, England]",[Fri May 07 160815 +0000 2021],"(Wakefield, West Yorkshire, Yorkshire and the Humber, England, United Kingdom, (53.6829541, -1.4967286))","(53.6829541, -1.4967286, 0.0)",53.682954,-1.496729,0.0
1,"[Cheshire,UK]{Fri May 07 160815 +0000 2021}For all the Labour Partyâ€™s introspection, post-mortoming, defenestration, and self immolation the fact remains that the Tories keep beating them by spewing a pack of easily debunkable lies. Whereâ€™s the honour in this victory? Or does no one care about that either?ðŸ¤·â€�â™‚ï¸�",Positive,Relaxed,"[Cheshire,UK]",[Fri May 07 160815 +0000 2021],"(Cheshire, England, United Kingdom, (53.2141028, -2.471770086071205))","(53.2141028, -2.471770086071205, 0.0)",53.214103,-2.47177,0.0
2,"[Salt Lake City, UT]{Fri May 07 160815 +0000 2021} post didn't come across that way. Your post came across as not caring about other people. Getting the vaccine is not only about your personal health, but also the health and well-being of the people around you. Because you are immune compromised, you and I need as many people as",Positive,Relaxed,"[Salt Lake City, UT]",[Fri May 07 160815 +0000 2021],"(Salt Lake City, Salt Lake County, Utah, United States, (40.7596198, -111.8867975))","(40.7596198, -111.8867975, 0.0)",40.75962,-111.886798,0.0
3,"[Mumbai, India]{Fri May 07 160815 +0000 2021} Vaccine efficacy is a statistical measure. It's not even the probability of not getting infected. It's not your chance ofâ€¦",Positive,Relaxed,"[Mumbai, India]",[Fri May 07 160815 +0000 2021],"(Mumbai, Mumbai Suburban, Maharashtra, India, (19.0759899, 72.8773928))","(19.0759899, 72.8773928, 0.0)",19.07599,72.877393,0.0
4,"[Maryland, USA]{Fri May 07 160815 +0000 2021} ðŸ—’âœ�ï¸�WORDS OF HOPE| This note on the wall at a Maryland vax site was written by someone who just got the vaccine. Gus wroteâ€¦",Positive,Relaxed,"[Maryland, USA]",[Fri May 07 160815 +0000 2021],"(Maryland, United States, (39.5162234, -76.9382069))","(39.5162234, -76.9382069, 0.0)",39.516223,-76.938207,0.0
5,"[Ahmadabad City, India]{Fri May 07 160815 +0000 2021} GujaratCoronaUpdate",Negative,Tensed,"[Ahmadabad City, India]",[Fri May 07 160815 +0000 2021],"(Ahmedabad, Ahmadabad City Taluka, Ahmedabad District, Gujarat, 380001, India, (23.0216238, 72.5797068))","(23.0216238, 72.5797068, 0.0)",23.021624,72.579707,0.0
6,COVID19Dashboard,Negative,Tensed,[],[],,,,,
7,"12,064 New cases",Negative,Tensed,[],[],,,,,
8,"13,085 Discharged",Positive,Relaxed,[],[],,,,,
9,119 Deaths reported,Negative,Tensed,[],[],,,,,
