# Streaming data from micro-manager to napari: PSF Viewer

This notebook shows how to acquire data using `micromanager`, then use `pycro-manager` to stream it to `napari`.  
Buttons to start and stop data acquisition are added to the `napari` window using the `magic-gui` package. In this example, the data displayed in `napari` is resliced to get a live PSF viewer. However, reslicing is only a small example for the data analysis possible using `napari`.

In [1]:
# only execute first time to install all required packages
#!pip install pycromanager queue napari pyqt5 magicgui yappi

In [2]:
import time
import numpy as np
import queue
#import yappi # needed for benchmarking multithreaded code

# tested with napari 0.3.8
import napari
from napari.qt import thread_worker

# tested with magicgui 0.1.6, will break with 0.2.0
from magicgui import magicgui

# tested with pycromanager 0.7.6
from pycromanager import Acquisition, multi_d_acquisition_events

# open napari in an extra window
%gui qt

## define constants
some constants for microscope parameters  
and display options  
global variables for multithreading

In [24]:
# data acquired on microscope or simulated?
simulate = False
# z-stage controlled through micromanager, or externally?
z_stack_external = False
# clip image to central part. Speeds up display as data size is reduced
# is used as size for simulating data
clip =[500, 500]
# um / px, for correct scaling in napari
size_um = [0.16, 0.16]
# start in um, end in um, number of slices, active slice
z_range = [0, 50, 200, 0]
# sleep time to keep software responsive
sleep_time = 0.05
# contrast limits for display
clim = [100, 300]
# color map for display
cmap = 'plasma'

# initialize global variables
# flag to break while loops
acq_running = False
# empty queue for image data and z positions
img_queue = queue.Queue()
# xyz data stack
data = np.random.rand(z_range[2], clip[0], clip[1]) * clim[1]

# if z-stage is controlled through micromanager:
# need bridge to move stage at beginning of stack
# USE WITH CAUTION: only tested with micromanager demo config
if not(simulate) and not(z_stack_external):
    from pycromanager import Bridge
    bridge = Bridge()
    #get object representing micro-manager core
    core = bridge.get_core()
    print(core)
    core.set_position(z_range[0])

<pycromanager.core.mmcorej_CMMCore object at 0x7f9cb50eec70>


## create dummy image and and put into stack
adds dummy image of constant brightness  
keeps track of z position  
adds image and z position to queue  
use for testing purposes without microscope  
stack of increasing brightness helps to identify glitches

In [4]:
def simulate_image(b, size = [128,128]):
    """ fnc to simulate an image of constant brightness.
        Keeps track of z-position in stacks, concatenates
        image data and z-position and adds to queue.
        Inputs: int b: brightness
                np.array size: # of px in image in xy.
        Global variables: img_queue to write image and z position to
                z_range to keep track of z position
        """
    global img_queue
    global z_range
    image = np.ones(size) * b
    img_queue.put([z_range[3], np.ravel(image)])
    z_range[3]= (z_range[3]+1) % z_range[2]
    #print("simulating ", z_range[3])
    
    
def simulate_data(ii, z_range):
    """ fnc to create images with constant, but increasing brightness.
        Inputs: int ii: counter to increase brightness
                int z_range: number of slices in stack"""
    for zz in range(z_range[2]):
        brightness = (ii+1) * (zz+1) / ((z_range[2]+1)) * clim[1]
        simulate_image(brightness, clip)
        time.sleep(sleep_time)
        # need sleep time especially when simulated datasize is small or this will kill CPU

## image process function and pycromanager acquisition
adds acquired image and z position to queue,  
keeps track of z position  
built pycromanager acquisition events  
acquire data and send to image_process_fn  

In [5]:
def grab_image(image, metadata):
    """ image_process_fnc to grab image from uManager.
        Keeps track of z-position in stacks, concatenates
        image data and z-position and adds to queue.
        Inputs: array image: image from micromanager
                metadata from micromanager
        Global variables: img_queue to write image and z position to
                z_range to keep track of z position
        """
    global img_queue
    global z_range
    
    size = np.shape(image)
    image_clipped = image[(size[0]-clip[0])//2:(size[0]+clip[0])//2,
                      (size[1]-clip[1])//2:(size[1]+clip[1])//2]
    img_queue.put([z_range[3], np.ravel(image_clipped)])
    z_range[3]= (z_range[3]+1) % z_range[2]
   
    return image, metadata


def acquire_data(z_range):
    """ micro-manager data acquisition. Creates acquisition events for z-stack.
        This example: use custom events, not multi_d_acquisition because the 
        z-stage is not run from micro-manager but controlled via external DAQ."""
    with Acquisition(directory=None, name=None, 
                     show_display=True, 
                     image_process_fn = grab_image) as acq:
        events = []
        for index, z_um in enumerate(np.linspace(z_range[0], z_range[1], z_range[2])):
            evt = {"axes": {"z_ext": index}, "z_ext": z_um}
            events.append(evt)
        acq.acquire(events)
        
        
def acquire_multid(z_range):
    """ micro-manager data acquisition. Creates acquisition events for z-stack.
        This example: use multi_d_acquisition because the z-stage is run 
        from micro-manager.
        Unless hardware triggering is set up in micro-manager, this will be fairly slow:
        micro-manager does not sweep the z-stage, but acquires plane by plane. """
    with Acquisition(directory=None, name=None, 
                     show_display=False, 
                     image_process_fn = grab_image) as acq:
        events = multi_d_acquisition_events(z_start=z_range[0], z_end=z_range[1], 
                                            z_step=(z_range[1]-z_range[0])/(z_range[2]-1))
        acq.acquire(events)

## napari update display
is called whenever the thread worker checking the queue yields an image  
adds images into xyz stack and updates data

In [6]:
def display_napari(pos_img):
    """ Unpacks z position and reshapes image from pos_img. Writes image into correct 
        slice of data, and updates napari display.
        Called by worker thread yielding elements from queue.
        Needs to be in code before worker thread connecting to it.
        Inputs: array pos_img: queue element containing z position and raveled image data.
        Global variables: np.array data: contains image stack
            img_queue: needed only to send task_done() signal.
    """
    global data
    global img_queue
    if pos_img is None:
        return
    # read image and z position
    image = np.reshape(pos_img[1:],(clip[0], clip[1]))
    z_pos = pos_img[0]

    # write image into correct slice of data and update display
    data[z_pos] = np.squeeze(image)
    layer = viewer.layers[0]
    layer.data = data
    #print("updating ", z_pos)

    img_queue.task_done()

## worker threads appending data to queue and reading from queue

In [7]:
@thread_worker
def append_img(img_queue):
    """ Worker thread that adds images to a list.
        Calls either micro-manager data acquisition or functions for simulating data.
        Inputs: img_queue """
    # start microscope data acquisition
    if not simulate:
        if z_stack_external:
            while acq_running:
                acquire_data(z_range)
                time.sleep(sleep_time)
        else:
            while acq_running:
                acquire_multid(z_range)
                time.sleep(sleep_time)

    # run with simulated data
    else:
        ii = 0
        while acq_running:
            simulate_data(ii, z_range)
            ii = ii + 1
            #print("appending to queue", ii)
            time.sleep(sleep_time)
            
            
@thread_worker(connect={'yielded': display_napari})
def yield_img(img_queue):
    """ Worker thread that checks whether there are elements in the 
        queue, reads them out.
        Connected to display_napari function to update display """
    global acq_running
    
    while acq_running:
        time.sleep(sleep_time)
        # get elements from queue while there is more than one element
        # playing it safe: I'm always leaving one element in the queue
        while img_queue.qsize() > 1:
            #print("reading from queue ", img_queue.qsize())
            yield img_queue.get(block = False)

    # read out last remaining elements after end of acquisition
    while img_queue.qsize() > 0:
        yield img_queue.get(block = False)
    print("acquisition done")

## define functions to start and stop acquisition
connect to gui buttons using magic_gui  
`start_acq` restarts workers, resets `acq_running` flag and resets `z_range[3]`, ie z_pos  
`stop_acq` sets `acq_running` flag to `False`, which will stop the worker threads

In [25]:
@magicgui(call_button="Start")
def start_acq():
    """ Called when Start button in pressed. Starts workers and resets global variables"""
    print("starting threads...")
    global acq_running
    global z_range
    if not(acq_running):
        z_range[3] = 0
        if not(simulate) and not(z_stack_external):
            print(core.get_position())
            core.set_position(z_range[0])
            time.sleep(1)
            print(core.get_position())
        acq_running = True
        # comment in when benchmarking
        #yappi.start()
        worker1 = append_img(img_queue)
        worker2 = yield_img(img_queue)
        worker1.start()
        #worker2.start() # doesn't need to be started bc yield is connected
    else:
        print("acquisition already running!")
    
    
@magicgui(call_button = "Stop")
def stop_acq():
    print("stopping threads")
    # set global acq_running to False to stop other workers
    global acq_running
    acq_running = False
    if not(simulate) and not(z_stack_external):
        print(core.get_position())
        core.stop()
        print(core.get_position())
    # comment in when benchmarking
    # yappi.stop()

## "Main" function: start napari and worker threads
(re-)opens napary viewer  
initializes view with random data  
sets scale, contrast etc and rolls view. 
add GUI buttons for start stop

In [26]:
# check if viewer is already open
# if yes: close and reopen

try:
    if viewer:
        viewer.close()
except:
    print("viewer already closed or never opened")
viewer = napari.Viewer(ndisplay=2)

# initialize napari viewer with stack view and random data, reslice view
scale = [(z_range[1]-z_range[0])/z_range[2], size_um[1], size_um[0]]
layer = viewer.add_image(data, 
                        name = 'uManager',
                        colormap = cmap,
                        interpolation = 'nearest',
                        blending = 'additive',
                        rendering = 'attenuated_mip',
                        scale = scale,
                        contrast_limits = clim )
viewer.dims._roll()

# set sliders to the middle of the stack for all three dimensions.
# doesn't work anymore after fixing scaling
for dd, dim in enumerate(layer.data.shape):
    viewer.dims.set_point(dd, dim*scale[2-dd]//2)

# define start stop buttons and add to napari gui
# these will break when upgrading to magicgui v0.2.0
gui_start = start_acq.Gui()
gui_stop = stop_acq.Gui()
viewer.window.add_dock_widget(gui_start)
viewer.window.add_dock_widget(gui_stop)


Exception ignored in: <function JavaObjectShadow.__del__ at 0x7f9cb4ca6790>
Traceback (most recent call last):
  File "/opt/anaconda3/envs/pyoformats/lib/python3.8/site-packages/pycromanager/core.py", line 457, in __del__
    self._close()
  File "/opt/anaconda3/envs/pyoformats/lib/python3.8/site-packages/pycromanager/core.py", line 447, in _close
    self._socket.send(message)
  File "/opt/anaconda3/envs/pyoformats/lib/python3.8/site-packages/pycromanager/core.py", line 102, in send
    self._socket.send_multipart(message_parts)
  File "/opt/anaconda3/envs/pyoformats/lib/python3.8/site-packages/zmq/sugar/socket.py", line 447, in send_multipart
    return self.send(msg_parts[-1], flags, copy=copy, track=track)
  File "/opt/anaconda3/envs/pyoformats/lib/python3.8/site-packages/zmq/sugar/socket.py", line 400, in send
    return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  File "zmq/backend/cython/socket.pyx", line 728, in zmq.backend.cython.socket.Socket.send
  F

viewer already closed or never opened


<napari._qt.widgets.qt_viewer_dock_widget.QtViewerDockWidget at 0x7f9c9e082310>

starting threads...
0
0
stopping threads
acquisition done


## Get output from yappi
only needs to be run when benchmarking code

In [13]:
#only needs to be executed when yappi is used
threads = yappi.get_thread_stats()
for thread in threads:
    print(
        "Function stats for (%s) (%d)" % (thread.name, thread.id)
    )  # it is the Thread.__class__.__name__
    yappi.get_func_stats(ctx_id=thread.id).print_all()

Function stats for (_MainThread) (0)


SystemError: NULL object passed to Py_BuildValue


Clock type: CPU
Ordered by: totaltime, desc

name                                  ncall  tsub      ttot      tavg      
..t-8-af3e07d989d1>:1 display_napari  255    0.061164  9.212611  0.036128
..yers/image/image.py:299 Image.data  255    0.003209  9.013016  0.035345
..ase/base.py:380 Image._update_dims  255    0.009440  7.962832  0.031227
..ers/base/base.py:731 Image.refresh  255    0.006357  7.457126  0.029244
..event.py:467 EventEmitter.__call__  127..  0.044935  6.091301  0.004770
..:527 EventEmitter._invoke_callback  127..  0.006939  5.994742  0.004694
..60 VispyImageLayer._on_data_change  255    0.015204  4.954112  0.019428
..ispyImageLayer._transform_position  2024   0.052185  4.060790  0.002006
..ayer.coordinates_of_canvas_corners  757    0.100023  3.557644  0.004700
..ode.py:593 SubScene.node_transform  2271   0.017526  3.530115  0.001554
..hain.py:31 ChainTransform.__init__  2271   0.054381  3.266222  0.001438
..event.py:405 EventEmitter.__call__  480..  0.119697  2.884762 

SystemError: NULL object passed to Py_BuildValue

Function stats for (ParentPollerUnix) (3)
Function stats for (Thread) (2)

Clock type: CPU
Ordered by: totaltime, desc

name                                  ncall  tsub      ttot      tavg      
..ncio/events.py:79 TimerHandle._run  2      0.000015  0.001427  0.000713
..oop.py:735 ZMQIOLoop._run_callback  1      0.000009  0.000954  0.000954
..l/iostream.py:359 OutStream._flush  1      0.000040  0.000945  0.000945
..client/session.py:660 Session.send  1      0.000051  0.000829  0.000829
..4 _UnixSelectorEventLoop._run_once  1      0.000029  0.000553  0.000553
..t/session.py:601 Session.serialize  1      0.000043  0.000486  0.000486
..io.py:137 ZMQIOLoop._handle_events  1      0.000008  0.000441  0.000441
..am.py:446 ZMQStream._handle_events  1      0.000026  0.000433  0.000433
..yter_client/session.py:81 <lambda>  4      0.000021  0.000282  0.000070
..ages/zmq/utils/jsonapi.py:31 dumps  4      0.000040  0.000261  0.000065
..ream.py:471 ZMQStream._handle_recv  1      0.000013  0.000255 