In [1]:
import os
import sys
import numpy as np
import pandas as pd

import hyperopt
from hyperopt.pyll.base import scope

from pyspark.sql import SparkSession
import mlflow

from sklearn import metrics
from sklearn.model_selection import cross_val_predict

from tensorflow import keras

from keras.layers import Dense, Input
from keras.models import Sequential
from keras.regularizers import l2, l1
from scikeras.wrappers import KerasRegressor

In [2]:
# Run PySpark workers with the same interpreter as the notebook kernel so they
# see the same installed packages (TensorFlow, scikeras, mlflow, ...). A bare
# "python" is resolved through PATH and may point at a different environment.
# These must be set before the SparkSession is created.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [8]:
# On a fresh kernel no session exists yet, so only stop a leftover one (e.g.
# from an earlier run of this cell). Otherwise getOrCreate() would hand back
# the old session and ignore the master/appName settings below.
previous_session = SparkSession.getActiveSession()
if previous_session is not None:
    previous_session.stop()
spark = SparkSession.builder.master("local[4]").appName("INDEL_PARAM_SEARCH").getOrCreate()

In [6]:
data = pd.read_csv("./data/Lindel_training.txt", sep='\t', header=None)
# Column 0 is skipped; columns 1..3033 hold the full one-hot encoding and every
# remaining column is an observed outcome frequency (557 per the original note
# -- not checked here).
x_t = np.array(data.iloc[:, 1:3034])  # the full one hot encoding
y_t = np.array(data.iloc[:, 3034:])

# Only the last 384 encoding columns are used as model features.
x_t = x_t[:, -384:]

# The last 21 outcome columns are summed as insertions, all others as deletions.
y_ins = np.sum(y_t[:, -21:], axis=1)
y_del = np.sum(y_t[:, :-21], axis=1)

# One-hot label per row: [0, 1] when insertions outweigh deletions, otherwise
# [1, 0] (ties count as deletions). Vectorised instead of a per-row loop.
insertion_wins = y_ins > y_del
y_t = np.column_stack([~insertion_wins, insertion_wins]).astype('float32')
assert y_t.shape == (len(x_t), 2), "expected one two-class label per sample"

# 90/10 split in file order (no shuffling).
train_size = round(len(x_t) * 0.9)

x_train, x_test = x_t[:train_size, :], x_t[train_size:, :]
y_train, y_test = y_t[:train_size], y_t[train_size:]

In [7]:
def model_creator(
    input_shape,
    num_units=2,
    num_layers=0,
    kernel_regularizer="l2",
    kernel_weight=10**-4,
    activation="relu",
    loss="categorical_crossentropy",
    learning_rate=0.01
):
    """Build and compile a dense network with a 2-unit softmax output.

    Args:
        input_shape: shape of the training matrix (e.g. ``x_train.shape``);
            only ``input_shape[1]``, the number of feature columns, is used.
        num_units: width of every hidden ``Dense`` layer.
        num_layers: number of hidden layers; 0 gives a single softmax layer
            directly on the input.
        kernel_regularizer: ``"l1"`` or ``"l2"``; applied to every layer,
            including the output layer.
        kernel_weight: regularization factor passed to the regularizer.
        activation: activation of the hidden layers.
        loss: loss passed to ``model.compile``.
        learning_rate: learning rate of the Adam optimizer.

    Returns:
        The compiled ``Sequential`` model (tracking ``mse`` as a metric).

    Raises:
        ValueError: if ``kernel_regularizer`` is not ``"l1"`` or ``"l2"``.
    """
    regularizer_factories = {"l1": l1, "l2": l2}
    if kernel_regularizer not in regularizer_factories:
        # Previously any other string silently fell back to L1.
        raise ValueError(
            f"kernel_regularizer must be 'l1' or 'l2', got {kernel_regularizer!r}"
        )
    make_regularizer = regularizer_factories[kernel_regularizer]

    model = Sequential()
    # `shape` must be a tuple: `(n)` is just the int n, `(n,)` is a 1-tuple.
    model.add(Input(shape=(input_shape[1],)))

    for _ in range(num_layers):
        model.add(Dense(units=num_units, activation=activation,
                        kernel_regularizer=make_regularizer(kernel_weight)))

    # output layer
    model.add(Dense(2, activation="softmax",
                    kernel_regularizer=make_regularizer(kernel_weight)))

    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)

    model.compile(optimizer=optimizer,
                  loss=loss, metrics=['mse'])

    return model

def regression_metrics(actual, pred):
    """Compute MAE, RMSE and MSE between targets and predictions.

    Args:
        actual: ground-truth values, passed straight to ``sklearn.metrics``.
        pred: predictions with the same shape as ``actual``.

    Returns:
        dict with keys ``"MAE"``, ``"RMSE"`` and ``"MSE"``.
    """
    mse_value = metrics.mean_squared_error(actual, pred)
    return {
        "MAE": metrics.mean_absolute_error(actual, pred),
        # RMSE is the square root of the MSE (it used to be sqrt of the MAE,
        # which made the "val_RMSE" hyperopt objective rank trials by MAE).
        "RMSE": np.sqrt(mse_value),
        "MSE": mse_value,
    }

def fit_and_log_cv(x_train, y_train, x_test, y_test, params, nested=False,
                   experiment_id=1, batch_size=32):
    """Evaluate one hyperparameter setting and log it as an MLflow run.

    Inside a new MLflow run:
    - computes cross-validated predictions on the training set;
    - refits on the whole training set and predicts the test set;
    - logs both metric sets (``val_*`` and ``test_*``) and ``params``.

    Args:
        x_train, y_train: training features / targets.
        x_test, y_test: held-out features / targets.
        params: keyword arguments forwarded to ``KerasRegressor`` (and from
            there to ``model_creator``).
        nested: open the run as a child of the currently active run.
        experiment_id: MLflow experiment to log into (default 1, as before).
        batch_size: batch size used by both the CV model and the final model.

    Returns:
        dict mapping ``test_<metric>`` and ``val_<metric>`` names to values.
    """
    def make_model():
        # Same configuration for the CV estimator and the final refit.
        return KerasRegressor(
            model=model_creator,
            input_shape=x_train.shape,
            batch_size=batch_size,
            **params
        )

    with mlflow.start_run(nested=nested, experiment_id=experiment_id):
        print(params)

        y_pred_cv = cross_val_predict(make_model(), x_train, y_train)
        metrics_cv = {f"val_{name}": value
                      for name, value in regression_metrics(y_train, y_pred_cv).items()}

        # NOTE(review): autolog stays enabled once called, so if a worker
        # process runs more than one trial it is also active during later
        # cross_val_predict calls -- confirm this is intended.
        mlflow.tensorflow.autolog()
        model = make_model()
        model.fit(x_train, y_train)
        y_pred_test = model.predict(x_test)
        metrics_test = {f"test_{name}": value
                        for name, value in regression_metrics(y_test, y_pred_test).items()}

        # Named `all_metrics` so it does not shadow the module-level
        # `sklearn.metrics` import.
        all_metrics = {**metrics_test, **metrics_cv}
        mlflow.log_metrics(all_metrics)
        mlflow.log_params(params)

        return all_metrics

def mse(x, y):
    """Return the mean of the squared element-wise difference ``x - y``.

    Works for any operands whose difference supports ``** 2`` and ``.mean()``
    (e.g. numpy arrays or pandas Series).
    """
    residual = x - y
    squared = residual ** 2
    return squared.mean()

def log_best(run: mlflow.entities.Run,
             metric: str) -> None:
    """Log the best child run of ``run`` to the currently active MLflow run.

    Searches ``run``'s experiment for runs tagged with
    ``mlflow.parentRunId == run.info.run_id`` and picks the one with the
    smallest ``metric``. It then sets the ``best_run`` tag and logs
    ``best_<metric>`` on the active run (expected to be ``run`` itself).

    Child runs that never logged ``metric`` (e.g. trials that crashed) are
    ignored. If no child run has it, a warning is issued and nothing is logged.

    Args:
        run: parent run whose nested runs are searched.
        metric: name of the metric to minimise and log.
    """
    import warnings

    client = mlflow.tracking.MlflowClient()
    child_runs = client.search_runs(
        [run.info.experiment_id],
        "tags.mlflow.parentRunId = '{run_id}' ".format(run_id=run.info.run_id))

    # Indexing metrics[metric] on a run that never logged it raises KeyError.
    scored_runs = [child for child in child_runs if metric in child.data.metrics]
    if not scored_runs:
        warnings.warn(
            f"No child run of {run.info.run_id} logged metric {metric!r}; "
            "nothing logged."
        )
        return

    best_run = min(scored_runs, key=lambda child: child.data.metrics[metric])

    mlflow.set_tag("best_run", best_run.info.run_id)
    mlflow.log_metric(f"best_{metric}", best_run.data.metrics[metric])

def train_model_v2(
    x_train,
    y_train,
    x_test,
    y_test,
    metric: str
):
    """Build the hyperopt objective for a fixed train/test split.

    Args:
        x_train, y_train, x_test, y_test: data forwarded to ``fit_and_log_cv``.
        metric: key of the ``fit_and_log_cv`` result used as the hyperopt loss.

    Returns:
        A callable ``params -> dict``. It runs ``fit_and_log_cv`` as a nested
        MLflow run and returns ``{'status': STATUS_OK, 'loss': float(score)}``.
    """

    def objective(params):
        scores = fit_and_log_cv(x_train, y_train, x_test, y_test, params, nested=True)
        print(scores)
        score = scores[metric]
        outcome = {'status': hyperopt.STATUS_OK, 'loss': score}
        print(outcome)
        # hyperopt expects a plain float loss.
        outcome['loss'] = float(score)
        return outcome

    return objective

In [9]:
MAX_EVALS = 4
METRIC = "val_RMSE"
# Number of experiments to run at once
PARALLELISM = 2

# Full search space. The integer-valued hyperparameters (num_units,
# num_layers) are wrapped in scope.int() so they reach model_creator as ints;
# the others are categorical (hp.choice) or a power of ten.
space = {
    'num_units': scope.int(hyperopt.hp.quniform('num_units', 2, 64, 2)),
    # label matches the model_creator argument it feeds (was 'num_iterations')
    'num_layers': scope.int(hyperopt.hp.quniform('num_layers', 0, 10, 1)),
    'kernel_regularizer': hyperopt.hp.choice('kernel_regularizer', ['l1', 'l2']),
    'kernel_weight': 10**hyperopt.hp.quniform('kernel_weight', -10, -1, 1),
    'activation': hyperopt.hp.choice('activation', ['relu', 'sigmoid']),
    'learning_rate': hyperopt.hp.choice('learning_rate', [0.01, 0.001, 0.0001])
}

# Tiny space for smoke-testing the pipeline end to end.
little_space = {
    'num_units': scope.int(hyperopt.hp.quniform('num_units', 2, 4, 2)),
    'num_layers': scope.int(hyperopt.hp.quniform('num_layers', 0, 1, 1)),
    'kernel_regularizer': hyperopt.hp.choice('kernel_regularizer', ['l1']),
    'kernel_weight': 10**hyperopt.hp.quniform('kernel_weight', -10, -9, 1),
    'activation': hyperopt.hp.choice('activation', ['relu']),
    'learning_rate': hyperopt.hp.choice('learning_rate', [0.01])
}
# Alias keeping the original (misspelled) name usable from other cells.
litte_space = little_space

# The space actually searched; set to `space` for the full search.
SEARCH_SPACE = little_space

trials = hyperopt.SparkTrials(parallelism=PARALLELISM, spark_session=spark)
train_objective = train_model_v2(x_train, y_train, x_test, y_test, METRIC)

# Experiment 1 must match the experiment the nested runs in fit_and_log_cv log to.
with mlflow.start_run(experiment_id=1) as run:
    search_run_id = run.info.run_id
    experiment_id = run.info.experiment_id

    print(search_run_id)
    print(experiment_id)

    best_params = hyperopt.fmin(fn=train_objective,
                                space=SEARCH_SPACE,
                                algo=hyperopt.tpe.suggest,
                                max_evals=MAX_EVALS,
                                trials=trials)
    log_best(run, METRIC)

# fmin reports hp.choice parameters as indices; map them back to actual values.
best_config = hyperopt.space_eval(SEARCH_SPACE, best_params)
best_config

715335af2915485bbedf014fdd1a0ad2
1
  0%|          | 0/4 [00:00<?, ?trial/s, best loss=?]

trial task 0 failed, exception is An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.0.120 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:595)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:577)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:718)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator

  0%|          | 0/4 [00:45<?, ?trial/s, best loss=?]


trial task 2 failed, exception is An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2) (192.168.0.120 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:595)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:577)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:718)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:695)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:508)
	at org.apache.spark.InterruptibleIterator

KeyboardInterrupt: 