In [1]:
import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
import dask.bag as db

# facts

In [None]:
# Size of the demo frame and the repeating letter pattern used for column "b".
N_ROWS = 12400
LETTERS = "abcaddbe"

# One timestamp per hour. The step is passed as a Timedelta because the "H"
# frequency alias is deprecated in recent pandas releases.
index = pd.date_range("2021-09-01", periods=N_ROWS, freq=pd.Timedelta(hours=1))
# Repeat the pattern often enough to cover N_ROWS, then trim to the exact length,
# so the column stays aligned with the index for any N_ROWS.
b_values = list((LETTERS * (N_ROWS // len(LETTERS) + 1))[:N_ROWS])
df = pd.DataFrame({"a": np.arange(N_ROWS), "b": b_values}, index=index)
# Wrap the pandas frame in a Dask DataFrame split into 5 partitions.
ddf = dd.from_pandas(df, npartitions=5)

# ex01 example of map_partitions

In [None]:
def my_custom_converter(df, multiplier=1):
    """Return column ``a`` of one partition scaled by ``multiplier``.

    Parameters
    ----------
    df : pandas.DataFrame
        A single partition; it must contain a column named ``a``.
    multiplier : scalar, default 1
        Factor applied element-wise to ``df['a']``.

    Returns
    -------
    pandas.Series
        ``df['a'] * multiplier``.
    """
    return df['a'] * multiplier


# `meta` tells Dask the name and dtype of the output without running the graph.
# Derive it by applying the converter to an empty slice of the source frame so
# the declared dtype always matches what the function really returns (a
# hand-written float64 meta disagrees with the integer result of `a * 2`).
meta = my_custom_converter(df.iloc[:0], multiplier=2).rename("multi")
# NOTE: the name `distance_km` is kept because later cells reference it.
distance_km = ddf.map_partitions(
    my_custom_converter, multiplier=2, meta=meta
)

In [None]:
# Execute the lazy map_partitions graph and collect the result locally.
test = distance_km.compute()

In [None]:
# Inspect the index values at which the Dask DataFrame is split into partitions.
ddf.divisions

In [None]:
# Draw the task graph behind `distance_km`.
# NOTE(review): rendering presumably needs graphviz installed — confirm in your environment.
distance_km.visualize()

# Ex02 dask.delayed

In [None]:
import dask


# NOTE: Ex04 further down redefines `inc` and `add` as plain (non-delayed)
# functions; re-run this cell before reusing the delayed versions.
@dask.delayed
def inc(x):
    """Lazily return ``x + 1``."""
    return x + 1


@dask.delayed
def add(x, y):
    """Lazily return ``x + y``."""
    return x + y


# Calling a delayed function only records a task; no work has happened yet.
a, b = inc(1), inc(2)
c = add(a, b)

# compute() triggers all of the computations recorded above.
c = c.compute()
print(c)

# Ex03 Scheduling

In [None]:
# Scratch cell, left commented out: `client` is only created in the next cell.
#client.shutdown()
#client.dashboard_link

In [None]:
from dask.distributed import Client
# Start a distributed client with 10 workers. No scheduler address is given,
# so this presumably spins up a local cluster — confirm against the dashboard.
client = Client(n_workers=10)
# Last expression of the cell: display the client summary.
client

In [None]:
# URL of the diagnostics dashboard for this client.
client.dashboard_link

In [None]:
# Start computing the partitions and keep them in cluster memory.
# NOTE: this rebinds `ddf` to the persisted collection.
ddf = ddf.persist()

In [None]:
# Running total of column `a`, shifted down by 100, materialised with compute().
res = (ddf.a.cumsum() - 100).compute()

In [None]:
# Display the computed result.
res

In [None]:
# Close the client created above.
client.close()

# Ex04 Parallelize a for loop

In [None]:
from time import sleep


def inc(x):
    """Return ``x + 1`` after a 10 ms pause that stands in for a small unit of work."""
    sleep(0.01)
    incremented = x + 1
    return incremented


def add(x):
    """Return ``x + 100`` after a 100 ms pause that stands in for a slower step."""
    sleep(0.1)
    shifted = x + 100
    return shifted

In [None]:
%%time
# Inputs 1..99.
my_list = list(range(1, 100))
# Serial baseline: run both steps for every input, one after the other.
results = [add(inc(x)) for x in my_list]
total = sum(results)

In [None]:
%%time
from dask import delayed

# Same pipeline as the serial cell, but every call is wrapped in `delayed`,
# so this only builds a task graph.
results = [delayed(add)(delayed(inc)(x)) for x in my_list]
total = delayed(sum)(results)
# Nothing has run yet; `total` still has to be computed.

In [None]:
# Draw the task graph for the delayed `total`.
total.visualize()

In [None]:
%%time
# Actually execute the delayed graph; %%time reports how long that takes.
total.compute()

# Ex06 A typical workflow for dask.delayed
# To delay or not to delay?

In [None]:
import glob

# Glob pattern for the parquet inputs (absolute path on a mapped Windows drive).
data_path = r'Z:\vclgp\xiongl\ProjectIS2CalVal\result\*\*\rh*.parquet'
# glob returns matches in arbitrary order; sort them so the later
# `file_list[:100]` / `file_list[:500]` slices pick the same files on every run.
file_list = sorted(glob.glob(data_path)) # , recursive=True
# Fail with a clear message instead of an opaque IndexError when nothing matches.
if not file_list:
    raise FileNotFoundError(
        f"No parquet files matched {data_path!r}; is the data drive mounted?"
    )
# Peek at one matched path (the second one when available, as before).
file_list[1] if len(file_list) > 1 else file_list[0]

In [None]:
# Number of files matched by the glob pattern.
len(file_list)

In [None]:
import dask.dataframe as dd
# Open only the first 100 matched files as a single Dask DataFrame.
ddf_rh = dd.read_parquet(file_list[:100])
# Number of partitions Dask created for those files.
ddf_rh.npartitions

In [None]:
import dask
import pandas as pd


@dask.delayed
def process_file(filename):
    """Lazily read one parquet file into a pandas DataFrame."""
    return pd.read_parquet(filename)


# One lazy task per matched file.
results = [process_file(path) for path in file_list]

# The whole list is passed as a single argument, so `res` is a 1-tuple whose
# only element is the list of loaded frames.
res = dask.compute(results)

In [None]:
# `res` is the tuple returned by dask.compute; element 0 holds the per-file
# results, which are stacked here into one frame with a fresh 0..n-1 index.
res_df = pd.concat(res[0], ignore_index=True)
#res_df  = pd.DataFrame(res[])

# EX07 dask distributed

In [None]:
# Hardware note: cores vs. logical processors (threads) on the author's laptop.
# Processor: 12th Gen Intel(R) Core(TM) i9-12900H, 2500 MHz, 14 cores, 20 logical processors
from dask.distributed import LocalCluster, Client
# Start a local cluster with default settings.
cluster = LocalCluster()
# Last expression of the cell: display the cluster summary.
cluster

In [None]:
# Connect a client to the cluster created above, then display its summary.
client = Client(cluster)
client

In [None]:
# Shut down the scheduler and workers this client is connected to.
# NOTE(review): the EX08 cells below call `client.submit` / `client.gather`,
# which need a running client — run this cell only after those examples.
client.shutdown()

# EX08 futures: real-time execution for custom situations


In [None]:
from dask.distributed import Client, wait, progress


def process_file(filename):
    """Read one parquet file into a pandas DataFrame.

    Plain (non-delayed) function: it is handed to ``client.submit`` below, so
    it executes as a task on the cluster.
    """
    data = pd.read_parquet(filename)
    return data


# An earlier cell calls client.shutdown(), so on a top-to-bottom run the old
# client is already closed by the time this cell executes. Start a fresh one
# if needed.
if client.status != "running":
    client = Client()

# submit() returns a Future immediately; the work runs in the background.
# The loop variable `future` is intentionally left bound: the next cell calls
# `future.result()` on the last submitted task.
futures = []
for filename in file_list[:500]:
    future = client.submit(process_file, filename)
    futures.append(future)
#future = client.submit(process_file, file_list[1])
# Last expression of the cell: show a progress bar for the submitted tasks.
progress(futures)

In [None]:
# `future` is the loop variable left over from the submission loop, i.e. the
# last future submitted; .result() blocks until it finishes and returns its value.
future.result()

You can block on the computation and gather the result with the .result() method.

In [None]:
# Pull the results of all submitted futures back into the local process.
# NOTE: this rebinds `test`, which an earlier cell used for the map_partitions result.
test = client.gather(futures)

In [None]:
# Check which container type gather() returned.
type(test)

# EX09 difference between delayed and future
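**Future**: the call (e.g. `client.submit`) returns immediately with a future whose status begins as “pending” and later becomes “finished”.

The computation starts as soon as its inputs are available and there is compute capacity available.

**Delayed**: building a delayed object does no work; nothing runs until `.compute()` is called.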



<!-- # wait(future)
# client.gather(futures) # we normally don’t want to gather() results that are too big in memory.
# future_z = client.submit(sum, [future_x, future_y])
# number of retries in the client.compute, client.submit or client.map method.
# can be passed to new tasks without having to pull data locally from the cluster -->

In [None]:
# Notes on futures:
# - A number of retries can be set in the client.compute, client.submit or client.map methods.
# - Futures can be passed to new tasks without having to pull data locally from the cluster.

## Repartition to Reduce Overhead

# using the .set_index(column_name) method is expensive !!!!!