In [1]:
import pandas as pd
import numpy as np
import concurrent.futures
from functools import partial
import time

In [2]:
# Only these columns are read from the CSV; the first two become the index.
relevant_cols = [
    'postal_code',
    'date',
    'avg_temperature_air_2m_f',
    'avg_humidity_relative_2m_pct',
]

# Load the weather data indexed by (postal_code, date).
# - parse_dates names the 'date' column explicitly: parse_dates=True asks
#   pandas to try date-parsing every index level, postal_code included.
# - infer_datetime_format is not passed: it is deprecated in pandas 2.x,
#   where it only emits a warning.
df = pd.read_csv(
    filepath_or_buffer='w_data.csv',
    usecols=relevant_cols,
    index_col=['postal_code', 'date'],
    parse_dates=['date'],
    dtype={
        'avg_temperature_air_2m_f': np.float64,
        'avg_humidity_relative_2m_pct': np.int32,
    },
)

In [3]:
# Order rows by (postal_code, date); ascending, non-in-place sorting is the default.
df = df.sort_index()

In [4]:
# Preview the first five rows of the sorted frame.
df.head(n=5)

Unnamed: 0_level_0,Unnamed: 1_level_0,avg_temperature_air_2m_f,avg_humidity_relative_2m_pct
postal_code,date,Unnamed: 2_level_1,Unnamed: 3_level_1
1029200,2020-02-01,44.6,92
1029200,2020-02-02,45.6,93
1029200,2020-02-03,48.1,84
1029200,2020-02-04,48.7,90
1029200,2020-02-05,51.3,86


In [5]:
# Row-to-row difference of both measurements, computed separately within each
# postal code so the first row of every code has no predecessor (NaN).
rdf = (
    df.groupby(level='postal_code')[
        ['avg_temperature_air_2m_f', 'avg_humidity_relative_2m_pct']
    ]
    .diff(periods=1)
)

In [6]:
# Rename the two diff columns (temperature first, humidity second). These
# names are written verbatim as keys in the exported JSON records.
rdf.columns = [
    'delta_tempreture_previous_day',
    'delta_humidity_previous_day',
]

In [7]:
# Preview the first five rows of the day-over-day deltas.
rdf.head(n=5)

Unnamed: 0_level_0,Unnamed: 1_level_0,delta_tempreture_previous_day,delta_humidity_previous_day
postal_code,date,Unnamed: 2_level_1,Unnamed: 3_level_1
1029200,2020-02-01,,
1029200,2020-02-02,1.0,1.0
1029200,2020-02-03,2.5,-9.0
1029200,2020-02-04,0.6,6.0
1029200,2020-02-05,2.6,-4.0


In [8]:
def create_potal_code_chunks(result_df: pd.DataFrame, chunk_size):
    """Group the frame's distinct postal codes into consecutive chunks.

    Args:
        result_df: Frame whose index has a level named 'postal_code'.
        chunk_size: Maximum number of postal codes per chunk.

    Returns:
        Dict mapping '<first code>-<last code>' of each chunk to the list of
        postal codes (as strings) in that chunk. The number of chunks is
        printed as a side effect.
    """
    codes = [str(code) for code in result_df.index.unique(level='postal_code')]

    chunked = {}
    for start in range(0, len(codes), chunk_size):
        group = codes[start:start + chunk_size]
        # Key each chunk by its first and last postal code.
        chunked[f'{group[0]}-{group[-1]}'] = group

    print(len(chunked))
    return chunked

# Split the postal codes into chunks of at most 100 for batch export.
postal_code_keys = create_potal_code_chunks(result_df=rdf, chunk_size=100)

32


In [9]:
# Chunk key -> status message for chunks that have already been exported.
processed_chunk = {}

In [None]:
def p_generate_files(postal_code: str, result_df: pd.DataFrame) -> str:
    """Export one postal code's rows to a JSON-lines file.

    Args:
        postal_code: Postal code to export, as a string.
        result_df: Frame indexed by ('postal_code', 'date') to pick rows from.

    Returns:
        Path of the file that was written.

    Raises:
        ValueError: If ``result_df`` holds no rows for ``postal_code``.
    """
    # Compare whole labels. DataFrame.filter(like=...) does a substring match
    # on the stringified (postal_code, date) label, so one code could also
    # select longer codes or rows whose date happens to contain its digits.
    level_codes = result_df.index.get_level_values('postal_code').astype(str)
    tmp_df = result_df.loc[level_codes == str(postal_code)]
    if tmp_df.empty:
        raise ValueError(f'No rows found for postal code {postal_code}')

    # The file name carries the first and last month (e.g. FEB) of the data.
    dates = tmp_df.index.get_level_values('date')
    start_date = dates.min().strftime('%b').upper()
    end_date = dates.max().strftime('%b').upper()
    file_name_path = f'data/COVID-WEATHER-{postal_code}-{start_date}-{end_date}-2020.json'

    # The index is turned into regular columns so it appears in each record.
    # index=True is not passed: recent pandas rejects it for orient="records".
    tmp_df.reset_index().to_json(
        file_name_path,
        orient="records",
        lines=True,
        date_format='iso')
    return file_name_path


# String form of every row's postal code, computed once. The chunk lists hold
# strings, so rows are matched on strings whatever the index level's dtype is.
row_postal_codes = rdf.index.get_level_values('postal_code').astype(str)

for postal_code_key, list_postal_codes in postal_code_keys.items():
    print(postal_code_key)
    if postal_code_key in processed_chunk:
        # Already exported by an earlier run of this cell; skip it.
        print(processed_chunk[postal_code_key])
        continue

    print(list_postal_codes)
    sdf = rdf.loc[row_postal_codes.isin(list_postal_codes)]

    # NOTE(review): worker processes must be able to look up p_generate_files
    # in __main__; confirm this holds for the platform's process start method.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # executor.map is lazy about errors: a worker exception is only
        # re-raised when its result is read. Draining the iterator makes a
        # failed export stop the loop instead of being recorded as a success.
        list(executor.map(partial(p_generate_files, result_df=sdf), list_postal_codes))

    # Leaving the with-block waits for all workers, so no extra sleep is needed.
    processed_chunk[postal_code_key] = f"Successful Processed {postal_code_key}"