# Dask?

# Basic

- toy function을 가지고 시간 재보기

In [1]:
from time import sleep

def inc(x, delay=1):
    """Return ``x + 1`` after pausing to simulate slow work.

    Args:
        x: Value to increment.
        delay: Seconds to sleep before returning. Defaults to 1, the pause
            the timings in this notebook are based on.

    Returns:
        ``x + 1``.
    """
    sleep(delay)
    return x + 1

def add(x, y, delay=1):
    """Return ``x + y`` after pausing to simulate slow work.

    Args:
        x: Left operand.
        y: Right operand.
        delay: Seconds to sleep before returning. Defaults to 1, the pause
            the timings in this notebook are based on.

    Returns:
        ``x + y``.
    """
    sleep(delay)
    return x + y

In [2]:
%%time

# Plain (eager) calls: each one blocks for its sleep before the next starts,
# so the three calls run strictly one after another.
x = inc(1)
y = inc(2)
z = add(x,y)

CPU times: user 1.29 ms, sys: 968 µs, total: 2.26 ms
Wall time: 3.01 s


각 함수가 실행 될 때 1초의 sleep 모드를 주었기 때문에 x,y,z까지 계산하는데 3초가 걸림

# Dask

- basic과 똑같이 각 함수에 1초의 sleep을 줄 것임
- 그러나 이번에는 `@delayed` 데코레이터(decorator)를 활용할 것임

In [3]:
from dask import delayed

In [4]:
@delayed
def inc(x, delay=1):
    """Lazy version of the increment: ``x + 1`` after a simulated pause.

    Because of the ``@delayed`` decorator, calling this function does not
    run the body; it returns a ``Delayed`` object, and the sleep and the
    addition only happen when that object is computed.

    Args:
        x: Value to increment.
        delay: Seconds to sleep when the task actually runs. Defaults to 1.

    Returns:
        ``x + 1`` (once computed).
    """
    sleep(delay)
    return x + 1

@delayed
def add(x, y, delay=1):
    """Lazy version of the addition: ``x + y`` after a simulated pause.

    Because of the ``@delayed`` decorator, calling this function does not
    run the body; it returns a ``Delayed`` object, and the sleep and the
    addition only happen when that object is computed.

    Args:
        x: Left operand.
        y: Right operand.
        delay: Seconds to sleep when the task actually runs. Defaults to 1.

    Returns:
        ``x + y`` (once computed).
    """
    sleep(delay)
    return x + y

In [5]:
%%time

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x,y)

CPU times: user 641 µs, sys: 314 µs, total: 955 µs
Wall time: 739 µs


In [6]:
# z is still a lazy Delayed object (see the repr below), not the number 5
z

Delayed('add-6a95af52-ad43-4484-9ad5-0900738aef4a')

In [7]:
# Inspect the task graph (the flow of the computation) directly

z.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

In [8]:
%time
z.compute()

CPU times: user 1 µs, sys: 1e+03 ns, total: 2 µs
Wall time: 4.29 µs


5

- inc(increment)함수 동시 계산 후 add함수 계산

# for 문 계산

In [9]:
# Input values 1 through 8 used by the loop examples below
data = list(range(1, 9))

In [10]:
%%time

results = []

for x in data:
    y = inc(x)
    results.append(y)
    
total = sum(results)

CPU times: user 2.09 ms, sys: 925 µs, total: 3.01 ms
Wall time: 2.04 ms


In [11]:
%%time

results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)
    
total = delayed(sum)(results)

total.compute()

CPU times: user 6.7 ms, sys: 3.74 ms, total: 10.4 ms
Wall time: 1.01 s


44

In [12]:
# Render the task graph behind `total` (the delayed inc calls feeding the delayed sum)
total.visualize()

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'BT', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…

# Dask DataFrame

In [13]:
# Download the NYC flights dataset

# `import urllib` alone does not load the `request` submodule; import it
# explicitly so urllib.request.urlretrieve is available in a fresh kernel.
import urllib.request

print("- Downloading NYC Flights dataset... ", end='', flush=True)
url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"
# urlretrieve saves the archive under the given local name and returns that
# path together with the response headers.
filename, headers = urllib.request.urlretrieve(url, 'nycflights.tar.gz')
print("Done!", flush=True)

- Downloading NYC Flights dataset... Done!


In [14]:
import tarfile

# Unpack the .csv files from the downloaded archive into the data/ directory
with tarfile.open(filename, 'r:gz') as archive:
    archive.extractall('data/')

In [15]:
import os
import dask.dataframe as dd

# dask loads the data files in chunks, so reading many csv files at once is fine
df = dd.read_csv(
    os.path.join('data', 'nycflights', '*.csv'),
    parse_dates={'Date': [0, 1, 2]},
)

# Display the lazy dataframe
df

Unnamed: 0_level_0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
npartitions=10,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
,datetime64[ns],int64,float64,int64,float64,int64,object,int64,float64,float64,int64,float64,float64,float64,object,object,float64,float64,float64,int64,int64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


- 아직 값을 실제로 읽어 오지 않았기 때문에(lazy) value를 볼 수 없음
- partition 10개
- data type도 한 눈에 확인 가능

In [16]:
# Peek at the first rows; unlike the lazy `df` display above, this shows actual values
df.head()

Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1,1621.0,1540,1747.0,1701,US,33,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,0,0
1,1990-01-02,2,1547.0,1540,1700.0,1701,US,33,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,0,0
2,1990-01-03,3,1546.0,1540,1710.0,1701,US,33,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,0,0
3,1990-01-04,4,1542.0,1540,1710.0,1701,US,33,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,0,0
4,1990-01-05,5,1549.0,1540,1706.0,1701,US,33,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,0,0


In [17]:
# Looking at the last rows fails here with the dtype-mismatch ValueError shown below
df.tail()

ValueError: Mismatched dtypes found in `pd.read_csv`/`pd.read_table`.

+----------------+---------+----------+
| Column         | Found   | Expected |
+----------------+---------+----------+
| CRSElapsedTime | float64 | int64    |
| TailNum        | object  | float64  |
+----------------+---------+----------+

The following columns also raised exceptions on conversion:

- TailNum
  ValueError("could not convert string to float: 'N54711'")

Usually this is due to dask's dtype inference failing, and
*may* be fixed by specifying dtypes manually by adding:

dtype={'CRSElapsedTime': 'float64',
       'TailNum': 'object'}

to the call to `read_csv`/`read_table`.

- dask가 추론한 데이터 타입(에러 표의 Expected, head에서 보인 타입)과 tail 쪽 데이터에서 실제로 발견된 타입(Found)이 달라서 문제가 발생함

In [18]:
# Declare the problematic column dtypes up front instead of relying on inference

df = dd.read_csv(
    os.path.join('data', 'nycflights', '*.csv'),
    parse_dates={'Date': [0, 1, 2]},
    dtype={
        'TailNum': str,
        'CRSElapsedTime': float,
        'Cancelled': bool,
    },
)

In [19]:
# With the dtypes declared explicitly, reading the last rows now succeeds
df.tail()

Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
269176,1999-12-27,1,1645.0,1645,1830.0,1901,UA,1753,N516UA,225.0,...,205.0,-31.0,0.0,LGA,DEN,1619.0,7.0,13.0,False,0
269177,1999-12-28,2,1726.0,1645,1928.0,1901,UA,1753,N504UA,242.0,...,214.0,27.0,41.0,LGA,DEN,1619.0,5.0,23.0,False,0
269178,1999-12-29,3,1646.0,1645,1846.0,1901,UA,1753,N592UA,240.0,...,220.0,-15.0,1.0,LGA,DEN,1619.0,5.0,15.0,False,0
269179,1999-12-30,4,1651.0,1645,1908.0,1901,UA,1753,N575UA,257.0,...,233.0,7.0,6.0,LGA,DEN,1619.0,5.0,19.0,False,0
269180,1999-12-31,5,1642.0,1645,1851.0,1901,UA,1753,N539UA,249.0,...,232.0,-10.0,-3.0,LGA,DEN,1619.0,6.0,11.0,False,0


# dask.dataframe computation

- depdelay 활용하여 max값 구하기
- pandas 사용 시, 각 파일의 최대값을 구한 후, 그 값들을 비교하여 최종 최대값을 구해야 함.

In [20]:
# pandas 활용해서 구하기

%time
df.DepDelay.max().compute()

CPU times: user 4 µs, sys: 2 µs, total: 6 µs
Wall time: 13.1 µs


1435.0

In [21]:
# Render the task graph behind the max computation, using rankdir "LR"
df.DepDelay.max().visualize(rankdir="LR", size="12 12!")

CytoscapeWidget(cytoscape_layout={'name': 'dagre', 'rankDir': 'LR', 'nodeSep': 10, 'edgeSep': 10, 'spacingFact…