
Rename PrefetchData -> MultiProcessRunner

ppwwyyxx committed May 25, 2019
1 parent 0cecfbb commit dd2d9ffaa04e6bfd7cf3c1dbd605ffe50b61f7bf
@@ -372,7 +372,6 @@ def process_signature(app, what, name, obj, options, signature,
# deprecated stuff:
'QueueInputTrainer',
'dump_dataflow_to_process_queue',
'PrefetchOnGPUs',
'DistributedTrainerReplicated',
'DistributedTrainerParameterServer',
'InputDesc',
@@ -382,11 +381,14 @@ def process_signature(app, what, name, obj, options, signature,
'DumpTensor',
'DumpParamAsImage',
'get_nr_gpu',
'start_test', # TestDataSpeed
'ThreadedMapData',
'TrainingMonitor',
'PeakMemoryTracker',

'PrefetchData',
'MultiProcessPrefetchData',
'PrefetchDataZMQ',
'MultiThreadPrefetchData',

# deprecated or renamed symbolic code
'Deconv2D', 'psnr',

@@ -3,11 +3,11 @@

### What is DataFlow

DataFlow is a library to build Python iterators for efficient data loading.
DataFlow is a pure-Python library to create iterators for efficient data loading.

**Definition**: A DataFlow is an idiomatic Python container object that has a `__iter__()` generator method,
which yields `datapoints` and optionally a `__len__()` method returning the size of the flow.
A datapoint is a **list** of Python objects which are called the `components` of a datapoint.
**Definition**: A DataFlow is an idiomatic Python iterator object that has a `__iter__()` method
which yields `datapoints`, and optionally a `__len__()` method returning the size of the DataFlow.
A datapoint is a **list or dict** of Python objects, which are called the `components` of a datapoint.

**Example**: to train on the MNIST dataset, you may need a DataFlow with a `__iter__()` method
that yields datapoints (lists) of two components:
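A minimal sketch of such a DataFlow (illustrative only, not part of this commit; the shapes and the random data are made up):

```python
import numpy as np

class FakeMNIST:
    """Yields [image, label] datapoints; the two list entries are its components."""
    def __len__(self):
        return 60000

    def __iter__(self):
        for _ in range(len(self)):
            img = np.random.rand(28, 28).astype('float32')  # a fake 28x28 image
            label = np.random.randint(10)                    # a fake class label
            yield [img, label]
```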
@@ -21,12 +21,10 @@ You can simply use DataFlow as a data processing pipeline and plug it into any o


### Composition of DataFlow
One good thing about having a standard interface is being able to maximize
code reusability.
There are a lot of existing DataFlow utilities in tensorpack, which you can use to compose
DataFlow with a complex data pipeline. A common pipeline usually
would __read from disk (or other sources), apply transformations, group into batches,
prefetch data__, etc. A simple example is the following:
one DataFlow with a complex data pipeline. A common pipeline usually
would __read from disk (or other sources), apply transformations (possibly in parallel), group into batches,
prefetch data__, etc., and all __run in parallel__. A simple example is the following:

````python
# a DataFlow you implement to produce [tensor1, tensor2, ..] lists from whatever sources:
@@ -36,17 +34,17 @@ df = AugmentImageComponent(df, [imgaug.Resize((225, 225))])
# group data into batches of size 128
df = BatchData(df, 128)
# start 3 processes to run the dataflow in parallel
df = PrefetchDataZMQ(df, 3)
df = MultiProcessRunnerZMQ(df, 3)
````
You can find a more complicated DataFlow in the [ImageNet training script](../examples/ImageNetModels/imagenet_utils.py)
with all the data preprocessing.

### Work with Your Data
Unless you are working with standard data types (image folders, LMDB, etc),
you would usually want to write the source DataFlow (`MyDataFlow` in the above example) for your data format.
We do not make any assumptions about your data format.
You would usually want to write the source DataFlow (`MyDataFlow` in the above example) for your own data format.
See [another tutorial](extend/dataflow.html) for simple instructions on writing a DataFlow.
Once you have the source reader, all the [existing
DataFlows](../modules/dataflow.html) are ready for you to build up the rest of the data pipeline.
Once you have the source reader, all the [built-in
DataFlows](../modules/dataflow.html) are ready for you to assemble the rest of the data pipeline.
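As a hedged sketch (the index-file format and field layout here are made up), a source DataFlow for a custom dataset can be as small as:

```python
import cv2
from tensorpack.dataflow import DataFlow

class MyDataFlow(DataFlow):
    """Reads hypothetical 'path label' lines from an index file and yields [image, label]."""
    def __init__(self, index_file):
        with open(index_file) as f:
            self.items = [line.split() for line in f]

    def __len__(self):
        return len(self.items)

    def __iter__(self):
        for path, label in self.items:
            yield [cv2.imread(path, cv2.IMREAD_COLOR), int(label)]
```

Such a reader can then be wrapped by the built-in DataFlows shown above (e.g. `BatchData`, `MultiProcessRunnerZMQ`).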

### Why DataFlow

@@ -62,16 +60,16 @@ Nevertheless, tensorpack supports data loading with native TF operators / TF dat

### Use DataFlow in Your Own Code

Normally, tensorpack `InputSource` interface runs the DataFlow during training.
However, DataFlow can also be used without other tensorpack components.
If you need to run the DataFlow by yourself, call `reset_state()` first to initialize it,
When training with tensorpack, typically it is the `InputSource` interface that runs the DataFlow.
However, DataFlow can be used without other tensorpack components.
To run a DataFlow by yourself, call `reset_state()` first to initialize it,
and then use the generator however you like:
```python
df = SomeDataFlow()
df.reset_state()
for dp in df:
# dp is now a list. do whatever
# dp is now a list. do whatever
```

Read the [API documentation](../../modules/dataflow.html#tensorpack.dataflow.DataFlow)
@@ -16,7 +16,7 @@ then apply complicated preprocessing to it.
We aim to reach a speed of roughly **1k~3k images per second** to keep GPUs busy.

Some things to know before reading:
1. For smaller datasets (e.g. several GBs of images with lightweight preprocessing), a simple reader plus some multiprocess prefetch should usually work well enough.
1. For smaller datasets (e.g. several GBs of images with lightweight preprocessing), a simple reader plus some multiprocess runner should usually work well enough.
Therefore you don't have to understand this tutorial in depth unless you really find your data to be the bottleneck.
This tutorial could be a bit complicated for people new to system architectures, but you do need these techniques to run fast enough on an ImageNet-scale dataset.
2. Having a fast Python generator **alone** may or may not improve your overall training speed.
@@ -64,7 +64,7 @@ On a good filesystem you probably can already observe good speed here (e.g. 5 it
because we are doing heavy random read on the filesystem (regardless of whether `shuffle` is True).
Image decoding in `cv2.imread` could also be a bottleneck at this early stage.

### Parallel Prefetch
### Parallel Runner

We will now add the cheapest pre-processing to get an ndarray in the end instead of a list
(because training will need ndarray eventually):
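The code for this step is not included in the diff hunks shown here; a plausible sketch, reusing utilities that appear elsewhere in this tutorial:

```python
ds = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
ds = AugmentImageComponent(ds, [imgaug.Resize(224)])  # cheapest transform: fixed-size ndarray per image
ds = BatchData(ds, 256)                               # stack components into batched ndarrays
```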
@@ -84,15 +84,15 @@ Now it's time to add threads or processes:
ds0 = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
ds1 = AugmentImageComponent(ds0, lots_of_augmentors)
ds = PrefetchDataZMQ(ds1, nr_proc=25)
ds = MultiProcessRunnerZMQ(ds1, num_proc=25)
ds = BatchData(ds, 256)
```
Here we fork 25 processes to run `ds1`, and collect their output through the ZMQ IPC protocol,
which is faster than `multiprocessing.Queue`. You can also apply prefetch after batch, of course.
which is faster than `multiprocessing.Queue`. You can also apply a parallel runner after batching, of course.
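For example (a sketch under the same setup as above), the runner can wrap the batched flow instead:

```python
ds = BatchData(ds1, 256)
ds = MultiProcessRunnerZMQ(ds, 25)  # parallelize the batched pipeline rather than the per-image one
```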

### Parallel Map
The above DataFlow might be fast, but since it forks the ImageNet reader (`ds0`),
it's **not a good idea to use it for validation** (for reasons mentioned at the top; more details in the [documentation](../modules/dataflow.html#tensorpack.dataflow.PrefetchDataZMQ)).
it's **not a good idea to use it for validation** (for reasons mentioned at the top; more details in the [documentation](../modules/dataflow.html#tensorpack.dataflow.MultiProcessRunnerZMQ)).
Alternatively, you can use multi-threaded preprocessing like this:

```eval_rst
@@ -102,9 +102,9 @@ Alternatively, you can use multi-threaded preprocessing like this:
ds0 = dataset.ILSVRC12('/path/to/ILSVRC12', 'train', shuffle=True)
augmentor = AugmentorList(lots_of_augmentors)
ds1 = MultiThreadMapData(
ds0, nr_thread=25,
ds0, num_thread=25,
map_func=lambda dp: [augmentor.augment(dp[0]), dp[1]], buffer_size=1000)
# ds1 = PrefetchDataZMQ(ds1, nr_proc=1)
# ds1 = MultiProcessRunnerZMQ(ds1, num_proc=1)
ds = BatchData(ds1, 256)
```
`MultiThreadMapData` launches a thread pool to fetch data and apply the mapping function on **a single
@@ -127,11 +127,11 @@ If you identify this as a bottleneck, you can also use:
ds0 = dataset.ILSVRC12Files('/path/to/ILSVRC12', 'train', shuffle=True)
augmentor = AugmentorList(lots_of_augmentors)
ds1 = MultiThreadMapData(
ds0, nr_thread=25,
ds0, num_thread=25,
map_func=lambda dp:
[augmentor.augment(cv2.imread(dp[0], cv2.IMREAD_COLOR)), dp[1]],
buffer_size=1000)
ds1 = PrefetchDataZMQ(ds1, nr_proc=1)
ds1 = MultiProcessRunnerZMQ(ds1, num_proc=1)
ds = BatchData(ds1, 256)
```

@@ -159,15 +159,15 @@ class BinaryILSVRC12(dataset.ILSVRC12Files):
jpeg = np.asarray(bytearray(jpeg), dtype='uint8')
yield [jpeg, label]
ds0 = BinaryILSVRC12('/path/to/ILSVRC/', 'train')
ds1 = PrefetchDataZMQ(ds0, nr_proc=1)
ds1 = MultiProcessRunnerZMQ(ds0, num_proc=1)
LMDBSerializer.save(ds1, '/path/to/ILSVRC-train.lmdb')
```
The above script builds a DataFlow which produces jpeg-encoded ImageNet data.
We store the jpeg string as a numpy array because the function `cv2.imdecode` later expects this format.
Please note we can only use 1 prefetch process to speed up. If `nr_proc>1`, `ds1` will take data
Please note we can only use 1 runner process to speed up. If `num_proc>1`, `ds1` will take data
from several forks of `ds0`, then neither the content nor the order of `ds1` will be the same as `ds0`.
See [documentation](../modules/dataflow.html#tensorpack.dataflow.PrefetchDataZMQ)
about caveats of `PrefetchDataZMQ`.
See [documentation](../modules/dataflow.html#tensorpack.dataflow.MultiProcessRunnerZMQ)
about caveats of `MultiProcessRunnerZMQ`.

It will generate a database file of 140G. We load the DataFlow back by reading this LMDB file sequentially:
```
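# The exact code is not shown in this hunk; a plausible sketch of the loading step:
ds = LMDBSerializer.load('/path/to/ILSVRC-train.lmdb', shuffle=False)
TestDataSpeed(ds).start()
```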
@@ -193,7 +193,7 @@ the added line above maintains a buffer of datapoints and shuffle them once a wh
It will not affect the model as long as the buffer is large enough,
but it can also consume a lot of memory if it is too large.

### Augmentations & Parallel Prefetch
### Augmentations & Parallel Runner

Then we add necessary transformations:
```eval_rst
@@ -218,24 +218,24 @@ Both imdecode and the augmentors can be quite slow. We can parallelize them like
ds = LMDBSerializer.load(db, shuffle=False)
ds = LocallyShuffleData(ds, 50000)
ds = PrefetchData(ds, 5000, 1)
ds = MultiProcessRunner(ds, 5000, 1)
ds = MapDataComponent(ds, lambda x: cv2.imdecode(x, cv2.IMREAD_COLOR), 0)
ds = AugmentImageComponent(ds, lots_of_augmentors)
ds = PrefetchDataZMQ(ds, 25)
ds = MultiProcessRunnerZMQ(ds, 25)
ds = BatchData(ds, 256)
```

Since we are reading the database sequentially, having multiple forked instances of the
base LMDB reader will result in biased data distribution. Therefore we use `PrefetchData` to
base LMDB reader will result in biased data distribution. Therefore we use `MultiProcessRunner` to
launch the base DataFlow in only **one process**, and only parallelize the transformations
with another `PrefetchDataZMQ`
(Nesting two `PrefetchDataZMQ`, however, will result in a different behavior.
with another `MultiProcessRunnerZMQ`
(Nesting two `MultiProcessRunnerZMQ`, however, will result in a different behavior.
These differences are explained in more detail in the API documentation.).
Similar to what we did earlier, you can use `MultiThreadMapData` to parallelize as well.

Let me summarize what this DataFlow does:

1. One process reads the LMDB file, shuffles datapoints in a buffer and puts them into a `multiprocessing.Queue` (used by `PrefetchData`).
1. One process reads the LMDB file, shuffles datapoints in a buffer and puts them into a `multiprocessing.Queue` (used by `MultiProcessRunner`).
2. 25 processes take items from the queue, decode and process them into [image, label] pairs, and
send them through a ZMQ IPC pipe.
3. The main process takes data from the pipe and makes batches.
@@ -82,7 +82,7 @@ def get_data(path, isTrain, stat_file):
ds = MapDataComponent(ds, lambda x: (x - mean) / std)
ds = TIMITBatch(ds, BATCH)
if isTrain:
ds = PrefetchDataZMQ(ds, 1)
ds = MultiProcessRunnerZMQ(ds, 1)
return ds


@@ -32,7 +32,7 @@ def get_data():
]
data_train = AugmentImageComponent(data_train, augmentors)
data_train = BatchData(data_train, 128)
data_train = PrefetchData(data_train, 5, 5)
data_train = MultiProcessRunner(data_train, 5, 5)

augmentors = [imgaug.Resize((40, 40))]
data_test = AugmentImageComponent(data_test, augmentors)
@@ -148,7 +148,7 @@ def get_config():
]
data_train = AugmentImageComponent(data_train, augmentors)
data_train = BatchData(data_train, 128)
data_train = PrefetchDataZMQ(data_train, 5)
data_train = MultiProcessRunnerZMQ(data_train, 5)

augmentors = [imgaug.Resize((40, 40))]
data_test = AugmentImageComponent(data_test, augmentors)
@@ -225,7 +225,7 @@ def get_data():
ds = ThetaImages(ds)
ds = RepeatedData(ds, 50) # just pretend this dataset is bigger
# this pre-computation is pretty heavy
ds = PrefetchDataZMQ(ds, min(20, multiprocessing.cpu_count()))
ds = MultiProcessRunnerZMQ(ds, min(20, multiprocessing.cpu_count()))
ds = BatchData(ds, BATCH)
return ds

@@ -9,7 +9,7 @@
from termcolor import colored

from tensorpack.dataflow import (
DataFromList, MapData, MapDataComponent, MultiProcessMapDataZMQ, MultiThreadMapData,
DataFromList, MapData, MapDataComponent, MultiProcessMapData, MultiThreadMapData,
TestDataSpeed, imgaug)
from tensorpack.utils import logger
from tensorpack.utils.argtools import log_once, memoized
@@ -392,7 +392,7 @@ def get_train_dataflow():
# MPI does not like fork()
else:
buffer_size = cfg.DATA.NUM_WORKERS * 20
ds = MultiProcessMapDataZMQ(ds, cfg.DATA.NUM_WORKERS, preprocess, buffer_size=buffer_size)
ds = MultiProcessMapData(ds, cfg.DATA.NUM_WORKERS, preprocess, buffer_size=buffer_size)

@dcyoung commented on May 25, 2019:
What was the motivation behind the change to this example? Are there performance implications for using a non-ZMQ-based `MultiProcessMapData` here on a system such as a p3.16xlarge or smaller?

@ppwwyyxx (Author, Collaborator) replied on May 25, 2019:
`MultiProcessMapData` is currently identical to `MultiProcessMapDataZMQ` (because I have not made a non-ZMQ implementation).

@dcyoung replied on May 25, 2019:
Gotcha, thanks!

else:
ds = MapData(ds, preprocess)
return ds
@@ -177,7 +177,7 @@ def get_df(dir):
names = ['trainA', 'trainB'] if isTrain else ['testA', 'testB']
df = get_image_pairs(*[os.path.join(datadir, n) for n in names])
df = BatchData(df, BATCH if isTrain else TEST_BATCH)
df = PrefetchDataZMQ(df, 2 if isTrain else 1)
df = MultiProcessRunnerZMQ(df, 2 if isTrain else 1)
return df


@@ -115,7 +115,7 @@ def get_data():
ds = ImageFromFile(imgs, channel=3, shuffle=True)
ds = AugmentImageComponent(ds, get_augmentors())
ds = BatchData(ds, args.batch)
ds = PrefetchDataZMQ(ds, 5)
ds = MultiProcessRunnerZMQ(ds, 5)
return ds


@@ -186,7 +186,7 @@ def get_filelist(idxlist):
imgaug.Resize(64)]
df = AugmentImageComponents(df, augs, (0, 1))
df = BatchData(df, BATCH)
df = PrefetchDataZMQ(df, 3)
df = MultiProcessRunnerZMQ(df, 3)
return df


@@ -173,7 +173,7 @@ def get_data():
augs = [imgaug.Resize(286), imgaug.RandomCrop(256)]
ds = AugmentImageComponents(ds, augs, (0, 1))
ds = BatchData(ds, BATCH)
ds = PrefetchData(ds, 100, 1)
ds = MultiProcessRunner(ds, 100, 1)
return ds


@@ -233,7 +233,7 @@ def f(m): # thresholding
]
ds = AugmentImageComponent(ds, augmentors, copy=False)
ds = BatchDataByShape(ds, 8, idx=0)
ds = PrefetchDataZMQ(ds, 1)
ds = MultiProcessRunnerZMQ(ds, 1)
else:
ds = BatchData(ds, 1)
return ds
@@ -11,7 +11,9 @@
import tqdm

from tensorpack import ModelDesc
from tensorpack.dataflow import AugmentImageComponent, BatchData, MultiThreadMapData, PrefetchDataZMQ, dataset, imgaug
from tensorpack.dataflow import (
AugmentImageComponent, BatchData, MultiThreadMapData,
MultiProcessRunnerZMQ, dataset, imgaug)
from tensorpack.input_source import QueueInput, StagingInput
from tensorpack.models import regularize_cost, l2_regularizer
from tensorpack.predict import FeedfreePredictor, PredictConfig
@@ -88,7 +90,7 @@ def get_imagenet_dataflow(
ds = AugmentImageComponent(ds, augmentors, copy=False)
if parallel < 16:
logger.warn("DataFlow may become the bottleneck when too few processes are used.")
ds = PrefetchDataZMQ(ds, parallel)
ds = MultiProcessRunnerZMQ(ds, parallel)
ds = BatchData(ds, batch_size, remainder=False)
else:
ds = dataset.ILSVRC12Files(datadir, name, shuffle=False)
@@ -101,7 +103,7 @@ def mapf(dp):
return im, cls
ds = MultiThreadMapData(ds, parallel, mapf, buffer_size=2000, strict=True)
ds = BatchData(ds, batch_size, remainder=True)
ds = PrefetchDataZMQ(ds, 1)
ds = MultiProcessRunnerZMQ(ds, 1)
return ds


@@ -133,7 +133,7 @@ def get_data(train_or_test):
ds = AugmentImageComponent(ds, augmentors)
ds = BatchData(ds, BATCH_SIZE, remainder=not isTrain)
if isTrain:
ds = PrefetchData(ds, 3, 2)
ds = MultiProcessRunner(ds, 3, 2)
return ds


@@ -68,7 +68,7 @@ def get_data(train_or_test):

ds = AugmentImageComponent(ds, augmentors, copy=False)
if isTrain:
ds = PrefetchDataZMQ(ds, min(25, multiprocessing.cpu_count()))
ds = MultiProcessRunnerZMQ(ds, min(25, multiprocessing.cpu_count()))
ds = BatchData(ds, BATCH_SIZE, remainder=not isTrain)
return ds

@@ -254,7 +254,7 @@ def get_data(file_name):
imgaug.Flip(horiz=True)]
ds = AugmentImageComponent(ds, augmentors, index=0, copy=True)
ds = MapData(ds, lambda x: [cv2.resize(x[0], (32, 32), interpolation=cv2.INTER_CUBIC), x[0]])
ds = PrefetchDataZMQ(ds, 3)
ds = MultiProcessRunnerZMQ(ds, 3)
ds = BatchData(ds, BATCH_SIZE)
return ds

@@ -103,7 +103,7 @@ def get_data(train_or_test, cifar_classnum):
ds = AugmentImageComponent(ds, augmentors)
ds = BatchData(ds, 128, remainder=not isTrain)
if isTrain:
ds = PrefetchDataZMQ(ds, 5)
ds = MultiProcessRunnerZMQ(ds, 5)
return ds


@@ -78,7 +78,7 @@ def get_data():
]
data_train = AugmentImageComponent(data_train, augmentors)
data_train = BatchData(data_train, 128)
data_train = PrefetchData(data_train, 5, 5)
data_train = MultiProcessRunner(data_train, 5, 5)

augmentors = [imgaug.Resize((40, 40))]
data_test = AugmentImageComponent(data_test, augmentors)
