In [None]:
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Union

from hyperopt import STATUS_OK, Trials, fmin, tpe
from loguru import logger
from mads_datasets import DatasetFactoryProvider, DatasetType
import mlflow
from mltrainer import ReportTypes, Trainer, TrainerSettings, metrics
from mltrainer.preprocessors import PaddedPreprocessor
from pydantic import BaseModel
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
preprocessor = PaddedPreprocessor()
gestures_dataset_factory = DatasetFactoryProvider.create_factory(
    DatasetType.GESTURES
)
streamers = gestures_dataset_factory.create_datastreamer(
    batchsize=64,
    preprocessor=preprocessor,
)
train = streamers["train"]
valid = streamers["valid"]
train_streamer = train.stream()
valid_streamer = valid.stream()

In [None]:
mlflow.set_tracking_uri("sqlite:///mlflow.db")
mlflow.set_experiment("mlflow-gestures-rnn-hyperopt")

In [None]:
class ModelConfig(BaseModel):
    input_size: int
    hidden_size: int
    num_layers: int
    output_size: int
    dropout: float = 0.0


class GRU(nn.Module):
    def __init__(
        self,
        config: ModelConfig,
    ) -> None:
        super().__init__()
        self.rnn = nn.GRU(
            input_size=config.input_size,
            hidden_size=config.hidden_size,
            dropout=config.dropout,
            batch_first=True,
            num_layers=config.num_layers,
        )
        self.linear = nn.Linear(config.hidden_size, config.output_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x, _ = self.rnn(x)
        last_step = x[:, -1, :]
        yhat = self.linear(last_step)
        return yhat

In [None]:
settings = TrainerSettings(
    epochs=20,
    metrics=[metrics.Accuracy()],
    logdir="modellogs",
    train_steps=100,
    valid_steps=100,
    reporttypes=[ReportTypes.MLFLOW, ReportTypes.TOML],
)

In [None]:
def objective(params: Dict[str, Union[int, float]]) -> Dict[str, Any]:
    with mlflow.start_run():
        mlflow.set_tag("model", "recurrent-neural-network")
        mlflow.set_tag("dev", "vanesterik")
        mlflow.log_params(params)

        model = GRU(ModelConfig(**params))
        trainer = Trainer(
            model=model,
            settings=settings,
            loss_fn=nn.CrossEntropyLoss(),
            optimizer=optim.Adam,
            traindataloader=train_streamer,
            validdataloader=valid_streamer,
            scheduler=optim.lr_scheduler.ReduceLROnPlateau,
            device=torch.device("mps"),
        )
        trainer.loop()

        tag = datetime.now().strftime("%Y%m%d-%H%M")
        models_dir = Path("models").resolve()

        if not models_dir.exists():
            models_dir.mkdir()
            logger.info(f"Created {models_dir}")

        models_path = models_dir / (tag + "model.pt")
        torch.save(model, models_path)

        mlflow.log_artifact(
            local_path=models_path, artifact_path="pytorch_models"
        )

        return {"loss": trainer.test_loss, "status": STATUS_OK}


In [None]:
search_space = {
    "input_size": 3,
    "hidden_size": 64,
    "num_layers": 1,
    "output_size": 20,
    "dropout": 0.1,
}

In [None]:
results = fmin(
    fn=objective,
    space=search_space,
    algo=tpe.suggest,
    max_evals=1,
    trials=Trials(),
)

In [None]:
logger.info(f"\n\n{results}")