In [1]:
import dask
import dask.array
import distributed
# numpy and plotting
import numpy as np

from bokeh.io import output_notebook, show, push_notebook
from bokeh.plotting import figure
output_notebook()

from datashader.bokeh_ext import InteractiveImage
from datashader.transfer_functions import Image
import datashader as ds

You can access NaTType as type(pandas.NaT)
  @convert.register((pd.Timestamp, pd.Timedelta), (pd.tslib.NaTType, type(None)))


In [2]:
    # Start a distributed client with no scheduler address given.
    # NOTE(review): presumably this spins up a local cluster and becomes the
    # scheduler used by the later `.compute()` calls -- confirm against the
    # installed `distributed` version.
    client = distributed.Client()

In [11]:
    def q_r(numerator, divisor):
        """Return the floor quotient and the remainder of a division as a pair."""
        return divmod(numerator, divisor)

    def bin_ndarray(ndarray, new_shape, operation='sum'):
        """
        Bin an ndarray along every axis down to ``new_shape``.

        Each axis of length ``c`` is reshaped into a pair ``(d, c // d)`` where
        ``d`` is the requested length, and the second axis of every pair is then
        reduced with ``operation``.

        Parameters
        ----------
        ndarray : array
            Object exposing ``ndim``, ``shape``, ``dtype``, ``reshape``,
            ``astype`` and the chosen reduction method.
        new_shape : sequence of int
            Target shape; must have one entry per input axis, and every entry
            must be a positive divisor of the matching input axis.
        operation : {'sum', 'mean'}
            Reduction applied to each bin (case-insensitive).

        Returns
        -------
        array
            The binned array, cast back to the dtype of the input.

        Raises
        ------
        ValueError
            If ``operation`` is unsupported, the number of dimensions differs,
            or a new axis does not evenly divide the old one.
        """
        old_dtype = ndarray.dtype
        operation = operation.lower()
        if operation not in {'sum', 'mean'}:
            raise ValueError("Operation not supported.")
        if ndarray.ndim != len(new_shape):
            raise ValueError("Shape mismatch: {} -> {}".format(ndarray.shape,
                                                               new_shape))
        # Validate up front: otherwise a bad shape only surfaces as an opaque
        # reshape error (or a ZeroDivisionError for a zero-length target axis).
        for new_len, old_len in zip(new_shape, ndarray.shape):
            if new_len <= 0 or old_len % new_len:
                raise ValueError(
                    "Shape mismatch: new axes must divide old ones: "
                    "{} -> {}".format(ndarray.shape, new_shape))
        compression_pairs = [(d, c // d) for d, c in zip(new_shape,
                                                         ndarray.shape)]
        flattened = [l for p in compression_pairs for l in p]
        ndarray = ndarray.reshape(flattened)
        # Reduce the "bin" axis of each pair, working from the last pair back.
        # Every reduction drops one axis, so the next bin axis sits one
        # position further from the end each time.
        for i in range(len(new_shape)):
            op = getattr(ndarray, operation)
            ndarray = op(-1 * (i + 1))
        return ndarray.astype(old_dtype)

    def extent(xrange):
        """Return the span of ``xrange``: its largest value minus its smallest."""
        highest = max(xrange)
        lowest = min(xrange)
        return highest - lowest


    def fix_range(oldrange, extra):
        """Shrink a ``(start, stop)`` range by ``extra`` units in total.

        Half of ``extra`` (rounded down) is removed from the start and the rest
        from the stop, so the remaining length is a multiple of the compression
        factor when ``extra`` is the remainder returned by ``calc_extra``.
        """
        start, stop = oldrange
        trim_start = extra // 2
        trim_stop = extra - trim_start
        return (start + trim_start, stop - trim_stop)


    def check_range(oldrange, tot):
        """Clamp a ``(start, stop)`` range so it stays within ``0 .. tot``."""
        start, stop = oldrange
        start = max(start, 0)
        stop = min(stop, tot)
        return (start, stop)


    def calc_extra(ext, x):
        """Return ``(compression factor, leftover)`` for fitting ``ext`` into ``x``.

        The pair is the quotient and remainder of ``ext`` by ``x``. When the
        quotient is zero (the extent is smaller than ``x``) no compression is
        applied and ``(1, 0)`` is returned instead.
        """
        factor, leftover = q_r(ext, x)
        return (factor, leftover) if factor else (1, 0)

    def normalize_crop(img, y_range, x_range, h, w):
        """Normalize a crop box so that whole compression bins fit inside it.

        Each requested range is snapped outward to whole pixel indices and
        clamped to the image. It is then trimmed so that its length is an
        integer multiple of the compression factor for the target size.

        Parameters
        ----------
        img : 2-D array
            Only ``img.shape`` is read, as ``(rows, columns)``.
        y_range, x_range : pair of numbers
            Requested ``(start, stop)`` along rows / columns. Fractional values
            are accepted.
        h, w : int
            Target height and width in output pixels.

        Returns
        -------
        yy, xx : numpy.ndarray
            Start index of every output bin along rows / columns.
        crop_box : tuple of slice
            ``(row_slice, column_slice)`` selecting the normalized crop.
        """
        toty, totx = img.shape

        def pixel_bounds(bounds, total):
            # Slices need integer indices but plot ranges may be fractional:
            # grow outward to whole pixels before clamping to the data.
            low, high = bounds
            low, high = check_range(
                (int(np.floor(low)), int(np.ceil(high))), total)
            # A viewport entirely outside the data leaves high < low after
            # clamping (possibly negative, which a slice would interpret as
            # "counted from the end"); collapse it to an empty range instead.
            return low, max(high, low)

        x_range = pixel_bounds(x_range, totx)
        y_range = pixel_bounds(y_range, toty)

        xext = extent(x_range)
        yext = extent(y_range)

        xcompress, x_extra = calc_extra(xext, w)
        ycompress, y_extra = calc_extra(yext, h)

        # Drop the leftover pixels so each axis holds a whole number of bins.
        new_x_range = fix_range(x_range, x_extra)
        new_y_range = fix_range(y_range, y_extra)

        yy, xx = np.arange(*new_y_range, ycompress), np.arange(*new_x_range, xcompress)
        return yy, xx, (slice(*new_y_range), slice(*new_x_range))

    def crop_downsample_img(img, x_range, y_range, w, h):
        """Crop ``img`` to the requested ranges and downsample it to fit ``w`` x ``h``.

        Note the argument order: this function takes ``x`` before ``y`` and
        ``w`` before ``h``, while ``normalize_crop`` takes them the other way
        round.

        Parameters
        ----------
        img : 2-D array
            Source image. The non-empty path calls ``.compute()`` on the
            binned crop, so a lazy (dask) array is expected.
        x_range, y_range : pair of numbers
            Beginning and end of the visible region along each axis.
        w, h : int
            Width and height of the bokeh image in pixels.

        Returns
        -------
        dict
            ``new``: the downsampled pixels as a concrete 2-D array,
            ``coords``: ``{"y_axis": ..., "x_axis": ...}`` bin start indices,
            ``dims``: ``("y_axis", "x_axis")``.
        """
        yy, xx, crop_box = normalize_crop(img, y_range, x_range, h, w)
        crop = img[crop_box]
        if not all(crop.shape):
            # Nothing visible. Return an empty image that is still 2-D and of
            # the image dtype so it matches the two declared dims.
            yy = xx = np.arange(0)
            downsampled = np.empty((0, 0), dtype=img.dtype)
        else:
            # Never upsample: the target is capped by the crop size itself.
            hw = min(crop.shape[0], h), min(crop.shape[1], w)
            downsampled = bin_ndarray(crop, hw, "mean").compute()

        # need to pass new coordinates of data and new dims
        return dict(new=downsampled, coords={"y_axis": yy, "x_axis": xx}, dims=("y_axis", "x_axis"))

In [12]:
class to_plot:
    """Bare namespace used in place of a global; the array to show is attached as ``to_plot.data``."""

# Demo image: 1024 x 1024 random uint8 values, wrapped as a dask array
# split into 64 x 64 chunks.
to_plot.data = dask.array.from_array(
    np.random.randint(256, size=(1024, 1024), dtype=np.uint8),
    chunks=(64, 64),
)

In [13]:
class message:
    """Debugging counter that records how many computations have run.

    ``update`` bumps the counter and ``repr`` renders it through the
    ``_msg`` template. Callers may also hang arbitrary debug attributes
    on an instance.
    """

    def __init__(self):
        self.i = 0
        self._msg = "compute number {}"

    def update(self):
        """Increment the computation counter by one."""
        self.i = self.i + 1

    def __repr__(self):
        template = self._msg
        return template.format(self.i)

# Figure size in screen pixels (default size used by base_plot).
plot_width, plot_height = 450, 250
# Initial axis ranges handed to the figure in base_plot.
x_range = [0, 900]
y_range = [0, 500]

def base_plot(tools='pan,wheel_zoom,reset', plot_width=plot_width, plot_height=plot_height, **plot_args):
    """Create the borderless bokeh figure used as the canvas for the image.

    The figure uses the module-level ``x_range`` / ``y_range`` as its axis
    ranges, has no outline and zero borders, keeps its axes visible and hides
    both grids. Extra keyword arguments are forwarded to ``figure``.
    """
    zero_borders = dict.fromkeys(
        ("min_border", "min_border_left", "min_border_right",
         "min_border_top", "min_border_bottom"),
        0,
    )
    fig = figure(
        tools=tools,
        plot_width=plot_width,
        plot_height=plot_height,
        x_range=x_range,
        y_range=y_range,
        outline_line_color=None,
        **zero_borders,
        **plot_args
    )

    fig.axis.visible = True
    for grid in (fig.xgrid, fig.ygrid):
        grid.grid_line_color = None
    return fig

# Two independent debug counters, both updated from create_image.
msg, msg2 = message(), message()

def create_image(x_range, y_range, w=450, h=250):
    """Render the visible part of ``to_plot.data`` as a datashader ``Image``.

    Used as the callback given to ``InteractiveImage``.

    Parameters
    ----------
    x_range, y_range : pair of numbers
        Visible region along each axis.
    w, h : int
        Width and height of the output image in pixels.

    Returns
    -------
    Image
        Grayscale rendering of the cropped, downsampled data. The same object
        is also kept on ``msg2.d`` for debugging.
    """
    msg.update()
    d = crop_downsample_img(to_plot.data, x_range, y_range, w, h)
    msg.d = d
    msg2.update()
    # Popping leaves only the `coords` / `dims` entries in `d`, which are
    # forwarded to Image as keyword arguments below.
    raw = d.pop("new")
    # Stack value, value, value, 255 per pixel and reinterpret each group of
    # four as a single uint32.
    # NOTE(review): this assumes `raw` is uint8 -- otherwise the view/reshape
    # will not line up; confirm against the data source.
    new = np.dstack([raw, raw, raw, np.ones_like(raw) * 255]).view(np.uint32).reshape(raw.shape)
    # Build the image once so the debug handle is the very object returned
    # (previously two separate Image instances were constructed).
    image = Image(new, **d)
    msg2.d = image
    return image

# Build the figure with a white background and hand it, together with the
# create_image callback, to InteractiveImage. This must stay the last
# expression of the cell so the notebook displays it.
# NOTE(review): presumably InteractiveImage re-invokes create_image with the
# new ranges on pan/zoom -- confirm against the datashader docs.
p = base_plot(background_fill_color="white")
InteractiveImage(p, create_image, timeout=1)