<h1 style="color:purple;"> <strong> <center> Taller Procesamiento paralelo - Inteligencia Artificial</center></strong></h1>

<h3 style="color:black;"><center><strong>Nataly Ramírez Urrego (naramirezur@unal.edu.co)</strong></center></h3>

Tomado de: <https://github.com/AprendizajeProfundo/BigData/blob/main/Dask/Cuadernos/01_dask_Mejores_Practicas.ipynb>

Profesores: 

1. Alvaro Mauricio Montenegro Díaz, ammontenegrod@unal.edu.co
2. Daniel Mauricio Montenegro Reyes, dextronomo@gmail.com

## Introducción

Es fácil comenzar con Dask retrasado, pero usarlo bien requiere algo de experiencia. Esta lección contiene sugerencias de mejores prácticas e incluye soluciones a problemas comunes.

## Llamada retrasada a la función, no al resultado

In [None]:
# This executes immediately
    dask.delayed(f(x, y))
Correcto

# This executes delayed
    dask.delayed(f)(x, y)
Programe muchos cálc

## Programe muchos cálculos a la vez

In [None]:
# Avoid calling compute repeatedly
    results = []
    for x in L:
      y = dask.delayed(f)(x)
      results.append(y.compute())
    results

`# Collect many calls for one compute
    results = []
    for x in L:
        y = dask.delayed(f)(x)
        results.append(y)
    results = dask.compute(*results)`

## No modifique las entradas

In [None]:
# Mutate inputs in functions
    @dask.delayed
    def f(x):
        x += 1
        return x
Correcto

```# Return new values or copies @dask.delayed def f(x): x = x + 1 return x

Correcto

 ```# 
    @dask.delayed
    def f(x):
        x = copy(x)
        x += 1
        return x

## Evite el estado global 

Idealmente, sus operaciones no deberían depender del estado global. El uso del estado global puede funcionar si solo usa subprocesos, pero cuando se pasa a la computación distribuida o multiprocesamiento, es probable que encuentre errores confusos.

In [None]:
L = []

# This references global variable L

@dask.delayed
def f(x):
    L.append(x)

## Evite efectos colaterales

In [None]:
# Ensure delayed tasks are computed

x = dask.delayed(f)(1, 2, 3)
#...
dask.compute(x, ...)

## Divida los cálculos en muchas partes

In [None]:
# Erróneo

def load(filename):
    ...


def process(filename):
    ...


def save(filename):
    ...

@dask.delayed
def f(filenames):
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)

    return results

dask.compute(f(filenames))

In [None]:
 Correcto
# Break up into many tasks

@dask.delayed
def load(filename):
    ...

@dask.delayed
def process(filename):
    ...

@dask.delayed
def save(filename):
    ...


def f(filenames):
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)

    return results

dask.compute(f(filenames))

## Evite demasiadas tareas retrasadas

Cada tarea retrasada tiene una sobrecarga de unos cientos de microsegundos. Por lo general, esto está bien, pero puede convertirse en un problema si aplica dask.delayed demasiado finamente. En este caso, a menudo es mejor dividir sus muchas tareas en lotes o usar una de las colecciones de Dask para ayudarlo.

In [None]:
# Too many tasks

results = []
for x in range(10000000):
    y = dask.delayed(f)(x)
    results.append(y)

    
# Use collections

import dask.bag as db
b = db.from_sequence(range(10000000), npartitions=1000)
b = b.map(f)
...

In [None]:
# Alternativamente, sin usar bag collection

def batch(seq):
    sub_results = []
    for x in seq:
        sub_results.append(f(x))
    return sub_results

 batches = []
 for i in range(0, 10000000, 10000):
     result_batch = dask.delayed(batch)(range(i, i + 10000))
     batches.append(result_batch)

## Evite llamar delayed dentro de funciones retrasadas

A menudo, si es nuevo en el uso de Dask retrasado, realiza llamadas `dask.delayed` en todas partes y espera lo mejor. Si bien esto puede funcionar, generalmente es lento y da como resultado soluciones difíciles de entender.

Por lo general, nunca llame a `dask.delayed` dentro de las funciones `dask.delayed`.

In [None]:
# Evitar: Delayed function calls delayed

@dask.delayed
def process_all(L):
    result = []
    for x in L:
        y = dask.delayed(f)(x)
        result.append(y)
    return result



# Normal function calls delayed

def process_all(L):
    result = []
    for x in L:
        y = dask.delayed(f)(x)
        result.append(y)
    return result

## No llame a dask.delayed en otras colecciones de Dask

Cuando coloca una matriz Dask o Dask DataFrame en una llamada retrasada, esa función recibirá el equivalente de NumPy o Pandas.

Tenga en cuenta que si su matriz es grande, esto podría bloquear a sus trabajadores.

En cambio, es más común usar métodos como `da.map_blocks`.

In [None]:
# Incorrecto: Call delayed functions on Dask collections

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

dask.delayed(train)(df)

#Correcto:  Use mapping methods if applicable

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

df.map_partitions(train)

# O alternativamente,if the procedure doesn’t fit into a mapping, 
# you can always turn your arrays or dataframes into many delayed objects, for example

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')
                 
partitions = df.to_delayed()
delayed_values = [dask.delayed(train)(part)
                  for part in partitions]

## Evite poner repetidamente grandes entradas en llamadas retrasadas

Cada vez que pase un resultado concreto (cualquier cosa que no se retrase), Dask lo codificará de forma predeterminada para darle un nombre. Esto es bastante rápido (alrededor de 500 MB / s) pero puede ser lento si lo hace una y otra vez. En cambio, también es mejor retrasar sus datos.

Esto es especialmente importante cuando se usa un clúster distribuido para evitar enviar sus datos por separado para cada llamada de función.

In [None]:
# No haga esto

x = np.array(...)  # some large array

results = [dask.delayed(train)(x, i)
           for i in range(1000)]

# En su lugar haga esto

x = np.array(...)    # some large array
x = dask.delayed(x)  # delay the data once

results = [dask.delayed(train)(x, i)
           for i in range(1000)]