In [2]:
import json
import os
import sys
import tempfile
from time import time
from typing import List, Dict, Any, Tuple

import mlflow
from mlflow import MlflowClient
import ray
from ray import train
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch import TorchTrainer
import requests
import torch
from torch import nn
from torch import optim

import data_utilities as du
import torch_utilities as tu

device = torch.device('cpu')  #torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [3]:
os.environ['MLFLOW_TRACKING_URI'] = "https://dagshub.com/xsqian/dagshub-demo.mlflow"
os.environ['MLFLOW_TRACKING_USERNAME'] = "xsqian"
os.environ['MLFLOW_TRACKING_PASSWORD'] = "rfg8@5LZ88igXX@"

### Ray Train

In [8]:
def initialize_ray():
    runtime_env = {
        'pip': ['minio', 'mlflow']
    }
    ray.init(runtime_env=runtime_env)


def log_metric(base_url: str, run_id: str, metric: Dict[str, float]) -> int:
    '''Log a metric dict for the given run.'''
    base_url = f'{base_url}/api/2.0/mlflow'
    url = base_url + '/runs/log-metric'
    payload = {
        "run_id": run_id,
        "key": metric["key"],
        "value": metric["value"],
        "timestamp": mlflow.utils.time.get_current_time_millis(),
        "step": metric["step"],
    }
    r = requests.post(url, json=payload)
    return r.status_code


def get_minio_run_config():
    import s3fs
    import pyarrow.fs

    s3_fs = s3fs.S3FileSystem(
        key = os.environ['MINIO_ACCESS_KEY'],
        secret = os.environ['MINIO_SECRET_ACCESS_KEY'],
        endpoint_url = 'http://localhost:9000' #os.environ['MINIO_URL']
    )
    custom_fs = pyarrow.fs.PyFileSystem(pyarrow.fs.FSSpecHandler(s3_fs))

    run_config = train.RunConfig(storage_path='ray-train', storage_filesystem=custom_fs)
    return run_config


def train_model(model: tu.MNISTModel, train_data: ray.data.dataset.MaterializedDataset, training_parameters: Dict[str, Any]) -> Dict[str, Any]:
    loss_func = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'], momentum=training_parameters['momentum'])

    for epoch in range(training_parameters['epochs']):
        total_loss = 0
        batch_count = 0
        for batch in train_data.iter_torch_batches(batch_size=training_parameters['batch_size_per_worker']):
            # Get the images and labels from the batch.
            images, labels = batch['X'], batch['y']
            labels = labels.type(torch.LongTensor)   # casting to long
            images, labels = images.to(device), labels.to(device)

            # Flatten MNIST images into a 784 long vector.
            images = images.view(images.shape[0], -1)

            # Training pass
            optimizer.zero_grad()

            output = model(images)
            loss = loss_func(output, labels)

            # This is where the model learns by backpropagating
            loss.backward()

            # And optimizes its weights here
            optimizer.step()
            
            total_loss += loss.item()
            batch_count +=1
            
        #ray.train.report({'training_loss': total_loss/len(loader)})
        print("Epoch {} - Training loss: {}".format(epoch+1, total_loss/batch_count))

    #return training_metrics


def train_func_per_worker(training_parameters):
    logger = du.create_logger()
    
    # Train the model and log training metrics.
    model = tu.MNISTModel(training_parameters['input_size'], training_parameters['hidden_sizes'], 
                          training_parameters['output_size'])
    model = ray.train.torch.prepare_model(model)

    # Get the dataset shard for the training worker.
    train_data_shard = train.get_dataset_shard('train')

    loss_func = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'], momentum=training_parameters['momentum'])

    metrics = {}
    batch_size_per_worker = training_parameters['batch_size_per_worker']
    for epoch in range(training_parameters['epochs']):
        total_loss = 0
        batch_count = 0
        for batch in train_data_shard.iter_torch_batches(batch_size=batch_size_per_worker):
            # Get the images and labels from the batch.
            images, labels = batch['X'], batch['y']
            labels = labels.type(torch.LongTensor)   # casting to long
            images, labels = images.to(device), labels.to(device)

            # Flatten MNIST images into a 784 long vector.
            images = images.view(images.shape[0], -1)
        
            # Training pass
            optimizer.zero_grad()            
            output = model(images)

            loss = loss_func(output, labels)
            
            # This is where the model learns by backpropagating
            loss.backward()
            
            # And optimizes its weights here
            optimizer.step()
            
            total_loss += loss.item()
            batch_count += 1

        metrics = {'training_loss': total_loss/batch_count}
        checkpoint = None
        if train.get_context().get_world_rank() == 0:
            temp_dir = os.path.join(os.getcwd(), 'checkpoint')
            torch.save(model.module.state_dict(), os.path.join(temp_dir, 'mnist_model.pt'))
            checkpoint = Checkpoint.from_directory(temp_dir)
            mlflow_metric = {}
            mlflow_metric['key'] = 'training_loss'
            mlflow_metric['value'] = loss.item()
            mlflow_metric['step'] = epoch+1
            log_metric(training_parameters['mlflow_base_url'], training_parameters['run_id'], mlflow_metric)

        train.report(metrics, checkpoint=checkpoint)
        #logger.info('Sending metrics:')
        #logger.info(metrics)


def distributed_training(training_parameters, num_workers: int, use_gpu: bool):
    logger = du.create_logger()

    # Setup mlflow to point to our server.
    experiment_name = 'MLFlow - Ray test'
    run_name = 'Testing Epoch metrics'
    mlflow_base_url = os.environ['MLFLOW_TRACKING_URI']
    mlflow.set_tracking_uri(mlflow_base_url)
    active_experiment = mlflow.set_experiment(experiment_name)
    active_run = mlflow.start_run(run_name=run_name)
    training_parameters['mlflow_base_url'] = mlflow_base_url
    training_parameters['run_id'] = active_run.info.run_id
    # Log parameters
    mlflow.log_params(training_parameters)
    
    logger.info('Initializing Ray.')
    initialize_ray()

    train_data, test_data, load_time_sec = du.get_ray_dataset(training_parameters)

    # Scaling configuration
    scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

    # Initialize a Ray TorchTrainer
    start_time = time()
    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=training_parameters,
        datasets={'train': train_data},
        scaling_config=scaling_config,
        run_config= train.RunConfig(storage_path=os.getcwd(), name="ray_experiments") #get_minio_run_config()
    )
    result = trainer.fit()
    training_time_sec = (time()-start_time)

    logger.info(result)
    logger.info(f'Load Time (in seconds) = {load_time_sec}')
    logger.info(f'Training Time (in seconds) = {training_time_sec}')
    
    model = tu.MNISTModel(training_parameters['input_size'], training_parameters['hidden_sizes'], training_parameters['output_size'])
    with result.checkpoint.as_directory() as checkpoint_dir:
        model.load_state_dict(torch.load(os.path.join(checkpoint_dir, "model.pt")))
    tu.test_model(model, test_data)

    # Shut down Ray    
    ray.shutdown()
    # End the run
    mlflow.end_run()


def local_training(training_parameters):
    logger = du.create_logger()

    train_data, test_data, load_time_sec = du.get_ray_dataset(training_parameters)

    # Train the model and log training metrics.
    model = tu.MNISTModel(training_parameters['input_size'], training_parameters['hidden_sizes'], training_parameters['output_size'])
    model.to(device)
    logger.info(f'Model created on device {device}')
    
    start_time = time()
    train_model(model, train_data, training_parameters)
    training_time_sec = (time()-start_time)
    logger.info(f'Load Time (in seconds) = {load_time_sec}')
    logger.info(f'Training Time (in seconds) = {training_time_sec}')

    tu.test_model(model, test_data)

AttributeError: module 'torch_utilities' has no attribute 'MNISTModel'

In [6]:
import torch_utilities as tu


In [3]:
tempfile.TemporaryDirectory()

<TemporaryDirectory '/var/folders/_5/jt7lb09d49n9qscq4l2m3sph0000gn/T/tmpf2ryzk7t'>

In [4]:
temp_dir = os.path.join(os.getcwd(), 'checkpoint')
temp_dir

'/Users/keithpij/Documents/code/ds-engine/ray_train/checkpoint'

In [5]:
with tempfile.TemporaryDirectory() as temp_dir:
    print(os.path.join(temp_dir, 'mnist_model.pt'))

/var/folders/_5/jt7lb09d49n9qscq4l2m3sph0000gn/T/tmpk6j4jvp_/mnist_model.pt


In [6]:
# Load the credentials and connection information.
with open('credentials.json') as f:
    credentials = json.load(f)

os.environ['MINIO_URL'] = credentials['url']
os.environ['MINIO_ACCESS_KEY'] = credentials['accessKey']
os.environ['MINIO_SECRET_ACCESS_KEY'] = credentials['secretKey']

num_workers = 2
use_gpu = False

# training configuration
training_parameters = {
    'batch_size_per_worker': 64 // num_workers,
    'epochs': 3,
    'input_size': 784,
    'hidden_sizes': [1024, 1024, 1024, 1024],
    'lr': 0.025,
    'momentum': 0.5,
    'output_size': 10,
    'smoke_test_size': 0
    }

distributed_training(training_parameters, num_workers, use_gpu)
#local_training(training_parameters)

0,1
Current time:,2023-12-28 16:54:29
Running for:,00:00:18.13
Memory:,17.2/32.0 GiB

Trial name,# failures,error file
TorchTrainer_a1faa_00000,1,/Users/keithpij/ray_results/TorchTrainer_2023-12-28_16-54-10/TorchTrainer_a1faa_00000_0_2023-12-28_16-54-10/error.txt

Trial name,status,loc
TorchTrainer_a1faa_00000,ERROR,127.0.0.1:3768


[36m(TorchTrainer pid=3768)[0m Starting distributed worker processes: ['3776 (127.0.0.1)', '3777 (127.0.0.1)']
[36m(RayTrainWorker pid=3776)[0m Setting up process group for: env:// [rank=0, world_size=2]




[36m(RayTrainWorker pid=3776)[0m [/Users/runner/work/pytorch/pytorch/pytorch/third_party/gloo/gloo/transport/uv/libuv.h:596] uv_accept: invalid argument
[36m(RayTrainWorker pid=3776)[0m *** SIGABRT received at time=1703800468 ***
[36m(RayTrainWorker pid=3776)[0m PC: @        0x1897be0dc  (unknown)  __pthread_kill
[36m(RayTrainWorker pid=3776)[0m     @        0x108161fc8  (unknown)  absl::lts_20220623::WriteFailureInfo()
[36m(RayTrainWorker pid=3776)[0m     @        0x108161d14  (unknown)  absl::lts_20220623::AbslFailureSignalHandler()
[36m(RayTrainWorker pid=3776)[0m     @        0x189825a24  (unknown)  _sigtramp
[36m(RayTrainWorker pid=3776)[0m     @        0x1897f5cc0  (unknown)  pthread_kill
[36m(RayTrainWorker pid=3776)[0m     @        0x189701a40  (unknown)  abort
[36m(RayTrainWorker pid=3776)[0m     @        0x304e7d58c  (unknown)  gloo::transport::uv::Device::listenCallback()
[36m(RayTrainWorker pid=3776)[0m     @        0x304e8e254  (unknown)  gloo::transpor

3521 2023-12-28 16:54:29,012 | ERROR | Trial task failed for trial TorchTrainer_a1faa_00000
Traceback (most recent call last):
  File "/Users/keithpij/Documents/code/ds-engine/.venv/lib/python3.10/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
    result = ray.get(future)
  File "/Users/keithpij/Documents/code/ds-engine/.venv/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/Users/keithpij/Documents/code/ds-engine/.venv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/Users/keithpij/Documents/code/ds-engine/.venv/lib/python3.10/site-packages/ray/_private/worker.py", line 2563, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TypeError): [36mray::_Inner.train()[39m (pid=3768, ip=127.0.0.1, actor_id=ce39f17c48211ac6304f239001000000, repr=TorchTrainer)
  File "/tmp

TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = TorchTrainer.restore("/Users/keithpij/ray_results/TorchTrainer_2023-12-28_16-54-10")`.
To start a new run that will retry on training failures, set `train.RunConfig(failure_config=train.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.

In [None]:
ray.shutdown()
mlflow.end_run()