In [0]:
pip install textblob

In [0]:
from pyspark.sql import SparkSession
# Obtain a SparkSession running against the "local" master; the application
# name is left as an empty string.
spark = (
    SparkSession.builder
    .master("local")
    .appName("")
    .getOrCreate()
)

In [0]:
from pyspark.sql.types import *

In [0]:
import os

# Schema for a single tweet record: id, tweet text and timestamp, all nullable
# strings. It is defined here but not passed to the stream reader below.
pythonSchema = (
    StructType()
    .add("id", StringType(), True)
    .add("tweet", StringType(), True)
    .add("ts", StringType(), True)
)

# Credentials are read from the environment so real keys are never saved in
# the notebook. The empty-string fallbacks match the previous placeholders.
awsAccessKeyId = os.environ.get("AWS_ACCESS_KEY_ID", "")
awsSecretKey = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
kinesisStreamName = "terraform-kinesis-test"  # update the kinesis stream name
kinesisRegion = "us-east-1"

# Streaming DataFrame over the Kinesis stream, starting from the latest records.
# NOTE(review): "format" and "inferSchema" look like file-source options --
# confirm the Kinesis connector actually honours them.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesisStreamName)
    .option("region", kinesisRegion)
    .option("initialPosition", "LATEST")
    .option("format", "json")
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .option("inferSchema", "true")
    .load()
)

In [0]:
# Start a streaming query that appends every record read from Kinesis to an
# in-memory table registered under the query name "tweets".
df = (
    kinesisDF.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("tweets")
    .start()
)

In [0]:
# Query the in-memory "tweets" table, casting its `data` column to a string
# (presumably the raw Kinesis payload bytes -- confirm against the connector).
tweets = spark.sql("select cast(data as string) from tweets")

In [0]:
from pyspark.sql import functions as F
import json
def parse_tweet(text):
    """Extract ``(id, ts, tweet)`` from a JSON-encoded list of tweet records.

    ``text`` is decoded with ``json.loads`` and only the first element of the
    resulting sequence is used; its ``'id'``, ``'ts'`` and ``'tweet'`` entries
    are returned as a tuple, in that order.
    """
    first_record = json.loads(text)[0]
    return tuple(first_record[field] for field in ('id', 'ts', 'tweet'))
    
# One string-typed UDF per field of the parsed tweet. `F.udf` is used because
# a bare `udf` is never imported in this notebook and only resolves on
# platforms that pre-populate the namespace.
getID = F.udf(lambda payload: parse_tweet(payload)[0], StringType())
getTs = F.udf(lambda payload: parse_tweet(payload)[1], StringType())
getTweet = F.udf(lambda payload: parse_tweet(payload)[2], StringType())

# Add the parsed fields as columns, then drop the raw payload.
# NOTE: this rebinds `tweets`, so the cell cannot be re-run without first
# re-running the cell that creates `tweets` (the `data` column is gone).
tweets = (
    tweets
    .withColumn('id', getID(F.col("data")))
    .withColumn('ts', getTs(F.col("data")))
    .withColumn('tweet', getTweet(F.col("data")))
    .drop('data')
)

# Regex clean-up of the tweet text. Raw strings keep the backslashes intact
# without relying on Python's handling of unknown escape sequences.
# 1) remove a literal backslash followed by 1-5 alphanumerics
#    (presumably escaped byte sequences such as \xe2 -- confirm with the producer)
tweets = tweets.withColumn('tweet', F.regexp_replace('tweet', r'\\[0-9a-zA-Z]{1,5}', ''))
# 2) remove a leading "b'RT @user:" style retweet prefix
tweets = tweets.withColumn('tweet', F.regexp_replace('tweet', r"b'RT\s@[a-zA-Z_]+\:", ''))
tweets.display()

In [0]:
import textblob
def get_sentiment(text):
    """Classify text as NEGATIVE, NEUTRAL or POSITIVE using TextBlob polarity.

    Parameters
    ----------
    text : str or None
        The tweet text. ``None`` (a null column value) is passed through.

    Returns
    -------
    str or None
        ``"NEGATIVE"`` when polarity < 0, ``"NEUTRAL"`` when polarity == 0,
        ``"POSITIVE"`` otherwise; ``None`` when ``text`` is ``None``.
    """
    # A null tweet would make TextBlob raise and fail the whole Spark job,
    # so propagate the null instead.
    if text is None:
        return None

    # Imported inside the function, as in the original version of this cell.
    from textblob import TextBlob

    polarity = TextBlob(text).sentiment.polarity
    if polarity < 0:
        return "NEGATIVE"
    if polarity == 0:
        return "NEUTRAL"
    return "POSITIVE"
# Spark UDF wrapper around get_sentiment. `F.udf` is used because a bare `udf`
# is never imported in this notebook.
getSentiment = F.udf(get_sentiment, StringType())

In [0]:
tweets = tweets.withColumn('sentiment', getSentiment(F.col('tweet')))

In [0]:
import boto3
# boto3 client for the 'comprehend' service in us-east-1, authenticated with
# the same key pair that is used for the Kinesis reader.
comprehend = boto3.client(
    'comprehend',
    region_name='us-east-1',
    aws_access_key_id=awsAccessKeyId,
    aws_secret_access_key=awsSecretKey,
)

In [0]:
def aws_sentiment(x):
    """Return the Amazon Comprehend sentiment label for a piece of text.

    The dominant language of ``x`` is detected first and its language code is
    then passed to ``detect_sentiment``.

    Parameters
    ----------
    x : str
        Text to analyse.

    Returns
    -------
    str or None
        The ``'Sentiment'`` value of the Comprehend response, or ``None`` when
        ``x`` is empty / not a string or no language could be detected.
    """
    # Null, NaN or empty values cannot be sent to the service.
    if not isinstance(x, str) or not x:
        return None

    # boto3 client methods only accept keyword arguments.
    detected = comprehend.detect_dominant_language(Text=x).get('Languages') or []
    if not detected:
        return None

    # Use the code of the highest-scoring language, not a literal placeholder.
    lang = max(detected, key=lambda entry: entry.get('Score', 0))['LanguageCode']
    response = comprehend.detect_sentiment(Text=x, LanguageCode=lang)
    return response.get('Sentiment')

In [0]:
import pandas as pd
# Convert the Spark DataFrame `tweets` into a pandas DataFrame.
# NOTE: this rebinds `df`, which earlier held the object returned by
# writeStream...start().
df = tweets.toPandas()

In [0]:
import numpy as np

# Score every tweet with aws_sentiment (one call per row) and store the labels
# in a new 'aws' column, in row order.
k = df['tweet'].to_numpy()
h = [aws_sentiment(tweet_text) for tweet_text in k]
df['aws'] = h

In [0]:
import matplotlib.pyplot as plt

In [0]:
# Plot for tweet frequency
df['ts'] = pd.to_datetime(df['ts'])
# Bucket by the timestamp truncated to the minute. Formatting the
# minute-of-hour as a string sorted the groups lexicographically
# ("10" before "2") and merged the same minute of different hours.
df['minute'] = df['ts'].dt.floor('min')
# size() counts rows per bucket regardless of nulls in any other column.
r = df.groupby('minute').size()
fig, d = plt.subplots()
d.grid(True)
d.set_title("Tweet Frequency")
d.set_xlabel("Minute")
d.set_ylabel("Number of tweets")
d.plot(r.index, r.values)
fig.autofmt_xdate()  # tilt the datetime tick labels so they do not overlap

In [0]:
# To plot aws sentiment as bar graph
s = df.groupby('aws').aws.count()
fig, ax = plt.subplots()
# A bar chart, as the comment above intends (plt.plot drew a line through the
# category counts).
ax.bar(s.index, s.values)
ax.set_title("AWS Comprehend sentiment")
ax.set_xlabel("Sentiment")
ax.set_ylabel("Number of tweets")

In [0]:
# To plot textblob sentiment analysis as bar graph
s = df.groupby('sentiment').sentiment.count()
fig, ax = plt.subplots()
# A bar chart, as the comment above intends (plt.plot drew a line through the
# category counts).
ax.bar(s.index, s.values)
ax.set_title("TextBlob sentiment")
ax.set_xlabel("Sentiment")
ax.set_ylabel("Number of tweets")