**Llame a `delayed` sobre la función, no sobre el resultado**

In [None]:
# This executes immediately: f(x, y) is evaluated right here by Python and
# only its return value is handed to dask.delayed.
dask.delayed(f(x, y))


# This executes delayed: the function itself is wrapped first, and the call
# with (x, y) is made on the wrapper instead of on f.
dask.delayed(f)(x, y)



**Programe muchos cálculos a la vez**

In [None]:
# Incorrect
# Avoid calling compute repeatedly: every iteration waits for its own
# compute() call to return before the next task is even created.
results = []
for x in L:
    y = dask.delayed(f)(x)
    results.append(y.compute())
results

# Correct
# Collect many calls for one compute: the loop only gathers the delayed
# objects, and a single dask.compute call receives all of them at once.
results = []
for x in L:
    y = dask.delayed(f)(x)
    results.append(y)
results = dask.compute(*results)

**No modifique las entradas**

In [None]:
# `copy` is needed by the last example in this cell.
from copy import copy


# Incorrect
# Mutate inputs in functions: `x += 1` may modify the caller's object in
# place when x is a mutable type (for example a list or an array).
@dask.delayed
def f(x):
    x += 1
    return x


# Correct
# Return new values or copies: `x + 1` builds a new object and rebinds the
# local name, leaving the argument untouched.
@dask.delayed
def f(x):
    x = x + 1
    return x


# Correct
# If an in-place operation is really needed, apply it to a shallow copy made
# inside the function.
@dask.delayed
def f(x):
    x = copy(x)
    x += 1
    return x

**Programe muchos cálculos a la vez**

In [None]:
 # Incorrecto
# Avoid calling compute repeatedly: every iteration waits for its own
# compute() call to return before the next task is even created.
results = []
for x in L:
    y = dask.delayed(f)(x)
    results.append(y.compute())
results

# Correct

# Collect many calls for one compute: the loop only gathers the delayed
# objects, and a single dask.compute call receives all of them at once.
results = []
for x in L:
    y = dask.delayed(f)(x)
    results.append(y)
results = dask.compute(*results)

**Evite el estado global**

In [None]:
L = []

# This references global variable L (anti-pattern shown on purpose): the
# function below communicates through the module-level list instead of
# through its return value.

@dask.delayed
def f(x):
    """Append ``x`` to the global list ``L``; nothing is returned."""
    L.append(x)

**No dependa de efectos colaterales**

In [None]:
# Ensure delayed tasks are computed: the first statement only builds the
# delayed object; it is passed to dask.compute afterwards.

x = dask.delayed(f)(1, 2, 3)
#...
# NOTE(review): the literal `...` argument below is presumably a placeholder
# for further objects to compute -- replace it before running.
dask.compute(x, ...)

**Divida los cálculos en muchas partes**

In [None]:
# Erróneo

def load(filename):
    """Placeholder stub (body elided); ``f`` passes its return value to ``process``."""
    ...


def process(filename):
    """Placeholder stub (body elided).

    NOTE(review): despite the parameter name, ``f`` calls this with the value
    returned by ``load``, not with a file name.
    """
    ...


def save(filename):
    """Placeholder stub (body elided).

    NOTE(review): despite the parameter name, ``f`` calls this with the value
    returned by ``process``, not with a file name.
    """
    ...

@dask.delayed
def f(filenames):
    """Load, process and save every file inside ONE delayed task.

    Shown as the pattern to avoid: the whole loop lives in a single delayed
    function, so the per-file steps run one after another inside that task.

    Args:
        filenames: iterable of inputs handed to ``load`` one at a time.

    Returns:
        list with the value returned by ``save`` for each input, in order.
    """
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)
        # Keep each outcome; without this the function always returns [].
        results.append(result)

    return results

dask.compute(f(filenames))

In [None]:
# Correcto
# Break up into many tasks

@dask.delayed
def load(filename):
    """Placeholder stub (body elided), wrapped with ``dask.delayed``."""
    ...

@dask.delayed
def process(filename):
    """Placeholder stub (body elided), wrapped with ``dask.delayed``.

    NOTE(review): despite the parameter name, ``f`` calls this with the
    result of ``load``, not with a file name.
    """
    ...

@dask.delayed
def save(filename):
    """Placeholder stub (body elided), wrapped with ``dask.delayed``.

    NOTE(review): despite the parameter name, ``f`` calls this with the
    result of ``process``, not with a file name.
    """
    ...


def f(filenames):
    """Chain load -> process -> save for every input and collect the results.

    This is a plain (non-delayed) function: it only calls ``load``,
    ``process`` and ``save`` in sequence for each input and gathers what
    ``save`` returns.

    Args:
        filenames: iterable of inputs handed to ``load`` one at a time.

    Returns:
        list with the value returned by ``save`` for each input, in order.
    """
    results = []
    for filename in filenames:
        data = load(filename)
        data = process(data)
        result = save(data)
        # Keep each outcome; without this the function always returns [] and
        # the compute call below receives nothing to run.
        results.append(result)

    return results

dask.compute(f(filenames))

**Evite demasiadas tareas retrasadas**

In [None]:
# Too many tasks: the loop below creates one delayed object per element of
# range(10000000).

results = []
for x in range(10000000):
    y = dask.delayed(f)(x)
    results.append(y)


# Use collections: the same range is put into a bag with 1000 partitions and
# f is mapped over it.

import dask.bag as db
b = db.from_sequence(range(10000000), npartitions=1000)
b = b.map(f)
...

In [None]:
# Alternativamente, sin usar bag collection

def batch(seq):
    """Apply ``f`` to every element of ``seq`` within a single call.

    Args:
        seq: iterable of inputs for ``f``.

    Returns:
        list containing ``f(x)`` for each ``x`` in ``seq``, in iteration order.
    """
    return [f(x) for x in seq]


# One delayed call per block of 10,000 consecutive inputs, i.e. 1,000 delayed
# objects in total for the 10,000,000 inputs.
batches = []
for i in range(0, 10000000, 10000):
    result_batch = dask.delayed(batch)(range(i, i + 10000))
    batches.append(result_batch)

**Evite llamar delayed dentro de funciones retrasadas**

In [None]:
# Evitar: Delayed function calls delayed

@dask.delayed
def process_all(L):
    """Wrap ``f`` with ``dask.delayed`` for each element of ``L``.

    Shown as the pattern to avoid: this function is itself decorated with
    ``dask.delayed`` and calls ``dask.delayed`` again in its body.

    Args:
        L: iterable of inputs for ``f``.

    Returns:
        list with one ``dask.delayed(f)(item)`` result per element of ``L``.
    """
    return [dask.delayed(f)(item) for item in L]



# Normal function calls delayed

def process_all(L):
    """Wrap ``f`` with ``dask.delayed`` for each element of ``L``.

    A plain function: it runs immediately and only collects the objects
    produced by ``dask.delayed(f)(item)``.

    Args:
        L: iterable of inputs for ``f``.

    Returns:
        list with one ``dask.delayed(f)(item)`` result per element of ``L``,
        in iteration order.
    """
    return [dask.delayed(f)(item) for item in L]

**No llame a dask.delayed en otras colecciones de Dask**

In [None]:
# Incorrect: call delayed functions on Dask collections

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

dask.delayed(train)(df)

# Correct: use mapping methods if applicable

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

df.map_partitions(train)

# Alternatively, if the procedure doesn't fit into a mapping,
# you can always turn your arrays or dataframes into many delayed objects, for example

import dask.dataframe as dd
df = dd.read_csv('/path/to/*.csv')

partitions = df.to_delayed()
delayed_values = [dask.delayed(train)(part)
                  for part in partitions]

**Evite poner repetidamente grandes entradas en llamadas retrasadas**

In [None]:
# Don't do this: the same array object is passed directly as an argument to
# each of the 1000 delayed calls.

x = np.array(...)  # some large array

results = [dask.delayed(train)(x, i)
           for i in range(1000)]

# Do this instead: wrap the array with dask.delayed once and pass that single
# delayed object to every call.

x = np.array(...)    # some large array
x = dask.delayed(x)  # delay the data once

results = [dask.delayed(train)(x, i)
           for i in range(1000)]