# Dask delayed

Not every computation can be expressed with the NumPy-like dask array API.

Dask delayed allows you to:
  1. Do custom computations with regular python code
  2. Scale them up to heterogeneous clusters
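As a minimal sketch of the idea (the functions `square` and `total` are made up for illustration and are not part of this notebook), wrapping a regular Python function with `delayed` only records the call in a task graph; nothing runs until `compute` is called. The synchronous scheduler is used here so the sketch does not need a cluster.

```python
from dask import delayed


def square(x):
    # Plain Python function, nothing dask-specific
    return x * x


def total(values):
    # Receives the already computed results as a regular list
    return sum(values)


# Each call is only recorded in the task graph, not executed
squares = [delayed(square)(i) for i in range(5)]
result = delayed(total)(squares)

# compute() walks the graph and runs the functions
value = result.compute(scheduler="synchronous")
print(value)
```

The same graph can be sent to a distributed cluster by computing it with a `Client` instead of the synchronous scheduler.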

Let's set up the same infrastructure as before.

In [None]:
# Imports and number of workers to request
from dask import delayed
from dask.distributed import Client
import dask.bag
n_workers = 40


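def scale_to_sge(n_workers):
    """Start `n_workers` dask workers as SGE jobs and return a connected client."""
    from dask_jobqueue import SGECluster

    queue = "q_1day"
    queue_resource_spec = "q_1day=TRUE,io_big=TRUE"
    memory = "8GB"
    sge_log = "./logs"

    # One single-core, single-process worker per SGE job
    cluster = SGECluster(queue=queue, memory=memory, cores=1, processes=1,
                         log_directory=sge_log,
                         local_directory=sge_log,
                         resource_spec=queue_resource_spec)
    # Request the workers; recent dask_jobqueue releases expose this as `scale`
    cluster.scale(n_workers)
    return Client(cluster)  # client connected to the SGE cluster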

# Local client
#client = Client(n_workers=n_workers)

# SGE client
client = scale_to_sge(n_workers)


# Training a GMM-UBM in a distributed fashion using Dask

In the example below, dask delayed is used to build a simple pipeline that trains a GMM-UBM with speaker data.
The pipeline can be split into two parts: feature extraction and GMM training.

## Feature extraction

For each audio file the following steps are piped using dask delayed:

  1. File opening (using scipy)
  2. Training of the energy-based model and detection of the segments that contain audio (bob.bio.spear.preprocessor.Energy_2Gauss)
  3. Extraction of MFCC features (bob.bio.spear.extractor.Cepstral)
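The three steps above can be sketched as a chain of delayed calls per file. The functions below are toy stand-ins invented for this sketch (they do not read audio or call bob.bio.spear); the point is only how the output of one step is piped into the next, and how the last step takes two upstream results.

```python
from dask import compute, delayed


def fake_read_audio(path):
    # Stand-in for the file reader: returns fixed fake samples
    return [0.0, 0.5, -0.4, 0.0, 0.9]


def fake_vad(samples, threshold=0.3):
    # Stand-in for voice activity detection: True where |sample| > threshold
    return [abs(s) > threshold for s in samples]


def fake_extract(samples, vad):
    # Stand-in for feature extraction: keep only the samples flagged by the VAD
    return [s for s, keep in zip(samples, vad) if keep]


example_paths = ["a.wav", "b.wav"]  # hypothetical file names, never opened

features = []
for p in example_paths:
    audio = delayed(fake_read_audio)(p)
    vad = delayed(fake_vad)(audio)
    # The extractor depends on both the raw audio and the VAD labels
    features.append(delayed(fake_extract)(audio, vad))

# One graph per file; all of them are executed in a single call
results = compute(*features, scheduler="synchronous")
print(results)
```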
 
## GMM training

Once we have the MFCCs for all audio files, we can run the EM algorithm to train the GMM.
In this example the GMM is initialized from a previously trained GMM (just for the sake of the example).
Furthermore, only the GMM means are updated during the M-step.

For GMM training, in each EM iteration, the following steps are piped using dask delayed:

  1. E-step. For each block of MFCCs, compute the posterior probabilities. Those posteriors are summarized in two statistics. The first one, called zeroth order, is the simple summation of the posteriors. The second one, called first order, is the dot product between the posteriors and the input MFCC data.
  2. Accumulation. The statistics from the previous step are accumulated over all blocks (everything is summed).
  3. M-step. The recomputation of the means.
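The `gmm_steps` module used later in this notebook is not shown here, so the following is only a NumPy sketch of what the three steps could look like for a diagonal-covariance GMM with a means-only update. All function names ending in `_sketch`, and the toy data, are assumptions made for illustration.

```python
import numpy as np


def e_step_sketch(data, weights, means, variances):
    """Zeroth and first order statistics for one block of features."""
    diff = data[:, None, :] - means[None, :, :]  # (n, C, d)
    # Log density of every sample under every diagonal Gaussian
    log_gauss = -0.5 * (np.sum(diff ** 2 / variances, axis=2)
                        + np.sum(np.log(2 * np.pi * variances), axis=1))
    log_joint = np.log(weights) + log_gauss  # (n, C)
    # Normalize in the log domain to get the posteriors
    log_norm = np.logaddexp.reduce(log_joint, axis=1, keepdims=True)
    posteriors = np.exp(log_joint - log_norm)
    zeroth = posteriors.sum(axis=0)  # (C,)
    first = posteriors.T @ data      # (C, d)
    return zeroth, first


def acc_stats_sketch(stats):
    # Everything is summed over the blocks
    return sum(s[0] for s in stats), sum(s[1] for s in stats)


def m_step_sketch(acc):
    # Means-only update: first order divided by zeroth order
    zeroth, first = acc
    return first / zeroth[:, None]


rng = np.random.default_rng(0)
blocks = [rng.normal(size=(50, 3)) for _ in range(4)]  # toy "MFCC" blocks
weights = np.array([0.5, 0.5])
means = np.array([[-1.0, 0.0, 0.0], [1.0, 0.0, 0.0]])
variances = np.ones((2, 3))

for _ in range(3):
    stats = [e_step_sketch(b, weights, means, variances) for b in blocks]
    means = m_step_sketch(acc_stats_sketch(stats))
print(means)
```

Because the statistics are plain sums over samples, accumulating them block by block gives the same result as running the E-step on all the data at once, which is what makes this step easy to distribute.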
  
  

In this example we build the feature extraction part of the pipeline using [dask bags](https://docs.dask.org/en/latest/bag.html).
In short, a dask bag is a map/reduce abstraction built on top of dask.
Using it, our pipeline is cleaner, in terms of code, than the one written only with delayed.
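As a small sketch of the bag pattern used in the next cell (the `lengths` function and the toy items are made up for illustration), `map_partitions` applies a function to each whole partition, and `to_delayed` turns the bag into one delayed object per partition that can be combined with further delayed calls.

```python
import dask.bag
from dask import delayed


def lengths(partition):
    # Receives a whole partition and returns a list of results
    return [len(item) for item in partition]


items = ["a", "bb", "ccc", "dddd", "eeeee", "ffffff"]
bag = dask.bag.from_sequence(items, npartitions=3)
bag = bag.map_partitions(lengths)

# One Delayed object per partition
parts = bag.to_delayed()

# Map/reduce by hand: sum inside each partition, then sum the partial results
partial_sums = [delayed(sum)(p) for p in parts]
total = delayed(sum)(partial_sums).compute(scheduler="synchronous")
print(total)
```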

In [None]:
%%time

# fetching the data
import glob
import os


##### ADD YOUR PATH HERE. 
##### I CAN'T SHARE ONLINE A PATH FROM OUR SHARED FILE SYSTEM ##################
PATH_TO_VOX_FORGE = ""
paths = glob.glob(os.path.join(PATH_TO_VOX_FORGE, "16kHz_16bit/*/wav/*.wav"))
####################


import bob.bio.spear
import scipy.io.wavfile  # explicit submodule import; `import scipy` alone may not expose it
import numpy
import bob.learn.em
import bob.io.base

em_iterations = 5
sample_rate = 16000
n_files = 2000

def read_audio(path):
    _, audio = scipy.io.wavfile.read(path)
    audio = audio.astype("float64")  # numpy.cast was removed in NumPy 2.0
    return audio

def preprocess(data, preprocessor, sample_rate=16000):
    _,_,preprocessed = preprocessor((sample_rate, data))
    return preprocessed

def extract(data, vad_data, extractor, sample_rate=16000):
    extracted = extractor((sample_rate, data, vad_data))
    return extracted


def run(partition):
    """Extract MFCCs for every file of one bag partition (a list of paths)."""
    sample_rate = 16000
    # The preprocessor and extractor are created once per partition
    preprocessor = bob.bio.spear.preprocessor.Energy_2Gauss()
    extractor = bob.bio.spear.extractor.Cepstral()

    mfccs = []
    for p in partition:
        data = read_audio(p)
        vad_data = preprocess(data, preprocessor, sample_rate=sample_rate)
        extracted = extract(data, vad_data, extractor, sample_rate=sample_rate)
        mfccs.append(extracted)

    return mfccs


# GMM INITIALIZATION
from gmm_steps import e_step, m_step, acc_stats

gmm_initialization =  bob.learn.em.GMMMachine(bob.io.base.HDF5File("Projector.hdf5"))
weights = gmm_initialization.weights
variances = gmm_initialization.variances
means = gmm_initialization.means


db = dask.bag.from_sequence(paths[0:n_files], npartitions=n_workers)
db = db.map_partitions(run)
mfccs = db.to_delayed()

for i in range(em_iterations):
    gmm_stats = []
    for mfcc in mfccs:

        # Stacking is not strictly necessary; the E-step could iterate over the list instead
        stacked = delayed(numpy.vstack)(mfcc)

        gmm_stats.append(delayed(e_step)(stacked, weights, means, variances))

    acc_gmm_stats = delayed(acc_stats)(gmm_stats)
    means = delayed(m_step)(acc_gmm_stats)
    



# Now that we have the graph, we execute it

In [None]:
%%time
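print("Means at step [0]")
# `means` now holds the delayed result of the last iteration, so print the initial means
print(gmm_initialization.means)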

print("########")

print("Means at step [n]")
print(means.compute(scheduler=client))

# Always shut down your client

Before shutting down, have a look at the dask dashboard, which is served by default at http://localhost:8787

In [None]:
client.shutdown()