# Streaming Zarr

This notebook demonstrates how to use Redis and Zarr to enable analysis of a stream of data coming from a simulation model. We use _Conway's Game of Life_ as our "simulation model".

In [None]:
# imports

from bokeh.io import show, output_notebook
from bokeh.application import Application
from bokeh.application.handlers.function import FunctionHandler
from bokeh.plotting import figure, ColumnDataSource

import dask.array
from dask.diagnostics import ProgressBar

import matplotlib.pyplot as plt
import xarray as xr
import zarr

In [None]:
# we'll output some results using bokeh; the next line enables embedding bokeh apps in the notebook
output_notebook()

## Zarr's Redis Store

Zarr recently added a `RedisStore` to its growing list of MutableMapping interfaces. This store will allow us to read/write Zarr arrays to an active Redis server. If you are running this notebook as a Binder, we've already started a Redis server for you and have begun to stream data from our simulation model.

In [None]:
# connect to the redis server
store = zarr.RedisStore(port=7777)
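
A `MutableMapping` store behaves like a dictionary that maps string keys to bytes values, which is why the same Zarr code can target memory, disk, or Redis. The sketch below is a toy in-memory store written only to illustrate that interface. The class name `ToyStore` and the key names are hypothetical, and this is not how `RedisStore` is implemented.

```python
from collections.abc import MutableMapping


class ToyStore(MutableMapping):
    """Hypothetical in-memory key/value store (illustration only)."""

    def __init__(self):
        self._data = {}

    def __getitem__(self, key):
        return self._data[key]

    def __setitem__(self, key, value):
        # keep an immutable copy of the bytes, as a remote server would
        self._data[key] = bytes(value)

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)


toy = ToyStore()
toy['0/0.0'] = b'\x00\xff\x00\xff'  # made-up chunk bytes for timestep 0
toy['1/0.0'] = b'\xff\x00\xff\x00'  # made-up chunk bytes for timestep 1
print(sorted(toy))                  # list the keys
del toy['0/0.0']                    # remove a key, much as an eviction would
print('0/0.0' in toy, len(toy))
```

Anything that implements these five methods can be read and written like a dictionary, which is all a store of this kind needs to provide.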

## Basic Data Access

Our model is writing each timestep to a different key within the Redis database. These keys accumulate until they are evicted (usually because of memory pressure).

In the cell below, we use `store.keylist()` to list all the available keys and extract one array (timestep). We plot this array using matplotlib.

In [None]:
def get_all_keys(store):
    '''helper function to get all available keys in the RedisStore'''
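    # the first path component of each key is the timestep; collect the unique ones
    keys = {s.split('/')[0] for s in store.keylist()}
    keys.discard('.zgroup')  # group metadata, not a timestep
    return sorted(keys, key=int)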


def get_most_recent_key(store):
    '''helper function to get the most recent key from redis'''
    keys = get_all_keys(store)
    return keys[-1]


# access the most recent frame of data
key = get_most_recent_key(store)
grid = zarr.open_array(store=store, mode='r', path=key)

# make a quick matplotlib plot
plt.imshow(grid, cmap='Greys')
plt.title(key)
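
The helper sorts with `key=int` because the timestep names are strings, and strings sort lexicographically. The short sketch below shows the difference on a made-up key list (the key names are assumptions for illustration, not output from the running server).

```python
# hypothetical raw keys of the form '<timestep>/<item>'
raw_keys = ['.zgroup', '10/.zarray', '10/0.0', '2/.zarray', '2/0.0', '9/.zarray', '9/0.0']

timesteps = {k.split('/')[0] for k in raw_keys}
timesteps.discard('.zgroup')  # group metadata, not a timestep

as_strings = sorted(timesteps)            # lexicographic order
as_integers = sorted(timesteps, key=int)  # numeric order

print(as_strings)   # ['10', '2', '9']
print(as_integers)  # ['2', '9', '10']
```

With string ordering, the "most recent" key would be `'9'` rather than `'10'`, so the numeric sort is what makes `keys[-1]` the latest timestep.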

## Using Dask

We can repeat this procedure, this time using Dask. Since our Zarr arrays are chunked, Dask automatically returns a chunked array.

In [None]:
key = get_most_recent_key(store)
grid = zarr.open_array(store=store, mode='r', path=key)

# map the zarr array to a dask array
grid2 = dask.array.from_zarr(grid)
grid2

We can compute the values of this array using the `.compute()` method: 

In [None]:
# compute the array using dask
with ProgressBar():
    values = grid2.compute()

# another plot, this time of the computed values
plt.imshow(values, cmap='Greys')
plt.title(key)

## Visualizing the model in real time

Often, when running a simulation model, we have to wait until the simulation is complete before we can observe what it has done. Now that we are streaming data to our Redis server, we can visualize the model as it runs. In the next cell, we set up a fairly simple Bokeh application that will let us watch the Game of Life as it evolves in time.

In [None]:
def get(key='1'):
    '''helper function to load one timestep from the store into memory'''
    data = zarr.open_array(store=store, mode='r', path=key)
    return data[:]


def make_document(doc):

    key = get_most_recent_key(store)
    data = get(key)
    shape = data.shape

    img = data
    source = ColumnDataSource(data=dict(img=[img]))

    def update():
        s1, s2 = slice(None), slice(None)
        index = [0, s1, s2]
        key = get_most_recent_key(store)
        data = get(key)
        new_data = data.flatten()
        source.patch({'img' : [(index, new_data)]})
        p2d.title.text = f"Streaming Conway's Game of Life - Timestep: {key}"
        p2d.title.align = "left"

    # call update() every 1 ms (the period is given in milliseconds)
    doc.add_periodic_callback(update, 1)
    p2d = figure(plot_width=500, plot_height=500, x_range=(0,shape[0]), y_range=(0,shape[1]),
                 title=f"Streaming Conway's Game of Life - Timestep: {key}")
    p2d.image(image='img', x=0, y=0, dw=shape[0], dh=shape[1], source=source)
    doc.title = "Streaming Conway's Game of Life"

    doc.add_root(p2d)


app = Application(FunctionHandler(make_document))

show(app)

## Enabling Data Analysis

One of the most exciting things about streaming data through something like Redis is that it allows us to do analysis while the model is running. For models that are written in low-level languages like C or Fortran, it is often much easier to write our analysis code in Python. Traditionally, we've had to do this after the model runs, using the (sometimes limited) data written to disk. Streaming our analysis offers a different paradigm.

In the next section, we'll load all the timesteps that are available from the model at the moment and put them in a stacked Dask array (wrapped in an `xarray.DataArray`).

In [None]:
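def load_as_dask_array(keys):
    '''helper function to stack the arrays stored under `keys` into one DataArray'''
    dask_arrays = []
    labels = []
    for key in keys:
        try:
            label = int(key)
            za = zarr.open_array(store=store, mode='r', path=key)
        except ValueError:
            # skip keys that raise ValueError (not an integer timestep, or not readable as an array)
            continue
        dask_arrays.append(dask.array.from_zarr(za))
        labels.append(label)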
    
    stacked = dask.array.stack(dask_arrays)
    return xr.DataArray(stacked, dims=['time', 'x', 'y'], coords={'time': labels}, name='gameoflife')


data = load_as_dask_array(get_all_keys(store))
with xr.set_options(display_style="html"):
    display(data.to_dataset())

From here our analysis proceeds just like it would if we were analyzing any other model output source. We can do data manipulation, visualization, and analysis using tools we're familiar with.

In [None]:
# convert to a 0/1 mask: cells equal to 255 become 1, everything else becomes 0
with ProgressBar():
    da = xr.where(data == 255, 1, 0).persist()

How many times has each cell been populated?

In [None]:
with ProgressBar():
    da.sum('time').plot(robust=True)

How has the total population evolved in time?

In [None]:
with ProgressBar():
    da.sum(('x', 'y')).plot()
    
plt.ylabel('Total population')
plt.title("Conway's Game of Life")
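
The two reductions above differ only in which dimensions they sum over. The sketch below works through both on a tiny made-up stack using plain NumPy, where axis 0 plays the role of `time` and axes 1 and 2 play the roles of `x` and `y`. The frame values are assumptions chosen for illustration.

```python
import numpy as np

# made-up stack of three 2x2 frames with dims (time, x, y); live cells are 255
frames = np.array([
    [[255, 0], [0, 0]],
    [[255, 255], [0, 0]],
    [[0, 255], [0, 255]],
])

alive = np.where(frames == 255, 1, 0)  # same 0/1 conversion as above

times_populated = alive.sum(axis=0)        # reduce over time: one value per cell
total_population = alive.sum(axis=(1, 2))  # reduce over x and y: one value per timestep

print(times_populated)   # [[2 2], [0 1]]
print(total_population)  # [1 2 2]
```

Summing over `time` leaves a 2D map, which is why the first plot is an image, while summing over `x` and `y` leaves a 1D series, which is why the second plot is a line.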

How has the rate of population increase/decrease changed over time?

In [None]:
with ProgressBar():
    # compute the total population once, then reuse it for every window
    total = da.sum(('x', 'y')).compute()

for window in [10, 25, 50, 100, 200]:
    total.rolling(time=window).mean().diff('time').plot(label=window)
plt.grid()
plt.legend()
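
The plotted quantity is the first difference of a moving average: the population is smoothed over a window, then differenced to estimate the change per timestep. The sketch below reproduces that arithmetic with plain NumPy on a made-up series. Unlike xarray's `rolling`, which by default pads the first `window - 1` positions with NaN, `np.convolve` with `mode='valid'` simply drops them.

```python
import numpy as np

population = np.array([2.0, 4.0, 6.0, 8.0, 10.0, 12.0])  # made-up population totals
window = 3

# moving average over each full window
kernel = np.ones(window) / window
smoothed = np.convolve(population, kernel, mode='valid')

# first difference of the smoothed series: change per timestep
rate = np.diff(smoothed)

print(smoothed)
print(rate)
```

For this steadily growing series the smoothed values rise by 2 at each step, so the rate is constant. Larger windows trade responsiveness for a smoother rate curve.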