# Xarray to Zarr on GCS with CamHD Data Example
Here we build an Xarray Dataset from Dask delayed functions and save the images to a GCS Zarr group using xarray's `to_zarr()` method. Processing takes place on the workers, and memory pressure stays very low because the images never all need to be loaded at the same time.

#### Imports

In [None]:
%matplotlib inline
import pycamhd as camhd
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import xarray as xr

#### Create a list of frames to analyze using the dbcamhd.json database

In [None]:
# Load the CamHD database from a JSON-lines file (one JSON record per line).
# Later cells read its `filename`, `frame_count` and `timestamp` columns.
dbcamhd = pd.read_json(
    "dbcamhd.json",
    lines=True,
    orient="records",
)
# Show the last few records as a quick sanity check.
dbcamhd.tail()

In [None]:
# Pick one record from the database and choose which of its frames to sample.
fileindex = 2064  # row of dbcamhd to process
filename = dbcamhd.filename[fileindex]
frame_count = dbcamhd.frame_count[fileindex]
n_images = 1000  # number of frames to sample

# n_images evenly spaced frame numbers, from frame 750 up to 6000 frames
# before the end of the video, converted to whole (int64) frame numbers.
# `dtype` must be given the type itself (np.int64); calling it as np.int64()
# would pass a scalar instance and rely on NumPy's dtype coercion.
# NOTE(review): if frame_count - 6000 < 750 the sequence runs backwards --
# confirm the selected video is long enough.
frame_numbers = np.linspace(750, frame_count - 6000, n_images, dtype=np.int64)
filename

In [None]:
# Preview the first ten selected frame numbers.
frame_numbers[0:10]

#### Create timestamps for frames

In [None]:
from datetime import datetime

In [None]:
# Build one datetime per selected frame: the record's timestamp plus the
# frame's offset in seconds (frame number / 29.95).
# NOTE(review): 29.95 is presumably the frame rate in frames per second -- confirm.
# NOTE(review): datetime.fromtimestamp() without a tz argument returns naive
# local time, so the values depend on the kernel's timezone -- confirm that
# this is intended (e.g. that the kernel runs in UTC).
start_time = dbcamhd.timestamp[fileindex]  # loop-invariant lookup, done once
timestamps = [
    datetime.fromtimestamp(start_time + frame_number / 29.95)
    for frame_number in frame_numbers
]

In [None]:
# Preview the first five frame timestamps.
timestamps[0:5]

#### Create Xarray Dataset out of Dask delayed functions

In [None]:
from dask import delayed, compute
import dask.array as da

In [None]:
# Wrap every frame read in a lazy Dask array, so no frame is fetched or
# decoded until the graph is actually computed.
moov_atom = camhd.get_moov_atom(filename)  # obtained once, reused for every frame
lazy_get_frame = delayed(camhd.get_frame)
frame_shape = (1080, 1920, 3)  # declared shape of a single frame array

# Each element gets a leading length-1 axis so the frames can later be
# concatenated along axis 0.
delayed_frames = [
    da.from_delayed(
        lazy_get_frame(filename, frame_number, 'rgb24', moov_atom),
        shape=frame_shape,
        dtype=np.uint8,
    )[np.newaxis, ...]
    for frame_number in frame_numbers
]

In [None]:
# Inspect the first lazy frame array (displays its repr without computing it).
delayed_frames[0]

In [None]:
# Join the per-frame arrays along axis 0 into a single lazy Dask array.
all_data = da.concatenate(delayed_frames, axis=0)
all_data

In [None]:
# Label the lazy frame stack with named dimensions and attach the per-frame
# timestamps as the 'time' coordinate.
video_array = xr.DataArray(
    all_data,
    dims=['time', 'y', 'x', 'channel'],
    coords={'time': timestamps},
)

# Wrap the array in a Dataset under the variable name 'video'.
ds = video_array.to_dataset(name='video')
ds

#### Start a KubeCluster

In [None]:
# Start a Dask cluster on Kubernetes and display its summary.
from dask_kubernetes import KubeCluster

worker_count = 32  # number of workers requested at start-up
cluster = KubeCluster(n_workers=worker_count)
cluster

In [None]:
# Connect a distributed client to the cluster created above.
# NOTE(review): presumably this makes the cluster the default scheduler for
# subsequent Dask computations in this session -- confirm.
from dask import delayed, compute
from dask.distributed import Client
client = Client(cluster)
client

#### Use to_zarr() to save to a GCS bucket

In [None]:
import google.auth
# google.auth.default() returns a (credentials, project_id) pair, so `token`
# holds a credentials object rather than a raw token string.
# NOTE(review): `project` is not used in the cells visible here (the store
# below hard-codes a project id) -- confirm that is intended.
token, project = google.auth.default()

In [None]:
import zarr

In [None]:
# Create a Zarr store for the GCS target, passing the project id and the
# credentials obtained above as client keyword arguments.
# NOTE(review): the first two arguments are presumably the bucket name and the
# path prefix inside it -- confirm against the GCSStore implementation in use.
gcsstore = zarr.storage.GCSStore('rte-pangeo-data', 'test1.zarr', client_kwargs={'project': 'pangeo-198314', 'credentials': token})

In [None]:
%%time
# Write the dataset to the GCS-backed Zarr store.
# NOTE(review): presumably this is the step that actually triggers the lazy
# frame reads on the Dask workers -- confirm.
ds.to_zarr(gcsstore)

#### Open and compute on data in GCS bucket

In [None]:
%%time
# Re-open the dataset from the Zarr group in the GCS store.
# Rebinding `ds` already drops the reference to the previous dataset, so no
# explicit `del ds` is needed. Omitting it keeps this cell re-runnable: a bare
# `del ds` raises NameError whenever `ds` is not currently defined (for
# example after an earlier run of this cell failed part-way through).
ds = xr.open_zarr(gcsstore)
ds

In [None]:
%%time
# Average the 'video' variable over the 'time' dimension and load the result
# into local memory.
mean_image = (
    ds["video"]
    .mean(dim="time")
    .load()
)

In [None]:
# Plot the mean image. 'i8' is NumPy's 8-byte (64-bit) integer type; the cast
# is presumably there so the colour values are treated as 0-255 integers
# rather than 0-1 floats -- confirm. The trailing semicolon suppresses the
# cell's textual output.
mean_image.astype('i8').plot.imshow();

#### Use to_zarr() to save to Azure Blob Storage 

In [None]:
import zarr
import xarray as xr
from azure_credentials import account_name, account_key

In [None]:
# Create a Zarr store for Azure Blob Storage, authenticated with the account
# name and key imported from the local azure_credentials module.
# NOTE(review): the first two arguments are presumably the container name and
# the path prefix inside it -- confirm against the ABSStore signature.
absstore = zarr.storage.ABSStore('rte-pangeo-data', 'test1.zarr', account_name, account_key)

In [None]:
# NOTE(review): destructive -- presumably removes everything already stored
# under this store's prefix so the write below starts from an empty group;
# confirm before running against data that must be kept.
absstore.rmdir()

In [None]:
%%time
# Write the dataset to the Azure-backed Zarr store.
# NOTE(review): if the cells are run in order, `ds` at this point is the
# dataset re-opened from the GCS store, so this copies GCS -> Azure rather
# than re-reading the original video frames -- confirm that is intended.
ds.to_zarr(absstore)

#### Open and compute on data in Azure Blob container

In [None]:
%%time
# Re-open the dataset from the Zarr group in the Azure Blob store.
# Rebinding `ds` already drops the reference to the previous dataset, so no
# explicit `del ds` is needed. Omitting it keeps this cell re-runnable: a bare
# `del ds` raises NameError whenever `ds` is not currently defined (for
# example after an earlier run of this cell failed part-way through).
ds = xr.open_zarr(absstore)
ds

In [None]:
%%time
# Average the 'video' variable over the 'time' dimension and load the result
# into local memory.
mean_image = (
    ds["video"]
    .mean(dim="time")
    .load()
)

In [None]:
# Plot the mean image. 'i8' is NumPy's 8-byte (64-bit) integer type; the cast
# is presumably there so the colour values are treated as 0-255 integers
# rather than 0-1 floats -- confirm. The trailing semicolon suppresses the
# cell's textual output.
mean_image.astype('i8').plot.imshow();