#### Link to blog: https://medium.com/@robertbracco1/how-to-do-fastai-even-faster-bebf9693c99a

In [2]:
%reload_ext autoreload
%autoreload 2
%matplotlib inline

In [3]:
import os
import shutil
from itertools import islice
from pathlib import Path
from IPython.display import Audio
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.fftpack import fft
from scipy.signal import get_window
import pylab

In [4]:
import threading
from queue import Queue
from multiprocessing import Pool

In [5]:
from fastai.vision import *
from fastai.metrics import error_rate

#### Our data is contained in a folder called `parallel`, which contains two subfolders: `audio`, which holds 1000 .wav files, and `spectrogram`, where we output our mel spectrograms

We are going to do some benchmarking, so let's grab a set of 1000 files and a subset of 100 files for speed testing.

In [6]:
path = Config.data_path()/'parallel'
path_audio = path/"audio"
path_spectrogram = path/"spectrogram"
fnames_1000 = os.listdir(path_audio)[0:1000]
fnames_100 = fnames_1000[0:100]

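#### This code takes an audio file and generates a spectrogram, saving it to disk instead of displaying it

In [7]:
def gen_spec(fname: str, src_path: Path, dst_path: Path):
    # Load the audio, resampled to librosa's default 22050 Hz
    y, sr = librosa.load(src_path/fname)
    # Mel spectrogram: 128 mel bands between 20 Hz and 8 kHz (newer librosa requires y= as a keyword).
    # Note: power=1.0 gives a magnitude spectrogram, for which librosa.amplitude_to_db is the matching dB conversion
    S = librosa.feature.melspectrogram(y=y, sr=sr, n_fft=1024, hop_length=512,
                                       n_mels=128, power=1.0, fmin=20, fmax=8000)
    # 2.24 x 2.24 inches at dpi=100 -> a 224 x 224 pixel image with no axes or margins
    plt.figure(figsize=(2.24, 2.24))
    pylab.axis('off')
    pylab.axes([0., 0., 1., 1.], frameon=False, xticks=[], yticks=[])
    librosa.display.specshow(librosa.power_to_db(S, ref=np.max), y_axis='mel', x_axis='time')
    # Save as <dst_path>/<fname>.png, then close the figure so open figures don't pile up in memory
    save_path = f'{dst_path/fname}.png'
    pylab.savefig(save_path, bbox_inches=None, pad_inches=0, dpi=100)
    pylab.close()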
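A quick sanity check on the shapes these settings produce, assuming a 1-second clip loaded at librosa's default 22050 Hz. With librosa's default centered frames, the mel spectrogram has `1 + len(y) // hop_length` columns, and `figsize` times `dpi` sets the PNG size. The clip length is an assumption for illustration.

```python
sr = 22050            # librosa.load's default sample rate
duration_s = 1.0      # assumed clip length
hop_length = 512
n_mels = 128

n_samples = int(sr * duration_s)
n_frames = 1 + n_samples // hop_length   # frame count with centered STFT frames
image_px = round(2.24 * 100)             # figsize (inches) * dpi

print((n_mels, n_frames), image_px)       # (128, 44) 224
```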

In [9]:
%%time
for fname in fnames_100: gen_spec(fname, path_audio, path_spectrogram)

CPU times: user 2min 44s, sys: 300 ms, total: 2min 44s
Wall time: 30 s


I am doing the <a href="https://www.kaggle.com/c/tensorflow-speech-recognition-challenge/">TensorFlow Speech Recognition Kaggle Competition</a>, where the test set has 158,000 files: 158000*30/100/3600 ≈ <strong>13.17 hours</strong> to generate a test set for just one set of spectrogram settings!

There are lots of options (none of which I understand) for multiprocessing and threading to make things faster, but fastai has a built-in `parallel` function. Let's take a look.
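The estimate is a straight linear extrapolation from the 100-file benchmark. A tiny sketch of that arithmetic, with `estimate_hours` as a hypothetical helper name:

```python
def estimate_hours(n_files, seconds_per_batch, batch_size):
    """Scale a benchmark of `batch_size` files taking `seconds_per_batch` up to `n_files`."""
    return n_files * seconds_per_batch / batch_size / 3600

serial_hours = estimate_hours(158_000, 30, 100)
print(f"{serial_hours:.2f} hours")  # 13.17 hours
```

This assumes per-file cost stays constant across the whole test set, which is only roughly true in practice.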

In [44]:
doc(parallel)

<img src="files/img/parallel_doc.png"></img>

#### Looks pretty straightforward: we pass in our function and a collection of the arguments we want to pass to it, and the fastai library handles the rest.

The only problem is that our function `gen_spec` takes 3 arguments, and `parallel` expects a function that takes one. There are a few ways around this, depending on your particular function:
1. If the additional arguments are fixed/static, make a new function with default values, or use Python's `partial` to create a function that fits `parallel`'s model. I prefer `partial`, so that's what I'll demonstrate below.
2. If you have multiple arguments that change with each function call, pass them as a tuple and then unpack them inside the function.
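Both workarounds can be sketched with a toy three-argument function standing in for `gen_spec`. `describe`, `describe_tuple`, and the file names are hypothetical and exist only for illustration:

```python
from functools import partial

def describe(fname, src_path, dst_path):
    # Toy stand-in for gen_spec: returns a string instead of writing a PNG
    return f"{src_path}/{fname} -> {dst_path}/{fname}.png"

# Option 1: the extra arguments are fixed, so bind them by keyword with partial
describe_fixed = partial(describe, src_path="audio", dst_path="spectrogram")
print(describe_fixed("yes_01.wav"))  # audio/yes_01.wav -> spectrogram/yes_01.wav.png

# Option 2: every call gets different arguments, so pass one tuple and unpack it
def describe_tuple(args):
    fname, src_path, dst_path = args
    return describe(fname, src_path, dst_path)

jobs = [("yes_01.wav", "audio", "spec_a"), ("no_02.wav", "audio_b", "spec_b")]
print([describe_tuple(job) for job in jobs])
```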

In [12]:
gen_spec_partial = partial(gen_spec, src_path = path_audio, dst_path = path_spectrogram)

In [15]:
%%time
parallel(gen_spec_partial, fnames_100);

CPU times: user 40 ms, sys: 116 ms, total: 156 ms
Wall time: 173 ms


Wow, that's insanely fast: 173 ms, down from 30 seconds, or roughly 170x faster. That doesn't seem possible, so let's do a sanity check and make sure our spectrograms are there.

In [20]:
len(os.listdir(path_spectrogram))

0

Uh-oh, it looks like some kind of silent failure is happening. Let's check the full documentation, and maybe the source code, to figure out what's going on.

<img src="files/img/parallel_full_doc.png"></img>

> "`func` must accept both the value and index of each `arr` element"

The function you pass in needs a specific signature, accepting the following as its first two positional arguments:

1. A value (this is what `arr` contains, and is the normal argument to your function)
2. The index\* of the value in `arr` (your function doesn't need to do anything with the index; it just needs to accept it)

\* Sylvain Gugger informed me on the fastai forums that the index is required to make `verify_images` work. Also, `parallel` is meant to be a convenience function for internal use, so if you're more advanced, you can implement your own version using `ProcessPoolExecutor`.
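Why was the first attempt silent? A likely explanation, assuming `parallel` only iterates over its futures to drive the progress bar without calling `.result()` on them: `concurrent.futures` stores an exception raised inside a job on its `Future` and only re-raises it when `.result()` is called. Calling `gen_spec_partial(fname, index)` passes the index positionally into the `src_path` slot, which collides with the `src_path=` keyword bound by `partial`, so every job raises a `TypeError` that nobody looks at. The sketch below reproduces this with a thread pool (used only so it runs anywhere; process pools store exceptions on futures the same way). `three_args` and `run_like_parallel` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

def three_args(fname, src_path, dst_path):
    # Toy stand-in for gen_spec
    return f"{src_path}/{fname} -> {dst_path}"

# Same shape as gen_spec_partial: src_path and dst_path are bound by keyword
fixed = partial(three_args, src_path="audio", dst_path="spectrogram")

def run_like_parallel(func, arr):
    # Submit func(value, index) for each element, then only iterate the futures
    # (e.g. to drive a progress bar) without ever calling .result()
    with ThreadPoolExecutor(max_workers=2) as ex:
        futures = [ex.submit(func, o, i) for i, o in enumerate(arr)]
        for _ in as_completed(futures):
            pass
    return futures

futures = run_like_parallel(fixed, ["a.wav", "b.wav"])  # no error is raised here
for f in futures:
    # The exception was stored on each future; .exception() (or .result()) exposes it
    print(repr(f.exception()))
```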

### Altering gen_spec to accept an index 

In [22]:
def gen_spec_parallel(fname: str, index: int=0, src_path: Path=None, dst_path: Path=None):
    # `index` is accepted only because parallel calls func(value, index); it is unused.
    # The spectrogram work itself is identical to gen_spec, so just delegate to it
    gen_spec(fname, src_path, dst_path)

In [23]:
# recreate our partial using our new parallel compatible function
gen_spec_partial = partial(gen_spec_parallel, src_path = path_audio, dst_path = path_spectrogram)

In [24]:
%%time
parallel(gen_spec_partial, fnames_100);

CPU times: user 208 ms, sys: 180 ms, total: 388 ms
Wall time: 10.7 s


Okay, this looks like it worked, but let's repeat our sanity check.

In [25]:
len(os.listdir(path_spectrogram))

100

Confirmed. At 10.7 s we're already nearly 3x faster (roughly 13 hours down to about 4.7 hours for the full test set), and we get a progress bar for free! Now let's handle the other case, where our arguments aren't static (ours will stay static, but we'll pretend they aren't).

Here's an attempt to avoid rewriting our function yet again by using a lambda (an anonymous one-line function in Python), as someone in the dev thread suggested. Unfortunately, lambdas can't be pickled (`pickle` is Python's module for serializing objects to bytes, which is how the function gets sent to the worker processes), so this is probably more trouble than it's worth.

Note that calling `func(arg_tuple_list[0])` directly works, so the lambda itself is fine; it only blows up when passed to `parallel`.

In [30]:
arg_tuple_list = list(zip(fnames_100[0:100], [path_audio]*100, [path_spectrogram]*100))
func = lambda x: gen_spec_parallel(x[0], 0, x[1], x[2])
func(arg_tuple_list[0])
parallel(func, arg_tuple_list)

Traceback (most recent call last):
  File "/opt/conda/envs/fastai/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/conda/envs/fastai/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7f300e427e18>: attribute lookup <lambda> on __main__ failed

KeyboardInterrupt: 
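The `attribute lookup <lambda> on __main__ failed` message comes from how `pickle` handles functions: it doesn't serialize the function's code, it records the function's module and qualified name and looks that name up again when loading. A lambda's qualified name is `<lambda>`, so the lookup fails. A small sketch (`add` and `square` are toy functions) showing that a module-level function wrapped in `partial` round-trips while a lambda does not:

```python
import pickle
from functools import partial

def add(a, b):
    return a + b

# Module-level functions (and partials of them) pickle by reference: module + name
restored = pickle.loads(pickle.dumps(partial(add, b=1)))
print(restored(2))  # 3

square = lambda x: x * x  # its __qualname__ is "<lambda>", not "square"
try:
    pickle.dumps(square)
    lambda_pickles = True
except (pickle.PicklingError, AttributeError) as err:
    lambda_pickles = False
    print(type(err).__name__, err)
```

This is also why `gen_spec_partial` worked: it is a `partial` of a function defined at the top level of the notebook.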

Now we need a version of our function that takes a tuple of our arguments (plus the index that `parallel` passes), unpacks it, and calls our original function.

In [None]:
def gen_spec_tuple(arg, index):
    # arg is one (fname, src_path, dst_path) tuple; index is unused but required by parallel
    fname, src_path, dst_path = arg
    gen_spec(fname, src_path, dst_path)

In [37]:
%time parallel(gen_spec_tuple, arg_tuple_list)

CPU times: user 164 ms, sys: 148 ms, total: 312 ms
Wall time: 9.82 s
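If you find yourself writing a tuple-unpacking wrapper for every function, one option is a small generic adapter. This is a sketch, with `StarCall` as a hypothetical helper (not part of fastai) and `gen_fake` as a toy stand-in for `gen_spec`. Because it is a module-level class, its instances pickle as long as the wrapped function does, unlike a lambda or a function defined inside another function.

```python
import pickle

class StarCall:
    """Adapt f(a, b, c) to parallel's func(value, index) convention, where value is a tuple."""
    def __init__(self, func):
        self.func = func

    def __call__(self, args, index=0):
        # index is accepted only to match the calling convention; it is ignored
        return self.func(*args)

def gen_fake(fname, src_path, dst_path):
    # Toy stand-in for gen_spec
    return f"{dst_path}/{fname}.png"

wrapped = StarCall(gen_fake)
print(wrapped(("a.wav", "audio", "spec"), 0))  # spec/a.wav.png
# The instance survives pickling, which is what a process pool needs
restored = pickle.loads(pickle.dumps(wrapped))
print(restored(("b.wav", "audio", "spec"), 1))  # spec/b.wav.png
```

Under the same assumptions it could be passed as `parallel(StarCall(gen_spec), arg_tuple_list)`; that combination has not been timed here.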


It works! Now you know how to take functions with an arbitrary number of arguments and run them in parallel to speed up your preprocessing and spend more time training. Let's see a couple more benchmarks on 1000 files to make sure the gains scale. 

In [50]:
%time for fname in fnames_1000: gen_spec(fname, path_audio, path_spectrogram)

CPU times: user 27min 18s, sys: 3.67 s, total: 27min 22s
Wall time: 4min 54s


In [51]:
%time parallel(gen_spec_partial, fnames_1000);

CPU times: user 1.77 s, sys: 604 ms, total: 2.38 s
Wall time: 1min 39s


In [55]:
print(f"{round(294/99, 2)}x faster with parallel")

2.97x faster with parallel


### What if you need to store return values?

I got this great question from Yashu Seth on Medium: "Thanks for the tutorial. How to modify this if my function needs to return something? For example, given a list of integers, I want a list of squares of the integers. How to do this in parallel?"

I had no idea, so I checked out the source code, which looks intimidating but is pretty straightforward once you get past how foreign it looks (to me, at least).

<img src="files/img/parallel_source.png"></img>

It creates a `ProcessPoolExecutor`, referred to as `ex`, and builds a list of "futures" by submitting our function and its arguments to `ex`. Then it wraps those futures in a progress bar that advances as each job completes. There's only one line here we don't understand. Googling "ProcessPoolExecutor python" takes us to the <a href="https://docs.python.org/3/library/concurrent.futures.html">Python 3 docs for concurrent.futures</a>.

Right at the top is the `submit` method:

<img src="files/img/submit.png"></img>

So it's pretty straightforward: calling `ex.submit` schedules our function to run and returns a `Future` object that lets us interact with the job. The first example even calls `future.result()`, which returns the job's return value (or re-raises the exception the job raised), so that's where our return values live. The catch is that `parallel` never hands the futures back to us, and it isn't our code; it's someone else's library (fastai).
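A small sketch of the `submit`/`result()` pattern from those docs, using a toy `slow_square` function (hypothetical) and a thread pool so it runs in any environment. `as_completed` yields futures in whatever order they finish, so to keep results aligned with the inputs you read `.result()` from the original list of futures.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(x):
    # Larger inputs sleep less, so completion order tends to differ from submission order
    time.sleep(0.05 * (3 - x))
    return x * x

with ThreadPoolExecutor(max_workers=3) as ex:
    futures = [ex.submit(slow_square, x) for x in range(3)]
    finished = [f.result() for f in as_completed(futures)]  # completion order
ordered = [f.result() for f in futures]                      # submission order
print(finished, ordered)
```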

### You can make your own fastai functions more easily than you think

In [39]:
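import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
from typing import Collection
from fastprogress.fastprogress import progress_bar

def my_parallel(func, arr:Collection, max_workers:int=None):
    "Like fastai's `parallel`, but returns `func(o, i)` for every element of `arr`, in input order."
    max_workers = ifnone(max_workers, defaults.cpus)  # ifnone/defaults come from the fastai import
    print(f"Max workers: {max_workers}")
    if max_workers<2: return [func(o,i) for i,o in enumerate(arr)]
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(func,o,i) for i,o in enumerate(arr)]
        # as_completed yields futures as they finish, which drives the progress bar
        for f in progress_bar(concurrent.futures.as_completed(futures), total=len(arr)): pass
    # Read results from the original list so they stay in input order;
    # result() also re-raises any exception raised in a worker
    return [f.result() for f in futures]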

In [42]:
def calc_squares(x, index): return x**2

In [44]:
squares = my_parallel(calc_squares, range(100))
print(squares)

Max workers: 8


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]


#### Let's ditch the index as well; it's only used internally by fastai's `verify_images`

In [47]:
# calc_squares is now a normal function, no extra index needed
def calc_squares(x): return x**2

def my_parallel(func, arr:Collection, max_workers:int=None):
    "Call one-argument `func` on every element of `arr` and return the results in input order."
    max_workers = ifnone(max_workers, defaults.cpus)
    print(f"Max workers: {max_workers}")
    if max_workers<2: return [func(o) for o in arr]
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        futures = [ex.submit(func,o) for o in arr]
        for f in progress_bar(concurrent.futures.as_completed(futures), total=len(arr)): pass
    return [f.result() for f in futures]

In [48]:
squares = my_parallel(calc_squares, range(100))
print(squares)

Max workers: 8


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400, 441, 484, 529, 576, 625, 676, 729, 784, 841, 900, 961, 1024, 1089, 1156, 1225, 1296, 1369, 1444, 1521, 1600, 1681, 1764, 1849, 1936, 2025, 2116, 2209, 2304, 2401, 2500, 2601, 2704, 2809, 2916, 3025, 3136, 3249, 3364, 3481, 3600, 3721, 3844, 3969, 4096, 4225, 4356, 4489, 4624, 4761, 4900, 5041, 5184, 5329, 5476, 5625, 5776, 5929, 6084, 6241, 6400, 6561, 6724, 6889, 7056, 7225, 7396, 7569, 7744, 7921, 8100, 8281, 8464, 8649, 8836, 9025, 9216, 9409, 9604, 9801]
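If the progress bar isn't needed, the standard library's `Executor.map` already returns results in input order, so a shorter alternative is possible. This is a sketch assuming a plain one-argument function; `parallel_map` is a hypothetical name, and any worker exception is re-raised while the results are collected. The `__main__` guard is needed on platforms that start worker processes by re-importing the main module.

```python
import os
from concurrent.futures import ProcessPoolExecutor

def calc_squares(x):
    return x**2

def parallel_map(func, arr, max_workers=None):
    if max_workers is None:
        max_workers = os.cpu_count() or 1
    # Fall back to a plain loop for a single worker, like the max_workers<2 branch above
    if max_workers < 2:
        return [func(o) for o in arr]
    with ProcessPoolExecutor(max_workers=max_workers) as ex:
        # chunksize sends several items per task to cut inter-process overhead
        return list(ex.map(func, arr, chunksize=16))

if __name__ == "__main__":
    squares = parallel_map(calc_squares, range(100))
    print(squares[:5])  # [0, 1, 4, 9, 16]
```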


## Future stuff: is there a way to go faster than `parallel` with another library?

- joblib (`from joblib import Parallel, delayed`)
- `multiprocessing.Pool`
- threading
- our own implementation on top of `concurrent.futures`

#### Some leads (via Jeremy Brown on <a href="https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python">Stack Overflow</a>)

<img src="files/img/multiprocessing.png"></img>

<img src="files/img/multithreading.png"></img>
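For the threading lead, a minimal sketch with the standard library's `ThreadPoolExecutor`. Threads share one interpreter, so they mainly help when the work waits on I/O or calls code that releases the GIL; pure-Python CPU work won't speed up. `gen_spec` as written probably shouldn't be run this way, since it draws through pyplot's global figure state and Matplotlib's docs say it is not thread-safe. `read_size` and the temporary files are hypothetical stand-ins for I/O-bound work.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

def read_size(path):
    # I/O-bound work: the GIL is released while waiting on the disk
    with open(path, "rb") as f:
        return len(f.read())

with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for i in range(20):
        p = os.path.join(tmp, f"clip_{i}.bin")
        with open(p, "wb") as f:
            f.write(b"\0" * (i + 1))
        paths.append(p)
    with ThreadPoolExecutor(max_workers=8) as ex:
        sizes = list(ex.map(read_size, paths))  # results come back in input order

print(sizes[:3])  # [1, 2, 3]
```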

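In [None]:
# Hypothetical reconstruction: gen_spec with the paths bound and no index argument,
# so Pool.map can call it with a single filename
gen_spec_partial_no_index = partial(gen_spec, src_path=path_audio, dst_path=path_spectrogram)

In [None]:
%%time
with Pool(100) as p:
    p.map(gen_spec_partial_no_index, fnames_100)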

In [None]:
fnames_1000 = os.listdir(path_audio)[0:1000]

In [None]:
%%time
for fname in fnames_1000:
    gen_spec(fname, path_audio, path_spectrogram)

In [17]:
%%time
parallel(gen_spec_partial, fnames_1000);

CPU times: user 2.32 s, sys: 700 ms, total: 3.02 s
Wall time: 1min 41s


In [16]:
%%time
p = Pool(8)
p.map(gen_spec_partial_no_index, fnames_1000)

CPU times: user 196 ms, sys: 180 ms, total: 376 ms
Wall time: 1min 44s


In [21]:
%%time
p = Pool(6)
p.imap(gen_spec_partial_no_index, fnames_1000)

CPU times: user 744 ms, sys: 464 ms, total: 1.21 s
Wall time: 2min 44s


In [None]:
%%time
p = Pool(6)
for x in p.imap(gen_spec_partial_no_index, fnames_1000): pass

In [24]:
%%time
p = Pool(6)
for idx, x in enumerate(p.imap(gen_spec_partial_no_index, fnames_100)): 
    for f in progress_bar(idx, total=len(fnames_100)): pass

TypeError: 'int' object is not iterable

In [26]:
%%time
p = Pool(6)
for f in progress_bar((p.imap(gen_spec_partial_no_index, fnames_1000)),total = len(fnames_1000)): pass


CPU times: user 2.24 s, sys: 852 ms, total: 3.09 s
Wall time: 1min 53s


#### How much does the progress bar cost us? Let's chuck a progress bar on `multiprocessing.Pool` and see if there's a change

In [None]:
def num_cpus()->int:
    "Get number of cpus"
    try:                   return len(os.sched_getaffinity(0))
    except AttributeError: return os.cpu_count()

In [None]:
max_workers = min(16, num_cpus())
print(max_workers)