In [1]:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
import dask_cudf
import argparse
import yaml
from pprint import pprint
from shutil import rmtree
import numpy as np

In [2]:
# Project root and the location of this notebook's YAML config relative to it.
cur_dir = "/home/jovyan/work/projects/GPU_gene_ETL"
config_subdir = "configs/make_dataset_config.yaml"

In [3]:
# Full path of the YAML config file (despite the name, this is a file path,
# not a directory).
config_dir = "/".join([cur_dir, config_subdir])

In [4]:
# Parse the YAML run configuration and echo it so the settings used for this
# run are visible in the notebook output.
print(f"loading yaml file...")
# Use a context manager so the file handle is closed as soon as parsing is
# done instead of being left open for the lifetime of the kernel.
with open(config_dir, 'r') as config_fh:
    config = yaml.safe_load(config_fh)
pprint(config)

loading yaml file...
{'CUDA_VISIBLE_DEVICES': '0',
 'do_cuda_vis_dev': False,
 'do_rand_seed': True,
 'do_unknown_class': True,
 'dsplit_file': '/home/jovyan/work/projects/GPU_gene_ETL/configs/make_data_splits_config.yaml',
 'in_dir': 'gs://jcosme/clean_data/genomics/data_splits_inverse_1_mer',
 'inp_col': 'seq',
 'name_for_unknown_class': '_UNKOWN_',
 'numbers_of_classes': [1, 2, 4, 8, 16],
 'out_base_dir': 'gs://jcosme/clean_data/genomics/datasets',
 'partition_size': '100M',
 'rand_seed': 42,
 'sizes_per_class': [500, 1000, 2000, 4000],
 'tgt_col': 'label',
 'unq_classes_file': 'gs://jcosme/clean_data/genomics/MarRef_species_unq.parquet'}


In [5]:
# Unpack the parsed config into module-level names. Plain indexing is used on
# purpose: a missing key raises KeyError right here.

# --- input / output locations ---
in_dir = config["in_dir"]
out_base_dir = config["out_base_dir"]
dsplit_file = config["dsplit_file"]
unq_classes_file = config["unq_classes_file"]

# --- dataset grid: every class count is combined with every per-class size ---
numbers_of_classes = config["numbers_of_classes"]
sizes_per_class = config["sizes_per_class"]

# --- random sampling ---
rand_seed = config["rand_seed"]
do_rand_seed = config["do_rand_seed"]

# --- column names ---
tgt_col = config["tgt_col"]
inp_col = config["inp_col"]

# --- optional catch-all class for labels outside the selected classes ---
do_unknown_class = config["do_unknown_class"]
name_for_unknown_class = config["name_for_unknown_class"]

# --- Dask / GPU settings ---
CUDA_VISIBLE_DEVICES = config["CUDA_VISIBLE_DEVICES"]
do_cuda_vis_dev = config["do_cuda_vis_dev"]
partition_size = config["partition_size"]

In [6]:
# Local scratch location for the intermediate parquet dataset that is written
# and re-read while each split is processed.
temp_df_file = "/tmp/df.parquet"

In [7]:
# Load the data-split config. Its 'splits' entry is a mapping keyed by split
# name; the values are used later as the per-split proportions.
# The context manager closes the file handle once parsing is done.
with open(dsplit_file, 'r') as dsplit_fh:
    dsplit_config = yaml.safe_load(dsplit_fh)
splits = dsplit_config['splits']

# One parquet dataset per split name, all located under in_dir.
split_names = [split_key + '.parquet' for split_key in splits]
split_files = [f"{in_dir}/{split_name}" for split_name in split_names]

In [8]:
print(f"train size per class: {sizes_per_class}")
# Replace the configured seed with None when seeding is switched off in the
# config; otherwise keep it unchanged.
rand_seed = rand_seed if do_rand_seed else None

train size per class: [500, 1000, 2000, 4000]


In [9]:
# Preview the output directory of every (class count, per-class size)
# combination, in the same order the dataset-building loop visits them.
dataset_grid = (
    (n_cls, sz_cls)
    for n_cls in numbers_of_classes
    for sz_cls in sizes_per_class
)
for cur_n_class, cur_size in dataset_grid:
    cur_name = f"num_cls_{cur_n_class}_sz_cls_{cur_size}_unkn_cls_{do_unknown_class}"
    cur_out_dir = f"{out_base_dir}/{cur_name}"
    print(cur_out_dir)

gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_500_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_1000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_2000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_4000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_2_sz_cls_500_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_2_sz_cls_1000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_2_sz_cls_2000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_2_sz_cls_4000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_4_sz_cls_500_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_4_sz_cls_1000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_4_sz_cls_2000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_4_sz_cls_4000_unkn_cls_True
gs://jcosme/clean_data/genomics/datasets/num_cls_8_sz_cls_500_unkn_cls_True
gs:

In [10]:
# Scratch directory handed to the Dask cluster as its local_directory.
tmp_dask_dir = '/tmp/dask'
# Best-effort removal of leftovers from a previous run. ignore_errors=True
# tolerates a missing directory (and other filesystem errors) without the
# bare `except:` that would also have swallowed KeyboardInterrupt/SystemExit.
rmtree(tmp_dask_dir, ignore_errors=True)

In [11]:
# Start a local CUDA cluster and connect a client to it.
print("starting Dask GPU cluster...")

# Options that are passed to the cluster in every case.
cluster_kwargs = dict(
    protocol="ucx",
    enable_tcp_over_ucx=True,
    local_directory=tmp_dask_dir,
)
# Forward the configured CUDA_VISIBLE_DEVICES value only when the config
# enables it; otherwise the argument is left out entirely.
if do_cuda_vis_dev:
    cluster_kwargs["CUDA_VISIBLE_DEVICES"] = CUDA_VISIBLE_DEVICES

cluster = LocalCUDACluster(**cluster_kwargs)
client = Client(cluster)

starting Dask GPU cluster...


2022-07-13 01:41:23,480 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2022-07-13 01:41:23,501 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


In [12]:
def add_unknown_class(df):
    """Relabel every row whose target is not a selected class.

    Reads the module-level names ``tgt_col``, ``selected_classes`` and
    ``name_for_unknown_class``. Rows whose ``tgt_col`` value is not contained
    in ``selected_classes`` get ``name_for_unknown_class`` as their new label.

    The frame is modified in place and also returned, which lets the function
    be passed to ``map_partitions``.
    """
    is_unselected = ~df[tgt_col].isin(selected_classes)
    df.loc[is_unselected, tgt_col] = name_for_unknown_class
    return df

In [13]:
# Read the class table referenced by unq_classes_file, bring it to host memory
# as a pandas object and flatten its values into a 1-D NumPy array.
unq_classes_ddf = dask_cudf.read_parquet(unq_classes_file)
unq_classes_pdf = unq_classes_ddf.compute().to_pandas()
all_classes = unq_classes_pdf.values.flatten()
total_classes = len(all_classes)

In [14]:
# Build one sub-sampled dataset per (number of classes, size per class) pair.
# For each pair a set of classes is drawn at random, and every split file is
# filtered/sampled class by class and written to
#   {out_base_dir}/num_cls_<n>_sz_cls_<size>_unkn_cls_<flag>/<split>.parquet
for cur_n_class in numbers_of_classes:
    for cur_size in sizes_per_class:
        print(f"current sizes_per_class: {cur_size}")
        print(f"current cur_n_class: {cur_n_class}")
        cur_name = f"num_cls_{cur_n_class}_sz_cls_{cur_size}_unkn_cls_{do_unknown_class}"
        cur_out_dir = f"{out_base_dir}/{cur_name}"

        # Re-seed immediately before the draw: with seeding enabled the
        # selection then depends only on rand_seed and cur_n_class, so every
        # size for the same class count uses the same classes.
        if do_rand_seed:
            np.random.seed(rand_seed)
        cur_selected_classes = np.random.choice(all_classes, cur_n_class, replace=False)

        for split_name, in_file in zip(split_names, split_files):
            # Strip the '.parquet' suffix to get back the key used in `splits`.
            cur_split = split_name.split('.')[0]
            cur_perc = splits[cur_split]
            print(f"current in_file: {in_file}")

            # `selected_classes` must keep this exact name: add_unknown_class
            # reads it as a module-level variable.
            selected_classes = cur_selected_classes.copy()

            # cur_size is the per-class size for 'train'; other splits are
            # scaled by their share relative to the train share.
            size_per_class = cur_size
            if cur_split != 'train':
                size_per_class /= splits['train']
                size_per_class *= cur_perc
                size_per_class = int(round(size_per_class))

            df = dask_cudf.read_parquet(in_file, partition_size=partition_size)

            if do_unknown_class:
                # The unknown-class label becomes one more class to sample.
                selected_classes = np.sort(np.append(name_for_unknown_class, selected_classes))
                df = df.map_partitions(add_unknown_class)
            else:
                selected_classes = np.sort(selected_classes)

            try:
                # Write the (possibly relabelled) frame to local scratch and
                # read it back; presumably this keeps the per-class filters
                # below from re-reading and relabelling in_file each time.
                _ = df.to_parquet(temp_df_file)
                df = dask_cudf.read_parquet(temp_df_file, partition_size=partition_size)

                out_df = []
                for cur_class in selected_classes:
                    temp_ddf = df[df[tgt_col] == cur_class].copy()
                    temp_row_cnt = len(temp_ddf)  # number of rows of this class
                    if temp_row_cnt == 0:
                        # The class does not occur in this split: there is
                        # nothing to sample, and the fraction below would
                        # otherwise divide by zero.
                        client.cancel(temp_ddf)
                        continue
                    cur_sample_amt = min(size_per_class, temp_row_cnt)
                    # sample() takes a fraction rather than a row count, so
                    # convert the target count into a fraction of this class.
                    keep_frac = cur_sample_amt / temp_row_cnt
                    temp_ddf = temp_ddf.sample(frac=keep_frac, replace=False, random_state=rand_seed)
                    out_df.append(temp_ddf.copy())
                    client.cancel(temp_ddf)

                client.cancel(df)

                cur_out_file = f"{cur_out_dir}/{cur_split}.parquet"
                if not out_df:
                    # None of the selected classes has rows in this split;
                    # concatenating an empty list would fail.
                    print(f"no rows for the selected classes, skipping {cur_out_file}")
                    continue

                out_df = dask_cudf.concat(out_df).reset_index(drop=True)

                print(f"saving {cur_out_file}")
                _ = out_df.to_parquet(cur_out_file)

                client.cancel(out_df)
            finally:
                # Always drop the scratch dataset, also when a step above
                # failed, so stale part files cannot leak into the next
                # iteration. Best effort: filesystem errors are ignored.
                rmtree(temp_df_file, ignore_errors=True)

current sizes_per_class: 500
current cur_n_class: 1
current in_file: gs://jcosme/clean_data/genomics/data_splits_inverse_1_mer/train.parquet
saving gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_500_unkn_cls_True/train.parquet
current in_file: gs://jcosme/clean_data/genomics/data_splits_inverse_1_mer/val.parquet
saving gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_500_unkn_cls_True/val.parquet
current in_file: gs://jcosme/clean_data/genomics/data_splits_inverse_1_mer/test.parquet
saving gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_500_unkn_cls_True/test.parquet
current sizes_per_class: 1000
current cur_n_class: 1
current in_file: gs://jcosme/clean_data/genomics/data_splits_inverse_1_mer/train.parquet
saving gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_1000_unkn_cls_True/train.parquet
current in_file: gs://jcosme/clean_data/genomics/data_splits_inverse_1_mer/val.parquet
saving gs://jcosme/clean_data/genomics/datasets/num_cls_1_sz_cls_1000

In [15]:
# Shut the Dask client down now that all datasets have been written.
print("shutting down Dask client")
client.shutdown()
print("finished")

shutting down Dask client
finished
