In [1]:
import automation1 as a1
import time
import numpy as np
import matplotlib.pyplot as plt

import os
import serial
import time

import math

# Connect to Controller

In [2]:
controller = a1.Controller.connect()
controller.start()
print(controller.is_running)

KeyboardInterrupt: 

# Command Queue Python Module

WARNINGS FROM THE DOCUMENTATION:
- must keep command queue populated at all times. If the command queue is not populated, your process will stall and motion problems might occur
- if a `MovePt()` or `MovePvt()` command is the most recently executed command from the queue, and a starvation of the command queue has occurred, the controlloer **will not** automatically decelerate the axes that you specified to a comand to zero velocity 

NOTES FROM THE DOCUMENTATION
- when the command queue begins on a task, the controller executes all the commands that are in the command queue as quickly as possible. If the controller automatically executes commands from the command queue more quickly than you can add them, the command queue will not have a sufficient quantity of commands to execute. This condition is known as a starvation of the command queue. 
- while the command queue is active, you can examine its status to find the number of times that a starvation of the command queue has occurred. 
- if starvation mode of the queue occurs, velocity blending mode is enabled on the task, AND most recently executed command from the command queue is a `MoveCcw()`, `MoveCw()`, or `MoveLinear()` command then the controller automatically decelerates them to zero velocity to prevent motion problems from occurring 
- can use `CommandQueueCount` task status item to make the first command wait for the command queue to fill with a specified number of commands before the controller executes them.

RELEVANT LINK: 
- http://help.aerotech.com/automation1/Content/APIs/Python/References/Command-Queue-Python.htm?Highlight=advanced_motion

# Status Command Queue Commands to Have in Arsenal

In [None]:
command_queue.status.number_of_times.emptied

In [None]:
command_queue.execute("wait(StatusGetTaskItem ... >= 20)")

# Example Code

In [3]:
# Begin a new command queue on task 1.
command_queue = controller.runtime.commands.begin_command_queue("Task 1", 10, True)
# "Task 1": which task to create a command queue on, 
# 10: max # number of unexecuted Aeroscript commands that we decide can be stored in queue
# True: bool for whether or not to block if you add a command when the queue is full. 


# First, pause the command queue so you can add all the commands
# before they are executed.
command_queue.pause()

# Add all the AeroScript commands that you want to execute.
command_queue.commands.advanced_motion.velocityblendingon()
command_queue.commands.motion.enable("X")
command_queue.commands.motion.movelinear("X", [10], 5)
command_queue.commands.motion.movelinear("X", [5], 5)
command_queue.commands.motion.disable("X")
command_queue.commands.advanced_motion.velocityblendingoff()

# Resume the command queue so that all the commands that you added start
# to execute.
command_queue.resume()

# Here you can do other things such as more process, get status, etc.
# You can do these things because the command queue is executing
# commands on the controller and is not blocking your code execution.

# Here you wait to make sure that the command queue executes all the commands.
# You must do this before you end the command queue.
# When you end the command queue, this process aborts all motion and commands.
command_queue.wait_for_empty()

# At this time, end the command queue.
# You can also call CommandQueue.end_command_queue to abort the command
# that is currently executing and discard the remaining commands.
controller.runtime.commands.end_command_queue(command_queue)

Object `a1.Controller.runtime.commands.begin_command_queue` not found.


# Metrology Command Queue Code

Steps in Writing for me:

1. Initializes scan parameters
2. Checks if the output file exists → exit if it does
3. Enables axes
4. Moves to start position
5. Opens serial port for the metrology gauge
6. Opens output data file
7. Raster scans in X (with optional Y)
8. At each point:
- Moves to X, Y
- Drops Z probe
- Triggers lifter (optional)
- Waits for settle
- Sends command to gauge
- Reads Z gauge value + position feedback
- Lifts Z probe
- Writes to file
- Closes all files
- Returns Z to safe height

#### 1. Constants & Initial Definitions

In [13]:
# Constants
numX = 50
numY = 1
lengthX = 25  # for normal
lengthY = 0

lifterSettleTime = 0.5
outname = r"C:\Users\UNIVERSITY\Desktop\RunData\CCAT_Alumina_350\Surface2\0deg\TestWafer\SpindleC\TestWaferMet_0deg_col_2.dat"

Xstart = 179.05
Ystart = 582.23
Zstart = -70.0 #Actually do -20 when the time comes to test in-lab
Zdrop = 22.0
pi = math.pi


#### 2. Check if file already exists


In [10]:
def file_exists():
    if os.path.exists(outname):
        print("Metrology File Present, Stopping Motion")
        return  # equivalent to `END PROGRAM`


#### 3. Calculate increments and start point

In [12]:
incdistX = lengthX / (numX - 1) if numX > 1 else 0
incdistY = lengthY  # since numY is 1, no division here (for Filter Test Touch)

startpoint = [Xstart, Ystart, Zstart]
lifterSettleTime = 0.5

#### 4. Now add automation1 commands from python command queue

In [None]:
def setup_motion(command_queue):
    if file_exists(outname):
        return  # Stop execution if file exists

    # Grid increment setup
    incdistX = lengthX / (numX - 1) if numX > 1 else 0
    incdistY = lengthY  # numY is 1

    startpoint = [Xstart, Ystart, Zstart]

    # Enable axes
    for axis in ["X", "Y", "ZA"]:
        command_queue.commands.motion.enable(axis)

    # Dwell 2 seconds (2000 ms)
    command_queue.commands.motion.movedelay(["X", "Y", "ZA"], 2000)

In [None]:
def get_axis_position(axis_name: str):
    """Fetches ProgramPosition (commanded position) for a given axis."""
    status_item_configuration = a1.StatusItemConfiguration()
    status_item_configuration.axis.add(a1.AxisStatusItem.ProgramPosition, axis_name)
    results = controller.runtime.status.get_status_items(status_item_configuration)
    return results.axis.get(a1.AxisStatusItem.ProgramPosition, axis_name).value

def get_xyz_positions():
    """Fetch ProgramPosition for X, Y, and ZA simultaneously."""
    config = a1.StatusItemConfiguration()
    for axis in ["X", "Y", "ZA"]:
        config.axis.add(a1.AxisStatusItem.ProgramPosition, axis)

    results = controller.runtime.status.get_status_items(config)
    return {
        axis: results.axis.get(a1.AxisStatusItem.ProgramPosition, axis).value
        for axis in ["X", "Y", "ZA"]
    }

In [None]:
### NOTE, if we don't specifiy execution_task_index in the `movelinear` function, 
### it will assume Task1 is the window of operation
def run_metrology_scan(command_queue):
    if os.path.exists(outname):
        print("Metrology File Present, Stopping Motion")
        return

    incdistX = lengthX / (numX - 1)
    incdistY = lengthY

    # Enable axes
    for axis in ["X", "Y", "ZA"]:
        command_queue.commands.motion.enable(axis)

    command_queue.commands.motion.movedelay(["X", "Y", "ZA"], 2000)

    # Move all Zs to 0
    command_queue.commands.motion.movelinear(
        axes=["ZA"], distances=[0.0], coordinated_speed=37.5
    )
    command_queue.commands.motion.movelinear(
        axes=["ZB"], distances=[0.0], coordinated_speed=20.0
    )
    command_queue.commands.motion.movelinear(
        axes=["ZC"], distances=[0.0], coordinated_speed=20.0
    )

    # Open serial port for gauge
    sensor_port = serial.Serial("COM4", baudrate=9600, timeout=1)

    # Open output file
    with open(outname, "w") as outfile:
        for xcount in range(numX):
            x = Xstart + incdistX * xcount
            y = Ystart

            command_queue.commands.motion.movelinear(
                axes=["X", "Y", "ZA"],
                distances=[x, y, Zstart],
                coordinated_speed=25.0
            )
            command_queue.commands.motion.movedelay(["ZA"], 500)

            for ycount in range(numY):
                y = Ystart + incdistY * ycount

                command_queue.commands.motion.movelinear(
                    axes=["Y"], distances=[y], coordinated_speed=25.0
                )

                command_queue.commands.motion.movelinear(
                    axes=["ZA"],
                    distances=[Zstart - Zdrop],
                    coordinated_speed=3.0
                )
                command_queue.commands.motion.movedelay(["ZA"], int(lifterSettleTime * 1000))

                sensor_port.write(b"RMD0\r\n")
                time.sleep(0.1)
                sensor_reading = sensor_port.readline().decode().strip()

                # Get XYZ positions
                posvals = get_xyz_positions()

                command_queue.commands.motion.movelinear(
                    axes=["ZA"],
                    distances=[Zstart],
                    coordinated_speed=10.0
                )
                
                outfile.write(f"{posvals['X']}, {posvals['Y']}, {posvals['ZA']}, {sensor_reading}\n")


    # Close sensor port
    sensor_port.close()

    # Return Z to safe height
    command_queue.commands.motion.movelinear(
        axes=["ZA"], distances=[0.0], coordinated_speed=37.5
    )

    print("Metrology Done.")


# Since we don't have automation1 here hooked up to hardware, let's make a "dry run" version in case it catchees any errors

In [7]:
import random
from datetime import datetime

In [4]:
def dry_run_metrology_scan():
    print("Running dry run metrology scan...")

    incdistX = lengthX / (numX - 1)
    incdistY = lengthY

    # Write to the same output file for verification
    with open(outname, "w") as outfile:
        for xcount in range(numX):
            x = Xstart + incdistX * xcount
            y = Ystart

            for ycount in range(numY):
                y = Ystart + incdistY * ycount
                za = Zstart - Zdrop

                # Simulate a mock gauge sensor reading
                sensor_reading = f"{random.uniform(0.01, 0.05):.5f}"

                # Simulate some jitter in position
                jitter = lambda val: val + random.uniform(-0.01, 0.01)
                posval_x = jitter(x)
                posval_y = jitter(y)
                posval_z = jitter(za)

                # Write simulated data to file
                line = f"{posval_x:.6f}, {posval_y:.6f}, {posval_z:.6f}, {sensor_reading}\n"
                outfile.write(line)
                print(f"[{xcount}, {ycount}] {line.strip()}")

    print("Dry run complete. Output written to:")
    print(outname)

In [8]:
# Constants used in both dry_run and real scan
numX = 50
numY = 1
lengthX = 25
lengthY = 0

Xstart = 179.05
Ystart = 582.23
Zstart = -70.0
Zdrop = 22.0

lifterSettleTime = 0.5

outname = rf"C:\Windows\System32\JupyterNotebooks\TestDryRun_{datetime.now():%Y%m%d_%H%M%S}.dat"

In [10]:

dry_run_metrology_scan()

Running dry run metrology scan...
[0, 0] 179.056870, 582.228433, -91.998388, 0.01205
[1, 0] 179.561198, 582.232356, -91.996985, 0.01194
[2, 0] 180.067579, 582.234060, -92.005291, 0.04725
[3, 0] 180.585617, 582.225907, -91.992914, 0.02152
[4, 0] 181.084962, 582.222491, -91.997092, 0.04917
[5, 0] 181.602481, 582.230340, -91.994440, 0.04124
[6, 0] 182.112834, 582.233450, -91.997194, 0.02448
[7, 0] 182.625303, 582.232861, -91.991357, 0.02811
[8, 0] 183.125072, 582.237781, -91.993950, 0.01498
[9, 0] 183.648947, 582.234536, -92.001981, 0.01068
[10, 0] 184.144665, 582.227813, -91.990896, 0.04358
[11, 0] 184.658835, 582.220665, -92.004444, 0.02722
[12, 0] 185.165225, 582.239026, -92.008196, 0.04853
[13, 0] 185.686502, 582.222003, -92.001419, 0.03356
[14, 0] 186.187920, 582.220277, -92.001398, 0.03688
[15, 0] 186.704745, 582.230459, -92.009285, 0.04746
[16, 0] 187.217999, 582.234313, -92.008012, 0.04050
[17, 0] 187.729393, 582.235977, -91.994043, 0.03567
[18, 0] 188.234861, 582.238893, -91.9913

To do Metrology Python Program
1. ~probe sensor read out~
2. data collection part with python?
3. or start with it taking those values and spitting them out to a textfile the way the .pgm was doing it?
4. there's some kind of differencing that the .pgm does that it outputs in the cell that'll be good to know exactly what it's doing

Questions:
1. what is velocity blending mode?
2. camming is something we do currently. not sure we need to do camming anymore, but what is involved in that?
3. is there really no way to interrupt motion in a command queue? that's really dangerous? there's gotta be some way---maybe they mean, we can't interrupt in a python kernel? Question to ask support staff