<a href="https://colab.research.google.com/github/skyprince999/100-Days-Of-ML/blob/master/Day%2028%23%20Covid19_Tweets_Streaming_Analysis_from_AWS_Kinesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook was executed in a Databricks environment.

In [None]:
import sys
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
from pyspark.sql.types import *

In [None]:
# Obtain (or create) the SparkSession that every later cell relies on.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Structured Streaming")
    .getOrCreate()
)

In [None]:
import os

# Field layout of the tweet JSON (three nullable string fields).
# NOTE(review): pythonSchema is not applied to the stream in this cell — confirm whether it is still needed.
pythonSchema = (
    StructType()
    .add("id", StringType(), True)
    .add("tweet", StringType(), True)
    .add("ts", StringType(), True)
)

# Credentials come from the standard AWS environment variables when they are set,
# so real keys never have to be saved inside the notebook. The placeholders stay
# as the fallback for anyone who still edits them by hand.
awsAccessKeyId = os.environ.get("AWS_ACCESS_KEY_ID", "#########################")  # update the access key
awsSecretKey = os.environ.get("AWS_SECRET_ACCESS_KEY", "###########################")  # update the secret key
kinesisStreamName = "covid-stream"  # update the kinesis stream name
kinesisRegion = "us-east-1"

# Streaming DataFrame over the Kinesis stream, reading from the LATEST position.
kinesisDF = (
    spark.readStream
    .format("kinesis")
    .option("streamName", kinesisStreamName)
    .option("region", kinesisRegion)
    .option("initialPosition", "LATEST")
    .option("format", "json")
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .option("inferSchema", "true")
    .load()
)

In [None]:
# Start a streaming query that appends incoming records to an in-memory table
# named "tweets", which later cells read through spark.sql / %sql.
# Note: `df` is the handle returned by .start(), not a DataFrame.
df = (
    kinesisDF.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("tweets")
    .start()
)

In [None]:
# Inspect the status of the streaming query started above (`df` is the object returned by .start()).
df.status


In [None]:
%sql

-- Show each record's partition key next to its `data` column cast to a string.
select partitionKey, cast(data as string) from tweets;

partitionKey,data


In [None]:
# Query the in-memory `tweets` table, keeping only the payload column cast to a string.
tweets = spark.sql("select cast(data as string) from tweets")

In [None]:
# Preview the first 5 rows of `tweets`.
tweets.show(5)

In [None]:
# Number of rows currently in `tweets`.
tweets.count()

In [None]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import *



In [None]:
from datetime import datetime
import json

def user(text):
  """Extract the author's screen name from a raw tweet.

  Args:
    text: JSON string of a tweet that contains a ``user`` object.

  Returns:
    The value stored under ``user`` -> ``screen_name``.
  """
  return json.loads(text)['user']['screen_name']

def hashtag(text):
  """List the hashtag texts attached to a raw tweet.

  Args:
    text: JSON string of a tweet that contains ``entities`` -> ``hashtags``.

  Returns:
    A list with the ``text`` of every hashtag entry, in order (empty if none).
  """
  tags = []
  for entry in json.loads(text)['entities']['hashtags']:
    tags.append(entry['text'])
  return tags

def user_mention(text):
  """List the screen names mentioned in a raw tweet.

  Args:
    text: JSON string of a tweet that contains ``entities`` -> ``user_mentions``.

  Returns:
    A list with the ``screen_name`` of every mention, in order (empty if none).
  """
  mentions = json.loads(text)['entities']['user_mentions']
  names = []
  for mention in mentions:
    names.append(mention['screen_name'])
  return names

def full_text(text):
  """Return the ``full_text`` field of a raw tweet.

  Args:
    text: JSON string of a tweet.

  Returns:
    The value stored under the top-level ``full_text`` key.
  """
  return json.loads(text)['full_text']

def get_datetime(text):
  """Format a tweet's ``created_at`` stamp as ``DD-MM-YYYY  HH:MM``.

  Args:
    text: JSON string of a tweet whose ``created_at`` field is laid out like
      ``Tue Jun 02 21:04:20 +0000 2020``; the fields are sliced by position.

  Returns:
    A string ``DD-MM-YYYY  HH:MM`` (two spaces separate date and time).

  Raises:
    KeyError: if ``created_at`` is missing or the month abbreviation is unknown.
  """
  # All twelve months: the table used to stop at June, so every tweet created
  # from July to December raised KeyError.
  monthKey = {'Jan': '01', 'Feb': '02', 'Mar': '03', 'Apr': '04',
              'May': '05', 'Jun': '06', 'Jul': '07', 'Aug': '08',
              'Sep': '09', 'Oct': '10', 'Nov': '11', 'Dec': '12'}
  data = json.loads(text)['created_at']
  year = data[-4:]            # trailing four characters
  month = monthKey[data[4:7]]  # three-letter month abbreviation -> two digits
  date = data[8:10]
  hour = data[11:13]
  minute = data[14:16]
  return date + '-' + month + '-' + year + '  ' + hour + ':' + minute
  

In [None]:
# # Define your function
# getID = UserDefinedFunction(lambda x: parse_tweet(x)[0], StringType())
# getTs = UserDefinedFunction(lambda x: parse_tweet(x)[1], StringType())
# getTweet = UserDefinedFunction(lambda x: parse_tweet(x)[2], StringType())

from pyspark.sql.types import ArrayType, DateType

# Wrap each parsing helper as a Spark UDF with its return type.
# The helpers are passed directly instead of through `lambda x: helper(x)`:
# the lambda added nothing, and it looked the helper up by global name only
# when invoked, while a later cell rebinds the name `user` to a DataFrame.
getSN = UserDefinedFunction(user, StringType())
getHash = UserDefinedFunction(hashtag, ArrayType(StringType()))
getUM = UserDefinedFunction(user_mention, ArrayType(StringType()))
getFT = UserDefinedFunction(full_text, StringType())
getTime = UserDefinedFunction(get_datetime, StringType())

In [None]:
# Derive one column per parsed field from the raw `data` JSON, then keep only
# the parsed columns.
# NOTE: this assignment rebinds `user`, which until now named the user() helper
# defined earlier; after this cell the helper is no longer reachable by that name.
# The whole chain is kept as one expression so the right-hand side is fully
# evaluated before `user` is rebound.
user = (tweets.withColumn('user', getSN(col('data')))
             .withColumn('hashtags', getHash(col('data')))
             .withColumn('user_mention', getUM(col('data')))
             .withColumn('full_text', getFT(col('data')))
             .withColumn('parseDate', getTime(col('data')))
             .select('parseDate', 'user', 'hashtags', 'user_mention', 'full_text')
             )
# Preview the first 5 parsed rows.
user.show(5)

In [None]:
import textblob
from textblob import TextBlob

def get_sentiment(text):
    """Classify a piece of text by the sign of its TextBlob polarity.

    Args:
        text: the text to score.

    Returns:
        "negative" when the polarity is below zero, "neutral" when it is
        exactly zero, and "positive" otherwise.
    """
    polarity = TextBlob(text).sentiment.polarity
    if polarity < 0:
        return "negative"
    if polarity == 0:
        return "neutral"
    return "positive"

In [None]:
# Expose get_sentiment to Spark as a string-returning UDF.
getSentiment = UserDefinedFunction(
    lambda tweet_text: get_sentiment(tweet_text),
    StringType(),
)

# Add a `sentiment` label computed from each row's full_text.
tweets_proc = user.withColumn(
    'sentiment',
    getSentiment(col("full_text")),
)

In [None]:
# Preview the sentiment-labelled tweets.
tweets_proc.show()

In [None]:
# Number of rows in the sentiment-labelled frame.
tweets_proc.count()

In [None]:
# Render the labelled tweets as a table (sample output is shown below).
display(tweets_proc)

parseDate,user,hashtags,user_mention,full_text,sentiment
31-01-2020 05:09,mybabewoodzyou,List(),List(noppatjak),RT @noppatjak: ราคามาตรฐานของหน้ากากอนามัย (แบบปกติ ไม่ใช่ N95) คือชิ้นละ 2 บาท,neutral
31-01-2020 05:09,UnseenJapanSite,List(),List(),"""All they need to do is declare coronavirus a designated infectious disease. Literally no need to change the constitution. They should focus instead on enforcing two-week quarantines and not bunking folks together."" (2/2)",neutral
31-01-2020 05:09,KevinHarley5,List(),"List(CNNPolitics, CNN)","@CNNPolitics @CNN stupidity is astonishing. Diversity in tackling the Coronavirus. Just when they can’t get any dumber, this...",negative
31-01-2020 05:09,wiggyagogo,"List(closetheborder, auspol)","List(SkyNewsAust, SenPaterson)",@SkyNewsAust @SenPaterson Who’s evidence your relying on? China’s? #closetheborder #auspol,neutral
31-01-2020 05:09,HeavenSmileLisa,List(),List(08escaper),RT @08escaper: ไม่ใช่ของไทยแต่อยากแบ่งปั่น วิธีป้องกันตัวจากไวรัสโคโลน่าสายพันธุ์ใหม่ 1. ล้างมือ/ฆ่าเชื้อบนมืออย่างถูกต้อง 2. ใส่หน้ากากใน…,neutral
31-01-2020 05:09,prawitting,List(ไวรัสโคโรนา),List(fm91trafficpro),RT @fm91trafficpro: องค์การอนามัยโลก (WHO) ประกาศยกระดับ #ไวรัสโคโรนา เป็น “ภาวะฉุกเฉินด้านสาธารณสุขของโลก” หลังพบผู้ติดเชื้อเพิ่ม10 เท่าใน…,neutral
31-01-2020 05:09,RebellionIII,List(),List(),"There should be a bottom line for everything. When I bought the N95 mask a few days ago, many places were sold out. I finally found a bucket of N95 masks in a remote shop. The shop knew that the Chinese needed to pay 50% off the sales. I took 30% of the stock in the shop. This https://t.co/Zw43XhTFJo",positive
31-01-2020 05:09,annarizaaa,"List(coronavirus, CoronavirusOutbreak)",List(choculate),RT @choculate: Really appreciated the humanity act! #coronavirus #CoronavirusOutbreak https://t.co/C5rZTlPj2y,positive
31-01-2020 05:09,yoninvon,List(),List(MedwinTruefaith),"RT @MedwinTruefaith: Taiwan is making sure China won’t get their face masks. 👏👏👏 😆 Tayo, takot ma-hurt ang Tsina. 😩👎 https://t.co/KvwanmCc…",positive
31-01-2020 05:09,albert1776,List(),List(DougieTrucker),RT @DougieTrucker: GOOD NIGHT AMERICA  AND THE WORLD BE SAFE. BE ALERT SWEET DREAMS  SLEEP TIGHT PRAY FOR THE C…,positive


In [None]:
# Export at most 10,000 labelled tweets as CSV.
# pandas writes through the driver's local filesystem, so the DBFS folder
# /FileStore has to be addressed via the /dbfs mount; a bare '/FileStore/...'
# path points at a local directory instead of DBFS.
tweets_proc.limit(10000).toPandas().to_csv('/dbfs/FileStore/Output.csv')

In [None]:
# NOTE(review): dbutils.fs.put appears to take (path, contents), so this would store
# the literal text "Output.csv" at /FileStore/Output.csv rather than copy a file — confirm intent.
dbutils.fs.put("/FileStore/Output.csv", "Output.csv")

In [None]:
%python
# Read the current value of the notebook widget named "Covid_Twarc".
# NOTE(review): no cell in view creates this widget — confirm it exists before running.
dbutils.widgets.get("Covid_Twarc")

In [None]:

# Column names and types of the labelled frame.
tweets_proc.dtypes

In [None]:
# Flatten an array column into a single comma-separated string.
# The closing parenthesis of join() used to sit after StringType(), which passed
# the type object to str.join (TypeError when the UDF runs) and left udf()
# without an explicit return type.
sparse_format_udf = udf(lambda x: ','.join([str(elem) for elem in x]), StringType())

# Replace both array columns with their flattened string form.
query = (tweets_proc.withColumn('hashtags', sparse_format_udf(col('hashtags')))
                   .withColumn('user_mention', sparse_format_udf(col('user_mention'))))
query.dtypes

In [None]:
# Preview the first 5 rows with the flattened columns.
query.show(5)

In [None]:
# Render at most 10,000 rows of `query` as a table.
display(query.limit(10000))

In [None]:
import os

# List the entries of the current working directory.
os.listdir()

In [None]:
# Register the sentiment-labelled frame for SQL access. The raw `tweets` frame
# only selects `data`, so a view built from it would have no `sentiment`
# column for the following query to group by.
tweets_proc.createOrReplaceTempView("tweets_parsed")
# DataFrame-API equivalent of the SQL below: tweets_proc.groupBy('sentiment').count()

In [None]:
%sql
-- Count the rows of the tweets_parsed view per sentiment label.
select sentiment, count(*) as cnt from tweets_parsed group by sentiment

sentiment,cnt
positive,34886
neutral,54719
negative,17161


In [None]:
import plotly.express as px

import pandas as pd
# Plotly smoke test on a public sample dataset (unrelated to the tweet data).
# The frame gets its own name so it does not overwrite `df`, which holds the
# handle of the running streaming query.
apple_df = pd.read_csv('https://raw.githubusercontent.com/plotly/datasets/master/finance-charts-apple.csv')

fig = px.line(apple_df, x='Date', y='AAPL.High')
fig.show()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime


In [None]:
# Check what kind of object `tweets` currently is.
type(tweets)

In [None]:
# Column names and types of `tweets`.
tweets.dtypes

In [None]:
# Row count of `tweets` before converting it to pandas.
tweets.count()

In [None]:
# Convert `tweets` to a pandas DataFrame for the time-series cells below.
# NOTE(review): later cells read tweets_pdf['ts'], but `tweets` as built in the visible
# cells only selects `data` — confirm which frame was intended here.
tweets_pdf = tweets.toPandas()

In [None]:
# Preview the first rows of the pandas frame.
tweets_pdf.head()

Unnamed: 0,data,id,ts,tweet,sentiment
0,"[{""id"": ""1267925038927314945"", ""tweet"": ""b\""RT...",1267925038927314945,Tue Jun 02 21:04:20 +0000 2020,"b""RT @AshaRangappa_: Trudeau's pause brought t...",neutral
1,"[{""id"": ""1267925038881267712"", ""tweet"": ""b'RT ...",1267925038881267712,Tue Jun 02 21:04:20 +0000 2020,b'RT @SylvieoooOooo: Ah ils sont fort \xf0\x9f...,neutral
2,"[{""id"": ""1267925039329894400"", ""tweet"": ""b'Can...",1267925039329894400,Tue Jun 02 21:04:20 +0000 2020,b'Canada\xe2\x80\x99s Military knows more than...,positive
3,"[{""id"": ""1267925040231825409"", ""tweet"": ""b'The...",1267925040231825409,Tue Jun 02 21:04:21 +0000 2020,b'The impacts on #CoVID19 are slowly precipita...,negative
4,"[{""id"": ""1267925040722579471"", ""tweet"": ""b'RT ...",1267925040722579471,Tue Jun 02 21:04:21 +0000 2020,b'RT @pamSmacdonald: @jpwooster @MuhammadLila ...,neutral


In [None]:
# Parse the `ts` column into datetimes and wrap them in a DatetimeIndex for resampling.
idx = pd.DatetimeIndex(pd.to_datetime(tweets_pdf['ts']))

In [None]:
# Confirm the index type.
type(idx)

In [None]:
# Number of parsed timestamps.
len(pd.to_datetime(tweets_pdf['ts']))

In [None]:
# One 1.0 per tweet, so that summing within a time bucket yields a tweet count.
# Only the length is needed, so the column is measured directly instead of
# re-parsing every timestamp with pd.to_datetime.
ones = np.ones(len(tweets_pdf['ts']))

In [None]:
# Length of the index, for comparison with the length of `ones`.
idx.shape[0]

In [None]:
# The actual series (a series of 1s for the moment), indexed by tweet timestamp.
my_series = pd.Series(ones, index=idx)

In [None]:
# Resampling / bucketing into 1-minute buckets.
# Each value is 1, so the sum per bucket is the number of tweets in that minute.
per_minute = my_series.resample('1Min').sum().fillna(0)

In [None]:
# Wrap the per-minute counts in a DataFrame (its single column is labelled 0) and preview it.
per_minute_df = pd.DataFrame(per_minute)
per_minute_df.head()

Unnamed: 0_level_0,0
ts,Unnamed: 1_level_1
2020-06-02 21:04:00+00:00,650.0
2020-06-02 21:05:00+00:00,1089.0
2020-06-02 21:06:00+00:00,1041.0
2020-06-02 21:07:00+00:00,363.0


In [None]:
# Check the column labels before plotting.
per_minute_df.columns

In [None]:
# Line chart of tweet volume per one-minute bucket (column 0 holds the counts).
fig = px.line(per_minute_df, x=per_minute_df.index, y=0, title="Tweets per minute")
# The count column is literally named 0, so give both axes readable titles.
fig.update_layout(xaxis_title="Time (1-minute buckets)", yaxis_title="Number of tweets")
fig.show()