In [1]:
import pandas as pd
import numpy as np
import csv

import warnings
warnings.filterwarnings('ignore')

from tqdm import tqdm

from datetime import datetime

# Timestamp of this run (YYYYMMDD-HHMMSS), used to tag files written by the notebook.
d = datetime.now().strftime('%Y%m%d-%H%M%S')

# Root of the project's data directory (note the trailing slash).
data_dir = '/work/users/k/4/k4thryn/Repos/EpSampling/data/'

In [2]:
# ACS covariates: load the processed snapshot stamped with this date.
load_date = '20240920-124150'
acs_csv = f'{data_dir}/processed/all_county_acs_covs_{load_date}.csv'
df_acs = pd.read_csv(acs_csv)

# Death counts: a different snapshot date, so `load_date` is re-pointed first.
load_date = '20240912-015510'
deaths_csv = f'{data_dir}/processed/naive_deaths_all_counties_{load_date}.csv'
df_deaths = pd.read_csv(deaths_csv)

# Preview the first rows of both tables.
display(df_acs.head(), df_deaths.head())

Unnamed: 0,State,State_fips,Fips,POP_x2,POP_M,POP_F,POP_A0004,POP_A0509,POP_A1014,POP_A1517,...,HU_x15,HU_UIS01D,HU_UIS01A,HU_UIS02,HU_UIS0304,HU_UIS0509,HU_UIS1019,HU_UIS2049,HU_UIS50P,HU_UISOTHER
0,AK,2,2013,3409,2014,1395,122,103,151,106,...,1113,835,3,57,72,41,35,0,0,70
1,AK,2,2016,5251,2995,2256,162,215,148,110,...,1456,417,70,242,275,154,79,180,12,27
2,AK,2,2020,292545,149648,142897,20218,20836,18642,11308,...,118055,56649,16185,6372,12729,6852,4719,6892,2642,5015
3,AK,2,2050,18514,9724,8790,1902,1923,1696,1028,...,5992,4826,125,305,228,56,65,150,9,228
4,AK,2,2060,849,480,369,67,41,28,23,...,922,711,33,23,29,51,12,7,0,56


Unnamed: 0,State_fips,State,Postal,County,Fips,Date,COVIDhubEns_state_deaths,Pop,Pop_ratio,True_county_deaths,Naive_county_deaths
0,53,Washington,WA,Adams,53001,2020-04-18,644.228,19983,0.002624,0.0,1.690583
1,53,Washington,WA,Asotin,53003,2020-04-18,644.228,22582,0.002966,0.0,1.910461
2,53,Washington,WA,Benton,53005,2020-04-18,644.228,204390,0.026841,34.0,17.29161
3,53,Washington,WA,Chelan,53007,2020-04-18,644.228,77200,0.010138,6.0,6.531202
4,53,Washington,WA,Clallam,53009,2020-04-18,644.228,77331,0.010155,0.0,6.542284


In [None]:
import multiprocessing
from joblib import Parallel, delayed
from datetime import datetime
def parallel_mutate(ds, n_samp=-1, tag=None, work_path=None, num_muts=N_MUTS,
                    actions=ACTIONS, mahala_thresh=MAHALA_THRESH, weight=WEIGHT):
    '''
    Split a dataset into row chunks and run 'mutate_data' on each chunk in a
    separate worker process. Not for use in slurm jobs. This function
        should be followed up by 'consolidate_mut_ds_chunks()'
    Args:
        ds: DataFrame object or complete path to csv.
        n_samp: Number of rows to sample. Any value <= 0 (default -1) uses
            the entire dataframe.
        tag: Label used in the default work directory name. When None it is
            built from the current time and the run settings.
        work_path: Directory prefix handed to each worker as
            '<work_path>chunk_<s>-<e>'. When None it defaults to
            '../data/processed/scratch/mutants_<tag>/'. Created if missing.
        num_muts, actions, mahala_thresh, weight: Forwarded unchanged to
            'mutate_data'.
    Returns:
        Tuple (work_path, tag).
    Raises:
        Exception: if there are fewer rows than CPU cores.
    '''
    # Local import: the visible notebook imports never bring `os` into scope.
    import os

    df = ds if isinstance(ds, pd.DataFrame) else pd.read_csv(ds)

    if n_samp > 0:
        df = df.sample(n_samp)
    else:
        n_samp = 'full'

    # Build a re-indexed copy instead of resetting in place, so a DataFrame
    # passed in by the caller is never modified.
    df = df.reset_index(drop=True)

    if tag is None:
        t = datetime.today().strftime('%Y%m%d%H%M')
        tag = f'{t}-{n_samp}ancs-{num_muts}muts-{mahala_thresh}thresh-{weight}w'.replace('.','pt')

    if work_path is None:
        work_path = f'../data/processed/scratch/mutants_{tag}/'
    os.makedirs(f'{work_path}', exist_ok=True)

    n_cpus = multiprocessing.cpu_count()
    num_per_thread = len(df) // n_cpus

    if num_per_thread == 0:
        raise Exception("Sample of anchors too small for multiprocessing!")

    # Chunk boundaries every `num_per_thread` rows; the final chunk absorbs the
    # remainder. The closing boundary stays at len(df)+1 so chunk names are
    # unchanged; slicing clamps it to the end of the frame.
    partitions = np.arange(0, len(df), num_per_thread)
    if len(partitions) > 1:
        partitions[-1] = len(df)+1
    else:
        # Only one boundary (e.g. a single CPU): overwriting it would leave no
        # chunks at all, so append the closing boundary instead.
        partitions = np.append(partitions, len(df)+1)
    idc_chunks = [[s,e] for s,e in zip(partitions, partitions[1:])]

    print(f"Mutating dataset of {len(df)} compounds.")
    print(f" > Parallel jobs: {len(idc_chunks)}\n > Compounds per job: {num_per_thread}")
    print(f" > Saving intermediate sets to path '{work_path}chunk_<s>-<e>.csv'")

    subdfs = [df[s:e] for s,e in idc_chunks]
    out_names = [f'{work_path}chunk_{s}-{e}' for s,e in idc_chunks]

    parallelizer = Parallel(n_jobs=n_cpus, backend='multiprocessing')
    # `chunk` (not `df`) so the generator does not shadow the full frame.
    tasks = (delayed(mutate_data)(chunk, -1, out, num_muts, actions,
                                  mahala_thresh, weight)
             for chunk, out in zip(subdfs, out_names))
    parallelizer(tasks)

    return work_path, tag

In [14]:
import multiprocessing
from joblib import Parallel, delayed


# County FIPS codes that appear in both the ACS covariates and the deaths table.
fips_acs = list(df_acs.Fips.unique())
fips_deaths = list(df_deaths.Fips.unique())
fips_set = list(set(fips_acs) & set(fips_deaths))

n_jobs = multiprocessing.cpu_count()
num_per_thread = len(fips_set) // n_jobs

if num_per_thread == 0:
    raise Exception("Sample of anchors too small for multiprocessing!")

# Chunk boundaries every `num_per_thread` codes; the last chunk absorbs the
# remainder.
partitions = np.arange(0, len(fips_set), num_per_thread)
if len(partitions) > 1:
    partitions[-1] = len(fips_set)
else:
    # A single boundary (e.g. one CPU) would otherwise produce zero chunks.
    partitions = np.append(partitions, len(fips_set))
idc_chunks = [[s,e] for s,e in zip(partitions, partitions[1:])]

fips_chunks = [fips_set[s:e] for s,e in idc_chunks]

parallelizer = Parallel(n_jobs=n_jobs, backend='multiprocessing')

# TODO: dispatch one task per entry of `fips_chunks` through `parallelizer`.
# The dispatch previously pasted here referenced names that do not exist in
# this notebook (mutate_data, subdfs, out_names, ...) and ended in a top-level
# `return`, which made the whole cell a SyntaxError.

# Preview the first chunk of FIPS codes.
fips_chunks[0]

[41001,
 41003,
 41005,
 41007,
 41009,
 41011,
 41013,
 41015,
 41017,
 41019,
 41021,
 41023,
 41025,
 41027,
 41029,
 41031,
 41033,
 41035,
 41037,
 41039,
 41041,
 41043,
 41045,
 41047,
 41049,
 41051,
 41053,
 41055,
 41057,
 41059,
 41061,
 41063,
 41065,
 41067,
 41069,
 41071,
 33001,
 33003,
 33005,
 33007,
 33009,
 33011,
 33013,
 33015,
 33017,
 33019,
 25001,
 25003,
 25005,
 25007,
 25009,
 25011,
 25013,
 25015,
 25017,
 25019,
 25021,
 25023,
 25025,
 25027,
 17001,
 17003,
 17005,
 17007,
 17009]

In [9]:
# Number of CPU cores reported for this machine.
multiprocessing.cpu_count()

48

In [7]:
# County FIPS codes that appear in both the ACS covariates and the deaths table.
fips_acs = list(df_acs.Fips.unique())
fips_deaths = list(df_deaths.Fips.unique())
fips_set = list(set(fips_acs) & set(fips_deaths))

# One covariate record per county, as {fips: {column: value}}. The first ACS
# row is kept when a FIPS code occurs more than once.
acs_by_fips = (
    df_acs.drop_duplicates(subset='Fips', keep='first')
          .set_index('Fips', drop=False)
          .to_dict(orient='index')
)

# Split the deaths table by county in a single pass instead of re-scanning the
# whole frame with a boolean mask for every FIPS code.
deaths_by_fips = {fips: rows for fips, rows in df_deaths.groupby('Fips', sort=False)}

dfs_full = []

for fips in tqdm(fips_set, total=len(fips_set)):
    # `assign` returns a new frame: columns shared with the deaths table are
    # overwritten with the ACS value, the rest are appended in ACS column
    # order. Nothing is written onto a slice of `df_deaths`.
    dfs_full.append(deaths_by_fips[fips].assign(**acs_by_fips[fips]))
    

100%|██████████| 3130/3130 [03:49<00:00, 13.64it/s]


In [15]:
# Stack the per-county frames into one table, keeping their original row labels.
final_df = pd.concat(objs=dfs_full)
final_df

Unnamed: 0,State_fips,State,Postal,County,Fips,Date,COVIDhubEns_state_deaths,Pop,Pop_ratio,True_county_deaths,...,HU_x15,HU_UIS01D,HU_UIS01A,HU_UIS02,HU_UIS0304,HU_UIS0509,HU_UIS1019,HU_UIS2049,HU_UIS50P,HU_UISOTHER
71624,41,OR,OR,Baker,41001,2020-05-09,125.655,16124,0.003823,0.0,...,8654,6300,109,162,159,183,105,90,157,1389
71657,41,OR,OR,Baker,41001,2020-05-16,143.950,16124,0.003823,0.0,...,8654,6300,109,162,159,183,105,90,157,1389
71690,41,OR,OR,Baker,41001,2020-05-23,150.378,16124,0.003823,0.0,...,8654,6300,109,162,159,183,105,90,157,1389
71723,41,OR,OR,Baker,41001,2020-05-30,159.093,16124,0.003823,0.0,...,8654,6300,109,162,159,183,105,90,157,1389
71757,41,OR,OR,Baker,41001,2020-06-06,164.199,16124,0.003823,0.0,...,8654,6300,109,162,159,183,105,90,157,1389
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
128856,24,MD,MD,Baltimore city,24510,2022-04-09,14367.000,593490,0.098168,1765.0,...,293718,43042,149324,14615,16871,17300,12674,7999,31436,457
128880,24,MD,MD,Baltimore city,24510,2022-04-16,14391.000,593490,0.098168,1771.0,...,293718,43042,149324,14615,16871,17300,12674,7999,31436,457
128904,24,MD,MD,Baltimore city,24510,2022-04-23,14423.000,593490,0.098168,1771.0,...,293718,43042,149324,14615,16871,17300,12674,7999,31436,457
128928,24,MD,MD,Baltimore city,24510,2022-04-30,14443.000,593490,0.098168,1773.0,...,293718,43042,149324,14615,16871,17300,12674,7999,31436,457


In [None]:
# Write the merged table, tagged with this run's timestamp, without the index column.
merged_csv = f'{data_dir}/processed/merged_covs_deaths{d}.csv'
final_df.to_csv(merged_csv, index=False)

In [None]:
# Join on the county key `Fips`. Joining on `State_fips` pairs every deaths row
# with every ACS row from the same state. ACS columns whose names clash with
# deaths columns get an `_acs` suffix.
# `validate` raises if the ACS table has more than one row per FIPS code --
# NOTE(review): assumes one ACS row per county; confirm against the data.
df_full = df_deaths.merge(
    df_acs,
    on='Fips',
    how='inner',
    suffixes=('', '_acs'),
    validate='many_to_one',
)
df_full

In [None]:
# Distinct county FIPS codes in each table.
fips_acs = list(df_acs['Fips'].unique())
fips_deaths = list(df_deaths['Fips'].unique())

# Codes present in both tables.
fips_common = set(fips_acs) & set(fips_deaths)
fips_set = list(fips_common)

# Counts: ACS codes, deaths codes, shared codes.
print(len(fips_acs), len(fips_deaths), len(fips_set))

In [None]:
# Column names collected under `ignore`.
ignore = [
    'State_fips',
    'State',
    'Postal',
    'County',
]

In [None]:
def time_to_int(date):
    '''
    Encode a 'YYYY-MM-DD' date string as the integer YYYYMMDD.

    Args:
        date: Date string with year, month and day separated by '-'.
    Returns:
        int equal to 10000*year + 100*month + day, e.g. '2022-05-07' -> 20220507.
    Raises:
        ValueError: if `date` does not split into exactly three integer parts.
    '''
    # Convert each part to int first: multiplying the raw strings would repeat
    # and concatenate text instead of doing arithmetic.
    y, m, d = (int(part) for part in date.split('-'))
    return 10000*y + 100*m + d

In [None]:
# Try the date encoder on a sample 'YYYY-MM-DD' string.
time_to_int('2022-05-07')