Notebook version of batch processing with dask.
hmaarrfk committed Sep 16, 2018
1 parent efd6ae8 · commit 5881fbf
Showing 2 changed files with 374 additions and 222 deletions.
374 changes: 374 additions & 0 deletions doc/examples/batch_processing/batch_processing_with_dask.ipynb
@@ -0,0 +1,374 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Batch processing\n",
"\n",
"By Mark Harfouche, Sept 2018\n",
"\n",
"This example shows how to use [Dask](http://dask.pydata.org/) \n",
"to parallelize a batch processing job. Parallelized code doesn't\n",
"have to look too different than serial code in python.\n",
"\n",
"Our job is organized as follows\n",
" 1. Load our images as (dask) arrays.\n",
" 2. Apply scikit-image filters to our images.\n",
" 3. Compute and return some important metric about the image.\n",
" 4. Save resulting images to disk.\n",
"\n",
"The main difference in using dask is that the computation is done in a\n",
"so called *lazy* fashion. Lazy means that no results are computed until \n",
"you a specific `compute` action is invoked. \n",
"The advantage of parallelization comes when you have access to a machine with\n",
"many cores. Most modern computers have at least two cores.\n",
"\n",
"In the example below, the same computation will be done twice, once serially,\n",
"and the other using dask. You should experiment with the size and number of images\n",
"to compare the performance improvements for different cases."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"import skimage.filters as filters\n",
"import numpy as np\n",
"import os\n",
"import imageio\n",
"import warnings\n",
"from skimage import img_as_ubyte\n",
"\n",
"# These numbers are chosen to be particularly small so that the generation \n",
"# of documents doesn't take too long\n",
"# The number of images we wish to analyze\n",
"N_images = 5\n",
"# The shape of each generated image\n",
"shape = (32, 32)\n",
"# The directory where we wish to save the final results.\n",
"# The code will ask your operating system for a temporary directory\n",
"save_directory = tempfile.mkdtemp()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Serial processing\n",
"\n",
"### Loading function\n",
"Unfortunately, parallel computing gives the best results when using large datasets. \n",
"Currently, we don't provide any dataset that is large enough to show the true advantages of dask.\n",
"To overcome this limitation, we will use a random number generator to create the data. We encourage you to adapt this function to load your dataset from disk."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\n",
"def load_my_data(index, shape=(2048, 2048), max_index=255):\n",
" \"\"\"A simple loading function.\n",
"\n",
" Return an image that is made of guassian noise centered about\n",
" `index/max_index` with standard deviation equal to 20/max_index\n",
" \"\"\"\n",
" # Make sure to use independend random number generators otherwise\n",
" # parallel code might have conflicts\n",
" r = np.random.RandomState(index)\n",
" image = r.normal(loc=((index+1) % max_index)/max_index, scale=20/max_index, size=shape)\n",
" return np.clip(image, 0, 1)"
]
},
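{
"cell_type": "markdown",
"metadata": {},
"source": [
"If your images live on disk instead, a minimal sketch of a replacement loader could look like the following (the `frame_{index}.png` naming scheme and the `data_directory` argument are hypothetical placeholders, not part of the original example):\n",
"\n",
"```python\n",
"def load_my_data_from_disk(index, data_directory='.'):\n",
"    # Read a single image from disk; adapt the naming to your dataset\n",
"    filename = os.path.join(data_directory, 'frame_{}.png'.format(index))\n",
"    return imageio.imread(filename)\n",
"```"
]
},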
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"images = []\n",
"for i in range(0, N_images):\n",
" image = load_my_data(i, shape=shape)\n",
" images.append(image)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"input_variances = []\n",
"output_variances = []\n",
"output_images = []\n",
"\n",
"for image in images:\n",
" input_variance = np.var(image)\n",
" # Now we compute the output image as a processed version off the input\n",
" # image\n",
" output_image = filters.gaussian(image, 10)\n",
" output_variance = np.var(output_image)\n",
" output_images.append(output_image)\n",
"\n",
" # Store all the results\n",
" input_variances.append(input_variance)\n",
" output_variances.append(output_variance)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"# Save\n",
"for i, image in enumerate(output_images):\n",
" with warnings.catch_warnings():\n",
" warnings.filterwarnings('ignore', 'Possible precision loss')\n",
" image_ubyte = img_as_ubyte(image)\n",
" filename = os.path.join(save_directory,\n",
" 'image_{i:2d}.bmp'.format(i=i))\n",
" imageio.imwrite(filename, image_ubyte)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### A note on memory usage\n",
"\n",
"The author of this tutorial finds this kind of organization very useful during prototyping stages for a few reasons:\n",
"1. All images can be easily accessed from the variables `images`\n",
"2. Inspection of their metadata (`dtype`, `shape`) is readily acheived.\n",
"3. There is no need to rewrite the code between the *prototyping* stage and the *useful* execution stage where you might increase `N` from `10` to `1000`s. This leads to fewer bugs.\n",
"\n",
"Unfortunately, loading images can become a daunting task since realistic images, stored as PNGs or JPGs can often acheive compression ratios of 10:1. 1GB of images on your disk, might become 10GB or more when loaded as full numpy arrays in python. As such, it might be useful to refractor your code in a single loop that only keeps one image loaded at the same time."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Parallel computation with dask\n",
"\n",
"For this tutorial, we will make use of the `delayed` module in Dask. By default, the delayed module will start multiple python processes, each computing part of the desired computation.\n",
"\n",
"Instead of calling our functions `load_my_data`, `var`, `gaussian`, and `imwrite` directly, we will be calling delayed versions of them that will eventually be executed when we issue a `compute` instruction.\n",
"\n",
"For example, instead of calling the function `load_my_data(5)`, we will call the function `delayed(load_my_data)(5)`\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"from dask import delayed\n",
"images = []\n",
"for i in range(0, N_images):\n",
" image = delayed(load_my_data)(i, shape=shape)\n",
" images.append(image)\n",
" \n",
"print(images[0])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This loop returned almost immediately! This isn't because the data is now in ram. Rather dask provided us a `Delayed` object that promises to execute `load_my_data` in the future. We proceed to wrapping our calls to `np.var` and to `scikit-image` in delayed calls."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"input_variances = []\n",
"output_variances = []\n",
"output_images = []\n",
"\n",
"for image in images:\n",
" input_variance = delayed(np.var)(image)\n",
" # Now we compute the output image as a processed version off the input\n",
" # image\n",
" output_image = delayed(filters.gaussian)(image, 10)\n",
" output_variance = delayed(np.var)(output_image)\n",
" output_images.append(output_image)\n",
"\n",
" # Store all the results into the arrays we care about\n",
" input_variances.append(input_variance)\n",
" output_variances.append(output_variance)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"saved_list = []\n",
"for i, image in enumerate(output_images):\n",
" image_ubyte = delayed(img_as_ubyte)(image)\n",
" filename = os.path.join(save_directory,\n",
" 'image_{i:2d}.bmp'.format(i=i))\n",
" saved_list.append(delayed(imageio.imwrite)(filename, image_ubyte))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can now visualize what the computation looks like. Visualizing the whole batch might be a little daunting so we will instead visualize the first 3 elements of our computation. You will need to install `python-graphviz` for these next few lines to execute."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"dask.visualize(delayed([input_variances[:3], output_variances[:3], saved_list[:3]]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Visualizing the computation graph is a good way to double check your code. In fact, it helped me find a bug in mine! Here, the important aspect of the graph is that the data paths for the analysis is completely independent from one image to the next. This is what will help us get the most from parallelization.\n",
"\n",
"# Computing the result\n",
"\n",
"Finally, we will issue a call to `dask.compute` for all the outputs of interest.\n",
"For our specific comptuation, we don't actually care about getting the `output_images`. Rather, we simply want to ensure that they are computed. For that, we will ask for the results of `intput_variances`, `output_variances`, and `saved_list`. `saved_list` will actually be a list of `None` elements, but it will ensure that that branch of the computation graph is executed."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"input_variances, output_variances, saved_list = dask.compute(\n",
" input_variances, output_variances, saved_list)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Discussion of results\n",
"\n",
"The table below summarizes the results of running the program above changing the values of `N` and `shape` for a few typical examples. All examples were run on a computer with the following specifications:\n",
"\n",
"* Processor: i7-7700HQ, 4 cores, hyperthreading, \n",
"* RAM: 16GB of RAM\n",
"* Storage: Samsung 960 PRO SSD.\n",
"\n",
"\n",
"| N | shape | Wall time serial | Wall time with Dask | Speedup | \n",
"|------|-------------|------------------|---------------------|---------|\n",
"| 10 | 2048 x 2048 | 6.4 s | 2.56 s | 2.5 x |\n",
"| 50 | 2048 x 2048 | 32.5 | 11.1 s | 3 x |\n",
"| 50 | 512 x 512 | 1.85 s | 1.71 s | 1.1 x |\n",
"| 500 | 512 x 512 | 18 s | 13.9 s | 1.3 x |\n",
"| 50000 | 32 x 32 | 43 s | 4 + 7 + 5 + 88 s | 0.4 x | \n",
"| 5000 | 32 x 32 | 4.35 s | .4 + .6 + .5 + 8.6 s | 0.42 x | \n",
"\n",
"If we were computationally bound, the best case theoretical speedup will be around 4x as the particular processor used has 4 cores, each with their own arithmetic logical unit able to perform independent computation.\n",
"\n",
"Was it surprising to you that not all computation was able to benefit from parallelization with Dask? It was to me! When trying to accelerate your program, it isn't always obvious how speedups affect different workloads. Here are a few things you can do to help make this more systematic.\n",
"\n",
"### Benchmark your code\n",
"\n",
"The results above do not show uniform improvement when using Dask.\n",
"For very small images, this kind of parallelization actually hurts performance!\n",
"Make sure you first get a good feeling for the amount of time it takes for\n",
"your code to run before starting to optimize for speed. Rigorous benchmarks might \n",
"seem ideal, but they often aren't practical when rapidly developing something.\n",
"The `%time` magic commands in ipython can really help.\n",
"\n",
"Note that you can't always assume that the rate limiting step will be the same for different types of images and different image sizes. As such, we recommend benchmarking your whole analysis pipeline.\n",
"\n",
"### i/o speed\n",
"\n",
"Are you using a slow storage medium such as a hard disk instead\n",
"of an SSD (solid state drive)? Upgrading to an SSD might be the easiest \n",
"and cheapest way to speed up your whole workflow. While a hard-drive might be fast at copying large files from one directory to an other, it can trouble accessing multiple files at once, and saving many small files quickly. Solid state drives overcome these problems and have become relatively inexpensive in recent years. \n",
"\n",
"### Numpy and scipy already try to parrallelize\n",
"\n",
"Is numpy's parallelization enough? In the example above, numpy and scipy\n",
"in fact do parallize the computation of the variance. We encourage you to\n",
"look at your CPU usage and observe how multiple cores are working together\n",
"during the computation loop of the serial code.\n",
"\n",
"### Image loading\n",
"\n",
"Do you need to load all your images at once? This is often the most helpful simplification you can do, especially when loading compressed images that take up much more space in RAM than on disk. If your RAM fills up before the computation, it is almost guaranteed that your code will run many times slower than it should simply because your computer is moving memory back to your disk without warning you. Before starting to paralleize your code, we encourage you try this strategy to see if it helps your analysis.\n",
"\n",
"The author of this example finds that having all the images available\n",
"at the python prompt can be is tremendeously helpful when prototyping.\n",
"This can be especially helpful when loading data from\n",
"different sources each having their custom load and preprocessing functions.\n",
"If you decide to combine all 3 loops into one (load/analyze/save),\n",
"you are loosing the ability to index arbitrary images in your set after\n",
"the computation is completed.\n",
"In the example above, any image is accessible with through\n",
"the list `images[index]`. To avoid having Dask recompute the images at the\n",
"prompt, the author suggests caching the input image (or output) of interest with\n",
"\n",
"```python\n",
"my_image_of_interest = images[index].compute()\n",
"my_output_image_of_interest = output_images[index].compute()\n",
"```\n",
"\n",
"Note that Dask is re-computing each image, but this is likely acceptable as in our specific case as computing individual images might be rather quick.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
