# working with big data

In [1]:
import numpy as np
import xarray as xr

import dask.array as da
from dask.distributed import Client, LocalCluster

from matplotlib import pyplot as plt
%matplotlib inline

import intake

## intro to dask

In [2]:
# Baseline: an ordinary NumPy array of ones, allocated eagerly in memory.
# `shape` is reused by the dask examples in the following cells.
shape = (1000, 4000)
ones_np = np.ones(shape, dtype=np.float64)
ones_np

array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

In [3]:
# The dask counterpart of the NumPy array. No chunk layout is requested here,
# so dask chooses one itself ("auto" is the default for `chunks`).
ones = da.ones(shape, chunks="auto")
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 30.52 MiB 30.52 MiB Shape (1000, 4000) (1000, 4000) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,30.52 MiB
Shape,"(1000, 4000)","(1000, 4000)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [4]:
# Build the same array again, this time split explicitly into chunks of
# `chunk_shape`. `chunk_shape` is reused when building `big_ones` later on.
# NOTE: this rebinds `ones`, replacing the array created in the previous cell.

chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Count,4 Tasks,4 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 30.52 MiB 7.63 MiB Shape (1000, 4000) (1000, 1000) Count 4 Tasks 4 Chunks Type float64 numpy.ndarray",4000  1000,

Unnamed: 0,Array,Chunk
Bytes,30.52 MiB,7.63 MiB
Shape,"(1000, 4000)","(1000, 1000)"
Count,4 Tasks,4 Chunks
Type,float64,numpy.ndarray


In [5]:
# Calling .compute() evaluates the lazy dask array and returns the result
# as a concrete NumPy array.

ones.compute()

array([[1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       ...,
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.],
       [1., 1., 1., ..., 1., 1., 1.]])

In [6]:
# A much larger array built from the same `chunk_shape` chunks. Only the lazy
# dask array is constructed here; nothing is computed in this cell.
bigshape = (200_000, 4_000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones

Unnamed: 0,Array,Chunk
Bytes,5.96 GiB,7.63 MiB
Shape,"(200000, 4000)","(1000, 1000)"
Count,800 Tasks,800 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 5.96 GiB 7.63 MiB Shape (200000, 4000) (1000, 1000) Count 800 Tasks 800 Chunks Type float64 numpy.ndarray",4000  200000,

Unnamed: 0,Array,Chunk
Bytes,5.96 GiB,7.63 MiB
Shape,"(200000, 4000)","(1000, 1000)"
Count,800 Tasks,800 Chunks
Type,float64,numpy.ndarray


## clustering with dask

In [7]:
# Initiate a local cluster and connect a client to it.
# Evaluating `client` as the last expression displays its summary,
# including the dashboard address.

cluster = LocalCluster()
client = Client(cluster)
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 8.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:60076,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 8.00 GiB

0,1
Comm: tcp://10.1.45.236:60088,Total threads: 1
Dashboard: http://10.1.45.236:60092/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:60082,
Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-0npwb602,Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-0npwb602

0,1
Comm: tcp://10.1.45.236:60090,Total threads: 1
Dashboard: http://10.1.45.236:60094/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:60079,
Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-a3d14nug,Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-a3d14nug

0,1
Comm: tcp://10.1.45.236:60089,Total threads: 1
Dashboard: http://10.1.45.236:60093/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:60081,
Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-jq__alj0,Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-jq__alj0

0,1
Comm: tcp://10.1.45.236:60091,Total threads: 1
Dashboard: http://10.1.45.236:60095/status,Memory: 2.00 GiB
Nanny: tcp://127.0.0.1:60080,
Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-2ew3kdz5,Local directory: /Users/nabig/repos/intro_climate_data/dask-worker-space/worker-2ew3kdz5


In [8]:
# Multiply the big array element-wise by itself reversed along both axes,
# then reduce to a single mean. `big_calc` is still lazy; the work happens
# only when .compute() is called.
big_calc = da.mean(big_ones * big_ones[::-1, ::-1])
big_calc.compute()

1.0

## using dask with xarray

In [9]:
# NOTE(review): "x" looks like a placeholder rather than a real catalog location.
# It needs to point at an intake catalog that provides the 'sea_surface_height'
# entry used in the next cell (possibly the Pangeo ocean catalog -- TODO confirm).
cat_url = "x"
cat = intake.open_catalog(cat_url)

In [None]:
# Open the 'sea_surface_height' catalog entry as a lazily loaded dataset.
# The result of .to_dask() must be bound to `ds`: the following cell reads
# `ds.sla`, which is a variable of the dataset, not of the catalog entry.
ds = cat['sea_surface_height'].to_dask()
ds

In [None]:
# Average the `sla` variable over both spatial dimensions and load the
# reduced result into memory so the plotting cell works on concrete values.
sla_timeseries = ds["sla"].mean(dim=("latitude", "longitude")).load()

In [None]:
# Draw the full series and its centred 365-step rolling mean on one set of
# axes, then label the figure.
fig, ax = plt.subplots()
sla_timeseries.plot(ax=ax, label='full data')
sla_timeseries.rolling(time=365, center=True).mean().plot(ax=ax, label='rolling annual mean')
ax.set_ylabel('Sea Level Anomaly [m]')
ax.set_title('Global Mean Sea Level')
ax.legend()
ax.grid()