In [1]:
%load_ext autoreload
%autoreload 2

# Compile mmcoremj.jar and copy dll and jar to Micro-Manager installation
import os
from pathlib import Path

os.chdir(r"C:\Users\henry\GitRepos\micro-manager\mmCoreAndDevices\MMCoreJ_wrap")
result = os.system("ant jar")
if result != 0:
    print(f"Error running 'ant jar': exit code {result}")

import shutil
source_1 = r"C:\Users\henry\GitRepos\micro-manager\mmCoreAndDevices\build\Debug\x64\MMCoreJ_wrap.dll"
destination_1 = r"C:\Program Files\Micro-Manager\MMCoreJ_wrap.dll"
source_2 = r"C:\Users\henry\GitRepos\micro-manager\build\Java\MMCoreJ.jar"
destination_2 = r"C:\Program Files\Micro-Manager\plugins\Micro-Manager\MMCoreJ.jar"
shutil.copy2(source_1, destination_1)
shutil.copy2(source_2, destination_2)


'C:\\Program Files\\Micro-Manager\\plugins\\Micro-Manager\\MMCoreJ.jar'

In [2]:
%load_ext autoreload
%autoreload 2

# import the buffer_test_util.py file
import importlib.util
import sys

# Import util file form absolute path
file_path = 'C:/Users/henry/GitRepos/mmpycorex/test/buffer_test_util.py'
spec = importlib.util.spec_from_file_location("buffer_test_util", file_path)
buffer_test_util = importlib.util.module_from_spec(spec)
sys.modules["buffer_test_util"] = buffer_test_util
spec.loader.exec_module(buffer_test_util)


from pycromanager import start_headless, stop_headless, Core
import time
import numpy as np
from buffer_test_util import *

# start_headless('C:/Program Files/Micro-Manager', 'MMConfig_demo.cfg', max_memory_mb=9000)

# start_headless('C:/Program Files/Micro-Manager', 'MMConfig_demo.cfg', python_backend=True)
# import pymmcore; print(pymmcore.__version__)

core = Core()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [3]:
from pycromanager import Acquisition, multi_d_acquisition_events, Core

mmc = Core()
mmc.set_property('Camera', 'OnCameraCCDXSize', '1024')
mmc.set_property('Camera', 'OnCameraCCDYSize', '1024')

with Acquisition(show_display=False) as acq:
    events = multi_d_acquisition_events(num_time_points=5)
    acq.acquire(events)

# reset image size
mmc.set_property('Camera', 'OnCameraCCDXSize', '512')
mmc.set_property('Camera', 'OnCameraCCDYSize', '512')

dataset = acq.get_dataset()
try:
    data_shape = dataset.as_array().shape
    assert(data_shape[-2:] == (1024, 1024))
finally:
    print(data_shape[-2:])
    dataset.close()


(1024, 1024)


In [2]:
# SnapImage Tests 

# Make sure we can get and release the pointer if v2 buffer is enabled
core.enable_v2_buffer(True)

core.snap_image()
buffer_data_pointer = core.get_image_pointer()
assert isinstance(buffer_data_pointer.get_data(), np.ndarray)
buffer_data_pointer.release()
assert(buffer_data_pointer.get_data() is None)


# Make sure we can't get the pointer if v2 buffer is disabled
core.enable_v2_buffer(False)
core.snap_image()

try:
    core.get_image_pointer()
    assert False
except Exception as e:
    pass


# Make sure regular snap image still works with either buffer
for v2 in [False, True]:
    core.enable_v2_buffer(v2)
    core.snap_image()
    pixels = core.get_image()
    assert isinstance(pixels, np.ndarray)


## Tagged image snap tests

# Make sure regular snap image still works with either buffer
for v2 in [False, True]:
    core.enable_v2_buffer(v2)
    core.snap_image()
    tagged_image = core.get_tagged_image()
    assert isinstance(tagged_image.pix, np.ndarray)
    assert isinstance(tagged_image.tags, dict)


# TaggedImagePointer 
for v2 in [False, True]:
    core.enable_v2_buffer(v2)
    core.snap_image()
    if not v2:
        # it should raise an exception if you try to get the pointer
        try:
            tagged_image = core.get_tagged_image_pointer()
            assert False
        except Exception as e:
            pass
    else:
        # it should return a pointer
        tagged_image = core.get_tagged_image_pointer()
        assert tagged_image.pix is None
        tagged_image.get_pixels()
        assert isinstance(tagged_image.pix, np.ndarray)




In [5]:
tagged_image.tags.get('Width')

'1024'

In [4]:
# Test functions for getting images and TaggedImages out of the buffer
from time import sleep, time
import numpy as np

# Test Peeking images/tagged images when running an overwrite mode sequence
for v2 in [True, False]:
    core.enable_v2_buffer(v2)
    core.set_circular_buffer_memory_footprint(10)
    core.start_continuous_sequence_acquisition(0)
    sleep(1)
    core.stop_sequence_acquisition()

    ti = core.get_last_tagged_image()
    assert isinstance(ti.pix, np.ndarray)
    i = core.get_last_image()
    assert isinstance(i, np.ndarray)
    # Test direct access to the BufferDataPointer object
    if v2:
        buffer_data_pointer = core.get_last_data_pointer()
        assert isinstance(buffer_data_pointer.get_data(), np.ndarray)
        buffer_data_pointer.release()
        assert(buffer_data_pointer.get_data() is None)
        # Test the TaggedImagePointer object
        tip = core.get_last_tagged_image_pointer()    
        assert tip.pix is None
        tip.get_pixels()
        assert isinstance(tip.pix, np.ndarray)
        tip.release()
    else:
        # assert low level throws exception
        try:
            core.get_last_data_pointer()
            assert False
        except Exception as e:
            pass
        # Assert high level gives none
        try:
            tip = core.get_last_tagged_image_pointer()
            assert False
        except Exception as e:
            pass
    

# Test popping tagged images when running a non-overwrite mode sequence
for v2 in [True, False]:
    core.enable_v2_buffer(v2)
    core.set_circular_buffer_memory_footprint(10)
    core.start_sequence_acquisition(4, 0, True)
    while core.is_sequence_running():
        sleep(0.01)

    ti = core.pop_next_tagged_image()
    assert isinstance(ti.pix, np.ndarray)
    i = core.pop_next_image()
    assert isinstance(i, np.ndarray)
    # Test direct access to the BufferDataPointer object
    if v2:
        buffer_data_pointer = core.pop_next_data_pointer()
        assert isinstance(buffer_data_pointer.get_data(), np.ndarray)
        buffer_data_pointer.release()
        assert(buffer_data_pointer.get_data() is None)
        # Test the TaggedImagePointer object
        tip = core.pop_next_tagged_image_pointer()    
        assert tip.pix is None
        tip.get_pixels()
        assert isinstance(tip.pix, np.ndarray)
        tip.release()
    else:
        # assert low level throws exception
        try:
            core.pop_next_data_pointer()
            assert False
        except Exception as e:
            pass
        # Assert high level gives none
        try:
            tip = core.pop_next_tagged_image_pointer()
            assert False
        except Exception as e:
            pass


In [5]:
# Test pulling out images from different cameras
from time import sleep


core.stop_sequence_acquisition('Camera')
core.stop_sequence_acquisition('Camera2')
image_size = 1024
image_size2 = 2048
buffer_size = 100

core.enable_v2_buffer(True)
core.set_buffer_memory_footprint(buffer_size)

# change to differen image sizes and pixel types
core.set_property('Camera', 'OnCameraCCDYSize', str(image_size))
core.set_property('Camera', 'OnCameraCCDXSize', str(image_size))
core.set_property('Camera', 'PixelType', '8bit')

core.set_property('Camera2', 'OnCameraCCDYSize', str(image_size2))
core.set_property('Camera2', 'OnCameraCCDXSize', str(image_size2))
core.set_property('Camera2', 'PixelType', '16bit')

core.start_continuous_sequence_acquisition('Camera', 0)
core.start_continuous_sequence_acquisition('Camera2', 0)

sleep(0.5)

tip1 = core.get_last_tagged_image_pointer_from_device('Camera')
tip2 = core.get_last_tagged_image_pointer_from_device('Camera2')

assert isinstance(tip1.get_pixels(), np.ndarray)
assert isinstance(tip2.get_pixels(), np.ndarray)

assert tip1.tags.get('Camera') == 'Camera'
assert tip2.tags.get('Camera') == 'Camera2'

# check the data types are uint8 and uint16
assert tip1.pix.dtype == np.uint8
assert tip2.pix.dtype == np.uint16

core.stop_sequence_acquisition('Camera')
core.stop_sequence_acquisition('Camera2')

In [6]:
# Test whether you can wrap around more data greater than the buffer size
from time import sleep

num_images = 50
core.enable_v2_buffer(True)
core.set_exposure(25)

core.set_circular_buffer_memory_footprint(10)
core.start_sequence_acquisition(num_images, 0, True)

i = 0
while True:
    try:
        im = core.pop_next_image()
        assert im is not None
        i += 1
        print(f'{i} popped: Free {core.get_buffer_free_capacity()}', f'Total {core.get_buffer_total_capacity()}')
    except:
        pass
    if core.is_buffer_overflowed():
        print('overflowed, ', f'Free {core.get_buffer_free_capacity()}')
        break
    if not core.is_sequence_running():
        print('sequence done')
    if i == num_images:
        break
    sleep(0.01)

1 popped: Free 10 Total 10
2 popped: Free 10 Total 10
3 popped: Free 10 Total 10
4 popped: Free 10 Total 10
5 popped: Free 10 Total 10
6 popped: Free 10 Total 10
7 popped: Free 10 Total 10
8 popped: Free 10 Total 10
9 popped: Free 10 Total 10
10 popped: Free 10 Total 10
11 popped: Free 10 Total 10
12 popped: Free 10 Total 10
13 popped: Free 10 Total 10
14 popped: Free 10 Total 10
15 popped: Free 10 Total 10
16 popped: Free 10 Total 10
17 popped: Free 10 Total 10
18 popped: Free 10 Total 10
19 popped: Free 10 Total 10
20 popped: Free 10 Total 10
21 popped: Free 10 Total 10
22 popped: Free 10 Total 10
23 popped: Free 10 Total 10
24 popped: Free 10 Total 10
25 popped: Free 10 Total 10
26 popped: Free 10 Total 10
27 popped: Free 10 Total 10
28 popped: Free 10 Total 10
29 popped: Free 10 Total 10
30 popped: Free 10 Total 10
31 popped: Free 10 Total 10
32 popped: Free 10 Total 10
33 popped: Free 10 Total 10
34 popped: Free 10 Total 10
35 popped: Free 10 Total 10
36 popped: Free 10 Total 10
3

In [7]:
# Test continuous acquisition (live mode) produces changing images
import numpy as np
from buffer_test_util import *

# Setup camera with desired parameters
image_size = 1024
exposure = 10
MB_per_image = setup_camera(core, image_size, exposure, fast_image=False)
buffer_size = 20
duration = 5.0

run_live_mode(core, image_size, use_v2=True, buffer_size_mb=buffer_size, duration=duration, check_for_changing_images=True)
run_live_mode(core, image_size, use_v2=False, buffer_size_mb=buffer_size, duration=duration, check_for_changing_images=True)


# Test live mode with RGB images
core.set_property('Camera', 'PixelType', '32bitRGB')

MB_per_image = setup_camera(core, image_size, exposure, fast_image=False)

run_live_mode(core, image_size, use_v2=True, buffer_size_mb=buffer_size, duration=duration, check_for_changing_images=True)
run_live_mode(core, image_size, use_v2=False, buffer_size_mb=buffer_size, duration=duration, check_for_changing_images=True)


core.set_property('Camera', 'PixelType', '16bit')


Continuous acquisition:   0%|          | 0/5.0 [00:00<?, ?it/s]

Buffer free:   0%|          | 0/20 [00:00<?, ?it/s]

Continuous acquisition:   0%|          | 0/5.0 [00:00<?, ?it/s]

Buffer free:   0%|          | 0/20 [00:00<?, ?it/s]

Continuous acquisition:   0%|          | 0/5.0 [00:00<?, ?it/s]

Buffer free:   0%|          | 0/5 [00:00<?, ?it/s]

Continuous acquisition:   0%|          | 0/5.0 [00:00<?, ?it/s]

Buffer free:   0%|          | 0/5 [00:00<?, ?it/s]

In [8]:
# Run two cameras with different sizes
from time import sleep

num_images = 5
image_size = 1024
image_size2 = 2048
buffer_size = 500

core.enable_v2_buffer(True)
core.set_buffer_memory_footprint(buffer_size)

# change to differen image sizes and pixel types
core.set_property('Camera', 'OnCameraCCDYSize', str(image_size))
core.set_property('Camera', 'OnCameraCCDXSize', str(image_size))
core.set_property('Camera', 'PixelType', '8bit')

core.set_property('Camera2', 'OnCameraCCDYSize', str(image_size2))
core.set_property('Camera2', 'OnCameraCCDXSize', str(image_size2))
core.set_property('Camera2', 'PixelType', '16bit')

core.start_sequence_acquisition('Camera', num_images, 0, True)
core.start_sequence_acquisition('Camera2', num_images, 0, True)

i = 0
images = []
while True:
    try:
        im = core.pop_next_tagged_image()
        assert im is not None
        images.append(im)
        i += 1
    except:
        pass
    if core.is_buffer_overflowed():
        print('overflowed, ', f'Free {core.get_buffer_free_capacity()}')
        break
    if i == num_images * 2:
        break
    sleep(0.01)

# find the size of camera 1 and camera 2 in the tags and shape
# ensure that call camera 1 and camera 2 are the same
camera1_images = [im for im in images if im.tags['Camera'] == 'Camera']
camera2_images = [im for im in images if im.tags['Camera'] == 'Camera2']

assert len(camera1_images) == len(camera2_images)
assert all(camera1_images[0].pix.shape == im.pix.shape for im in camera1_images)
assert all(camera2_images[0].pix.shape == im.pix.shape for im in camera2_images)    
# assert shapes are different
assert camera1_images[0].pix.shape != camera2_images[0].pix.shape
