# Setup the Environment

In [0]:
%pip install delta-spark spark-nlp==3.3.3 wordcloud contractions gensim pyldavis==3.2.0


In [0]:

# Import your dependecies
from delta import *
import pyspark # run after findspark.init() if you need it
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# from pyspark.sql.functions import col, split
from pyspark.sql import functions as F

import re
from textblob import TextBlob

# Streaming Data Ingestion

In [0]:
confluentApiKey = "RSQKGKXY7364BGWJ"
confluentSecret = "O4HcP/YgoBJLDdD5F9i4OBQM27iRhP6NYRayqU6wlPPgEHFD4dU33trLLuOHqi4x"
host = "pkc-w12qj.ap-southeast-1.aws.confluent.cloud:9092"

In [0]:

streamingInputDF = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", host)  \
  .option("kafka.security.protocol", "SASL_SSL") \
  .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{}' password='{}';".format(confluentApiKey, confluentSecret)) \
  .option("kafka.ssl.endpoint.identification.algorithm", "https") \
  .option("kafka.sasl.mechanism", "PLAIN") \
  .option("startingOffsets", "latest") \
  .option("failOnDataLoss", "false") \
  .option("subscribe", "product") \
  .load()



In [0]:
tweet_df_string = streamingInputDF.selectExpr("CAST (key AS STRING)", "CAST(value AS STRING)")

tweet_df_string.display

# Cleaning Tweet

In [0]:
myschema = StructType([StructField('value', StringType(), True)])


In [0]:
def cleanTweet(tweet:str):
    tweet = re.sub(r'http\S+', '', str(tweet))
    tweet = re.sub(r'bit.ly/\S+', '', str(tweet))
    tweet = tweet.strip('[link]')

    # remove users
    tweet = re.sub('(RT\S@[A-ZA-z]+[A-Za-z0-9-_]+)', '', str(tweet))
    tweet = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))

    # remove punctuation 
    my_punctuation = '!"$%&\()*+,-./:;<=>?[\\]^_`{|}~@'
    tweet = re.sub('[' + my_punctuation + ']', ' ', str(tweet))

    # remove number
    tweet = re.sub('([0-9]+)', '', str(tweet))


    # remove hashtag 
    tweet = re.sub('(#[A-Za-z]+[A-Za-z0-9-_]+)', '', str(tweet))
    
    # remove RT
    tweet = re.sub('RT', '', str(tweet))
    
    # remove enter
    tweet = re.sub('\n\n', ' ', str(tweet))
    
    return tweet 

In [0]:
clean_tweets = F.udf(cleanTweet, StringType())

In [0]:

raw_tweets = tweet_df_string.withColumn('processed_text', clean_tweets(col('value')))


# Sentiment Analysis

In [0]:
def getSubjectivity(tweet: str) -> float:
    return TextBlob(tweet).sentiment.subjectivity


def getPolarity(tweet: str)-> float:
    return TextBlob(tweet).sentiment.polarity


def getSentiment(polarityValue: int) -> str:
    if polarityValue < 0:
        return 'Negative'
    elif polarityValue == 0 :
        return 'Neutral'
    else:
        return 'Positive'

In [0]:
subjectivity = F.udf(getSubjectivity, FloatType())
polarity = F.udf(getPolarity, FloatType())
sentiment = F.udf(getSentiment, StringType())

In [0]:
subjectivity_tweets = raw_tweets.withColumn('subjectivity', subjectivity(col('processed_text')))
polarity_tweets = subjectivity_tweets.withColumn("polarity", polarity(col('processed_text')))
sentiment_tweets = polarity_tweets.withColumn('sentiment', sentiment(col('polarity')))

In [0]:
# sentiment_tweets.writeStream.format("memory").queryName("tweetquery_sent").trigger(processingTime='2 seconds').start()

In [0]:
sentiment_tweet = sentiment_tweets \
  .writeStream.format("delta") \
  .outputMode("append") \
  .trigger(processingTime='10 seconds') \
  .option("checkpointLocation", "/tmp/checkpoint") \
  .start("/tmp/delta-tweet-table")

# Hashtag

In [0]:

tweets_tab_hashtag = sentiment_tweets \
    .withColumn('word', explode(split(col('value'), ' '))) \
    .groupBy('word') \
    .count() \
    .sort('count', ascending=False) \
    .filter(col('word').contains('#'))

writeTweet2 = tweets_tab_hashtag \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("tweetquery") \
    .trigger(processingTime='2 seconds') \
    .start()

In [0]:
spark.sql("select * from tweetquery").show()

# Reading Data

In [0]:
DF = (
   spark.read \
      .format("delta") \
      .load("/tmp/delta-tweet-table") \
      .createOrReplaceTempView("table2")
)

In [0]:
display(spark.sql("SELECT sentiment FROM table2 LIMIT 1000"))


key,value,processed_text,subjectivity,polarity,sentiment
1534388442699816961,"""RT @greenhousenyt: The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in 5 vote counts today\n\n15-1 in Ann Arbor ( Stat""",The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in vote counts today\n\n in Ann Arbor Stat,0.0,0.0,Neutral
1534388333341773824,"""Starbucks Cat Paw order here\nhttps://t.co/3EcTLdKONr""",Starbucks Cat Paw order here\,0.0,0.0,Neutral
1534388329034223616,"""RT @greenhousenyt: The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in 5 vote counts today\n\n15-1 in Ann Arbor ( Stat""",The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in vote counts today\n\n in Ann Arbor Stat,0.0,0.0,Neutral
1534388317692862466,"""RT @lailaddaltonn: Starbucks unjustly fired a worker (me) after @NLRB filed a complaint on my behalf. After everything corporate put my com""",Starbucks unjustly fired a worker me after filed a complaint on my behalf After everything corporate put my com,0.1,-0.15,Negative
1534388293508399105,"""My Starbucks keriug coffee went from like 10.99 to 13.99 within a month stop this please! https://t.co/OQwM4gcZ0X""",My Starbucks keriug coffee went from like to within a month stop this please,0.0,0.0,Neutral
1534388289725243393,"""RT @BLaw: Starbucks employees voted decisively to unionize at a Memphis cafe where seven pro-union coworkers were fired after launching the""",Starbucks employees voted decisively to unionize at a Memphis cafe where seven pro union coworkers were fired after launching the,0.0,0.0,Neutral
1534388284872445952,"""RT @andEps: Accidentally walked by a @SBWorkersUnited action tonight where @MuseumModernArt is honoring @Starbucks board chair / union bust""",Accidentally walked by a action tonight where is honoring board chair union bust,0.1,0.1,Positive
1534388226412232704,"""RT @greenhousenyt: The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in 5 vote counts today\n\n15-1 in Ann Arbor ( Stat""",The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in vote counts today\n\n in Ann Arbor Stat,0.0,0.0,Neutral
1534388183055511553,"""RT @greenhousenyt: The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in 5 vote counts today\n\n15-1 in Ann Arbor ( Stat""",The Starbucks union crushes it in Michigan\n\nThe union crushes Starbucks in vote counts today\n\n in Ann Arbor Stat,0.0,0.0,Neutral
1534388140689002497,"""no but imagine starbucks being your personality""",no but imagine starbucks being your personality,0.0,0.0,Neutral


In [0]:
from wordcloud import WordCloud
from wordcloud import ImageColorGenerator
from wordcloud import STOPWORDS
import matplotlib.pyplot as plt
import pandas as pd
 