# Write ApRES xarrays to zarrs 
This notebook

- loads the individual zarrs created from each .dat file (using to_individual_zarr.ipynb),
- computes the stacked profiles and adds them to the dataset,
- rechunks to a reasonable chunk size in the time dimension, and
- writes the whole thing to a zarr store in the ldeo-glaciology bucket.

Import packages

In [1]:
import numpy as np
from dask.distributed import performance_report
import xarray as xr
import fsspec
import json

# Read the Google Cloud credentials JSON into `token`; it is passed to
# fsspec.get_mapper when the combined zarr store is written.
with open('../../../secrets/ldeo-glaciology-bc97b12df06b.json') as token_file:
    token = json.loads(token_file.read())

In [2]:
from dask.distributed import Client

# Attach to an already-running dask scheduler rather than starting a new cluster.
# NOTE(review): this address (in particular the port) looks specific to one
# session; update it to match the running scheduler before re-running.
SCHEDULER_ADDRESS = "tcp://127.0.0.1:32905"
client = Client(SCHEDULER_ADDRESS)
client

0,1
Connection method: Direct,
Dashboard: /user/jkingslake/xapres/proxy/8787/status,

0,1
Comm: tcp://127.0.0.1:32905,Workers: 4
Dashboard: /user/jkingslake/xapres/proxy/8787/status,Total threads: 16
Started: Just now,Total memory: 58.87 GiB

0,1
Comm: tcp://127.0.0.1:35985,Total threads: 4
Dashboard: /user/jkingslake/xapres/proxy/42115/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:42645,
Local directory: /tmp/dask-worker-space/worker-n70n4grn,Local directory: /tmp/dask-worker-space/worker-n70n4grn
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 156.57 MiB,Spilled bytes: 0 B
Read bytes: 10.56 kiB,Write bytes: 8.24 kiB

0,1
Comm: tcp://127.0.0.1:46803,Total threads: 4
Dashboard: /user/jkingslake/xapres/proxy/45481/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:39741,
Local directory: /tmp/dask-worker-space/worker-4feu3bpk,Local directory: /tmp/dask-worker-space/worker-4feu3bpk
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 156.73 MiB,Spilled bytes: 0 B
Read bytes: 10.52 kiB,Write bytes: 8.21 kiB

0,1
Comm: tcp://127.0.0.1:36425,Total threads: 4
Dashboard: /user/jkingslake/xapres/proxy/40273/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:35991,
Local directory: /tmp/dask-worker-space/worker-s0f4cus3,Local directory: /tmp/dask-worker-space/worker-s0f4cus3
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 156.39 MiB,Spilled bytes: 0 B
Read bytes: 10.52 kiB,Write bytes: 8.21 kiB

0,1
Comm: tcp://127.0.0.1:41181,Total threads: 4
Dashboard: /user/jkingslake/xapres/proxy/46135/status,Memory: 14.72 GiB
Nanny: tcp://127.0.0.1:36205,
Local directory: /tmp/dask-worker-space/worker-y_mf46g2,Local directory: /tmp/dask-worker-space/worker-y_mf46g2
Tasks executing: 0,Tasks in memory: 0
Tasks ready: 0,Tasks in flight: 0
CPU usage: 2.0%,Last seen: Just now
Memory usage: 156.79 MiB,Spilled bytes: 0 B
Read bytes: 10.55 kiB,Write bytes: 8.24 kiB


In [6]:
def zarrs_to_onezarr(site, time_chunk_size=20, settings_index=500):
    """Combine the per-.dat-file zarr stores of one site into a single zarr store.

    The function

    1. opens every ``dat_*`` store under
       ``gs://ldeo-glaciology/apres/greenland/2022/{site}/individual_zarrs_prechunked_3/``
       as one dask-backed dataset,
    2. replaces ``attenuator`` and ``AFGain`` with the single entry found at
       position ``settings_index`` along their first dimension,
    3. adds ``profile_stacked``, the mean of ``profile`` over ``chirp_num``,
    4. rechunks along ``time`` and writes the result to
       ``gs://ldeo-glaciology/apres/greenland/2022/single_zarrs/{site}``.

    A dask performance report is saved as ``ds_stacked_rechunked_{site}.html``
    in the current working directory.

    Parameters
    ----------
    site : str
        Site name used to build the input and output paths, e.g. ``"A101"``.
    time_chunk_size : int, optional
        Chunk length along ``time`` in the output store. Default 20.
    settings_index : int, optional
        Position used to pick the ``attenuator`` and ``AFGain`` entries that
        are kept. Default 500. Indexing fails if the dataset has fewer
        entries than this along that dimension.

    Returns
    -------
    None

    Notes
    -----
    Uses the notebook-level names ``xr``, ``fsspec``, ``performance_report``
    and ``token``.
    """
    ds = xr.open_mfdataset(f'gs://ldeo-glaciology/apres/greenland/2022/{site}/individual_zarrs_prechunked_3/dat_*',
                           chunks={},
                           engine='zarr',
                           consolidated=False,
                           parallel=True)

    # Keep a single entry of each setting instead of the full array.
    # NOTE(review): presumably these settings do not change during the
    # deployment, so one entry is representative - confirm.
    ds['attenuator'] = ds.attenuator[settings_index]
    ds['AFGain'] = ds.AFGain[settings_index]

    # Drop the chunk encoding inherited from the source stores so that it
    # cannot conflict with the new dask chunking at write time. pop() with a
    # default (rather than del) tolerates variables that carry no 'chunks'
    # entry in their encoding.
    for var in ds:
        ds[var].encoding.pop('chunks', None)

    profile_stacked = ds.profile.mean(dim='chirp_num')
    ds_stacked = ds.assign({'profile_stacked': profile_stacked})
    ds_stacked_rechunked = ds_stacked.chunk({'time': time_chunk_size})

    # NOTE(review): every data variable is written as float64. If any variable
    # is complex or integer typed this cast changes the stored values -
    # confirm that this is intended.
    encoding = {i: {"dtype": "float64"} for i in ds_stacked_rechunked.data_vars}

    filename = f'gs://ldeo-glaciology/apres/greenland/2022/single_zarrs/{site}'
    mapper = fsspec.get_mapper(filename, mode='w', token=token)
    with performance_report(f'ds_stacked_rechunked_{site}.html'):
        ds_stacked_rechunked.to_zarr(mapper, consolidated=True, safe_chunks=False, encoding=encoding)

In [7]:
# Build and upload the combined zarr store for site A101.
zarrs_to_onezarr(site="A101")

In [None]:
# Build and upload the combined zarr store for site A103.
zarrs_to_onezarr(site="A103")

  return to_zarr(  # type: ignore


In [None]:
# Build and upload the combined zarr store for site A104.
zarrs_to_onezarr(site="A104")

In [None]:
def reload(site):
    """Open the combined zarr store of one site as a dask-backed dataset.

    Parameters
    ----------
    site : str
        Site name, e.g. ``"A101"``; selects the store at
        ``gs://ldeo-glaciology/apres/greenland/2022/single_zarrs/{site}``.

    Returns
    -------
    xarray.Dataset
        The dataset read through the zarr engine using consolidated metadata.
    """
    store_url = f'gs://ldeo-glaciology/apres/greenland/2022/single_zarrs/{site}'
    open_kwargs = dict(engine='zarr', consolidated=True, chunks={})
    return xr.open_dataset(store_url, **open_kwargs)

In [None]:
# Re-open the combined stores to check that they can be read back.
# NOTE(review): in the cells visible here A101, A103 and A104 are written, but
# A101, A102 and A103 are reloaded - confirm that A102 was written elsewhere
# and whether A104 should be reloaded too.
A101 = reload(site="A101")
A102 = reload(site="A102")
A103 = reload(site="A103")

In [None]:
# Marker printed once every cell above has run.
print("it all finished!")