In [1]:
import glob
import pickle
import joblib
import re
import os
import os
import sys
import time

import pandas as pd
import numpy as np
from sklearn.model_selection import cross_validate
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import PowerTransformer
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern
from tpot.export_utils import set_param_recursive
import xarray as xr
from SALib.sample import saltelli
from SALib.analyze import sobol
import dask
import dask.bag as db
import dask.dataframe as dd
import cloudpickle

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import cartopy.crs as ccrs
from cartopy.feature import ShapelyFeature
from cartopy.io.shapereader import Reader
# Notebook-wide matplotlib defaults, applied once via rcParams.
params = {
    # Must be a single string: newer matplotlib releases no longer accept a
    # list of preamble lines for this rcParam.
    'text.latex.preamble': '\\usepackage{gensymb}',
    'axes.grid': False,
    'savefig.dpi': 700,
    'font.size': 12,
    # LaTeX rendering is off, so the preamble above only takes effect if
    # 'text.usetex' is switched to True.
    'text.usetex': False,
    'figure.figsize': [5, 5],
    'font.family': 'serif',
}
matplotlib.rcParams.update(params)

In [2]:
from dask_jobqueue import SGECluster
from dask.distributed import Client

# Scheduler-side settings: serve the Dask dashboard on port 5757.
scheduler_settings = {'dashboard_address': ':5757'}

# Worker job template submitted through the SGE queue: 12 h walltime and
# 2 GB of memory per job, charged to the 'admiralty' project.
cluster = SGECluster(
    project='admiralty',
    walltime='12:00:00',
    memory='2 G',
    resource_spec='h_vmem=2G',
    scheduler_options=scheduler_settings,
)

# Client through which the rest of the notebook submits work to the cluster.
client = Client(cluster)

In [3]:
# Request 50 worker jobs from the cluster.
cluster.scale(jobs=50)

In [None]:
# Shut down the Dask client and then the cluster.
# NOTE(review): this cell sits before the cells that use the cluster, so it is
# presumably meant to be run manually once all work is finished - running the
# notebook top to bottom would close the cluster first. TODO confirm / move to the end.
client.close()
cluster.close()

In [4]:
# Output variable and root data directory used by the cells below.
output = 'PM2_5_DRY'
path = '/nobackup/earlacoa/machinelearning/data/'

# NOTE: unpickling executes arbitrary code from the file - only load
# pickles from a trusted source.
training_pickle = path + 'dict_train.pickle'
with open(training_pickle, 'rb') as ds:
    dict_train = pickle.load(ds)

# Stack every training frame into one table, then keep each distinct
# (lat, lon) pair once as a plain [lat, lon] list.
df_train = pd.concat(dict_train, ignore_index=True)
unique_locations = df_train[['lat', 'lon']].drop_duplicates()
gridcells = unique_locations.values.tolist()

#### create control using emulators

In [None]:
# Output variables for which a control (all inputs at 1.0) field is written.
outputs = [
    'PM2_5_DRY',
    'o3',
    'AOD550_sfc',
    'asoaX_2p5',
    'bc_2p5',
    'bsoaX_2p5',
    'nh4_2p5',
    'no3_2p5',
    'oc_2p5',
    'oin_2p5',
    'so4_2p5'
]

# Control scenario: every input fraction left at 1.0.
fraction_res = 1.0
fraction_ind = 1.0
fraction_tra = 1.0
fraction_agr = 1.0
fraction_ene = 1.0

# Single row of five inputs, in the order the emulators are called with below.
custom_inputs = np.array([
    fraction_res,
    fraction_ind,
    fraction_tra,
    fraction_agr,
    fraction_ene
]).reshape(1, -1)

# NaN-filled 580 x 1440 template matching the lat/lon coordinates below.
empty_values = np.empty((580, 1440))
empty_values[:] = np.nan

# Emulator files whose prediction failed; kept so failures stay visible
# instead of being silently dropped.
failed_emulator_files = []

# The loop variable is deliberately NOT called `output`, so the module-level
# `output` used by later cells is not overwritten with the last list entry.
for output_name in outputs:
    emulator_files = glob.glob(path + output_name + '/emulator_' + output_name + '_*.joblib')

    ds_custom_output = xr.DataArray(
        empty_values,
        dims=('lat', 'lon'),
        coords={'lat': np.arange(-60, 85, 0.25), 'lon': np.arange(-180, 180, 0.25)}
    )

    for emulator_file in emulator_files:
        # The two decimal numbers in the file path are taken as lat and lon.
        # NOTE(review): the pattern has no sign, so negative coordinates would
        # be read as positive - confirm the file naming.
        lat, lon = [float(item) for item in re.findall(r'\d+\.\d+', emulator_file)]
        emulator = joblib.load(emulator_file)

        try:
            custom_output = emulator.predict(custom_inputs)
            # xr.where returns a new array, so the shared NaN template is
            # never modified in place.
            ds_custom_output = xr.where(
                (ds_custom_output.coords['lat'] == lat) & (ds_custom_output.coords['lon'] == lon),
                custom_output,
                ds_custom_output
            )
        except Exception as error:
            # Best effort: skip this grid cell (it stays NaN) but report it.
            # Catching Exception rather than using a bare except keeps
            # KeyboardInterrupt / SystemExit working.
            failed_emulator_files.append(emulator_file)
            print(f'Skipping {emulator_file}: {error!r}')

    ds_custom_output.name = output_name
    ds_custom_output.to_netcdf(
        path + 'summary/ds_ctl_' + output_name + '.nc'
    )

#### create predictions varying each input individually in 10% steps while holding the other inputs at 1.0

In [None]:
output = 'PM2_5_DRY'
#output = 'o3'

emulator_files = glob.glob(path + output + '/emulator_' + output + '_*.joblib')

# NaN-filled 580 x 1440 template matching the lat/lon coordinates below.
empty_values = np.empty((580, 1440))
empty_values[:] = np.nan

# One-at-a-time scenarios: each matrix varies a single input over
# 0.0, 0.1, ..., 1.5 while the other four stay at 1.
matrix1 = np.array(np.meshgrid(np.linspace(0, 1.5, 16), 1, 1, 1, 1)).T.reshape(-1, 5)
matrix2 = np.array(np.meshgrid(1, np.linspace(0, 1.5, 16), 1, 1, 1)).T.reshape(-1, 5)
matrix3 = np.array(np.meshgrid(1, 1, np.linspace(0, 1.5, 16), 1, 1)).T.reshape(-1, 5)
matrix4 = np.array(np.meshgrid(1, 1, 1, np.linspace(0, 1.5, 16), 1)).T.reshape(-1, 5)
matrix5 = np.array(np.meshgrid(1, 1, 1, 1, np.linspace(0, 1.5, 16))).T.reshape(-1, 5)
matrix_stacked = np.vstack((matrix1, matrix2, matrix3, matrix4, matrix5))

# The coordinates are identical for every scenario, so build them once.
grid_coords = {'lat': np.arange(-60, 85, 0.25), 'lon': np.arange(-180, 180, 0.25)}

# The all-ones baseline occurs once in each of the five matrices; remember the
# filenames already written so the same scenario is not recomputed.
written_filenames = set()

# NOTE(review): every emulator is re-loaded from disk for every scenario;
# if memory allows, loading them once before this loop would avoid that.
for matrix in matrix_stacked:
    custom_inputs = matrix.reshape(1, -1)
    filename = 'RES' + str(np.round(custom_inputs[0][0], decimals=1)) \
                + '_IND' + str(np.round(custom_inputs[0][1], decimals=1)) \
                + '_TRA' + str(np.round(custom_inputs[0][2], decimals=1)) \
                + '_AGR' + str(np.round(custom_inputs[0][3], decimals=1)) \
                + '_ENE' + str(np.round(custom_inputs[0][4], decimals=1))

    if filename in written_filenames:
        continue
    written_filenames.add(filename)

    ds_custom_output = xr.DataArray(
        empty_values,
        dims=('lat', 'lon'),
        coords=grid_coords
    )

    for emulator_file in emulator_files:
        # The two decimal numbers in the file path are taken as lat and lon.
        lat, lon = [float(item) for item in re.findall(r'\d+\.\d+', emulator_file)]
        emulator = joblib.load(emulator_file)
        custom_output = emulator.predict(custom_inputs)
        # xr.where returns a new array, so the shared NaN template is
        # never modified in place.
        ds_custom_output = xr.where(
            (ds_custom_output.coords['lat'] == lat) & (ds_custom_output.coords['lon'] == lon),
            custom_output,
            ds_custom_output
        )

    ds_custom_output.name = output

    # `path` already ends with '/', so no extra separator before 'summary'
    # (consistent with the other cells that write into this directory).
    ds_custom_output.to_netcdf(
        path + 'summary/ds_' + filename + '_' + output + '.nc'
    )

#### create 10% emulators - pangeo
Parallelise over the custom inputs: these are independent of one another, whereas the gridcells within a single dataset are dependent.

Dask bag good practices
- no inter-worker communication
    - use the bag to load data
- minimise IO
- cloudpickle functions
- CPU % should ramp up to 100% per worker

Dask bag features
- immutable
- multi-processing (by default)
- multiple bags need identical partitions (number and size)

##### method 1
- ramps to 2700% for 32 cores
- without dataset creation for 20 custom inputs = CPU: 9 secs, Wall: 35 secs
- with dataset creation for 20 custom inputs = CPU: 26 secs, Wall: 1 min

In [221]:
# Full factorial design: every combination of the 16 levels
# (0.0, 0.1, ..., 1.5) across the five inputs, one combination per row.
scaling_levels = np.linspace(0, 1.5, 16)
level_grids = np.meshgrid(*(scaling_levels for _ in range(5)))
matrix_stacked = np.array(level_grids).T.reshape(-1, 5)

# Each combination as its own (1, 5) array, ready to be passed to predict().
custom_inputs = [combination.reshape(1, -1) for combination in matrix_stacked]

In [None]:
# remove duplicates of ones already completed
# Only count scenario files of the CURRENT output as completed: a plain
# 'summary/ds*' glob would also match files written for other outputs
# (and the ds_ctl_* control files).
custom_inputs_completed_filenames = glob.glob(path + 'summary/ds_RES*_' + output + '.nc')

# Parse the five input values out of each file NAME (not the whole path, so
# numbers in directory names cannot interfere).
custom_inputs_completed_list = [
    [float(item) for item in re.findall(r'\d+\.\d+', os.path.basename(filename))]
    for filename in custom_inputs_completed_filenames
]
# Set of tuples gives O(1) membership tests instead of scanning a list of
# lists once per custom input.
custom_inputs_completed_lookup = {tuple(item) for item in custom_inputs_completed_list}

# Round to one decimal - the precision used in the filenames - directly on the
# numbers, rather than regex-parsing the printed form of the array.
custom_inputs_list = [
    np.round(np.asarray(custom_input, dtype=float).ravel(), decimals=1).tolist()
    for custom_input in custom_inputs
]

# Keep only the inputs that have no output file yet, as (1, n) arrays.
custom_inputs = [
    np.array(item).reshape(1, -1)
    for item in custom_inputs_list
    if tuple(item) not in custom_inputs_completed_lookup
]

In [222]:
# (lat, lon) index of the grid cells that have training data.
df_gridcells_china = (
    pd.DataFrame(gridcells, columns=['lat', 'lon'])
    .set_index(['lat', 'lon'])
)

# NaN-filled 580 x 1440 grid on the 0.25 degree lat/lon coordinates.
empty_values = np.full((580, 1440), np.nan)

ds_empty = xr.DataArray(
    empty_values,
    dims=('lat', 'lon'),
    coords={
        'lat': np.arange(-60, 85, 0.25),
        'lon': np.arange(-180, 180, 0.25),
    },
).rename(output)

# Long-format frame indexed by (lat, lon), then drop the rows of the training
# grid cells so that only the remaining (NaN) cells are left.
df_empty = (
    ds_empty.to_dataframe()
    .reset_index()
    .set_index(['lat', 'lon'])
    .sort_index()
    .drop(df_gridcells_china.index, axis=0)
)

In [223]:
def load_emulator(emulator_file):
    """Load one emulator and read its grid cell coordinates from the file name.

    Parameters
    ----------
    emulator_file : str
        Path to a joblib file whose base name contains exactly two decimal
        numbers, taken as latitude then longitude.

    Returns
    -------
    tuple
        ``(lat, lon, emulator)`` with the coordinates as floats and the object
        returned by ``joblib.load``.

    Raises
    ------
    ValueError
        If the file name does not contain exactly two decimal numbers.
    """
    # Search only the base name so dotted numbers in a directory name
    # (e.g. a version folder) cannot be mistaken for coordinates.
    # NOTE(review): the pattern has no sign, so negative coordinates would be
    # read as positive - confirm against the file naming convention.
    coordinates = re.findall(r'\d+\.\d+', os.path.basename(emulator_file))
    if len(coordinates) != 2:
        raise ValueError(
            f'expected exactly two decimal numbers (lat, lon) in the file name of '
            f'{emulator_file!r}, found {coordinates}'
        )
    lat, lon = (float(item) for item in coordinates)
    emulator = joblib.load(emulator_file)
    return lat, lon, emulator


def custom_predict(loaded_emulator, custom_input):
    """Evaluate one loaded emulator for one custom input.

    ``loaded_emulator`` is a ``(lat, lon, emulator)`` triple. The first element
    of ``emulator.predict(custom_input)`` is returned alongside the coordinates
    and the unchanged input, as ``(lat, lon, custom_input, custom_output)``.
    """
    lat, lon, emulator = loaded_emulator
    predictions = emulator.predict(custom_input)
    return (lat, lon, custom_input, predictions[0])


def create_dataset(result):
    """Write the predictions for one custom input to a netCDF file.

    ``result`` is a sequence of ``(lat, lon, custom_input, custom_output)``
    tuples. The custom input of the first tuple names the file. The predictions
    are concatenated with the module-level ``df_empty`` frame, converted to an
    xarray Dataset and saved under ``path + 'summary/'``. Relies on the
    module-level ``output``, ``df_empty`` and ``path``.
    """
    lats, lons, inputs, custom_outputs = [], [], [], []
    for item in result:
        lats.append(item[0])
        lons.append(item[1])
        inputs.append(item[2])
        custom_outputs.append(item[3])
    # The file is named after the custom input of the first tuple.
    custom_input = inputs[0]

    # File name encodes the five inputs rounded to one decimal,
    # e.g. RES1.0_IND0.5_...
    sector_labels = ('RES', 'IND', 'TRA', 'AGR', 'ENE')
    filename = '_'.join(
        label + str(np.round(custom_input[0][position], decimals=1))
        for position, label in enumerate(sector_labels)
    )

    predictions = pd.DataFrame([lats, lons, custom_outputs]).T
    predictions.columns = ['lat', 'lon', output]
    predictions = predictions.set_index(['lat', 'lon']).sort_index()

    combined = pd.concat([df_empty, predictions]).sort_index()
    target_file = path + 'summary/ds_' + filename + '_' + output + '.nc'
    xr.Dataset.from_dataframe(combined).to_netcdf(target_file)

In [224]:
# Serialise each helper with cloudpickle ...
pickled_load_emulator = cloudpickle.dumps(load_emulator)
pickled_custom_predict = cloudpickle.dumps(custom_predict)
pickled_create_dataset = cloudpickle.dumps(create_dataset)

# ... and restore it with the standard pickle module; the restored copies are
# the ones handed to the Dask bag below.
depickled_load_emulator, depickled_custom_predict, depickled_create_dataset = (
    pickle.loads(payload)
    for payload in (
        pickled_load_emulator,
        pickled_custom_predict,
        pickled_create_dataset,
    )
)

In [225]:
# Select the output variable and list the emulator files matching its glob pattern.
output = 'PM2_5_DRY'
emulator_files = glob.glob(path + output + '/emulator_' + output + '_*.joblib')

In [226]:
# Use only the first 20 custom inputs for the timing comparison below.
custom_inputs_sample = custom_inputs[0:20]

In [227]:
# Dask bag over the emulator file paths, with the depickled loader mapped over each path.
bag_emulators = db.from_sequence(emulator_files).map(depickled_load_emulator)

In [231]:
%%time
# without dataset creation
# Create the accumulator once, before the loop: resetting it inside the loop
# would discard the predictions of every custom input except the last.
results = []
for custom_input in custom_inputs_sample:
    results.append(bag_emulators.map(depickled_custom_predict, custom_input).compute())

CPU times: user 8.21 s, sys: 273 ms, total: 8.49 s
Wall time: 34.2 s


In [232]:
%%time
# with dataset creation
# Create the accumulator once, before the loop, so the predictions of every
# custom input are kept rather than only those of the last one.
results = []
for custom_input in custom_inputs_sample:
    result = bag_emulators.map(depickled_custom_predict, custom_input).compute()
    results.append(result)

    # Write this custom input's predictions to its own netCDF file.
    create_dataset(result)

CPU times: user 26.6 s, sys: 858 ms, total: 27.5 s
Wall time: 58.9 s


##### method 2
- ramps to 3200% for 32 cores
- without dataset creation for 20 custom inputs = CPU: 1 secs, Wall: 10 secs
- with dataset creation for 20 custom inputs = CPU: 18 secs, Wall: 25 secs

In [314]:
# Cache for the custom inputs; filled on first use inside custom_predicts().
CUSTOM_INPUTS = None
# How many custom inputs (taken from the start of the full set) are evaluated.
number_of_inputs = 20

def get_emulator_files(file_path, file_pattern='emulator*'):
    """Return the files in ``file_path`` whose names match ``file_pattern``.

    Parameters
    ----------
    file_path : str
        Directory to search; a trailing separator is allowed.
    file_pattern : str, optional
        Glob pattern for the file names (default ``'emulator*'``).

    Returns
    -------
    list of str
        Matching paths in sorted order, so repeated runs see the same order.
    """
    # os.path.join avoids the doubled separator that os.sep.join produced when
    # file_path already ends with one.
    emulator_files = sorted(glob.glob(os.path.join(file_path, file_pattern)))
    return emulator_files

def load_emulator(emulator_file):
    """Load one emulator and read its grid cell coordinates from the file name.

    Parameters
    ----------
    emulator_file : str
        Path to a joblib file whose base name contains exactly two decimal
        numbers, taken as latitude then longitude.

    Returns
    -------
    tuple
        ``(lat, lon, emulator)`` with the coordinates as floats and the object
        returned by ``joblib.load``.

    Raises
    ------
    ValueError
        If the file name does not contain exactly two decimal numbers.
    """
    # Search only the base name so dotted numbers in a directory name
    # (e.g. a version folder) cannot be mistaken for coordinates.
    # NOTE(review): the pattern has no sign, so negative coordinates would be
    # read as positive - confirm against the file naming convention.
    coordinates = re.findall(r'\d+\.\d+', os.path.basename(emulator_file))
    if len(coordinates) != 2:
        raise ValueError(
            f'expected exactly two decimal numbers (lat, lon) in the file name of '
            f'{emulator_file!r}, found {coordinates}'
        )
    lat, lon = (float(item) for item in coordinates)
    emulator = joblib.load(emulator_file)
    return lat, lon, emulator

def get_custom_inputs():
    """Build the full factorial set of custom inputs.

    Every combination of the 16 levels 0.0, 0.1, ..., 1.5 across the five
    inputs is returned as a list of ``(1, 5)`` arrays (16**5 entries).
    """
    levels = np.linspace(0, 1.5, 16)
    level_grids = np.meshgrid(levels, levels, levels, levels, levels)
    combinations = np.array(level_grids).T.reshape(-1, 5)
    return [combination.reshape(1, -1) for combination in combinations]

def custom_predicts(emulator_file):
    """Evaluate one emulator for every cached custom input.

    Parameters
    ----------
    emulator_file : str
        Path passed to ``load_emulator``.

    Returns
    -------
    list of tuple
        One ``(lat, lon, custom_input, custom_output)`` tuple per custom input,
        where ``custom_output`` is the first element of the emulator's
        prediction for that input.

    Notes
    -----
    The custom inputs are built once per process and cached in the module-level
    ``CUSTOM_INPUTS``; only the first ``number_of_inputs`` entries are used.
    """
    global CUSTOM_INPUTS

    lat, lon, emulator = load_emulator(emulator_file)

    # Test against None explicitly: a truthiness check would rebuild the (large)
    # input set on every call when the cache is legitimately empty, and is
    # ambiguous if the cache is ever a NumPy array.
    if CUSTOM_INPUTS is None:
        CUSTOM_INPUTS = get_custom_inputs()[0:number_of_inputs]

    return [
        (lat, lon, custom_input, emulator.predict(custom_input)[0])
        for custom_input in CUSTOM_INPUTS
    ]

def create_dataset(lats, lons, custom_input, custom_outputs):
    """Write the predictions for one custom input to a netCDF file.

    ``lats``, ``lons`` and ``custom_outputs`` are parallel sequences with one
    entry per grid cell; ``custom_input`` is the input row that produced them
    and names the file. The predictions are concatenated with the module-level
    ``df_empty`` frame, converted to an xarray Dataset and saved under
    ``path + 'summary/'``. Relies on the module-level ``output``, ``df_empty``
    and ``path``.
    """
    # File name encodes the five inputs rounded to one decimal,
    # e.g. RES1.0_IND0.5_...
    sector_labels = ('RES', 'IND', 'TRA', 'AGR', 'ENE')
    filename = '_'.join(
        label + str(np.round(custom_input[0][position], decimals=1))
        for position, label in enumerate(sector_labels)
    )

    predictions = pd.DataFrame([lats, lons, custom_outputs]).T
    predictions.columns = ['lat', 'lon', output]
    predictions = predictions.set_index(['lat', 'lon']).sort_index()

    combined = pd.concat([df_empty, predictions]).sort_index()
    target_file = path + 'summary/ds_' + filename + '_' + output + '.nc'
    xr.Dataset.from_dataframe(combined).to_netcdf(target_file)

def main():
    """Predict every custom input with every emulator and write the datasets.

    The emulator files are distributed through a Dask bag, each one is
    evaluated with ``custom_predicts``, and one dataset per custom input is
    then written with ``create_dataset``.

    Returns
    -------
    list
        One entry per emulator file, each the list returned by
        ``custom_predicts``. Empty when no emulator files are found.
    """
    output = 'PM2_5_DRY'
    data_dir = f'/nobackup/earlacoa/machinelearning/data/{output}/'

    emulator_files = get_emulator_files(data_dir)
    if not emulator_files:
        # Nothing to predict; avoid indexing into an empty result below.
        print(f'No emulator files found in {data_dir}')
        return []

    emulator_bag = db.from_sequence(emulator_files)

    results = emulator_bag.map(custom_predicts).compute()

    print(f'Does the total results length equal the number of emulators? {len(results) == len(emulator_files)}')
    print(f'Does each emulator length equal the number of custom_inputs? {len(results[0]) == number_of_inputs}')

    if not results[0]:
        # No custom inputs were evaluated, so there are no datasets to write.
        return results

    # Coordinates are the same for every custom input: read them from the
    # first tuple of each emulator's results.
    lats = [emulator_results[0][0] for emulator_results in results]
    lons = [emulator_results[0][1] for emulator_results in results]
    # Every emulator saw the same inputs in the same order, so the first
    # emulator's results list them all.
    custom_inputs = [item[2] for item in results[0]]

    for index, custom_input in enumerate(custom_inputs):
        custom_outputs = [emulator_results[index][3] for emulator_results in results]
        create_dataset(lats, lons, custom_input, custom_outputs)

    # Returned so that `results = main()` in the calling cells receives the
    # predictions instead of None.
    return results

In [283]:
%%time
# without dataset creation
# Run the method 2 pipeline; `results` holds whatever main() returns.
if __name__ == '__main__':
    results = main()

Does the total results length equal the number of emulators? True
Does each emulator length equal the number of custom_inputs? True
CPU times: user 1.22 s, sys: 88.2 ms, total: 1.31 s
Wall time: 9.88 s


In [316]:
%%time
# with dataset creation
# Run the method 2 pipeline; `results` holds whatever main() returns.
if __name__ == '__main__':
    results = main()

Does the total results length equal the number of emulators? True
Does each emulator length equal the number of custom_inputs? True
CPU times: user 18.5 s, sys: 292 ms, total: 18.7 s
Wall time: 28.4 s
