### Preprocessing Datasets

Here we load, process, and save back to drive all h5 datasets

**Mount Google Drive**

In [None]:
!pip install --upgrade tables

Collecting tables
[?25l  Downloading https://files.pythonhosted.org/packages/ed/c3/8fd9e3bb21872f9d69eb93b3014c86479864cca94e625fd03713ccacec80/tables-3.6.1-cp36-cp36m-manylinux1_x86_64.whl (4.3MB)
[K     |████████████████████████████████| 4.3MB 8.6MB/s 
Installing collected packages: tables
  Found existing installation: tables 3.4.4
    Uninstalling tables-3.4.4:
      Successfully uninstalled tables-3.4.4
Successfully installed tables-3.6.1


In [1]:
from google.colab import drive
drive.mount('/content/drive')
repo_path = "/content/drive/My Drive/repos/subseasonal_rodeo/"

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


**Pre-Processing of Core Data**

In [None]:
import pandas as pd
import numpy as np
# %matplotlib inline
import datetime
from sklearn.preprocessing import MinMaxScaler
import joblib
import math

**Load & Process Data**

In [None]:
def process_series(data_path):
  data = pd.read_hdf(repo_path+data_path)
  data = pd.DataFrame(data.reset_index(drop=False))
  data = data[data['start_date'] > datetime.datetime.strptime('1978-12-31',"%Y-%m-%d")].reset_index(drop=True)
  return(data)

In [None]:
# Series datasets
pres = process_series('data/gt-contest_pres.sfc.gauss-14d-1948-2018.h5') # Pressure
hgt = process_series('data/gt-contest_wind_hgt_10-14d-1948-2018.h5') # Wind @10m geopotential height
rhum = process_series('data/gt-contest_rhum.sig995-14d-1948-2018.h5') # Relative humidity
slp = process_series('data/gt-contest_slp-14d-1948-2018.h5') # Sea level pressure
prec = process_series('data/gt-contest_precip-14d-1948-2018.h5') # Precipitation
prwtr = process_series('data/gt-contest_pr_wtr.eatm-14d-1948-2018.h5') # Precipitable water
pevpr = process_series('data/gt-contest_pevpr.sfc.gauss-14d-1948-2018.h5') # Potential evaporation

In [None]:
# Tmp2m dataset
temp = pd.read_hdf(repo_path+'data/gt-contest_tmp2m-14d-1979-2018.h5')
temp['start_date'] = pd.to_datetime(temp['start_date'], unit='ns')
temp.drop(['tmp2m_sqd',	'tmp2m_std'], axis=1, inplace=True)

In [None]:
# Tmp2m & Precipitation Climatology & Anomaly
df = temp.merge(prec, left_on = ['lon','lat','start_date'], right_on = ['lon','lat','start_date'], how = 'inner')
df['year'] = df['start_date'].dt.year
df['month'] = df['start_date'].dt.month
df['day'] = df['start_date'].dt.day
df['dayofyear'] = df['start_date'].dt.dayofyear

**Join all Datasets**

In [None]:
# Merge all into single df
# Spatial temporal datasets
df = df.merge(pres, left_on = ['lon','lat','start_date'], right_on = ['lon','lat','start_date'], how = 'inner')
del pres
df = df.merge(hgt, left_on = ['lon','lat','start_date'], right_on = ['lon','lat','start_date'], how = 'inner')
del hgt
df = df.merge(rhum, left_on = ['lon','lat','start_date'], right_on = ['lon','lat','start_date'], how = 'inner')
del rhum
df = df.merge(slp, left_on = ['lon','lat','start_date'], right_on = ['lon','lat','start_date'], how = 'inner')
del slp
df = df.merge(prwtr, left_on = ['lon','lat','start_date'], right_on = ['lon','lat','start_date'], how = 'inner')
del prwtr
df = df.merge(pevpr, left_on = ['lon','lat','start_date'], right_on = ['lon','lat','start_date'], how = 'inner')
del pevpr

In [None]:
df.head()

Unnamed: 0,lat,lon,start_date,tmp2m,precip,year,month,day,dayofyear,pres,contest_wind_hgt_10,rhum,slp,pr_wtr,pevpr
0,27.0,261.0,1979-01-01,7.683595,8.122134,1979,1,1,1,99634.573521,31083.037249,75.348356,102332.270229,16.637908,140.257407
1,27.0,261.0,1979-01-02,7.283579,7.038652,1979,1,2,2,99673.597796,31086.626395,72.662776,102404.656669,15.724481,144.829959
2,27.0,261.0,1979-01-03,7.792389,7.038652,1979,1,3,3,99545.359933,31088.482282,74.123019,102301.680943,16.68181,139.518139
3,27.0,261.0,1979-01-04,8.906203,7.118693,1979,1,4,4,99447.93471,31089.829381,77.210133,102189.965541,17.933701,141.163577
4,27.0,261.0,1979-01-05,9.719387,7.325908,1979,1,5,5,99365.20731,31084.936942,77.090285,102108.830776,18.548535,147.473729


**Feature Engineer Date Features**

In [None]:
df['month_sin'] = np.sin(df['month']*(2.*np.pi/12))
df['month_cos'] = np.cos(df['month']*(2.*np.pi/12))
df['dayofyear_sin'] = np.sin(df['dayofyear']*(2.*np.pi/366))
df['dayofyear_cos'] = np.cos(df['dayofyear']*(2.*np.pi/366))
# df.drop('start_month', inplace=True, axis=1)

In [None]:
# Sort by date as primary sorting key so we can split for train/validation/test easily
cols = list(df)
cols.insert(0, cols.pop(cols.index('start_date')))
df = df.loc[:, cols]
df = df.sort_values(by=['start_date', 'lat','lon']).reset_index(drop=True)
df = df.drop(['year','month','day','dayofyear'], axis=1)
for col in df.columns[1:]:
  df[col] = df[col].astype('float32')
df.head()

Unnamed: 0,start_date,lat,lon,tmp2m,precip,pres,contest_wind_hgt_10,rhum,slp,pr_wtr,pevpr,month_sin,month_cos,dayofyear_sin,dayofyear_cos
0,1979-01-01,27.0,261.0,7.683595,8.122134,99634.570312,31083.037109,75.348358,102332.273438,16.637909,140.257401,0.5,0.866025,0.017166,0.999853
1,1979-01-01,27.0,262.0,8.21541,20.169296,100262.96875,31085.554688,78.908356,102335.476562,17.451164,132.152435,0.5,0.866025,0.017166,0.999853
2,1979-01-01,28.0,261.0,6.222338,12.087295,99910.59375,31073.150391,71.534996,102473.265625,14.227145,136.018433,0.5,0.866025,0.017166,0.999853
3,1979-01-01,28.0,262.0,6.34815,51.016727,100531.703125,31077.091797,75.604515,102458.054688,15.213875,126.014656,0.5,0.866025,0.017166,0.999853
4,1979-01-01,28.0,263.0,6.77842,116.063667,101668.0625,31084.351562,83.178177,102417.3125,17.226011,109.695877,0.5,0.866025,0.017166,0.999853


In [None]:
# Sort by date as primary sorting key so we can split for train/validation/test easily
df = df.sort_values(by=['lat','lon', 'start_date']).reset_index(drop=True)
df.head()

Unnamed: 0,start_date,lat,lon,tmp2m,precip,pres,contest_wind_hgt_10,rhum,slp,pr_wtr,pevpr,month_sin,month_cos,dayofyear_sin,dayofyear_cos
0,1979-01-01,27.0,261.0,7.683595,8.122134,99634.570312,31083.037109,75.348358,102332.273438,16.637909,140.257401,0.5,0.866025,0.017166,0.999853
1,1979-01-02,27.0,261.0,7.283579,7.038651,99673.601562,31086.626953,72.662773,102404.65625,15.724482,144.829956,0.5,0.866025,0.034328,0.999411
2,1979-01-03,27.0,261.0,7.792389,7.038651,99545.359375,31088.482422,74.123016,102301.679688,16.68181,139.518143,0.5,0.866025,0.051479,0.998674
3,1979-01-04,27.0,261.0,8.906203,7.118693,99447.9375,31089.830078,77.210136,102189.96875,17.933701,141.163574,0.5,0.866025,0.068615,0.997643
4,1979-01-05,27.0,261.0,9.719387,7.325908,99365.210938,31084.9375,77.090286,102108.828125,18.548536,147.473724,0.5,0.866025,0.085731,0.996318


**Save Unscaled Merged Dataset & Column Names**

In [None]:
# pd.DataFrame(df.columns.values).to_csv(repo_path+'data/processed/spatial_temporal/column_names.csv')
# np.save(repo_path+'data/processed/spatial_temporal/full_data_unscaled_1979-2018', np.array(df))
df = np.array(np.load(repo_path+'data/processed/spatial_temporal/full_data_unscaled_1979-2018.npy',allow_pickle=True))

**Conduct Preprocessing -- standardize, scale, transform to spatial 2D, mask missing regions, outer pad spatial tensor**

In [None]:
col_names = pd.read_csv(repo_path+'data/processed/spatial_temporal/column_names.csv')
col_names = list(col_names['0'].values)

In [None]:
locations = pd.read_csv(repo_path+'data/processed/spatial_temporal/target_points.csv')
locations['region_id'] = list(zip(locations['lat'], locations['lon']))

In [None]:
class PreprocessTemporalSpatialData:
    """
    Class for conducting preprocessing pipeline for temporal spatial data
    Standardizes feature fields, and scales all categorical feature fiels
    Transforms dataset to spatial form [timesteps,lat,lon,features]
    Creates missing regions to enable above matrix transformation (filling in missing value with 0 - note this aligns well with [0,1] scaling)


    """
    def __init__(self, data:np.array, locations:np.array, col_names:list, num_regions:int, num_features:int, max_sg:int=5):
        self.data = data
        self.locations = locations
        self.regions = self.locations['region_id'].unique()
        self.latmin, self.latmax, self.lonmin, self.lonmax = self.locations['lat'].min(), self.locations['lat'].max(), self.locations['lon'].min(), self.locations['lon'].max()
        self.bin_width, self.bin_height = self.lonmax - self.lonmin + 1, self.latmax - self.latmin + 1
        self.col_names = col_names
        self.num_regions = num_regions
        self.num_features = num_features
        self.weather_features = 8
        self.cyclical_features = 4
        self.num_timesteps = self.data.reshape(self.num_regions, -1, self.num_features).shape[1]
        self.max_sg = max_sg
        
    def standardize_and_scale_data(self, save=False):
        """ Standarize features using mean, std from train set; then scale to 0-1 scale """
        # Ensure dataset is order by start date, by region (lat, then lon)
#         self.data = self.data.sort_values(by=['start_date', 'lat','lon']).reset_index(drop=True)
        # Reshape all regions together
        self.data  = np.array(self.data).reshape(-1, self.num_features)
        # Extract train_split - note we only standardize using mean and std from train set
        TRAIN_SPLIT = self.num_timesteps #- 2*1008*self.num_regions
        
        # Start Date - need this for indexing/grouping by region
        date = self.data[:,0].reshape((-1,1))

        # Keep lat/lon from scaling
        lat_lon = self.data[:,1:3].astype(np.float16)

        # Standardize feature fields
        features = self.data[:,3:-4].astype(np.float32)
        self.data_mean = features[:TRAIN_SPLIT].mean(axis=0)
        self.data_std = features[:TRAIN_SPLIT].std(axis=0)
        features = ((features-self.data_mean)/self.data_std).astype(np.float16)
        
        # Deal with cyclical features separately - these are already scaled
        cyclical_features = self.data[:,-4:].astype(np.float16)

        # All features - stack and scale
        all_features = np.hstack((features, cyclical_features))

        # Scale feature data to 0-1 scale
        scaler = MinMaxScaler()
        all_features = scaler.fit_transform(all_features)
        # Scalers for temp & pres only
        temp_scaler = MinMaxScaler()
        temp_scaler.fit(np.array(features[:,0]).reshape(-1,1))
        prec_scaler = MinMaxScaler()
        prec_scaler.fit(np.array(features[:,1]).reshape(-1,1))
        if save:
            np.save(repo_path+'/data/processed/spatial_temporal/feature_means', self.data_mean)
            np.save(repo_path+'/data/processed/spatial_temporal/feature_stds', self.data_std)
            joblib.dump(scaler,repo_path+'/data/processed/spatial_temporal/all_feature_scaler.pkl')
            joblib.dump(temp_scaler,repo_path+'/data/processed/spatial_temporal/temp_scaler.pkl')
            joblib.dump(prec_scaler,repo_path+'/data/processed/spatial_temporal/prec_scaler.pkl')
        
        # Recombine & Reshape
        self.data = np.hstack((date, lat_lon, all_features))
        self.data = self.data.reshape(-1, self.num_features)

        
    def process_datetime(self, dt_fmt:str='%Y-%m-%d', datetimecol='start_date'):
        #print("Parsing datetime fields \n")
        def lookup(s):
            """
            This is an extremely fast approach to datetime parsing.
            """
            dates = {date:pd.to_datetime(date, format = dt_fmt) for date in s.unique()}
            return s.map(dates)
        self.data[datetimecol] = lookup(self.data[datetimecol])
        self.all_dates = np.unique(self.data["start_date"].dt.strftime('%Y-%m-%d'))
        
    def get_missing_regions(self):
        """ Find regions in lat-lon box which are not modelled geographic regions  """
        self.all_region_ids = [(lat,lon) for lat in range(self.latmin, self.latmax+1) for lon in range(self.lonmin,self.lonmax+1)]
        self.num_total_regions = len(set(self.all_region_ids))
        self.missing_regions = list(set(self.all_region_ids) - set(list(self.regions)))
        self.missing_regions.sort() 
        
    def mask_missing_regions(self, mask_value=0):
        """ Create masked data for missing region - zero pad (0,1) scaled features """
        self.get_missing_regions()
        masked_rgn_lst = []
        for rgn in self.missing_regions:
            date_col = self.data.reshape(self.num_regions,-1,self.num_features)[0,:,0].reshape(self.num_timesteps,1) #take same dates
            lat_col = np.array([rgn[0]]*self.num_timesteps).reshape(self.num_timesteps,1) #take lat of current region
            lon_col = np.array([rgn[1]]*self.num_timesteps).reshape(self.num_timesteps,1) #take lon of current region
            feature_cols = np.array([mask_value]*self.num_timesteps*(self.weather_features)).reshape(self.num_timesteps,self.weather_features) #mask weather features
            cyclical_cols = self.data.reshape(self.num_regions,-1,self.num_features)[0,:,-self.cyclical_features:].reshape(self.num_timesteps,self.cyclical_features) #take time features
            masked_rgn = np.hstack((date_col, lat_col, lon_col, feature_cols, cyclical_cols))
            masked_rgn_lst.append(masked_rgn)
        masked_rgns = np.concatenate(masked_rgn_lst)
        self.masked_rgns_df = pd.DataFrame(masked_rgns)
        self.masked_rgns_df.columns = self.col_names
        self.masked_rgns_df['region_id'] = list(zip(self.masked_rgns_df['lat'].astype(int), self.masked_rgns_df['lon'].astype(int)))
        self.masked_rgns_df['model_region'] = False
        
    def convert_rgns_data_to_df(self):
        """ Convert to df of shape (num_timesteps*num_regions, num_features). Ordered by region by timestep """
        self.data = pd.DataFrame(self.data.reshape(-1,self.num_features))
        self.data.columns = self.col_names
        # Create unique region id
        self.data['region_id'] = list(zip(self.data['lat'].astype(int), self.data['lon'].astype(int)))
        self.data['model_region'] = True
        
    def join_masked_regions(self):
        """ Join masked regions df to rgns data df, and sort """
        self.data = pd.concat([self.data, self.masked_rgns_df])
        # Convert date to datetime
        self.process_datetime()
        # Sort
        self.data = self.data.sort_values(by=['lat', 'lon', 'start_date']).reset_index(drop=True)
        assert self.num_total_regions == len(self.data['region_id'].unique())
        
    def get_global_region_tensor(self, save=False):
        print("Generating global spatial grid \n")
        spatial_tw_list = []
#         self.all_dates = np.sort(self.data['start_date'].unique())
        # For every timestep, create binsize*binsize global spatial grid of demand
        for counter, time_window in enumerate(self.all_dates):
            mask = self.data['start_date'] == np.datetime64(time_window)
            pvt_current_time_window = np.flipud(np.array(self.data[mask]).reshape(self.bin_height,self.bin_width,-1))
            spatial_tw_list.append(pvt_current_time_window)
        # Store global_regions_tensor - stack as tensor: for bin_index [num_timesteps, bin_width , bin_height, number of channels]
        self.global_region_tensor = np.stack(spatial_tw_list).reshape(-1,self.bin_height,self.bin_width,pvt_current_time_window.shape[2])
        self.global_region_tensor = self.global_region_tensor[:,:,:,:-2]

        # Pad outer edge with max spatial granularity
        npad = ((0, 0), (self.max_sg, self.max_sg), (self.max_sg, self.max_sg), (0, 0)) # pad (before, after) on region height/width dimensions only
        self.global_region_tensor = np.pad(self.global_region_tensor, pad_width = npad, mode='constant', constant_values=0)
        # Save to disk
        if save:
            print("Saving global spatial tensor \n")
            np.save(repo_path+'/data/processed/spatial_temporal/global_region_tensor_scaled_sg'+str(self.max_sg), self.global_region_tensor)
    
    def preprocess_pipeline(self):
        self.standardize_and_scale_data(save=True)
        self.mask_missing_regions()
        self.convert_rgns_data_to_df()
        self.join_masked_regions()
        self.get_global_region_tensor(save=True)
        del self.data

In [None]:
y = PreprocessTemporalSpatialData(df, locations, col_names, num_regions=514, num_features=15, max_sg=5)

In [None]:
y.preprocess_pipeline()

In [None]:
# np.save(repo_path+'data/processed/full_data_scaled_1979-2018', df)