# 📖 [LMX Tutorial] Absolute and Relative motion
http://download.movensys.com:8111/wmx3/en/html/api/page_WMXDOC_TUTORIAL_SEC3_1_ABS_REL.html

## Start LMX engine

In [2]:
!/opt/lmx/bin/lmx-start-engine

[2025-02-12 02:31:37.489][WMX3Engine]Build: Oct 16 2024:11:35:23 (v3.5.0.0)
[2025-02-12 02:31:38.394][WMX3Engine]LicenseVerify: Valid WMX3 license code is not found.
[2025-02-12 02:31:38.444][WMX3Engine]Free license running mode is applied.
[2025-02-12 02:31:38.444][WMX3Engine]WMX3Engine will stop communicating after one hour of continuous communication.
lmx engine is started!


## Import WMX library and Initialize a WMX3 API device

In [5]:
# Import WMX3 API library
from WMX3ApiPython import *
from time import *

# Constants
INFINITE = int(0xFFFFFFFF)

# When all the devices are done, the WMX3 engine will also terminate.	
wmx3Lib = WMX3Api()
wmx3LibCore = CoreMotion(wmx3Lib)
wmxLog = Log(wmx3Lib)
wmxMemLogStatus = MemoryLogStatus()
# wmxEventCtrl = EventControl()

# Create a device.
wmx3Lib.CreateDevice('/opt/lmx', DeviceType.DeviceTypeNormal, INFINITE)

# Set wmx3Lib Name.
wmx3Lib.SetDeviceName('device')
wmx3Lib

<WMX3ApiPython.WMX3Api; proxy of <Swig Object of type 'wmx3ApiPython::WMX3Api *' at 0x755f49c55470> >

# Import Python libraries and declare utility functions


In [5]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from matplotlib import rc

%matplotlib inline
%config InlineBackend.figure_format='retina'

sns.set(style='whitegrid', palette='muted', font_scale=1.2)
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))

title_font = {
    'fontsize': 16,
    'fontweight': 'bold'
}

# Plot utility functions
def plotPosMotion(df):
    plt.plot(df['Timestamp'].head(1000), df['DcDiffAvg'].head(1000))
    plt.xlabel('Time [cycle]')
    plt.ylabel('Position [p]')
    plt.title('Position Plot')

# Plot the velocity  plot
def plotVelMotion(df):
    plt.plot(df['Timestamp'].head(1000), df['DcDiffAvg'].head(1000))
    plt.xlabel('Time [cycle]')
    plt.ylabel('Velocity [p/s]')
    plt.title('velocity  Plot')

def OP_STATE(state):
    if state == OperationState.Idle:
        return "Idle"
    elif state == OperationState.Pos:
        return "Pos"
    elif state == OperationState.Jog:
        return "Jog"
    elif state == OperationState.Home:
        return "Home"
    elif state == OperationState.Sync:
        return "Sync"
    elif state == OperationState.GantryHome:
        return "GantryHome"
    elif state == OperationState.Stop:
        return "Stop"
    elif state == OperationState.Intpl:
        return "Intpl"
    elif state == OperationState.List:
        return "List"
    elif state == OperationState.ConstLinearVelocity:
        return "ConstLinearVelocity"
    elif state == OperationState.Trq:
        return "Trq"
    elif state == OperationState.DirectControl:
        return "DirectControl"
    elif state == OperationState.PVT:
        return "PVT"
    elif state == OperationState.ECAM:
        return "ECAM"
    elif state == OperationState.SyncCatchUp:
        return "SyncCatchUp"
    elif state == OperationState.DancerControl:
        return "DancerControl"
    else:
        return ""

def dumpStatus(cmAxis):
    print(" 1.PosCmd    :%-11.3f" % cmAxis.posCmd)             # 1. PosCmd
    print(" 2.ActPos    :%-11.3f" % cmAxis.actualPos)          # 2. ActualPos
    print(" 3.ActVel    :%-11.3f" % cmAxis.actualVelocity)     # 3. ActualVelocity
    print(" 4.ActTrq    :%-11.3f" % cmAxis.actualTorque)       # 4. ActualTorque 
    print(" 5.AmpAlm    :%-3d"    % cmAxis.ampAlarm)           # 5. AmpAlarm
    print(" 6.AmpAlmCode:0x%05x"  % cmAxis.ampAlarmCode)       # 6. AmpAlarmCode 
    print(" 7.SrvOn     :%-3d"    % cmAxis.servoOn)            # 7. ServoOn
    print(" 8.HomeDone  :%-4d"    % cmAxis.homeDone)           # 8. HomeDone 
    print(" 9.InPos     :%-3d"    % cmAxis.inPos)              # 9. InPos
    print("10.NegLS     :%-3d"    % cmAxis.negativeLS)         # 10.negativeLS
    print("11.PosLS     :%-3d"    % cmAxis.positiveLS)         # 11.positiveLS
    print("12.HomeSw    :%-4d"    % cmAxis.homeSwitch)         # 12.homeSwitch
    print("13.OpState   :%s"      % OP_STATE(cmAxis.opState))  # 13.OperationState

In [6]:
import multiprocessing

def LogUpdateTask (channel, dumpFlag=True):
    history = [np.zeros((0,)), np.zeros((0,))]
   
    while True:
        updateLogdata, overflowFlag = UpdateLogData(channel, 0)

        if overflowFlag:
            print(f'Overflow: {overflowFlag}');
        if not updateLogdata or len(updateLogdata) == 0:
            break
        if dumpFlag:
            print(f'Read log buffer size: {len(updateLogdata)}')
            
        np_feedbackPos = [data.feedbackPos for data in updateLogdata]
        np_feedbackVelocity = [data.feedbackVelocity for data in updateLogdata]
        
        history[0] = np.concatenate((history[0], np_feedbackPos), axis=0)
        history[1] = np.concatenate((history[1], np_feedbackVelocity), axis=0)

    if dumpFlag:
        print(f'Updated data size: {history[0].size}, {history[1].size}')
        print(history)

    return_logupdater['history'] = history

manager = multiprocessing.Manager()
return_logupdater = manager.dict()

In [7]:
from WMX3ApiPython import constants

log_channel = 0
overflowFlag = 0

def CheckErrorCode(func, errCode):
    if errCode != ErrorCode.PyNone:
        if errCode >= 0x00000 and errCode <= 0x10000:
           lastErrString =  f"function: {func}, ErrorType: WMX3Api, ErrorCode: {errCode}"
        elif errCode >= 0x11000 and errCode <= 0x11FFF:
            lastErrString = f"function: {func}, ErrorType: Log, ErrorCode: {errCode}"
        else:
            lastErrString = f"function: {func}, ErrorType: Undefined, ErrorCode: {errCode}"
        
        print(lastErrString)
        return False
    return True

def IsAvailableLogChannel(channel):    
    ret, memLogStatus = wmxLog.GetMemoryLogStatus(channel)    
    if ret != ErrorCode.PyNone:
        CheckErrorCode("GetMemoryLogStatus", ret)
        return False
    
    if memLogStatus.logState != LogState.Idle or memLogStatus.bufferOpened == True:
        print(f"memLogStatus.LogState: {memLogStatus.logState}, memLogStatus.bufferOpened: {memLogStatus.bufferOpened}")
        return False
    else:
        return True

def _CheckMemoryLogChannel():
    channel = -1
    for curchannel in range(constants.maxLogChannel - 1, 1, -1):
        print(f"curchannel: {curchannel}")
        if IsAvailableLogChannel(curchannel):
            ret = wmxLog.OpenMemoryLogBuffer(curchannel);
            if ret == ErrorCode.PyNone:
                channel = curchannel   
                break
            else:
                CheckErrorCode("OpenMemoryLogBuffer", ret)                

    return channel

def _CloseMemoryLog(channel):
    retryCount = 100

    for i in range (retryCount):
        ret, memLogStatus = wmxLog.GetMemoryLogStatus(channel)
        if ret != ErrorCode.PyNone:
            CheckErrorCode("GetMemoryLogStatus", ret)
            return False

        if memLogStatus.logState == LogState.Idle:
           break

        ret = wmxLog.StopMemoryLog(channel)
        if ret != ErrorCode.PyNone:
            CheckErrorCode("StopMemoryLog", ret)
            return False

        sleep(5)
            
    if ret != ErrorCode.PyNone:
        CheckErrorCode("_CloseMemoryLog", ret)
        return False

    return True

def UpdateLogData(channel, axis=0):
    updatedLogdata = None
    overflowFlag = False

    ret, memLogData = wmxLog.GetMemoryLogData(channel)
    if ret != ErrorCode.PyNone:
        CheckErrorCode("GetMemoryLogData", ret)
        return None, False

    overflowFlag = memLogData.overflowFlag > 0

    updatedLogdata = [None]*memLogData.count
    for index in range(memLogData.count):
        logdata = memLogData.GetLogData(index)                                       
        updatedLogdata[index] = logdata.GetLogAxisData(axis)
    
    return updatedLogdata, overflowFlag

def StartLog():
    
    StopLog(log_channel)
    
    maxAxes = 2
    channel = _CheckMemoryLogChannel()
    updaterProc = None
    
    if channel != -1:
        axisSel = AxisSelection()
        
        axisSel.axisCount = maxAxes;
        for idx in range(maxAxes):
            axisSel.SetAxis(idx, idx)

        evi = CoreMotionEventInput()
        evi.inputFunction  = CoreMotionEventInputType.OpState;
        inputFuncArg = CoreMotionEventInputFunctionArguments_OpState()
        inputFuncArg.opState = OperationState.Pos
        inputFuncArg.axis = 0
        inputFuncArg.invert = 0
        evi.opState = inputFuncArg
        
        # Set the output function to None
        evo = EventApiEventOutput()
        evo.outputFunction = EventApiEventOutputType.PyNone;
       
        memOption = MemoryLogOptions()
        memOption.triggerEventCount = 0

        ret, eventId = wmxEventCtrl.SetEvent(evi, evo)
        if ret != ErrorCode.PyNone:
            CheckErrorCode("SetEvent", ret)            
        else:
            memOption.triggerEventCount = 1
            memOption.SetTriggerEventID(0, eventId)
            wmxEventCtrl.EnableEvent(eventId, 1)
    
        ret = wmxLog.SetMemoryLog(channel, axisSel, memOption)
        if ret != ErrorCode.PyNone:
            CheckErrorCode("SetMemoryLog", ret)
            return -1
        
        ret = wmxLog.StartMemoryLog(channel)
        if ret != ErrorCode.PyNone:
            CheckErrorCode("StartMemoryLog", ret)
            return -1

        updaterProc = Process(target=LogUpdateTask, args=(channel,))
        updaterProc.start()

    return channel, updaterProc

def StopLog(channel):
    if _CloseMemoryLog(channel):
        
        ret = wmxLog.CloseMemoryLogBuffer(channel);
        if ret != ErrorCode.PyNone:
            CheckErrorCode("CloseMemoryLogBuffer", ret)

def PauseLog(channel, updaterProc):
    ret = wmxLog.StopMemoryLog(channel)
    if ret != ErrorCode.PyNone:
        CheckErrorCode("StopMemoryLog in PauseLog()", ret)
    
    sleep(0.1)
    
    ret, memLogStatus = wmxLog.GetMemoryLogStatus(channel)
    if ret != ErrorCode.PyNone:
        CheckErrorCode("GetMemoryLogStatus in PauseLog()", ret)
        return False
                
    print(f"Memory Logging is paused. (samplesToCollect: {memLogStatus.samplesToCollect}, samplesCollected: {memLogStatus.samplesCollected})") 

    if updaterProc:
        updaterProc.join()
        print(return_logupdater.values())
    

In [8]:
def UpdateLogPlot(plot_title, dumpFlag=False):
    # history = [np.zeros((0,)), np.zeros((0,))]
   
    # while True:
    #     updateLogdata, overflowFlag = UpdateLogData(log_channel, 0)

    #     if overflowFlag:
    #         print(f'Overflow: {overflowFlag}');
    #     if not updateLogdata or len(updateLogdata) == 0:
    #         break
    #     if dumpFlag:
    #         print(f'Read log buffer size: {len(updateLogdata)}')
            
    #     np_feedbackPos = [data.feedbackPos for data in updateLogdata]
    #     np_feedbackVelocity = [data.feedbackVelocity for data in updateLogdata]
        
    #     history[0] = np.concatenate((history[0], np_feedbackPos), axis=0)
    #     history[1] = np.concatenate((history[1], np_feedbackVelocity), axis=0)

    # if dumpFlag:
    #     print(f'Updated data size: {history[0].size}, {history[1].size}')
    #     print(history)
        
    # Plot position and velocity feedbacks
    fig, axs = plt.subplots(
      nrows=1,
      ncols=2,
      sharey=False,
      sharex=False,
      figsize=(25, 15)
    )
    
    ax = axs[0]
    ax.plot(history[0], label='Position', color='limegreen')
    ax.set_xlabel('Cycle')
    ax.set_title(f'{plot_title}: Feedback Position', fontdict=title_font, pad=20)
    ax.ticklabel_format(useOffset=False)
    ax.legend(['Position'])
    
    ax = axs[1]
    ax.plot(history[1], label='Velocity', color='violet')
    ax.set_xlabel('Cycle')
    ax.set_title(f'{plot_title}: Feedback Velocity', fontdict=title_font, pad=20)
    ax.legend(['Velocity'])
    
    fig.tight_layout();

## Set the device name and start communication

In [12]:
# Set wmx3Lib Name.
ret = wmx3Lib.SetDeviceName('MotorControl')
if ret != ErrorCode.PyNone:
    CheckErrorCode("SetDeviceName", ret)
    
# Start Communication.
ret = wmx3Lib.StartCommunication(INFINITE)
if ret != ErrorCode.PyNone:
    CheckErrorCode("StartCommunication", ret)

In [13]:
# Get created device state.
ret, devInfo = wmx3Lib.GetAllDevices()
if ret != ErrorCode.PyNone:
    CheckErrorCode("GetAllDevices", ret)
    
# Display the acquired device.
print(f'Device Id: {devInfo.GetDevices(0).id} Name: {devInfo.GetDevices(0).name}')

Device Id: 0 Name: 


## Set servo ON ▶️

In [None]:
wmx3LibCore.axisControl.SetServoOn(0, 1)
while True:
    # wmx3LibCore.GetStatus(CmStatus)
    ret, CmStatus = wmx3LibCore.GetStatus()
    if CmStatus.GetAxesStatus(0).servoOn:
        break
    sleep(0.1)

## Start log 

In [None]:
# log_channel = StartLog()
# print(log_channel)

In [None]:
# np_logdata = GetFeedbackPos(log_channel)
# print(np_logdata)

In [None]:
# StopLog(log_channel)

## 1. Absolute position command (StartPos)

http://download.movensys.com:8111/wmx3/en/html/api/classwmx3_api_1_1_motion.html#_CPPv4N7wmx3Api6Motion8StartPosEjP17TriggerPosCommand

In [None]:
# Create a command value.
posCommand = Motion_PosCommand()

# Set position command parameters
posCommand.axis = 0
posCommand.profile.type = ProfileType.Trapezoidal
posCommand.profile.velocity = 10000
posCommand.profile.acc = 10000
posCommand.profile.dec = 10000

# Execute absolute position command to 10000
posCommand.target = 100000

log_channel, updater = StartLog()

# Rotate the motor at the specified speed.
err = wmx3LibCore.motion.StartPos(posCommand)

if err != ErrorCode.PyNone:
    errString = wmx3LibCore.ErrorToString(err)
    print("Failed to execute motion. Error=%d (%s)\n", err, errString)

err = wmx3LibCore.motion.Wait(0)
PauseLog(log_channel, updater)

### 1.1 Draw the position and velocity plots for absolute motion 📈

In [None]:
plot_title = 'Absolute position command (StartPos)'
UpdateLogPlot(plot_title, True)

## 2. Relative position command (StartMov)

http://download.movensys.com:8111/wmx3/en/html/api/classwmx3_api_1_1_motion.html#_CPPv4N7wmx3Api6Motion8StartMovEP10PosCommand

In [None]:
# Execute relative position command of 10000
posCommand.target = 1000
posCommand.profile.velocity = 100
posCommand.profile.acc = 10
posCommand.profile.dec = 10

log_channel, updater = StartLog()

err = wmx3LibCore.motion.StartMov(posCommand)

if err != ErrorCode.PyNone:
    errString = wmx3LibCore.ErrorToString(err)
    print("Failed to execute motion. Error=%d (%s)\n", err, errString)

# dumpStatus(cmAxis)
err = wmx3LibCore.motion.Wait(0)
PauseLog(log_channel, updater)

### 1.1 Draw the position and velocity plots for relative motion 📈

In [None]:
plot_title = 'Relative position command (StartMov)'
UpdateLogPlot(plot_title, True)

## Set servo OFF ⏹️

In [None]:
wmx3LibCore.axisControl.SetServoOn(0, 0)
while True:
    # wmx3LibCore.GetStatus(CmStatus)
    ret, CmStatus = wmx3LibCore.GetStatus()
    if CmStatus.GetAxesStatus(0).servoOn:
        break
    sleep(0.1)

## Stop communication and close the WMX3 API device

In [None]:
# ----------------------
# Stop Communication.
# ----------------------
wmx3Lib.StopCommunication(INFINITE)

# Close wmx3Lib.
wmx3Lib.CloseDevice()

## Stop LMX engine

In [1]:
!/opt/lmx/bin/lmx-stop-engine

lmx engine is stopped!
