In [1]:
import os

# Set before anything imports torch and probes CUDA. With this variable,
# torch.cuda.is_available() answers via NVML instead of the CUDA runtime, so a
# plain availability check in this kernel does not poison the forks that the
# 'ddp_notebook' strategy creates later ("Cannot re-initialize CUDA in forked
# subprocess"). setdefault keeps any value the user exported explicitly.
os.environ.setdefault("PYTORCH_NVML_BASED_CUDA_CHECK", "1")

import numpy as np
from PIL import Image

from ffcv.fields import RGBImageField
from ffcv_pl.generate_dataset import create_beton_wrapper
from torch.utils.data.dataset import Dataset

In [2]:
class ToyImageLabelDataset(Dataset):
    """Toy map-style dataset of random 32x32 RGB images labelled by their index.

    Every item is an ``(image, label)`` tuple, the layout the beton-creation
    step of this notebook requires (one tuple element per FFCV field).

    Args:
        n_samples: number of images to generate up front.
        seed: optional seed for a private NumPy ``Generator`` so the generated
            images (and hence the written .beton file) are reproducible.
            ``None`` (the default) keeps the original behaviour of drawing from
            the global ``np.random`` state.
    """

    def __init__(self, n_samples: int, seed: "int | None" = None):
        # For seed=None the legacy global RNG is used, so callers that seed
        # np.random themselves still get exactly the same images as before.
        rng = np.random if seed is None else np.random.default_rng(seed)
        self.samples = [
            Image.fromarray((rng.random((32, 32, 3)) * 255).astype('uint8')).convert('RGB')
            for _ in range(n_samples)
        ]

    def __len__(self):
        """Number of generated samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(image, int(idx))``; the label is simply the sample index."""
        return (self.samples[idx], int(idx))

In [3]:
from pathlib import Path

# 1. Instantiate the torch dataset you want to convert.
# Important: __getitem__ must return tuples, one element per FFCV field
# (a requirement of the FFCV writer).
image_label_dataset = ToyImageLabelDataset(n_samples=256)

# 2. Optional: create Field objects, one per tuple element.
# Only the image field is overridden here; `None` leaves the default field for
# the label (the run log shows an IntField being chosen for it).
fields = (RGBImageField(write_mode='jpg', max_resolution=32), None)

# 3. Call the wrapper; it writes the .beton dataset for you.
# The output directory is created first so this also works on a fresh checkout.
beton_path = "./data/image_label.beton"
Path(beton_path).parent.mkdir(parents=True, exist_ok=True)
create_beton_wrapper(image_label_dataset, beton_path, fields)

[INFO] creating ffcv dataset into file: ./data/image_label.beton
[INFO] number of items: 256
[INFO] ffcv fields of items: [<ffcv.fields.rgb_image.RGBImageField object at 0x7f982064ebc0>, <ffcv.fields.basics.IntField object at 0x7f96d4dafe20>]


100%|██████████| 256/256 [00:00<00:00, 2549.61it/s]

[INFO] Done.
##############################





In [4]:
import torch
from torch import nn
from torch.optim import Adam
from torchvision.transforms import RandomHorizontalFlip
import pytorch_lightning as pl
from pytorch_lightning.strategies.ddp import DDPStrategy

from ffcv.fields.basics import IntDecoder
from ffcv.fields.rgb_image import RandomResizedCropRGBImageDecoder, CenterCropRGBImageDecoder
from ffcv.loader import OrderOption
from ffcv.transforms import RandomHorizontalFlip as FFCVRandomHorizontalFlip
from ffcv.transforms import ToTensor, ToTorchImage
from ffcv_pl.data_loading import FFCVDataModule
from ffcv_pl.ffcv_utils.augmentations import DivideImage255
from ffcv_pl.ffcv_utils.utils import FFCVPipelineManager

In [5]:
# define the LightningModule
class LitAutoEncoder(pl.LightningModule):
    """Small fully-connected autoencoder trained on flattened images.

    Args:
        input_dim: features per flattened sample; the default matches the
            3x32x32 images produced by this notebook's FFCV pipelines.
        latent_dim: size of the bottleneck between encoder and decoder.
        hidden_dim: width of the single hidden layer in encoder and decoder.
        lr: Adam learning rate.
    """

    def __init__(self, input_dim: int = 32 * 32 * 3, latent_dim: int = 3,
                 hidden_dim: int = 64, lr: float = 1e-3):
        super().__init__()
        self.lr = lr
        self.encoder = nn.Sequential(nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, latent_dim))
        self.decoder = nn.Sequential(nn.Linear(latent_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, input_dim))

    def _reconstruction_loss(self, batch) -> torch.Tensor:
        """MSE between the flattened images ``batch[0]`` and their reconstruction."""
        # batch[0] is the image tensor; the label at batch[1] is not used.
        x = batch[0].flatten(start_dim=1)
        x_hat = self.decoder(self.encoder(x))
        return nn.functional.mse_loss(x_hat, x)

    def training_step(self, batch, batch_idx):
        """Compute and log the training reconstruction loss."""
        loss = self._reconstruction_loss(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation reconstruction loss.

        The notebook builds a dedicated validation pipeline, so its batches are
        actually used here instead of being decoded and discarded.
        """
        loss = self._reconstruction_loss(batch)
        # sync_dist averages the value across DDP ranks before logging.
        self.log("val_loss", loss, sync_dist=True)
        return loss

    def configure_optimizers(self):
        """Adam over all parameters with the configured learning rate."""
        return Adam(self.parameters(), lr=self.lr)

In [6]:
seed = 1234

pl.seed_everything(seed, workers=True)

batch_size = 16
gpus = 2    # devices per node, passed to the Trainer
nodes = 1
workers = 8

# .beton file written by the dataset-creation cell (create_beton_wrapper)
beton_path = "./data/image_label.beton"

# Training pipelines.
# The horizontal flip runs on the decoded image, before ToTensor, so each
# sample gets its own coin toss. A torchvision RandomHorizontalFlip placed after
# ToTorchImage is applied by FFCV to the collated batch, which would flip all
# images of a batch together.
train_manager = FFCVPipelineManager(
    beton_path,
    pipeline_transforms=[
        # image pipeline
        [RandomResizedCropRGBImageDecoder((32, 32)),
         FFCVRandomHorizontalFlip(flip_prob=0.5),
         ToTensor(),
         ToTorchImage(),
         DivideImage255(dtype=torch.float32)],
        # label (int) pipeline
        [IntDecoder(),
         ToTensor()],
    ],
    ordering=OrderOption.RANDOM,  # random ordering for training
)

# Validation pipelines: deterministic center crop, no augmentation.
val_manager = FFCVPipelineManager(
    beton_path,
    pipeline_transforms=[
        # image pipeline (different from train)
        [CenterCropRGBImageDecoder((32, 32), ratio=1.),
         ToTensor(),
         ToTorchImage(),
         DivideImage255(dtype=torch.float32)],
        # label (int) pipeline
        None,  # if None, uses default
    ],
    ordering=OrderOption.SEQUENTIAL,  # sequential ordering for validation
)

# Datamodule creation. Test and predict steps are not available because no
# managers are defined for them.
# NOTE(review): is_dist=True presumably shards the FFCV loaders across DDP
# ranks — confirm against ffcv_pl's FFCVDataModule.
data_module = FFCVDataModule(
    batch_size, workers,
    train_manager=train_manager, val_manager=val_manager,
    is_dist=True, seed=seed)

Seed set to 1234


In [8]:
# define model
model = LitAutoEncoder()

# Trainer.
# 'ddp_notebook' starts the DDP workers by forking this kernel process, so CUDA
# must not have been touched in the kernel before trainer.fit(); otherwise the
# workers fail with "Cannot re-initialize CUDA in forked subprocess".
trainer = pl.Trainer(
    strategy='ddp_notebook',
    deterministic=True,
    accelerator='gpu',
    devices=gpus,
    num_nodes=nodes,
    max_epochs=5,
    logger=False,
)

GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


In [9]:
# Start training; the datamodule is passed by keyword, which is equivalent to
# passing it positionally.
# If the forked workers report "Cannot re-initialize CUDA in forked subprocess",
# CUDA was touched in this kernel before fit: restart the kernel and Run All
# without any torch.cuda calls ahead of this cell.
trainer.fit(model=model, datamodule=data_module)

[rank: 0] Seed set to 1234
Initializing distributed: GLOBAL_RANK: 0, MEMBER: 1/2
[rank: 1] Seed set to 1234
Initializing distributed: GLOBAL_RANK: 1, MEMBER: 2/2
----------------------------------------------------------------------------------------------------
distributed_backend=nccl
All distributed processes registered. Starting with 2 processes
----------------------------------------------------------------------------------------------------



ProcessRaisedException: 

-- Process 1 terminated with the following error:
Traceback (most recent call last):
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 74, in _wrap
    fn(i, *args)
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/pytorch_lightning/strategies/launchers/multiprocessing.py", line 170, in _wrapping_function
    results = function(*args, **kwargs)
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/pytorch_lightning/trainer/trainer.py", line 580, in _fit_impl
    self._run(model, ckpt_path=ckpt_path)
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/pytorch_lightning/trainer/trainer.py", line 947, in _run
    self.strategy.setup_environment()
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/pytorch_lightning/strategies/ddp.py", line 149, in setup_environment
    super().setup_environment()
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/pytorch_lightning/strategies/strategy.py", line 128, in setup_environment
    self.accelerator.setup_device(self.root_device)
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/pytorch_lightning/accelerators/cuda.py", line 44, in setup_device
    _check_cuda_matmul_precision(device)
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/lightning_fabric/accelerators/cuda.py", line 348, in _check_cuda_matmul_precision
    major, _ = torch.cuda.get_device_capability(device)
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/torch/cuda/__init__.py", line 435, in get_device_capability
    prop = get_device_properties(device)
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/torch/cuda/__init__.py", line 449, in get_device_properties
    _lazy_init()  # will define _get_device_properties
  File "/root/miniconda3/envs/vqvae/lib/python3.10/site-packages/torch/cuda/__init__.py", line 284, in _lazy_init
    raise RuntimeError(
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method
