<h1>Extract, Transform, Load</h1>

This notebook demonstrates extracting data from an Apache Kafka stream (subscribing to the topics for a pre-defined range of dates), **retrieving coronavirus-related tweets through the Twitter API using tweepy (bonus section)**, and loading the results into an Elasticsearch data store.

We chose to create two indexes filtered by date (dates are written day/month/year):
* *test3* - contains tweets created on 7/2/2020 (referred to as the **old period**)
* *test5* - contains tweets created on 3/6/2020 and 8/6/2020 (referred to as the **new period**)

Due to Twitter API rate constraints, these indexes are unequal with respect to both the number of documents and the number of dates.
<br>Both indexes are used for the data analysis and machine learning tasks.

We also chose to create an index filtered by location:
* *test10_us* - contains tweets from the United States from 7/2/2020 to 10/6/2020 (all dates available)

This index is transformed in Kibana, and the transformation result (*us_grouped2*) is used to compare the number of tweets in the US with the Dow Jones Industrial Average, as part of our data analysis task.

Note: index names in this notebook carry the prefix 'new_' to avoid overwriting our existing indexes in Elasticsearch.

<h1>ETL - 1</h1>
<h3>for the first 2 indexes (filtered by dates)</h3>

<h2>Extract</h2>

We ran the following commands (Cmd 6 to Cmd 15) twice, once for each time period, with the following differences:
* a different index name in Cmd 6
* different dates (= topics) in Cmd 7

In [0]:
ES_OUR_HOST = 'dds2019s-1010.eastus.cloudapp.azure.com'
index = "new_test5" 
# index = 'new_test3'

In [0]:
# Build the list of Kafka topics to subscribe to (one topic per date)

from datetime import date, timedelta

sdate = date(2020,6,3)
edate = date(2020,6,9)
# sdate = date(2020,2,7)
# edate = date(2020,2,14)

def dates_bwn_twodates(start_date, end_date):
    # Yield each date from start_date up to, but not including, end_date as 'dd-mm-YYYY'
    for n in range((end_date - start_date).days):
        yield (start_date + timedelta(n)).strftime("%d-%m-%Y")

topics = ", ".join(dates_bwn_twodates(sdate,edate))
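
To make the topic list concrete, the sketch below re-implements the generator on its own and prints the topics for the new period. It assumes, as the cell above implies, that each Kafka topic is named after a date in `dd-mm-YYYY` form; `topic_names` is an illustrative name, not part of the notebook. Note that `range` stops before the end date, so the end date itself is not subscribed.

```python
from datetime import date, timedelta

def topic_names(start_date, end_date):
    """Return one 'dd-mm-YYYY' topic name per day in [start_date, end_date)."""
    n_days = (end_date - start_date).days
    return [(start_date + timedelta(days=n)).strftime("%d-%m-%Y") for n in range(n_days)]

# Same start and end dates as the new period in the cell above.
new_period_topics = topic_names(date(2020, 6, 3), date(2020, 6, 9))
print(", ".join(new_period_topics))
# 03-06-2020, 04-06-2020, 05-06-2020, 06-06-2020, 07-06-2020, 08-06-2020
```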

In [0]:
# read from kafka into a streaming dataframe

raw_stream_df = spark.readStream\
          .format("kafka")\
          .option("kafka.bootstrap.servers", "http://ddkafka.eastus.cloudapp.azure.com:9092")\
          .option("subscribe", topics)\
          .option("startingOffsets", "earliest")\
          .load()

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

string_value_df = raw_stream_df.selectExpr("CAST(value AS STRING)")

# Define the schema of the data:
schema = StructType()\
          .add("tweet_id", LongType(), False)\
          .add("user_id", LongType(), False)\
          .add("date", StringType(), True)\
          .add("keywords", ArrayType(StringType(), True), True)\
          .add("location", MapType(StringType(), StringType(), True), True)

# Read each 'value' String as JSON:
json_df = string_value_df.select(F.from_json(F.col("value"), schema= schema).alias('json'))
# Flatten the nested object:
streaming_df = json_df.select("json.*")
display(streaming_df)

tweet_id,user_id,date,keywords,location
1270003116302311425,1256334649028157442,Mon Jun 08 14:41:53 +0000 2020,List(covid19),"Map(country -> Ecuador, state -> Guayas, city -> Guayaquil)"
1270003116264439808,5950872,Mon Jun 08 14:41:53 +0000 2020,List(coronavirus),"Map(country -> United States, state -> New Mexico, city -> Albuquerque)"
1270003116772077570,140961493,Mon Jun 08 14:41:53 +0000 2020,List(covid),"Map(country -> Argentina, state -> Autonomous City of Buenos Aires, city -> Buenos Aires)"
1270003116814057472,850395405917540352,Mon Jun 08 14:41:53 +0000 2020,List(covid19),
1270003116901974016,318857880,Mon Jun 08 14:41:53 +0000 2020,List(coronavirus),
1270003117057286145,1163556137624584193,Mon Jun 08 14:41:53 +0000 2020,List(coronavirus),
1270003117220823042,327897223,Mon Jun 08 14:41:53 +0000 2020,List(coronavirus),
1270003117283667968,105857751,Mon Jun 08 14:41:53 +0000 2020,List(coronavirus),
1270003117434769410,400788839,Mon Jun 08 14:41:53 +0000 2020,List(covid),
1270003117501952001,72186440,Mon Jun 08 14:41:53 +0000 2020,List(covid),
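
For intuition about what `from_json` and the later timestamp conversion do to a single record, here is a plain-Python sketch. The message layout is an assumption reconstructed from the second output row above (the raw Kafka payload is not shown in the notebook), and `parse_message` is a hypothetical helper. The date is parsed with `%z`, which interprets the `+0000` offset; the day and month names assume an English locale.

```python
import json
from datetime import datetime

# Assumed message layout, rebuilt from one row of the output above.
raw_value = json.dumps({
    "tweet_id": 1270003116264439808,
    "user_id": 5950872,
    "date": "Mon Jun 08 14:41:53 +0000 2020",
    "keywords": ["coronavirus"],
    "location": {"country": "United States", "state": "New Mexico", "city": "Albuquerque"},
})

# Layout of the 'date' string, e.g. 'Mon Jun 08 14:41:53 +0000 2020'.
TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y"

def parse_message(value):
    """Decode one JSON message and convert its 'date' string to an aware datetime."""
    record = json.loads(value)
    record["date"] = datetime.strptime(record["date"], TWITTER_DATE_FORMAT)
    return record

record = parse_message(raw_value)
print(record["tweet_id"], record["date"].isoformat(), record["location"]["country"])
```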


<h2>Transform & Load</h2>

In [0]:
# get full tweet schema

import pickle
tweets_schema = pickle.load(open('/dbfs/mnt/tweet_schema.pkl', 'rb'))

In [0]:
# define parameters for using tweepy

import tweepy
from pyspark.sql.types import *

# Twitter API credentials: fill in your own values and keep real secrets out of the notebook
consumer_key = '<CONSUMER_KEY>'
consumer_secret = '<CONSUMER_SECRET>'
access_token = '<ACCESS_TOKEN>'
access_token_secret = '<ACCESS_TOKEN_SECRET>'

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

In [0]:
# create index

from elasticsearch import Elasticsearch, helpers
import requests

es = Elasticsearch([{'host': ES_OUR_HOST}], timeout=60000)

dbutils.fs.rm("/tmp/OmerOmer/Stream/", True)
dbutils.fs.mkdirs("/tmp/OmerOmer/Stream/")


if es.indices.exists(index): # Delete if exists
  es.indices.delete(index=index)
  
settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
        "refresh_interval": -1
    },
    "mappings": {
        "properties": {
            "date": {"type": "date"}
        }
    }
}

es.indices.create(index=index, ignore=400, body=settings)
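
With `refresh_interval` set to `-1`, Elasticsearch performs no periodic refresh on the index, so documents become searchable only when something triggers a refresh explicitly. A minimal sketch of restoring periodic refresh once the bulk of the data has been loaded, assuming the 7.x-style `elasticsearch` Python client that the `body=` calls above suggest; `enable_refresh` and the `"1s"` interval are illustrative choices, not part of the notebook.

```python
def enable_refresh(es_client, index_name, interval="1s"):
    """Restore periodic refresh on an index and make loaded documents searchable."""
    # Turn automatic refresh back on (it was disabled with refresh_interval = -1).
    es_client.indices.put_settings(
        index=index_name,
        body={"index": {"refresh_interval": interval}},
    )
    # Force one refresh now so documents loaded so far become visible to searches.
    es_client.indices.refresh(index=index_name)
```

It could be called as `enable_refresh(es, index)` with the client and index name defined earlier.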

For each batch, hydrate the tweets (look up the full tweet objects by ID), pre-process them, and write the result to the Elasticsearch index.

In [0]:
from pyspark.sql.types import *
import pyspark.sql.functions as F


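# Layout of the 'date' strings, e.g. 'Mon Jun 08 14:41:53 +0000 2020';
# the '+SSSS' part consumes the '+0000' offset text rather than interpreting it as a time zone
date_format = 'EEE MMM dd HH:mm:ss +SSSS yyyy'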


def aux(df, epoch_id):
    
    records = df.collect()
    full_id_list = [str(record.tweet_id) for record in records]
    
    # iterate over blocks of up to 100 ids, including the final partial block
    for start in range(0, len(full_id_list), 100):
      relevant_id_list = full_id_list[start:start + 100]
      tweets_statuses = api.statuses_lookup(id_=relevant_id_list)
      
      # retrieve features
      tweets_df = sqlContext.createDataFrame(tweets_statuses, schema=tweets_schema)
      tweets_df = tweets_df.select(['id', 'text', 'lang', 'user', 'source']).withColumnRenamed('id', 'tweet_id')
      tweets_df = tweets_df.join(df.select(['tweet_id', 'location', 'date']), 'tweet_id')
      
      # pre process
      
      ### explode user:
      tweets_df = tweets_df.withColumn("user_id", F.explode(F.array(tweets_df.user).getItem("id")))
      tweets_df = tweets_df.withColumn("user_followers_count", F.explode(F.array(tweets_df.user).getItem("followers_count")))
      tweets_df = tweets_df.withColumn("user_friends_count", F.explode(F.array(tweets_df.user).getItem("friends_count")))
      tweets_df = tweets_df.withColumn("user_listed_count", F.explode(F.array(tweets_df.user).getItem("listed_count")))
      tweets_df = tweets_df.withColumn("user_lang", F.explode(F.array(tweets_df.user).getItem("lang")))
      
      ### convert created_at into TS
      tweets_df = tweets_df.withColumn('date', F.to_timestamp(F.col("date"), date_format))
      
      # drop unwanted columns:
      tweets_df = tweets_df.drop("user")
      
      # add month column:
      tweets_df = tweets_df.withColumn('month', F.month('date'))

      # write to es
      tweets_df.write.format("org.elasticsearch.spark.sql")\
      .option("es.resource", index)\
      .option("es.nodes.wan.only","true")\
      .option("es.port","9200")\
      .option("es.nodes",ES_OUR_HOST)\
      .option("es.nodes.client.only", "false")\
      .mode("append") \
      .save()
    
    return

streaming_df.writeStream.foreachBatch(aux).start()
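
The hydration loop walks the collected IDs in blocks because the tweet lookup call is given a limited number of IDs per request (100 here). A small standalone sketch of that chunking step, with `chunked` as a hypothetical helper name; it also yields the final, shorter block so that no IDs are dropped.

```python
def chunked(items, size=100):
    """Yield consecutive slices of `items` with at most `size` elements each."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

tweet_ids = [str(n) for n in range(250)]  # stand-in IDs for illustration
block_sizes = [len(block) for block in chunked(tweet_ids)]
print(block_sizes)  # [100, 100, 50]
```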

<h1>ETL - 2</h1>
<h3>for the third index (filtered by location)</h3>

<h2>Extract</h2>

In [0]:
index_us = 'new_test10_us'

In [0]:
# Build the list of Kafka topics to subscribe to (one topic per date)

sdate_us = date(2020,2,7)
edate_us = date(2020,6,10)

topics_us = ", ".join(dates_bwn_twodates(sdate_us,edate_us))

In [0]:
raw_stream_df_us = spark.readStream\
          .format("kafka")\
          .option("kafka.bootstrap.servers", "http://ddkafka.eastus.cloudapp.azure.com:9092")\
          .option("subscribe", topics_us)\
          .option("startingOffsets", "earliest")\
          .load()

In [0]:
string_value_df_us = raw_stream_df_us.selectExpr("CAST(value AS STRING)")

# Read each 'value' String as JSON:
json_df_us = string_value_df_us.select(F.from_json(F.col("value"), schema= schema).alias('json'))
# Flatten the nested object:
streaming_df_us = json_df_us.select("json.*")
display(streaming_df_us)

tweet_id,user_id,date,keywords,location
1255346044675796994,349084023,Wed Apr 29 03:59:54 +0000 2020,List(coronavirus),
1255346044738646017,877454535131844612,Wed Apr 29 03:59:54 +0000 2020,List(coronavirus),
1255346044906483715,16026153,Wed Apr 29 03:59:54 +0000 2020,List(covid19),"Map(country -> United States, state -> Montana, city -> Missoula)"
1255346045007147008,948497847808270336,Wed Apr 29 03:59:54 +0000 2020,List(covid),"Map(country -> Indonesia, state -> DKI Jakarta, city -> Jakarta)"
1255346045514665986,20930240,Wed Apr 29 03:59:55 +0000 2020,List(coronavirus),"Map(country -> United States, state -> Florida, city -> Lakeland)"
1255346042687586311,146608044,Wed Apr 29 03:59:54 +0000 2020,List(coronavirus),
1255346045606912003,2858719709,Wed Apr 29 03:59:55 +0000 2020,List(covid),
1255346045950857216,1254644320109355009,Wed Apr 29 03:59:55 +0000 2020,List(covid),
1255346045753663489,86805382,Wed Apr 29 03:59:55 +0000 2020,List(covid),"Map(country -> United States, state -> Texas, city -> Fort Worth)"
1255346045690826752,952247125185720320,Wed Apr 29 03:59:55 +0000 2020,List(covid19),


<h2>Transform</h2>

In [0]:
# filter US tweets & transform date into a timestamp

date_format = 'EEE MMM dd HH:mm:ss +SSSS yyyy'
df_loc = streaming_df_us.withColumn("country", F.explode(F.array(streaming_df_us.location.country)))
df_us = df_loc.where(df_loc.country == 'United States').dropDuplicates(['tweet_id'])
df_us = df_us.withColumn('date', F.to_timestamp(F.col("date"), date_format))
display(df_us)

tweet_id,user_id,date,keywords,location,country
1239918097332658177,317385229,2020-03-17T14:14:45.000+0000,List(coronavirus),"Map(country -> United States, state -> New York, city -> null)",United States
1239918294603386886,14928079,2020-03-17T14:15:32.000+0000,List(coronavirus),"Map(country -> United States, state -> Pennsylvania, city -> Pittsburgh)",United States
1239918537772236801,2512857185,2020-03-17T14:16:30.000+0000,"List(coronavirus, coronavirus)","Map(country -> United States, state -> District of Columbia, city -> Washington)",United States
1239918672132571141,477176185,2020-03-17T14:17:02.000+0000,List(covid),"Map(country -> United States, state -> New Mexico, city -> null)",United States
1239918810741772288,937040650469552128,2020-03-17T14:17:35.000+0000,List(coronavirus),"Map(country -> United States, state -> Florida, city -> null)",United States
1239918815175114752,879198372,2020-03-17T14:17:36.000+0000,List(coronavirus),"Map(country -> United States, state -> Alabama, city -> Huntsville)",United States
1239918844308795392,865764283656884224,2020-03-17T14:17:43.000+0000,List(coronavirus),"Map(country -> United States, state -> Florida, city -> Port St Lucie)",United States
1239918869306920960,721355986863263745,2020-03-17T14:17:49.000+0000,List(covid),"Map(country -> United States, state -> Texas, city -> Houston)",United States
1239919591008190464,1148704035266093056,2020-03-17T14:20:41.000+0000,List(coronavirus),"Map(country -> United States, state -> Florida, city -> null)",United States
1239919632942804992,118563618,2020-03-17T14:20:51.000+0000,List(coronavirus),"Map(country -> United States, state -> Ohio, city -> null)",United States
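
The filter and de-duplication above can be illustrated on plain Python dictionaries. This is only a sketch of the logic, not of Spark's streaming execution; the sample records are adapted from rows shown earlier (the duplicate is added for illustration) and `us_tweets` is a hypothetical helper. Records with no location are skipped, mirroring how the `country == 'United States'` comparison excludes null countries.

```python
def us_tweets(records):
    """Keep the first record per tweet_id whose location country is 'United States'."""
    seen = set()
    result = []
    for rec in records:
        location = rec.get("location") or {}
        if location.get("country") != "United States":
            continue  # missing location or another country
        if rec["tweet_id"] in seen:
            continue  # duplicate tweet_id
        seen.add(rec["tweet_id"])
        result.append(rec)
    return result

sample = [
    {"tweet_id": 1255346044675796994, "location": None},
    {"tweet_id": 1255346044906483715, "location": {"country": "United States", "state": "Montana"}},
    {"tweet_id": 1255346045007147008, "location": {"country": "Indonesia", "state": "DKI Jakarta"}},
    {"tweet_id": 1255346044906483715, "location": {"country": "United States", "state": "Montana"}},
]
kept = us_tweets(sample)
print([rec["tweet_id"] for rec in kept])  # [1255346044906483715]
```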


In [0]:
if es.indices.exists(index_us): # Delete if exists
  es.indices.delete(index=index_us)

es.indices.create(index=index_us, ignore=400, body=settings)

<h2>Load</h2>

In [0]:
# write to elastic

df_us.writeStream \
        .outputMode("append") \
        .queryName(f"{index_us}_to_es") \
        .format("org.elasticsearch.spark.sql") \
        .option("es.nodes.wan.only","true") \
        .option("checkpointLocation", f"/tmp/OmerOmer2/Stream/{index_us}/") \
        .option("es.resource", index_us) \
        .option("es.nodes", ES_OUR_HOST) \
        .option("es.port","9200") \
        .start()