# Fit feedforward Neural Network model with Dask
This notebook takes the "Fit feedforward Neural Network model" notebook and parallelizes its processing with Dask.
It skips explanations of code unrelated to Dask. Refer to the "Fit feedforward Neural Network model" notebook for those details.

First, import the packages and initialize the scheduler.

In [None]:
import joblib
from besos import eppy_funcs as ef, sampling
from besos.evaluator import EvaluatorEP, EvaluatorGeneric
from besos.problem import EPProblem
from dask.distributed import Client
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.preprocessing import StandardScaler
import warnings
from parameter_sets import parameter_set

In [None]:
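# Client is already imported above. With no arguments it starts a local cluster.
client = Client()
# Displaying the client shows cluster details, including the dashboard link.
client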

The evaluator can be parallelized by passing in `multi=True`.

In [None]:
parameters = parameter_set(7)
problem = EPProblem(parameters, ["Electricity:Facility"])
building = ef.get_building()
evaluator = EvaluatorEP(problem, building, multi=True)

When `df_apply` is called, the dataframe is processed concurrently. The `processes` parameter sets the number of partitions the dataframe is divided into.
If you are running this notebook locally, you can open the Dask dashboard. The `client` object provides a link (see the cell above where we initialized `Client`). The dashboard shows which tasks are currently running.
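
As a rough illustration of what partitioned, concurrent processing means, the sketch below splits a list of rows into a fixed number of partitions and evaluates each partition on a thread pool, using only the standard library. It is not how BESOS or Dask implement `df_apply`. The names `split_into_partitions` and `evaluate_partition`, and the toy workload, are hypothetical and chosen only for this example.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, n_partitions):
    """Split rows into n_partitions contiguous chunks of near-equal size."""
    base, extra = divmod(len(rows), n_partitions)
    partitions, start = [], 0
    for i in range(n_partitions):
        # The first `extra` partitions receive one additional row.
        size = base + (1 if i < extra else 0)
        partitions.append(rows[start:start + size])
        start += size
    return partitions


def evaluate_partition(partition):
    """Hypothetical stand-in for an expensive per-row simulation."""
    return [sum(row) for row in partition]


rows = [(i, i + 1) for i in range(10)]
partitions = split_into_partitions(rows, n_partitions=4)

# Each partition is handed to a worker; map returns results in partition order.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(evaluate_partition, partitions))

# Flatten the per-partition results back into a single list in row order.
results = [value for part in partial_results for value in part]
print([len(p) for p in partitions])
print(results)
```

More partitions allow more workers to be busy at once, but each partition also carries scheduling overhead, so the best count usually depends on the number of available cores.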

In [None]:
%%time
inputs = sampling.dist_sampler(sampling.lhs, problem, 50)
outputs = evaluator.df_apply(inputs, processes=4)
inputs

## Set up model parameters
In this cell, we set up the model. More detail can be found in the "Fit feedforward Neural Network model" notebook.

In [None]:
train_in, test_in, train_out, test_out = train_test_split(
    inputs, outputs, test_size=0.2
)

scaler = StandardScaler()
inputs = scaler.fit_transform(X=train_in)

scaler_out = StandardScaler()
outputs = scaler_out.fit_transform(X=train_out)

hyperparameters = {
    "hidden_layer_sizes": (
        (len(parameters) * 16,),
        (len(parameters) * 16, len(parameters) * 16),
    ),
    "alpha": [1, 10, 10 ** 3],
}

neural_net = MLPRegressor(max_iter=1000, early_stopping=False)
folds = 3

## Model fitting with Dask

Here, we use the NN model from scikit-learn.
In a [different example](FitNNTF.ipynb) we use TensorFlow (with and without the Keras wrapper).

Below we parallelize the model fit.
Normally, scikit-learn uses joblib to parallelize model fitting. When the parallel backend is set to Dask, joblib hands its work to the Dask scheduler instead.
For this example, using Dask may not be any faster, because joblib can already parallelize across the cores of a single machine.
This approach becomes useful when Dask is connected to a distributed cluster with access to more cores.
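
To see how much independent work the grid search offers a scheduler, the sketch below counts the fits implied by the hyperparameter grid and fold count defined earlier. It is a back-of-the-envelope calculation, not part of the BESOS or scikit-learn API. The value `n_parameters = 7` is an assumption that `parameter_set(7)` yields seven parameters; it does not affect the count.

```python
from itertools import product

n_parameters = 7  # assumption: parameter_set(7) yields seven parameters
hidden_layer_sizes = [
    (n_parameters * 16,),
    (n_parameters * 16, n_parameters * 16),
]
alphas = [1, 10, 10 ** 3]
folds = 3

# Every (architecture, alpha) pair is one candidate model.
candidates = list(product(hidden_layer_sizes, alphas))

# Each candidate is fitted once per fold, and these fits are independent of each other.
independent_fits = len(candidates) * folds

print(f"{len(candidates)} candidates x {folds} folds = {independent_fits} independent fits")
```

With the default `refit=True`, `GridSearchCV` performs one additional fit of the best candidate on the full training set after the search, and that final fit cannot run in parallel with the others.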

In [None]:
%%time
with joblib.parallel_backend("dask"):
    # `iid` is omitted: it was deprecated and later removed from GridSearchCV.
    clf = GridSearchCV(neural_net, hyperparameters, cv=folds)
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=FutureWarning)
        clf.fit(inputs, outputs.ravel())

# best_score_ is the mean cross-validated score of the best candidate.
print(f"Best mean cross-validated R^2 score: {clf.best_score_}")
print(f"Best hyperparameters: {clf.best_params_}")
test_score = clf.best_estimator_.score(
    scaler.transform(test_in), scaler_out.transform(test_out)
)
print(f"Best model R^2 score on a separate test set: {test_score}")

## Surrogate Modelling Evaluator object
We can wrap the fitted model in a BESOS `Evaluator`.
This has identical behaviour to the original EnergyPlus Evaluator object.

To parallelize the surrogate model evaluator, we simply pass in `multi=True` again.
The parallelization occurs when calling the `df_apply` function.
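
The evaluation function in the following cell scales each input, predicts in scaled space, and maps the prediction back to original units. The sketch below walks through that round trip for a single column with the standard library, using the mean and population standard deviation that `StandardScaler` computes. The helper names and `toy_model` are hypothetical stand-ins, not the fitted network.

```python
from statistics import mean, pstdev


def fit_scaler(values):
    """Return the mean and population standard deviation of one column."""
    return mean(values), pstdev(values)


def transform(value, center, scale):
    """Standardize a value: subtract the mean, divide by the standard deviation."""
    return (value - center) / scale


def inverse_transform(scaled, center, scale):
    """Undo the standardization to recover original units."""
    return scaled * scale + center


train_in = [10.0, 20.0, 30.0]
train_out = [100.0, 200.0, 300.0]
in_center, in_scale = fit_scaler(train_in)
out_center, out_scale = fit_scaler(train_out)


def toy_model(scaled_input):
    """Hypothetical model in scaled space; the identity is exact for this linear toy data."""
    return scaled_input


def surrogate(value):
    # Scale the input, predict in scaled space, then map back to original units.
    scaled_input = transform(value, in_center, in_scale)
    scaled_prediction = toy_model(scaled_input)
    return inverse_transform(scaled_prediction, out_center, out_scale)


print(surrogate(25.0))
```

Because the scalers are fitted on the training data only, the same statistics must be reused for every later prediction, which is why the evaluation function closes over the fitted scaler objects.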

In [None]:
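def evaluation_func(ind, scaler=scaler):
    # Scale one input row so it matches the data the model was trained on.
    ind = scaler.transform(X=[ind])
    # inverse_transform expects a 2D array, so reshape the 1D prediction to a column.
    prediction = clf.predict(ind).reshape(-1, 1)
    return (scaler_out.inverse_transform(prediction)[0, 0],)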


NN_SM = EvaluatorGeneric(evaluation_func, problem, multi=True)

## Running a large surrogate evaluation
Here we bump up the sample count to 50,000 and split the data into 4 partitions. If you have more cores available, try increasing `processes`.

In [None]:
%%time
inputs = sampling.dist_sampler(sampling.lhs, problem, 50000)
outputs = NN_SM.df_apply(inputs, processes=4)
results = inputs.join(outputs)
results.head()