# FaceSynthetics with Streaming Dataloader

In this notebook, we'll demonstrate a streaming approach to loading our datasets, using Microsoft's FaceSynthetics dataset as an example.

Streaming is useful for multi-node setups where workers don't have persistent storage and each element of the dataset must be downloaded exactly once.
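The core idea fits in a few lines of plain Python. This is not Composer's implementation: the `fetch_shard` helper and the `shard_000.bin` file name are hypothetical, and a local temporary directory stands in for remote storage. A shard is copied into the local cache the first time it is requested and served from the cache after that:

```python
import os
import shutil
import tempfile
from typing import Tuple

def fetch_shard(remote_dir: str, local_dir: str, name: str) -> Tuple[str, bool]:
    """Hypothetical helper: return the local path of shard `name`, copying it
    from `remote_dir` only if it is not already cached in `local_dir`.

    The second return value is True only when a copy ("download") happened.
    """
    local_path = os.path.join(local_dir, name)
    if os.path.exists(local_path):
        return local_path, False  # cache hit: nothing is transferred
    os.makedirs(local_dir, exist_ok=True)
    # Copy under a temporary name, then rename, so an interrupted copy never
    # leaves a truncated file that would later look like a valid cached shard.
    tmp_path = local_path + ".tmp"
    shutil.copyfile(os.path.join(remote_dir, name), tmp_path)
    os.replace(tmp_path, local_path)
    return local_path, True

# Usage: a temporary directory stands in for remote storage.
remote_dir = tempfile.mkdtemp()
local_dir = os.path.join(tempfile.mkdtemp(), "cache")
with open(os.path.join(remote_dir, "shard_000.bin"), "wb") as f:
    f.write(b"sample bytes")

first = fetch_shard(remote_dir, local_dir, "shard_000.bin")
second = fetch_shard(remote_dir, local_dir, "shard_000.bin")
print(first[1], second[1])  # True False
```

The second request returns the same path without copying anything, which is what keeps each shard from being downloaded more than once per machine.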

This tutorial will consist of a few steps:
1. obtaining the dataset
2. preparing the dataset for streaming
    a. (optionally) uploading the dataset to a server
3. streaming the dataset to the local machine
4. training a model using these datasets

First, let's make sure we've installed our dependencies. Note that `mmcv-full` will take some time to unpack.

In [None]:
!pip install mosaicml -U
!pip install mmsegmentation -U
!pip install mmcv -U
!pip install mmcv-full -U

In [None]:
import os
import time
import torch
import struct
import shutil

from io import BytesIO
from PIL import Image
import torch.nn.functional as F
from torchvision import transforms as tf

We'll be using Composer's streaming dataset writer and reader, along with Composer's `DeepLabV3` model, whose pretrained backbone should help performance even on the small 100-image dataset.

In [None]:
from composer.datasets.streaming import StreamingDatasetWriter, StreamingDataset
from composer.models.deeplabv3 import ComposerDeepLabV3

In [None]:
from composer import Trainer
from composer.optim import DecoupledAdamW

## Getting the dataset

In [None]:
# Uncomment the line for the dataset you wish to download (the 100 image dataset is selected by default)
!curl https://facesyntheticspubwedata.blob.core.windows.net/iccv-2021/dataset_100.zip > dataset.zip
#!curl https://facesyntheticspubwedata.blob.core.windows.net/iccv-2021/dataset_1000.zip > dataset.zip
#!curl https://facesyntheticspubwedata.blob.core.windows.net/iccv-2021/dataset_100000.zip > dataset.zip


!mkdir -p ./dataset
!unzip dataset.zip -d dataset
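If shell commands such as `curl` and `unzip` are unavailable (for example, when running outside a notebook), the same download-and-extract step can be written with the standard library. A minimal sketch; `download_and_extract` is a hypothetical helper, not part of Composer:

```python
import os
import urllib.request
import zipfile

def download_and_extract(url: str, zip_path: str, dest_dir: str) -> list:
    """Download `url` to `zip_path`, unpack it into `dest_dir`,
    and return the sorted list of archive member names."""
    urllib.request.urlretrieve(url, zip_path)  # plays the role of `curl ... > dataset.zip`
    os.makedirs(dest_dir, exist_ok=True)       # plays the role of `mkdir -p ./dataset`
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(dest_dir)                # plays the role of `unzip ... -d dataset`
        return sorted(zf.namelist())

# Usage (downloads the 100-image archive selected above):
# download_and_extract(
#     "https://facesyntheticspubwedata.blob.core.windows.net/iccv-2021/dataset_100.zip",
#     "dataset.zip",
#     "dataset",
# )
```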

## Global settings

In [None]:
# the location of our dataset
in_root = "./dataset/"

# the location of the "remote" streaming dataset. 
# Upload `out_root` to your cloud storage provider of choice.
out_root = "./sdl"
out_root_train = "./sdl/train"
out_root_test = "./sdl/test"

# the location to download the streaming dataset during training
local = './local'
local_train = './local/train'
local_test = './local/test'

# dataset parameters
shuffle = False
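num_classes = 256  # label PNGs hold 8-bit values (0-255), so one-hot needs 256 slots
shard_size_limit = 1 << 25  # maximum shard size: 1 << 25 bytes = 32 MiB
tqdm = 1  # show a progress bar while writing shards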

# training hardware parameters
device = "gpu" if torch.cuda.is_available() else "cpu"
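`shard_size_limit` caps how large each shard file may grow, assuming the limit is in bytes (1 << 25 = 33,554,432 bytes, or 32 MiB). As an illustration only, since the exact packing rule inside `StreamingDatasetWriter` is an assumption here, a greedy writer starts a new shard whenever the next sample would push the current one past the limit. `pack_into_shards` is a hypothetical helper:

```python
from typing import List

def pack_into_shards(sample_sizes: List[int], limit: int) -> List[List[int]]:
    """Greedily group sample indices into shards of at most `limit` bytes.

    A single sample larger than `limit` still gets a shard of its own.
    """
    shards: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for idx, size in enumerate(sample_sizes):
        if current and current_bytes + size > limit:
            shards.append(current)  # the current shard is full: close it
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += size
    if current:
        shards.append(current)
    return shards

# Usage: with a 32 MiB limit, ten 5 MiB samples fit six to a shard.
MiB = 1 << 20
print(pack_into_shards([5 * MiB] * 10, 1 << 25))
# [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9]]
```

Larger shards mean fewer files to download; smaller shards let a worker start training after fetching less data.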

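Next, we'll create the output directories. `exist_ok=True` lets this cell be re-run safely.

In [None]:
os.makedirs(out_root, exist_ok=True)
os.makedirs(out_root_train, exist_ok=True)
os.makedirs(out_root_test, exist_ok=True)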

## Preparing the dataset

In [None]:
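def each(d, m=0, n=100):
    """Yield samples m..n-1 from directory `d` as dicts of raw bytes."""
    for i in range(m, n):
        # RGB image, e.g. 000042.png
        with open('%s/%06d.png' % (d, i), 'rb') as f:
            x = f.read()

        # Segmentation label, e.g. 000042_seg.png
        with open('%s/%06d_seg.png' % (d, i), 'rb') as f:
            y = f.read()

        yield {
            'i': struct.pack('>q', i),  # sample index as an 8-byte big-endian int
            'x': x,
            'y': y,
        }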
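The `'i'` field stores the sample index as raw bytes. `struct.pack('>q', i)` encodes it as an 8-byte, big-endian signed 64-bit integer, and `struct.unpack` reverses the encoding. Note that `struct.unpack` always returns a tuple, so a decoder has to take its single element to recover a plain `int`. The helper names below are hypothetical:

```python
import struct

def encode_index(i: int) -> bytes:
    """Encode a sample index as `each` does: 8-byte big-endian int64."""
    return struct.pack('>q', i)

def decode_index(data: bytes) -> int:
    """Inverse of encode_index; struct.unpack returns a 1-tuple here."""
    (i,) = struct.unpack('>q', data)
    return i

encoded = encode_index(42)
print(encoded)                # b'\x00\x00\x00\x00\x00\x00\x00*'
print(decode_index(encoded))  # 42
```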

In [None]:
def write_datasets() -> None:
    """Write the FaceSynthetics train/test splits as streaming datasets.

    Images 0-89 go to `out_root_train` and images 90-99 go to `out_root_test`.
    These ranges assume the 100-image download.
    """
    fields = ['i', 'x', 'y']
    m, n = 0, 90
    with StreamingDatasetWriter(out_root_train, fields, shard_size_limit) as out:
        out.write_samples(each(in_root, m, n), use_tqdm=bool(tqdm), total=n-m)
    m, n = n, 100
    with StreamingDatasetWriter(out_root_test, fields, shard_size_limit) as out:
        out.write_samples(each(in_root, m, n), use_tqdm=bool(tqdm), total=n-m)
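`write_datasets` hard-codes a 90/10 split for the 100-image download. If you switch to the 1,000- or 100,000-image archive, the ranges need to change as well. A small hypothetical helper can derive them from the dataset size and a train fraction; the name `split_ranges` and the round-to-nearest choice are illustrative assumptions:

```python
from typing import Tuple

def split_ranges(total: int, train_fraction: float = 0.9) -> Tuple[Tuple[int, int], Tuple[int, int]]:
    """Return ((train_start, train_end), (test_start, test_end)) as half-open ranges."""
    if not 0.0 < train_fraction < 1.0:
        raise ValueError("train_fraction must be strictly between 0 and 1")
    n_train = round(total * train_fraction)  # round to the nearest whole sample
    return (0, n_train), (n_train, total)

print(split_ranges(100))     # ((0, 90), (90, 100))
print(split_ranges(100000))  # ((0, 90000), (90000, 100000))
```

Each returned pair could then stand in for the hard-coded `m, n` values passed to `each`.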
    

Once `write_datasets()` has written the datasets to `out_root`, we can upload them to a cloud storage provider and stream them from there. For simplicity, we'll skip this step; the rest of this tutorial still works if you set `remote_train` and `remote_test` to the cloud storage URLs of the directories generated here.

In [None]:
remote_train = out_root_train # replace this with your URL for cloud streaming
remote_test = out_root_test
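If you do upload, every file under `out_root` should keep its path relative to `out_root`, so that the `train` and `test` directories line up with `remote_train` and `remote_test`. A minimal sketch, assuming a hypothetical `upload_fn(local_path, key)` callback that you back with your provider's client; for S3 that could be boto3's `upload_file(Filename, Bucket, Key)`, with the bucket name below being a placeholder:

```python
import os
from typing import Callable, List

def upload_tree(root: str, prefix: str, upload_fn: Callable[[str, str], None]) -> List[str]:
    """Call upload_fn(local_path, key) for every file under `root`.

    Each key is `prefix` plus the file's path relative to `root`, with forward
    slashes as object stores expect. Returns the uploaded keys, sorted.
    """
    prefix = prefix.strip("/")
    keys = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            local_path = os.path.join(dirpath, name)
            rel = os.path.relpath(local_path, root).replace(os.sep, "/")
            key = f"{prefix}/{rel}" if prefix else rel
            upload_fn(local_path, key)
            keys.append(key)
    return sorted(keys)

# Usage sketch with boto3 ("my-bucket" is a placeholder):
# import boto3
# s3 = boto3.client("s3")
# upload_tree(out_root, "sdl", lambda path, key: s3.upload_file(path, "my-bucket", key))
```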

## Loading the Data

In [None]:
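class FaceSynthetics(StreamingDataset):
    def __init__(self,
                 remote: str,
                 local: str,
                 shuffle: bool) -> None:
        # Each field is stored as raw bytes; the decoders turn them back into Python objects.
        decoders = {
            'i': lambda data: struct.unpack('>q', data)[0],
            'x': lambda data: Image.open(BytesIO(data), formats=['PNG']),
            'y': lambda data: Image.open(BytesIO(data), formats=['PNG']),
        }
        super().__init__(local=local, remote=remote, shuffle=shuffle, decoders=decoders)

    def __getitem__(self, i):
        obj = super().__getitem__(i)
        # RGB image -> float tensor of shape [3, H, W], scaled to [0, 1]
        x = tf.ToTensor()(obj['x'])
        # Label PNG -> integer class indices of shape [1, H, W]. PILToTensor keeps the
        # raw pixel values, whereas ToTensor would rescale them to [0, 1].
        y = tf.PILToTensor()(obj['y']).to(torch.int64)
        # one_hot appends the class dimension: [H, W] -> [H, W, C].
        # Move it to the front to get the channel-first [C, H, W] layout.
        y_onehot = F.one_hot(y.squeeze(0), num_classes).permute(2, 0, 1)
        # The Trainer moves each batch to `device`, so tensors stay on the CPU here.
        return x, y_onehot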
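`F.one_hot` appends the class dimension last, so an `[H, W]` label map becomes `[H, W, C]`, while segmentation models such as DeepLabV3 produce channel-first `[C, H, W]` logits. A matching target therefore needs its class axis moved to the front, which reorders elements (a permute) rather than merely reshaping them. The pure-Python sketch below (no PyTorch; `one_hot_chw` is a hypothetical helper) builds the channel-first layout directly so the indexing is easy to check:

```python
from typing import List

def one_hot_chw(labels: List[List[int]], num_classes: int) -> List[List[List[int]]]:
    """Channel-first one-hot: out[c][h][w] == 1 exactly when labels[h][w] == c."""
    height, width = len(labels), len(labels[0])
    out = [[[0] * width for _ in range(height)] for _ in range(num_classes)]
    for h in range(height):
        for w in range(width):
            c = labels[h][w]
            if not 0 <= c < num_classes:
                # F.one_hot also raises an error for out-of-range class values
                raise ValueError(f"label {c} at ({h}, {w}) is outside [0, {num_classes})")
            out[c][h][w] = 1
    return out

mask = [[0, 2],
        [1, 2]]
print(one_hot_chw(mask, 3))
# [[[1, 0], [0, 0]], [[0, 0], [1, 0]], [[0, 1], [0, 1]]]
```

Every pixel has exactly one 1 across the class axis, and every label value must be smaller than `num_classes`.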

In [None]:
def get_dataloaders():
    dataset_train = FaceSynthetics(remote_train, local_train, shuffle)
    dataset_test = FaceSynthetics(remote_test, local_test, shuffle)

    batch_size = 2

    # Shuffling is controlled by the dataset's `shuffle` argument, not by the DataLoader.
    train_dataloader = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size, shuffle=False)
    test_dataloader = torch.utils.data.DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

    return train_dataloader, test_dataloader

## Training the Model

In [None]:
def make_trainer():
    train_dataloader, test_dataloader = get_dataloaders()
    train_epochs = "3ep"
    model = ComposerDeepLabV3(
        num_classes=num_classes, 
        backbone_arch='resnet101', 
        is_backbone_pretrained=True,
        sync_bn=False)
    optimizer = DecoupledAdamW(model.parameters(), lr=1e-3)
    
    return Trainer(
        model=model,
        train_dataloader=train_dataloader,
        eval_dataloader=test_dataloader,
        max_duration=train_epochs,
        optimizers=optimizer,
        device=device
    )
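`max_duration="3ep"` is measured in epochs, so the number of training batches follows from the dataset size and batch size. With the 90 training images and `batch_size = 2` used above, each epoch is ceil(90 / 2) = 45 batches, or 135 batches over 3 epochs, assuming a single device and a DataLoader that keeps the last partial batch (the default). A small hypothetical helper for this back-of-the-envelope estimate:

```python
import math

def total_batches(num_samples: int, batch_size: int, epochs: int, drop_last: bool = False) -> int:
    """Training batches for `epochs` passes over `num_samples` on a single device."""
    if drop_last:
        per_epoch = num_samples // batch_size  # the partial last batch is discarded
    else:
        per_epoch = math.ceil(num_samples / batch_size)  # the last batch may be partial
    return per_epoch * epochs

print(total_batches(90, 2, 3))   # 135
print(total_batches(900, 2, 3))  # 1350 (1,000-image download with a 90/10 split)
```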

## Putting it all Together

In [None]:
write_datasets()

In [None]:
trainer = make_trainer()

In [None]:
start_time = time.perf_counter()
trainer.fit()
end_time = time.perf_counter()
print(f"It took {end_time - start_time:0.4f} seconds to train")

## Cleanup

In [None]:
shutil.rmtree(out_root)
shutil.rmtree(local, ignore_errors=True)  # local copies streamed during training
!rm dataset.zip
!rm -rf dataset

## Next Steps

Congrats! We've trained our FaceSynthetics model on a streaming dataset!

Now that we're done, here are some next steps and speedups to explore:
* running against a full dataset
* using composer algorithms
* building a multi-gpu trainer

Happy training!