# define setup
- single field (!): geopotential
- all levels, i.e. all 11 levels of `z` (geopotential)

In [1]:
import numpy as np
import torch
from src.pytorch.util import init_torch_device

# Torch device that benchmark batches are moved onto.
device = init_torch_device()

# Raw WeatherBench input directory and directory for derived/result files.
datadir = '/gpfs/work/nonnenma/data/forecast_predictability/weatherbench/5_625deg/'
res_dir = '/gpfs/work/nonnenma/results/forecast_predictability/weatherbench/5_625deg/'

# Loader and sample settings.
batch_size = 32
lead_time = 72
train_years = ('2015', '2015')  # (first, last) training year, both inclusive

# Inputs: one field (geopotential, short name 'z') on all of its 11 levels.
var_dict = {
    'geopotential': (
        'z',
        [1, 10, 100, 200, 300, 400, 500, 600, 700, 850, 1000],
    ),
}
# At the moment this is only needed so that the Dataset can initialize.
target_var_dict = {'geopotential': 500}

past_times = []  # no extra time-shifted inputs for now
verbose = False


CUDA not available


In [2]:
import xarray as xr
import dask
from src.pytorch.Dataset import Dataset_dask, Dataset_xr

# Lazily open every variable's NetCDF files and merge them into one Dataset.
x = xr.merge(
    [
        xr.open_mfdataset(f'{datadir}/{var}/*.nc', combine='by_coords')
        for var in var_dict
    ],
    fill_value=0,  # For the 'tisr' NaNs (presumably kept from a multi-variable setup)
)

# Restrict to the training period and wrap it in the dask-backed Dataset.
dg_train = Dataset_dask(
    x.sel(time=slice(*train_years)),
    var_dict,
    lead_time,
    normalize=False,
    res_dir=res_dir,
    train_years=train_years,
    target_var_dict=target_var_dict,
    past_times=past_times,
    verbose=verbose,
)

print('chunks', dg_train.data.chunks)

def collate_fn(batch):
    """Stack a list of ``(X, y)`` samples into a pair of CPU torch tensors.

    Both stacks are materialized with a single ``dask.compute`` call, so dask
    evaluates the two graphs together rather than one after the other.

    Args:
        batch: sequence of ``(X, y)`` pairs as yielded by the Dataset;
            presumably dask arrays, although ``dask.array.stack`` also
            accepts numpy arrays (this notebook relies on that when the
            Dataset is backed by plain numpy).

    Returns:
        Tuple ``(X_stack, Y_stack)`` of CPU tensors, each with a new leading
        batch dimension.
    """
    # `import dask` alone does not guarantee that the `dask.array` submodule
    # is loaded; import it explicitly instead of relying on another module
    # having imported it as a side effect.
    import dask.array

    X_stack, Y_stack = dask.compute(
        dask.array.stack([X for X, _ in batch]),
        dask.array.stack([y for _, y in batch]),
    )
    return (
        torch.as_tensor(X_stack, device='cpu'),
        torch.as_tensor(Y_stack, device='cpu'),
    )

# One loader worker per training year (the year range is inclusive).
num_workers = 1 + int(train_years[1]) - int(train_years[0])

train_loader = torch.utils.data.DataLoader(
    dg_train,
    batch_size=batch_size,
    collate_fn=collate_fn,
    num_workers=num_workers,
    drop_last=True,
    # pin_memory=True,
)

chunks ((8760,), (11,), (32,), (64,))


In [3]:
max_steps = 100
print_every = 10

import time

def do_dummy_epoch(train_loader):
    """Measure how fast ``train_loader`` delivers batches (I/O benchmark only).

    Consumes at most ``max_steps`` batches. Each batch's inputs and targets
    are moved to ``device`` and then discarded; no model is run. The wall-clock
    time since the loop started is printed for batch #1 and for every
    ``print_every``-th batch.

    The notebook globals ``max_steps``, ``print_every`` and ``device`` are read
    at call time.

    Args:
        train_loader: iterable yielding ``(inputs, targets)`` pairs of torch
            tensors, e.g. a ``torch.utils.data.DataLoader``.
    """
    t_start = time.time()
    for num_steps, batch in enumerate(train_loader, start=1):
        if num_steps == 1 or num_steps % print_every == 0:
            print(f"- batch #{num_steps}, time: {time.time() - t_start:.2f}")
        # The host->device transfer is part of the measured cost; the tensors
        # themselves are not used any further.
        inputs, targets = batch[0].to(device), batch[1].to(device)
        if num_steps >= max_steps:
            break

# xr-generated Dask arrays

In [4]:
# Benchmark the loader backed by the xarray-generated dask arrays set up above.
do_dummy_epoch(train_loader)

- batch #1, time: 1.78
- batch #10, time: 10.47
- batch #20, time: 20.04
- batch #30, time: 29.62
- batch #40, time: 39.18
- batch #50, time: 48.61
- batch #60, time: 58.39
- batch #70, time: 67.94
- batch #80, time: 77.40
- batch #90, time: 86.97
- batch #100, time: 96.52


### create numpy array for direct comparison

In [5]:
# Write each training year of the first variable to a plain .npy file
# (all levels for a single year per file) for the numpy-based comparisons.
var = list(var_dict.keys())[0]
for year in range(int(train_years[0]), int(train_years[1]) + 1):
    print('year', year)
    # Use a separate name so the merged dataset `x` is not overwritten, and a
    # context manager so the NetCDF file handles are released once the year
    # has been read into memory.
    with xr.open_mfdataset(f'{datadir}/{var}/*{year}*.nc', combine='by_coords') as ds:
        darray = ds[var_dict[var][0]].values
    # The file is named after `var`, like its directory (identical to the
    # previous hard-coded name for 'geopotential').
    np.save(res_dir + var + f'/{var}_{year}_5_625deg', darray)  # all levels for single year

year 2015


# create dask array directly from numpy array 
- Remember: dask itself does lazy evaluation

### V1: load full numpy array into memory, create dask array from that

In [4]:
# V1: replace dg_train's backing array with a dask array that wraps the
# fully loaded .npy file, keeping the original chunk layout.
dg_train.data.data = dask.array.from_array(
    np.load(
        res_dir + 'geopotential/' + 'geopotential_2015_5_625deg.npy',
        allow_pickle=False,
    ),
    chunks=dg_train.data.chunks,
)

do_dummy_epoch(train_loader)

- batch #1, time: 0.54
- batch #10, time: 4.34
- batch #20, time: 8.49
- batch #30, time: 12.71
- batch #40, time: 16.87
- batch #50, time: 21.03
- batch #60, time: 25.21
- batch #70, time: 29.36
- batch #80, time: 33.56
- batch #90, time: 37.76
- batch #100, time: 42.01


### V2: store the dask array's chunks on disk, then create a dask array that only points to those files (no pre-loading!)

In [None]:
# V2 write side: wrap the saved year in a dask array with the loader's chunking
# and store it as a dask npy stack (a new '0.npy' plus an 'info' file).
darray = dask.array.from_array(
    np.load(res_dir + 'geopotential/' + 'geopotential_2015_5_625deg.npy', allow_pickle=False),
    chunks=dg_train.data.chunks,
)
dask.array.to_npy_stack(res_dir + 'geopotential/', darray)
del darray  # drop the reference to the in-memory copy

In [4]:
# V2 read side: back dg_train with a dask array built from the npy stack
# written by the to_npy_stack cell. The directory is built exactly as in that
# call (res_dir already ends in '/', so prefixing '/' produced a '//' path).
dg_train.data.data = dask.array.from_npy_stack(res_dir + 'geopotential/')

do_dummy_epoch(train_loader)

- batch #1, time: 2.77
- batch #10, time: 8.21
- batch #20, time: 14.02
- batch #30, time: 19.87
- batch #40, time: 25.65
- batch #50, time: 31.50
- batch #60, time: 37.42
- batch #70, time: 43.42
- batch #80, time: 49.27
- batch #90, time: 55.08
- batch #100, time: 60.99


# numpy approaches (ditching xarray, Dask)
### Remember: these approaches mostly load everything into memory

### read directly from a numpy array (but still stacking with dask.array in collate_fn)

In [5]:
class Data:  # dummy object to house fields expected by Dataset.__iter__()
    """Minimal stand-in for a Dataset's ``.data`` attribute.

    Holds only the two fields this notebook sets and reads: the backing array
    (``data``) and its chunk layout (``chunks``).

    Args:
        data: backing array (e.g. a numpy array or memory map). Defaults to
            ``None`` so it can be assigned after construction.
        chunks: per-dimension chunk sizes in dask's tuple-of-tuples format.
            Defaults to ``((8760,), (11,), (32,), (64,))``, the single-chunk
            layout used throughout this notebook.
    """

    def __init__(self, data=None, chunks=((8760,), (11,), (32,), (64,))):
        self.data = data
        self.chunks = chunks

In [6]:
# Back dg_train with a plain in-memory numpy array; batches are still stacked
# via dask.array inside `collate_fn`.
dg_train.data = Data()
dg_train.data.data = np.load(
    res_dir + 'geopotential/' + 'geopotential_2015_5_625deg.npy',
    allow_pickle=False,
)

do_dummy_epoch(train_loader)

- batch #1, time: 0.76
- batch #10, time: 1.05
- batch #20, time: 1.45
- batch #30, time: 1.76
- batch #40, time: 2.07
- batch #50, time: 2.38
- batch #60, time: 2.76
- batch #70, time: 3.14
- batch #80, time: 3.54
- batch #90, time: 3.92
- batch #100, time: 4.30


### pure numpy (rewrite collate_fn and pass it to a new DataLoader)

In [7]:
def collate_fn_np(batch):
    """Collate ``(X, y)`` numpy samples into a pair of CPU torch tensors.

    Args:
        batch: sequence of ``(X, y)`` pairs; all ``X`` (and all ``y``) must
            share one shape so they can be stacked.

    Returns:
        Tuple ``(X, Y)`` of CPU tensors with a new leading batch dimension.
    """
    stacked_inputs = np.stack([inp for inp, _ in batch])
    stacked_targets = np.stack([tgt for _, tgt in batch])
    return tuple(
        torch.as_tensor(arr, device='cpu')
        for arr in (stacked_inputs, stacked_targets)
    )

# Same loader settings as `train_loader`, but batches are collated with pure numpy.
train_loader_np = torch.utils.data.DataLoader(
    dg_train,
    collate_fn=collate_fn_np,
    batch_size=batch_size,
    drop_last=True,
    num_workers=num_workers,
)

In [8]:
# Pure numpy path: in-memory numpy backing plus numpy-based collation.
dg_train.data = Data()
dg_train.data.data = np.load(
    res_dir + 'geopotential/' + 'geopotential_2015_5_625deg.npy',
    allow_pickle=False,
)

do_dummy_epoch(train_loader_np)

- batch #1, time: 0.68
- batch #10, time: 0.73
- batch #20, time: 0.78
- batch #30, time: 0.83
- batch #40, time: 0.88
- batch #50, time: 0.93
- batch #60, time: 0.98
- batch #70, time: 1.03
- batch #80, time: 1.08
- batch #90, time: 1.13
- batch #100, time: 1.18


# numpy memmap

In [9]:
dg_train.data = Data()

# A raw np.memmap maps the file from byte 0, so on a .npy file it would read
# the .npy header as array data and shift every value (and it needed dtype
# and shape hard-coded). np.load with mmap_mode parses the header (dtype,
# shape, memory order) and returns a read-only memory map of the array
# payload only.
darray = np.load(res_dir + 'geopotential/' + 'geopotential_2015_5_625deg.npy',
                 mmap_mode='r', allow_pickle=False)
dg_train.data.data = darray
print(darray.shape)

do_dummy_epoch(train_loader_np)

(8760, 11, 32, 64)
- batch #1, time: 2.51
- batch #10, time: 2.56
- batch #20, time: 2.61
- batch #30, time: 2.66
- batch #40, time: 2.71
- batch #50, time: 2.76
- batch #60, time: 2.81
- batch #70, time: 2.86
- batch #80, time: 2.91
- batch #90, time: 2.96
- batch #100, time: 3.01
