# Create Training and Testing Data for Deep Learning 
## Version: Patch data during Current or Future Climate
### Requires: create_UHindx_file_step1, ..file_step2, ..file_step3 (e.g., future_uh75patches_12.nc)

First, import relevant packages.

In [1]:
import xarray as xr
import numpy as np
from ncar_jobqueue import NCARCluster
from dask.distributed import Client

Choose the climate to work with (e.g., current or future).

In [22]:
# Climate period to process; this string is interpolated into the input and
# output file names used in the cells below.
which_climate = 'future'

Start dask workers with adaptive scaling to load data for training.

In [3]:
#--------------------------------------------------

# Create the dask cluster; each worker job is requested with 36 cores and 109GB.
cluster = NCARCluster(cores=36, memory="109GB")

# Adaptive scaling between a minimum of 1 and a maximum of 10.
cluster.adapt(wait_count=60, maximum=10, minimum=1)

# Show the batch script that is submitted for a worker job.
print(cluster.job_script())

# Attach a client to the cluster; the trailing expression displays it as the
# cell output.
client = Client(cluster)
client

#--------------------------------------------------

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -q regular
#PBS -A P54048000
#PBS -l select=1:ncpus=36:mem=109GB
#PBS -l walltime=01:00:00
#PBS -e /glade/scratch/molina/
#PBS -o /glade/scratch/molina/
JOB_ID=${PBS_JOBID%%.*}



/glade/work/molina/miniconda3/envs/python-tutorial/bin/python -m distributed.cli.dask_worker tcp://10.148.10.15:45223 --nthreads 36 --memory-limit 109.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60 --local-directory /glade/scratch/molina --interface ib0



0,1
Client  Scheduler: tcp://10.148.10.15:45223  Dashboard: https://jupyterhub.ucar.edu/ch/user/molina/proxy/8787/status,Cluster  Workers: 0  Cores: 0  Memory: 0 B


Load the storm patch data that were previously separated by the UH threshold (UH > 75 and UH < 75 m^2/s^2).

In [23]:
# Monthly files holding the patches that exceeded the UH threshold, listed in
# the order in which they are concatenated (months 12, 01, 02, 03, 04, 05).
_above_paths = [
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_uh75patches_12.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_uh75patches_01.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_uh75patches_02.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_uh75patches_03.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_uh75patches_04.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_uh75patches_05.nc",
]

# Open each monthly file as its own dataset, keeping the per-month names.
(data_dec_above, data_jan_above, data_feb_above,
 data_mar_above, data_apr_above, data_may_above) = [
    xr.open_mfdataset(monthly_path, combine='by_coords', parallel=True)
    for monthly_path in _above_paths
]

# Join the monthly datasets end to end along the patch dimension.
data_above = xr.concat(
    [
        data_dec_above,
        data_jan_above,
        data_feb_above,
        data_mar_above,
        data_apr_above,
        data_may_above,
    ],
    dim='patch',
)

In [24]:
# Monthly files holding the patches that did not exceed the UH threshold, listed
# in the order in which they are concatenated (months 12, 01, 02, 03, 04, 05).
_below_paths = [
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_nonuh75patches_12.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_nonuh75patches_01.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_nonuh75patches_02.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_nonuh75patches_03.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_nonuh75patches_04.nc",
    f"/glade/scratch/molina/WRF_CONUS1_derived/storm_envs/{which_climate}_nonuh75patches_05.nc",
]

# Open each monthly file as its own dataset, keeping the per-month names.
(data_dec_below, data_jan_below, data_feb_below,
 data_mar_below, data_apr_below, data_may_below) = [
    xr.open_mfdataset(monthly_path, combine='by_coords', parallel=True)
    for monthly_path in _below_paths
]

# Join the monthly datasets end to end along the patch dimension.
data_below = xr.concat(
    [
        data_dec_below,
        data_jan_below,
        data_feb_below,
        data_mar_below,
        data_apr_below,
        data_may_below,
    ],
    dim='patch',
)

In [25]:
# Number of above-threshold patches per 100 below-threshold patches, rounded
# to the nearest integer.
_ratio_percent = round((data_above.patch.size/data_below.patch.size)*100)
print("The ratio of strongly rotating to not strongly rotating storm patches expressed as a percent is about: ",
      _ratio_percent, "%")

The ratio of strongly rotating to not strongly rotating storm patches expressed as a percent is about:  3 %


Define the helper functions used to split and scale the data.

In [26]:
def create_traintest_data(data_b, data_a, split_perc=0.6, return_label=False):
    """Build a class-balanced training set and a test set from the remainder.

    Both inputs are shuffled along axis 0 with a fixed seed. The first
    ``int(n_above * split_perc)`` shuffled above-threshold samples, and the
    same number of shuffled below-threshold samples, form the training set.
    The remaining above-threshold samples plus a slice of the remaining
    below-threshold samples form the test set. Each merged set is then
    shuffled once more with a fixed seed.

    Random numbers come from private ``RandomState`` generators seeded with
    the same values the function has always used (0 for the split, 10 for the
    final shuffle), so results are reproducible and the caller's global
    ``np.random`` state is left untouched.

    Args:
        data_b: Below-threshold samples; array with samples on axis 0 and at
            least two dimensions.
        data_a: Above-threshold samples; same layout as ``data_b``.
        split_perc: Fraction of the above-threshold samples used for training.
        return_label: When True, also return label arrays (1.0 for
            above-threshold samples, 0.0 for below-threshold samples).

    Returns:
        ``(train_data, test_data)`` or, when ``return_label`` is True,
        ``(train_data, test_data, train_label, test_label)``.
    """
    n_above = data_a.shape[0]
    n_below = data_b.shape[0]
    n_train = int(n_above * split_perc)

    # End of the below-threshold test slice. The arithmetic is kept exactly as
    # originally written so existing splits are reproduced bit for bit.
    # NOTE(review): confirm this stop index gives the intended above/below
    # ratio in the test set.
    below_test_stop = int((((n_above * (1 - split_perc)) * n_below) / n_above)
                          + (n_above * (1 - split_perc)))

    # One shuffled index order per class; train and test slices are taken from
    # the same order so no sample appears in both sets.
    order_above = np.random.RandomState(0).permutation(n_above)
    order_below = np.random.RandomState(0).permutation(n_below)

    train_above = data_a[order_above[:n_train]]
    train_below = data_b[order_below[:n_train]]
    test_above = data_a[order_above[n_train:]]
    test_below = data_b[order_below[n_train:below_test_stop]]

    # Merge the classes, then shuffle each merged set with a single index
    # permutation that is reused for the labels to keep them aligned.
    train_data = np.vstack([train_above, train_below])
    test_data = np.vstack([test_above, test_below])
    train_order = np.random.RandomState(10).permutation(train_data.shape[0])
    test_order = np.random.RandomState(10).permutation(test_data.shape[0])
    train_data = train_data[train_order]
    test_data = test_data[test_order]

    if not return_label:
        return train_data, test_data

    train_label = np.hstack([np.ones(train_above.shape[0]),
                             np.zeros(train_below.shape[0])])[train_order]
    test_label = np.hstack([np.ones(test_above.shape[0]),
                            np.zeros(test_below.shape[0])])[test_order]
    return train_data, test_data, train_label, test_label


def minmax_scale_apply(thedata):
    """Rescale ``thedata`` with scikit-learn's MinMaxScaler using the range (0, 1).

    The scaler is fitted on ``thedata`` and then applied to that same data.
    """
    from sklearn.preprocessing import MinMaxScaler
    return MinMaxScaler(feature_range=(0, 1)).fit(thedata).transform(thedata)

def standardize_scale_apply(thedata):
    """Standardize ``thedata`` with its own NaN-ignoring mean and standard deviation.

    Each returned value reads as "this data point is X standard deviations
    below/above the mean of the data set".
    """
    centre = np.nanmean(thedata)
    spread = np.nanstd(thedata)
    return np.divide(thedata - centre, spread)

def standardize_scale_apply_test(thedatatrain, thedatatest):
    """Standardize the test data with the training data's statistics.

    The NaN-ignoring mean and standard deviation are computed from
    ``thedatatrain`` and applied to ``thedatatest``.
    """
    train_centre = np.nanmean(thedatatrain)
    train_spread = np.nanstd(thedatatrain)
    return np.divide(thedatatest - train_centre, train_spread)


Because the storm patch files are large (generating everything at once triggered memory errors when writing the deep learning files), the data are preprocessed for the deep learning models one variable at a time (across the four vertical levels).

Selection of the variable occurs below.

In [80]:
###################################################################################################

# Variable to preprocess; one of: T, EV, EU, QV, P

what_analysis = 'P'

###################################################################################################

# For each analysis key: the four dataset variable names (suffixes 1, 3, 5, 7),
# the four feature labels stored with the output, and the tag used in the
# output file name.
_ANALYSIS_OPTIONS = {
    "T": (
        ("temp_sev_1", "temp_sev_3", "temp_sev_5", "temp_sev_7"),
        ["tk_1km", "tk_3km", "tk_5km", "tk_7km"],
        "tk",
    ),
    "EV": (
        ("evwd_sev_1", "evwd_sev_3", "evwd_sev_5", "evwd_sev_7"),
        ["ev_1km", "ev_3km", "ev_5km", "ev_7km"],
        "ev",
    ),
    "EU": (
        ("euwd_sev_1", "euwd_sev_3", "euwd_sev_5", "euwd_sev_7"),
        ["eu_1km", "eu_3km", "eu_5km", "eu_7km"],
        "eu",
    ),
    "QV": (
        ("qvap_sev_1", "qvap_sev_3", "qvap_sev_5", "qvap_sev_7"),
        ["qv_1km", "qv_3km", "qv_5km", "qv_7km"],
        "qv",
    ),
    "P": (
        ("pres_sev_1", "pres_sev_3", "pres_sev_5", "pres_sev_7"),
        ["pr_1km", "pr_3km", "pr_5km", "pr_7km"],
        "pr",
    ),
}

# Fail loudly on an unknown key. Otherwise the selection variables would stay
# undefined, or keep the values from an earlier run of this cell, and the wrong
# variable could be written under the wrong file name.
if what_analysis not in _ANALYSIS_OPTIONS:
    raise ValueError(
        f"Unknown what_analysis {what_analysis!r}; expected one of {sorted(_ANALYSIS_OPTIONS)}"
    )

(choice_var1, choice_var3, choice_var5, choice_var7), _feature_labels, var_namer = (
    _ANALYSIS_OPTIONS[what_analysis]
)
attrs_array = np.array(_feature_labels)

Extract the dask array data and load into numpy arrays, then permute/split data, and then standardize.

In [81]:
#above 
the_above_data_1 = data_above[choice_var1].values
the_above_data_3 = data_above[choice_var3].values
the_above_data_5 = data_above[choice_var5].values
the_above_data_7 = data_above[choice_var7].values

#below
the_below_data_1 = data_below[choice_var1].values
the_below_data_3 = data_below[choice_var3].values
the_below_data_5 = data_below[choice_var5].values
the_below_data_7 = data_below[choice_var7].values

In [84]:
# The labels are requested only once (with the first variable); the remaining
# variables are split with the same arguments apart from return_label.
the_train_1, the_test_1, train_label, test_label = create_traintest_data(
    the_below_data_1, the_above_data_1, split_perc=0.6, return_label=True,
)
(the_train_3, the_test_3), (the_train_5, the_test_5), (the_train_7, the_test_7) = [
    create_traintest_data(below_arr, above_arr, split_perc=0.6, return_label=False)
    for below_arr, above_arr in (
        (the_below_data_3, the_above_data_3),
        (the_below_data_5, the_above_data_5),
        (the_below_data_7, the_above_data_7),
    )
]

In [85]:
# Standardize each training array with its own mean and standard deviation.
data_scaled_train_1, data_scaled_train_3, data_scaled_train_5, data_scaled_train_7 = [
    standardize_scale_apply(train_arr)
    for train_arr in (the_train_1, the_train_3, the_train_5, the_train_7)
]

In [86]:
# Standardize each test array with the statistics of its matching training array.
data_scaled_test_1, data_scaled_test_3, data_scaled_test_5, data_scaled_test_7 = [
    standardize_scale_apply_test(train_arr, test_arr)
    for train_arr, test_arr in zip(
        (the_train_1, the_train_3, the_train_5, the_train_7),
        (the_test_1, the_test_3, the_test_5, the_test_7),
    )
]

In [87]:
# Stack the four standardized training arrays along a new leading axis.
X_train = np.stack(
    (data_scaled_train_1, data_scaled_train_3, data_scaled_train_5, data_scaled_train_7),
    axis=0,
)

In [88]:
# Stack the four standardized test arrays along a new leading axis.
X_test = np.stack(
    (data_scaled_test_1, data_scaled_test_3, data_scaled_test_5, data_scaled_test_7),
    axis=0,
)

Create an xarray dataset for the selected variable so that it can be saved.

In [89]:
# X_train and X_test were built with np.stack, which puts the four variables on
# the leading axis. The dataset wants them on the trailing 'features' axis, so
# that axis is moved with np.moveaxis. A plain reshape would only reinterpret
# the flat buffer and mix values from different patches and levels, breaking
# the alignment between each sample and its label.
data_assemble = xr.Dataset(
    {
        'X_train': (['a', 'x', 'y', 'features'], np.moveaxis(X_train, 0, -1)),
        'X_train_label': (['a'], train_label),
        'X_test': (['b', 'x', 'y', 'features'], np.moveaxis(X_test, 0, -1)),
        'X_test_label': (['b'], test_label),
    },
    coords={
        # Names of the four stacked variables, in stacking order.
        'feature': (['features'], attrs_array),
    },
)

In [90]:
# Bare expression: shows the dataset summary as the cell output.
data_assemble

<xarray.Dataset>
Dimensions:        (a: 47702, b: 560022, features: 4, x: 32, y: 32)
Coordinates:
    feature        (features) <U6 'pr_1km' 'pr_3km' 'pr_5km' 'pr_7km'
Dimensions without coordinates: a, b, features, x, y
Data variables:
    X_train        (a, x, y, features) float32 1.019782 1.0193924 ... -1.871275
    X_train_label  (a) float64 1.0 1.0 0.0 0.0 1.0 1.0 ... 0.0 0.0 0.0 0.0 1.0
    X_test         (b, x, y, features) float32 0.528231 0.52410966 ... 0.7838981
    X_test_label   (b) float64 0.0 0.0 0.0 0.0 0.0 1.0 ... 0.0 0.0 0.0 0.0 0.0

In [92]:
# Output file name carries the climate period and the variable tag.
_traintest_path = f"/glade/scratch/molina/WRF_CONUS1_derived/deep_learning/{which_climate}_{var_namer}patch_traintestdata_unbalanced.nc"
data_assemble.to_netcdf(_traintest_path)