![](https://docs.dask.org/en/stable/_images/dask_horizontal.svg)

Dask is a flexible library for parallel computing in Python. It is composed of two parts:
1. **Dynamic task scheduling** optimized for computation.
1. **“Big Data” collections** like dataframes that extend common interfaces like Pandas to larger-than-memory or distributed environments.
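The first part, dynamic task scheduling, operates on a task graph in which each node is a function call and each edge is a data dependency. As a minimal sketch of that idea, the dictionary below follows the shape of Dask's graph specification (keys mapped either to values or to `(callable, *args)` tuples). `get` is a hypothetical toy scheduler that evaluates a key by recursively resolving its dependencies. Dask's real schedulers add parallel execution, memory management and much more.

```python
from operator import add, mul

# A task graph in the style of Dask's graph specification:
# each key maps to a plain value or to a task tuple (callable, *args),
# where an argument that names another key refers to that task's result.
graph = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),  # z = x + y
    "w": (mul, "z", 10),   # w = z * 10
}


def get(dsk, key, cache=None):
    """Hypothetical toy scheduler (not Dask's): evaluate `key`,
    computing each dependency once and caching its result."""
    if cache is None:
        cache = {}
    if key in cache:
        return cache[key]
    task = dsk[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        # Replace arguments that name other keys with their computed values.
        resolved = [get(dsk, a, cache) if isinstance(a, str) and a in dsk else a
                    for a in args]
        result = func(*resolved)
    else:
        result = task  # a literal value
    cache[key] = result
    return result


print(get(graph, "w"))  # 30
```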

![](https://www.nvidia.com/content/dam/en-zz/Solutions/glossary/data-science/dask/dask-pic3.png)

In [1]:
import os
import numpy as np
import cupy as cp
import pandas as pd
import cudf
import dask
import dask.array as da
import dask.dataframe as dd
import dask_cudf
import time

from dask.diagnostics import ProgressBar

cp.random.seed(220919)

print(pd.__version__)
print(dask.__version__)
print(cudf.__version__)
print(dask_cudf.__version__)

1.4.2
2022.05.2
22.06.00
22.06.00


In [2]:
# Dask visualization example
x = da.ones((10000, 1000, 1000))

In [3]:
x

| | Array | Chunk |
|---|---|---|
| Bytes | 74.51 GiB | 119.21 MiB |
| Shape | (10000, 1000, 1000) | (250, 250, 250) |
| Count | 640 Tasks | 640 Chunks |
| Type | float64 | numpy.ndarray |
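The figures in this repr follow from simple arithmetic. A sketch, assuming 8-byte `float64` elements and the (250, 250, 250) chunk shape Dask chose here:

```python
import math

shape = (10000, 1000, 1000)
chunks = (250, 250, 250)
itemsize = 8  # float64 uses 8 bytes per element

# Number of chunks along each axis (ceil covers shapes that don't divide evenly).
blocks_per_axis = [math.ceil(s / c) for s, c in zip(shape, chunks)]
n_chunks = math.prod(blocks_per_axis)

total_bytes = math.prod(shape) * itemsize
chunk_bytes = math.prod(chunks) * itemsize

print(blocks_per_axis)                   # [40, 4, 4]
print(n_chunks)                          # 640
print(f"{total_bytes / 2**30:.2f} GiB")  # 74.51 GiB
print(f"{chunk_bytes / 2**20:.2f} MiB")  # 119.21 MiB
```

Each chunk is created by one task, which is why the repr lists 640 tasks for 640 chunks.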


In [4]:
%%time
with ProgressBar():
    print(x.sum().compute())

[########################################] | 100% Completed |  0.3s
10000000000.0
CPU times: user 2.42 s, sys: 61.6 ms, total: 2.48 s
Wall time: 331 ms


Compare with NumPy. Unlike Dask, NumPy actually allocates the array, so creating it takes much longer and immediately occupies about 75 GiB (80 GB) of memory.

In [5]:
%%time
np_x = np.ones((10000, 1000, 1000))

CPU times: user 2.86 s, sys: 6.91 s, total: 9.77 s
Wall time: 9.76 s


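The `sum()` operation is also much slower: 4.39 s, versus 331 ms for Dask, which builds and sums its 640 chunks in parallel across CPU cores.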

In [6]:
%%time 
print(np_x.sum())

10000000000.0
CPU times: user 4.39 s, sys: 0 ns, total: 4.39 s
Wall time: 4.39 s
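Dask avoids both the time and the memory cost by working chunk by chunk: each chunk of ones is created, reduced to a partial sum and discarded, and the partial sums are then added. A minimal sequential sketch of that blockwise reduction in NumPy; the helper `blockwise_ones_sum`, the smaller shape, and chunking along only the first axis are assumptions for illustration, and Dask additionally runs the chunks in parallel:

```python
import numpy as np


def blockwise_ones_sum(shape, chunk):
    """Sum an array of ones block by block, keeping at most one block in memory."""
    total = 0.0
    for start in range(0, shape[0], chunk):
        rows = min(chunk, shape[0] - start)           # the last block may be shorter
        block = np.ones((rows,) + tuple(shape[1:]))   # materialize only this block
        total += block.sum()                          # partial result for this block
        del block                                     # free it before building the next one
    return total


# A small shape so the example runs quickly; same idea as (10000, 1000, 1000).
print(blockwise_ones_sum((1000, 100, 100), chunk=250))  # 10000000.0
```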


From here on, the experiments use real data instead of a simple array.

In [7]:
%%time
# Slow, because the entire 2.9 GB file is actually read into memory
pdf = pd.read_csv("../loan-default-data/MidSizedData.csv")

CPU times: user 26.3 s, sys: 2.78 s, total: 29.1 s
Wall time: 29 s


In [8]:
%%time
# Returns almost immediately: a lazy operation that builds the task graph
# (plus a small sample read to infer column types)
ddf = dd.read_csv("../loan-default-data/MidSizedData.csv", blocksize=25e6)  # 25 MB chunks

CPU times: user 10.1 ms, sys: 0 ns, total: 10.1 ms
Wall time: 9.36 ms


# Sorting Values

In [9]:
%time pdf.sort_values('cust_id').head()
%time ddf.sort_values('cust_id').head()

CPU times: user 3.76 s, sys: 320 ms, total: 4.08 s
Wall time: 4.08 s
CPU times: user 2min 1s, sys: 32.9 s, total: 2min 34s
Wall time: 1min 3s


| | cust_id | year | state | date_issued | date_final | emp_duration | own_type | income_type | app_type | loan_purpose | ... | annual_pay | loan_amount | interest_rate | loan_duration | dti | total_pymnt | total_rec_prncp | recoveries | installment | is_default |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 7854 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 90284 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 90283 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 90283 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 7853 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |


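Although the data is split into many partitions (with a 25 MB `blocksize`, a 2.9 GB file yields well over 100; `ddf.npartitions` gives the exact count), Dask is actually much slower here. The reason is that `compute()`, which `head()` triggers, includes reading the CSV from disk, and sorting additionally needs a shuffle that moves rows between all partitions. Disk I/O, rather than the computation itself, accounts for much of the runtime.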
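Sorting is expensive for a partitioned DataFrame because no partition can be sorted in isolation. Roughly, Dask's `sort_values` first estimates split points (divisions) from quantiles of the data, then shuffles rows so that each output partition holds one value range, and finally sorts each partition; without persisting, each of those passes goes back to the CSV. A pure-Python sketch of this range-partitioning pattern, where `distributed_sort` is a hypothetical helper and plain lists stand in for partitions:

```python
import bisect
import random


def distributed_sort(partitions, n_out, sample_size=100, seed=0):
    """Sort values spread across partitions via range partitioning."""
    rng = random.Random(seed)
    # Pass 1: sample every partition to estimate quantiles (the divisions).
    sample = sorted(v for part in partitions
                    for v in rng.sample(part, min(sample_size, len(part))))
    divisions = [sample[len(sample) * k // n_out] for k in range(1, n_out)]

    # Pass 2 (the shuffle): every input partition sends rows to every output.
    outputs = [[] for _ in range(n_out)]
    for part in partitions:
        for v in part:
            outputs[bisect.bisect_right(divisions, v)].append(v)

    # Each output now covers one contiguous value range, so a local sort suffices.
    return [sorted(out) for out in outputs]


rng = random.Random(42)
parts = [[rng.randint(0, 100_000) for _ in range(1_000)] for _ in range(8)]
result = distributed_sort(parts, n_out=4)
```

Concatenating the output partitions in order gives the fully sorted data, because the routing step sends smaller values to earlier partitions.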

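One remedy is `persist()`, which computes the DataFrame once and keeps its partitions in memory. If the machine's total memory is larger than the data, persisting avoids re-reading the file on every `compute()` and so improves speed.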
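A toy model of what `persist()` changes, assuming each partition is identified by a key (Dask does name partitions by keys, but the `read_partition`, `partitions` and `persist` helpers here are hypothetical). Without a store, every computation re-reads every partition. With a store, each partition is read once, and any later computation that asks for the same keys is served from memory:

```python
from collections import Counter

reads = Counter()  # how many times each partition was read from "disk"


def read_partition(i):
    """Hypothetical stand-in for parsing one block of the CSV file."""
    reads[i] += 1
    return list(range(i * 3, i * 3 + 3))


def partitions(n, store=None):
    """Yield n partitions, reusing any partition already held in `store`."""
    for i in range(n):
        key = ("read-csv", i)        # each partition is identified by a key
        if store is not None and key in store:
            yield store[key]         # held in memory: no disk read
        else:
            yield read_partition(i)  # not held: read it again


def persist(n):
    """Read every partition once and keep the results, keyed by partition."""
    return {("read-csv", i): read_partition(i) for i in range(n)}


# Not persisted: each of the two computations reads every partition.
total = sum(sum(p) for p in partitions(4))
maximum = max(max(p) for p in partitions(4))
print(dict(reads))  # {0: 2, 1: 2, 2: 2, 3: 2}

reads.clear()
store = persist(4)  # one read per partition, and none afterwards
total = sum(sum(p) for p in partitions(4, store))
maximum = max(max(p) for p in partitions(4, store))
print(dict(reads))  # {0: 1, 1: 1, 2: 1, 3: 1}
```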

In [10]:
%%time
# Load the DataFrame into memory and keep it there
persisted_ddf = ddf.persist()

CPU times: user 37.5 s, sys: 3.41 s, total: 40.9 s
Wall time: 14.8 s


In [11]:
%%time
persisted_ddf.sort_values('cust_id').head()

CPU times: user 27.8 s, sys: 10.4 s, total: 38.1 s
Wall time: 27.6 s


| | cust_id | year | state | date_issued | date_final | emp_duration | own_type | income_type | app_type | loan_purpose | ... | annual_pay | loan_amount | interest_rate | loan_duration | dti | total_pymnt | total_rec_prncp | recoveries | installment | is_default |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 90283 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 90283 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 7854 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 90284 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |
| 7854 | 54734 | 2009 | Haryana | 01/08/2009 | 1102011 | 0.5 | RENT | Low | INDIVIDUAL | debt_consolidation | ... | 85000 | 25000 | 11.89 | 36 months | 19.48 | 29324.32 | 25000.0 | 0.0 | 829.1 | 0 |


# Histogramming

In [12]:
%time pdf.state.value_counts()
%time ddf.state.value_counts().compute()
%time persisted_ddf.state.value_counts().compute()

CPU times: user 945 ms, sys: 1.12 ms, total: 946 ms
Wall time: 937 ms
CPU times: user 18.5 s, sys: 1.69 s, total: 20.2 s
Wall time: 4.5 s
CPU times: user 1.06 s, sys: 10.8 ms, total: 1.07 s
Wall time: 1.04 s


Chhattisgarh         708642
Sikkim               706860
Haryana              703978
Punjab               700766
Assam                699424
Goa                  699292
Madhya Pradesh       698544
Uttar Pradesh        698500
Maharashtra          698104
Himachal Pradesh     698060
Arunachal Pradesh    697466
Nagaland             697422
Andhra Pradesh       697136
Kerala               697114
Tripura              696718
Rajasthan            696718
Karnataka            695618
Manipur              695266
West Bengal          695112
Odisha               694914
Telangana            694892
Bihar                694694
Jharkhand            694100
Mizoram              693792
Gujarat              693792
Uttarakhand          693792
Tamil Nadu           693374
Meghalaya            688248
Name: state, dtype: int64
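Operations like `value_counts` parallelize well because partial results combine cheaply: count within each partition, then add the counts for each value. A sketch of that map/combine pattern in plain pandas, with three hand-made partitions standing in for Dask's (`partitioned_value_counts` is a hypothetical helper, not Dask's code):

```python
import pandas as pd


def partitioned_value_counts(parts):
    """Count values per partition, then add the partial counts together."""
    partial = [p.value_counts() for p in parts]           # map: one small Series per partition
    combined = pd.concat(partial).groupby(level=0).sum()  # combine: total count per value
    return combined.sort_values(ascending=False)


s = pd.Series(["Goa", "Assam", "Goa", "Bihar", "Goa", "Assam"], name="state")
parts = [s.iloc[:2], s.iloc[2:4], s.iloc[4:]]  # three hypothetical partitions
print(partitioned_value_counts(parts).to_dict())  # {'Goa': 3, 'Assam': 2, 'Bihar': 1}
```

Only the small per-partition count tables travel to the combine step, not the rows themselves.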

# Unique Counting

In [13]:
%time pdf.state.nunique()
%time ddf.state.nunique().compute()
%time persisted_ddf.state.nunique().compute()

CPU times: user 994 ms, sys: 900 µs, total: 995 ms
Wall time: 989 ms
CPU times: user 18.2 s, sys: 1.35 s, total: 19.5 s
Wall time: 3.97 s
CPU times: user 638 ms, sys: 0 ns, total: 638 ms
Wall time: 615 ms


28

# Grouping

In [14]:
%time pdf.groupby('state').total_pymnt.mean()
%time ddf.groupby('state').total_pymnt.mean().compute()
%time persisted_ddf.groupby('state').total_pymnt.mean().compute()

CPU times: user 1.24 s, sys: 0 ns, total: 1.24 s
Wall time: 1.23 s
CPU times: user 40.7 s, sys: 3.98 s, total: 44.7 s
Wall time: 16.3 s
CPU times: user 2.61 s, sys: 62 ms, total: 2.67 s
Wall time: 1.21 s


state
Andhra Pradesh       7539.887278
Arunachal Pradesh    7571.114449
Assam                7483.791068
Bihar                7542.281996
Chhattisgarh         7555.550607
Goa                  7567.639128
Gujarat              7530.040670
Haryana              7508.253666
Himachal Pradesh     7532.032232
Jharkhand            7528.867289
Karnataka            7499.055526
Kerala               7605.894841
Madhya Pradesh       7542.526326
Maharashtra          7595.657335
Manipur              7528.928218
Meghalaya            7626.587164
Mizoram              7563.220400
Nagaland             7685.616803
Odisha               7617.385990
Punjab               7551.748058
Rajasthan            7541.678963
Sikkim               7587.741911
Tamil Nadu           7547.376405
Telangana            7539.608749
Tripura              7560.031309
Uttar Pradesh        7545.473138
Uttarakhand          7595.781252
West Bengal          7554.332179
Name: total_pymnt, dtype: float64
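A grouped mean cannot simply average per-partition means, because each partition holds a different number of rows per group. The usual approach, and roughly what Dask does, is to collect per-partition sums and counts and divide only at the end. A pandas sketch with a hypothetical `partitioned_group_mean` helper and made-up data:

```python
import pandas as pd


def partitioned_group_mean(parts, by, col):
    """Mean per group from per-partition sums and counts."""
    partial = [p.groupby(by)[col].agg(["sum", "count"]) for p in parts]
    totals = pd.concat(partial).groupby(level=0).sum()  # add sums and counts per group
    return totals["sum"] / totals["count"]


df = pd.DataFrame({
    "state": ["Goa", "Goa", "Assam", "Goa", "Assam"],
    "total_pymnt": [100.0, 200.0, 50.0, 600.0, 150.0],
})
parts = [df.iloc[:2], df.iloc[2:]]  # hypothetical partitions of unequal size
print(partitioned_group_mean(parts, "state", "total_pymnt").to_dict())
# {'Assam': 100.0, 'Goa': 300.0}
```

Averaging the per-partition means for Goa would give (150 + 600) / 2 = 375 instead of the correct 300.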

# Boolean Indexing

In [15]:
%time pdf[pdf.cust_id > 83000].head()
%time ddf[ddf.cust_id > 83000].head()
%time persisted_ddf[persisted_ddf.cust_id > 83000].head()

CPU times: user 1.46 s, sys: 329 ms, total: 1.79 s
Wall time: 1.79 s
CPU times: user 242 ms, sys: 19.4 ms, total: 261 ms
Wall time: 261 ms
CPU times: user 17.2 ms, sys: 0 ns, total: 17.2 ms
Wall time: 17.1 ms


| | cust_id | year | state | date_issued | date_final | emp_duration | own_type | income_type | app_type | loan_purpose | ... | annual_pay | loan_amount | interest_rate | loan_duration | dti | total_pymnt | total_rec_prncp | recoveries | installment | is_default |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 180675 | 2007 | Andhra Pradesh | 01/12/2007 | 1032009 | 10.0 | MORTGAGE | Low | INDIVIDUAL | debt_consolidation | ... | 73000 | 25000 | 10.91 | 36 months | 22.13 | 13650.38 | 8767.32 | 2207.65 | 817.41 | 1 |
| 1 | 85781 | 2007 | Rajasthan | 01/06/2007 | 1072010 | 0.5 | RENT | Low | INDIVIDUAL | other | ... | 40000 | 1400 | 10.91 | 36 months | 8.61 | 1663.04 | 1400.0 | 0.0 | 45.78 | 0 |
| 2 | 85675 | 2007 | Manipur | 01/06/2007 | 1062010 | 10.0 | RENT | Low | INDIVIDUAL | other | ... | 25000 | 1000 | 14.07 | 36 months | 16.27 | 1231.38 | 1000.0 | 0.0 | 34.21 | 0 |
| 3 | 84918 | 2007 | Andhra Pradesh | 01/09/2007 | 1042008 | 10.0 | MORTGAGE | Low | INDIVIDUAL | other | ... | 65000 | 5000 | 7.43 | 36 months | 0.28 | 5200.44 | 5000.0 | 0.0 | 155.38 | 0 |
| 4 | 84670 | 2007 | Arunachal Pradesh | 01/06/2007 | 1082009 | 10.0 | MORTGAGE | High | INDIVIDUAL | other | ... | 300000 | 5000 | 7.75 | 36 months | 5.38 | 5565.65 | 5000.0 | 0.0 | 156.11 | 0 |


# Query API

In [16]:
%time pdf.query('cust_id > 83000').head()
%time ddf.query('cust_id > 83000').head()
%time persisted_ddf.query('cust_id > 83000').head()

CPU times: user 1.51 s, sys: 303 ms, total: 1.82 s
Wall time: 1.81 s
CPU times: user 266 ms, sys: 15 ms, total: 281 ms
Wall time: 280 ms
CPU times: user 22.5 ms, sys: 452 µs, total: 23 ms
Wall time: 22.8 ms


| | cust_id | year | state | date_issued | date_final | emp_duration | own_type | income_type | app_type | loan_purpose | ... | annual_pay | loan_amount | interest_rate | loan_duration | dti | total_pymnt | total_rec_prncp | recoveries | installment | is_default |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 180675 | 2007 | Andhra Pradesh | 01/12/2007 | 1032009 | 10.0 | MORTGAGE | Low | INDIVIDUAL | debt_consolidation | ... | 73000 | 25000 | 10.91 | 36 months | 22.13 | 13650.38 | 8767.32 | 2207.65 | 817.41 | 1 |
| 1 | 85781 | 2007 | Rajasthan | 01/06/2007 | 1072010 | 0.5 | RENT | Low | INDIVIDUAL | other | ... | 40000 | 1400 | 10.91 | 36 months | 8.61 | 1663.04 | 1400.0 | 0.0 | 45.78 | 0 |
| 2 | 85675 | 2007 | Manipur | 01/06/2007 | 1062010 | 10.0 | RENT | Low | INDIVIDUAL | other | ... | 25000 | 1000 | 14.07 | 36 months | 16.27 | 1231.38 | 1000.0 | 0.0 | 34.21 | 0 |
| 3 | 84918 | 2007 | Andhra Pradesh | 01/09/2007 | 1042008 | 10.0 | MORTGAGE | Low | INDIVIDUAL | other | ... | 65000 | 5000 | 7.43 | 36 months | 0.28 | 5200.44 | 5000.0 | 0.0 | 155.38 | 0 |
| 4 | 84670 | 2007 | Arunachal Pradesh | 01/06/2007 | 1082009 | 10.0 | MORTGAGE | High | INDIVIDUAL | other | ... | 300000 | 5000 | 7.75 | 36 months | 5.38 | 5565.65 | 5000.0 | 0.0 | 156.11 | 0 |


# String Manipulation

In [17]:
%time pdf['state'].unique()[0].lower()
%time ddf['state'].unique().compute()[0].lower()
%time persisted_ddf['state'].unique().compute()[0].lower()

CPU times: user 980 ms, sys: 12.7 ms, total: 993 ms
Wall time: 986 ms
CPU times: user 17.8 s, sys: 1.3 s, total: 19.2 s
Wall time: 3.58 s
CPU times: user 856 ms, sys: 0 ns, total: 856 ms
Wall time: 414 ms


'andhra pradesh'

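Dask can process files larger than memory, but every `compute()` then pays the cost of reading the data. Once `persist()` has loaded the data into memory in advance, the results against Pandas are mixed: `value_counts` and `groupby` run at about the same speed, `nunique` and `unique` are about 1.6x and 2.4x faster, and the filtered `head()` calls are roughly 80 to 100 times faster, largely because `head()` only needs the first partition. Sorting remains much slower (27.6 s versus 4.08 s).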
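The large speedups for the filtered `head()` calls come from Dask's default `head(n=5, npartitions=1)`: only the first partition is filtered, so the other partitions are never read. A sketch of that behavior in pandas, with hypothetical lazy loaders (`make_loader`, `head_of_filtered`) standing in for CSV blocks:

```python
import pandas as pd

loaded = []  # which partitions were actually read


def make_loader(i):
    """Hypothetical loader for partition i (stands in for parsing one CSV block)."""
    def load():
        loaded.append(i)
        return pd.DataFrame({"cust_id": range(i * 1_000, (i + 1) * 1_000)})
    return load


def head_of_filtered(loaders, n=5, npartitions=1):
    """Filter only the first `npartitions` partitions, then take n rows,
    mirroring the default of Dask's head(n=5, npartitions=1)."""
    df = pd.concat([load() for load in loaders[:npartitions]])
    return df[df.cust_id > 500].head(n)


loaders = [make_loader(i) for i in range(30)]
result = head_of_filtered(loaders)
print(result.cust_id.tolist())  # [501, 502, 503, 504, 505]
print(loaded)                   # [0]: the other 29 partitions were never read
```

If the first partition holds fewer than `n` matching rows, Dask warns and returns fewer rows; `head(n, npartitions=-1)` searches all partitions, at the cost of reading all of them.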

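But is that enough to justify Dask? It is considerably less convenient than plain Pandas, so it would need to be clearly faster on nearly every operation to be worth using. So let's set up a distributed cluster locally and use it. Creating a `Client` is enough to start one; on this machine the default configuration gave 16 workers (the default depends on the number of CPU cores), and the cluster's status can be monitored right away on the [dashboard](https://docs.dask.org/en/stable/dashboard.html).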
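The default worker count depends on the core count. The function below is a re-implementation, from memory, of the heuristic the `distributed` package uses when neither `n_workers` nor `threads_per_worker` is given: the number of worker processes is the smallest divisor of the core count that is at least its square root. Treat it as an approximation that may differ between versions.

```python
import math
import os


def factors(n):
    """All positive divisors of n."""
    return {d for i in range(1, math.isqrt(n) + 1) if n % i == 0 for d in (i, n // i)}


def default_workers_threads(n_cores):
    """Approximate (workers, threads_per_worker) split for a LocalCluster
    started without explicit sizes (re-implemented from memory)."""
    if n_cores <= 4:
        processes = n_cores
    else:
        # Smallest divisor >= sqrt(n): roughly square, e.g. 64 cores -> 8 x 8.
        processes = min(f for f in factors(n_cores) if f >= math.sqrt(n_cores))
    return processes, n_cores // processes


print(default_workers_threads(os.cpu_count() or 1))
```

Under this rule, 16 workers would correspond, for example, to 128 cores (16 workers with 8 threads each). To fix the layout instead of relying on the default, pass `n_workers` and `threads_per_worker` to `Client` or `LocalCluster`.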

In [18]:
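from dask.distributed import Client
# Without a scheduler address, Client starts a LocalCluster on this machine.
# host='0.0.0.0' makes the scheduler listen on all interfaces, not only localhost.
client = Client(host='0.0.0.0')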

2022-10-28 12:08:52,549 - distributed.diskutils - INFO - Found stale lock file and directory '/skpark-workspace/rapids-docker/experiments/dask-worker-space/worker-_efgvy4x', purging
2022-10-28 12:08:52,550 - distributed.diskutils - INFO - Found stale lock file and directory '/skpark-workspace/rapids-docker/experiments/dask-worker-space/worker-p3w88ig_', purging
2022-10-28 12:08:52,550 - distributed.diskutils - INFO - Found stale lock file and directory '/skpark-workspace/rapids-docker/experiments/dask-worker-space/worker-84apqy9u', purging
2022-10-28 12:08:52,550 - distributed.diskutils - INFO - Found stale lock file and directory '/skpark-workspace/rapids-docker/experiments/dask-worker-space/worker-s5lrjhu8', purging
2022-10-28 12:08:52,550 - distributed.diskutils - INFO - Found stale lock file and directory '/skpark-workspace/rapids-docker/experiments/dask-worker-space/worker-1qac1kx2', purging
2022-10-28 12:08:52,550 - distributed.diskutils - INFO - Found stale lock file and directo

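After starting the cluster, run `persist()` again so that the partitions are held in the workers' memory. With a distributed `Client`, `persist()` returns immediately and loads the data in the background, so the short wall time below measures only the submission, not the load itself.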

In [23]:
%%time
persisted_ddf = ddf.persist()

CPU times: user 412 ms, sys: 44.5 ms, total: 457 ms
Wall time: 450 ms
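On a distributed `Client`, `persist()` hands the work to the cluster and returns right away, much like submitting a job to a `concurrent.futures` executor. An analogy in the standard library; the `load_data` function is hypothetical and waits on an event so that the output is deterministic:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()


def load_data():
    """Hypothetical slow load; blocks until `release` is set."""
    release.wait()
    return "partitions loaded"


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(load_data)  # returns at once, like persist() on a Client
    print(future.done())             # False: the work is still pending
    release.set()                    # let the background "load" finish
    print(future.result())           # blocks until the result is ready
```

In the notebook, calling `wait(persisted_ddf)` (from `dask.distributed`) right after `persist()` would block until the partitions are loaded, which keeps the load time out of later measurements.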


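Now every operation tested below is much faster than Pandas in wall-clock time. Note that the CPU times reported by `%time` cover only the notebook's client process, not the worker processes, so only the wall times are comparable. The non-persisted `ddf` also speeds up; this is likely because `ddf` and `persisted_ddf` share the same task keys, so the scheduler can reuse the partitions already held in the workers' memory instead of re-reading the CSV.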

In [24]:
%time pdf.state.value_counts()
%time ddf.state.value_counts().compute()
%time persisted_ddf.state.value_counts().compute()

CPU times: user 961 ms, sys: 840 µs, total: 962 ms
Wall time: 948 ms
CPU times: user 287 ms, sys: 94.4 ms, total: 382 ms
Wall time: 761 ms
CPU times: user 95.6 ms, sys: 8.52 ms, total: 104 ms
Wall time: 165 ms


Chhattisgarh         708642
Sikkim               706860
Haryana              703978
Punjab               700766
Assam                699424
Goa                  699292
Madhya Pradesh       698544
Uttar Pradesh        698500
Maharashtra          698104
Himachal Pradesh     698060
Arunachal Pradesh    697466
Nagaland             697422
Andhra Pradesh       697136
Kerala               697114
Tripura              696718
Rajasthan            696718
Karnataka            695618
Manipur              695266
West Bengal          695112
Odisha               694914
Telangana            694892
Bihar                694694
Jharkhand            694100
Mizoram              693792
Gujarat              693792
Uttarakhand          693792
Tamil Nadu           693374
Meghalaya            688248
Name: state, dtype: int64

In [26]:
%time pdf.state.nunique()
%time ddf.state.nunique().compute()
%time persisted_ddf.state.nunique().compute()

CPU times: user 1.06 s, sys: 32.3 ms, total: 1.09 s
Wall time: 990 ms
CPU times: user 397 ms, sys: 89.8 ms, total: 487 ms
Wall time: 839 ms
CPU times: user 91.4 ms, sys: 5.96 ms, total: 97.4 ms
Wall time: 138 ms


28

In [27]:
%time pdf.query('cust_id > 83000').head()
%time ddf.query('cust_id > 83000').head()
%time persisted_ddf.query('cust_id > 83000').head()

CPU times: user 1.56 s, sys: 326 ms, total: 1.89 s
Wall time: 1.86 s
CPU times: user 69.9 ms, sys: 27.5 ms, total: 97.4 ms
Wall time: 338 ms
CPU times: user 26.5 ms, sys: 0 ns, total: 26.5 ms
Wall time: 40.4 ms


| | cust_id | year | state | date_issued | date_final | emp_duration | own_type | income_type | app_type | loan_purpose | ... | annual_pay | loan_amount | interest_rate | loan_duration | dti | total_pymnt | total_rec_prncp | recoveries | installment | is_default |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 180675 | 2007 | Andhra Pradesh | 01/12/2007 | 1032009 | 10.0 | MORTGAGE | Low | INDIVIDUAL | debt_consolidation | ... | 73000 | 25000 | 10.91 | 36 months | 22.13 | 13650.38 | 8767.32 | 2207.65 | 817.41 | 1 |
| 1 | 85781 | 2007 | Rajasthan | 01/06/2007 | 1072010 | 0.5 | RENT | Low | INDIVIDUAL | other | ... | 40000 | 1400 | 10.91 | 36 months | 8.61 | 1663.04 | 1400.0 | 0.0 | 45.78 | 0 |
| 2 | 85675 | 2007 | Manipur | 01/06/2007 | 1062010 | 10.0 | RENT | Low | INDIVIDUAL | other | ... | 25000 | 1000 | 14.07 | 36 months | 16.27 | 1231.38 | 1000.0 | 0.0 | 34.21 | 0 |
| 3 | 84918 | 2007 | Andhra Pradesh | 01/09/2007 | 1042008 | 10.0 | MORTGAGE | Low | INDIVIDUAL | other | ... | 65000 | 5000 | 7.43 | 36 months | 0.28 | 5200.44 | 5000.0 | 0.0 | 155.38 | 0 |
| 4 | 84670 | 2007 | Arunachal Pradesh | 01/06/2007 | 1082009 | 10.0 | MORTGAGE | High | INDIVIDUAL | other | ... | 300000 | 5000 | 7.75 | 36 months | 5.38 | 5565.65 | 5000.0 | 0.0 | 156.11 | 0 |


In [28]:
%time pdf['state'].unique()[0].lower()
%time ddf['state'].unique().compute()[0].lower()
%time persisted_ddf['state'].unique().compute()[0].lower()

CPU times: user 1.05 s, sys: 31.1 ms, total: 1.08 s
Wall time: 987 ms
CPU times: user 250 ms, sys: 60.6 ms, total: 311 ms
Wall time: 669 ms
CPU times: user 93.5 ms, sys: 2.05 ms, total: 95.6 ms
Wall time: 114 ms


'andhra pradesh'

# Conclusion

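Dask can now process data much faster than plain Pandas. With the local cluster running, even the non-persisted DataFrame beat Pandas in these runs, although it may have been helped by partitions already held in the workers' memory. Since Dask can also handle files larger than memory, it can work with data that Pandas cannot load at all, and on a cluster it can do so quickly.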

- With Dask, files larger than memory can be processed.
- If the file fits in memory, loading it in advance with `persist()` can speed up processing.
- Starting a local cluster makes processing much faster.