In [8]:
import os, shutil
from typing import Any, Dict, Tuple, Union

import mlflow
import numpy as np
import torch
import gymnasium

from stable_baselines3 import DDPG, SAC, TD3, PPO, A2C
from stable_baselines3.common.logger import KVWriter, Logger, Video
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.evaluation import evaluate_policy
from moviepy.editor import ImageSequenceClip

from pytz import timezone
from datetime import datetime

# Timezone used when stamping experiment IDs with the current time.
TZ = timezone('Europe/Moscow')

# Lookup tables: configuration name -> Stable-Baselines3 algorithm class /
# torch activation module class.
models = dict(DDPG=DDPG, SAC=SAC, TD3=TD3, PPO=PPO, A2C=A2C)
activations = dict(ReLU=torch.nn.ReLU, Tanh=torch.nn.Tanh)

# Use the GPU whenever torch reports CUDA as available, otherwise the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

In [9]:
class MLflowCheckpointCallback(BaseCallback):
    """
    Callback for saving a model every ``save_freq`` calls
    to ``env.step()`` and logging the saved files to MLflow.

    Every checkpoint is written to the local directory
    ``<save_path>/<n_calls>/`` and each file is then logged with
    ``mlflow.log_artifact`` under that same relative path
    (``.../sb3``, ``.../replay_buffer``, ``.../vecnormalize``).
    By default, it only saves model checkpoints,
    you need to pass ``save_replay_buffer=True``,
    and ``save_vecnormalize=True`` to also save replay buffer checkpoints
    and normalization statistics checkpoints.

    .. warning::

      When using multiple environments, each call to  ``env.step()``
      will effectively correspond to ``n_envs`` steps.
      To account for that, you can use ``save_freq = max(save_freq // n_envs, 1)``

    :param eval_env: Environment stored on the callback as ``self.eval_env``.
        The callback itself never steps or renders it.
    :param save_freq: Save checkpoints every ``save_freq`` call of the callback.
    :param save_path: Path to the folder where the model will be saved.
    :param save_replay_buffer: Save the model replay buffer
    :param save_vecnormalize: Save the ``VecNormalize`` statistics
    :param verbose: Verbosity level: 0 for no output, 2 (or higher) for indicating
        when a checkpoint file is saved
    """

    def __init__(
        self,
        eval_env: gymnasium.Env,
        save_freq: int,
        save_path: str,
        save_replay_buffer: bool = False,
        save_vecnormalize: bool = False,
        verbose: int = 0,
    ):
        super().__init__(verbose)
        self.eval_env = eval_env
        self.save_freq = save_freq
        self.save_path = save_path
        self.save_replay_buffer = save_replay_buffer
        self.save_vecnormalize = save_vecnormalize

    def _checkpoint_path(self, checkpoint_type: str = "") -> str:
        """
        Build the directory name of the current checkpoint.

        :param checkpoint_type: Optional prefix put in front of ``save_path``.
        :return: ``<checkpoint_type><save_path>/<n_calls>``
        """
        return f'{checkpoint_type}{self.save_path}/{self.n_calls}'

    def _on_step(self) -> bool:
        """
        Save and upload a checkpoint when ``n_calls`` is a multiple of ``save_freq``.

        :return: Always ``True``.
        """
        if self.n_calls % self.save_freq != 0:
            return True

        save_path = self._checkpoint_path()
        # makedirs instead of mkdir: tolerate a directory left over from an
        # earlier run and create ``save_path`` itself if it does not exist yet.
        os.makedirs(save_path, exist_ok=True)

        model_path = save_path + '/model.zip'
        self.model.save(model_path)
        mlflow.log_artifact(model_path, save_path + '/sb3')
        if self.verbose >= 2:
            print(f"Saving model checkpoint to {save_path}")

        if self.save_replay_buffer and hasattr(self.model, "replay_buffer") and self.model.replay_buffer is not None:
            # If model has a replay buffer, save it too
            replay_buffer_path = save_path + "/replay_buffer.pkl"
            self.model.save_replay_buffer(replay_buffer_path)  # type: ignore[attr-defined]
            mlflow.log_artifact(replay_buffer_path, save_path + '/replay_buffer')
            if self.verbose >= 2:
                print(f"Saving model replay buffer checkpoint to {replay_buffer_path}")

        if self.save_vecnormalize and self.model.get_vec_normalize_env() is not None:
            # Save the VecNormalize statistics
            vec_normalize_path = save_path + "/vecnormalize.pkl"
            self.model.get_vec_normalize_env().save(vec_normalize_path)  # type: ignore[union-attr]
            mlflow.log_artifact(vec_normalize_path, save_path + '/vecnormalize')
            if self.verbose >= 2:
                print(f"Saving model VecNormalize to {vec_normalize_path}")

        return True


In [10]:


class MLflowOutputFormat(KVWriter):
    """
    Logger output format that forwards scalar key/value pairs to MLflow
    as metrics.
    """

    def write(
        self,
        key_values: Dict[str, Any],
        key_excluded: Dict[str, Union[str, Tuple[str, ...]]],
        step: int = 0,
    ) -> None:
        """
        Log every non-excluded, non-string scalar in ``key_values`` with
        ``mlflow.log_metric``.

        :param key_values: Values to log, keyed by metric name.
        :param key_excluded: Per-key output formats the value must not be
            written to; entries mentioning ``"mlflow"`` are skipped here.
        :param step: Step passed on to ``mlflow.log_metric``.
        """
        # Both mappings are sorted by key and walked in lockstep, so each
        # value is paired with the exclusion entry at the same position.
        ordered_values = sorted(key_values.items())
        ordered_exclusions = sorted(key_excluded.items())

        for (name, metric), (_, skipped_formats) in zip(ordered_values, ordered_exclusions):
            if skipped_formats is not None and "mlflow" in skipped_formats:
                continue

            # Only scalar types that are not strings are sent as metrics.
            if isinstance(metric, np.ScalarType) and not isinstance(metric, str):
                mlflow.log_metric(name, metric, step)


In [11]:
# Address of the MLflow tracking server. A non-empty MLFLOW_TRACKING_URI
# environment variable takes precedence, so the notebook can be pointed at a
# different server without editing this cell; the LAN address is the fallback.
MLFLOW_TRACKING_URI = os.environ.get("MLFLOW_TRACKING_URI") or "http://192.168.0.206:2670"

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

In [12]:
# Full experiment configuration. It is logged to MLflow by the training cell,
# so key names and key order are part of the recorded run metadata.
exp_params = dict(
    env_name='Pendulum-v1',
    algorithm_name='SAC',
    # Unique per execution of this cell: day+month and time in the TZ timezone.
    exp_id=f'exp_{datetime.now(TZ).strftime("%d%m_%H%M%S")}',
    seed=21,
    net=dict(
        activation='ReLU',
        pi=[256, 256],
        qf=[256, 256],
        vf=None,
    ),
    training=dict(
        iteration_count=1,
        # Passed to ``model.learn`` as ``total_timesteps`` by the training cell.
        episode_count=1000,
        policy='MlpPolicy',
        learning_rate=0.003,
        buffer_size=1000000,
        learning_starts=100,
        batch_size=256,
        tau=0.005,
        gamma=0.99,
        verbose=0,
        device=device,
    ),
    validation=dict(
        # Used as the checkpoint callback's ``save_freq`` by the training cell.
        validate_agent_every_n_eps=300,
        log_interval=10,
    ),
    evaluation=dict(
        episode_count=1,
    ),
)

# Logger whose only output format forwards metrics to MLflow.
loggers = Logger(folder=None, output_formats=[MLflowOutputFormat()])

# Network settings handed to the policy: activation class and layer sizes
# of the actor (pi) and critic (qf) networks.
policy_kwargs = {
    'activation_fn': activations[exp_params['net']['activation']],
    'net_arch': {
        'pi': exp_params['net']['pi'],
        'qf': exp_params['net']['qf'],
    },
}

# Name shared by the MLflow experiment and the local working directory.
exp_name = f"test_{exp_params['env_name']}_{exp_params['algorithm_name']}_{exp_params['exp_id']}"
print(exp_name)
os.mkdir(exp_name)


test_Pendulum-v1_SAC_exp_0109_145419


In [13]:
def VideoRecord(eval_env: gymnasium.Env, path: str, fps: int = 30) -> str:
    """
    Run the checkpoint stored in ``path`` for one deterministic episode and
    write the rendered frames to ``agent.mp4`` in the same directory.

    The algorithm class used to load the checkpoint is looked up from the
    notebook-level ``exp_params['algorithm_name']``.

    :param eval_env: Environment the model is loaded with and evaluated on;
        its ``render()`` is called after every evaluation step to grab a frame.
    :param path: Directory that contains ``model.zip``. A trailing path
        separator is accepted but not required.
    :param fps: Frame rate of the written video.
    :return: Name of the video file relative to ``path`` (``'agent.mp4'``).
    :raises ValueError: If no frame was captured during the episode.
    """
    video_name = 'agent.mp4'

    algorithm = models[exp_params['algorithm_name']]
    # os.path.join keeps working when ``path`` has no trailing separator,
    # which plain string concatenation silently required.
    load_model = algorithm.load(os.path.join(path, 'model.zip'), eval_env)

    screens = []

    def grab_screens(_locals: Dict[str, Any], _globals: Dict[str, Any]) -> None:
        screen = eval_env.render()
        # render() may yield None (presumably when the env has no image
        # render mode - verify); such entries cannot be encoded as frames.
        if screen is not None:
            screens.append(screen)

    evaluate_policy(
        load_model,
        eval_env,
        callback=grab_screens,
        n_eval_episodes=1,
        deterministic=True,
    )

    if not screens:
        raise ValueError(
            f"No frames were rendered for the checkpoint in {path!r}; "
            "check the render mode of eval_env."
        )

    video_path = os.path.join(path, video_name)
    clip = ImageSequenceClip(screens, fps=fps)
    clip.write_videofile(video_path)
    return video_name

In [14]:

# Checkpoint callback: saves the model (and uploads it to MLflow) every
# ``validate_agent_every_n_eps`` calls of the callback.
save_callback = MLflowCheckpointCallback(
    eval_env=gymnasium.make(exp_params['env_name'], render_mode="rgb_array"),
    save_freq=exp_params['validation']['validate_agent_every_n_eps'],
    save_path=exp_name,
    verbose=0,
)

exp = mlflow.set_experiment(exp_name)

with mlflow.start_run() as run:
    model = models[exp_params['algorithm_name']](
        exp_params['training']['policy'],
        exp_params['env_name'],
        policy_kwargs=policy_kwargs,
        learning_rate=exp_params['training']['learning_rate'],
        buffer_size=exp_params['training']['buffer_size'],
        learning_starts=exp_params['training']['learning_starts'],
        batch_size=exp_params['training']['batch_size'],
        tau=exp_params['training']['tau'],
        gamma=exp_params['training']['gamma'],
        verbose=exp_params['training']['verbose'],
        # The configured seed is logged with the run, so it must actually be
        # applied; without it the recorded seed does not describe the run.
        seed=exp_params['seed'],
        device=device,
    )

    mlflow.log_dict(exp_params, exp_name + '/parameters.json')
    mlflow.log_params(exp_params)

    model.set_logger(loggers)
    model.learn(
        total_timesteps=exp_params['training']['episode_count'],
        log_interval=exp_params['validation']['log_interval'],
        callback=save_callback,
        progress_bar=True,
    )

    # Final model: saved locally, then logged both as a PyTorch actor network
    # and as the complete Stable-Baselines3 archive.
    model.save(exp_name + '/model.zip')
    mlflow.pytorch.log_model(model.actor, exp_name + '/actor')
    mlflow.log_artifact(exp_name + '/model.zip', exp_name + '/sb3')

    # One video per checkpoint sub-directory plus one for the final model in
    # the experiment root. Sorted so the processing order is deterministic.
    checkpoint_dirs = [
        exp_name + '/' + entry + '/'
        for entry in sorted(os.listdir(exp_name))
        if os.path.isdir(exp_name + '/' + entry)
    ]
    checkpoint_dirs.append(exp_name + '/')
    for checkpoint_dir in checkpoint_dirs:
        video = VideoRecord(model.env, checkpoint_dir)
        mlflow.log_artifact(checkpoint_dir + video, checkpoint_dir + 'video')

# Optional model registration, currently disabled:
#model_uri = f"runs:/{run.info.run_id}/{exp_params['algorithm_name']}"
#print(model_uri)
#mv = mlflow.register_model(model_uri, exp_name)


# Everything has been uploaded to MLflow; drop the local working directory.
shutil.rmtree(exp_name)

2024/09/01 14:54:23 INFO mlflow.tracking.fluent: Experiment with name 'test_Pendulum-v1_SAC_exp_0109_145419' does not exist. Creating a new experiment.


Output()



Moviepy - Building video test_Pendulum-v1_SAC_exp_0109_145419/300/agent.mp4.
Moviepy - Writing video test_Pendulum-v1_SAC_exp_0109_145419/300/agent.mp4



                                                               

Moviepy - Done !
Moviepy - video ready test_Pendulum-v1_SAC_exp_0109_145419/300/agent.mp4
Moviepy - Building video test_Pendulum-v1_SAC_exp_0109_145419/600/agent.mp4.
Moviepy - Writing video test_Pendulum-v1_SAC_exp_0109_145419/600/agent.mp4



                                                               

Moviepy - Done !
Moviepy - video ready test_Pendulum-v1_SAC_exp_0109_145419/600/agent.mp4
Moviepy - Building video test_Pendulum-v1_SAC_exp_0109_145419/900/agent.mp4.
Moviepy - Writing video test_Pendulum-v1_SAC_exp_0109_145419/900/agent.mp4



                                                               

Moviepy - Done !
Moviepy - video ready test_Pendulum-v1_SAC_exp_0109_145419/900/agent.mp4
Moviepy - Building video test_Pendulum-v1_SAC_exp_0109_145419/agent.mp4.
Moviepy - Writing video test_Pendulum-v1_SAC_exp_0109_145419/agent.mp4



                                                               

Moviepy - Done !
Moviepy - video ready test_Pendulum-v1_SAC_exp_0109_145419/agent.mp4


In [None]:
# Re-create the local experiment directory (the training cell removes it when
# it finishes). exist_ok keeps this cell safe to run more than once.
os.makedirs(exp_name, exist_ok=True)

In [None]:
# Show the ID of the experiment returned by mlflow.set_experiment(exp_name).
exp.experiment_id

In [None]:
# Look the experiment up by the ID obtained from mlflow.set_experiment in this
# session instead of a hard-coded ID, which is only valid for one particular
# state of one tracking server.
experiment = mlflow.get_experiment(exp.experiment_id)

In [None]:
# Display the experiment looked up with mlflow.get_experiment.
experiment

In [None]:
# Root URI of the artifacts logged by the training cell. They were all logged
# under an ``<exp_name>/`` prefix inside the run's artifact store, so the URI is
# derived from the run created in this session rather than a hard-coded run ID.
art_loc = f'{run.info.artifact_uri}/{exp_name}/'
art_loc

In [None]:
# Download the logged Stable-Baselines3 archive into the local experiment
# directory; the cell output is the value returned by the download call.
mlflow.artifacts.download_artifacts(
    artifact_uri=f'{art_loc}sb3/model.zip',
    dst_path=exp_name,
)