In [26]:
import requests
import json
import re
import pandas as pd
from datetime import datetime
import sqlite3
import concurrent.futures 
import asyncio
from typing import List
from tqdm import tqdm
import time
import multiprocessing 

    
def transform(response) -> pd.DataFrame:
    """Parse the dataset's JSON payload into a cleaned, typed DataFrame.

    The payload is expected to have column metadata under
    ``meta.view.columns`` (each with a ``fieldName``) and row values under
    ``data``. Colons are stripped from field names to form column names.

    Cleaning steps:
      * ``test_date`` is parsed as a datetime and stored back as a string.
      * ``county`` is lower-cased, spaces become underscores and dots are
        removed (e.g. "St. Lawrence" -> "st_lawrence"), so it can serve as
        a table name downstream.
      * the four count columns are cast to integers.

    Args:
        response: Object with a ``.json()`` method (a ``requests.Response``
            in ``main``).

    Returns:
        DataFrame with columns ``county``, ``test_date``, the four count
        columns and ``load_date`` (today's date, ``YYYY-MM-DD``).
    """
    # Parse the body once; it is a large payload and was previously decoded
    # three separate times.
    payload = response.json()
    rows = payload["data"]

    ## Parsing Column Names
    column_names = [column["fieldName"].replace(":", "") for column in payload["meta"]["view"]["columns"]]
    print("There are {} columns in this data set".format(str(len(column_names))))
    print("There are {} rows of data".format(str(len(rows))))

    count_columns = [
        "new_positives",
        "cumulative_number_of_positives",
        "total_number_of_tests",
        "cumulative_number_of_tests",
    ]

    ## Storing and cleaning data as a DataFrame
    df = pd.DataFrame(rows, columns=column_names)
    df["test_date"] = pd.to_datetime(df["test_date"]).astype("str")
    # Element-wise str methods (not the vectorised .str accessor) so a
    # non-string county still fails loudly instead of silently becoming NaN.
    df["county"] = df["county"].map(lambda name: name.lower().replace(" ", "_").replace(".", ""))
    df[count_columns] = df[count_columns].astype("int")

    ## Selecting Desired Columns
    # Explicit copy so the load_date assignment below writes to an
    # independent frame (no SettingWithCopyWarning).
    df = df[["county", "test_date", *count_columns]].copy()
    df["load_date"] = datetime.today().strftime("%Y-%m-%d")

    print("Sample Data")
    # sample(n) raises ValueError when n > len(df), so cap the preview size.
    display(df.sample(min(5, len(df))))

    return df

def load(df, county_names, db_name = "covid.db"):
    """Write each county's rows of ``df`` into its own table in ``db_name``.

    One ``ingest`` task per county is submitted to a process pool. Each
    task's status message is printed as it completes, followed by the total
    wall-clock time.

    Args:
        df: Cleaned frame from ``transform``; must contain a ``county`` column.
        county_names: Iterable of county names; each is also the table name.
        db_name: Path of the SQLite database file.

    Raises:
        Any exception raised by ``ingest`` in a worker, re-raised here by
        ``Future.result()``.
    """
    # The per-county work is dominated by SQLite reads/writes rather than CPU.
    # NOTE(review): all workers write to the same SQLite file, so writes are
    # serialised by SQLite's database lock. Also, with the "spawn" start
    # method (Windows/macOS default) functions defined in a notebook cannot be
    # unpickled by workers — this relies on "fork"; confirm on target platform.
    t1 = time.perf_counter()

    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Ship each worker only its own county's rows: submit() pickles its
        # arguments, so passing the full frame copied it once per county.
        futures = [
            executor.submit(ingest, df[df["county"] == county_name], county_name, db_name)
            for county_name in county_names
        ]

        for future in concurrent.futures.as_completed(futures):
            print(future.result())

    t2 = time.perf_counter()
    print(f'Finished in {t2-t1} seconds')
    
    
def create_table_cmd(county_name):
    """Build a ``CREATE TABLE if not exists`` statement for one county's table.

    SQLite cannot bind identifiers as query parameters, so the table name is
    interpolated as a quoted SQL identifier: wrapped in double quotes with any
    embedded double quote doubled. For ordinary names such as ``albany`` this
    names the same table as the unquoted form, but names with a leading digit
    or punctuation no longer produce invalid (or injectable) SQL.

    Args:
        county_name: Table name, normally a normalised county name from
            ``transform`` (lower-case, underscores, no dots).

    Returns:
        The SQL statement as a string. Columns: ``test_date`` and
        ``load_date`` as TEXT, the four count columns as INTEGER.
    """
    type_map = {
        "test_date": "TEXT",
        "new_positives": "INTEGER",
        "cumulative_number_of_positives": "INTEGER",
        "total_number_of_tests": "INTEGER",
        "cumulative_number_of_tests": "INTEGER",
        "load_date": "TEXT",
    }
    sql_cols = ",\n".join(f"    {col} {sql_type}" for col, sql_type in type_map.items())
    quoted_name = '"{}"'.format(str(county_name).replace('"', '""'))

    cmd = f"""CREATE TABLE if not exists {quoted_name} (
                {sql_cols}
                );"""
    return cmd
    
def ingest(df, county_name, db_name) -> str:
    """Append one county's rows that are newer than what its table holds.

    Creates the county's table if it does not exist, reads the latest
    ``test_date`` already stored, and appends only this county's rows of
    ``df`` with a later ``test_date``. All of them are appended when the
    table is empty.

    Args:
        df: Frame with a ``county`` column plus the table's columns (may also
            contain other counties' rows; they are filtered out).
        county_name: County to ingest; also used as the table name.
        db_name: Path of the SQLite database file.

    Returns:
        Human-readable status line with the local date/time and the number
        of rows appended.
    """
    conn_temp = sqlite3.connect(db_name)
    # Close the connection even if any statement below raises; the sqlite3
    # connection context manager only commits/rolls back, it does not close.
    try:
        # Create Table for County if it does not exist
        conn_temp.execute(create_table_cmd(county_name))

        # Adding Data to Table
        df_county = df[df["county"] == county_name].drop(["county"], axis=1)

        # Identifiers cannot be bound as parameters, so quote the table name.
        quoted_name = '"{}"'.format(str(county_name).replace('"', '""'))
        max_date_in_table = conn_temp.execute(f"select max(test_date) from {quoted_name}").fetchone()[0]

        # max() is NULL (None) on an empty table: keep every row then.
        if max_date_in_table is not None:
            df_county = df_county[pd.to_datetime(df_county.test_date) > pd.to_datetime(max_date_in_table)]

        df_county.to_sql(county_name, conn_temp, if_exists='append', index=False)
        conn_temp.commit()
    finally:
        conn_temp.close()

    # Single timestamp so date and time cannot straddle midnight; %I (12-hour)
    # is the clock that matches the %p AM/PM marker.
    now = datetime.today()
    return f"{county_name} table is updated on {now.date()} at {now.strftime('%I:%M %p')}. {df_county.shape[0]} row(s) added."

## Main ETL Function 
def main(url, db_name, timeout=60, expected_counties=62):
    """Run the ETL: download the dataset, clean it, load per-county tables.

    Args:
        url: URL of the dataset's JSON export.
        db_name: Path of the SQLite database file.
        timeout: Seconds passed to ``requests.get``; without a timeout a
            stalled server would block this call indefinitely.
        expected_counties: Number of distinct counties the data must contain.

    Raises:
        requests.exceptions.HTTPError: Non-2xx response (its body is printed
            first).
        requests.exceptions.RequestException: Other network failures.
        AssertionError: The number of distinct counties differs from
            ``expected_counties``.
    """
    try:
        response = requests.get(url, timeout=timeout)
        print(f"Loaded response from {url}")
        response.raise_for_status()

    except requests.exceptions.HTTPError as e:
        print(e.response.text)
        # Stop here: continuing would hand an error page to transform() and
        # fail later with a far less informative error.
        raise

    print(f"Response Code: {response.status_code}")

    df = transform(response)
    county_names = df.county.unique()
    assert len(county_names) == expected_counties, "Mismatch in the number of counties"

    load(df, county_names, db_name)

if __name__ == "__main__":
    # Script / top-level-cell entry point: the dataset's JSON export URL and
    # the SQLite file that will hold one table per county.
    url, db_name = (
        "https://health.data.ny.gov/api/views/xdss-u53e/rows.json?accessType=DOWNLOAD",
        "covid.db",
    )

    main(url=url, db_name=db_name)

Loaded response from https://health.data.ny.gov/api/views/xdss-u53e/rows.json?accessType=DOWNLOAD
Response Code: 200
There are 14 columns in this data set
There are 20646 rows of data
Sample Data


Unnamed: 0,county,test_date,new_positives,cumulative_number_of_positives,total_number_of_tests,cumulative_number_of_tests,load_date
11494,ontario,2020-08-20,0,373,720,35247,2021-01-29
8520,livingston,2020-09-12,2,196,91,22265,2021-01-29
3026,clinton,2020-03-30,4,21,9,190,2021-01-29
3109,clinton,2020-06-21,0,100,192,9757,2021-01-29
7354,jefferson,2020-03-29,4,11,100,332,2021-01-29


albany table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
allegany table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
chenango table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
bronx table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
clinton table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
broome table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
columbia table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
cortland table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
cattaraugus table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
cayuga table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
delaware table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
dutchess table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
erie table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
chautauqua table is updated on 2021-01-29 at 02:14 AM. 0 row(s) added.
essex table is updated on 2021-01-29 at 02:

In [None]:
# Scratch cell: open the ETL database for ad-hoc inspection, e.g.
#   conn_temp.execute("select * from albany").fetchall()
conn_temp = sqlite3.connect(database="covid.db")

In [None]:
# Scratch check: latest test_date already stored in one county's table.
county_name = "albany"
conn_temp = sqlite3.connect("covid.db")
try:
    # Table names cannot be bound as parameters, so quote the identifier.
    quoted_name = '"{}"'.format(county_name.replace('"', '""'))
    max_date_in_table = pd.read_sql(f"select max(test_date) from {quoted_name}", conn_temp).values[0][0]
finally:
    # Release the file handle; the previous version left this connection open.
    conn_temp.close()
max_date_in_table
