# 12. 大型資料處理

一般認為所謂大型資料或「大數據 (Big Data)」通常應用在機器學習領域上，但在大氣科學上，我們所認定的大數據可能更接近以下維基百科的定義：

> Big data is data sets that are so voluminous and complex that traditional data processing application software are inadequate to deal with them.

歐洲氣象中心第五代再分析資料 ([ERA5](https://www.ecmwf.int/en/forecasts/datasets/reanalysis-datasets/era5))，擁有37層垂直層、0.25度水平解析度網格，在正常的狀況下容易超出電腦記憶體負荷。以讀取1979-2018共40年全部的U, V風場並計算風速為例：

In [1]:
import xarray as xr

uds = xr.open_mfdataset('/data4/USERS/waynetsai/data/ERA5/ERA5_U_daily_????.nc',
                               combine = "by_coords",               
                               parallel=True
                             )
vds = xr.open_mfdataset('/data4/USERS/waynetsai/data/ERA5/ERA5_V_daily_????.nc',
                               combine = "by_coords",               
                               parallel=True
                             )
u = uds.u
v = vds.v
ws = u**2 + v**2
ws.loc['2017-12-01':'2017-12-31',850,:,:].mean(axis=0).plot()

    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ...     array[indexer]

To avoid creating the large chunks, set the option
    >>> with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    ...     array[indexer]
  return self.array[key]


MemoryError: Unable to allocate 52.2 GiB for an array with shape (365, 37, 721, 1440) and data type float32

會有這樣的情形是因為要分析的資料量已經超過電腦記憶體 (RAM) 的負荷，更別提氣象上的計算實際上複雜許多。那要如何避免這個情形發生呢？

## Dask 

Dask是一套Python的套件，可以用電腦多核心(core)來進行平行運算，因此可以提升效率。在計算時，程式不會完全讀入所有的資料，而是以符號的方式先進行運算，這個過程稱為 "lazy computation"，也因此運算的過程不會耗費大量的記憶體 RAM。

為了理解dask如何在xarray上運作，我們先以一組1000 × 4000大小的矩陣來示範。

**1. Numpy矩陣**

In [2]:
import numpy as np

shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np

array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

**2. Dask矩陣**

In [3]:
import dask.array as da

ones = da.ones(shape)
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 30.52 MiB 30.52 MiB Shape (1000, 4000) (1000, 4000) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


![](https://docs.dask.org/en/latest/_images/dask-array.svg)

Dask會把矩陣分成許多子矩陣，這些子矩陣稱為"chunk"。在dask中，我們可以指定chunks的大小。

In [4]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Count,4 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 30.52 MiB 7.63 MiB Shape (1000, 4000) (1000, 1000) Count 4 Tasks 4 Chunks Type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Count,4 Tasks,4 Chunks
Type,float64,numpy.ndarray


如果我們做點計算，例如先進行相乘然後再平均

In [5]:
ones_mean = (ones * ones[::-1, ::-1]).mean()
ones_mean

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,19 Tasks,1 Chunks
Type,float64,numpy.ndarray
Array Chunk Bytes 8 B 8.0 B Shape () () Count 19 Tasks 1 Chunks Type float64 numpy.ndarray,,

Unnamed: 0,Array,Chunk
Bytes,8 B,8.0 B
Shape,(),()
Count,19 Tasks,1 Chunks
Type,float64,numpy.ndarray


計算過程如下：

![](https://earth-env-data-science.github.io/_images/dask_arrays_16_0.png)

也就是chunk本身會在各自的核心中先進行計算，然後最後再合併一起成為最終的結果。

由以上的計算過程，我們可以大致理解dask的計算原理，更進階的用法可以參閱dask的官方網站說明。

從以上例子我們可以知道，dask矩陣囊括了我們熟知的numpy套件中的函數。其實dask也囊括了xarray的函數，這對於我們要使用dask輔助處理大氣科學中大型資料是非常有利的。那麼dask怎麼幫助我們加速資料處理呢？

## 大型氣象資料處理

在第二單元中，我們已經介紹在開啟多個檔案`xarray.open_mfdataset`時，可以加上`parallel=True`來加快計算速度，這就是把xarray以dask矩陣的方式讀取，因此電腦多核心會同時讀取檔案，以增加速度。以下我們開啟40年的ERA5水平風場的檔案，指定chunks的大小`chunks={'time':183, 'level': 1, 'longitude':93*2,'latitude':91*2}`，並且計算氣候場、距平值，然後畫出結果。

In [6]:
import dask

with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    uds = xr.open_mfdataset('/data4/USERS/waynetsai/data/ERA5/ERA5_U_daily_????.nc',
                               combine = "by_coords",               
                               parallel=True,
                               chunks={'time':183,  'longitude':93*2,'latitude':91*2}
                             )
    vds = xr.open_mfdataset('/data4/USERS/waynetsai/data/ERA5/ERA5_V_daily_????.nc',
                               combine = "by_coords",               
                               parallel=True,
                               chunks={'time':183,  'longitude':93*2,'latitude':91*2}
                             )
%time

CPU times: user 4 µs, sys: 3 µs, total: 7 µs
Wall time: 14.1 µs


In [7]:
u = uds.sel(level=850).u
v = vds.sel(level=850).v
%time
u

CPU times: user 0 ns, sys: 10 µs, total: 10 µs
Wall time: 19.8 µs


Unnamed: 0,Array,Chunk
Bytes,56.51 GiB,23.63 MiB
Shape,"(14610, 721, 1440)","(183, 182, 186)"
Count,7720 Tasks,2560 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 56.51 GiB 23.63 MiB Shape (14610, 721, 1440) (183, 182, 186) Count 7720 Tasks 2560 Chunks Type float32 numpy.ndarray",1440  721  14610,

Unnamed: 0,Array,Chunk
Bytes,56.51 GiB,23.63 MiB
Shape,"(14610, 721, 1440)","(183, 182, 186)"
Count,7720 Tasks,2560 Chunks
Type,float32,numpy.ndarray


很快就讀好資料了！接下來計算氣候平均：

In [8]:
uDayClim = u.groupby('time.dayofyear').mean('time')
vDayClim = v.groupby('time.dayofyear').mean('time')

uDayClim

Unnamed: 0,Array,Chunk
Bytes,1.42 GiB,132.23 kiB
Shape,"(366, 721, 1440)","(1, 182, 186)"
Count,1118120 Tasks,11712 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 1.42 GiB 132.23 kiB Shape (366, 721, 1440) (1, 182, 186) Count 1118120 Tasks 11712 Chunks Type float32 numpy.ndarray",1440  721  366,

Unnamed: 0,Array,Chunk
Bytes,1.42 GiB,132.23 kiB
Shape,"(366, 721, 1440)","(1, 182, 186)"
Count,1118120 Tasks,11712 Chunks
Type,float32,numpy.ndarray


In [10]:
ua = (u.sel(time=slice('2017-12-01','2017-12-31')).groupby('time.dayofyear')) - uDayClim
va = (v.sel(time=slice('2017-12-01','2017-12-31')).groupby('time.dayofyear')) - vDayClim
ws = np.sqrt(ua**2 + va**2).mean(axis=0)
ws

Unnamed: 0,Array,Chunk
Bytes,3.96 MiB,132.23 kiB
Shape,"(721, 1440)","(182, 186)"
Count,2279930 Tasks,32 Chunks
Type,float32,numpy.ndarray
"Array Chunk Bytes 3.96 MiB 132.23 kiB Shape (721, 1440) (182, 186) Count 2279930 Tasks 32 Chunks Type float32 numpy.ndarray",1440  721,

Unnamed: 0,Array,Chunk
Bytes,3.96 MiB,132.23 kiB
Shape,"(721, 1440)","(182, 186)"
Count,2279930 Tasks,32 Chunks
Type,float32,numpy.ndarray


### Dask Array輸出

在前面我們提過，利用dask運算時程式會以 "lazy computation" 的方式計算，也就是在算的過程中程式還沒有真正將資料的數值給代入。因此如果在上面這個階段就執行 `ws.to_netcdf()` ，會發現檔案裡面的內容全部是空白的。因此計算結果到最後的時候，我們必須加上`load()`函數，把最終的計算結果讀入記憶體。

> `xarray.Dataset.load`: Manually trigger loading and/or computation of this dataset’s data from disk or a remote source into memory and return this dataset. 

### 使用dask的一些好習慣

1. 目前dask在`resample()` or `groupby()`兩個函數並沒有做很好的效率最佳化，因此建議在這之前就先進行`load()`的動作，以避免非常大量的計算。從上面的範例就可以看到從頭計算到ws這個動作，就要花費2279930 tasks，必然要花費很多時間！
2. 把一些初步的結果先儲存成netCDF檔案，然後重新讀進來，會比較節省時間。
3. 空間上切越小的chunks越好 (e.g., chunks={'latitude': 10, 'longitude': 10})。
4. xarray官方網站建議開啟多個檔案時，設定`engine='h5netcdf'`，會比 `engine='netcdf4'`快。
