# CIFAR10 + MosaikML

In [1]:
import numpy as np
import torch
cuda_ver = torch.version.cuda.replace(".", "")

import time

import composer
from composer.models import ComposerResNetCIFAR
from torchvision import datasets, transforms

torch.manual_seed(42) # For replicability


from composer.datasets.ffcv_utils import ffcv_monkey_patches
from composer.datasets.ffcv_utils import write_ffcv_dataset

ffcv_monkey_patches()

device = "gpu"
batch_size = 32
num_workers = 1

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
cuda_ver

'113'

# Dataset and Loader
Next, we instantiate our CIFAR10 dataset and dataloader. We'll use the Torchvision CIFAR10 and PyTorch dataloader for the sake of familiarity.

In [3]:
# Normalization constants
mean = (0.507, 0.487, 0.441)
std = (0.267, 0.256, 0.276)

batch_size = 1024
num_workers = 2
data_directory = "/tmp"

cifar10_transforms = transforms.Compose([transforms.ToTensor(), transforms.Normalize(mean, std)])

train_dataset = datasets.CIFAR10(data_directory, train=True, download=True, transform=cifar10_transforms)
test_dataset = datasets.CIFAR10(data_directory, train=False, download=True, transform=cifar10_transforms)

train_dataloader = torch.utils.data.DataLoader(train_dataset, 
                                               num_workers=num_workers, 
                                               batch_size=batch_size,
                                               pin_memory=True,
                                               drop_last=True,
                                               shuffle=True)
test_dataloader = torch.utils.data.DataLoader(test_dataset, 
                                              num_workers=num_workers, 
                                              batch_size=batch_size,
                                              pin_memory=True,
                                              drop_last=False,
                                              shuffle=False)

Files already downloaded and verified
Files already downloaded and verified


In [4]:
type(train_dataset)

torchvision.datasets.cifar.CIFAR10

# Model
Next, we create our model. We're using composer's built-in ResNet18. To use your own custom model, please see the [custom models tutorial](https://docs.mosaicml.com/en/stable/tutorials/adding_models_datasets.html#models).

In [5]:
model = ComposerResNetCIFAR(model_name='resnet_20', num_classes=10)

# Optimizer and Scheduler
The trainer will handle instantiating the optimizer, but first we need to create the optimizer and LR scheduler. We're using [MosaicML's SGD with decoupled weight decay](https://arxiv.org/abs/1711.05101):

In [6]:
optimizer = composer.optim.DecoupledSGDW(
    model.parameters(), # Model parameters to update
    lr=0.05, # Peak learning rate
    momentum=0.9,
    weight_decay=2.0e-3 # If this looks large, it's because its not scaled by the LR as in non-decoupled weight decay
)

To keep the runtime short, we'll train our baseline model for five epochs. The first epoch will be linear warmup, followed by four epochs of constant LR. We achieve this by instantiating a `LinearWithWarmupScheduler` class. Feel free to increase the number of epochs in case you want to see the impact of running it for a longer duration.

In [7]:
lr_scheduler = composer.optim.LinearWithWarmupScheduler(
    t_warmup="1ep", # Warm up over 1 epoch
    alpha_i=1.0, # Flat LR schedule achieved by having alpha_i == alpha_f
    alpha_f=1.0
)

# Train a baseline model
And now we create our trainer: Note: We want to gpu as a device because FFCV works the best on GPU-capable machines.

In [8]:
train_epochs = "5ep" # Train for 5 epochs
device = "gpu"

trainer = composer.trainer.Trainer(
    model=model,
    train_dataloader=train_dataloader,
    eval_dataloader=test_dataloader,
    max_duration=train_epochs,
    optimizers=optimizer,
    schedulers=lr_scheduler,
    device=device
)

We train and measure the training time below.

In [9]:
start_time = time.perf_counter()
trainer.fit()
end_time = time.perf_counter()
print(f"It took {end_time - start_time:0.4f} seconds to train")

Epoch     0 train 100%|█████████████████████████| 48/48 [00:11<00:00,  4.16ba/s, loss/train=1.5926]         

Epoch     0 val     0%|                         | 0/10 [00:00<?, ?ba/s]         [A
Epoch     0 val    10%|██▌                      | 1/10 [00:00<00:02,  3.13ba/s]         [A
Epoch     0 val    30%|███████▌                 | 3/10 [00:00<00:00,  7.57ba/s]         [A
Epoch     0 val    50%|████████████▌            | 5/10 [00:00<00:00,  9.62ba/s]         [A
Epoch     0 val    70%|█████████████████▌       | 7/10 [00:00<00:00, 10.85ba/s]         [A
Epoch     0 val    90%|██████████████████████▌  | 9/10 [00:00<00:00, 11.66ba/s]         [A
Epoch     0 val   100%|█████████████████████████| 10/10 [00:00<00:00, 11.66ba/s]         [A
Epoch     0 val   100%|█████████████████████████| 10/10 [00:00<00:00, 11.66ba/s]         [A
Epoch     0 val   100%|█████████████████████████| 10/10 [00:01<00:00,  9.98ba/s, metrics/eval/Accuracy=0.2631]         [A
Epoch     1 train 100%|██████████████

It took 35.3331 seconds to train





# Use FFCV dataloaders to Speed Up Training
Next, we convert dataset to a format used by FFCV. FFCV uses it's own data format suitable for faster dataloading. Once this cell executes successfuly, you can find ```cifar_train.ffcv``` and ```cifar_val.ffcv``` in ```data_directory``` directory.

In [10]:
from composer.datasets.ffcv_utils import write_ffcv_dataset
from torchvision.datasets import CIFAR10


# Train dataset
ds = CIFAR10(root=data_directory, train=True, download=True)
write_ffcv_dataset(dataset=ds, write_path=data_directory + "/cifar_train.ffcv")

# validation dataset
ds = CIFAR10(root=data_directory, train=False, download=True)
write_ffcv_dataset(dataset=ds, write_path=data_directory + "/cifar_val.ffcv")

Files already downloaded and verified


100%|██████████| 50000/50000 [00:00<00:00, 99152.00it/s] 


Files already downloaded and verified


100%|██████████| 10000/10000 [00:00<00:00, 99664.10it/s]


Current version of ffcv (0.0.3) has a bug where calling [len(dataloader) does shuffling](https://github.com/libffcv/ffcv/issues/163) of image indices to load, therefore, calls to len are expensive. Composer calls len(dataloader) function in training loop for every batch and, hence, this is a performance hit. We fix it by patching the len function using ffcv_monkey_patches. 

In [11]:
from composer.datasets.ffcv_utils import ffcv_monkey_patches
ffcv_monkey_patches()



Now let us construct FFCV train and test dataloaders. We use the similar transformations as used for TorchVision datasets.


In [12]:
import ffcv
from ffcv.fields.decoders import IntDecoder, SimpleRGBImageDecoder

# Please note that this mean/std is different from the mean/std used for regular PyTorch dataloader as
# ToTensor does the normalization for PyTorch dataloaders.
cifar10_mean_ffcv = np.array([125.307, 122.961, 113.8575])
cifar10_std_ffcv = np.array([51.5865, 50.847, 51.255])
label_pipeline = [IntDecoder(), ffcv.transforms.ToTensor(), ffcv.transforms.Squeeze()]
image_pipeline = [SimpleRGBImageDecoder(), ffcv.transforms.ToTensor(),
                ffcv.transforms.ToTorchImage(channels_last=False, convert_back_int16=False),
                ffcv.transforms.Convert(torch.float32),
                transforms.Normalize(cifar10_mean_ffcv, cifar10_std_ffcv),
            ]

ffcv_train_dataloader = ffcv.Loader(
                data_directory + "/cifar_train.ffcv",
                batch_size=batch_size,
                num_workers=num_workers,
                order=ffcv.loader.OrderOption.RANDOM,
                pipelines={
                    'image': image_pipeline,
                    'label': label_pipeline
                },
                drop_last=True,
            )
ffcv_test_dataloader = ffcv.Loader(
                data_directory + "/cifar_val.ffcv",
                batch_size=batch_size,
                num_workers=num_workers,
                order=ffcv.loader.OrderOption.RANDOM,
                pipelines={
                    'image': image_pipeline,
                    'label': label_pipeline
                },
                drop_last=False,
            )



Now let's instantiate our model, optimizer, and trainer again but with FFCV dataloaders. No need to instantiate our scheduler again because it's stateless!


In [13]:
model = ComposerResNetCIFAR(model_name="resnet_20", num_classes=10)

optimizer = composer.optim.DecoupledSGDW(
    model.parameters(),
    lr=0.05,
    momentum=0.9,
    weight_decay=2.0e-3
)

trainer = composer.trainer.Trainer(
    model=model,
    train_dataloader=ffcv_train_dataloader,
    eval_dataloader=ffcv_test_dataloader,
    max_duration=train_epochs,
    optimizers=optimizer,
    schedulers=lr_scheduler,
    device=device,
)



And let's get training!


In [14]:
start_time = time.perf_counter()
trainer.fit()
end_time = time.perf_counter()
accelerated_time = end_time - start_time
print(f"It took {accelerated_time:0.4f} seconds to train with FFCV dataloaders")

Epoch     0 train 100%|█████████████████████████| 48/48 [00:05<00:00,  9.18ba/s, loss/train=1.4949]         

Epoch     0 val     0%|                         | 0/10 [00:00<?, ?ba/s]         [A
Epoch     0 val    20%|█████                    | 2/10 [00:00<00:00, 19.85ba/s]         [A
Epoch     0 val    70%|█████████████████▌       | 7/10 [00:00<00:00, 37.01ba/s]         [A
Epoch     0 val   100%|█████████████████████████| 10/10 [00:00<00:00, 37.01ba/s]         [A
Epoch     0 val   100%|█████████████████████████| 10/10 [00:00<00:00, 37.01ba/s]         [A
Epoch     0 val   100%|█████████████████████████| 10/10 [00:00<00:00, 37.33ba/s, metrics/eval/Accuracy=0.3116]         [A
Epoch     1 train 100%|█████████████████████████| 48/48 [00:04<00:00, 10.94ba/s, loss/train=1.1486]         

Epoch     1 val     0%|                         | 0/10 [00:00<?, ?ba/s]         [A
Epoch     1 val    30%|███████▌                 | 3/10 [00:00<00:00, 25.32ba/s]         [A
Epoch     1 val    80%|████

It took 24.8695 seconds to train with FFCV dataloaders





# CIFAR10 + Pytorch Lightning

In [15]:
import numpy as np
import torch
cuda_ver = torch.version.cuda.replace(".", "")

import time

import composer
from composer.models import ComposerResNetCIFAR
from torchvision import datasets, transforms

torch.manual_seed(42) # For replicability


from composer.datasets.ffcv_utils import ffcv_monkey_patches
from composer.datasets.ffcv_utils import write_ffcv_dataset

ffcv_monkey_patches()

device = "gpu"
batch_size = 32
num_workers = 1


In [16]:
import os

import pandas as pd
# import seaborn as sn
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from IPython.core.display import display
from pl_bolts.datamodules import CIFAR10DataModule
from pl_bolts.transforms.dataset_normalizations import cifar10_normalization
from pytorch_lightning import LightningModule, Trainer, seed_everything
from pytorch_lightning.callbacks import LearningRateMonitor
from pytorch_lightning.callbacks.progress import TQDMProgressBar
from pytorch_lightning.loggers import CSVLogger
from torch.optim.lr_scheduler import OneCycleLR
from torch.optim.swa_utils import AveragedModel, update_bn
from torchmetrics.functional import accuracy

seed_everything(7)

PATH_DATASETS = os.environ.get("PATH_DATASETS", ".")
BATCH_SIZE = 256 if torch.cuda.is_available() else 64
NUM_WORKERS = int(os.cpu_count() / 2)

  from IPython.core.display import display
  stdout_func(
  stdout_func(
Global seed set to 7


In [17]:
train_transforms = torchvision.transforms.Compose(
    [
        torchvision.transforms.RandomCrop(32, padding=4),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ToTensor(),
        cifar10_normalization(),
    ]
)

test_transforms = torchvision.transforms.Compose(
    [
        torchvision.transforms.ToTensor(),
        cifar10_normalization(),
    ]
)

cifar10_dm = CIFAR10DataModule(
    data_dir=PATH_DATASETS,
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    train_transforms=train_transforms,
    test_transforms=test_transforms,
    val_transforms=test_transforms,
)

  rank_zero_deprecation(
  rank_zero_deprecation(
  rank_zero_deprecation(


In [18]:
def create_model():
    model = torchvision.models.resnet18(pretrained=False, num_classes=10)
    model.conv1 = nn.Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    model.maxpool = nn.Identity()
    return model

In [19]:
class LitResnet(LightningModule):
    def __init__(self, lr=0.05):
        super().__init__()

        self.save_hyperparameters()
        self.model = create_model()

    def forward(self, x):
        out = self.model(x)
        return F.log_softmax(out, dim=1)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        self.log("train_loss", loss)
        return loss

    def evaluate(self, batch, stage=None):
        x, y = batch
        logits = self(x)
        loss = F.nll_loss(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = accuracy(preds, y)

        if stage:
            self.log(f"{stage}_loss", loss, prog_bar=True)
            self.log(f"{stage}_acc", acc, prog_bar=True)

    def validation_step(self, batch, batch_idx):
        self.evaluate(batch, "val")

    def test_step(self, batch, batch_idx):
        self.evaluate(batch, "test")

    def configure_optimizers(self):
        optimizer = torch.optim.SGD(
            self.parameters(),
            lr=self.hparams.lr,
            momentum=0.9,
            weight_decay=5e-4,
        )
        steps_per_epoch = 45000 // BATCH_SIZE
        scheduler_dict = {
            "scheduler": OneCycleLR(
                optimizer,
                0.1,
                epochs=self.trainer.max_epochs,
                steps_per_epoch=steps_per_epoch,
            ),
            "interval": "step",
        }
        return {"optimizer": optimizer, "lr_scheduler": scheduler_dict}

In [20]:
model = LitResnet(lr=0.05)

trainer = Trainer(
    max_epochs=3,
    accelerator="auto",
    devices=1 if torch.cuda.is_available() else None,  # limiting got iPython runs
    logger=CSVLogger(save_dir="logs/"),
    callbacks=[LearningRateMonitor(logging_interval="step"), TQDMProgressBar(refresh_rate=10)],
)

trainer.fit(model, cifar10_dm)
trainer.test(model, datamodule=cifar10_dm)

Multiprocessing is handled by SLURM.
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


Files already downloaded and verified
Files already downloaded and verified


  rank_zero_deprecation(
  rank_zero_deprecation(
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type   | Params
---------------------------------
0 | model | ResNet | 11.2 M
---------------------------------
11.2 M    Trainable params
0         Non-trainable params
11.2 M    Total params
44.696    Total estimated model params size (MB)
SLURM auto-requeueing enabled. Setting signal handlers.


Sanity Checking: 0it [00:00, ?it/s]



Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading i

  rank_zero_deprecation(
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This

[{'test_loss': 0.7317133545875549, 'test_acc': 0.7427999973297119}]

In [21]:
metrics = pd.read_csv(f"{trainer.logger.log_dir}/metrics.csv")
del metrics["step"]
metrics.set_index("epoch", inplace=True)
display(metrics.dropna(axis=1, how="all").head())

Unnamed: 0_level_0,lr-SGD,train_loss,val_loss,val_acc,test_loss,test_acc
epoch,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
,0.025408,,,,,
0.0,,1.80931,,,,
,0.071421,,,,,
0.0,,1.716269,,,,
,0.099457,,,,,


# CIFAR10 + PL + FFCV

In [22]:
from composer.datasets.ffcv_utils import write_ffcv_dataset
from torchvision.datasets import CIFAR10
from composer.datasets.ffcv_utils import ffcv_monkey_patches
from composer.datasets.ffcv_utils import write_ffcv_dataset
ffcv_monkey_patches()

data_directory = "/tmp"

# Train dataset
ds = CIFAR10(root=data_directory, train=True, download=True)
write_ffcv_dataset(dataset=ds, write_path=data_directory + "/cifar_train.ffcv")

# validation dataset
ds = CIFAR10(root=data_directory, train=False, download=True)
write_ffcv_dataset(dataset=ds, write_path=data_directory + "/cifar_val.ffcv")

Files already downloaded and verified
Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before f

 60%|██████    | 30200/50000 [00:00<00:00, 301528.69it/s]

Unable to join threads to shut down before fork(). This can break multithreading in child processes.



100%|██████████| 50000/50000 [00:00<00:00, 99600.01it/s] 

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.






Files already downloaded and verified
Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before f

100%|██████████| 10000/10000 [00:00<00:00, 99269.47it/s]

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.

Unable to join threads to shut down before fork(). This can break multithreading in child processes.






In [23]:
import ffcv
from ffcv.fields.decoders import IntDecoder, SimpleRGBImageDecoder

# Please note that this mean/std is different from the mean/std used for regular PyTorch dataloader as
# ToTensor does the normalization for PyTorch dataloaders.
cifar10_mean_ffcv = np.array([125.307, 122.961, 113.8575])
cifar10_std_ffcv = np.array([51.5865, 50.847, 51.255])
label_pipeline = [IntDecoder(), ffcv.transforms.ToTensor(), ffcv.transforms.Squeeze()]
image_pipeline = [SimpleRGBImageDecoder(), ffcv.transforms.ToTensor(),
                ffcv.transforms.ToTorchImage(channels_last=False, convert_back_int16=False),
                ffcv.transforms.Convert(torch.float32),
                transforms.Normalize(cifar10_mean_ffcv, cifar10_std_ffcv),
            ]

ffcv_train_dataloader = ffcv.Loader(
                data_directory + "/cifar_train.ffcv",
                batch_size=batch_size,
                num_workers=num_workers,
                order=ffcv.loader.OrderOption.RANDOM,
                pipelines={
                    'image': image_pipeline,
                    'label': label_pipeline
                },
                drop_last=True,
            )
ffcv_val_dataloader = ffcv.Loader(
                data_directory + "/cifar_val.ffcv",
                batch_size=batch_size,
                num_workers=num_workers,
                order=ffcv.loader.OrderOption.SEQUENTIAL,
                pipelines={
                    'image': image_pipeline,
                    'label': label_pipeline
                },
                drop_last=False,
            )

In [24]:
model = LitResnet(lr=0.5)

trainer = Trainer(
    max_epochs=3,
    accelerator="auto",
    devices=1 if torch.cuda.is_available() else None,  # limiting got iPython runs
    logger=CSVLogger(save_dir="logs/"),
    callbacks=[ TQDMProgressBar(refresh_rate=10)],
    num_sanity_val_steps=0
)

trainer.fit(model, train_dataloaders=ffcv_train_dataloader, val_dataloaders=ffcv_val_dataloader)

Multiprocessing is handled by SLURM.
GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type   | Params
---------------------------------
0 | model | ResNet | 11.2 M
---------------------------------
11.2 M    Trainable params
0         Non-trainable params
11.2 M    Total params
44.696    Total estimated model params size (MB)
SLURM auto-requeueing enabled. Setting signal handlers.


Epoch 0:  28%|██▊       | 520/1875 [00:10<00:26, 51.80it/s, loss=1.51, v_num=21]

ValueError: Tried to step 527 times. The specified number of total steps is 525

Epoch 0:  28%|██▊       | 520/1875 [00:23<01:00, 22.31it/s, loss=1.51, v_num=21]