In [58]:
from multiprocessing import cpu_count, Pool
import time
import dask
import dask.array as da
from dask import array
from dask import bag
import csv
import dask.dataframe as dd
import pandas as pd
import numpy as np
from dask.distributed import Client, wait, fire_and_forget

In [4]:
# Identify CPU cores
num_cores = cpu_count()
print(f"Number of cores: {num_cores}")

Number of cores: 8


Concurrent Processing (enables dynamic sharing and allocation of memory and compute runtime)

Introduction to DASK

In [11]:
# array with 2 chunks
arr = da.from_array(
    list(range(1,11)),
    chunks=2)

# function to square the array
squared = arr ** 2

# execute the function
results = squared.compute()
print(results)

[  1   4   9  16  25  36  49  64  81 100]


Features of DASK
1. Tasks and Graphs
2. Lazy Evaluation
3. Partitioning and Chunking
4. Serialization and Pickling

DASK Architecture
1. Core Library
2. Schedulers
3. Client
4. Workers
5. Task Graphs


Data Structures and Concepts

In [18]:
#  create random array
var_1 = array.random.random(
    (10, 10),
    chunks=(2,2)
)

print(var_1)

# Execute random array
print(var_1.compute())

# persists the data in memory
var_2 = var_1.persist()

# re-chunk the array
var_1 = var_1.rechunk((5,5))

print(var_1)

# compute the mean
dask_mean = var_1.mean().compute()

print(dask_mean)

dask.array<random_sample, shape=(10, 10), dtype=float64, chunksize=(2, 2), chunktype=numpy.ndarray>
[[5.32417683e-01 7.31212462e-01 2.26441773e-01 2.65086129e-01
  3.74713874e-01 3.91903311e-01 4.25552155e-01 3.52020938e-01
  3.93620866e-01 6.06909171e-01]
 [6.89908844e-01 2.70881272e-01 8.06420086e-01 9.95337950e-01
  6.39409310e-01 1.47573888e-01 3.88624437e-01 8.27375637e-01
  6.17442184e-01 3.85097721e-01]
 [1.60060423e-01 2.12389457e-01 7.76628912e-01 8.81995273e-01
  5.23914169e-01 1.97197071e-01 9.90626460e-01 7.28679436e-01
  6.32861279e-01 5.35478384e-01]
 [8.32615393e-01 6.18044215e-01 3.61348301e-01 4.16297613e-01
  1.09554175e-01 4.71715798e-01 4.27315546e-01 5.05979390e-01
  2.47315457e-01 7.40052324e-01]
 [7.56796682e-01 4.33377446e-02 2.22865618e-01 4.93686744e-02
  8.61571105e-02 6.77816515e-01 8.73012432e-01 5.21226588e-01
  9.32114335e-01 8.64248259e-01]
 [4.06827226e-01 2.10568202e-01 5.55521725e-01 5.03312541e-01
  6.60284059e-01 2.57151987e-01 6.47367866e-01 4.0611

DASK Bags (collection of items that can be repeated)

In [33]:
# create a bag
new_bag = bag.from_sequence(
    list(range(1, 11)),
    npartitions=2
)

print(new_bag)
print(new_bag.compute())

# reading a csv into a bag
data_path = "/Users/pepijnschouten/Desktop/" \
    "Python_Scripts/Python_Scripts_Books/" \
        "Data_Engineering/Data_Engineering_" \
            "for_ML_Pipelines/Own_Files/7_" \
                "Currency_Programming_Dask/" \
                    "data/MockarooHealth.csv"
bags = (bag
        .read_text(data_path)
        .map(
            lambda x: next(csv.reader([x]))
        )
)
print(bags.take(5))
print(bags.count().compute())

# read csv file with read_csv
df = dd.read_csv(
    data_path,
)

print(df)
print(df.compute())



dask.bag<from_sequence, npartitions=2>
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
(['drug_company', 'drug_name', 'drug_code', 'drug_price', 'drug_in_stock', 'drug_release_date'], ['AMI Cosmetic Co.,Ltd.', 'Glycerin', '356082843-0', '21.77', 'true', '1/11/2022'], ['Apotheca Company', 'Berberis vulgaris, Hydrangea arborescens, Petroselinum sativum, Solidago virgaurea, Uva-ursi, Rubia tinctorum, Glutathiona, Kidney (suis), Bryonia, Eupatorium purpureum, Mercurius corrosivus,', '641668608-2', '333.3', 'false', '4/27/2022'], ['Blenheim Pharmacal, Inc.', 'Topiramate', '225672839-0', '630.5', 'false', '4/16/2022'], ['Aurobindo Pharma Limited', 'Carvedilol', '295230280-4', '129.39', 'false', '5/3/2022'])
1001
Dask DataFrame Structure:
              drug_company drug_name drug_code drug_price drug_in_stock drug_release_date
npartitions=1                                                                            
                    string    string    string    float64          bool            string
    

DASK Dataframes

In [30]:
# optimize query planning
dask.config.set(
    {"dataframe.query-planning": True})

#  create a dask dataframe from pandas
df = dd.from_pandas(
    pd.DataFrame(
        np.random.randn(10,3),
        columns=["A", "B", "C"]
    ),
    npartitions=3
)

print(df)
print(df.compute())

Dask DataFrame Structure:
                     A        B        C
npartitions=3                           
0              float64  float64  float64
4                  ...      ...      ...
7                  ...      ...      ...
9                  ...      ...      ...
Dask Name: frompandas, 1 expression
Expr=df
          A         B         C
0 -0.699248  0.472184  0.022455
1  0.543422 -0.235156 -0.907532
2 -0.885800 -0.418766 -1.173000
3  0.417280  0.365805 -0.593429
4  0.164768  1.455225  2.127391
5 -1.198969  2.365862 -0.328624
6  0.487740 -0.446207 -0.235864
7  0.793408 -1.115113  0.501886
8 -0.217239 -0.323574 -1.055606
9  2.545306  0.667405  1.288889


DASK Delayed

In [54]:
def add(x):
    return x + 1

def square(x):
    return x * x

def square_add(a, b):
    return a + b

data = list(range(1, 101))

# sequential execution
output = []
for x in data:
    a = add(x)
    b = square(a)
    c = square_add(a, b)
    output.append(c)
print(output)

# parallel execution with delayed
output = []
for x in data:
    a = dask.delayed(add)(x)
    b = dask.delayed(square)(a)
    c = dask.delayed(square_add)(a, b)
    d = c.compute()
    output.append(d)
print(output)  

#  parallel execution with decorators
@dask.delayed
def add(x):
    return x + 1

@dask.delayed
def square(x):
    return x * x

@dask.delayed
def square_add(a, b):
    return a + b

output = []
for x in data:
    a = add(x)
    b = square(a)
    c = square_add(a, b)
    output.append(c)
print(dask.compute(*output))

[6, 12, 20, 30, 42, 56, 72, 90, 110, 132, 156, 182, 210, 240, 272, 306, 342, 380, 420, 462, 506, 552, 600, 650, 702, 756, 812, 870, 930, 992, 1056, 1122, 1190, 1260, 1332, 1406, 1482, 1560, 1640, 1722, 1806, 1892, 1980, 2070, 2162, 2256, 2352, 2450, 2550, 2652, 2756, 2862, 2970, 3080, 3192, 3306, 3422, 3540, 3660, 3782, 3906, 4032, 4160, 4290, 4422, 4556, 4692, 4830, 4970, 5112, 5256, 5402, 5550, 5700, 5852, 6006, 6162, 6320, 6480, 6642, 6806, 6972, 7140, 7310, 7482, 7656, 7832, 8010, 8190, 8372, 8556, 8742, 8930, 9120, 9312, 9506, 9702, 9900, 10100, 10302]
[6, 12, 20, 30, 42, 56, 72, 90, 110, 132, 156, 182, 210, 240, 272, 306, 342, 380, 420, 462, 506, 552, 600, 650, 702, 756, 812, 870, 930, 992, 1056, 1122, 1190, 1260, 1332, 1406, 1482, 1560, 1640, 1722, 1806, 1892, 1980, 2070, 2162, 2256, 2352, 2450, 2550, 2652, 2756, 2862, 2970, 3080, 3192, 3306, 3422, 3540, 3660, 3782, 3906, 4032, 4160, 4290, 4422, 4556, 4692, 4830, 4970, 5112, 5256, 5402, 5550, 5700, 5852, 6006, 6162, 6320, 6480, 

DASK Futures (placeholder for future executions)

In [59]:
# start a client
client = Client(processes=False)
print(client)

# submit tasks to DASK scheduler
def square(x):
    return x**2

a = client.submit(square, 10)
b = client.submit(square, 20)

print(a)
# ask for result
print(a.result())

# cancel a future
"""a.cancel()"""

# wait on a future
"""wait(a)"""

# fire and forget
fire_and_forget(a)

<Client: 'inproc://192.168.0.114/97082/82' processes=1 threads=8, memory=8.00 GiB>
<Future: pending, key: square-8bcc3301ef4c977960343a0eba929844>
100


Perhaps you already have a cluster running?
Hosting the HTTP server on port 55400 instead


Data Locality

In [61]:
client = Client(
    processes=False,
)

df = dd.read_csv(
    data_path,
)

result = df["drug_price"].mean()
final_result = result.compute()

print(final_result)

# specify which workers process which tasks
"""client.submit(result, *args, workers=IP/Hostname)"""


496.369503945885


Perhaps you already have a cluster running?
Hosting the HTTP server on port 57762 instead


Prioritizing Work

In [None]:
# higher priority
a = client.submit(square, 10,
                  priority=10)

# lower priority
b = client.submit(square, 20,
                  priority=1)