In [1]:
import collections
import dask
import gcsfs
import h5py
import io
import numba
import numpy as np
import os
import os.path
import pickle
import requests

from dask.distributed import Client
from dask_kubernetes import KubeCluster
from numba.typed import Dict

In [2]:
# Start a Dask cluster on Kubernetes from the spec file 'worker-spec.yml'
# (path is relative to the notebook's working directory).
cluster = KubeCluster.from_yaml('worker-spec.yml')
# Bare last expression so the notebook renders the cluster's widget repr.
cluster

distributed.scheduler - INFO - Clear task state
Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
distributed.scheduler - INFO -   Scheduler at:    tcp://10.36.0.28:33787
distributed.scheduler - INFO -   dashboard at:                    :41697


VBox(children=(HTML(value='<h2>KubeCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n    .…

In [7]:
# Connect a distributed client to the scheduler of the cluster created above.
client = Client(cluster)

distributed.scheduler - INFO - Receive client connection: Client-048eb164-3871-11ea-8035-d6ea29a0b863
distributed.core - INFO - Starting established connection


Check that the client is working well.

In [32]:
@dask.delayed
def the_sum(a, b):
    """Return ``a + b``; decorated with ``dask.delayed`` so each call is a lazy task."""
    total = a + b
    return total

# Smoke test: chain two delayed additions and evaluate them; the result should be 6.
nested_sum = the_sum(the_sum(1, 2), 3)
nested_sum.compute()

6

In [None]:
# The credentials cell further down reads the cached tokens from
# '../.gcs_tokens', so test for that same file here (the previous
# '../../.gcs_tokens' pointed one directory too high).
if not os.path.exists('../.gcs_tokens'):
    # No cached token yet: request one through the browser flow.
    gcsfs.GCSFileSystem(project='neuron-jungle', token='browser')

In [5]:
# Load the pickled token cache from disk.
# NOTE(review): pickle.load can execute arbitrary code; only point this at a
# token file you trust.
with open('../.gcs_tokens', 'rb') as token_file:
    token_cache = pickle.load(token_file)

# The cache is a dict; use the value stored under its first key as credentials.
first_key = list(token_cache.keys())[0]
credentials = token_cache[first_key]

fs = gcsfs.GCSFileSystem(project='neuron-jungle', token=credentials)
fs.ls('l4dense/segmentation-volume')

218

# Create a map from segment id to neuron id

In [14]:
def locally_cache(filename, credentials):
    """Copy ``l4dense/<filename>`` from GCS into ``../cache/<filename>``.

    Parameters
    ----------
    filename : str
        Object name inside the ``l4dense`` bucket; also used as the local name.
    credentials
        Token passed straight through to ``gcsfs.GCSFileSystem``.

    Returns
    -------
    int
        Number of bytes copied.
    """
    remote_path = f'l4dense/{filename}'
    local_path = f'../cache/{filename}'

    bucket = gcsfs.GCSFileSystem(project='neuron-jungle', token=credentials)

    # The whole object is read into memory before being written out locally.
    with bucket.open(remote_path, 'rb') as remote:
        payload = remote.read()
    with open(local_path, 'wb') as local:
        local.write(payload)

    return len(payload)

# Copy dendrites.hdf5 into ../cache/; the cell output is the number of bytes copied.
locally_cache('dendrites.hdf5', credentials)

263193540

In [16]:
# NOTE(review): `dendrites` is only opened in the following cell, so this
# inspection fails when the notebook is run top to bottom on a fresh kernel.
# Display the 'agglomerate' group of the dendrites file.
dendrites['dendrites']['agglomerate']

<HDF5 group "/dendrites/agglomerate" (11400 members)>

In [19]:
# Open the locally cached dendrites file. The handle stays open under the name
# `dendrites` because another cell inspects it interactively.
dendrites = h5py.File('../cache/dendrites.hdf5', 'r')

# Stage 1: neuron id -> list of every segment id belonging to that neuron.
segments_by_neuron = collections.defaultdict(list)
neuronId = np.array(dendrites['dendrites']['neuronId'])
# Look the group up once instead of on every loop iteration.
agglomerates = dendrites['dendrites']['agglomerate']
# The loop variable is `neuron_id` rather than `id`, so the builtin `id()` is
# not shadowed for the rest of the kernel session.
for i, neuron_id in enumerate(neuronId):
    # Entries whose id is not positive are skipped.
    if neuron_id > 0:
        # Agglomerate members are keyed by the 1-based index as a string.
        segments_by_neuron[neuron_id].extend(
            np.array(agglomerates[str(i + 1)]).tolist()
        )

# Stage 2: invert into segment id -> neuron id. If a segment id is listed
# under several neurons, the neuron visited last in this loop wins.
neuron_map = {}
for neuron_id, segment_ids in segments_by_neuron.items():
    for segment_id in segment_ids:
        neuron_map[segment_id] = neuron_id

In [55]:
@numba.jit(nopython=True)
def remap(data, the_map):
    """Translate each element of ``data`` through ``the_map``.

    Elements that are not keys of ``the_map`` are left as 0. The result has
    the same shape and dtype as ``data`` (it is created with
    ``np.zeros_like``). Iteration runs over ``len(data)``, i.e. the first axis.
    """
    out = np.zeros_like(data)
    count = len(data)
    for pos in range(count):
        key = data[pos]
        if key in the_map:
            out[pos] = the_map[key]
    return out

In [68]:
# To repaint: map dendrite ids to neuron id (default to 0)
@dask.delayed
def repaint(filename, credentials):
    """Relabel one segmentation cube from segment ids to neuron ids.

    Reads ``l4dense/segmentation-volume/<filename>``, maps every voxel through
    the lookup stored in ``l4dense/neuron-map.pkl`` (voxels with no entry
    become 0), and writes the result as a gzip-compressed HDF5 file to
    ``l4dense/neuron-volume/<filename>``.

    Parameters
    ----------
    filename : str
        Name of the cube file inside the ``segmentation-volume`` folder.
    credentials
        Token passed straight through to ``gcsfs.GCSFileSystem``.

    Returns
    -------
    int
        Number of bytes written to the output object.
    """
    fs = gcsfs.GCSFileSystem(project='neuron-jungle', token=credentials)

    # NOTE(review): pickle.loads can execute arbitrary code. This is only
    # acceptable because neuron-map.pkl is produced by this notebook itself.
    with fs.open('l4dense/neuron-map.pkl', 'rb') as f:
        segment_to_neuron = pickle.loads(f.read())

    # `remap` is compiled in nopython mode, so it needs a numba typed dict.
    # NOTE(review): value_type is uint8, so neuron ids are assumed to fit in
    # 0..255 -- confirm against the data.
    the_map_typed = Dict.empty(key_type=numba.int32, value_type=numba.uint8)
    for k, v in segment_to_neuron.items():
        the_map_typed[k] = v

    side = 1024       # cubes are processed as side x side x side volumes
    slice_size = 32   # number of planes read from the HDF5 dataset per slab

    a = np.zeros((side, side, side), dtype=np.uint8)

    with fs.open(f'l4dense/segmentation-volume/{filename}', 'rb') as f:
        # Context manager so the HDF5 handle is closed even if remapping fails.
        with h5py.File(f, 'r') as cube:
            for start in range(0, side, slice_size):
                subd = np.array(cube['data'][start:start + slice_size, :, :])
                # Iterate over the planes actually present in this slab. The
                # original looped over the slab *count*, which was only
                # correct because 1024 / 32 == 32.
                for offset in range(subd.shape[0]):
                    plane = remap(subd[offset, :, :].ravel(), the_map_typed)
                    a[start + offset, :, :] = (
                        plane.astype(np.uint8).reshape((side, side))
                    )

    # Serialise to an in-memory HDF5 file, then upload the bytes in one write.
    bio = io.BytesIO()
    with h5py.File(bio, 'w') as out:
        out.create_dataset('data', a.shape, compression="gzip", data=a)

    data = bio.getvalue()
    with fs.open(f'l4dense/neuron-volume/{filename}', 'wb') as f:
        f.write(data)
    return len(data)

In [54]:
# Publish the segment -> neuron lookup to the bucket; `repaint` reads it back
# from 'l4dense/neuron-map.pkl'.
fs = gcsfs.GCSFileSystem(project='neuron-jungle', token=credentials)
neuron_map_bytes = pickle.dumps(neuron_map)
with fs.open('l4dense/neuron-map.pkl', 'wb') as remote:
    remote.write(neuron_map_bytes)

In [None]:
# Cube files are named x{i}y{j}z{k}.hdf5; x5y8z3 are the largest ids.
cube_indices = ((i, j, k) for i in range(6) for j in range(9) for k in range(4))

pending = []
for i, j, k in cube_indices:
    print(i, j, k)
    pending.append(repaint(f"x{i}y{j}z{k}.hdf5", credentials))

# Summing from 0 chains the delayed byte counts into one lazy total, which is
# then evaluated on the cluster.
bytes_total = sum(pending)
bytes_total.compute()

In [9]:
# Sanity check: print the input cube count, then the output cube count, for comparison.
for pattern in ('l4dense/segmentation-volume/*.hdf5',
                'l4dense/neuron-volume/*.hdf5'):
    matches = sorted(fs.glob(pattern))
    print(len(matches))

216
216


distributed.scheduler - INFO - Register tcp://10.36.2.2:45201
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.36.2.2:45201
distributed.core - INFO - Starting established connection


We're done!