In [1]:
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
import pandas as pd
from typing import Dict 


import ray
# from ray import air, tune
# from ray.tune.schedulers import ASHAScheduler

import rtdl
from tablebench.core import TabularDataset, TabularDatasetConfig

from tablebench.datasets.experiment_configs import EXPERIMENT_CONFIGS
from tablebench.models import get_estimator

In [2]:
experiment = "adult"
expt_config = EXPERIMENT_CONFIGS[experiment]

dataset_config = TabularDatasetConfig()
dset = TabularDataset(experiment,
                      config=dataset_config,
                      splitter=expt_config.splitter,
                      grouper=expt_config.grouper,
                      preprocessor_config=expt_config.preprocessor_config,
                      **expt_config.tabular_dataset_kwargs)

[DEBUG] not downloading https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data; exists at tmp/adult.data
[DEBUG] not downloading https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.names; exists at tmp/adult.names
[DEBUG] not downloading https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test; exists at tmp/adult.test
[DEBUG] dropping data columns not in FeatureList: ['fnlwgt']
[DEBUG] checking feature Age
[DEBUG] casting feature Age from dtype int64 to dtype float
[DEBUG] checking feature Workclass
[DEBUG] casting feature Workclass from dtype object to dtype CategoricalDtype
[DEBUG] checking feature Education-Num
[DEBUG] casting feature Education-Num from dtype int64 to dtype CategoricalDtype
[DEBUG] checking feature Marital Status
[DEBUG] casting feature Marital Status from dtype object to dtype CategoricalDtype
[DEBUG] checking feature Occupation
[DEBUG] casting feature Occupation from dtype object to dtype CategoricalDtype
[D

In [31]:
def make_ray_dataset(dset: TabularDataset, split):
    X, y, G, _ = dset.get_pandas(split)
    df = pd.concat([X, y, G], axis=1)
    df = df.loc[:,~df.columns.duplicated()].copy()

    y_name = y.name
    G_names = G.columns.tolist()
    X_names = X.columns.tolist()

    y_tr = pd.DataFrame(y)

    dataset: ray.data.Dataset = ray.data.from_pandas([df])
    return dataset

from typing import List

def unpack_row(row) -> List:
    "Convert PandasRow to a dict of numpy arrays."
    x = row[X_names].values.astype(float)
    y = row[y_name].values.astype(float)
    g = row[G_names].values.astype(float)
    return {"x": x, "y": y, "g": g}

def _tmp_fn(x):
    print(x)
    return x

train_dataset = make_ray_dataset(dset, "train")
train_dataset = train_dataset.map_batches(unpack_row, batch_format="pandas")

val_dataset = make_ray_dataset(dset, "validation")
val_dataset = val_dataset.map_batches(unpack_row, batch_format="pandas")

# # show one element
# r = train_dataset.take(1)[0]
# print(r)

In [72]:
from ray import tune
from ray.tune import Tuner 
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.hyperopt import HyperOptSearch
from ray.air.config import RunConfig
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchCheckpoint
import torch.nn as nn
import torch.optim as optim
import torchvision
import sklearn
import scipy
from typing import Tuple

def _cat_preds(prediction):
    # print(f"[JG] predictions shape is {[x.shape for x in prediction]}")
    return torch.cat(prediction).squeeze().cpu().numpy()

def _cat_labs(label):
    # print(f"[JG] label shape is {[x.shape for x in label]}")
    return torch.cat(label).squeeze().cpu().numpy()

@torch.no_grad()
def get_predictions_and_labels(model, loader, as_logits=False) -> Tuple[
    np.ndarray, np.ndarray]:
    """Get the predictions (as logits, or probabilities) and labels."""
    prediction = []
    label = []

    for batch in loader:
        batch_x, batch_y = batch["x"], batch["y"]
        batch_x = batch_x.float()
        batch_y = batch_y.float()
        # TODO(jpgard): handle categorical features here.
        prediction.append(model(batch_x))
        label.append(batch_y)
    # prediction = torch.cat(prediction).squeeze().cpu().numpy()
    prediction = _cat_preds(prediction)
    # target = torch.cat(label).squeeze().cpu().numpy()
    target = _cat_labs(label)
    if not as_logits:
        prediction = scipy.special.expit(prediction)
    return prediction, target


def train_loop_per_worker(config: Dict):
    model = get_estimator("mlp", d_in=dset.X_shape[1], d_layers=[config["d_hidden"]] * config["num_layers"])
    model = train.torch.prepare_model(model)

    criterion = F.binary_cross_entropy_with_logits
    
    optimizer = (
        model.make_default_optimizer()
        if isinstance(model, rtdl.FTTransformer)
        else torch.optim.AdamW(model.parameters(), 
                               lr=config.get("lr", 0.001), 
                               weight_decay=config.get("weight_decay", 0.)))

    train_dataset_shard = session.get_dataset_shard("train")
    val_dataset_shard = session.get_dataset_shard("validation")
    
    # TODO(jpgard): call model.train_epoch() instead of the training logic below.
    
    # Returns the current torch device; useful for sending to a device.
    # train.torch.get_device()
    
    for epoch in range(2):
        model.train()
        print(f"starting epoch {epoch}")
        running_loss = 0.0
        train_dataset_batches = train_dataset_shard.iter_torch_batches(batch_size=config["batch_size"])
        for i, batch in enumerate(train_dataset_batches):
            # get the inputs and labels
            inputs, labels, groups = batch["x"], batch["y"], batch["g"]
            inputs = inputs.float()
            labels = labels.float()

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:  # print every 2000 mini-batches
                print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
                running_loss = 0.0
        # compute the validation accuracy
        model.eval()
        prediction, target = get_predictions_and_labels(
            model, 
            val_dataset_shard.iter_torch_batches(batch_size=config["batch_size"]))
        prediction = np.round(prediction)
        val_acc = sklearn.metrics.accuracy_score(target, prediction)
        
        # Log the metrics for this epoch
        metrics = dict(train_loss=running_loss, validation_accuracy=val_acc)
        checkpoint = TorchCheckpoint.from_state_dict(model.module.state_dict())
        session.report(metrics, checkpoint=checkpoint)


# Hyperparameter search space; note that the scaling_config can also be tuned
# but is fixed here.
param_space = {
    # The params will be merged with the ones defined in the TorchTrainer
    "train_loop_config": {
        # This is a parameter that hasn't been set in the TorchTrainer
        "num_layers": tune.randint(1, 4),
        "lr": tune.loguniform(1e-4, 1e-1),
        "weight_decay": tune.loguniform(1e-4, 1e0),
        "d_hidden": tune.choice([64, 128, 256, 512]),
        # This will overwrite whatever was set when TorchTrainer was instantiated
        # "batch_size": tune.choice([4, 8]),
    },
    # Tune the number of distributed workers
    "scaling_config": ScalingConfig(num_workers=2),
    
    # Note: when num_workers=1, trials seemed to fail with AttributeError 
    # (MLPModel does not have attribute 'module'); not sure why.
    # "scaling_config": ScalingConfig(num_workers=tune.grid_search([1, 2])),
}

# Trainer object that will be passed to each worker.
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"batch_size": 4, "epochs": 5},
    datasets={"train": train_dataset, "validation": val_dataset},
    scaling_config=ScalingConfig(num_workers=2, use_gpu=False),
)

# To run just a single training iteration (without tuning), can run:
# result = trainer.fit()
# latest_checkpoint = result.checkpoint

# Create Tuner
tuner = Tuner(
    trainable=trainer,
    run_config=RunConfig(name="test_tuner_notebook", local_dir="ray-results"),
    param_space=param_space,
    tune_config=tune.TuneConfig(
        # search_alg=HyperOptSearch(),
        # scheduler=ASHAScheduler(time_attr='training_iteration', 
        #                     metric="train_loss", 
        #                     mode="min", 
        #                     max_t=100, 
        #                     grace_period=1, 
        #                     reduction_factor=4, 
        #                     brackets=1, 
        #                     stop_last_trials=True),
        mode="min", metric="train_loss", 
        num_samples=2, max_concurrent_trials=2),
)

results = tuner.fit()

0,1
Current time:,2022-12-01 14:09:11
Running for:,00:01:44.08
Memory:,5.6/8.0 GiB

Trial name,status,loc,train_loop_config/d_ hidden,train_loop_config/lr,train_loop_config/nu m_layers,train_loop_config/we ight_decay,iter,total time (s),train_loss,validation_accuracy,_timestamp
TorchTrainer_c7007_00000,TERMINATED,127.0.0.1:32064,256,0.000185141,3,0.00512622,2,44.9865,326.86,0.855546,1669925300
TorchTrainer_c7007_00001,TERMINATED,127.0.0.1:32080,64,0.00532164,2,0.00142488,2,36.0985,333.197,0.855669,1669925348


[2m[36m(RayTrainWorker pid=32068)[0m 2022-12-01 14:07:41,139	INFO config.py:87 -- Setting up process group for: env:// [rank=0, world_size=2]
[2m[36m(RayTrainWorker pid=32069)[0m   return torch.as_tensor(ndarray, dtype=dtype, device=device)
[2m[36m(RayTrainWorker pid=32068)[0m 2022-12-01 14:07:45,058	INFO train_loop_utils.py:298 -- Moving model to device: cpu
[2m[36m(RayTrainWorker pid=32068)[0m 2022-12-01 14:07:45,058	INFO train_loop_utils.py:362 -- Wrapping provided model in DistributedDataParallel.
[2m[36m(RayTrainWorker pid=32068)[0m   return torch.as_tensor(ndarray, dtype=dtype, device=device)


[2m[36m(RayTrainWorker pid=32068)[0m starting epoch 0
[2m[36m(RayTrainWorker pid=32069)[0m starting epoch 0
[2m[36m(RayTrainWorker pid=32068)[0m [1,  2000] loss: 0.347
[2m[36m(RayTrainWorker pid=32069)[0m [1,  2000] loss: 0.337


Trial name,_time_this_iter_s,_timestamp,_training_iteration,date,done,episodes_total,experiment_id,experiment_tag,hostname,iterations_since_restore,node_ip,pid,should_checkpoint,time_since_restore,time_this_iter_s,time_total_s,timestamp,timesteps_since_restore,timesteps_total,train_loss,training_iteration,trial_id,validation_accuracy,warmup_time
TorchTrainer_c7007_00000,18.6489,1669925300,2,2022-12-01_14-08-21,True,,0131cf2b9b14427ba52a755565e875e8,"0_d_hidden=256,lr=0.0002,num_layers=3,weight_decay=0.0051",Joshuas-MacBook-Pro-10.local,2,127.0.0.1,32064,True,44.9865,18.6616,44.9865,1669925301,0,,326.86,2,c7007_00000,0.855546,1.64972
TorchTrainer_c7007_00001,13.2529,1669925348,2,2022-12-01_14-09-08,True,,95ecffe44c9740da83711808e8f3d1df,"1_d_hidden=64,lr=0.0053,num_layers=2,weight_decay=0.0014",Joshuas-MacBook-Pro-10.local,2,127.0.0.1,32080,True,36.0985,12.6639,36.0985,1669925348,0,,333.197,2,c7007_00001,0.855669,1.1508


[2m[36m(RayTrainWorker pid=32068)[0m starting epoch 1
[2m[36m(RayTrainWorker pid=32069)[0m starting epoch 1
[2m[36m(RayTrainWorker pid=32068)[0m [2,  2000] loss: 0.315
[2m[36m(RayTrainWorker pid=32069)[0m [2,  2000] loss: 0.305


[2m[36m(RayTrainWorker pid=32084)[0m 2022-12-01 14:08:36,885	INFO config.py:87 -- Setting up process group for: env:// [rank=0, world_size=2]


[2m[36m(RayTrainWorker pid=32085)[0m starting epoch 0
[2m[36m(RayTrainWorker pid=32084)[0m starting epoch 0


[2m[36m(RayTrainWorker pid=32085)[0m   return torch.as_tensor(ndarray, dtype=dtype, device=device)
[2m[36m(RayTrainWorker pid=32084)[0m 2022-12-01 14:08:42,246	INFO train_loop_utils.py:298 -- Moving model to device: cpu
[2m[36m(RayTrainWorker pid=32084)[0m 2022-12-01 14:08:42,246	INFO train_loop_utils.py:362 -- Wrapping provided model in DistributedDataParallel.
[2m[36m(RayTrainWorker pid=32084)[0m   return torch.as_tensor(ndarray, dtype=dtype, device=device)


[2m[36m(RayTrainWorker pid=32085)[0m [1,  2000] loss: 0.332
[2m[36m(RayTrainWorker pid=32084)[0m [1,  2000] loss: 0.345
[2m[36m(RayTrainWorker pid=32085)[0m starting epoch 1
[2m[36m(RayTrainWorker pid=32084)[0m starting epoch 1
[2m[36m(RayTrainWorker pid=32085)[0m [2,  2000] loss: 0.314
[2m[36m(RayTrainWorker pid=32084)[0m [2,  2000] loss: 0.324


2022-12-01 14:09:11,272	INFO tune.py:777 -- Total run time: 104.26 seconds (104.07 seconds for the tuning loop).


In [73]:
best_result = results.get_best_result("train_loss", "min")

print("Best trial config: {}".format(best_result.config))
print("Best trial final loss: {}".format(
    best_result.metrics["train_loss"]))
# print("Best trial final validation accuracy: {}".format(
#     best_result.metrics["accuracy"]))

Best trial config: {'train_loop_config': {'num_layers': 3, 'lr': 0.0001851412960649755, 'weight_decay': 0.005126223049159909, 'd_hidden': 256}, 'scaling_config': {'trainer_resources': None, 'num_workers': 2, 'use_gpu': False, 'resources_per_worker': None, 'placement_strategy': 'PACK', '_max_cpu_fraction_per_node': None}}
Best trial final loss: 326.8599194509443


## Load and normalize CIFAR-10

We'll train our classifier on a popular image dataset called [CIFAR-10](https://www.cs.toronto.edu/~kriz/cifar.html).

First, let's load CIFAR-10 into a Ray Dataset.

In [None]:
import ray
from ray.data.datasource import SimpleTorchDatasource
import torchvision
import torchvision.transforms as transforms

transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)


def train_dataset_factory():
    return torchvision.datasets.CIFAR10(
        root="./data", download=True, train=True, transform=transform
    )


def test_dataset_factory():
    return torchvision.datasets.CIFAR10(
        root="./data", download=True, train=False, transform=transform
    )


In [None]:
train_dataset: ray.data.Dataset = ray.data.read_datasource(
    SimpleTorchDatasource(), dataset_factory=train_dataset_factory
)
test_dataset: ray.data.Dataset = ray.data.read_datasource(
    SimpleTorchDatasource(), dataset_factory=test_dataset_factory
)

{py:class}`SimpleTorchDatasource <ray.data.datasource.SimpleTorchDatasource>` doesn't parallelize reads, so you shouldn't use it with larger datasets.

Next, let's represent our data using a dictionary of ndarrays instead of tuples. This lets us call {py:meth}`Dataset.iter_torch_batches <ray.data.Dataset.iter_torch_batches>` later in the tutorial.

In [None]:
from typing import Dict, Tuple
import numpy as np
import torch


def convert_batch_to_numpy(batch: Tuple[torch.Tensor, int]) -> Dict[str, np.ndarray]:
    images = np.array([image.numpy() for image, _ in batch])
    labels = np.array([label for _, label in batch])
    return {"image": images, "label": labels}


train_dataset = train_dataset.map_batches(convert_batch_to_numpy)
test_dataset = test_dataset.map_batches(convert_batch_to_numpy)

## Train a convolutional neural network

Now that we've created our datasets, let's define the training logic.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

We define our training logic in a function called `train_loop_per_worker`. This function contains regular PyTorch code with a few notable exceptions:
* We wrap our model with {py:func}`train.torch.prepare_model <ray.train.torch.prepare_model>`.
* We call {py:func}`session.get_dataset_shard <ray.air.session.get_dataset_shard>` and {py:meth}`Dataset.iter_torch_batches <ray.data.Dataset.iter_torch_batches>` to get a subset of our training data.
* We save model state using {py:func}`session.report <ray.air.session.report>`.

In [None]:
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchCheckpoint
import torch.nn as nn
import torch.optim as optim
import torchvision


def train_loop_per_worker(config):
    model = train.torch.prepare_model(Net())

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    train_dataset_shard = session.get_dataset_shard("train")

    for epoch in range(2):
        running_loss = 0.0
        train_dataset_batches = train_dataset_shard.iter_torch_batches(
            batch_size=config["batch_size"],
        )
        for i, batch in enumerate(train_dataset_batches):
            # get the inputs and labels
            inputs, labels = batch["image"], batch["label"]

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
            if i % 2000 == 1999:  # print every 2000 mini-batches
                print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}")
                running_loss = 0.0

        metrics = dict(running_loss=running_loss)
        checkpoint = TorchCheckpoint.from_state_dict(model.module.state_dict())
        session.report(metrics, checkpoint=checkpoint)

Finally, we can train our model. This should take a few minutes to run.

In [None]:
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"batch_size": 2},
    datasets={"train": train_dataset},
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()
latest_checkpoint = result.checkpoint

To scale your training script, create a [Ray Cluster](cluster-index) and increase the number of workers. If your cluster contains GPUs, add `"use_gpu": True` to your scaling config.

```{code-block} python
scaling_config=ScalingConfig(num_workers=8, use_gpu=True)
```

## Test the network on the test data

Let's see how our model performs.

To classify images in the test dataset, we'll need to create a {py:class}`Predictor <ray.train.predictor.Predictor>`.

{py:class}`Predictors <ray.train.predictor.Predictor>` load data from checkpoints and efficiently perform inference. In contrast to {py:class}`TorchPredictor <ray.train.torch.TorchPredictor>`, which performs inference on a single batch, {py:class}`BatchPredictor <ray.train.batch_predictor.BatchPredictor>` performs inference on an entire dataset. Because we want to classify all of the images in the test dataset, we'll use a {py:class}`BatchPredictor <ray.train.batch_predictor.BatchPredictor>`.

In [None]:
from ray.train.torch import TorchPredictor
from ray.train.batch_predictor import BatchPredictor

batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint=latest_checkpoint,
    predictor_cls=TorchPredictor,
    model=Net(),
)

outputs: ray.data.Dataset = batch_predictor.predict(
    data=test_dataset,
    dtype=torch.float,
    feature_columns=["image"],
    keep_columns=["label"],
)

Our model outputs a list of energies for each class. To classify an image, we
choose the class that has the highest energy.

In [None]:
import numpy as np


def convert_logits_to_classes(df):
    best_class = df["predictions"].map(lambda x: x.argmax())
    df["prediction"] = best_class
    return df[["prediction", "label"]]


predictions = outputs.map_batches(convert_logits_to_classes)

predictions.show(1)

Now that we've classified all of the images, let's figure out which images were
classified correctly. The ``predictions`` dataset contains predicted labels and 
the ``test_dataset`` contains the true labels. To determine whether an image 
was classified correctly, we join the two datasets and check if the predicted 
labels are the same as the actual labels.

In [None]:
def calculate_prediction_scores(df):
    df["correct"] = df["prediction"] == df["label"]
    return df


scores = predictions.map_batches(calculate_prediction_scores)

scores.show(1)

To compute our test accuracy, we'll count how many images the model classified 
correctly and divide that number by the total number of test images.

In [None]:
scores.sum(on="correct") / scores.count()

## Deploy the network and make a prediction

Our model seems to perform decently, so let's deploy the model to an 
endpoint. This allows us to make predictions over the Internet.

In [None]:
from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import json_to_ndarray


serve.run(
    PredictorDeployment.bind(
        TorchPredictor,
        latest_checkpoint,
        model=Net(),
        http_adapter=json_to_ndarray,
    )
)

Let's classify a test image.

In [None]:
image = test_dataset.take(1)[0]["image"]

You can perform inference against a deployed model by posting a dictionary with an `"array"` key. To learn more about the default input schema, read the {py:class}`NdArray <ray.serve.http_adapters.NdArray>` documentation.

In [None]:
import requests

payload = {"array": image.tolist(), "dtype": "float32"}
response = requests.post("http://localhost:8000/", json=payload)
response.json()