# Badr's Setup:<br>Dask Grid Search for NN Prediction of Sahelian Summer Rainfall
***

In [None]:
import itertools

import numpy as np
import pandas as pd
import xarray as xr
import dask.bag as db
import scipy.stats as st
from sklearn.model_selection import ShuffleSplit
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from tensorflow.keras import layers
from tensorboard.plugins.hparams import api as hp
%load_ext tensorboard

<br>

## 1. Dask Client
---

In [2]:
from dask.distributed import Client

# Start the Dask client used by every `.compute()` call in this notebook:
# one worker with four threads and a memory limit of 16e9 bytes.
client = Client(n_workers=1, threads_per_worker=4, memory_limit=16e9)
# Display the client summary (scheduler address, dashboard link, resources).
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 41101 instead


0,1
Client  Scheduler: tcp://127.0.0.1:45923  Dashboard: http://127.0.0.1:41101/status,Cluster  Workers: 1  Cores: 4  Memory: 16.00 GB


<br>

## 2. Data Loading
***

**feat_pc:** Principal Components of climate indices. Used for model input.  
**labels:** Sahel rainfall data, used as the reference (target) values.

In [3]:
# Open the NetCDF file once and release the file handle as soon as both
# variables have been pulled into memory (previously the file was opened
# twice and never closed).
with xr.open_dataset('data/da_final_badr.nc') as dataset:
    feat_pc = dataset.feat_pc.to_pandas()   # model inputs as a pandas object
    labels = dataset.labels.values          # reference values as a NumPy array

<br>


## 3. MODEL SETUP
***

<br>

### Build Model Function
---

In [4]:
def BuildModel(HPARAM):
    """Create and compile the feed-forward regression network.

    The network expects 9 input features, has one 3-unit sigmoid hidden layer
    (``layer1``) and a single-unit output layer (``output``). It is compiled
    with a mean-squared-error loss.

    Parameters
    ----------
    HPARAM : dict
        Hyperparameters of one grid point. The keys read here are
        ``'optimizer'`` (name of a class in ``tfa.optimizers``),
        ``'learn_rate'`` and ``'weight_decay'``.

    Returns
    -------
    keras.Sequential
        The compiled model.
    """
    hidden_layer = layers.Dense(3, activation="sigmoid", name="layer1", input_shape=(9,))
    output_layer = layers.Dense(1, name='output')
    network = keras.Sequential([hidden_layer, output_layer])

    # The optimizer class is looked up by name so that it can be a grid axis.
    optimizer_class = getattr(tfa.optimizers, HPARAM['optimizer'])
    optimizer = optimizer_class(
        learning_rate=HPARAM['learn_rate'],
        weight_decay=HPARAM['weight_decay'],
    )

    network.compile(loss='mean_squared_error', optimizer=optimizer)
    return network

<br>

### Bagging Function
---

In [5]:
def Bagging(HPARAM, SPLIT, model, features):
    """Train a bagged ensemble and return its averaged predictions.

    For each of the ``N_BAGGS`` ensemble members a bootstrap sample (drawn
    with replacement, same size as the training set) is taken from the
    training indices, a model is fitted on it, and that model predicts the
    full training set and the test set. The member predictions are averaged.

    Every member is trained from its own fresh initialisation. Re-fitting a
    single model object for all bags would warm-start each bag from the
    weights and optimizer state left by the previous ones, so the members
    would not be independent.

    Relies on the notebook-level globals ``N_BAGGS`` and ``labels``.

    Parameters
    ----------
    HPARAM : dict
        Hyperparameters of one grid point. Reads ``'batch_size'`` and
        ``'n_epochs'`` and is forwarded to ``BuildModel`` for new members.
    SPLIT : dict
        Holds the integer index arrays ``'train_index'`` and ``'test_index'``.
    model : keras model
        Compiled model that is used as the first ensemble member. Further
        members are created with ``BuildModel(HPARAM)``.
    features : numpy.ndarray
        Feature matrix, indexed along its first axis by the split indices.

    Returns
    -------
    tuple of numpy.ndarray
        ``(y_train, y_test)``: ensemble-mean predictions for the training
        indices and for the test indices.
    """
    train_index = SPLIT['train_index']
    test_index = SPLIT['test_index']

    # One column of predictions per ensemble member.
    y_train_bagging = np.zeros((train_index.size, N_BAGGS))
    y_test_bagging = np.zeros((test_index.size, N_BAGGS))

    for n in range(N_BAGGS):
        # The caller's model serves as the first member; all later members
        # are freshly built so that no weights leak from one bag to the next.
        member = model if n == 0 else BuildModel(HPARAM)

        # Bootstrap sample of the training indices (with replacement).
        train_index_bootstrap = np.random.choice(
            train_index, size=train_index.size, replace=True
        )

        member.fit(
            features[train_index_bootstrap],
            labels[train_index_bootstrap],
            batch_size=HPARAM['batch_size'],
            epochs=HPARAM['n_epochs'],
            verbose=0
        )

        # In-sample and out-of-sample predictions, flattened to one value per
        # sample (reshape also stays 1-D when a set holds a single sample).
        y_train_bagging[:, n] = member.predict(features[train_index]).reshape(-1)
        y_test_bagging[:, n] = member.predict(features[test_index]).reshape(-1)

    # Average over the ensemble members (axis 1).
    return y_train_bagging.mean(axis=1), y_test_bagging.mean(axis=1)

<br>

### Single Run Training & Error Calculation Function
---

In [6]:
def TrainModel(SPLIT, HPARAM, features):
    """Train on one split and score the bagged predictions.

    A model is built from ``HPARAM``, handed to ``Bagging`` together with the
    split, and the resulting training- and test-set predictions are compared
    against the notebook-level ``labels`` array.

    Parameters
    ----------
    SPLIT : dict
        Holds the index arrays ``'train_index'`` and ``'test_index'``.
    HPARAM : dict
        Hyperparameters of one grid point.
    features : numpy.ndarray
        Feature matrix passed through to ``Bagging``.

    Returns
    -------
    dict
        For each of the prefixes ``train`` and ``test``: mean absolute error
        (``_mae``), median absolute deviation of the error (``_mad``), mean
        squared error (``_mse``), its square root (``_rmse``) and the Pearson
        correlation between prediction and label (``_corr``).
    """
    def score(predicted, observed, prefix):
        # All five scores for one sample, keyed as '<prefix>_<score>'.
        residual = predicted - observed
        mean_squared = np.mean(residual**2)
        return {
            f'{prefix}_mae': np.mean(np.absolute(residual)),
            f'{prefix}_mad': np.median(np.absolute(residual - np.median(residual))),
            f'{prefix}_mse': mean_squared,
            f'{prefix}_rmse': np.sqrt(mean_squared),
            f'{prefix}_corr': st.pearsonr(predicted, observed)[0],
        }

    y_train, y_test = Bagging(HPARAM, SPLIT, BuildModel(HPARAM), features)

    # Training scores first, then test scores.
    metrics = {}
    metrics.update(score(y_train, labels[SPLIT['train_index']], 'train'))
    metrics.update(score(y_test, labels[SPLIT['test_index']], 'test'))
    return metrics

<br>

### Cross-Validation and Log Function
---

In [7]:
def TuneModel(HPARAM):
    """Cross-validate one grid point and log the result to TensorBoard.

    A summary writer is opened for the run directory
    ``parent_dir + '/run-<grid_num>'``. The hyperparameters are recorded,
    ``TrainModel`` is mapped over every split in ``SPLITS``, and for each
    metric the mean (``<metric>_mu``) and standard deviation
    (``<metric>_sig``) across splits are written as scalars at ``step=1``.

    Relies on the notebook-level globals ``parent_dir``, ``feat_pc``,
    ``SPLITS`` and the ``HP_*`` hyperparameter objects.

    Parameters
    ----------
    HPARAM : dict
        One grid point with the keys ``'input_var_nine'``, ``'optimizer'``,
        ``'learn_rate'``, ``'weight_decay'``, ``'batch_size'``, ``'n_epochs'``
        and ``'grid_num'``.

    Returns
    -------
    None
    """
    run_dir = parent_dir + f"/run-{HPARAM['grid_num']:04d}"
    metric_names = (
        'train_mae', 'train_mad', 'train_mse', 'train_rmse', 'train_corr',
        'test_mae', 'test_mad', 'test_mse', 'test_rmse', 'test_corr',
    )

    with tf.summary.create_file_writer(run_dir).as_default():
        hp.hparams({
            HP_INPUT_VAR_NINE: HPARAM['input_var_nine'],
            HP_OPTIMIZER: HPARAM['optimizer'],
            HP_LEARN_RATE: HPARAM['learn_rate'],
            HP_WEIGHT_DECAY: HPARAM['weight_decay'],
            HP_BATCH_SIZE: HPARAM['batch_size'],
            HP_EPOCHS: HPARAM['n_epochs']
        })

        # PC1-PC8 are always used; the ninth input column is a grid axis.
        input_columns = ['PC1', 'PC2', 'PC3', 'PC4', 'PC5', 'PC6', 'PC7', 'PC8', HPARAM['input_var_nine']]
        features = feat_pc.loc[:, input_columns].to_numpy()

        # One metrics dict per cross-validation split.
        per_split = SPLITS.map(lambda SPLIT: TrainModel(SPLIT, HPARAM, features)).compute()

        for name in metric_names:
            across_splits = [split_metrics[name] for split_metrics in per_split]
            tf.summary.scalar(f'{name}_mu', np.mean(across_splits), step=1)
            tf.summary.scalar(f'{name}_sig', np.std(across_splits), step=1)

    return None

<br>

## 4. Setup and Start Grid Search
---

<br>

### Set Log Directory
***

In [8]:
parent_dir = 'logs/badr_gridsearch_pc10_11/' # CAUTION: the cleanup cell below deletes everything under this directory with `rm -rf`

In [9]:
rm -rf logs/badr_gridsearch_pc10_11/*

<br>

### Hyperparameter Selection
***

In [10]:
###################################
#####EXAMPLE SETUP FOR TESTING#####
###################################


# #GRID SERACH HYPERPARAMETER#
# #--------------------------#
# HP_INPUT_VAR_NINE = hp.HParam('input_var_nine', hp.Discrete(['PC9']),display_name='9th Input Variable')
# HP_OPTIMIZER = hp.HParam('optimizer', hp.Discrete(['AdamW']),display_name='Optimizer')
# HP_LEARN_RATE = hp.HParam('learn_rate', hp.Discrete([0.09, 0.1]),display_name='Learning Rate')
# HP_WEIGHT_DECAY = hp.HParam('weight_decay', hp.Discrete([0.001]),display_name='Weight Decay')
# HP_BATCH_SIZE = hp.HParam('batch_size', hp.Discrete([10]),display_name='Batch Size')
# HP_EPOCHS = hp.HParam('n_epochs', hp.Discrete([80]),display_name='Epochs')


# #CROSS VALIDATION PARAMETER (NO PART OF GRID SEARCH)#
# #---------------------------------------------------#
# CV_PARAM={
#     'N_FOLDS': 80,         # number of folds -> small for Test Runs
#     'TEST_FRAC': .1    # factrion that is held out for test
# }


# #BAGGING PARAMETER (NO PART OF GRID SEARCH)#
# #-------------------------------------------
# N_BAGGS = 5  # number of baggs -> small for test runs

In [11]:
####################
#####FULL SETUP#####
####################


# GRID SEARCH HYPERPARAMETERS #
#-----------------------------#
# Each HParam lists the discrete values of one grid axis; the grid built
# further down is the Cartesian product of all six domains.
HP_INPUT_VAR_NINE = hp.HParam('input_var_nine', hp.Discrete(['PC10', 'PC11']),display_name='9th Input Variable')
HP_OPTIMIZER = hp.HParam('optimizer', hp.Discrete(['AdamW']),display_name='Optimizer')
HP_LEARN_RATE = hp.HParam('learn_rate', hp.Discrete([0.001, 0.01, 0.1, 0.2]),display_name='Learning Rate')
HP_WEIGHT_DECAY = hp.HParam('weight_decay', hp.Discrete([0.001, 0.01, 0.1]),display_name='Weight Decay')
HP_BATCH_SIZE = hp.HParam('batch_size', hp.Discrete([1, 4, 10, 40]),display_name='Batch Size')
HP_EPOCHS = hp.HParam('n_epochs', hp.Discrete([80, 120]),display_name='Epochs')


# CROSS-VALIDATION PARAMETERS (NOT PART OF THE GRID SEARCH) #
#-----------------------------------------------------------#
# NOTE(review): the earlier comment on N_FOLDS read "sample size as in Badr
# (105)", but the value is 80 — confirm which one is intended.
CV_PARAM={
    'N_FOLDS': 80,      # number of random train/test splits
    'TEST_FRAC': .1    # fraction of the samples held out for testing
}


# BAGGING PARAMETER (NOT PART OF THE GRID SEARCH) #
#-------------------------------------------------#
# NOTE(review): the earlier comment read "10 as in Badr", but the value is 5 —
# confirm which one is intended.
N_BAGGS = 5 # number of bootstrap-trained ensemble members per split

<br>

### Create HP Bag
---

In [12]:
# Grid axes in the order in which they are nested: the first axis varies
# slowest, the last one fastest (`itertools.product` enumerates in exactly the
# order of equivalent hand-nested for-loops).
hp_axes = (
    ('input_var_nine', HP_INPUT_VAR_NINE),
    ('optimizer', HP_OPTIMIZER),
    ('learn_rate', HP_LEARN_RATE),
    ('weight_decay', HP_WEIGHT_DECAY),
    ('batch_size', HP_BATCH_SIZE),
    ('n_epochs', HP_EPOCHS),
)
hp_keys = [key for key, _ in hp_axes]
hp_values = [axis.domain.values for _, axis in hp_axes]

# One dict per grid point; `grid_num` is the running index of the point and
# is later used to name the TensorBoard run directory.
hparams = [
    dict(zip(hp_keys, combination), grid_num=grid_num)
    for grid_num, combination in enumerate(itertools.product(*hp_values))
]

HPARAMS = db.from_sequence(hparams, npartitions = 10) 

# HPARAMS.take(1)

In [13]:
HPARAMS

dask.bag<from_sequence, npartitions=10>

In [14]:
db.from_sequence?

[0;31mSignature:[0m [0mdb[0m[0;34m.[0m[0mfrom_sequence[0m[0;34m([0m[0mseq[0m[0;34m,[0m [0mpartition_size[0m[0;34m=[0m[0;32mNone[0m[0;34m,[0m [0mnpartitions[0m[0;34m=[0m[0;32mNone[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Create a dask Bag from Python sequence.

This sequence should be relatively small in memory.  Dask Bag works
best when it handles loading your data itself.  Commonly we load a
sequence of filenames into a Bag and then use ``.map`` to open them.

Parameters
----------
seq: Iterable
    A sequence of elements to put into the dask
partition_size: int (optional)
    The length of each partition
npartitions: int (optional)
    The number of desired partitions

It is best to provide either ``partition_size`` or ``npartitions``
(though not both.)

Examples
--------
>>> import dask.bag as db
>>> b = db.from_sequence(['Alice', 'Bob', 'Chuck'], partition_size=2)

See Also
--------
read_text: Create bag from text files
[0;31mFile:

In [15]:
HPARAMS

dask.bag<from_sequence, npartitions=10>

<br>

### Create Data Splits Bag (RRHCV)
---

In [16]:
# Draw N_FOLDS random train/test partitions of the rows of `feat_pc`, holding
# out TEST_FRAC of the samples each time, and number them consecutively.
splitter = ShuffleSplit(n_splits=CV_PARAM['N_FOLDS'], test_size=CV_PARAM['TEST_FRAC'])
splits = [
    {
        'train_index': train,
        'test_index': test,
        'split_num': split_num
    }
    for split_num, (train, test) in enumerate(splitter.split(feat_pc))
]
SPLITS = db.from_sequence (splits, npartitions=10)

SPLITS

dask.bag<from_sequence, npartitions=10>

<br>

### Log Experiment Configuration to TensorBoard
---

In [17]:
# Metric descriptors for the HParams dashboard: for each sample (training,
# test) and each score, one entry for the mean (µ) and one for the standard
# deviation (σ) across the cross-validation splits. The tags match the scalar
# names written by TuneModel.
sample_labels = (('train', 'Training'), ('test', 'Test'))
score_labels = (('mae', 'MAE'), ('mad', 'MAD'), ('mse', 'MSE'), ('rmse', 'RMSE'), ('corr', 'Correlation'))
stat_labels = (('mu', 'µ'), ('sig', 'σ'))

dashboard_metrics = [
    hp.Metric(
        f'{sample}_{score}_{stat}',
        display_name=f'{sample_name} Sample {score_name} {stat_symbol}',
    )
    for sample, sample_name in sample_labels
    for score, score_name in score_labels
    for stat, stat_symbol in stat_labels
]

with tf.summary.create_file_writer(parent_dir).as_default():
    hp.hparams_config(
        hparams=[HP_INPUT_VAR_NINE, HP_OPTIMIZER, HP_LEARN_RATE, HP_WEIGHT_DECAY, HP_BATCH_SIZE, HP_EPOCHS],
        metrics=dashboard_metrics,
    )

<br>

### Run Model
---

In [18]:
%%time
results = HPARAMS.map(lambda HPARAM: TuneModel(HPARAM)).compute()

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/site-packages/IPython/core/magics/execution.py", line 1321, in time
    exec(code, glob, local_ns)
  File "<timed exec>", line 1, in <module>
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py", line 283, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/dask/base.py", line 565, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 2654, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 1963, in gather
    return self.sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/client.py", line 837, in sync
    return sync(
  File "/opt/conda/lib/python3.8/site-packages/distributed/utils.py", line 348, in sync
    e.wait(10)
  File "/opt/conda/lib/python3.8/thr



TypeError: object of type 'NoneType' has no len()



<br>

### Close Client after finishing / before using it in another Notebook
***

In [None]:
# Close the Dask client created in section 1.
client.close()

<br>

## 5. Results

In [None]:
%tensorboard --logdir logs/badr_gridsearch1/

<br>
<br>

***
***
***
<br>
<br>

In [None]:
# Append text to a readme file inside the log directory.
# NOTE(review): `hallo` is not assigned in any cell shown in this notebook, so
# this cell looks like it would raise NameError on a fresh kernel — confirm
# what text was meant to be written.
with open(f'{parent_dir}readme.py', 'a') as file:
    file.write(hallo)
    

In [None]:
%%writefile logs/testrun/README.py

#hallo

In [None]:
# BuildModel requires a hyperparameter dict; use the first grid point to
# inspect the architecture.
BuildModel(hparams[0]).summary()

In [None]:
%%writefile logs/hallo.txt

a = 1 + 

In [None]:
def do(x):
    """Print ``x + 1``; the call itself always evaluates to ``None``."""
    incremented = x + 1
    print(incremented)

In [None]:
x = 1

In [None]:
b = do(x)
b

In [None]:
b = [do(l) for l in range(5)]
b

In [None]:
b