In [1]:
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.sql.types import *

# Create (or reuse) the Spark session used by the cells below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Structured Streaming - Twitter Sentiment")
    .getOrCreate()
)

# Layout of a single tweet record: three nullable string fields.
pythonSchema = StructType([
    StructField("id", StringType(), True),
    StructField("tweet", StringType(), True),
    StructField("ts", StringType(), True),
])

In [2]:
import os

# Credentials come from the environment so that no secret is stored in the
# notebook. The key pair that used to be hard-coded in this cell has been
# exposed in source and should be rotated.
awsAccessKeyId = os.environ["AWS_ACCESS_KEY_ID"]        # KeyError if unset
awsSecretKey = os.environ["AWS_SECRET_ACCESS_KEY"]      # KeyError if unset
# The Kinesis stream must already exist and be receiving data.
# Both values can be overridden through the environment; defaults are unchanged.
kinesisStreamName = os.environ.get("KINESIS_STREAM_NAME", "twitter-data-stream")
kinesisRegion = os.environ.get("KINESIS_REGION", "us-west-2")


# Streaming DataFrame reading from the Kinesis stream configured above.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesisStreamName)
    .option("region", kinesisRegion)
    .option("initialPosition", "latest")
    .option("format", "json")
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .option("inferSchema", "true")
    .load()
)

In [3]:
# Start a streaming query that appends the Kinesis records to an in-memory
# table registered under the query name "tweets".
df = (
    kinesisDF.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("tweets")
    .start()
)

In [4]:
df.status

In [5]:
%sql

select partitionKey, cast(data as string) from tweets;

partitionKey,data
gwA1DqrnYUTegXD,"[{""id"": ""1254201388692729856"", ""tweet"": ""b'RT @youaquarium: \\xe7\\xbd\\xb2\\xe5\\x90\\x8d\\xe3\\x82\\x92\\xe3\\x82\\x88\\xe3\\x82\\x8d\\xe3\\x81\\x97\\xe3\\x81\\x8f\\xe3\\x81\\x8a\\xe9\\xa1\\x98\\xe3\\x81\\x84\\xe3\\x81\\x97\\xe3\\x81\\xbe\\xe3\\x81\\x99\\xef\\xbc\\x81\\n#\\xe5\\xad\\xa6\\xe7\\x94\\x9f\\xe3\\x81\\xab\\xe4\\xba\\x88\\xe7\\xae\\x97\\xe3\\x82\\x92 \\n#\\xe5\\xa4\\xa7\\xe5\\xad\\xa6\\xe3\\x81\\xab\\xe3\\x82\\x82\\xe4\\xba\\x88\\xe7\\xae\\x97\\xe3\\x82\\x92 \\n#COVID19\\xe5\\xad\\xa6\\xe8\\xb2\\xbb\\xe5\\x95\\x8f\\xe9\\xa1\\x8c \\nhttps://t.co/ZpIhpME5Vm'"", ""ts"": ""Sun Apr 26 00:11:27 +0000 2020""}]"
JAstford,"[{""id"": ""1254201388667559936"", ""tweet"": ""b\""RT @KLGLASS2: .\\n\\n.\\n\\n United States \\n\\n 33%. Cases Globally @ 955,488\\n\\n 26%. Deaths Globally @ 54,120\\n\\n4.25% of World's Population\"""", ""ts"": ""Sun Apr 26 00:11:27 +0000 2020""}]"
ArkansasBlog,"[{""id"": ""1254201388801896454"", ""tweet"": ""b'RT @LiftedPlatinum: My mother was the first healthcare worker to lose her life to coronavirus in Arkansas'"", ""ts"": ""Sun Apr 26 00:11:27 +0000 2020""}]"
truthoreal,"[{""id"": ""1254201389015724032"", ""tweet"": ""b'RT @think_fee_hit: \\xe3\\x82\\xab\\xe3\\x83\\x8a\\xe3\\x83\\x80\\xe3\\x81\\xaf\\xe5\\xad\\xa6\\xe7\\x94\\x9f\\xe5\\x85\\xa8\\xe4\\xbd\\x93\\xe3\\x81\\xab\\xe7\\xb4\\x846900\\xe5\\x84\\x84\\xe5\\x86\\x86\\xe3\\x81\\xae\\xe6\\x94\\xaf\\xe5\\x87\\xba\\xe3\\x82\\x92\\xe3\\x81\\x99\\xe3\\x82\\x8b\\xe3\\x81\\xae\\xe3\\x81\\xab\\xe5\\xaf\\xbe\\xe3\\x81\\x97\\xe3\\x81\\xa6\\xe3\\x80\\x81\\xe6\\x97\\xa5\\xe6\\x9c\\xac\\xe3\\x81\\xaf7\\xe5\\x84\\x84\\xe5\\x86\\x86\\xe3\\x81\\xa7\\xe3\\x81\\x99\\xe3\\x80\\x82\\xe7\\xb4\\x841000\\xe5\\x80\\x8d\\xe3\\x81\\xae\\xe5\\xb7\\xae\\xe3\\x81\\x8c\\xe3\\x81\\x82\\xe3\\x82\\x8a\\xe3\\x81\\xbe\\xe3\\x81\\x99\\xe2\\x80\\xa6\\xe3\\x80\\x82\\n\\n#\\xe5\\xad\\xa6\\xe7\\x94\\x9f\\xe3\\x81\\xab\\xe4\\xba\\x88\\xe7\\xae\\x97\\xe3\\x82\\x92 \\n#\\xe5\\xa4\\xa7\\xe5\\xad\\xa6\\xe3\\x81\\xab\\xe3\\x82\\x82\\xe4\\xba\\x88\\xe7\\xae\\x97\\xe3\\x82\\x92'"", ""ts"": ""Sun Apr 26 00:11:27 +0000 2020""}]"
agodapokerpkv1,"[{""id"": ""1254201389292548098"", ""tweet"": ""b'#KIMJONGUNDEAD #sawityowit #recehkansahur #alat #pakgirl #corona #covid19 #staysafe #StayAtHomeChallenge https://t.co/FFDK5EnMpL'"", ""ts"": ""Sun Apr 26 00:11:27 +0000 2020""}]"
MarcoAPinoM,"[{""id"": ""1254201389741457408"", ""tweet"": ""b'RT @MINSAPma: Compartimos la actualizaci\\xc3\\xb3n de datos sobre el #COVID19 en nuestro pa\\xc3\\xads.\\nS\\xc3\\xa1bado 25 de abril de 2020.\\n#Prot\\xc3\\xa9getePanam\\xc3\\xa1\\n#Unidos\\xe2\\x80\\xa6'"", ""ts"": ""Sun Apr 26 00:11:27 +0000 2020""}]"
CubaJulito,"[{""id"": ""1254201390035079169"", ""tweet"": ""b'RT @DeZurdaTeam: El Imperio insiste en mantener un genocida bloqueo contra el pueblo de #Cuba en medio de la pandemia #COVID19.\\n\\nHoy se cum\\xe2\\x80\\xa6'"", ""ts"": ""Sun Apr 26 00:11:27 +0000 2020""}]"
kali_duarte,"[{""id"": ""1254201390668406786"", ""tweet"": ""b'RT @SenadoFederal: Mesmo quem n\\xc3\\xa3o est\\xc3\\xa1 no grupo de risco precisa seguir as recomenda\\xc3\\xa7\\xc3\\xb5es. Jovens podem precisar de atendimento, assim como\\xe2\\x80\\xa6'"", ""ts"": ""Sun Apr 26 00:11:28 +0000 2020""}]"
GolightlyGrl427,"[{""id"": ""1254201390643269634"", ""tweet"": ""b'Can\\xe2\\x80\\x99t wait for everyone to trash CA for this... like they did FL (when it was still allowed)'"", ""ts"": ""Sun Apr 26 00:11:28 +0000 2020""}]"
jonberrydesign,"[{""id"": ""1254201391024762881"", ""tweet"": ""b'RT @abc7chriscristi: RIGHT NOW: This is the scene from #Air7HD in Newport Beach, CA as the #COVID19 death toll surges in Southern Californ\\xe2\\x80\\xa6'"", ""ts"": ""Sun Apr 26 00:11:28 +0000 2020""}]"


In [6]:
# Select the `data` column of the "tweets" table, cast to a string.
tweets = spark.sql("select cast(data as string) from tweets")

In [7]:
tweets.show(5, truncate=True)

In [8]:
tweets.printSchema()

In [9]:
tweets.show(5, truncate=False)

In [10]:
tweets.count()

In [11]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *

import json

def parse_tweet(text):
    """Extract the id, timestamp and text of the first tweet in a JSON payload.

    Args:
        text: JSON document whose top level is a list of objects carrying
            ``id``, ``ts`` and ``tweet`` keys. Only the first element is read.

    Returns:
        A tuple ``(id, ts, tweet)``. ``(None, None, None)`` is returned when
        ``text`` is None or empty, is not valid JSON, or does not hold a
        list whose first element is an object. A key that is absent from the
        first object yields ``None`` in its slot.
    """
    missing = (None, None, None)
    if not text:
        return missing
    try:
        records = json.loads(text)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; one bad record must not
        # abort the whole UDF evaluation.
        return missing
    if not isinstance(records, list) or not records:
        return missing
    first = records[0]
    if not isinstance(first, dict):
        return missing
    # Local names avoid shadowing the builtin `id`.
    return (first.get('id'), first.get('ts'), first.get('tweet'))
    
# One string-typed UDF per field of the tuple returned by parse_tweet
getID = UserDefinedFunction(lambda payload: parse_tweet(payload)[0], StringType())
getTs = UserDefinedFunction(lambda payload: parse_tweet(payload)[1], StringType())
getTweet = UserDefinedFunction(lambda payload: parse_tweet(payload)[2], StringType())

# Derive the id, ts and tweet columns from the JSON string held in `data`
tweets = tweets.withColumn('id', getID(col("data")))
tweets = tweets.withColumn('ts', getTs(col("data")))
tweets = tweets.withColumn('tweet', getTweet(col("data")))

In [12]:
tweets.show(truncate=False)

In [13]:
tweets.show(truncate=True)

In [14]:
tweets.printSchema()

In [15]:
!pip install textblob
import textblob

In [16]:
def get_sentiment(text):
    """Label a piece of text from its TextBlob polarity score.

    Args:
        text: The string to score, or None.

    Returns:
        "negative" when the polarity is below zero, "neutral" when it is
        exactly zero, "positive" otherwise. None when ``text`` is None.
    """
    # A null value has no sentiment; passing it to TextBlob would raise and
    # fail the whole UDF evaluation.
    if text is None:
        return None
    # Imported at call time, as in the original definition.
    from textblob import TextBlob
    polarity = TextBlob(text).sentiment.polarity
    if polarity < 0:
        return "negative"
    if polarity == 0:
        return "neutral"
    return "positive"
  
# String-typed UDF wrapping get_sentiment
getSentiment = UserDefinedFunction(get_sentiment, StringType())

# Add a 'sentiment' column computed from each row's tweet text
tweets = tweets.withColumn('sentiment', getSentiment(col("tweet")))

In [17]:
tweets.show()

In [18]:
tweets.printSchema()

In [19]:
# Register the DataFrame as the temp view "tweets_parsed" so that it can be queried by name from SQL.
tweets.createOrReplaceTempView("tweets_parsed")

In [20]:
%sql
select sentiment, count(*) as covid19_case from tweets_parsed group by sentiment

sentiment,covid19_case
positive,993
neutral,2303
negative,494


In [21]:
!pip install plotly
import plotly.express as px

In [22]:
import pandas as pd
# Sample Plotly line chart drawn from a public demo CSV (not the tweet data).
# The frame gets its own name so that `df`, which holds the streaming query
# started earlier in the notebook, is not overwritten.
apple_df = pd.read_csv('https://raw.githubusercontent.com/plotly/datasets/master/finance-charts-apple.csv')

fig = px.line(apple_df, x='Date', y='AAPL.High')
fig.show()

In [23]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
type(tweets)

In [24]:
tweets.dtypes

In [25]:
# Convert the Spark DataFrame into a pandas DataFrame for local analysis and plotting.
tweets_pdf = tweets.toPandas()

In [26]:
tweets_pdf.head()

Unnamed: 0,data,id,ts,tweet,sentiment
0,"[{""id"": ""1254201388692729856"", ""tweet"": ""b'RT ...",1254201388692729856,Sun Apr 26 00:11:27 +0000 2020,b'RT @youaquarium: \xe7\xbd\xb2\xe5\x90\x8d\xe...,neutral
1,"[{""id"": ""1254201388667559936"", ""tweet"": ""b\""RT...",1254201388667559936,Sun Apr 26 00:11:27 +0000 2020,"b""RT @KLGLASS2: .\n\n.\n\n United St...",neutral
2,"[{""id"": ""1254201388801896454"", ""tweet"": ""b'RT ...",1254201388801896454,Sun Apr 26 00:11:27 +0000 2020,b'RT @LiftedPlatinum: My mother was the first...,positive
3,"[{""id"": ""1254201389015724032"", ""tweet"": ""b'RT ...",1254201389015724032,Sun Apr 26 00:11:27 +0000 2020,b'RT @think_fee_hit: \xe3\x82\xab\xe3\x83\x8a\...,neutral
4,"[{""id"": ""1254201389292548098"", ""tweet"": ""b'#KI...",1254201389292548098,Sun Apr 26 00:11:27 +0000 2020,b'#KIMJONGUNDEAD #sawityowit #recehkansahur #...,neutral


In [27]:
# Parse the `ts` strings into a DatetimeIndex with one entry per tweet.
idx = pd.DatetimeIndex(pd.to_datetime(tweets_pdf['ts']))
idx

In [28]:
len(pd.to_datetime(tweets_pdf['ts']))

In [29]:
# Array holding a 1.0 for every tweet row.
ones = np.ones(len(pd.to_datetime(tweets_pdf['ts'])))
# Display the number of entries in the timestamp index.
idx.shape[0]

In [30]:
# Series holding a 1 for every tweet, indexed by the tweet's timestamp
my_series = pd.Series(ones, index=idx)
# Bucket into 1-minute bins and total each bin
per_minute = (
    my_series
    .resample('1Min')
    .sum()
    .fillna(0)
)
per_minute

In [31]:
# Plotting the series
%matplotlib inline

fig, ax = plt.subplots()
ax.grid(True)

ax.set_title("Tweet numbers")
interval = mdates.MinuteLocator(interval=10)
date_formatter = mdates.DateFormatter('%H:%M')

datemin = datetime(2020, 4, 24, 16, 0) 
datemax = datetime(2020, 4, 26, 17, 10)

ax.xaxis.set_major_locator(interval) 
ax.xaxis.set_major_formatter(date_formatter) 
ax.set_xlim(datemin, datemax)
max_freq = per_minute.max()
min_freq = per_minute.min()
ax.set_ylim(min_freq-100, max_freq+100) 
ax.plot(per_minute.index, per_minute)

display(fig)

In [32]:
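# Query 1
# User profile – location from which the tweet was posted
# NOTE(review): the columns selected below (user, text, geo) do not appear in the
# `tweets` rows shown earlier in this notebook; confirm the table schema before running.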
%sql
SELECT user.name, text, geo, user.profile_image_url from tweets where geo IS NOT NULL