# Building and running a preprocessing pipeline

In this example, an image processing pipeline is created and then executed in a way that maximizes throughput.

In [None]:
import os
import tempfile
from PIL import Image, ImageOps
import seqtools

workdir = tempfile.TemporaryDirectory()
os.chdir(workdir.name)

In [None]:
! curl -s "https://cdn.pixabay.com/photo/2017/04/07/01/05/owl-2209827_640.jpg" -o owl.jpg
! curl -s "https://cdn.pixabay.com/photo/2018/08/26/14/05/hahn-3632299_640.jpg" -o rooster.jpg
! curl -s "https://cdn.pixabay.com/photo/2018/09/02/10/03/violet-duck-3648415_640.jpg" -o duck.jpg
! curl -s "https://cdn.pixabay.com/photo/2018/08/21/05/15/tit-3620632_640.jpg" -o bird.jpg
! curl -s "https://cdn.pixabay.com/photo/2018/09/04/18/07/pug-3654360_640.jpg" -o dog.jpg
! curl -s "https://cdn.pixabay.com/photo/2018/09/04/18/52/hedgehog-3654434_640.jpg" -o hedgehog.jpg

## Initial data loading

SeqTools works with list-like indexable objects, so the first step is to create one that maps to our samples. This object is then passed to functions that apply the desired transformations.
In this example, we represent our samples by their file names and store them in a list.

In [None]:
labels = ['owl', 'rooster', 'duck', 'bird', 'dog', 'hedgehog']
# We artificially increase the size of the dataset for the example
labels = [labels[i % len(labels)] for i in range(1000)]

image_files = [label + '.jpg' for label in labels]

Let's load the full-resolution images. The full set would not normally fit into memory, but with SeqTools the evaluation is delayed until an image is actually accessed.

In [None]:
raw_images = seqtools.smap(Image.open, image_files)

We can check the result for one sample. Accessing it triggers its evaluation and returns the image:

In [None]:
raw_images[0]
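
To make the delayed evaluation concrete, here is a toy stand-in for `smap` written with the standard library only. `LazyMap` is a hypothetical class, not SeqTools' implementation, and it assumes that "list-like" means supporting `len()` and integer indexing.

```python
class LazyMap:
    """Toy lazy mapping: applies `func` to an item only when it is indexed."""

    def __init__(self, func, sequence):
        self.func = func
        self.sequence = sequence
        self.calls = 0  # number of times func has actually been run

    def __len__(self):
        return len(self.sequence)

    def __getitem__(self, index):
        self.calls += 1
        return self.func(self.sequence[index])


squares = LazyMap(lambda x: x * x, range(1000))
print(squares.calls)  # 0: nothing has been computed yet
print(squares[3])     # 9: only this item is computed
print(squares.calls)  # 1
```

This toy version stores nothing, so indexing the same item twice runs `func` twice.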

## Mapping transformations

As a first preprocessing stage, we normalize the size by cropping the largest centered square and resizing it to 200x200 pixels:

In [None]:
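def normalize_size(im):
    # Crop the largest centered square, then resize it to 200x200 pixels.
    # PIL's box is (left, upper, right, lower) in pixel coordinates.
    w, h = im.size
    side = min(w, h)
    left = (w - side) // 2
    top = (h - side) // 2
    return im.resize((200, 200), Image.BILINEAR, box=(left, top, left + side, top + side))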

small_images = seqtools.smap(normalize_size, raw_images)

small_images[1]
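
The crop box arithmetic can be checked without loading any image. `center_square_box` is a hypothetical helper that computes the `(left, upper, right, lower)` box of the largest centered square, which is the crop `normalize_size` is meant to apply before resizing; the image sizes below are made up for illustration.

```python
def center_square_box(width, height):
    """Return the (left, upper, right, lower) box of the largest centered square."""
    side = min(width, height)
    left = (width - side) // 2
    top = (height - side) // 2
    # The right and lower edges are coordinates, not sizes: offset + side
    return (left, top, left + side, top + side)


# Hypothetical 640x426 landscape image: 107 px are cropped on each side
print(center_square_box(640, 426))  # (107, 0, 533, 426)
# Hypothetical 300x400 portrait image: the crop is vertical instead
print(center_square_box(300, 400))  # (0, 50, 300, 350)
```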

Then, we apply common preprocessing steps:

In [None]:
contrasted = seqtools.smap(ImageOps.autocontrast, small_images)
equalized = seqtools.smap(ImageOps.equalize, contrasted)
grayscale = seqtools.smap(ImageOps.grayscale, equalized)

grayscale[0]

That preprocessing seems a bit over the top. Let's inspect the intermediate stages to check where it went wrong:

In [None]:
equalized[0]

In [None]:
contrasted[0]

For each sample, only the minimal set of computations needed to produce the requested item was run.
We find that equalization is inappropriate here and that autocontrast is too weak, so let's fix this:
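
The fact that only the minimal set of computations runs can be mimicked with a toy chain of lazy maps that logs every computation. `LoggedMap`, the stage names and the arithmetic functions are hypothetical placeholders for the image operations; this sketches the idea and does not show how SeqTools works internally.

```python
log = []  # records (stage name, index) for every computation performed


class LoggedMap:
    """Toy lazy mapping that logs which (stage, index) pairs are computed."""

    def __init__(self, name, func, sequence):
        self.name, self.func, self.sequence = name, func, sequence

    def __len__(self):
        return len(self.sequence)

    def __getitem__(self, index):
        value = self.func(self.sequence[index])  # evaluates upstream stages first
        log.append((self.name, index))
        return value


small = LoggedMap("small", lambda x: x + 1, list(range(1000)))
contrast = LoggedMap("contrast", lambda x: x * 2, small)
gray = LoggedMap("gray", lambda x: -x, contrast)

print(gray[0])  # -2
print(log)      # [('small', 0), ('contrast', 0), ('gray', 0)]
```

Inspecting an intermediate stage, such as `contrast[5]`, only runs the first two stages and only for item 5.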

In [None]:
grayscale = seqtools.smap(ImageOps.grayscale, small_images)
contrasted = seqtools.smap(lambda im: ImageOps.autocontrast(im, cutoff=3), grayscale)

contrasted[0]

## Combining datasets

Next, we augment the dataset with horizontally mirrored copies of the images:

In [None]:
# Generate flipped versions of the images
flipped = seqtools.smap(ImageOps.mirror, contrasted)

# Combine with the original dataset
augmented_dataset = seqtools.concatenate([contrasted, flipped])

augmented_dataset[-1]
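
Concatenation can stay lazy too: indexing the combined sequence only requires finding which input holds the requested position. `LazyConcat` is a hypothetical toy class illustrating the idea, with plain lists in place of image sequences; it is not SeqTools' implementation.

```python
import bisect
import itertools


class LazyConcat:
    """Toy lazy concatenation: maps a global index to (sequence, local index)."""

    def __init__(self, sequences):
        self.sequences = sequences
        # Cumulative end offsets, e.g. lengths [3, 3] -> [3, 6]
        self.ends = list(itertools.accumulate(len(s) for s in sequences))

    def __len__(self):
        return self.ends[-1] if self.ends else 0

    def __getitem__(self, index):
        if index < 0:
            index += len(self)  # support negative indices like a list
        if not 0 <= index < len(self):
            raise IndexError("index out of range")
        which = bisect.bisect_right(self.ends, index)  # first sequence ending after index
        start = self.ends[which - 1] if which > 0 else 0
        return self.sequences[which][index - start]


originals = ["a0", "a1", "a2"]
flipped = ["b0", "b1", "b2"]
both = LazyConcat([originals, flipped])
print(len(both))  # 6
print(both[3])    # b0: first item of the second sequence
print(both[-1])   # b2: the last flipped item, as with augmented_dataset[-1]
```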

## Evaluation

Once we are satisfied with the preprocessing pipeline, evaluating all values is simply a matter of iterating over the elements or converting the sequence to a list:

In [None]:
%time computed_values = list(augmented_dataset)

The evaluation above is a bit slow, probably because of the IO operations needed to load the images from disk. Could using multiple threads help keep the CPU busy?

In [None]:
fast_dataset = seqtools.prefetch(augmented_dataset, max_buffered=10, nworkers=2)
%time computed_values = list(fast_dataset)

The CPU time is about the same, since the computations are the same (plus some threading overhead), but the wall time was cut in half because some images are being processed while others are being loaded.
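
The idea behind prefetching can be sketched with the standard library: worker threads compute upcoming items while the consumer reads earlier ones, and results are still returned in order. `prefetch_iter` and `slow_load` are hypothetical names, and `time.sleep` stands in for disk IO; SeqTools' `prefetch` has its own implementation that this sketch does not attempt to reproduce.

```python
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor


def slow_load(i):
    """Hypothetical stand-in for loading and preprocessing one image."""
    time.sleep(0.05)  # simulated IO wait; other threads can run meanwhile
    return i * i


def prefetch_iter(func, indices, nworkers=2, max_buffered=10):
    """Yield func(i) for each index, in order, while threads compute ahead."""
    indices = list(indices)
    with ThreadPoolExecutor(max_workers=nworkers) as pool:
        pending = deque()  # submitted but not yet consumed, in index order
        position = 0
        while position < len(indices) or pending:
            # Keep at most max_buffered items queued or finished ahead of the reader
            while position < len(indices) and len(pending) < max_buffered:
                pending.append(pool.submit(func, indices[position]))
                position += 1
            yield pending.popleft().result()


start = time.perf_counter()
values = list(prefetch_iter(slow_load, range(20), nworkers=2))
elapsed = time.perf_counter() - start
print(values[:5])  # [0, 1, 4, 9, 16]
print(f"wall time: {elapsed:.2f} s (a serial loop would sleep 20 * 0.05 = 1 s)")
```

The `max_buffered` bound keeps memory in check: workers stop running ahead once that many results are waiting to be consumed.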

However, we could avoid some IO by not reading the same image a second time when generating its flipped version, and also avoid repeating the transformations that both versions share. To that end, we can add a cache that stores intermediate values within the pipeline.

In [None]:
raw_images = seqtools.smap(Image.open, image_files)
small_images = seqtools.smap(normalize_size, raw_images)
grayscale = seqtools.smap(ImageOps.grayscale, small_images)
contrasted = seqtools.smap(lambda im: ImageOps.autocontrast(im, cutoff=3), grayscale)

# Cache recently accessed items to avoid recomputing them
contrasted = seqtools.add_cache(contrasted, cache_size=20)
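
A cache of this kind keeps recently computed items so that a second request for the same index is served without recomputation. The class below is a hypothetical least-recently-used sketch with a tiny `cache_size` to make eviction visible; the exact eviction policy of `seqtools.add_cache` is not stated in this example and may differ.

```python
from collections import OrderedDict


class CachedSequence:
    """Toy least-recently-used cache in front of an indexable sequence."""

    def __init__(self, sequence, cache_size=20):
        self.sequence = sequence
        self.cache_size = cache_size
        self.cache = OrderedDict()  # index -> value, least recently used first
        self.misses = 0  # how many times the wrapped sequence was indexed

    def __len__(self):
        return len(self.sequence)

    def __getitem__(self, index):
        if index < 0:
            index += len(self)  # -1 and len - 1 share one cache entry
        if index in self.cache:
            self.cache.move_to_end(index)  # mark as most recently used
            return self.cache[index]
        value = self.sequence[index]
        self.misses += 1
        self.cache[index] = value
        if len(self.cache) > self.cache_size:
            self.cache.popitem(last=False)  # evict the least recently used entry
        return value


expensive = [f"image-{i}" for i in range(100)]
cached = CachedSequence(expensive, cache_size=2)
first = cached[0]  # miss: read from the wrapped sequence
again = cached[0]  # hit: e.g. when the flipped version of item 0 is built
print(cached.misses)  # 1
```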

flipped = seqtools.smap(ImageOps.mirror, contrasted)

We need to organize the calls to items of `contrasted` so that the original image is still in the cache when its flipped version is generated.
Besides, we need to make sure that workers do not request the same item simultaneously; otherwise, they will have to wait for the first one to finish.

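To do so, we compute related images by groups (each original image together with its flipped version) and then flatten the groups into one long sequence of samples, in which each original image is directly followed by its flipped version.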
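
The grouping and flattening steps can be sketched with two toy classes. `Collated` and `Unbatched` are hypothetical stand-ins, loosely modeled on what `seqtools.collate` and `seqtools.unbatch` are used for here, with strings in place of images; they only illustrate the index bookkeeping.

```python
class Collated:
    """Toy collate: item i is the tuple of item i from each input sequence."""

    def __init__(self, sequences):
        if len({len(s) for s in sequences}) != 1:
            raise ValueError("sequences must be non-empty and have equal lengths")
        self.sequences = sequences

    def __len__(self):
        return len(self.sequences[0])

    def __getitem__(self, index):
        return tuple(s[index] for s in self.sequences)


class Unbatched:
    """Toy unbatch: flattens a sequence of fixed-size groups into single items."""

    def __init__(self, batches, batch_size):
        self.batches = batches
        self.batch_size = batch_size

    def __len__(self):
        return len(self.batches) * self.batch_size

    def __getitem__(self, index):
        # With batch_size=2, items 2k and 2k+1 both come from group k
        group, position = divmod(index, self.batch_size)
        return self.batches[group][position]


originals = ["o0", "o1", "o2"]
flipped = ["f0", "f1", "f2"]
flat = Unbatched(Collated([originals, flipped]), batch_size=2)
print([flat[i] for i in range(len(flat))])
# ['o0', 'f0', 'o1', 'f1', 'o2', 'f2']
```

Because positions 2k and 2k+1 map to the same group, an image and its mirrored copy are requested back to back, which is when even a small cache is effective.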

In [None]:
grouped = seqtools.collate([contrasted, flipped])
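
In [None]:
# Prefetch whole groups, so that an image and its mirror are computed by the same worker
fast_dataset = seqtools.prefetch(grouped, max_buffered=10, nworkers=2)
# Flatten the groups back into a sequence of individual samples
flattened = seqtools.unbatch(fast_dataset, batch_size=2)
%time computed_values = list(flattened)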

The wall time did not improve much, but the CPU time was cut in half, leaving more room for other processes.