# Python PLI data acquisition

env: py39

# Test

In [1]:
import time
import serial
import serial.tools.list_ports

In [2]:
ports = serial.tools.list_ports.comports()
arduino_port = None
ser = None

In [3]:
for port, desc, hwid in sorted(ports):
    print("{}: {} [{}]".format(port, desc, hwid))

/dev/cu.Bluetooth-Incoming-Port: n/a [n/a]
/dev/cu.NT-P58C: n/a [n/a]
/dev/cu.usbmodem101: IOUSBHostDevice [USB VID:PID=2341:0043 SER=85935333337351E07262 LOCATION=0-1]


In [4]:
for port, desc, hwid in sorted(ports):
    print("{}: {} [{}]".format(port, desc, hwid))
    if "usbmodem" in port or "usbserial" in port:
        arduino_port = port
        print(f"arduino_port: {arduino_port}")

/dev/cu.Bluetooth-Incoming-Port: n/a [n/a]
/dev/cu.NT-P58C: n/a [n/a]
/dev/cu.usbmodem101: IOUSBHostDevice [USB VID:PID=2341:0043 SER=85935333337351E07262 LOCATION=0-1]
arduino_port: /dev/cu.usbmodem101


In [5]:
# this produces a double flash when it's ok
ser = serial.Serial(arduino_port, 9600)

In [296]:
# rotate polariser
step = 50000
str = f"3+{step}"
ser.write(bytes(str, "ascii"))

7

In [292]:
# +Y: Into PLI (up in the slice)
step = 50
str = f"""1+{step}
2-{step}"""
ser.write(bytes(str, "ascii"))

9

In [301]:
# -Y: Away from PLI (down in the slice)
step = 13000
str = f"""1-{step}
2+{step}"""
ser.write(bytes(str, "ascii"))

15

In [291]:
# +X: toward the polariser's stepper (right in the slice)
step = 50 
str = f"""1+{step}
2+{step}"""
ser.write(bytes(str, "ascii"))

9

In [282]:
# -X: away from the polariser's stepper (left in the slice)
step = 150
str = f"""1-{step}
2-{step}"""
ser.write(bytes(str, "ascii"))

11

In [171]:
# illumination on
str = "dir 4+"
ser.write(bytes(str, "ascii"))

6

In [137]:
# illumination off
str = "dir 4-"
ser.write(bytes(str, "ascii"))

6

In [16]:
dx, dy = -123, 456

# +X: toward the polariser's stepper (right in the slice image)
sign = "+" if dx >= 0 else ""
str = f"""1{sign}{dx}\n2{sign}{dx}"""
print(bytes(str, "ascii"))

# +Y: Into PLI (up in the slice image)
signx, signy = ("+", "-") if dy >= 0 else ("-", "+")
str = f"""1{signx}{abs(dy)}\n2{signy}{abs(dy)}"""
print(bytes(str, "ascii"))

b'1-123\n2-123'
b'1+456\n2-456'


In [6]:
ser.close()

# Acquisition

In [15]:
import time
import serial
import serial.tools.list_ports

ports = serial.tools.list_ports.comports()
arduino_port = None
ser = None

for port, desc, hwid in sorted(ports):
    print("{}: {} [{}]".format(port, desc, hwid))

for port, desc, hwid in sorted(ports):
    print("{}: {} [{}]".format(port, desc, hwid))
    if "usbmodem" in port or "usbserial" in port:
        arduino_port = port
        print(f"arduino_port: {arduino_port}")

# this produces a double flash when it's ok
ser = serial.Serial(arduino_port, 9600)

/dev/cu.Bluetooth-Incoming-Port: n/a [n/a]
/dev/cu.NT-P58C: n/a [n/a]
/dev/cu.usbmodem1101: IOUSBHostDevice [USB VID:PID=2341:0043 SER=85935333337351E07262 LOCATION=1-1]
/dev/cu.Bluetooth-Incoming-Port: n/a [n/a]
/dev/cu.NT-P58C: n/a [n/a]
/dev/cu.usbmodem1101: IOUSBHostDevice [USB VID:PID=2341:0043 SER=85935333337351E07262 LOCATION=1-1]
arduino_port: /dev/cu.usbmodem1101


In [1]:
# First, place the sample at the origin

In [16]:
fovstep = 500
ncols = 2
nrows = 2
n_angles = 5

In [28]:
def sleep():
    time.sleep(0.5)
def illumination_on():
    str = "dir 4+"
    ser.write(bytes(str, "ascii"))
    sleep()
def illumination_off():
    str = "dir 4-"
    ser.write(bytes(str, "ascii"))
    sleep()
def rotate_pos(angle):
    str = f"3+{angle}"
    ser.write(bytes(str, "ascii"))
    sleep()
def rotate_neg(angle):
    str = f"3-{angle}"
    ser.write(bytes(str, "ascii"))
    sleep()
def move_pos_x(x):
    str = f"""1-{x}
2-{x}"""
    ser.write(bytes(str, "ascii"))
    sleep()
def move_neg_x(x):
    str = f"""1+{x}
2+{x}"""
    ser.write(bytes(str, "ascii"))
    sleep()
def move_pos_y(y):
    str = f"""1+{y}
2-{y}"""
    ser.write(bytes(str, "ascii"))
    sleep()
def move_neg_y(y):
    str = f"""1-{y}
2+{y}"""
    ser.write(bytes(str, "ascii"))
    sleep()

In [29]:
# Acquire a grid of FOVs

fov_x_steps, fov_y_steps = 500, 300
angle_steps = 500
nx = 2
ny = 2
n_angles = 5

for i in range(ny):
    for j in range(nx):

        # acquire
        for k in range(n_angles):
            illumination_on()

            # acquire
            print(f"Acquiring y:{i}, x:{j}, angle:{k}")
            time.sleep(1)

            # turn off illumination
            illumination_off()

            # rotate polariser
            rotate_pos(angle_steps)
        # return polariser to origin
        rotate_neg(n_angles * angle_steps)
        
        # move to next position
        if j < nx - 1:
            move_pos_x(fov_x_steps)
        else:
            move_neg_x(fov_x_steps * (nx - 1))
            move_neg_y(fov_y_steps)
move_neg_x(fov_x_steps * (nx - 1))
move_pos_y(fov_y_steps * (ny - 1))

Acquiring 0, 0, 0
Acquiring 0, 0, 1
Acquiring 0, 0, 2
Acquiring 0, 0, 3
Acquiring 0, 0, 4
Acquiring 0, 1, 0
Acquiring 0, 1, 1
Acquiring 0, 1, 2
Acquiring 0, 1, 3
Acquiring 0, 1, 4
Acquiring 1, 0, 0
Acquiring 1, 0, 1
Acquiring 1, 0, 2
Acquiring 1, 0, 3
Acquiring 1, 0, 4
Acquiring 1, 1, 0
Acquiring 1, 1, 1
Acquiring 1, 1, 2
Acquiring 1, 1, 3
Acquiring 1, 1, 4


In [30]:
ser.close()

# Test `asyncio`
Waiting fixed durations makes everything slow. It'd be better to listen to the
messages sent by the machine through the serial interface to determine when
it's done.
Here I test the use of `asyncio` within a jupyter notebook.

In [31]:
import asyncio
async def fn():
  print('hello')
  await asyncio.sleep(1)
  print('world')

hello
world


In [32]:
await fn()

hello
world


The following code simulates a lengthy task (the pli machine): `do_things()`. The `status` variable tracks the status of the task. The `wait_for_done()` function can be awaited and will return when `do_things` is done.

In [51]:
status = 'idle'
async def do_things():
    global status
    print("doing many things")
    status = 'busy'
    for i in range(10):
        status = f"i: {i}"
        await asyncio.sleep(0.5)
    status = 'done'

async def wait_for_done():
    global status
    while status != 'done':
        await asyncio.sleep(1)
    print("done waiting")

In [52]:
await do_things()

doing many things


In [55]:
print('begin')

# this triggers the lengthy task
task = asyncio.create_task(do_things())
await task

# this monitors the status and returns when the task is done
await wait_for_done()

print('end')

begin
doing many things
done waiting
end
