## Data extraction

Extract only the required data from the raw CSV files, save it to pickle files and upload them to the data repository.

In [6]:
# to allow relative imports
import os
import sys
from experiments import append_sys_path
append_sys_path()

import pandas as pd
import numpy as np
from typing import Dict
import time
import shutil

from lib.configuration import RAW, OUT, REMOTE_RAW
from lib.experiments.utils.data_repo_api import DataRepoAPI

# fall back to the remote files when the local raw directory is missing or empty
if not (os.path.isdir(RAW) and os.listdir(RAW)):
    RAW = REMOTE_RAW
    
# # make output directory if required
# if not os.path.isdir(OUT):
#     os.mkdir(OUT)

Define the files to be read in.

In [7]:
# row selectors: CBG prefix for the census/patterns files,
# lower-case county name for the Google mobility file
CBG_CODE = '09009'
COUNTY_NAME = 'new haven county'

# Each spec below describes one raw file: its name, the positions of the
# columns to load, the names given to those columns and their dtypes.

# census files
CBG_B01 = dict(
    file='cbg_b01.csv',
    cols=[0, 159, 160],
    names=['cbg', 'B01003e1', 'B01003m1'],
    dtypes={0: 'string', 159: np.int32, 160: np.int32},
)

CBG_B25 = dict(
    file='cbg_b25.csv',
    cols=[0, 187, 188],
    names=['cbg', 'B25010e1', 'B25010m1'],
    dtypes={0: 'string', 187: np.float32, 188: np.float32},
)

# patterns files (pre and post lockdown)
PATTERNS_PRE = dict(
    file='feb2020_core_poi-patterns.csv',
    cols=[0, 25, 35],
    names=['placekey', 'visitor_cbg', 'cbg'],
    dtypes={0: 'string', 35: 'string', 25: 'string'},
)

PATTERNS_POST = dict(
    file='apr2020_core_poi-patterns.csv',
    cols=[0, 25, 35],
    names=['placekey', 'visitor_cbg', 'cbg'],
    dtypes={0: 'string', 35: 'string', 25: 'string'},
)

# Google mobility data (the 'google' key switches read() to the county filter)
GOOGLE_MOBILITY = dict(
    file='2020_US_Region_Mobility_Report.csv',
    cols=[3, 8, 9, 10, 11, 12, 13, 14],
    names=[
        'county', 'date', 'retail_recreation', 'grocery_pharmacy', 'park',
        'transit', 'workplace', 'residential',
    ],
    date_cols=['date'],
    dtypes={3: 'string'},
    google=True,
)

Define the function to read the data.

In [8]:
def read(data: Dict) -> pd.DataFrame:
    """
    Read raw data from a csv file, keeping only the rows of interest.

    The file is read in chunks of 1000 rows and each chunk is filtered
    before the chunks are concatenated.

    :param data: contains info on the data to extract. Keys used:
        'file' (name appended to RAW), 'cols' (columns to load),
        'names' (names given to the loaded columns), 'dtypes',
        optionally 'date_cols' (columns parsed as dates) and
        optionally 'google' (its presence selects the county filter).
        The dict is left unmodified.
    :returns: data in a pandas data frame. When 'google' is present,
        the rows whose 'county' equals COUNTY_NAME ignoring case;
        otherwise the rows whose 'cbg' starts with CBG_CODE. Rows with
        a missing value in the filtered column are dropped.
    """

    # look the optional key up instead of writing a default back into the
    # caller's dict, so the module-level specs are not altered by a read
    date_cols = data.get('date_cols', False)

    iter_csv = pd.read_csv(f"{RAW}{data['file']}", usecols=data['cols'],
                           dtype=data['dtypes'], parse_dates=date_cols,
                           header=0, names=data['names'], iterator=True,
                           chunksize=1000)

    if 'google' in data:
        # google mobility data: filter county (missing values never match)
        def keep(chunk: pd.DataFrame) -> pd.Series:
            return chunk['county'].str.lower().eq(COUNTY_NAME).fillna(False)
    else:
        # filter cbg (na=False makes missing values non-matching)
        def keep(chunk: pd.DataFrame) -> pd.Series:
            return chunk['cbg'].str.startswith(CBG_CODE, na=False)

    return pd.concat([chunk[keep(chunk)] for chunk in iter_csv])

Read in the raw data.

In [9]:
%%time
    
# read from csv: one filtered frame per source, read in the order listed
df_google, df_pat_pre, df_pat_post, df_b01, df_b25 = map(
    read,
    (GOOGLE_MOBILITY, PATTERNS_PRE, PATTERNS_POST, CBG_B01, CBG_B25),
)

TimeoutError: [Errno 60] Operation timed out

Save the extracted data to pickle files and upload them to the data repository.

In [None]:
def extracted_df_to_repo(df, file_name, file_path):
    """
    Pickle a data frame and pass the pickle file to DataRepoAPI.
    :param df: data frame to pickle.
    :param file_name: name of the pickle file, without the '.pkl' extension.
    :param file_path: directory in which the pickle file is written.
    """
    pickle_name = file_name + '.pkl'

    # pickle file
    df.to_pickle(os.path.join(file_path, pickle_name))

    # upload to github
    DataRepoAPI.update_or_create(
        file_name=pickle_name,
        file_path=file_path,
        repo_path='network-data',
    )

In [5]:
%%time

# scratch directory for the pickles, named after the current epoch second
TMP_DIR = f'tmp{int(time.time())}'
os.mkdir(TMP_DIR)

# (frame, file name without extension) pairs, in upload order
extracted_dfs = [
    (df_google, 'df_google'),
    (df_pat_pre, 'df_pat_pre'),
    (df_pat_post, 'df_pat_post'),
    (df_b01, 'df_b01'),
    (df_b25, 'df_b25'),
]

try:
    # pickle each frame into the scratch directory and upload it
    for e_df in extracted_dfs:
        extracted_df_to_repo(*e_df, TMP_DIR)
finally:
    # the scratch directory is removed whether or not the uploads succeeded
    shutil.rmtree(TMP_DIR)

    
# save to pickle
# df_google.to_pickle(f'{OUT}df_google.pkl')
# df_pat_pre.to_pickle(f'{OUT}df_pat_pre.pkl')
# df_pat_post.to_pickle(f'{OUT}df_pat_post.pkl')
# df_b01.to_pickle(f'{OUT}df_b01.pkl')
# df_b25.to_pickle(f'{OUT}df_b25.pkl')

CPU times: user 41.1 ms, sys: 36.3 ms, total: 77.4 ms
Wall time: 79.3 ms
