<a href="https://colab.research.google.com/github/slp22/data-engineering-project/blob/main/engineering_monkeypox_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Data Engineering | Pipeline

# Monkeypox Tweets

## Imports

In [18]:
import json
import logging
import sqlite3
import matplotlib.pyplot as plt
import numpy as np
import os, shutil, itertools
import pandas as pd
import pathlib as Path
import pickle
import PIL
import random
import seaborn as sns
import sklearn as sk
import warnings
import zipfile

from sqlite3 import connect
import time
from datetime import datetime
from dateutil.parser import parse
from dateutil.relativedelta import *
from dateutil.easter import *
from dateutil.rrule import *
from dateutil.parser import *
from datetime import *


### Google Drive

In [2]:
# mount drive
from google.colab import drive
drive.mount('/content/drive')

# https://colab.research.google.com/notebooks/snippets/sheets.ipynb#scrollTo=JiJVCmu3dhFa

# authorize access 
from google.colab import auth
auth.authenticate_user()

# read in from Google Sheets

import gspread
from google.auth import default
creds, _ = default()

gc = gspread.authorize(creds)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Pyspark



In [5]:
# # https://towardsdatascience.com/pyspark-on-google-colab-101-d31830b238be
# # https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [6]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

In [7]:
!tar -xf spark-3.0.0-bin-hadoop3.2.tgz

In [8]:
!pip install -q findspark

In [9]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [10]:
import findspark
findspark.init()

In [11]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col, lower
from pyspark.sql.types import StructType,StructField, StringType, IntegerType


## 1 | Pipeline Design


* **Business Problem:** Can we build a dashboard to monitor top trending topics on Twitter about monkeypox?
* **Data source:** [Kaggle Tweets on Monkeypox ](https://www.kaggle.com/datasets/thakurnirmalya/monkeypox2022tweets)
* **Impact Hypothesis:** 

## 2 | Data Ingestion

#### 1. [Twitter Dataset on the 2022 MonkeyPox Outbreak](https://www.kaggle.com/datasets/thakurnirmalya/monkeypox2022tweets) (Dataset is list of TweetIDs)

#### 2. [Twitter Hydrating](https://towardsdatascience.com/learn-how-to-easily-hydrate-tweets-a0f393ed340e#:~:text=Hydrating%20Tweets) with [DocNow Hydrator](https://github.com/DocNow/hydrator/releases)

#### 3. Import [hydrated tweets](https://drive.google.com/drive/folders/1NbddxuSF3v5YuOgjvA1G4WgfPUlKfiul?usp=sharing) from GoogleDrive to Colab

In [3]:
# worksheets = ['TweetIDs_Part1', 'TweetIDs_Part2', 'TweetIDs_Part3', 'TweetIDs_Part4', 'TweetIDs_Part5', 'TweetIDs_Part6']

worksheet_1 = gc.open('TweetIDs_Part1').sheet1
# worksheet_2 = gc.open('TweetIDs_Part2').sheet1
# worksheet_3 = gc.open('TweetIDs_Part3').sheet1
# worksheet_4 = gc.open('TweetIDs_Part4').sheet1
# worksheet_5 = gc.open('TweetIDs_Part5').sheet1
# worksheet_6 = gc.open('TweetIDs_Part6').sheet1

# get_all_values gives a list of rows
rows_1 = worksheet_1.get_all_values()
# rows_2 = worksheet_2.get_all_values()
# rows_3 = worksheet_3.get_all_values()
# rows_4 = worksheet_4.get_all_values()
# rows_5 = worksheet_5.get_all_values()
# rows_6 = worksheet_6.get_all_values()

# Convert to a DataFrame and render
tweets_1 = pd.DataFrame.from_records(rows_1)
# tweets_2 = pd.DataFrame.from_records(rows_2)
# tweets_3 = pd.DataFrame.from_records(rows_3)
# tweets_4 = pd.DataFrame.from_records(rows_4)
# tweets_5 = pd.DataFrame.from_records(rows_5)
# tweets_6 = pd.DataFrame.from_records(rows_6)

In [None]:
print('tweets_1', tweets_1.shape)
print('tweets_2', tweets_2.shape)
print('tweets_3', tweets_3.shape)
print('tweets_4', tweets_4.shape)
print('tweets_5', tweets_5.shape)
print('tweets_6', tweets_6.shape)

tweets_1 (12656, 35)
tweets_2 (15294, 35)
tweets_3 (15140, 35)
tweets_4 (16874, 35)
tweets_5 (41280, 35)
tweets_6 (127941, 35)


In [13]:
n = tweets_1.shape[0] + tweets_2.shape[0] + tweets_3.shape[0] + tweets_4.shape[0] + tweets_5.shape[0] + tweets_6.shape[0]
print('n =', n)

n = 229185


In [4]:
tweets_1.head(2)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,25,26,27,28,29,30,31,32,33,34
0,coordinates,created_at,hashtags,media,urls,favorite_count,id,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_user_id,...,user_followers_count,user_friends_count,user_listed_count,user_location,user_name,user_screen_name,user_statuses_count,user_time_zone,user_urls,user_verified
1,,Wed May 18 21:49:25 +0000 2022,,,,1,1527043704967528453,theofficepirate,1527043356878155776,140472501,...,36791,6088,255,,Yates,Jyates5,36441,,,FALSE


In [5]:
tweets_1.columns = tweets_1.iloc[0]
tweets_1.head(2)

Unnamed: 0,coordinates,created_at,hashtags,media,urls,favorite_count,id,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_user_id,...,user_followers_count,user_friends_count,user_listed_count,user_location,user_name,user_screen_name,user_statuses_count,user_time_zone,user_urls,user_verified
0,coordinates,created_at,hashtags,media,urls,favorite_count,id,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_user_id,...,user_followers_count,user_friends_count,user_listed_count,user_location,user_name,user_screen_name,user_statuses_count,user_time_zone,user_urls,user_verified
1,,Wed May 18 21:49:25 +0000 2022,,,,1,1527043704967528453,theofficepirate,1527043356878155776,140472501,...,36791,6088,255,,Yates,Jyates5,36441,,,FALSE


In [6]:
tweets_1 = tweets_1.drop([0])

In [7]:
tweets_1.head(5)

Unnamed: 0,coordinates,created_at,hashtags,media,urls,favorite_count,id,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_user_id,...,user_followers_count,user_friends_count,user_listed_count,user_location,user_name,user_screen_name,user_statuses_count,user_time_zone,user_urls,user_verified
1,,Wed May 18 21:49:25 +0000 2022,,,,1,1527043704967528453,theofficepirate,1.5270433568781555e+18,140472501.0,...,36791,6088,255,,Yates,Jyates5,36441,,,False
2,,Fri May 20 20:43:44 +0000 2022,,,,0,1527751952448344065,,,,...,134,553,3,"Chicago, IL",Patrick,LeftistHank,10782,,,False
3,,Sat May 21 19:48:54 +0000 2022,,,,1,1528100542345527296,,,,...,1476,2251,7,Ohio,Firm Bizkit,MyNameIsRickyM,225443,,,False
4,,Sat May 14 06:59:18 +0000 2022,,,,1,1525370151025115137,,,,...,248,774,4,"Cumbernauld, Glasgow",Suzan 🥃🥃🏴󠁧󠁢󠁳󠁣󠁴󠁿🦄,Just_sue_now,9164,,,False
5,,Wed May 18 21:18:00 +0000 2022,,,,1,1527035801871294464,freidergeist,1.5270343225276088e+18,1.4160898259842744e+18,...,458,1706,8,"Here, before you",Inspector Mindbender (X),JackPaceSr,17913,,,False


## 3 | Exploratory Data Analysis

### Explore one set (tweet_1 n=12,656) 

In [None]:
tweets_1.head(2)

### First glance

In [None]:
tweets_1.info()

In [None]:
tweets_1.describe()

In [None]:
cols_list = list(tweets_1.columns)
cols_list

In [None]:
# ['coordinates',
#  'created_at',
 'hashtags',
#  'media',
#  'urls',
#  'favorite_count',
#  'id',
#  'in_reply_to_screen_name',
#  'in_reply_to_status_id',
#  'in_reply_to_user_id',
#  'lang',
#  'place',
#  'possibly_sensitive',
#  'quote_id',
#  'retweet_count',
#  'retweet_id',
#  'retweet_screen_name',
#  'source',
 'text',
 'tweet_url',
 'user_created_at',
#  'user_id',
#  'user_default_profile_image',
#  'user_description',
#  'user_favourites_count',
#  'user_followers_count',
#  'user_friends_count',
#  'user_listed_count',
 'user_location',
#  'user_name',
 'user_screen_name',
#  'user_statuses_count',
#  'user_time_zone',
#  'user_urls',
#  'user_verified']

In [None]:
tweets_1['hashtags']
tweets_1['hashtags'].unique()

In [69]:
tweets_1['possibly_sensitive'][0:2]

1    FALSE
2         
Name: possibly_sensitive, dtype: object

In [None]:
tweets_1['text']

In [69]:
tweets_1['tweet_url'][0:2]

1    https://twitter.com/Jyates5/status/15270437049...
5    https://twitter.com/JackPaceSr/status/15270358...
Name: tweet_url, dtype: object

In [39]:
tweets_1['user_created_at']

1        Fri Apr 01 00:29:41 +0000 2011
2        Fri Apr 01 13:14:03 +0000 2022
3        Fri Apr 01 15:12:02 +0000 2011
4        Fri Apr 01 16:27:26 +0000 2011
5        Fri Apr 01 17:41:17 +0000 2016
                      ...              
12652    Wed Sep 30 18:20:38 +0000 2015
12653    Wed Sep 30 19:12:03 +0000 2020
12654    Wed Sep 30 22:42:35 +0000 2020
12655    Wed Sep 30 23:06:12 +0000 2009
12656    Wed Sep 30 23:38:29 +0000 2009
Name: user_created_at, Length: 12656, dtype: object

In [45]:
# tweets_1['date'] = tweets_1['user_created_at'].astype('datetime64[ns]')
# tweets_1['date'] = pd.to_datetime(tweets_1['user_created_at'], format='%Y-%m-%dT%l:%M:%S%z')
# tweets_1['date'] = pd.to_datetime(tweets_1['user_created_at'], infer_datetime_format=True)


tweets_1['date'] = pd.to_datetime(tweets_1['user_created_at'], format='%a %b %d %M:%M:%S +0000 %Y')



error: ignored

In [None]:
tweets_1.info()

In [None]:
tweets_1[['date']]

In [None]:
tweets_1['user_id']

In [None]:
tweets_1['user_location']

In [None]:
tweets_1['user_screen_name']

In [None]:
tweets_1 = tweets_1[['date',
                     'user_screen_name',
                     'text',
                     'tweet_url',
                     'user_location',
                     'hashtags']]
tweets_1.head(2)

In [None]:
tweets_1 = tweets_1.sort_values('date')
tweets_1.head(2)

In [None]:
tweets_1.info()

In [58]:

# mask = tweets_1['date'] > '2022-05-01'

tweets_1[(tweets_1['date'] > '2022-01-01') & (tweets_1['date'] < '2022-08-01')]


Unnamed: 0,date,user_screen_name,text,tweet_url,user_location,hashtags


In [55]:
tweets_1

Unnamed: 0,date,user_screen_name,text,tweet_url,user_location,hashtags
1,2009-09-30 23:38:29+00:00,Jyates5,@theofficepirate You bro remember them talking...,https://twitter.com/Jyates5/status/15270437049...,,
5,2009-09-30 23:38:29+00:00,JackPaceSr,@freidergeist @Cameo3D @pullenmyleg_ @Breaking...,https://twitter.com/JackPaceSr/status/15270358...,"Here, before you",
6,2009-09-30 23:38:29+00:00,YUCCAYAWN,lemme be clear ... if you like your monkey pox...,https://twitter.com/YUCCAYAWN/status/152739626...,,
7,2009-09-30 23:38:29+00:00,pufpuffprincess,"@KIINGEFFIE_ @JayceBliss @Carol1mcs @TMZ Yes, ...",https://twitter.com/pufpuffprincess/status/152...,Charlie Lastra's arms,
8,2009-09-30 23:38:29+00:00,pufpuffprincess,@Craig4lyfe23 @NateSanti @Youngkid_Vibes @TMZ ...,https://twitter.com/pufpuffprincess/status/152...,Charlie Lastra's arms,
...,...,...,...,...,...,...
12653,2009-09-30 23:38:29+00:00,RolexCola,Ian Brown looks like he has monkey pox,https://twitter.com/RolexCola/status/152777961...,"Gay Street, Baltimore",
12654,2009-09-30 23:38:29+00:00,DeeKno_,@SkyNews how tf are people contracting “monkey...,https://twitter.com/DeeKno_/status/15258026808...,,
12633,2009-09-30 23:38:29+00:00,kkdenaee,wtf is monkey pox get me tf off this earth,https://twitter.com/kkdenaee/status/1527071017...,"Baton Rouge, LA",
12632,2009-09-30 23:38:29+00:00,iamdavidbudd,@6thGUARDS_TNK @NakedScientists @SJDavey56 @Ar...,https://twitter.com/iamdavidbudd/status/152813...,BLACKPOOL UNITED KINGDOM,


In [32]:
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [36]:
# tweet_df = spark.read.csv('/content/tweets.csv', inferSchema=True, header='true')

spark_df = spark.createDataFrame(tweets_1)
spark_df.printSchema()
# spark_df.show()

root
 |-- coordinates: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- hashtags: string (nullable = true)
 |-- media: string (nullable = true)
 |-- urls: string (nullable = true)
 |-- favorite_count: string (nullable = true)
 |-- id: string (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- in_reply_to_status_id: string (nullable = true)
 |-- in_reply_to_user_id: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- place: string (nullable = true)
 |-- possibly_sensitive: string (nullable = true)
 |-- quote_id: string (nullable = true)
 |-- retweet_count: string (nullable = true)
 |-- retweet_id: string (nullable = true)
 |-- retweet_screen_name: string (nullable = true)
 |-- source: string (nullable = true)
 |-- text: string (nullable = true)
 |-- tweet_url: string (nullable = true)
 |-- user_created_at: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- user_default_profile_image: string (nullable = true

In [None]:
df_schema = StructType([StructField("date", StringType(), True),
                        StructField("tweet", StringType(), True)])


In [None]:
tweets_1.select(col())

* RangeIndex: 6859 entries, 0 to 6858
* Data columns (total 36 columns)

### word count

In [None]:
# type(df)
# df.head(2)
pandas_df = df[['date','tweet']]
pandas_df.head(2)

Unnamed: 0,date,tweet
0,2022-08-06,So has anyone begun to compile 'here's the sta...
1,2022-08-06,"Getting some groceries, topic shifted to covid..."


In [None]:
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [None]:
df_schema = StructType([StructField("date", StringType(), True),
                        StructField("tweet", StringType(), True)])


In [None]:
# tweet_df = spark.read.csv('/content/tweets.csv', inferSchema=True, header='true')

spark_df = spark.createDataFrame(pandas_df, schema=df_schema)
spark_df.printSchema()
spark_df.show()

root
 |-- date: string (nullable = true)
 |-- tweet: string (nullable = true)

+----------+--------------------+
|      date|               tweet|
+----------+--------------------+
|2022-08-06|So has anyone beg...|
|2022-08-06|Getting some groc...|
|2022-08-06|"Illinois Childre...|
|2022-08-06|Illinois daycare ...|
|2022-08-06|@Natrone86 @Junot...|
|2022-08-06|"CDC Recommends L...|
|2022-08-06|1,058,637 America...|
|2022-08-06|MF didn’t know sh...|
|2022-08-06|Three people of A...|
|2022-08-06|MonkeyPox Case: ರ...|
|2022-08-06|#Monkeypox Fallza...|
|2022-08-06|RT-PCR detection ...|
|2022-08-06|@BigBadDenis When...|
|2022-08-06|News Outlook TOP ...|
|2022-08-06|CDC Issues Eye-Op...|
|2022-08-06|United States dec...|
|2022-08-06|@FoxNews They are...|
|2022-08-06|@PetiteNicoco No....|
|2022-08-06|Idc if it’s Monke...|
|2022-08-06|In the technical ...|
+----------+--------------------+
only showing top 20 rows



In [None]:
spark_df = spark_df.withColumn('tweet', 
                               explode(split(lower(col('tweet')), '\s')))

In [None]:
(spark_df.groupBy('tweet')
  .count()
  .orderBy('count', ascending=False)
  .show(40))

+----------+-----+
|     tweet|count|
+----------+-----+
|          | 4898|
|       the| 4351|
| monkeypox| 3230|
|        to| 2957|
|         a| 2400|
|       and| 2344|
|    monkey| 2151|
|        is| 2014|
|        of| 1901|
|         i| 1721|
|       pox| 1683|
|        in| 1666|
|       for| 1239|
|       you| 1083|
|      that| 1071|
|        it|  973|
|      with|  967|
|      this|  938|
|       not|  868|
|       are|  838|
|        be|  828|
|        on|  818|
|      have|  805|
|     about|  659|
|#monkeypox|  651|
|       get|  620|
|      from|  597|
|       but|  595|
|      they|  585|
|        we|  583|
|        so|  545|
|        my|  538|
|     covid|  528|
|        if|  520|
|        at|  511|
|      like|  509|
|      just|  489|
|        as|  480|
|    people|  471|
|        me|  454|
+----------+-----+
only showing top 40 rows



In [None]:
(spark_df)

DataFrame[date: string, tweet: string]

###❓Count specific words using PySpark?[link text](https://)
* groupby, count, agg, 
* google = pyspark word frequency


### Corpus `tweets`

In [None]:
tweets = df[['language','date','username','hashtags','tweet']]
tweets.head(2)

Unnamed: 0,language,date,username,hashtags,tweet
0,en,2022-08-06,thetenth2022,[],So has anyone begun to compile 'here's the sta...
1,en,2022-08-06,ashemedai,[],"Getting some groceries, topic shifted to covid..."


In [None]:
tweets.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6859 entries, 0 to 6858
Data columns (total 5 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   language  6859 non-null   object
 1   date      6859 non-null   object
 2   username  6859 non-null   object
 3   hashtags  6859 non-null   object
 4   tweet     6859 non-null   object
dtypes: object(5)
memory usage: 268.1+ KB


In [None]:
tweets['username'].nunique()

6028

In [None]:
# How many languages? 
tweets['language'].unique()

array(['en', 'kn', 'de', 'in', 'tl', 'fr', 'pt', 'te', 'tr', 'it', 'qme',
       'bn', 'qht', 'pl', 'es', 'el', 'nl', 'cy', 'ta', 'hi', 'sv', 'ja',
       'th', 'mr', 'et', 'gu', 'da', 'ro', 'ml', 'zxx', 'und', 'pa', 'ur',
       'ko', 'am', 'fi', 'zh', 'lt', 'hu', 'ru', 'ar', 'si'], dtype=object)

In [None]:
# How many tweets in English?
print('English entries:', (tweets[tweets["language"] == 'en'].count())['language'])

# How many tweets in other languages?
print('Spanish entries:', (tweets[tweets["language"] == 'es'].count())['language'])
print('Italian entries:', (tweets[tweets["language"] == 'it'].count())['language'])

English entries: 6410
Spanish entries: 51
Italian entries: 10


In [None]:
# Keep only English languge tweets
tweets = tweets[(tweets['language'] == 'en')]

In [None]:
tweets.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 6410 entries, 0 to 6858
Data columns (total 5 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   language  6410 non-null   object
 1   date      6410 non-null   object
 2   username  6410 non-null   object
 3   hashtags  6410 non-null   object
 4   tweet     6410 non-null   object
dtypes: object(5)
memory usage: 300.5+ KB


In [None]:
tweets.head(2)

Unnamed: 0,language,date,username,hashtags,tweet
0,en,2022-08-06,thetenth2022,[],So has anyone begun to compile 'here's the sta...
1,en,2022-08-06,ashemedai,[],"Getting some groceries, topic shifted to covid..."


In [None]:
# Drop language column since we have only English tweets
tweets = tweets.drop(columns=['language'])

In [None]:
tweets.head(10)

Unnamed: 0,date,username,hashtags,tweet
0,2022-08-06,thetenth2022,[],So has anyone begun to compile 'here's the sta...
1,2022-08-06,ashemedai,[],"Getting some groceries, topic shifted to covid..."
2,2022-08-06,democracymotion,[],"""Illinois Children's Daycare Worker Tests Posi..."
3,2022-08-06,thegoogle93,[],Illinois daycare worker tests positive for mon...
4,2022-08-06,bufflosouljah1,[],@Natrone86 @JunotIsrael @elcavaqueen @thechicc...
5,2022-08-06,democracymotion,[],"""CDC Recommends Limiting Sex Partners to Avoid..."
6,2022-08-06,salemjakes,[],"1,058,637 Americans have died in the U.S. Covi..."
7,2022-08-06,fineassbrei,[],MF didn’t know shit about monkey pox in June c...
8,2022-08-06,xenohadi,[],Three people of African origin still admitted ...
11,2022-08-06,geopoliticsind,[],RT-PCR detection kit for human monkeypox virus...


In [None]:
# Save corpus
tweets.to_pickle('/content/tweets.pkl')
tweets.to_csv(r'/content/tweets.csv', index=False)

In [None]:
# # copy corpus csv to Google Drive for Tableau
# shutil.copyfile('/content/tweets.csv', '/content/drive/MyDrive/tweets.csv')

# 4 | Storage

#### SQL Database `monkeypox.db`

##### helper functions

In [None]:
# https://towardsdatascience.com/have-a-sql-interview-coming-up-ace-it-using-google-colab-6d3c0ffb29dc

def pd_to_sqlDB(input_df: pd.DataFrame,
                table_name: str,
                db_name: str = 'default.db') -> None:

    # # Setup local logging
    # logging.basicConfig(level=logging.INFO,
    #                     format='%(asctime)s %(levelname)s: %(message)s',
    #                     datefmt='%Y-%m-%d %H:%M:%S')

    # Find columns in the dataframe
    cols = input_df.columns
    cols_string = ','.join(cols)
    val_wildcard_string = ','.join(['?'] * len(cols))

    # Connect to a DB file if it exists, else create a new file
    con = sqlite3.connect(db_name)
    cur = con.cursor()
    # logging.info(f'SQL DB {db_name} created')

    # Create table
    sql_string = f"""CREATE TABLE {table_name} ({cols_string});"""
    cur.execute(sql_string)
    # logging.info(f'SQL Table {table_name} created with {len(cols)} columns')

    # Upload df
    rows_to_upload = input_df.to_dict(orient='split')['data']
    sql_string = f"""INSERT INTO {table_name} ({cols_string}) VALUES ({val_wildcard_string});"""    
    cur.executemany(sql_string, rows_to_upload)
    # logging.info(f'{len(rows_to_upload)} rows uploaded to {table_name}')
  
    # Commit the changes and close the connection
    con.commit()
    con.close()

In [None]:
#  https://towardsdatascience.com/have-a-sql-interview-coming-up-ace-it-using-google-colab-6d3c0ffb29dc

def sql_query_to_pd(sql_query_string: str, db_name: str ='mpox.db') -> pd.DataFrame:
    
    # Connect to the SQL DB
    con = sqlite3.connect(db_name)

    # Execute the SQL query
    cursor = con.execute(sql_query_string)

    # Fetch the data and column names
    result_data = cursor.fetchall()
    cols = [description[0] for description in cursor.description]

    # Close the connection
    con.close()

    # Return as df
    return pd.DataFrame(result_data, columns=cols)

##### sql_to_df

In [None]:
# https://towardsdatascience.com/have-a-sql-interview-coming-up-ace-it-using-google-colab-6d3c0ffb29dc

# Read  csv as df
input_df = pd.read_csv('/content/tweets.csv')

# Upload df to a SQL table
pd_to_sqlDB(input_df,
            table_name='tweets',
            db_name='monkeypox.db')

# Write SQL query in a string variable
sql_query_string = """
    SELECT *
    FROM tweets
"""
# Exectue  SQL query
corpus = sql_query_to_pd(sql_query_string, db_name='monkeypox.db')
corpus

Unnamed: 0,date,username,hashtags,tweet
0,2022-08-06,thetenth2022,[],So has anyone begun to compile 'here's the sta...
1,2022-08-06,ashemedai,[],"Getting some groceries, topic shifted to covid..."
2,2022-08-06,democracymotion,[],"""Illinois Children's Daycare Worker Tests Posi..."
3,2022-08-06,thegoogle93,[],Illinois daycare worker tests positive for mon...
4,2022-08-06,bufflosouljah1,[],@Natrone86 @JunotIsrael @elcavaqueen @thechicc...
...,...,...,...,...
6405,2022-08-06,arkcowgirl62,[],"@hurtmeknots With Monkeypox on the rise, maybe..."
6406,2022-08-06,speehanagram,"['monkeypox', 'besafe']",@FITNESSSF can you start filling up the disinf...
6407,2022-08-06,pritzkertoilet,[],@politvidchannel 71% already have monkey pox .
6408,2022-08-06,thumbressler,['monkeypox'],RT if you have #monkeypox


* **database = `monkeypox.db`**
* **table_1 = `corpus` (6410 rows × 4 columns)**

In [None]:
# # copy corpus csv to Google Drive for Tableau
# shutil.copyfile('/content/tweets.csv', '/content/drive/MyDrive/tweets.csv')

# 5 | Processing

### Case Counts by State

In [None]:
# import sheet with state lattitude and longitude 
# data source: https://developers.google.com/public-data/docs/canonical/states_csv

# open google spreadsheet
worksheet = gc.open('USA-State-Coordinates').sheet1

# get_all_values gives a list of rows.
rows = worksheet.get_all_values()

states = pd.DataFrame.from_records(rows)

states.columns = states.iloc[0]
states.drop([0], inplace=True)
states.drop(['state'], axis=1, inplace=True)
states.sort_values(by=['name'], inplace=True)
states = states.rename(columns={'latitude': 'lat', 'longitude': 'lon', 'name': 'state'})

states.tail(2)

Unnamed: 0,lat,lon,state
51,43.78444,-88.787868,Wisconsin
52,43.075968,-107.290284,Wyoming


In [None]:
states.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 52 entries, 1 to 52
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   lat     52 non-null     object
 1   lon     52 non-null     object
 2   state   52 non-null     object
dtypes: object(3)
memory usage: 1.6+ KB


In [None]:
# import sheet with US case count by state 
# data source: https://www.cdc.gov/poxvirus/monkeypox/response/2022/us-map.html

worksheet = gc.open('2022-US-mpx-cases-by-state').sheet1
rows = worksheet.get_all_values()
cases = pd.DataFrame.from_records(rows)

cases.columns = cases.iloc[0]
cases.drop([0], inplace=True)
cases.drop(['AsOf', 'Case Range'], axis=1, inplace=True)
cases.sort_values(by=['Location'], inplace=True)

cases.tail(2)

Unnamed: 0,Location,Cases
51,Wisconsin,56
52,Wyoming,1


In [None]:
# US case count by state (long + lat)
map_data = pd.concat([states, cases], axis=1)
map_data = map_data[['Location', 'Cases', 'lat', 'lon' ]]
map_data = map_data.rename(columns={'Location': 'state', 'Cases':'cases'})

map_data.tail(2)
# map_data.info()

Unnamed: 0,state,cases,lat,lon
51,Wisconsin,56,43.78444,-88.787868
52,Wyoming,1,43.075968,-107.290284


In [None]:
map_data = map_data.astype({'cases':'int'})

map_data.sort_values(by=['cases'], ascending=False)
# map_data.sort_values(by=['latitude'], ascending=False)



Unnamed: 0,state,cases,lat,lon
5,California,3291,36.778261,-119.417932
33,New York,3124,43.299428,-74.217933
10,Florida,1739,27.664827,-81.515754
45,Texas,1472,31.968599,-99.901813
11,Georgia,1299,32.157435,-82.907123
14,Illinois,1005,40.633125,-89.398528
31,New Jersey,479,40.058324,-74.405661
39,Pennsylvania,477,41.203322,-77.194525
21,Maryland,461,39.045755,-76.641271
9,District of Columbia,414,38.905985,-77.033418


In [None]:
map_data.head(2)

Unnamed: 0,state,cases,lat,lon
1,Alabama,53,32.318231,-86.902298
2,Alaska,3,63.588753,-154.493062


In [None]:
map_data.to_csv('map_data.csv')  

# 6 | Deployment

See draft streamlit app here: https://slp22-data-engineering-project-streamlit-mpx-app-ckpzq2.streamlitapp.com/

Streamlit cheat sheet: https://daniellewisdl-streamlit-cheat-sheet-app-ytm9sg.streamlitapp.com/

In [None]:
# wordcloud
# https://www.geeksforgeeks.org/generating-word-cloud-python/
comment_words = ''
stopwords = set(stopwords)
 
# iterate through the csv file
for val in df.text:
     
    # typecaste each val to string
    val = str(val)
 
    # split the value
    tokens = val.split()
     
    # Converts each token into lowercase
    for i in range(len(tokens)):
        tokens[i] = tokens[i].lower()
     
    comment_words += " ".join(tokens)+" "
 
wordcloud = WordCloud(width = 800, height = 800,
                background_color ='white',
                stopwords = stopwords,
                min_font_size = 10).generate(comment_words)
 
# plot the WordCloud image                      
plt.figure(figsize = (8, 8), facecolor = None)
plt.imshow(wordcloud)
plt.axis("off")
plt.tight_layout(pad = 0)
plt.title('Coronavirus Tweets April 2020')
plt.savefig("coronavirus-tweets-word-cloud.jpeg");

# 7 | Testing/Robustness

[Python schedule](https://schedule.readthedocs.io/en/stable/examples.html#run-a-job-every-x-minute)