# Dask
Dask is a Python library for parallel computing that scales computations across multiple cores, processors, or distributed systems such as clusters of machines. It is especially useful when a dataset does not fit into memory all at once or when a computation is too large for a single machine.

**Key Features of Dask:**
1. Parallel Computing: Dask allows you to parallelize tasks on a single machine or across a distributed cluster. It breaks larger tasks into smaller chunks and schedules them for execution in parallel, optimizing resource usage.

2. Big Data Handling: Dask can work with datasets larger than memory by splitting them into partitions (chunks) and processing only some of them at a time, so the full dataset never has to fit into the RAM available on your machine.

3. Familiar API: Dask integrates with libraries like Pandas, NumPy, and Scikit-learn. Its APIs are designed to mimic those of these libraries, so many of the operations you would use in Pandas or NumPy (e.g., on DataFrames) carry over, with the difference that Dask runs them in parallel across partitions.

4. Lazy Execution: Similar to Spark, Dask uses lazy execution. This means that operations on Dask objects (like dask.dataframe, dask.array, etc.) are not immediately computed. Instead, they build a task graph, and computations are only executed when explicitly triggered (e.g., by calling .compute()).

5. Task Scheduling: Dask has a flexible task scheduler that coordinates the execution of tasks across multiple workers. It offers:
    - Single Machine Scheduler: For parallel execution on your local machine.
    - Distributed Scheduler: For distributed execution across clusters of machines.

6. Scalability: Dask scales from small problems on your local machine to large problems on distributed clusters. It works well with cloud computing platforms (e.g., AWS, Google Cloud) and cluster managers (e.g., Kubernetes, YARN, SLURM).

In [1]:
pip install "dask[complete]"

Collecting pyarrow>=14.0.1 (from dask[complete])
  Downloading pyarrow-17.0.0-cp311-cp311-win_amd64.whl.metadata (3.4 kB)
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.3.3-cp311-cp311-win_amd64.whl.metadata (3.8 kB)
Collecting dask-expr<1.2,>=1.1 (from dask[complete])
  Downloading dask_expr-1.1.15-py3-none-any.whl.metadata (2.5 kB)
Collecting distributed==2024.9.1 (from dask[complete])
  Downloading distributed-2024.9.1-py3-none-any.whl.metadata (3.3 kB)
Collecting msgpack>=1.0.2 (from distributed==2024.9.1->dask[complete])
  Downloading msgpack-1.1.0-cp311-cp311-win_amd64.whl.metadata (8.6 kB)
Collecting sortedcontainers>=2.0.5 (from distributed==2024.9.1->dask[complete])
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting tblib>=1.6.0 (from distributed==2024.9.1->dask[complete])
  Downloading tblib-3.0.0-py3-none-any.whl.metadata (25 kB)
Collecting zict>=3.0.0 (from distributed==2024.9.1->dask[complete])
  Downloading zict-3.0.0-py

In [1]:
import pandas as pd
import dask.dataframe as dd
import numpy as np

# Creating a Dask Object

In [2]:
index = pd.date_range("2021-09-01", periods=2400, freq="1h")
df = pd.DataFrame({"a": np.arange(2400), "b": list("abcaddbe" * 300)}, index=index)
# Create a Dask DataFrame from the pandas DataFrame.
# The data is split into 10 partitions, each holding part of the rows.
ddf = dd.from_pandas(df, npartitions=10)
ddf

Dask DataFrame Structure:
                         a       b
npartitions=10
2021-09-01 00:00:00  int64  string
2021-09-11 00:00:00    ...     ...
...                    ...     ...
2021-11-30 00:00:00    ...     ...
2021-12-09 23:00:00    ...     ...


In [3]:
ddf.compute()

                        a    b
2021-09-01 00:00:00     0    a
2021-09-01 01:00:00     1    b
2021-09-01 02:00:00     2    c
2021-09-01 03:00:00     3    a
2021-09-01 04:00:00     4    d
...                   ...  ...
2021-12-09 19:00:00  2395    a
2021-12-09 20:00:00  2396    d
2021-12-09 21:00:00  2397    d
2021-12-09 22:00:00  2398    b


In [4]:
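# Divisions are the index boundaries of the partitions: the first index value
# of each partition, followed by the last index value of the final partition.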
ddf.divisions

(Timestamp('2021-09-01 00:00:00'),
 Timestamp('2021-09-11 00:00:00'),
 Timestamp('2021-09-21 00:00:00'),
 Timestamp('2021-10-01 00:00:00'),
 Timestamp('2021-10-11 00:00:00'),
 Timestamp('2021-10-21 00:00:00'),
 Timestamp('2021-10-31 00:00:00'),
 Timestamp('2021-11-10 00:00:00'),
 Timestamp('2021-11-20 00:00:00'),
 Timestamp('2021-11-30 00:00:00'),
 Timestamp('2021-12-09 23:00:00'))

In [5]:
# Access a particular partition (here, the last one)
ddf.partitions[-1]

Dask DataFrame Structure:
                         a       b
npartitions=1
2021-11-30 00:00:00  int64  string
2021-12-09 23:00:00    ...     ...


# Indexing

In [6]:
ddf.a

Dask Series Structure:
npartitions=10
2021-09-01 00:00:00    int64
2021-09-11 00:00:00      ...
                       ...  
2021-11-30 00:00:00      ...
2021-12-09 23:00:00      ...
Dask Name: getitem, 2 expressions
Expr=df['a']

In [7]:
ddf["2021-09-01 00:00:00":"2021-11-01 00:03:00"]

Dask DataFrame Structure:
                                   a       b
npartitions=7
2021-09-01 00:00:00.000000000  int64  string
2021-09-11 00:00:00.000000000    ...     ...
...                              ...     ...
2021-10-31 00:00:00.000000000    ...     ...
2021-11-01 00:03:00.999999999    ...     ...


In [8]:
ddf.visualize()

ExecutableNotFound: failed to execute WindowsPath('dot'), make sure the Graphviz executables are on your systems' PATH
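
The error above means that the Graphviz `dot` executable could not be found on the system PATH. A small check that can be run before calling `ddf.visualize()` is sketched below; the helper name `graphviz_dot_available` is hypothetical and uses only the standard library.

```python
import shutil


def graphviz_dot_available() -> bool:
    """Return True if the Graphviz `dot` executable can be found on PATH."""
    return shutil.which("dot") is not None


if graphviz_dot_available():
    print("Graphviz `dot` found on PATH.")
else:
    print("Graphviz `dot` not found on PATH; ddf.visualize() will fail until it is installed.")
```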

In [32]:
ddf["2021-09-01 00:00:00":"2021-11-01 00:03:00"].compute()

                        a    b
2021-09-01 00:00:00     0    a
2021-09-01 01:00:00     1    b
2021-09-01 02:00:00     2    c
2021-09-01 03:00:00     3    a
2021-09-01 04:00:00     4    d
...                   ...  ...
2021-10-31 20:00:00  1460    d
2021-10-31 21:00:00  1461    d
2021-10-31 22:00:00  1462    b
2021-10-31 23:00:00  1463    e


In [41]:
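# This installs only the Python bindings for Graphviz. The ExecutableNotFound
# error from ddf.visualize() also requires the Graphviz executables (including
# `dot`) to be installed separately and available on the system PATH.
%pip install graphviz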


Note: you may need to restart the kernel to use updated packages.
