# <span style="color:red"><center>Cuaderno de DASH<center></span>

<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo\">

# Mejores prácticas  con `dask.delayed`

## <span style="color:blue">Llamada retrasada a la función, no al resultado</span>

 
 ```python
    # This executes immediately
    dask.delayed(f(x, y))
    ```
 Correcto
 
 ```python
    # This executes delayed
    dask.delayed(f)(x, y)
    ```


## <span style="color:blue">Programe muchos cálculos a la vez</span>

 
 ```python
    # Avoid calling compute repeatedly
    results = []
    for x in L:
      y = dask.delayed(f)(x)
      results.append(y.compute())
    results
    ```
 Correcto
 
 ```# Collect many calls for one compute
    results = []
    for x in L:
        y = dask.delayed(f)(x)
        results.append(y)
    results = dask.compute(*results)
    ```


## <span style="color:blue">No modifique las entradas</span>

 
 ```python
    # Mutate inputs in functions
    @dask.delayed
    def f(x):
        x += 1
        return x
    ```
 Correcto
 
 ```# Return new values or copies
    @dask.delayed
    def f(x):
        x = x + 1
        return x
   ```
Correcto

 ```# 
    @dask.delayed
    def f(x):
        x = copy(x)
        x += 1
        return x
  ```

## <span style="color:blue">Programe muchos cálculos a la vez</span>

In [5]:
from time import sleep

def inc(x):
    sleep(1)
    return x + 1

def add(x, y):
    sleep(1)
    return x + y

In [11]:
%%time
# This takes three seconds to run because we call each
# function sequentially, one after the other

x = inc(1)
y = inc(2)
z = add(x, y)

CPU times: user 174 ms, sys: 22.7 ms, total: 197 ms
Wall time: 3 s


Ahora vamos a usar delayer para paralelizar los procesos.

In [7]:
from dask import delayed

In [13]:
%%time
# This runs immediately, all it does is build a graph

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

CPU times: user 1.21 ms, sys: 556 µs, total: 1.76 ms
Wall time: 1.58 ms


Sin embargo falta cargar la función computar para que haga realmente el proceso

In [15]:
%%time
# This actually runs our computation using a local thread pool

z.compute()

CPU times: user 5 µs, sys: 0 ns, total: 5 µs
Wall time: 9.78 µs


## <span style="color:blue">Paralelizar un loop usando for</span>

In [17]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [18]:
%%time
# Sequential code

results = []
for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)

CPU times: user 452 ms, sys: 56.2 ms, total: 508 ms
Wall time: 8.01 s


In [None]:
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed

## <span style="color:blue">Evite efectos colaterales</span>

In [None]:
# Ensure delayed tasks are computed

x = dask.delayed(f)(1, 2, 3)
#...
dask.compute(x, ...)

## <span style="color:blue">Divida los cálculos en muchas partes</span>

In [None]:
# Erróneo

def load(filename):
    ...


def process(filename):
    ...


def save(filename):
    ...

@dask.delayed
def f(filenames):
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)

    return results

dask.compute(f(filenames))

In [None]:
# Correcto
# Break up into many tasks

@dask.delayed
def load(filename):
    ...

@dask.delayed
def process(filename):
    ...

@dask.delayed
def save(filename):
    ...


def f(filenames):
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)

    return results

dask.compute(f(filenames))

## <span style="color:blue">Evite demasiadas tareas retrasadas</span>

Cada tarea retrasada tiene una sobrecarga de unos cientos de microsegundos. Por lo general, esto está bien, pero puede convertirse en un problema si aplica dask.delayed demasiado finamente. En este caso, a menudo es mejor dividir sus muchas tareas en lotes o usar una de las colecciones de Dask para ayudarlo.

In [None]:
# Too many tasks

results = []
for x in range(10000000):
    y = dask.delayed(f)(x)
    results.append(y)

    
# Use collections

import dask.bag as db
b = db.from_sequence(range(10000000), npartitions=1000)
b = b.map(f)
...

In [None]:
# Alternativamente, sin usar bag collection

def batch(seq):
    sub_results = []
    for x in seq:
        sub_results.append(f(x))
    return sub_results

 batches = []
 for i in range(0, 10000000, 10000):
     result_batch = dask.delayed(batch)(range(i, i + 10000))
     batches.append(result_batch)

## <span style="color:blue">Evite llamar delayed dentro de funciones retrasadas</span>

A menudo, si es nuevo en el uso de Dask retrasado, realiza llamadas `dask.delayed` en todas partes y espera lo mejor. Si bien esto puede funcionar, generalmente es lento y da como resultado soluciones difíciles de entender.

Por lo general, nunca llame a `dask.delayed` dentro de las funciones `dask.delayed`.

In [None]:
# Evitar: Delayed function calls delayed

@dask.delayed
def process_all(L):
    result = []
    for x in L:
        y = dask.delayed(f)(x)
        result.append(y)
    return result



# Normal function calls delayed

def process_all(L):
    result = []
    for x in L:
        y = dask.delayed(f)(x)
        result.append(y)
    return result

## <span style="color:blue">No llame a dask.delayed en otras colecciones de Dask</span>

Cuando coloca una matriz Dask o Dask DataFrame en una llamada retrasada, esa función recibirá el equivalente de NumPy o Pandas. 

Tenga en cuenta que si su matriz es grande, esto podría bloquear a sus trabajadores.

En cambio, es más común usar métodos como `da.map_blocks`.

In [None]:
# Incorrecto: Call delayed functions on Dask collections

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

dask.delayed(train)(df)

#Correcto:  Use mapping methods if applicable

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

df.map_partitions(train)

# O alternativamente,if the procedure doesn’t fit into a mapping, 
# you can always turn your arrays or dataframes into many delayed objects, for example

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')
                 
partitions = df.to_delayed()
delayed_values = [dask.delayed(train)(part)
                  for part in partitions]

## <span style="color:blue">Evite poner repetidamente grandes entradas en llamadas retrasadas</span>

Cada vez que pase un resultado concreto (cualquier cosa que no se retrase), Dask lo codificará de forma predeterminada para darle un nombre. Esto es bastante rápido (alrededor de 500 MB / s) pero puede ser lento si lo hace una y otra vez. En cambio, también es mejor retrasar sus datos.

Esto es especialmente importante cuando se usa un clúster distribuido para evitar enviar sus datos por separado para cada llamada de función.

In [None]:
# No haga esto

x = np.array(...)  # some large array

results = [dask.delayed(train)(x, i)
           for i in range(1000)]

# En su lugar haga esto

x = np.array(...)    # some large array
x = dask.delayed(x)  # delay the data once

results = [dask.delayed(train)(x, i)
           for i in range(1000)]