# Packed Data Formats

In this tutorial we compare two ways of storing data that is later loaded in PyTorch: as individual image files, and packed into a single ZIP archive.


## Scenario
Imagine we want to train a network that classifies images accurately. Here we will use the ImageNet dataset to train our network. ImageNet is a huge image database of about 14 million images. Below are a few example images from this dataset:

<img src="src/imagenet.jpg" alt= "ImageNet" width="500" height="500">

So you download the dataset to your computer and unzip it to see the images inside. However, your computer does not like extracting 14 million images, for a few reasons:
- extracting individual files takes a long time
- every file is stored individually, each with its own metadata
- on Snellius, we mostly work on a shared disk, so any data-intensive task that is running may impact other users

In [None]:
import os
import glob
import time
import zipfile
import json

import numpy as np
import PIL.Image
import torch
from torchvision import transforms

#DATA_PATH = os.getenv("TEACHER_DIR", os.getcwd()) + "/JHS_data"
DATA_PATH = "data/"

## Create a torch Dataset class for jpeg images

After extracting our ImageNet zip files, we are left with individual .JPEG images. To load these images during training, we write our own Dataset class.

NB: in this tutorial we work with a subset of ImageNet of 10k images and their corresponding labels.


In [None]:
class JpegDataset(torch.utils.data.Dataset):
    """ImageNet10k Dataset for Jpeg images to pass to a PyTorch dataloader

    Args:
        path (str): location where the images are stored on disk
        transform (obj): torchvision.transforms object or None
    Returns:
        torch Dataset: to pass to a dataloader
    """
    def __init__(self, path, transform=None):
        self.path = path
        self.transform = transform

        self._get_img_names()
    
        label_fname = "validation_labels_10k.txt"
        self._read_label_file(label_fname)

    def _get_img_names(self):
        self.img_names = sorted(glob.glob(f"{self.path}/*.JPEG"))
        
    def _read_label_file(self, label_fname):
        with open(os.path.join(self.path, label_fname), "r") as file:
            self.labels = [int(l) for l in file.readlines()]

    @staticmethod
    def read_image(img_name):
        image = PIL.Image.open(img_name).convert("RGB")
        return image


    def __getitem__(self, index):
        img_name = self.img_names[index]
        image = self.read_image(img_name)
        label = self.labels[index]

        if self.transform:
            image = self.transform(image)

        return image, label
    

    def __len__(self):
        return len(self.labels)

## Benchmark the Jpeg Dataset
In a real-life deep learning scenario, a dataloader returns a batch of `batch_size` images and labels on every iteration. Running it for one epoch means the network sees every image exactly once. This benchmark measures only the time it takes to load the images and move them to the device. For now, there is no neural network step.
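To make "one epoch" concrete, a DataLoader yields `ceil(len(dataset) / batch_size)` batches per epoch (unless `drop_last=True`). With the default sampler, every sample appears exactly once per epoch. Below is a small check, assuming a dummy `TensorDataset` in place of the 10k images:

```python
import math

import torch
from torch.utils.data import DataLoader, TensorDataset

# Dummy stand-in for the 10k-image subset: "image" i is just the number i
num_images, batch_size = 10_000, 16
dataset = TensorDataset(
    torch.arange(num_images).unsqueeze(1),      # fake images, shape (N, 1)
    torch.zeros(num_images, dtype=torch.long),  # fake labels
)

loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
print(len(loader), math.ceil(num_images / batch_size))  # 625 625

# Concatenate everything one epoch yields: each index appears exactly once
seen = torch.cat([images.flatten() for images, _ in loader])
print(torch.equal(seen.sort().values, torch.arange(num_images)))  # True
```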

In [None]:
def benchmark(dataset, epochs, batch_size, num_workers, persistent_workers, pin_memory, device, shuffle=True, warm_start=False):
    """Time how long it takes to iterate over `dataset` for `epochs` epochs.

    If `warm_start` is True, the first epoch serves as a warm-up and is
    excluded from the returned total time (this needs epochs >= 2).
    """
    dataloader = torch.utils.data.DataLoader(
        dataset,
        num_workers=num_workers,
        batch_size=batch_size,
        shuffle=shuffle,
        # persistent_workers is only allowed when worker processes are used
        persistent_workers=num_workers > 0 and persistent_workers,
        pin_memory=pin_memory,
    )

    start = time.time()
    for epoch in range(epochs):
        if epoch == 1 and warm_start:
            # Restart the clock once the warm-up epoch is done
            start = time.time()
        timer_per_epoch = time.time()
        for images, labels in dataloader:
            # Only move the batch to the device; there is no model step yet
            images, labels = map(
                lambda tensor: tensor.to(device, non_blocking=pin_memory),
                (images, labels),
            )

        print(f"Epoch {epoch} finished in {time.time() - timer_per_epoch:.2f} s")
    total_time = time.time() - start
    return total_time

## Initialize some dataloading parameters
Read these parameters carefully and feel free to experiment with them.

We include a transformation to imitate a real-life application.

In [None]:
def transform(new_size, to_tensor=False):
    transform_list = []
    if to_tensor:
        transform_list.append(transforms.ToTensor())
    transform_list.append(transforms.Resize((new_size, new_size), antialias=True))
    transform_list.append(transforms.RandomHorizontalFlip())
    transform = transforms.Compose(transform_list)
    return transform


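dataloader_kwargs = {
    "epochs": 2,                 # number of passes over the dataset
    "num_workers": 1,            # number of worker processes loading data
    "batch_size": 16,            # images per batch
    "device": "cpu",             # where batches are moved to, e.g. "cuda"
    "persistent_workers": True,  # keep the workers alive between epochs
    "warm_start": False,         # if True, exclude the first epoch from the total time
    "pin_memory": True,          # page-locked host memory, only useful with a GPU
    "shuffle": True,             # shuffle the sample order every epoch
}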

## Run the benchmark on the Jpeg data
Below, we run the benchmark for 2 epochs. It prints the time of each epoch and returns the total time.

In [None]:
img_path = f"{DATA_PATH}/disk_transformed"

transform_jpeg = transform(new_size=256, to_tensor=True)
jpeg_dataset = JpegDataset(img_path, transform=transform_jpeg)

jpeg_time = benchmark(jpeg_dataset, **dataloader_kwargs)

## Cool, what now?
As discussed above, storing and loading many individual files can hurt performance, both for the filesystem and for your network training. On top of that, copying data from one location to another is also faster with packed data formats. So let's convert them!

Wait: if you download a dataset that already comes as a zip or tar archive, you already have a packed format. Other alternatives exist (HDF5, LMDB, Petastorm), but the original zip or tar archive might already be enough. And if you need to split the data, you can create new zip archives without ever extracting all the individual files onto your machine.
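For example, Python's built-in `zipfile` module is enough to split one archive into several smaller ones. Each member is read from the source archive into memory and written straight into a new archive, so nothing is extracted to disk. This is a minimal sketch. The function name `split_zip`, the round-robin assignment of files, and copying a label file such as `dataset.json` into every part are illustrative assumptions, not part of the tutorial's code:

```python
import zipfile


def split_zip(src_path, dst_template, num_parts, shared=("dataset.json",)):
    """Split `src_path` into `num_parts` ZIP archives without extracting to disk.

    Hypothetical helper: members named in `shared` (e.g. a label file) are
    copied into every part; all other files are distributed round-robin.
    `dst_template` is a format string such as "part{}.zip".
    """
    dst_paths = [dst_template.format(i) for i in range(num_parts)]
    with zipfile.ZipFile(src_path) as src:
        dsts = [zipfile.ZipFile(path, "w") for path in dst_paths]
        try:
            files = [info for info in src.infolist() if not info.is_dir()]
            others = [info for info in files if info.filename not in shared]
            for info in files:
                if info.filename in shared:
                    data = src.read(info)
                    for dst in dsts:
                        dst.writestr(info.filename, data, compress_type=info.compress_type)
            for i, info in enumerate(others):
                # Decompress the member in memory and write it straight into its new archive
                dsts[i % num_parts].writestr(
                    info.filename, src.read(info), compress_type=info.compress_type
                )
        finally:
            for dst in dsts:
                dst.close()
    return dst_paths
```

A call could look like `split_zip("imagenet10k.zip", "part{}.zip", num_parts=4)`. These file names are hypothetical.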

Instead of a JPEG dataset class, let's write a dataset class for zip files.

In [None]:
class ZIPDataset(torch.utils.data.Dataset):
    """ImageNet10k Dataset for packed ZIP to pass to a PyTorch dataloader

    Args:
        path (str): location of the .zip file on disk
        transform (obj): torchvision.transforms object or None
        load_encoded (bool): True if the images in the .zip file are stored encoded (e.g. JPEG), False if they are stored as raw uint8 pixel bytes
    Returns:
        torch Dataset: to pass to a dataloader
    """
    def __init__(self, path, transform=None, load_encoded=False):
        self.path = path
        self.transform = transform
        self.load_encoded = load_encoded

        self.zip_file = zipfile.ZipFile(path)
        self.members = sorted(self.zip_file.namelist())

        self._get_img_names()

        label_fname = "dataset.json"
        self._get_labels(label_fname)

    def _get_img_names(self):
        PIL.Image.init()
        self.img_names = [m for m in self.members if m.lower().endswith(tuple(PIL.Image.EXTENSION.keys()))]

    def _get_labels(self, label_fname):
        label_file = self.zip_file.open(label_fname, "r")
        self.labels = json.load(label_file)["labels"]


    def _get_image(self, img_fname, shape=(3, 256, 256)):
        img_f = self.zip_file.open(img_fname, "r")
        if self.load_encoded:
            image = PIL.Image.open(img_f).convert("RGB")
        else:
            buffer = np.frombuffer(img_f.read(), dtype=np.uint8).reshape(shape)
            image = torch.tensor(buffer) # make writeable copy
        return image


    def __getitem__(self, index):
        img_fname = self.img_names[index]
        image = self._get_image(img_fname)
        label = self.labels[img_fname]
        
        if self.transform:
            image = self.transform(image)

        return image, label

    def __len__(self):
        return len(self.img_names)



## Benchmark the zip file

In [None]:
zip_path = f"{DATA_PATH}/data/zip/part0.zip"
transform_zip = transform(new_size=256, to_tensor=False)
zip_dataset = ZIPDataset(zip_path, transform=transform_zip, load_encoded=False)
zip_time = benchmark(zip_dataset, **dataloader_kwargs)

In [None]:
speed_up = jpeg_time / zip_time
print(f"Speed up of zip dataset compared to jpeg dataset: {round(speed_up, 3)}x faster")

We already see a notable speed-up in favor of the ZIP dataset. But so far, we have only loaded the data with a single worker process. In practice, when training neural networks we want the model to fully saturate the accelerator (GPU). We do this by providing the data as fast as possible, so that the GPU is not left waiting for data, i.e. bottlenecked by I/O.

Let's change the dataloader settings to increase the number of worker processes to, say, 8.
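How many workers make sense depends on how many CPU cores your process may actually use. `os.cpu_count()` reports all cores of the machine. On a cluster like Snellius, however, a job is typically restricted to the cores it requested. The small helper below (hypothetical name `available_cpus`) prefers the process's CPU affinity where the OS exposes it. Leaving one core for the main process is only a rule of thumb:

```python
import os


def available_cpus():
    """Number of CPU cores this process may run on (falls back to all cores)."""
    if hasattr(os, "sched_getaffinity"):  # Linux: respects the job's CPU mask
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


# Rule of thumb: leave one core for the main (training) process
suggested_workers = max(1, available_cpus() - 1)
print(f"{available_cpus()} cores available, try num_workers={suggested_workers}")
```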

In [None]:
# Change the number of workers to 8
dataloader_kwargs["num_workers"] = 8
# Re-run the JPEG dataset benchmark
jpeg_time_multi = benchmark(jpeg_dataset, **dataloader_kwargs)
# Re-run the ZIP dataset benchmark and store the benchmark time
zip_time_multi = benchmark(zip_dataset, **dataloader_kwargs)

If everything went as planned, the ZIP benchmark did not go well. It failed with a `BadZipFile` error that complains either that a file name in the directory and in the header differ, or about a bad magic number for a file header. What could cause this?


What is happening is that all images are packed in a single ZIP file, and we set the number of worker processes that read the data to 8 (where did we set this?). All these processes open and read from the same ZIP file. However, we opened the ZIP file only once, in the dataset's `__init__`, so every worker uses a copy of that same handle. When the workers are started by forking the main process (traditionally the default on Linux), these copies share one underlying file descriptor and therefore one file position. While one worker seeks to the start of its image, another worker moves the position elsewhere. The first worker then reads bytes from the wrong place in the archive, which explains the corrupted headers.
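You can reproduce this shared position in a single process. A descriptor copied with `os.dup` refers to the same open file description, just like the descriptors a forked child inherits, so the two share one file offset. Opening the file a second time creates an independent offset. Below is a minimal, POSIX-oriented sketch on a temporary file. The file content is made up for illustration:

```python
import os
import tempfile

with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789")
    path = tmp.name

try:
    fd = os.open(path, os.O_RDONLY)
    shared_fd = os.dup(fd)               # like a handle inherited by a forked worker
    own_fd = os.open(path, os.O_RDONLY)  # like a worker opening its own handle

    os.lseek(fd, 5, os.SEEK_SET)         # "worker A" seeks to byte 5
    shared_read = os.read(shared_fd, 3)  # "worker B" reads from A's position
    own_read = os.read(own_fd, 3)        # the independent handle still starts at 0
    print(shared_read, own_read)         # b'567' b'012'

    for f in (fd, shared_fd, own_fd):
        os.close(f)
finally:
    os.remove(path)
```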


# Multiple worker support

Now we adapt the dataset so that multiple workers can safely access the same ZIP file.


Opening a file within the ZIP archive (an image or the label file) gives us a file-like object from which we read that image or label. All these objects, however, read through the archive handle they were opened from: the `ZipFile` object, with its underlying OS file and current file position. As a result, workers that access different files at the same time cannot share a single archive handle. Instead, each worker needs its own handle to the ZIP file, so that the workers can each read their own images in parallel without interfering with one another.

The implementation therefore has to keep track of multiple ZIP file handles, one per worker. We do this by changing the torch dataset so that every worker gets its own handle, and all communication with the ZIP file goes through the current worker's handle. The DataLoader already prevents workers from loading the same images twice by assigning different batches to different workers.

This is done in the following four steps:
- Let the DataLoader create the worker processes (`num_workers > 0`)
- Get information about the current worker (its worker id)
- Give each worker a separate handle
- Read the images and labels through this handle

Tips:
- Each worker receives its own copy of the `MultiWorkerZIPDataset` instance. `__init__` runs only once, in the main process, so a handle opened there is not a per-worker handle.
- Fill in the new `_get_file_handle` function
- Revise the code from the previous dataset for loading the labels and images


In [None]:
class MultiWorkerZIPDataset(torch.utils.data.Dataset):
    """ImageNet10k Dataset for packed ZIP to pass to a PyTorch dataloader

    Args:
        path (str): location of the .zip file on disk
        transform (obj): torchvision.transforms object or None
        load_encoded (bool): True if the images in the .zip file are stored encoded (e.g. JPEG), False if they are stored as raw uint8 pixel bytes
    Returns:
        torch Dataset: to pass to a dataloader
    """
    def __init__(self, path, transform=None, load_encoded=False):
        self.path = path
        self.transform = transform
        self.load_encoded = load_encoded

        # Each worker needs access to get the keys/filenames of the ZIP file
        worker = torch.utils.data.get_worker_info()
        worker = worker.id if worker else None
        self.zip_handle = {worker: zipfile.ZipFile(path)}
        self.members = sorted(self.zip_handle[worker].namelist())

        self._get_img_names()

        label_fname = "dataset.json"
        self._get_labels(worker, label_fname)

    def _get_img_names(self):
        PIL.Image.init()
        self.img_names = [m for m in self.members if m.lower().endswith(tuple(PIL.Image.EXTENSION.keys()))]

    def _get_labels(self, worker, label_fname):
        label_file = self.zip_handle[worker].open(label_fname, "r")
        self.labels = json.load(label_file)["labels"]

    def _get_file_handle(self, fname):
        # TODO: 
        # This function must return the file handle for the current worker, 
        # or create one if it is not already created
        worker = torch.utils.data.get_worker_info()
        worker = worker.id if worker else None

        if worker not in self.zip_handle:
            self.zip_handle[worker] = zipfile.ZipFile(self.path)

        return self.zip_handle[worker].open(fname, "r")

    def _get_image(self, img_fname, shape=(3, 256, 256)):
        img_f = self._get_file_handle(img_fname)
        if self.load_encoded:
            image = PIL.Image.open(img_f).convert("RGB")
        else:
            buffer = np.frombuffer(img_f.read(), dtype=np.uint8).reshape(shape)
            image = torch.tensor(buffer) # make writeable copy
        return image


    def __getitem__(self, index):
        img_fname = self.img_names[index]
        image = self._get_image(img_fname)
        label = self.labels[img_fname]
        
        if self.transform:
            image = self.transform(image)

        return image, label

    def __len__(self):
        return len(self.img_names)

    def __del__(self):
        """Clean all file handles of the workers on exit"""
        if hasattr(self, "zip_handle"):
            for o in self.zip_handle.values():
                o.close()

    def __getstate__(self):
        """Serialize without the ZipFile references, for multiprocessing compatibility"""
        state = dict(self.__dict__)
        state["zip_handle"] = {}
        return state


In [None]:
zip_dataset_multi = MultiWorkerZIPDataset(zip_path, transform=transform_zip, load_encoded=False)
zip_time_multi = benchmark(zip_dataset_multi, **dataloader_kwargs)

In [None]:
speed_up = jpeg_time_multi / zip_time_multi
print(f"Speed up of zip dataset compared to jpeg dataset: {round(speed_up, 3)}x faster")
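A variant of the same idea keys the handle on the process id (`os.getpid()`) instead of the worker id. A forked worker has a different pid than the process that created the dataset, so it never reuses a handle inherited through fork. The handle must still be dropped when the dataset is pickled, because `ZipFile` objects cannot be pickled. This is a minimal sketch under those assumptions. The class name `PidZipReader` is hypothetical, and to keep it short it returns raw member bytes instead of decoded images:

```python
import os
import zipfile

import torch


class PidZipReader(torch.utils.data.Dataset):
    """Hypothetical dataset returning raw member bytes, one ZipFile per process."""

    def __init__(self, path):
        self.path = path
        self._zip, self._pid = None, None
        with zipfile.ZipFile(path) as zf:  # short-lived handle, only for the listing
            self.members = sorted(n for n in zf.namelist() if not n.endswith("/"))

    def _zipfile(self):
        # (Re)open when used in a new process, e.g. right after a fork
        if self._zip is None or self._pid != os.getpid():
            self._zip = zipfile.ZipFile(self.path)
            self._pid = os.getpid()
        return self._zip

    def __getitem__(self, index):
        return self._zipfile().read(self.members[index])

    def __len__(self):
        return len(self.members)

    def __getstate__(self):
        # ZipFile objects cannot be pickled (needed for spawn/forkserver workers)
        state = dict(self.__dict__)
        state["_zip"], state["_pid"] = None, None
        return state
```

Compared with `MultiWorkerZIPDataset`, this trades the dictionary of handles for a single handle plus a pid check. Both approaches give every process its own file position.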

# Evaluate... 
We can see that packing our dataset into a ZIP file speeds up data loading. Data loading is a crucial step in a machine learning training pipeline and can become its bottleneck. With profiling and packed data formats, we can minimize the I/O bottleneck between the files on disk and the neural network. On top of that, packing your data keeps the filesystem healthy for you and others :)
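A simple first profiling step splits each iteration into time spent waiting for the next batch and time spent in the training step. If the waiting share is large, data loading is the bottleneck. This is a minimal sketch. `profile_loader` and `train_step` are hypothetical names, and the dummy dataset and model only stand in for real ones. On a GPU, call `torch.cuda.synchronize()` at the end of the step; otherwise asynchronous kernels make the compute time look too small:

```python
import time

import torch
from torch.utils.data import DataLoader, TensorDataset


def profile_loader(loader, train_step):
    """Return (seconds waiting for batches, seconds in train_step) for one epoch."""
    wait = compute = 0.0
    iterator = iter(loader)
    while True:
        t0 = time.perf_counter()
        try:
            batch = next(iterator)
        except StopIteration:
            break
        t1 = time.perf_counter()
        train_step(*batch)
        t2 = time.perf_counter()
        wait += t1 - t0
        compute += t2 - t1
    return wait, compute


# Dummy data and model as stand-ins for the real ones
model = torch.nn.Linear(32, 10)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
loader = DataLoader(TensorDataset(torch.randn(512, 32), torch.randint(0, 10, (512,))), batch_size=64)


def train_step(x, y):
    optimizer.zero_grad()
    loss = torch.nn.functional.cross_entropy(model(x), y)
    loss.backward()
    optimizer.step()


wait, compute = profile_loader(loader, train_step)
print(f"waiting for data: {wait / (wait + compute):.0%} of the epoch")
```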


### Last things
Try changing the number of workers to any value between 1 and 32 and see what happens to the speed-up!
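To run this experiment systematically, you can loop over several worker counts and time one pass over the data for each. This is a sketch with a hypothetical helper `time_workers`. A dummy `TensorDataset` stands in for the notebook's datasets, so you would pass e.g. `zip_dataset_multi` and `[1, 2, 4, 8, 16, 32]` instead:

```python
import time

import torch
from torch.utils.data import DataLoader, TensorDataset


def time_workers(dataset, worker_counts, batch_size=16):
    """Return {num_workers: seconds for one full pass over `dataset`}."""
    timings = {}
    for num_workers in worker_counts:
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        start = time.perf_counter()
        for _ in loader:  # only load the data, no training step
            pass
        timings[num_workers] = time.perf_counter() - start
    return timings


# Dummy stand-in; in the notebook, pass e.g. zip_dataset_multi and [1, 2, 4, 8, 16, 32]
dummy = TensorDataset(torch.randn(256, 3, 8, 8), torch.zeros(256, dtype=torch.long))
timings = time_workers(dummy, [0])
for n, seconds in timings.items():
    print(f"{n:2d} workers: {seconds:.3f} s")
```

The measured time includes starting the worker processes, a cost the first epoch of a real run pays as well.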

Other tips:
- Any deterministic transformation of your dataset (for example resizing) can be done once in advance as a preprocessing step, which will probably save you time during the actual training.
- Look into LMDB/HDF5/Petastorm to see whether another packed data format may suit your data better
- More info here: https://servicedesk.surf.nl/wiki/display/WIKI/Best+Practice+for+Data+Formats+in+Deep+Learning and here: https://github.com/sara-nl/Packed-Data-Formats
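For example, the resize in this tutorial is deterministic, so you can apply it once while packing the images. The sketch below decodes each image, resizes it, and stores the raw `uint8` pixels in channel-first order. This matches what the ZIP datasets above expect with `load_encoded=False` (`reshape((3, 256, 256))`). It also writes a `dataset.json` whose `labels` map member names to class ids. The helper name `pack_resized` and its `labels` input are assumptions. The ZIP datasets only keep members whose names end in an image extension known to PIL, so the sketch keeps the original file names even though the content is raw pixels. This is an assumption about how the tutorial's `part0.zip` was built:

```python
import json
import os
import zipfile

import numpy as np
import PIL.Image


def pack_resized(img_paths, labels, zip_path, size=256):
    """Write resized images as raw uint8 CHW bytes plus a dataset.json into one ZIP.

    Hypothetical helper: `labels` is a list of class ids aligned with `img_paths`.
    File names must be unique, because they become the member names.
    """
    label_map = {}
    # ZIP_STORED: no compression, fastest to read back at the cost of disk space
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_STORED) as zf:
        for img_path, label in zip(img_paths, labels):
            name = os.path.basename(img_path)  # keep the image extension for the filter
            with PIL.Image.open(img_path) as img:
                image = img.convert("RGB").resize((size, size))
            chw = np.asarray(image, dtype=np.uint8).transpose(2, 0, 1)  # HWC -> CHW
            zf.writestr(name, np.ascontiguousarray(chw).tobytes())
            label_map[name] = int(label)
        zf.writestr("dataset.json", json.dumps({"labels": label_map}))
    return zip_path
```

Stored this way, each 256 × 256 RGB image takes 3 × 256 × 256 = 196,608 bytes, so the 10k-image subset needs roughly 2 GB. In exchange, no JPEG decoding is needed during training.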


Finally, keep in mind that reading files can partly overlap with GPU computations. In this hands-on, we did not use a neural network, so the GPU was idle the whole time. In real-life scenarios, we can push a batch to the GPU, let the GPU do its calculations, and at the same time load more data for the next batch.
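In PyTorch, much of this overlap comes from the `DataLoader` itself. With `num_workers > 0`, the workers prepare upcoming batches (`prefetch_factor` batches per worker) while the main process runs the training step. With `pin_memory=True` plus `.to(device, non_blocking=True)`, the host-to-GPU copy can also overlap with computation. This is a minimal training-loop sketch with a dummy dataset and model. The function name `train_one_epoch` is hypothetical, and on a machine without a GPU the loop simply runs on the CPU:

```python
import torch
from torch.utils.data import DataLoader, TensorDataset


def train_one_epoch(model, dataset, device, num_workers=0, batch_size=64):
    """One epoch where data loading and host-to-device copies can overlap compute."""
    use_cuda = device.type == "cuda"
    loader_kwargs = {"batch_size": batch_size, "shuffle": True,
                     "num_workers": num_workers,
                     "pin_memory": use_cuda}  # pinned memory enables asynchronous copies
    if num_workers > 0:
        loader_kwargs["prefetch_factor"] = 2  # batches prepared ahead by each worker
    loader = DataLoader(dataset, **loader_kwargs)

    model = model.to(device).train()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
    steps = 0
    for images, labels in loader:
        # non_blocking only has an effect for pinned CPU tensors copied to a GPU
        images = images.to(device, non_blocking=use_cuda)
        labels = labels.to(device, non_blocking=use_cuda)
        optimizer.zero_grad()
        loss = torch.nn.functional.cross_entropy(model(images), labels)
        loss.backward()
        optimizer.step()  # on a GPU this work is queued; the loop moves on to the next batch
        steps += 1
    return steps


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
data = TensorDataset(torch.randn(1024, 3, 32, 32), torch.randint(0, 10, (1024,)))
model = torch.nn.Sequential(torch.nn.Flatten(), torch.nn.Linear(3 * 32 * 32, 10))
steps = train_one_epoch(model, data, device)
print(steps)  # 16 (1024 samples / 64 per batch)
```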