In [1]:
import os
import sys

# Directory added to the import path so the local modules imported below
# (presumably BigQueryWrapper -- confirm) can be found. The location can be
# overridden with the SURVEILLANCE_2019_NCOV_PROD_DIR environment variable so the
# notebook is not tied to one machine's home directory.
PROD_DIR = os.environ.get(
    'SURVEILLANCE_2019_NCOV_PROD_DIR',
    '/home/sfang/windows/gitlab/stanleysfang/surveillance_2019_ncov/prod',
)
# Guarded so that re-running this cell does not keep appending duplicates.
if PROD_DIR not in sys.path:
    sys.path.append(PROD_DIR)

In [2]:
from google.cloud import bigquery
from BigQueryWrapper import QueryRunner, Loader, Extractor
import pandas as pd
import datetime
import re

In [3]:
# Project id passed to bigquery.Client when the notebook's client is created.
project_id = 'stanleysfang'

In [4]:
# One BigQuery client for the whole notebook; the wrapper helpers and the
# CSSECovid19DailyReports instance created later are all built on it.
client = bigquery.Client(project=project_id)

In [5]:
# Helpers from the local BigQueryWrapper module, all bound to the same client.
# In the cells visible in this notebook only `loader` is used directly.
qr = QueryRunner(client=client)
loader = Loader(client=client)
extractor = Extractor(client=client)

### Functions

In [6]:
def find_all_cols(url, start_dt, end_dt=None):
    """Collect every column name that appears in the daily report CSVs of a date range.

    One CSV is read per day from ``url + 'MM-DD-YYYY' + '.csv'``. Each date is
    printed as it is processed, and the resulting set is printed before it is
    returned.

    Args:
        url: Base location of the daily reports, including the trailing separator.
        start_dt: First report date (anything ``pd.date_range`` accepts).
        end_dt: Last report date, inclusive. ``None`` (the default) means
            yesterday, resolved when the function is called.

    Returns:
        set: Union of the column names found across all reports read.
    """
    if end_dt is None:
        # Resolved at call time: a default expression in the signature is evaluated
        # once, when the function is defined, and would go stale in a kernel that
        # stays alive past midnight.
        end_dt = datetime.date.today() - datetime.timedelta(days=1)

    col_set = set()
    for d in pd.date_range(start_dt, end_dt):
        print(d.strftime('%Y-%m-%d'))
        # Only the header row is needed, so skip parsing the data rows.
        header = pd.read_csv(url + d.strftime('%m-%d-%Y') + '.csv', nrows=0)
        col_set.update(header.columns)
    print(col_set)
    return col_set

In [7]:
def standardize_daily_reports(df, col_mapping, col_order):
    """Rename, retype and reorder a raw daily report into the standard layout.

    Args:
        df: Raw report; every column name must be a key of ``col_mapping``.
        col_mapping: ``{source column: (target column, pandas dtype)}``.
        col_order: Target column names in the order the result should have.

    Returns:
        A new DataFrame whose columns are exactly ``col_order``. Target columns
        the report did not contain are added as all-missing columns of the mapped
        dtype. ``df`` itself is left unmodified.

    Raises:
        KeyError: If ``df`` has a column that is not a key of ``col_mapping``.
    """
    def _resolve_dtype(dtype):
        # pandas >= 2.0 rejects the unit-less 'datetime64' spelling; nanosecond
        # resolution is what earlier versions produced for it.
        if isinstance(dtype, str) and dtype == 'datetime64':
            return 'datetime64[ns]'
        return dtype

    unknown = [col for col in df.columns if col not in col_mapping]
    if unknown:
        raise KeyError('columns missing from col_mapping: {}'.format(unknown))

    # astype() and rename() both return new frames, so the caller's DataFrame is
    # never retyped or renamed behind its back.
    out = df.astype({col: _resolve_dtype(col_mapping[col][1]) for col in df.columns})
    out = out.rename(columns={col: col_mapping[col][0] for col in df.columns})

    # Add every target column the report lacked as an all-missing column.
    for col, dtype in set(col_mapping.values()):
        if col not in out.columns:
            out[col] = pd.Series(index=out.index, dtype=_resolve_dtype(dtype))
    return out[col_order]

### US

In [8]:
# Report date for the US load: yesterday relative to the kernel's current date.
# The commented-out line below pins a fixed date instead.
d = datetime.date.today() - datetime.timedelta(days=1)
# d = datetime.date(2020, 4, 12)

In [9]:
# Base URL of the US daily reports; the 'MM-DD-YYYY.csv' file name is appended
# when a report is read.
url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports_us/'

In [10]:
# col_set = find_all_cols(url, start_dt=datetime.date(2020, 4, 12), end_dt=d)

In [11]:
# Source CSV header -> (standardized column name, pandas dtype), consumed by
# standardize_daily_reports. Two source headers may share one target (e.g.
# 'Mortality_Rate' and 'Case_Fatality_Ratio') -- presumably header renames in the
# upstream files over time; TODO confirm.
col_mapping_us = {
    'UID': ('UID', 'float64'),
    'FIPS': ('FIPS', 'float64'), # Federal Information Processing Standards code that uniquely identifies counties within the US
    'ISO3': ('iso3', 'object'),
    'Province_State': ('province_state', 'object'),
    'Country_Region': ('country_region', 'object'),
    'Lat': ('latitude', 'float64'),
    'Long_': ('longitude', 'float64'),
    'Confirmed': ('confirmed', 'float64'),
    'Deaths': ('deaths', 'float64'),
    'Recovered': ('recovered', 'float64'),
    'Active': ('active', 'float64'),
    'Incident_Rate': ('incident_rate', 'float64'),
    'Case_Fatality_Ratio': ('case_fatality_ratio', 'float64'),
    'Mortality_Rate': ('case_fatality_ratio', 'float64'),
    'Total_Test_Results': ('total_test_results', 'float64'),
    'People_Tested': ('total_test_results', 'float64'),
    'Testing_Rate': ('testing_rate', 'float64'),
    'People_Hospitalized': ('people_hospitalized', 'float64'),
    'Hospitalization_Rate': ('hospitalization_rate', 'float64'),
    'Last_Update': ('last_update', 'datetime64'),
}

In [12]:
# Column order of the standardized US frame. standardize_daily_reports selects
# exactly these names, so each one must be a target name in col_mapping_us.
col_order_us = [
    'UID',
    'FIPS',
    'iso3',
    'province_state',
    'country_region',
    'latitude',
    'longitude',
    'confirmed',
    'deaths',
    'recovered',
    'active',
    'incident_rate',
    'case_fatality_ratio',
    'total_test_results',
    'testing_rate',
    'people_hospitalized',
    'hospitalization_rate',
    'last_update',
]

In [13]:
# Read the US report for date `d` and standardize it in a single expression, so
# `daily_report_us` only ever refers to the standardized frame.
daily_report_us = standardize_daily_reports(
    pd.read_csv(url + d.strftime('%m-%d-%Y') + '.csv'),
    col_mapping_us,
    col_order_us,
)

In [14]:
# Sanity check: (rows, columns) of the standardized US frame.
daily_report_us.shape

(58, 18)

In [15]:
# Check that each column carries the dtype requested in col_mapping_us.
daily_report_us.dtypes

UID                            float64
FIPS                           float64
ISO3                            object
province_state                  object
country_region                  object
latitude                       float64
longitude                      float64
confirmed                      float64
deaths                         float64
recovered                      float64
active                         float64
incident_rate                  float64
case_fatality_ratio            float64
total_test_results             float64
testing_rate                   float64
people_hospitalized            float64
hospitalization_rate           float64
last_update             datetime64[ns]
dtype: object

In [16]:
# Preview the first rows of the standardized US frame.
daily_report_us.head()

Unnamed: 0,UID,FIPS,ISO3,province_state,country_region,latitude,longitude,confirmed,deaths,recovered,active,incident_rate,case_fatality_ratio,total_test_results,testing_rate,people_hospitalized,hospitalization_rate,last_update
0,84000001.0,1.0,USA,Alabama,US,32.3182,-86.9023,220848.0,3301.0,88038.0,129509.0,4504.174328,1.494693,1473546.0,30052.833005,,,2020-11-18 05:42:00
1,84000002.0,2.0,USA,Alaska,US,61.3707,-152.4044,25041.0,100.0,7165.0,17776.0,3423.029342,0.399345,878622.0,120104.983289,,,2020-11-18 05:42:00
2,16.0,60.0,ASM,American Samoa,US,-14.271,-170.132,0.0,0.0,,0.0,0.0,,1988.0,3572.904872,,,2020-11-18 05:42:00
3,84000004.0,4.0,USA,Arizona,US,33.7298,-111.4312,279896.0,6312.0,46103.0,227481.0,3845.402974,2.255123,2004208.0,27535.182368,,,2020-11-18 05:42:00
4,84000005.0,5.0,USA,Arkansas,US,34.9697,-92.3731,135902.0,2245.0,117068.0,16589.0,4503.34084,1.651926,1525279.0,50542.679379,,,2020-11-18 05:42:00


In [17]:
# (column name, type) pairs passed to loader.load_df as the table schema, listed
# in the same order as col_order_us.
# NOTE(review): count-like columns (UID, FIPS, confirmed, ...) are float64 in the
# DataFrame but INT64 here; the conversion presumably happens inside
# Loader.load_df -- confirm.
schema_us = [
    ('UID', 'INT64'),
    ('FIPS', 'INT64'),
    ('iso3', 'STRING'),
    ('province_state', 'STRING'),
    ('country_region', 'STRING'),
    ('latitude', 'FLOAT64'),
    ('longitude', 'FLOAT64'),
    ('confirmed', 'INT64'),
    ('deaths', 'INT64'),
    ('recovered', 'INT64'),
    ('active', 'INT64'),
    ('incident_rate', 'FLOAT64'),
    ('case_fatality_ratio', 'FLOAT64'),
    ('total_test_results', 'INT64'),
    ('testing_rate', 'FLOAT64'),
    ('people_hospitalized', 'INT64'),
    ('hospitalization_rate', 'FLOAT64'),
    ('last_update', 'TIMESTAMP'),
]

In [18]:
# Submit the standardized US frame to the table, addressing the '$YYYYMMDD'
# suffix for date `d` (presumably a partition decorator -- how it is applied is
# up to Loader.load_df; confirm).
load_job = loader.load_df(
    daily_report_us,
    'stanleysfang.surveillance_2019_ncov.csse_covid_19_daily_reports_us${}'.format(d.strftime('%Y%m%d')),
    schema=schema_us,
    time_partitioning=True
)

In [19]:
# Wait for the US load job to finish (the stored output shows a bigquery LoadJob).
load_job.result()

<google.cloud.bigquery.job.load.LoadJob at 0x7f9c50bc4828>

### Global

In [20]:
# Report date for the global load: yesterday relative to the kernel's current
# date. The commented-out line below pins a fixed date instead.
d = datetime.date.today() - datetime.timedelta(days=1)
# d = datetime.date(2020, 1, 22)

In [21]:
# Base URL of the global daily reports. This rebinds `url`, which pointed at the
# US folder in the cells above.
url = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/'

In [22]:
# col_set = find_all_cols(url, start_dt=datetime.date(2020, 1, 22), end_dt=d)

In [23]:
# Source CSV header -> (standardized column name, pandas dtype), consumed by
# standardize_daily_reports. Several spellings of one header map to the same
# target (e.g. 'Province/State' and 'Province_State') -- presumably header
# variants used by the upstream files over time; TODO confirm.
col_mapping_global = {
    'FIPS': ('FIPS', 'float64'), # Federal Information Processing Standards code that uniquely identifies counties within the US
    'Admin2': ('county', 'object'), # US only
    'Province/State': ('province_state', 'object'),
    'Province_State': ('province_state', 'object'),
    'Country_Region': ('country_region', 'object'),
    'Country/Region': ('country_region', 'object'),
    'Combined_Key': ('combined_key', 'object'),
    'Latitude': ('latitude', 'float64'),
    'Lat': ('latitude', 'float64'),
    'Longitude': ('longitude', 'float64'),
    'Long_': ('longitude', 'float64'),
    'Confirmed': ('confirmed', 'float64'),
    'Deaths': ('deaths', 'float64'),
    'Recovered': ('recovered', 'float64'),
    'Active': ('active', 'float64'),
    'Incident_Rate': ('incident_rate', 'float64'),
    'Incidence_Rate': ('incident_rate', 'float64'),
    'Case_Fatality_Ratio': ('case_fatality_ratio', 'float64'),
    'Case-Fatality_Ratio': ('case_fatality_ratio', 'float64'),
    'Last Update': ('last_update', 'datetime64'),
    'Last_Update': ('last_update', 'datetime64'),
}

In [24]:
# Column order of the standardized global frame. standardize_daily_reports
# selects exactly these names, so each one must be a target name in
# col_mapping_global.
col_order_global = [
    'FIPS',
    'county',
    'province_state',
    'country_region',
    'combined_key',
    'latitude',
    'longitude',
    'confirmed',
    'deaths',
    'recovered',
    'active',
    'incident_rate',
    'case_fatality_ratio',
    'last_update',
]

In [25]:
# Read the global report for date `d` and standardize it in a single expression,
# so `daily_report_global` only ever refers to the standardized frame.
daily_report_global = standardize_daily_reports(
    pd.read_csv(url + d.strftime('%m-%d-%Y') + '.csv'),
    col_mapping_global,
    col_order_global,
)

In [26]:
# Sanity check: (rows, columns) of the standardized global frame.
daily_report_global.shape

(3974, 14)

In [27]:
# Check that each column carries the dtype requested in col_mapping_global.
daily_report_global.dtypes

FIPS                          float64
county                         object
province_state                 object
country_region                 object
combined_key                   object
latitude                      float64
longitude                     float64
confirmed                     float64
deaths                        float64
recovered                     float64
active                        float64
incident_rate                 float64
case_fatality_ratio           float64
last_update            datetime64[ns]
dtype: object

In [28]:
# Preview the first rows of the standardized global frame.
daily_report_global.head()

Unnamed: 0,FIPS,county,province_state,country_region,combined_key,latitude,longitude,confirmed,deaths,recovered,active,incident_rate,case_fatality_ratio,last_update
0,,,,Afghanistan,Afghanistan,33.93911,67.709953,43628.0,1638.0,35160.0,6830.0,112.07259,3.75447,2020-11-18 05:40:31
1,,,,Albania,Albania,41.1533,20.1683,29126.0,637.0,13804.0,14685.0,1012.092571,2.187049,2020-11-18 05:40:31
2,,,,Algeria,Algeria,28.0339,1.6596,69591.0,2186.0,45148.0,22257.0,158.69862,3.141211,2020-11-18 05:40:31
3,,,,Andorra,Andorra,42.5063,1.5218,5951.0,76.0,4965.0,910.0,7702.064324,1.277096,2020-11-18 05:40:31
4,,,,Angola,Angola,-11.2027,17.8739,13818.0,328.0,6582.0,6908.0,42.043106,2.373715,2020-11-18 05:40:31


In [29]:
# (column name, type) pairs passed to loader.load_df as the table schema, listed
# in the same order as col_order_global.
# NOTE(review): count-like columns (FIPS, confirmed, deaths, ...) are float64 in
# the DataFrame but INT64 here; the conversion presumably happens inside
# Loader.load_df -- confirm.
schema_global = [
    ('FIPS', 'INT64'),
    ('county', 'STRING'),
    ('province_state', 'STRING'),
    ('country_region', 'STRING'),
    ('combined_key', 'STRING'),
    ('latitude', 'FLOAT64'),
    ('longitude', 'FLOAT64'),
    ('confirmed', 'INT64'),
    ('deaths', 'INT64'),
    ('recovered', 'INT64'),
    ('active', 'INT64'),
    ('incident_rate', 'FLOAT64'),
    ('case_fatality_ratio', 'FLOAT64'),
    ('last_update', 'TIMESTAMP'),
]

In [30]:
# Submit the standardized global frame to the table, addressing the '$YYYYMMDD'
# suffix for date `d` (presumably a partition decorator -- how it is applied is
# up to Loader.load_df; confirm). Rebinds `load_job` from the US section.
load_job = loader.load_df(
    daily_report_global,
    'stanleysfang.surveillance_2019_ncov.csse_covid_19_daily_reports_global${}'.format(d.strftime('%Y%m%d')),
    schema=schema_global,
    time_partitioning=True
)

In [31]:
# Wait for the global load job to finish (the stored output shows a bigquery LoadJob).
load_job.result()

<google.cloud.bigquery.job.load.LoadJob at 0x7f9c50b53128>

### OOP

In [32]:
class CSSECovid19DailyReports:
    """Load CSSE COVID-19 daily report CSVs (US and global) through ``Loader.load_df``.

    For each requested date the report CSV is read from the configured base URL,
    standardized to a fixed column layout, and handed to ``Loader.load_df`` with
    ``'<destination_table>$YYYYMMDD'`` as the target for that date.

    Attributes:
        client: ``bigquery.Client`` shared by all helpers.
        run_project: Project id taken from ``client``.
        qr, loader, extractor: ``BigQueryWrapper`` helpers bound to ``client``.
        url_us, url_global: Base URLs of the daily report folders.
        col_mapping_us, col_mapping_global: ``{source header: (target column,
            pandas dtype)}``; several source headers may share one target.
        col_order_us, col_order_global: Column order of the standardized frame.
        schema_us, schema_global: ``(column, type)`` pairs passed to the loader.
    """

    # Shared message for every "bad dt argument" assertion in update().
    _DT_ERROR = 'dt must be a str in "YYYY-mm-dd" format or a datetime.date object or a list of these'

    def __init__(self, client=None, run_project="stanleysfang"):
        """Bind the BigQuery helpers and define the US / global report layouts.

        Args:
            client: Existing ``bigquery.Client`` to reuse. When omitted, a new
                client is created for ``run_project``.
            run_project: Project used only when no ``client`` is supplied.
        """
        self.client = client if client else bigquery.Client(project=run_project)

        self.run_project = self.client.project
        self.qr = QueryRunner(client=self.client)
        self.loader = Loader(client=self.client)
        self.extractor = Extractor(client=self.client)

        # US
        self.url_us = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports_us/'
        self.col_mapping_us = {
            'UID': ('UID', 'float64'),
            'FIPS': ('FIPS', 'float64'), # Federal Information Processing Standards code that uniquely identifies counties within the US
            'ISO3': ('iso3', 'object'),
            'Province_State': ('province_state', 'object'),
            'Country_Region': ('country_region', 'object'),
            'Lat': ('latitude', 'float64'),
            'Long_': ('longitude', 'float64'),
            'Confirmed': ('confirmed', 'float64'),
            'Deaths': ('deaths', 'float64'),
            'Recovered': ('recovered', 'float64'),
            'Active': ('active', 'float64'),
            'Incident_Rate': ('incident_rate', 'float64'),
            'Case_Fatality_Ratio': ('case_fatality_ratio', 'float64'),
            'Mortality_Rate': ('case_fatality_ratio', 'float64'),
            'Total_Test_Results': ('total_test_results', 'float64'),
            'People_Tested': ('total_test_results', 'float64'),
            'Testing_Rate': ('testing_rate', 'float64'),
            'People_Hospitalized': ('people_hospitalized', 'float64'),
            'Hospitalization_Rate': ('hospitalization_rate', 'float64'),
            'Last_Update': ('last_update', 'datetime64'),
        }
        self.col_order_us = [
            'UID',
            'FIPS',
            'iso3',
            'province_state',
            'country_region',
            'latitude',
            'longitude',
            'confirmed',
            'deaths',
            'recovered',
            'active',
            'incident_rate',
            'case_fatality_ratio',
            'total_test_results',
            'testing_rate',
            'people_hospitalized',
            'hospitalization_rate',
            'last_update',
        ]
        self.schema_us = [
            ('UID', 'INT64'),
            ('FIPS', 'INT64'),
            ('iso3', 'STRING'),
            ('province_state', 'STRING'),
            ('country_region', 'STRING'),
            ('latitude', 'FLOAT64'),
            ('longitude', 'FLOAT64'),
            ('confirmed', 'INT64'),
            ('deaths', 'INT64'),
            ('recovered', 'INT64'),
            ('active', 'INT64'),
            ('incident_rate', 'FLOAT64'),
            ('case_fatality_ratio', 'FLOAT64'),
            ('total_test_results', 'INT64'),
            ('testing_rate', 'FLOAT64'),
            ('people_hospitalized', 'INT64'),
            ('hospitalization_rate', 'FLOAT64'),
            ('last_update', 'TIMESTAMP'),
        ]

        # Global
        self.url_global = 'https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/'
        self.col_mapping_global = {
            'FIPS': ('FIPS', 'float64'), # Federal Information Processing Standards code that uniquely identifies counties within the US
            'Admin2': ('county', 'object'), # US only
            'Province/State': ('province_state', 'object'),
            'Province_State': ('province_state', 'object'),
            'Country_Region': ('country_region', 'object'),
            'Country/Region': ('country_region', 'object'),
            'Combined_Key': ('combined_key', 'object'),
            'Latitude': ('latitude', 'float64'),
            'Lat': ('latitude', 'float64'),
            'Longitude': ('longitude', 'float64'),
            'Long_': ('longitude', 'float64'),
            'Confirmed': ('confirmed', 'float64'),
            'Deaths': ('deaths', 'float64'),
            'Recovered': ('recovered', 'float64'),
            'Active': ('active', 'float64'),
            'Incident_Rate': ('incident_rate', 'float64'),
            'Incidence_Rate': ('incident_rate', 'float64'),
            'Case_Fatality_Ratio': ('case_fatality_ratio', 'float64'),
            'Case-Fatality_Ratio': ('case_fatality_ratio', 'float64'),
            'Last Update': ('last_update', 'datetime64'),
            'Last_Update': ('last_update', 'datetime64'),
        }
        self.col_order_global = [
            'FIPS',
            'county',
            'province_state',
            'country_region',
            'combined_key',
            'latitude',
            'longitude',
            'confirmed',
            'deaths',
            'recovered',
            'active',
            'incident_rate',
            'case_fatality_ratio',
            'last_update',
        ]
        self.schema_global = [
            ('FIPS', 'INT64'),
            ('county', 'STRING'),
            ('province_state', 'STRING'),
            ('country_region', 'STRING'),
            ('combined_key', 'STRING'),
            ('latitude', 'FLOAT64'),
            ('longitude', 'FLOAT64'),
            ('confirmed', 'INT64'),
            ('deaths', 'INT64'),
            ('recovered', 'INT64'),
            ('active', 'INT64'),
            ('incident_rate', 'FLOAT64'),
            ('case_fatality_ratio', 'FLOAT64'),
            ('last_update', 'TIMESTAMP'),
        ]

    def find_all_cols(self, url, start_dt, end_dt=None):
        """Collect every column name that appears in the report CSVs of a date range.

        One CSV is read per day from ``url + 'MM-DD-YYYY' + '.csv'``. Each date is
        printed as it is processed, and the resulting set is printed before it is
        returned.

        Args:
            url: Base location of the daily reports, including the trailing separator.
            start_dt: First report date (anything ``pd.date_range`` accepts).
            end_dt: Last report date, inclusive. ``None`` (the default) means
                yesterday, resolved when the method is called.

        Returns:
            set: Union of the column names found across all reports read.
        """
        if end_dt is None:
            # Resolved at call time: a default expression in the signature is
            # evaluated once, when the class body runs, and would go stale in a
            # process that stays alive past midnight.
            end_dt = datetime.date.today() - datetime.timedelta(days=1)

        col_set = set()
        for d in pd.date_range(start_dt, end_dt):
            print(d.strftime('%Y-%m-%d'))
            # Only the header row is needed, so skip parsing the data rows.
            header = pd.read_csv(url + d.strftime('%m-%d-%Y') + '.csv', nrows=0)
            col_set.update(header.columns)
        print(col_set)
        return col_set

    @staticmethod
    def _resolve_dtype(dtype):
        """Map the unit-less 'datetime64' spelling to 'datetime64[ns]'; pass others through."""
        # pandas >= 2.0 rejects unit-less 'datetime64'; nanosecond resolution is
        # what earlier versions produced for it.
        if isinstance(dtype, str) and dtype == 'datetime64':
            return 'datetime64[ns]'
        return dtype

    def standardize_daily_reports(self, df, col_mapping, col_order):
        """Rename, retype and reorder a raw daily report into the standard layout.

        Args:
            df: Raw report; every column name must be a key of ``col_mapping``.
            col_mapping: ``{source column: (target column, pandas dtype)}``.
            col_order: Target column names in the order the result should have.

        Returns:
            A new DataFrame whose columns are exactly ``col_order``. Target
            columns the report did not contain are added as all-missing columns
            of the mapped dtype. ``df`` itself is left unmodified.

        Raises:
            KeyError: If ``df`` has a column that is not a key of ``col_mapping``.
        """
        unknown = [col for col in df.columns if col not in col_mapping]
        if unknown:
            raise KeyError('columns missing from col_mapping: {}'.format(unknown))

        # astype() and rename() both return new frames, so the caller's DataFrame
        # is never retyped or renamed behind its back.
        out = df.astype({col: self._resolve_dtype(col_mapping[col][1]) for col in df.columns})
        out = out.rename(columns={col: col_mapping[col][0] for col in df.columns})

        # Add every target column the report lacked as an all-missing column.
        for col, dtype in set(col_mapping.values()):
            if col not in out.columns:
                out[col] = pd.Series(index=out.index, dtype=self._resolve_dtype(dtype))
        return out[col_order]

    @classmethod
    def _resolve_dates(cls, dt, end_dt):
        """Expand ``dt`` / ``end_dt`` into a validated list of date objects."""
        single = (str, datetime.date)
        dt_list = None
        if isinstance(dt, single) and isinstance(end_dt, single):
            # Inclusive range; the entries are pandas Timestamps, which are
            # datetime.date instances and support strftime.
            dt_list = pd.date_range(start=dt, end=end_dt).tolist()
        elif isinstance(dt, single):
            dt_list = [dt]
        elif isinstance(dt, list):
            dt_list = dt

        assert isinstance(dt_list, list), cls._DT_ERROR

        dates = []
        for d in dt_list:
            assert isinstance(d, single), cls._DT_ERROR
            if isinstance(d, str):
                d = datetime.datetime.strptime(d, '%Y-%m-%d').date()
            dates.append(d)
        return dates

    def update(self, dt, end_dt, destination_table, url, col_mapping, col_order, schema):
        """Read, standardize and load the daily report of every requested date.

        Args:
            dt: A ``'YYYY-mm-dd'`` string, a ``datetime.date``, or a list of these.
            end_dt: When ``dt`` is a single string/date and ``end_dt`` is one too,
                every day from ``dt`` through ``end_dt`` is processed. Not used
                when ``dt`` is a list.
            destination_table: Table id; ``'$YYYYMMDD'`` is appended per date.
            url: Base URL of the report folder, including the trailing slash.
            col_mapping: Mapping passed to ``standardize_daily_reports``.
            col_order: Column order passed to ``standardize_daily_reports``.
            schema: Schema passed to ``self.loader.load_df``.

        Raises:
            AssertionError: If ``dt`` (or a list entry) has an unsupported type.
            ValueError: If a string date is not in ``'YYYY-mm-dd'`` format.
        """
        # Every date is resolved and validated before the first load, so a bad
        # entry cannot leave a multi-day run half finished.
        for d in self._resolve_dates(dt, end_dt):
            print('Updating ' + d.strftime('%Y-%m-%d') + ' ... ', end='', flush=True)
            daily_report = pd.read_csv(url + d.strftime('%m-%d-%Y') + '.csv')
            daily_report = self.standardize_daily_reports(daily_report, col_mapping, col_order)
            load_job = self.loader.load_df(
                daily_report,
                '{destination_table}${partition}'.format(destination_table=destination_table, partition=d.strftime('%Y%m%d')),
                schema=schema,
                time_partitioning=True
            )
            # Wait for this date's job before moving on to the next one.
            load_job.result()
            print('Done')

    def update_us(self, dt, end_dt=None, destination_table='stanleysfang.surveillance_2019_ncov.csse_covid_19_daily_reports_us'):
        """Run ``update`` with the US URL, column mapping, column order and schema."""
        print('CSSE COVID-19 Daily Reports US:')
        self.update(
            dt, end_dt, destination_table,
            url=self.url_us,
            col_mapping=self.col_mapping_us,
            col_order=self.col_order_us,
            schema=self.schema_us
        )

    def update_global(self, dt, end_dt=None, destination_table='stanleysfang.surveillance_2019_ncov.csse_covid_19_daily_reports_global'):
        """Run ``update`` with the global URL, column mapping, column order and schema."""
        print('CSSE COVID-19 Daily Reports Global:')
        self.update(
            dt, end_dt, destination_table,
            url=self.url_global,
            col_mapping=self.col_mapping_global,
            col_order=self.col_order_global,
            schema=self.schema_global
        )


In [33]:
# Pass the notebook's existing client so the class does not create a second one.
daily_reports = CSSECovid19DailyReports(client)

In [34]:
# Load yesterday's US report through the class. This targets the same default
# table and date as the script cells in the "US" section above.
daily_reports.update_us(datetime.date.today() - datetime.timedelta(days=1))

Updating 2020-11-17 ... Done


In [35]:
# Load yesterday's global report through the class. This targets the same
# default table and date as the script cells in the "Global" section above.
daily_reports.update_global(datetime.date.today() - datetime.timedelta(days=1))

Updating 2020-11-17 ... Done
