<a href="https://colab.research.google.com/github/skyprince999/Data-Engineering-Covid19-ETL/blob/master/Hydrating_Streaming_AWS_Kinesis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

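Tweet IDs from the [COVID-19-TweetIDs](https://github.com/echen102/COVID-19-TweetIDs) repository are hydrated and, instead of being stored on disk, are piped to AWS Kinesis (a Kinesis Data Firehose delivery stream by default; comments in the code show how to use a Kinesis Data Stream instead).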

From there they are read into a Spark streaming context for processing.

The following attributes are extracted from the tweets:

1. User ID
2. User mentions
3. Hashtags
4. Full text of the tweet
5. Sentiment, computed with TextBlob





In [None]:
!git clone https://github.com/echen102/COVID-19-TweetIDs

In [None]:
#Twarc is used to hydrate the tweets- don't run this
!pip install twarc
!twarc configure

In [None]:
!sudo cat /root/.twarc

In [None]:
 
import boto3
import json
import time

import random

In [None]:
import gzip
import json
import os 

from tqdm import tqdm
from twarc import Twarc
from pathlib import Path
import pathlib

# Module-wide Twitter client that hydrate() uses to turn tweet IDs into tweets.
# NOTE(review): presumably picks up the credentials saved by `twarc configure` (~/.twarc) — confirm.
twarc = Twarc()
# Month folders of the tweet-ID repository to sample from: 2020-01 through 2020-09.
data_dirs = ['COVID-19-TweetIDs/2020-{:02d}'.format(month) for month in range(1, 10)]

In [None]:
# Collect the tweet-ID files of the selected months and sample a fraction of them.
# Only `.txt` ID files are collected: hydrate() drops a `.jsonl.gz` marker next to
# each processed file, and globbing `**/*` would let those markers into the sample.
# Sorting removes the dependence on filesystem order, so together with the seed the
# same files are picked on every run (already-hydrated ones are skipped via markers).
SAMPLE_FRAC = 0.3  # fraction of ID files to hydrate, e.g. 4570 files -> 1371
SEED = 42

fileList = sorted(
    path
    for data_dir in data_dirs
    for path in Path(data_dir).glob('**/*.txt')
    if path.is_file()
)

# random.sample already returns its picks in random order, so no extra shuffle is needed.
sampleN = round(len(fileList) * SAMPLE_FRAC)
fileListN = random.Random(SEED).sample(fileList, sampleN)
print(len(fileListN))

In [None]:
# Add a file list to skip tweet-ID files that were processed in a previous iteration

In [None]:
# Paths of ID files hydrated in a previous iteration (e.g. on another machine, where no
# local `.jsonl.gz` marker exists for hydrate() to detect). List them here to skip them.
# This must be a separate list, not `fileList`: `fileList` holds *every* ID file, so
# removing all of it from `fileListN` would leave nothing to hydrate.
processed_files = []

already_done = {Path(f) for f in processed_files}
fileListN = [path for path in fileListN if path not in already_done]
print(len(fileListN))

In [None]:
def main():
  """Hydrate every sampled path in ``fileListN`` whose file name ends in '.txt'."""
  for id_path in filter(lambda p: p.name.endswith('.txt'), fileListN):
    hydrate(id_path)


In [None]:
def _reader_generator(reader):
    """Yield chunks of up to 1 MiB from ``reader`` until it returns a falsy value.

    ``reader`` is a callable such as ``file.read`` that accepts a byte count.
    """
    chunk_size = 1024 * 1024
    while True:
        chunk = reader(chunk_size)
        if not chunk:
            break
        yield chunk


In [None]:
def raw_newline_count(fname):
    """
    Count the newline bytes (b'\\n') in a file, reading it in 1 MiB raw chunks.

    hydrate() uses this as a fast line count for its progress bar. A final line
    without a trailing newline is not counted.

    Args:
        fname: path (str or Path) of the file to scan.
    Returns:
        int: number of b'\\n' bytes in the file.
    """
    # `with` guarantees the handle is closed, even if reading fails.
    with open(fname, 'rb') as f:
        f_gen = _reader_generator(f.raw.read)
        return sum(buf.count(b'\n') for buf in f_gen)


In [None]:
def get_record(tweet):
  """
  Serialise one hydrated tweet into a Firehose ``put_record`` payload.

  The tweet is JSON-encoded and stored as UTF-8 bytes under the ``'Data'`` key.
  NOTE(review): Firehose does not insert a delimiter between records; append
  b'\\n' if the destination needs newline-delimited JSON.
  """
  payload = json.dumps(tweet).encode('utf-8')
  return {'Data': payload}  # << Use this if sending to a Firehose
  # For a Kinesis Data Stream (put_records) the payload would instead be:
  # [{'Data': payload, 'PartitionKey': 'partition_key'}]

In [None]:
def hydrate(id_file, stream_name="covid-stream", region_name="us-east-1"):
    """Hydrate one file of tweet IDs and send each tweet to a Kinesis Firehose stream.

    Args:
        id_file: ``pathlib.Path`` of a text file of tweet IDs (its newline count
            is used as the progress-bar total).
        stream_name: Firehose delivery stream that receives the tweets.
        region_name: AWS region of the delivery stream.

    A sibling ``.jsonl.gz`` file serves as a "done" marker: if it exists, the ID
    file is skipped. The marker is written only after every hydrated tweet has been
    sent, so an interrupted file is retried on the next run instead of being skipped.
    """
    print('hydrating {}'.format(id_file))

    gzip_path = id_file.with_suffix('.jsonl.gz')
    if gzip_path.is_file():
        print('skipping json file already exists: {}'.format(gzip_path))
        return

    # No keys are passed: boto3 resolves credentials from its default chain
    # (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars, ~/.aws/credentials, ...),
    # so secrets never live in the notebook. The identity needs Firehose access.
    # For a Kinesis Data Stream, use boto3.client('kinesis') and
    # put_record(StreamName=..., Data=..., PartitionKey=...) instead.
    firehose_client = boto3.client('firehose', region_name=region_name)

    num_ids = raw_newline_count(id_file)
    with id_file.open() as ids, tqdm(total=num_ids) as pbar:
        for tweet in twarc.hydrate(ids):
            firehose_client.put_record(DeliveryStreamName=stream_name,
                                       Record=get_record(tweet))
            pbar.update(1)

    # Mark this ID file as processed only now that all of its tweets were sent.
    with gzip.open(gzip_path, 'w'):
        pass
        

In [None]:
# hydrate() skips any ID file whose `.jsonl.gz` sibling already exists.
# Deleting that marker makes the next run hydrate the ID file again, e.g.:
# !rm -rf COVID-19-TweetIDs/2020-06/coronavirus-tweet-id-2020-06-14-10.jsonl.gz
# !rm -rf COVID-19-TweetIDs/2020-05/coronavirus-tweet-id-2020-05-01-23.jsonl.gz
# !rm -rf COVID-19-TweetIDs/2020-07/coronavirus-tweet-id-2020-07-15-19.jsonl.gz

In [None]:
# Start hydrating the sampled ID files. Inside a notebook kernel __name__ is
# "__main__", so running this cell calls main() directly.
if __name__ == "__main__":
    main()

In [None]:
# Intentional stop: raising here halts "Run All" at this cell with a clear message,
# instead of relying on a SyntaxError.
raise RuntimeError("Stopped intentionally at this cell; remove it to run past this point.")