In [1]:
import automation1 as a1
import time
import numpy as np
import matplotlib.pyplot as plt

import os
import serial
from pathlib import Path
from pprint import pprint

import sys
sys.path.append('C:\\Users\\UNIVERSITY\\git\\')
sys.path.append('C:\\Users\\UNIVERSITY\\git\\metalens\\')
import metalens
import core_utils as cu 

# Test mml-automation package core-utils

In [2]:
def enable_metrologyprobe(controller, state, output_num=0, axis="X", execution_task_index=1):
    """
    Enable or disable the metrology probe, then confirm state.

    Parameters
    ----------
    controller : object
        Automation1 controller instance.
    state : str
        Either "on" or "off".
    output_num : int
        Digital IO output port where the probe is connected. Default 0.
    axis : str
        Axis name (e.g., "X"). Default "X".
    execution_task_index : int
        The Task window in the Automation1 software suite to execute the command.
    """

    state = state.lower()
    if state == "on":
        value = 1
    elif state == "off":
        value = 0
    else:
        raise ValueError("state must be 'on' or 'off'")

    # Set the output
    controller.runtime.commands.io.digitaloutputset(
        axis=axis,
        output_num=output_num,
        value=value,
        execution_task_index=execution_task_index,
    )
    time.sleep(1)
    # Read back the output state
    current = controller.runtime.commands.io.digitaloutputget(
        axis=axis,
        output_num=output_num,
        execution_task_index=execution_task_index,
    )
    #print(current)
    # Build human-readable status
    if int(current) == 1:
        status = "Metrology probe is ON"
    elif int(current) == 0:
        status = "Metrology probe is OFF"
    else:
        status = "Metrology probe in awkward state. Stop and check hardware."

    return status


In [3]:
def enable_axes(controller, cq, z_axes):
    """
    Enable X, Y, and one or more Z axes.

    Args
    ----
    controller : a1.Controller
    cq         : Command queue
    z_axes     : str or list of str  ("ZC" or ["ZA", "ZB"])

    Note: controller and command queue must already be instantiated. 
    """
    print("Queue was initiated on task:", cq.task_index, "capacity:", cq.command_capacity)

    if isinstance(z_axes, str):
        z_axes = [z_axes]

    cq.pause()
    for axis in ["X", "Y"] + z_axes:
        cq.commands.motion.enable(axis)
    print(f"Enabled axes: X, Y, {', '.join(z_axes)}")

    cq.resume()
    cq.wait_for_empty()
    controller.runtime.commands.end_command_queue(cq)

In [4]:
def _get_program_pos(controller, axes=("X","Y","ZA")):
    cfg = a1.StatusItemConfiguration()
    for ax in axes:
        cfg.axis.add(a1.AxisStatusItem.ProgramPosition, ax)
    res = controller.runtime.status.get_status_items(cfg)
    return {ax: res.axis.get(a1.AxisStatusItem.ProgramPosition, ax).value for ax in axes}

def testtouch_metrology(
    command_queue, controller,
    numX, lengthX,
    Xstart, Ystart, Zstart, Zdrop,
    outname, comport="COM4",
    dwell_ms_at_depth=0  # set >0 if you want a hardware dwell at depth
):
    """
    Y is fixed. For each X:
      - Move to (X, Ystart, Zstart), wait
      - Move ZA to depth, wait
      - wait_for_empty()  <-- ensure we're *actually* at depth
      - pause()           <-- freeze queue; nothing else will move
      - read sensor + positions; print
      - resume(); retract to Zstart; wait; wait_for_empty()
    """
    if os.path.exists(outname):
        print("Metrology File Present, Stopping Motion")
        return
    
    # Step size along X
    incX = 0.0 if numX <= 1 else (lengthX / (numX - 1))
    depth = Zstart - Zdrop

    # Enable axes we use
    for ax in ["X", "Y", "ZA"]:
        command_queue.commands.motion.enable(ax)

    # Move to start (Y fixed)
    command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[0.0], speeds=[8.0])
    command_queue.commands.motion.waitformotiondone(["ZA"])
    command_queue.commands.motion.waitforinposition(["ZA"])
    command_queue.commands.motion.moveabsolute(axes=["X"], positions=[Xstart], speeds=[15.0])
    command_queue.commands.motion.waitformotiondone(["X"])
    command_queue.commands.motion.waitforinposition(["X"])
    command_queue.commands.motion.moveabsolute(axes=["Y"], positions=[Ystart], speeds=[15.0])
    command_queue.commands.motion.waitformotiondone(["Y"])
    command_queue.commands.motion.waitforinposition(["Y"])

    command_queue.wait_for_empty()  # arrive at start pose

    # Open gauge once
    ser = serial.Serial(comport, 9600, timeout=3)
    time.sleep(0.02)
    f = open(outname, "w")
    for ix in range(numX):
        x = Xstart + incX * ix

        # 1) Row point: (x, Ystart, Zstart) and wait
        command_queue.commands.motion.moveabsolute(
            axes=["X", "Y", "ZA"], positions=[x, Ystart, Zstart], speeds=[10.0, 10.0, 3.0]
        )
        command_queue.commands.motion.waitformotiondone(["X", "Y", "ZA"])
        command_queue.commands.motion.waitforinposition(["X", "Y", "ZA"])
        command_queue.commands.motion.movedelay("ZA", 2_000)
        

        # 2) Drop to depth and dwell so we can pause and query data points
        
        command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[depth], speeds=[3.0])
        command_queue.commands.motion.waitforinposition(["ZA"])
        command_queue.commands.motion.waitformotiondone(["ZA"])
        command_queue.commands.motion.movedelay("ZA", int(dwell_ms_at_depth))

        command_queue.commands.motion.movedelay(["X", "Y", "ZA"], 4_000)
        
        ser.write(b"RMD0\r\n")  # replace with your gauge's measurement command if needed
        time.sleep(0.05)
        
        sensor = ser.read(2048).decode("utf-8", errors="ignore").strip()

        while True:
            pos = _get_program_pos(controller, axes=("X","Y","ZA"))
            if pos['X'] == x and pos['Y'] == Ystart and pos['ZA'] == depth:
                line = f"{pos['X']}, {pos['Y']}, {pos['ZA']}, {sensor}\n"
                f.write(line)
                f.flush()
                print(f"{pos['X']}, {pos['Y']}, {pos['ZA']}, {sensor}")
                break   # exit while, go to next X
            else:
                time.sleep(0.1)  # tiny delay before re-check
                

        #command_queue.commands.motion.movedelay("ZA", 2_000)
        command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[Zstart], speeds=[7.0])
        command_queue.commands.motion.waitformotiondone(["ZA"])
        command_queue.commands.motion.waitforinposition(["ZA"])
        command_queue.commands.motion.movedelay("X", 4_000)
        command_queue.wait_for_empty()  # ensure retract finished before next X
        
    # Park and end
    command_queue.commands.motion.movedelay("ZA", 3_000)
    command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[0.0], speeds=[8.0])
    command_queue.commands.motion.waitformotiondone(["ZA"])
    command_queue.commands.motion.waitforinposition(["ZA"])

    command_queue.wait_for_empty()
    controller.runtime.commands.end_command_queue(command_queue)
    f.close()
    ser.close()
 

In [13]:
def dressing_metrology(
    command_queue, controller,
    numX, lengthX, numY, lengthY,
    Xstart, Ystart, Zstart, Zdrop,
    outname, comport="COM4",
    dwell_ms_at_depth=0  # set >0 if you want a hardware dwell at depth
):
    """
    For each X and Y:
      - Move to (X, Y, Zstart), wait
      - Move ZA to depth, wait
      - wait_for_empty()  <-- ensure we're *actually* at depth
      - movedelay()           <-- freeze; nothing else will move
      - read sensor + positions; write to file
      - resume(); retract to Zstart; wait; wait_for_empty()
    """
    if os.path.exists(outname):
        print("Metrology File Present, Stopping Motion")
        return
    
    # Step size along X
    incX = 0.0 if numX <= 1 else (lengthX / (numX - 1))
    incY = 0.0 if numY <= 1 else (lengthY / (numY - 1))
    depth = Zstart - Zdrop

    # Enable axes we use
    for ax in ["X", "Y", "ZA"]:
        command_queue.commands.motion.enable(ax)

    # Move to start (Y fixed)
    command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[0.0], speeds=[8.0])
    command_queue.commands.motion.waitformotiondone(["ZA"])
    command_queue.commands.motion.waitforinposition(["ZA"])
    command_queue.commands.motion.moveabsolute(axes=["X"], positions=[Xstart], speeds=[15.0])
    command_queue.commands.motion.waitformotiondone(["X"])
    command_queue.commands.motion.waitforinposition(["X"])
    command_queue.commands.motion.moveabsolute(axes=["Y"], positions=[Ystart], speeds=[15.0])
    command_queue.commands.motion.waitformotiondone(["Y"])
    command_queue.commands.motion.waitforinposition(["Y"])

    command_queue.wait_for_empty()  # arrive at start pose

    # Open gauge once
    ser = serial.Serial(comport, 9600, timeout=3)
    time.sleep(0.02)
   # f = open(outname, "w")
    for ix in range(numX):
        x = Xstart + incX * ix

        command_queue.commands.motion.moveabsolute(
            axes=["X", "ZA"], positions=[x, Zstart], speeds=[10.0, 6.0]
        )
        command_queue.commands.motion.waitformotiondone(["X", "ZA"])
        command_queue.commands.motion.waitforinposition(["X", "ZA"])
        command_queue.commands.motion.movedelay("ZA", 2_000)
        
        for iy in range(numY):
            y = Ystart + incY * iy
            print(y)
            # 2) Drop to depth and dwell so we can pause and query data points
            command_queue.commands.motion.moveabsolute(axes=["Y"], positions=[y], speeds=[3.0])
            command_queue.commands.motion.waitforinposition(["Y"])
            command_queue.commands.motion.waitformotiondone(["Y"])
            command_queue.commands.motion.movedelay("Y", 2_000) # can change to 1s

            
            command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[depth], speeds=[3.0])
            command_queue.commands.motion.waitforinposition(["ZA"])
            command_queue.commands.motion.waitformotiondone(["ZA"])
            #command_queue.commands.motion.movedelay("ZA", int(dwell_ms_at_depth))
    
            command_queue.commands.motion.movedelay(["X", "Y", "ZA"], 4_000)
            
            ser.write(b"RMD0\r\n")  # replace with your gauge's measurement command if needed
            time.sleep(0.05)
            
            sensor = ser.read(2048).decode("utf-8", errors="ignore").strip()
    
            while True:
                pos = _get_program_pos(controller, axes=("X","Y","ZA"))
                if pos['X'] == x and pos['Y'] == y and pos['ZA'] == depth:
                    line = f"{pos['X']}, {pos['Y']}, {pos['ZA']}, {sensor}\n"
                    #f.write(line)
                    #f.flush()
                    print(f"{pos['X']}, {pos['Y']}, {pos['ZA']}, {sensor}")
                    break   # exit while, go to next X
                else:
                    time.sleep(0.1)  # tiny delay before re-check
                    
    
            #command_queue.commands.motion.movedelay("ZA", 2_000)
            command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[Zstart], speeds=[7.0])
            command_queue.commands.motion.waitformotiondone(["ZA"])
            command_queue.commands.motion.waitforinposition(["ZA"])
            command_queue.commands.motion.movedelay(["X", "Y"], 3_000)
            command_queue.wait_for_empty()  # ensure retract finished before next X
        
    # Park and end
    command_queue.commands.motion.movedelay("ZA", 3_000)
    command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[0.0], speeds=[8.0])
    command_queue.commands.motion.waitformotiondone(["ZA"])
    command_queue.commands.motion.waitforinposition(["ZA"])

    command_queue.wait_for_empty()
    controller.runtime.commands.end_command_queue(command_queue)
   # f.close()
    ser.close()

# 1. Controller connection

In [7]:
controller = a1.Controller.connect()
controller.start()
print(controller.is_running)

True


# 2. Turn Metrology Probe On

- the reason we can't do this in the function that does the rastering is because we use the command queue module to make the motions for moving the axes
- in order for us to enable the probe, we cannot command digital output bits like the probe, the flood cooling, the spindle, inside a command_queue session, so it has to be done separately. To make life easier, I made a simple function that takes the automation1 command and turns the probe on and checks the status of the probe. 
- we will have to run this step before the metrology functions that gather the data

In [None]:
enable_metrologyprobe(controller=controller, state='on', output_num=0, axis="X", execution_task_index=1)

# 3. Parameter Settings
- below is a test touch metrology function in use

In [8]:
# Constants
numX = 4
numY = 4
lengthX = 15  # for normal
lengthY = 7

lifterSettleTime = 0.5
outname = r"C:\Users\UNIVERSITY\Desktop\RunData\Automation1_TEST\Metrology_dressing.dat"

Xstart = -231.74
Ystart = 27.25
Zstart = -40.0 #Actually do -20 when the time comes to test in-lab
Zdrop = 12.0
#pi = math.pi

# 4. Begin a Command Queue Session

In [14]:
command_queue = controller.runtime.commands.begin_command_queue("Task 1", 60, True)

In [15]:
cq = command_queue

In [16]:
s = command_queue.status  # property, not callable

# Show non-callable public attributes + their values
attrs = {
    name: getattr(s, name)
    for name in dir(s)
    if not name.startswith("_") and not callable(getattr(s, name, None))
}
pprint(attrs)


{'is_empty': True,
 'is_paused': False,
 'number_of_executed_commands': 0,
 'number_of_times_emptied': 0,
 'number_of_unexecuted_commands': 0}


# 5. Enable ZAxes
- note that the `enable_axes` function by default enables X and Y, it is only our job to tell it which additional Z axes we care to enable for our use case

In [None]:
z_axes = ["ZA"]

In [None]:
enable_axes(controller, cq, z_axes)

# 6. And now, we do metrology
- pick your poison: test touch metrology? plane metrology? lens metrology? dressing metrology? 

### currently testing dressing metrology

In [17]:
print( numX, lengthX, numY, lengthY, Xstart, Ystart, Zstart, Zdrop, outname)

4 15 4 7 -231.74 27.25 -40.0 12.0 C:\Users\UNIVERSITY\Desktop\RunData\Automation1_TEST\Metrology_dressing.dat


Note: it moves to the second Y and then the while loop keeps chugging and it might be because it's expecting the next X position but i haven't given it that properly? 
- maybe for a square, we loosen the while loop conditions to not include X in the inner logging?
- not sure not sure

In [None]:
dressing_metrology(
    command_queue, controller,
    numX, lengthX, numY, lengthY,
    Xstart, Ystart, Zstart, Zdrop,
    outname, comport="COM4",
    dwell_ms_at_depth=5_000  # set >0 if you want a hardware dwell at depth
)

27.25
-231.74, 27.25, -52.0, -27.5062
29.583333333333332


KeyboardInterrupt: 

In [None]:
import gc

In [None]:
import psutil, os
p = psutil.Process(os.getpid())
print(p.open_files())

#### Annoying things that are happening

- X axis moves while ZA is still moving up but it should really wait to reach up to the zstart before X moves at all
- Also, sometimes ZA stops at Zstart and sometimes it blows right through it?? mind blowing

In [None]:
def _get_program_pos(controller, axes=("X","Y","ZA")):
    cfg = a1.StatusItemConfiguration()
    for ax in axes:
        cfg.axis.add(a1.AxisStatusItem.ProgramPosition, ax)
    res = controller.runtime.status.get_status_items(cfg)
    return {ax: res.axis.get(a1.AxisStatusItem.ProgramPosition, ax).value for ax in axes}

def testtouch_metrology(
    command_queue, controller,
    numX, lengthX,
    Xstart, Ystart, Zstart, Zdrop,
    outname, comport="COM4",
    dwell_ms_at_depth=0  # set >0 if you want a hardware dwell at depth
):
    """
    Y is fixed. For each X:
      - Move to (X, Ystart, Zstart), wait
      - Move ZA to depth, wait
      - wait_for_empty()  <-- ensure we're *actually* at depth
      - pause()           <-- freeze queue; nothing else will move
      - read sensor + positions; print
      - resume(); retract to Zstart; wait; wait_for_empty()
    """
    if os.path.exists(outname):
        print("Metrology File Present, Stopping Motion")
        return
    
    # Step size along X
    incX = 0.0 if numX <= 1 else (lengthX / (numX - 1))
    depth = Zstart - Zdrop

    # Enable axes we use
    for ax in ["X", "Y", "ZA"]:
        command_queue.commands.motion.enable(ax)

    # Move to start (Y fixed)
    command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[0.0], speeds=[8.0])
    command_queue.commands.motion.waitformotiondone(["ZA"])
    command_queue.commands.motion.waitforinposition(["ZA"])
    command_queue.commands.motion.moveabsolute(axes=["X"], positions=[Xstart], speeds=[15.0])
    command_queue.commands.motion.waitformotiondone(["X"])
    command_queue.commands.motion.waitforinposition(["X"])
    command_queue.commands.motion.moveabsolute(axes=["Y"], positions=[Ystart], speeds=[15.0])
    command_queue.commands.motion.waitformotiondone(["Y"])
    command_queue.commands.motion.waitforinposition(["Y"])

    command_queue.wait_for_empty()  # arrive at start pose

    # Open gauge once
    ser = serial.Serial(comport, 9600, timeout=3)
    time.sleep(0.02)
    f = open(outname, "w")
    for ix in range(numX):
        x = Xstart + incX * ix

        # 1) Row point: (x, Ystart, Zstart) and wait
        command_queue.commands.motion.moveabsolute(
            axes=["X", "Y", "ZA"], positions=[x, Ystart, Zstart], speeds=[10.0, 10.0, 3.0]
        )
        command_queue.commands.motion.waitformotiondone(["X", "Y", "ZA"])
        command_queue.commands.motion.waitforinposition(["X", "Y", "ZA"])
        command_queue.commands.motion.movedelay("ZA", 2_000)
        

        # 2) Drop to depth and dwell so we can pause and query data points
        
        command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[depth], speeds=[3.0])
        command_queue.commands.motion.waitforinposition(["ZA"])
        command_queue.commands.motion.waitformotiondone(["ZA"])
        command_queue.commands.motion.movedelay("ZA", int(dwell_ms_at_depth))

        command_queue.commands.motion.movedelay(["X", "Y", "ZA"], 4_000)
        
        ser.write(b"RMD0\r\n")  # replace with your gauge's measurement command if needed
        time.sleep(0.05)
        
        sensor = ser.read(2048).decode("utf-8", errors="ignore").strip()

        while True:
            pos = _get_program_pos(controller, axes=("X","Y","ZA"))
            if pos['X'] == x and pos['Y'] == Ystart and pos['ZA'] == depth:
                line = f"{pos['X']}, {pos['Y']}, {pos['ZA']}, {sensor}\n"
                f.write(line)
                f.flush()
                print(f"{pos['X']}, {pos['Y']}, {pos['ZA']}, {sensor}")
                break   # exit while, go to next X
            else:
                time.sleep(0.1)  # tiny delay before re-check
                

        #command_queue.commands.motion.movedelay("ZA", 2_000)
        command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[Zstart], speeds=[7.0])
        command_queue.commands.motion.waitformotiondone(["ZA"])
        command_queue.commands.motion.waitforinposition(["ZA"])
        command_queue.commands.motion.movedelay("X", 4_000)
        command_queue.wait_for_empty()  # ensure retract finished before next X
        
    # Park and end
    command_queue.commands.motion.movedelay("ZA", 3_000)
    command_queue.commands.motion.moveabsolute(axes=["ZA"], positions=[0.0], speeds=[8.0])
    command_queue.commands.motion.waitformotiondone(["ZA"])
    command_queue.commands.motion.waitforinposition(["ZA"])

    command_queue.wait_for_empty()
    controller.runtime.commands.end_command_queue(command_queue)
    f.close()
    ser.close()
 

In [None]:
command_queue = controller.runtime.commands.begin_command_queue("Task 1", 60, True)

In [None]:
ser = serial.Serial('COM4', 9600, timeout=1)

In [None]:
io_axis_flood = "X"  # change if your flood IO lives elsewhere
controller.commands.io.digitaloutputset(axis=io_axis_flood, output_num=6, value=0, execution_task_index=1)
print("flood coolant OFF (DO6=0).")


In [None]:
testtouch_metrology_print_only(
    command_queue, controller,
    numX, lengthX,
    Xstart, Ystart, Zstart, Zdrop,
    outname, comport="COM4",
    dwell_ms_at_depth=5_000  # set >0 if you want a hardware dwell at depth
)

In [None]:
ser.close()

In [None]:
s = command_queue.status  # property, not callable

# Show non-callable public attributes + their values
attrs = {
    name: getattr(s, name)
    for name in dir(s)
    if not name.startswith("_") and not callable(getattr(s, name, None))
}
pprint(attrs)

#assert attrs['is_empty'] is True

# Using data collection module???

In [None]:
def wait_and_read_at_za(
    controller,
    *,
    target_z: float = -62.0,
    tol: float = 0.005,                 # user units (e.g., mm). Adjust to your metrology tolerance.
    period_ms: float = 1.0,             # snapshot cadence; 0.5–2.0 ms is typical
    signal: AxisDataSignal = AxisDataSignal.ActualPosition,
    require_descending: bool = True,    # only trigger when moving downward through target
    serial_port: str = "COM4",
    baudrate: int = 9600,
    sensor_cmd: bytes = b"RMD0\r\n",
    sensor_settle_s: float = 0.02,      # small dwell for device to respond
    read_bytes: int = 2048,
    timeout_s: float = 5.0              # overall guard (increase if moves are slow)
):
    """
    Block until ZA ~= target_z (within tol), then perform a serial read and return (za_value, sensor_text).

    Returns:
        (za_at_trigger: float, sensor_text: str)
    Raises:
        TimeoutError if not reached within timeout_s.
    """
    start = time.perf_counter()

    # --- helper to get a single real-time ZA sample via data collection ---
    def _sample_za() -> float:
        cfg = DataCollectionConfiguration(number_of_points_to_collect=1, period=period_ms)
        cfg.axis.add(signal, AxisSpecifier("ZA"), 0)
        res = controller.runtime.data_collection.collect_snapshot(cfg)
        return float(res.axis.items[0].values[0])

    # Pre-arm state for edge detection
    last = _sample_za()

    # Open serial once to avoid port churn
    with serial.Serial(serial_port, baudrate=baudrate, timeout=1) as ser:
        while True:
            now = _sample_za()

            # Check descending edge (above target last sample, now at/under target) if required,
            # else accept any arrival within the tolerance window.
            crossed = (last > target_z >= now) if require_descending else True
            close_enough = abs(now - target_z) <= tol

            if crossed and close_enough:
                # Immediately perform the serial read at this position
                ser.write(sensor_cmd)
                time.sleep(sensor_settle_s)
                sensor_text = ser.read(read_bytes).decode("utf-8", errors="replace").strip()
                return now, sensor_text

            if time.perf_counter() - start > timeout_s:
                raise TimeoutError(
                    f"Timed out waiting for ZA≈{target_z}±{tol}. Last ZA={now:.6f}."
                )

            last = now
How to use it (typical metrology touch)
python
Copy code
za_at_touch, probe_reading = wait_and_read_at_za(
    controller,
    target_z=-62.0,
    tol=0.005,          # tighten/loosen based on your units & sensor response time
    period_ms=1.0,
    require_descending=True,
    serial_port="COM4",
    baudrate=9600,
    sensor_cmd=b"RMD0\r\n",
    sensor_settle_s=0.02,
    read_bytes=2048,
    timeout_s=8.0       # extend if motion is slow
)
print(f"ZA at trigger: {za_at_touch:.6f}, gauge: {probe_reading}")

# To dos:
1. add the command to turn the digital axis on for the probe
2. Consider if this would be faster if we didn't use the command queue and instead used waitformotion done and the command module by itself. Cus i dont think command queue really looks for wait for motion done etc.
3. if you wanna test a dressing type of metrology then make Y a length of 7 and change the zdrop to be more shallow since you'll be wfh on Friday August 29
4. generalize `get_program_position` args; don't like it as it stands currently