From 99ca14da53eb4a9b2277ed0f5ecff9b1c2f86e51 Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Mon, 14 Sep 2020 18:34:16 -0700 Subject: [PATCH 1/9] Support for Newport Picomotor Controller 8742 Full functionality for this controller in single-controller and multi-controller mode (main / secondaries via RS-485) is implemented. Only single-controller mode is tested with hardware at the moment. Full test suite however includes tests for multi-controller modes. Routines were documented in docstring and also in Sphinx documentation. Note: Control via USB is currently not functional, since InstrumentKit has some issues with the real USB communications class. Needs separate PR. --- doc/source/apiref/newport.rst | 6 + instruments/newport/__init__.py | 2 + instruments/newport/newport_pmc8742.py | 1163 +++++++++++++++++ .../test_newport/test_newport_pmc8742.py | 858 ++++++++++++ 4 files changed, 2029 insertions(+) create mode 100644 instruments/newport/newport_pmc8742.py create mode 100644 instruments/tests/test_newport/test_newport_pmc8742.py diff --git a/doc/source/apiref/newport.rst b/doc/source/apiref/newport.rst index ec5fc1631..55bc8a364 100644 --- a/doc/source/apiref/newport.rst +++ b/doc/source/apiref/newport.rst @@ -40,3 +40,9 @@ Newport :members: :undoc-members: +:class:`PicoMotorController8742` +================================ + +.. autoclass:: PicoMotorController8742 + :members: + :undoc-members: diff --git a/instruments/newport/__init__.py b/instruments/newport/__init__.py index 14a02ca1e..45e751b55 100644 --- a/instruments/newport/__init__.py +++ b/instruments/newport/__init__.py @@ -10,3 +10,5 @@ from .newportesp301 import ( NewportESP301, NewportESP301Axis, NewportESP301HomeSearchMode ) + +from .newport_pmc8742 import PicoMotorController8742 diff --git a/instruments/newport/newport_pmc8742.py b/instruments/newport/newport_pmc8742.py new file mode 100644 index 000000000..7645650b7 --- /dev/null +++ b/instruments/newport/newport_pmc8742.py @@ -0,0 +1,1163 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Provides support for the Newport Pico Motor Controller 8742 + +Note that the class is currently only tested with one controller connected, +however, a main controller / secondary controller setup has also been +implemented already. Commands are as described in the Picomotor manual. + +If a connection via TCP/IP is opened, the standard port that these devices +listen to is 23. + +If you have only one controller connected, everything should work out of +the box. Please only use axiss 0 through 3. + +If you have multiple controllers connected (up to 31), you need to set the +addresses of each controller. This can be done with this this class. See, +e.g., routines for `controller_address`, `scan_controller`, and `scan`. +Also make sure that you set `multiple_controllers` to `True`. This is +used for internal handling of the class only and does not communicate with +the instruments. +If you run with multiple controllers, the axiss are as following: +Ch 0 - 3 -> Motors 1 - 4 on controller with address 1 +Ch 4 - 7 -> Motors 1 - 4 on controller with address 2 +Ch i - i+4 -> Motors 1 - 4 on controller with address i / 4 + 1 (with i%4 = 0) + +All network commands only work with the main controller (this should make +sense). + +If in multiple controller mode, you can always send controller specific +commands by sending them to one individual axis of that controller. +Any axis works! +""" + +# IMPORTS # + +from enum import IntEnum + +from instruments.abstract_instruments import Instrument +from instruments.units import ureg as u +from instruments.util_fns import assume_units, ProxyList + +# pylint: disable=too-many-lines + + +class PicoMotorController8742(Instrument): + """Newport Picomotor Controller 8742 Communications Class + + Use this class to communicate with the picomotor controller 8742. + Single-controller and multi-controller setup can be used. + + Device can be talked to via TCP/IP or over USB. + FixMe: InstrumentKit currently does not communicate correctly via USB! + + Example for TCP/IP controller in single controller mode: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> motor1 = inst.axis[0] + >>> motor1.move_relative = 100 + + Example for communications via USB: + >>> import instruments as ik + >>> pid = 0x4000 + >>> vid = 0x104d + >>> ik.newport.PicoMotorController8742.open_usb(pid=pid, vid=vid) + >>> motor3 = inst.axis[2] + >>> motor3.move_absolute = -200 + + Example for multicontrollers with controller addresses 1 and 2: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.multiple_controllers = True + >>> contr1mot1 = inst.axis[0] + >>> contr2mot1 = inst.axis[4] + >>> contr1mot1.move_absolute = 200 + >>> contr2mot1.move_relative = -212 + """ + + def __init__(self, filelike): + """Initialize the PicoMotorController class.""" + super(PicoMotorController8742, self).__init__(filelike) + + # terminator + self.terminator = "\r\n" + + # setup + self._multiple_controllers = False + + # INNER CLASSES # + + class Axis: + """PicoMotorController8742 Axis class for individual motors.""" + def __init__(self, parent, idx): + """Initialize the axis with the parent and the number. + + :raises IndexError: Axis accessed looks like a main / secondary + setup, but the flag for `multiple_controllers` is not set + appropriately. See introduction. + """ + if not isinstance(parent, PicoMotorController8742): + raise TypeError("Don't do that.") + + if idx > 3 and not parent.multiple_controllers: + raise IndexError("You requested an axis that is only " + "available in multi controller mode, " + "however, have not enabled it. See " + "`multi_controllers` routine.") + + # set controller + self._parent = parent + self._idx = idx % 4 + 1 + + # set _address: + if self._parent.multiple_controllers: + self._address = f"{idx // 4 + 1}>" + else: + self._address = "" + + # ENUMS # + + class MotorType(IntEnum): + """IntEnum Class containing valid MotorTypes + + Use this enum to set the motor type. You can select that no or an + unkown motor are connected. See also `motor_check` command to set + these values per controller automatically. + """ + none = 0 + unknown = 1 + tiny = 2 + standard = 3 + + # PROPERTIES # + + @property + def acceleration(self): + """Get / set acceleration of axis in steps / sec^2. + + Valid values are between 1 and 200,000 (steps) 1 / sec^2 with the + default as 100,000 (steps) 1 / sec^2. If quantity is not unitful, + it is assumed that 1 / sec^2 is chosen. + + :return: Acceleration in 1 / sec^2 + :rtype: u.Quantity(int) + + :raises ValueError: Limit is out of bound. + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.acceleration = u.Quantity(500, 1/u.s**-2) + """ + return assume_units(int(self.query("AC?")), u.s**-2) + + @acceleration.setter + def acceleration(self, value): + value = int(assume_units(value, u.s**-2).to(u.s**-2).magnitude) + if not 1 <= value <= 200000: + raise ValueError(f"Acceleration must be between 1 and " + f"200,000 s^-2 but is {value}.") + self.sendcmd(f"AC{value}") + + @property + def home_position(self): + """Get / set home position + + The home position of the device is used, e.g., when moving + to a specific position instead of a relative move. Valid values + are between -2147483648 and 2147483647. + + :return: Home position. + :rtype: int + + :raises ValueError: Set value is out of range. + + Example: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.home_position = 444 + """ + return int(self.query("DH?")) + + @home_position.setter + def home_position(self, value): + if not -2147483648 <= value <= 2147483647: + raise ValueError(f"Home position must be between -2147483648 " + f"and 2147483647, but is {value}.") + self.sendcmd(f"DH{int(value)}") + + @property + def is_stopped(self): + """Get if an axis is stopped (not moving). + + :return: Is the axis stopped? + :rtype: bool + + Example: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.is_stopped + True + """ + return bool(int(self.query("MD?"))) + + @property + def motor_type(self): + """Set / get the type of motor connected to the axis. + + Use a `MotorType` IntEnum to set this motor type. + + :return: Motor type set. + :rtype: MotorType + + :raises TypeError: Set motor type is not of type `MotorType`. + + Example: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.motor_type = ax.MotorType.tiny + """ + retval = int(self.query("QM?")) + return self.MotorType(retval) + + @motor_type.setter + def motor_type(self, value): + if not isinstance(value, self.MotorType): + raise TypeError(f"Set motor type must be of type `MotorType` " + f"but is of type {type(value)}.") + self.sendcmd(f"QM{value.value}") + + @property + def move_absolute(self): + """Get / set the absolute target position of a motor. + + Set with absolute position in steps. Valid values between + -2147483648 and +2147483647. + See also: `home_position`. + + :return: Absolute motion target position. + :rtype: int + + :raises ValueError: Requested position out of range. + + Example: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.move_absolute = 100 + """ + return int(self.query("PA?")) + + @move_absolute.setter + def move_absolute(self, value): + if not -2147483648 <= value <= 2147483647: + raise ValueError(f"Set position must be between -2147483648 " + f"and 2147483647, but is {value}.") + self.sendcmd(f"PA{int(value)}") + + @property + def move_relative(self): + """Get / set the relative target position of a motor. + + Set with relative motion in steps. Valid values between + -2147483648 and +2147483647. + See also: `home_position`. + + :return: Relative motion target position. + :rtype: int + + :raises ValueError: Requested motion out of range. + + Example: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.move_relative = 100 + """ + return int(self.query("PR?")) + + @move_relative.setter + def move_relative(self, value): + if not -2147483648 <= value <= 2147483647: + raise ValueError(f"Set motion must be between -2147483648 " + f"and 2147483647, but is {value}.") + self.sendcmd(f"PR{int(value)}") + + @property + def position(self): + """Queries current, actual position of motor. + + Positions are with respect to the home position. + + :return: Current position in steps. + :rtype: int + + Example: + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.position + 123 + """ + return int(self.query("TP?")) + + @property + def velocity(self): + """Get / set velocty of the connected motor (unitful). + + Velocity is given in (steps) per second (1/s). + If a `MotorType.tiny` motor is connected, the maximum velocity + allowed is 1750 /s, otherwise 2000 /s. + If no units are given, 1/s are assumed. + + :return: Velocity in 1/s + :rtype: u.Quantity(int) + + :raises ValueError: Set value is out of the allowed range. + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.velocity = u.Quantity(500, 1/u.s) + """ + retval = int(self.query("VA?")) + return u.Quantity(retval, 1/u.s) + + @velocity.setter + def velocity(self, value): + if self.motor_type == self.MotorType.tiny: + max_velocity = 1750 + else: + max_velocity = 2000 + + value = int(assume_units(value, 1 / u.s).to(1 / u.s).magnitude) + if not 0 < value <= max_velocity: + raise ValueError(f"The maximum allowed velocity for the set " + f"motor is {max_velocity}. The requested " + f"velocity of {value} is out of range.") + self.sendcmd(f"VA{value}") + + # METHODS # + + def move_indefinite(self, direction): + """Move the motor indefinitely in the specific direction. + + To stop motion, issue `stop_motion` or `abort_motion` command. + Direction is defined as a string of either "+" or "-". + + :param direction: Direction in which to move the motor, "+" or "-" + :type direction: str + + Example: + >>> from time import sleep + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.move_indefinite("+") + >>> sleep(1) # wait a second + >>> ax.stop() + """ + if direction in ["+", "-"]: + self.sendcmd(f"MV{direction}") + + def stop(self): + """Stops the specific axis if in motion. + + Example: + >>> from time import sleep + >>> import instruments as ik + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + >>> ax.move_indefinite("+") + >>> sleep(1) # wait a second + >>> ax.stop() + """ + self.sendcmd("ST") + + # CONTROLLER SPECIFIC PROPERTIES # + + @property + def controller_address(self): + """Get / set the controller address. + + Valid address values are between 1 and 31. For setting up multiple + instruments, see `multiple_controllers`. + + :return: Address of this device if secondary, otherwise `None` + :rtype: int + """ + retval = int(self.query("SA?", axs=False)) + return retval + + @controller_address.setter + def controller_address(self, newval): + self.sendcmd(f"SA{int(newval)}", axs=False) + + @property + def controller_configuration(self): + """Get / set configuration of some of the controller’s features. + + Configuration is given as a bit mask. If changed, please save + the settings afterwards if you would like to do so. See + `save_settings`. + + The bitmask to be set can be either given as a number, or as a + string of the mask itself. The following values are equivalent: + 3, 0b11, "11" + + Bit 0: + Value 0: Perform auto motor detection. Check and set motor + type automatically when commanded to move. + Value 1: Do not perform auto motor detection on move. + Bit 1: + Value 0: Do not scan for motors connected to controllers upon + reboot (Performs ‘MC’ command upon power-up, reset or + reboot). + Value 1: Scan for motors connected to controller upon power-up + or reset. + + :return: Bitmask of the controller configuration. + :rtype: str, binary configuration + """ + return self.query('ZZ?', axs=False) + + @controller_configuration.setter + def controller_configuration(self, value): + if isinstance(value, str): + self.sendcmd(f"ZZ{value}", axs=False) + else: + self.sendcmd(f"ZZ{str(bin(value))[2:]}", axs=False) + + @property + def error_code(self): + """Get error code only. + + Error code0 means no error detected. + + :return: Error code. + :rtype: int + """ + return int(self.query("TE?", axs=False)) + + @property + def error_code_and_message(self): + """Get error code and message. + + :return: Error code, error message + :rtype: int, str + """ + retval = self.query("TB?", axs=False) + err_code, err_msg = retval.split(",") + err_code = int(err_code) + err_msg = err_msg.strip() + return err_code, err_msg + + @property + def firmware_version(self): + """Get the controller firmware version.""" + return self.query("VE?", axs=False) + + @property + def name(self): + """Get the name of the controller.""" + return self.query("*IDN?", axs=False) + + # CONTROLLER SPECIFIC METHODS # + + def abort_motion(self): + """Instantaneously stops any motion in progress.""" + self.sendcmd("AB", axs=False) + + def motor_check(self): + """Check what motors are connected and set parameters. + + Use the save command `save_settings` if you want to save the + configuration to the non-volatile memory. + """ + self.sendcmd("MC", axs=False) + + def purge(self): + """Purge the non-volatile memory of the controller. + + Perform a hard reset and reset all the saved variables. The + following variables are reset to factory settings: + 1. Hostname + 2. IP Mode + 3. IP Address + 4. Subnet mask address + 5. Gateway address + 6. Configuration register + 7. Motor type + 8. Desired Velocity + 9. Desired Acceleration + """ + self.sendcmd("XX", axs=False) + + def recall_parameters(self, value=0): + """Recall parameter set. + + This command restores the controller working parameters from values + saved in its non-volatile memory. It is useful when, for example, + the user has been exploring and changing parameters (e.g., velocity) + but then chooses to reload from previously stored, qualified + settings. Note that “*RCL 0” command just restores the working + parameters to factory default settings. It does not change the + settings saved in EEPROM. + + :param value: 0 -> Recall factory default, + 1 -> Recall last saved settings + :type int: + """ + self.sendcmd(f"*RCL{1 if value else 0}", axs=False) + + def reset(self): + """Reset the controller. + + Perform a soft reset. Saved variables are not deleted! For a + hard reset, see the `purge` command. + + ..note:: It might take up to 30 seconds to re-establish + communications via TCP/IP + """ + self.sendcmd("*RST", axs=False) + + def save_settings(self): + """Save user settings. + + This command saves the controller settings in its non-volatile memory. + The controller restores or reloads these settings to working registers + automatically after system reset or it reboots. The purge + command is used to clear non-volatile memory and restore to factory + settings. Note that the SM saves parameters for all motors. + + Saves the following variables: + 1. Controller address + 2. Hostname + 3. IP Mode + 4. IP Address + 5. Subnet mask address + 6. Gateway address + 7. Configuration register + 8. Motor type + 9. Desired Velocity + 10. Desired Acceleration + """ + self.sendcmd("SM", axs=False) + + # SEND AND QUERY # + + def sendcmd(self, cmd, axs=True): + """Send a command to an axis object. + + :param cmd: Command + :type cmd: str + :param axs: Send axis address along? Not used for controller + commands. Defaults to `True` + :type axs: bool + """ + if axs: + command = f"{self._address}{self._idx}{cmd}" + else: + command = f"{self._address}{cmd}" + self._parent.sendcmd(command) + + def query(self, cmd, size=-1, axs=True): + """Query for an axis object. + + :param cmd: Command + :type cmd: str + :param size: bytes to read, defaults to "until terminator" (-1) + :type size: int + :param axs: Send axis address along? Not used for controller + commands. Defaults to `True` + :type axs: bool + + :raises IOError: The wrong axis answered. + """ + if axs: + command = f"{self._address}{self._idx}{cmd}" + else: + command = f"{self._address}{cmd}" + + retval = self._parent.query(command, size=size) + + if retval[:len(self._address)] != self._address: + raise IOError(f"Expected to hear back from secondary " + f"controller {self._address}, instead " + f"controller {retval[:len(self._address)]} " + f"answered.") + + return retval[len(self._address):] + + @property + def axis(self): + """Return an axis object. + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> ax = inst.axis[0] + """ + return ProxyList(self, self.Axis, range(31 * 4)) + + @property + def controller_address(self): + """Get / set the controller address. + + Valid address values are between 1 and 31. For setting up multiple + instruments, see `multiple_controllers`. + + :return: Address of this device if secondary, otherwise `None` + :rtype: int + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.controller_address = 13 + """ + return self.axis[0].controller_address + + @controller_address.setter + def controller_address(self, newval): + self.axis[0].controller_address = newval + + @property + def controller_configuration(self): + """Get / set configuration of some of the controller’s features. + + Configuration is given as a bit mask. If changed, please save + the settings afterwards if you would like to do so. See + `save_settings`. + + Bit 0: + Value 0: Perform auto motor detection. Check and set motor + type automatically when commanded to move. + Value 1: Do not perform auto motor detection on move. + Bit 1: + Value 0: Do not scan for motors connected to controllers upon + reboot (Performs ‘MC’ command upon power-up, reset or + reboot). + Value 1: Scan for motors connected to controller upon power-up + or reset. + + :return: Bitmask of the controller configuration. + :rtype: str + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.controller_configuration = "11" + """ + return self.axis[0].controller_configuration + + @controller_configuration.setter + def controller_configuration(self, value): + self.axis[0].controller_configuration = value + + @property + def dhcp_mode(self): + """Get / set if device is in DHCP mode. + + If not in DHCP mode, a static IP address, gateway, and netmask + must be set. + + :return: Status if DHCP mode is enabled + :rtype: `bool` + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.dhcp_mode = True + """ + return bool(self.query("IPMODE?")) + + @dhcp_mode.setter + def dhcp_mode(self, newval): + nn = 1 if newval else 0 + self.sendcmd(f"IPMODE{nn}") + + @property + def error_code(self): + """Get error code only. + + Error code0 means no error detected. + + :return: Error code. + :rtype: int + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.error_code + 0 + """ + return self.axis[0].error_code + + @property + def error_code_and_message(self): + """Get error code and message. + + :return: Error code, error message + :rtype: int, str + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.error_code + (0, 'NO ERROR DETECTED') + """ + return self.axis[0].error_code_and_message + + @property + def firmware_version(self): + """Get the controller firmware version. + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.firmware_version + '8742 Version 2.2 08/01/13' + """ + return self.axis[0].firmware_version + + @property + def gateway(self): + """Get / set the gateway of the instrument. + + :return: Gateway address + :rtype: str + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.gateway = "192.168.1.1" + """ + return self.query("GATEWAY?") + + @gateway.setter + def gateway(self, value): + self.sendcmd(f"GATEWAY {value}") + + @property + def hostname(self): + """Get / set the hostname of the instrument. + + :return: Hostname + :rtype: `str` + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.hostname = "asdf" + """ + return self.query("HOSTNAME?") + + @hostname.setter + def hostname(self, value): + self.sendcmd(f"HOSTNAME {value}") + + @property + def ip_address(self): + """Get / set the IP address of the instrument. + + :return: IP address + :rtype: `str` + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.ip_address = "192.168.1.2" + """ + return self.query("IPADDR?") + + @ip_address.setter + def ip_address(self, value): + self.sendcmd(f"IPADDR {value}") + + @property + def mac_address(self): + """Get the MAC address of the instrument. + + :return: MAC address + :rtype: `str` + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.mac_address + '5827809, 8087' + """ + return self.query("MACADDR?") + + @property + def multiple_controllers(self): + """Get / set if multiple controllers are used. + + By default, this is initialized as `False`. Set to `True` if you + have a main controller / secondary controller via RS-485 network + set up. + + Instrument commands will always be sent to main controller. + Axis specific commands will be set to the axis chosen, see + `axis` description. + + :return: Status if multiple controllers are activated + :rtype: bool + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.multiple_controllers = True + """ + return self._multiple_controllers + + @multiple_controllers.setter + def multiple_controllers(self, newval): + self._multiple_controllers = True if newval else False + + @property + def name(self): + """Get the name of the controller. + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.name + 'New_Focus 8742 v2.2 08/01/13 13991' + """ + return self.axis[0].name + + @property + def netmask(self): + """Get / set the Netmask of the instrument. + + :return: Netmask + :rtype: `str` + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.netmask = "255.255.255.0" + """ + return self.query("NETMASK?") + + @netmask.setter + def netmask(self, value): + self.sendcmd(f"NETMASK {value}") + + @property + def scan_controllers(self): + """RS-485 controller address map query of all controllers. + + 32 bit string that represents the following: + Bit: Value: (True: 1, False: 0) + 0 Address conflict? + 1: Controller with address 1 exists? + ... + 31: Controller with address 31 exists + + Bits 1—31 are one-to-one mapped to controller addresses 1—31. The + bit value is set to 1 only when there are no conflicts with that + address. For example, if the master controller determines that there + are unique controllers at addresses 1,2, and 7 and more than one + controller at address 23, this query will return 135. The binary + representation of 135 is 10000111. Bit #0 = 1 implies that the scan + found at lease one address conflict during last scan. Bit #1,2, 7 = 1 + implies that the scan found controllers with addresses 1,2, and 7 + that do not conflict with any other controller. + + :return: Binary representation of controller configuration bitmask. + :rtype: str + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.scan_controllers + "10000111" + """ + return self.query("SC?") + + @property + def scan_done(self): + """Queries if a controller scan is done or not. + + :return: Controller scan done? + :rtype: bool + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.scan_done + True + """ + return bool(int(self.query("SD?"))) + + # METHODS # + + def abort_motion(self): + """Instantaneously stop any motion in progress. + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.abort_motion() + """ + self.axis[0].abort_motion() + + def motor_check(self): + """Check what motors are connected and set parameters. + + Use the save command `save_settings` if you want to save the + configuration to the non-volatile memory. + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.motor_check() + """ + self.axis[0].motor_check() + + def scan(self, value=2): + """Initialize and set controller addresses automatically. + + Scans the RS-485 network for connected controllers and set the + addresses automatically. Three possible scan modes can be + selected: + Mode 0: + Primary controller scans the network but does not resolve + any address conflicts. + Mode 1: + Primary controller scans the network and resolves address + conflicts, if any. This option preserves the non-conflicting + addresses and reassigns the conflicting addresses starting + with the lowest available address. + Mode 2 (default): + Primary controller reassigns the addresses of all + controllers on the network in a sequential order starting + with master controller set to address 1. + + See also: `scan_controllers` property. + + :param value: Scan mode. + :type: int + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.scan(2) + """ + self.sendcmd(f"SC{value}") + + def purge(self): + """Purge the non-volatile memory of the controller. + + Perform a hard reset and reset all the saved variables. The + following variables are reset to factory settings: + 1. Hostname + 2. IP Mode + 3. IP Address + 4. Subnet mask address + 5. Gateway address + 6. Configuration register + 7. Motor type + 8. Desired Velocity + 9. Desired Acceleration + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.purge() + """ + self.axis[0].purge() + + def recall_parameters(self, value=0): + """Recall parameter set. + + This command restores the controller working parameters from values + saved in its non-volatile memory. It is useful when, for example, + the user has been exploring and changing parameters (e.g., velocity) + but then chooses to reload from previously stored, qualified + settings. Note that “*RCL 0” command just restores the working + parameters to factory default settings. It does not change the + settings saved in EEPROM. + + :param value: 0 -> Recall factory default, + 1 -> Recall last saved settings + :type value: int + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.recall_parameters(1) + """ + self.axis[0].recall_parameters(value) + + def reset(self): + """Reset the controller. + + Perform a soft reset. Saved variables are not deleted! For a + hard reset, see the `purge` command. + + ..note:: It might take up to 30 seconds to re-establish + communications via TCP/IP + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.reset() + """ + self.axis[0].reset() + + def save_settings(self): + """Save user settings. + + This command saves the controller settings in its non-volatile memory. + The controller restores or reloads these settings to working registers + automatically after system reset or it reboots. The purge + command is used to clear non-volatile memory and restore to factory + settings. Note that the SM saves parameters for all motors. + + Saves the following variables: + 1. Controller address + 2. Hostname + 3. IP Mode + 4. IP Address + 5. Subnet mask address + 6. Gateway address + 7. Configuration register + 8. Motor type + 9. Desired Velocity + 10. Desired Acceleration + + Example: + >>> import instruments as ik + >>> import instruments.units as u + >>> ip = "192.168.1.2" + >>> port = 23 # this is the default port + >>> inst = ik.newport.PicoMotorController8742.open_tcpip(ip, port) + >>> inst.save_settings() + """ + self.axis[0].save_settings() + + # QUERY # + + def query(self, cmd, size=-1): + """Query's the device and returns ASCII string. + + Must be queried as a raw string with terminator line ending. This is + currently not implemented in instrument and therefore must be called + directly from file. + + Sometimes, the instrument sends an undecodable 6 byte header along + (usually for the first query). We'll catch it with a try statement. + The 6 byte header was also remarked in this matlab script: + https://github.com/cnanders/matlab-newfocus-model-8742 + """ + self.sendcmd(cmd) + # ToDo: Implement into `instrument.py` - after tests are done + retval = self._file.read_raw(size=size) + try: + retval = retval.decode("utf-8") + except UnicodeDecodeError: + print(retval) + retval = retval[6:].decode("utf-8") + + return retval diff --git a/instruments/tests/test_newport/test_newport_pmc8742.py b/instruments/tests/test_newport/test_newport_pmc8742.py new file mode 100644 index 000000000..767f3dc1c --- /dev/null +++ b/instruments/tests/test_newport/test_newport_pmc8742.py @@ -0,0 +1,858 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Tests for the Newport Picomotor Controller 8742. +""" + +# IMPORTS ##################################################################### + +from hypothesis import given, strategies as st +import pytest + +import instruments as ik +from instruments.units import ureg as u +from instruments.tests import expected_protocol + +# pylint: disable=protected-access + + +# INSTRUMENT # + + +def test_init(): + """Initialize a new Picomotor PMC8742 instrument.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + assert inst.terminator == "\r\n" + assert not inst.multiple_controllers + + +def test_controller_address(): + """Set and get controller address.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "SA2", + "SA?" + ], + [ + "2" + ], + sep="\r\n" + ) as inst: + inst.controller_address = 2 + assert inst.controller_address == 2 + + +def test_controller_configuration(): + """Set and get controller configuration.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "ZZ11", + "ZZ11", + "ZZ11", + "ZZ?" + ], + [ + "11" + ], + sep="\r\n" + ) as inst: + inst.controller_configuration = 3 + inst.controller_configuration = 0b11 + inst.controller_configuration = "11" + assert inst.controller_configuration == "11" + + +def test_dhcp_mode(): + """Set and get DHCP mode.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "IPMODE0", + "IPMODE1", + "IPMODE?" + ], + [ + "1" + ], + sep="\r\n" + ) as inst: + inst.dhcp_mode = False + inst.dhcp_mode = True + assert inst.dhcp_mode + + +def test_error_code(): + """Get error code.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "TE?" + ], + [ + "0" + ], + sep="\r\n" + ) as inst: + assert inst.error_code == 0 + + +def test_error_code_and_message(): + """Get error code and message as tuple.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "TB?" + ], + [ + "0, NO ERROR DETECTED" + ], + sep="\r\n" + ) as inst: + err_expected = (0, "NO ERROR DETECTED") + err_received = inst.error_code_and_message + assert err_received == err_expected + assert isinstance(err_received, tuple) + + +def test_firmware_version(): + """Get firmware version.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "VE?" + ], + [ + "0123456789" + ], + sep="\r\n" + ) as inst: + assert inst.firmware_version == "0123456789" + + +def test_gateway(): + """Set / get gateway.""" + ip_addr = "192.168.1.1" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"GATEWAY {ip_addr}", + "GATEWAY?" + ], + [ + f"{ip_addr}" + ], + sep="\r\n" + ) as inst: + inst.gateway = ip_addr + assert inst.gateway == ip_addr + + +def test_hostname(): + """Set / get hostname.""" + host = "192.168.1.1" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"HOSTNAME {host}", + "HOSTNAME?" + ], + [ + f"{host}" + ], + sep="\r\n" + ) as inst: + inst.hostname = host + assert inst.hostname == host + + +def test_ip_address(): + """Set / get ip address.""" + ip_addr = "192.168.1.1" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"IPADDR {ip_addr}", + "IPADDR?" + ], + [ + f"{ip_addr}" + ], + sep="\r\n" + ) as inst: + inst.ip_address = ip_addr + assert inst.ip_address == ip_addr + + +def test_mac_address(): + """Set / get mac address.""" + mac_addr = "5827809, 8087" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "MACADDR?" + ], + [ + f"{mac_addr}" + ], + sep="\r\n" + ) as inst: + assert inst.mac_address == mac_addr + + +def test_name(): + """Get name of the current instrument.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "*IDN?" + ], + [ + "NAME" + ], + sep="\r\n" + ) as inst: + assert inst.name == "NAME" + + +def test_netmask(): + """Set / get netmask.""" + ip_addr = "192.168.1.1" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"NETMASK {ip_addr}", + "NETMASK?" + ], + [ + f"{ip_addr}" + ], + sep="\r\n" + ) as inst: + inst.netmask = ip_addr + assert inst.netmask == ip_addr + + +def test_scan_controller(): + """Scan connected controllers.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "SC?" + ], + [ + "11" + ], + sep="\r\n" + ) as inst: + assert inst.scan_controllers == "11" + + +def test_scan_done(): + """Query if a controller scan is completed.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "SD?", + "SD?" + ], + [ + "1", + "0" + ], + sep="\r\n" + ) as inst: + assert inst.scan_done + assert not inst.scan_done + + +def test_abort_motion(): + """Abort all motion.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "AB" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.abort_motion() + + +def test_motor_check(): + """Check the connected motors.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "MC" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.motor_check() + + +@pytest.mark.parametrize("mode", [0, 1, 2]) +def test_scan(mode): + """Scan address configuration of motors for default and other modes.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "SC2", + f"SC{mode}" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.scan() + inst.scan(mode) + + +def test_purge(): + """Purge the memory.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "XX" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.purge() + + +@pytest.mark.parametrize("mode", [0, 1]) +def test_recall_parameters(mode): + """Recall parameters, by default the factory set values.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "*RCL0", + f"*RCL{mode}" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.recall_parameters() + inst.recall_parameters(mode) + + +def test_reset(): + """Soft reset of the controller.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "*RST" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.reset() + + +def test_save_settings(): + """Save settings of the controller.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "SM" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.save_settings() + + +def test_query_bad_header(): + """Ensure stripping of bad header if present, see comment in query.""" + retval = b"\xff\xfd\x03\xff\xfb\x01192.168.2.161" + val_expected = "192.168.2.161" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "IPADDR?" + ], + [ + retval + ], + sep="\r\n" + ) as inst: + assert inst.ip_address == val_expected + + +# AXIS SPECIFIC COMMANDS - CONTROLLER COMMANDS PER AXIS TESTED ABOVE # + + +@given(ax=st.integers(min_value=0, max_value=3)) +def test_axis_returns(ax): + """Return axis with given axis number testing all valid axes.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[ax] + assert isinstance(axis, ik.newport.PicoMotorController8742.Axis) + assert axis._parent == inst + assert axis._idx == ax + 1 + assert axis._address == "" + + +def test_axis_returns_type_error(): + """Raise TypeError if parent class is not PicoMotorController8742.""" + with pytest.raises(TypeError): + _ = ik.newport.PicoMotorController8742.Axis(0, 0) + + +@given(ax=st.integers(min_value=4)) +def test_axis_return_index_error(ax): + """Raise IndexError if axis out of bounds and in one controller mode.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + with pytest.raises(IndexError): + _ = inst.axis[ax] + + +@given(val=st.integers(min_value=1, max_value=200000)) +def test_axis_acceleration(val): + """Set / get axis acceleration unitful and without units.""" + val_unit = u.Quantity(val, u.s**-2) + val_unit_other = val_unit.to(u.min**-2) + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1AC{val}", + f"1AC{val}", + "1AC?" + ], + [ + f"{val}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.acceleration = val + axis.acceleration = val_unit_other + assert axis.acceleration == val_unit + + +@given(val=st.integers().filter(lambda x: not 1 <= x <= 200000)) +def test_axis_acceleration_value_error(val): + """Raise ValueError if acceleration out of range.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + with pytest.raises(ValueError): + axis.acceleration = val + + +@given(val=st.integers(min_value=-2147483648, max_value=2147483647)) +def test_axis_home_position(val): + """Set / get axis home position.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1DH{val}", + "1DH?" + ], + [ + f"{val}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.home_position = val + assert axis.home_position == val + + +@given(val=st.integers().filter(lambda x: not -2147483648 <= x <= 2147483647)) +def test_axis_home_position_value_error(val): + """Raise ValueError if home position out of range.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + with pytest.raises(ValueError): + axis.home_position = val + + +@pytest.mark.parametrize("val", ["0", "1"]) +def test_axis_is_stopped(val): + """Query if axis is stopped.""" + exp_result = True if val == "1" else False + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "1MD?" + ], + [ + f"{val}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + assert axis.is_stopped == exp_result + + +@pytest.mark.parametrize( + "val", ik.newport.PicoMotorController8742.Axis.MotorType +) +def test_axis_motor_type(val): + """Set / get motor type.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1QM{val.value}", + "1QM?" + ], + [ + f"{val.value}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.motor_type = val + assert axis.motor_type == val + + +def test_axis_motor_type_wrong_type(): + """Raise TypeError if not appropriate motor type.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + with pytest.raises(TypeError): + axis.motor_type = 2 + + +@given(val=st.integers(min_value=-2147483648, max_value=2147483647)) +def test_axis_move_absolute(val): + """Set / get axis move absolute.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1PA{val}", + "1PA?" + ], + [ + f"{val}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.move_absolute = val + assert axis.move_absolute == val + + +@given(val=st.integers().filter(lambda x: not -2147483648 <= x <= 2147483647)) +def test_axis_move_absolute_value_error(val): + """Raise ValueError if move absolute out of range.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + with pytest.raises(ValueError): + axis.move_absolute = val + + +@given(val=st.integers(min_value=-2147483648, max_value=2147483647)) +def test_axis_move_relative(val): + """Set / get axis move relative.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1PR{val}", + "1PR?" + ], + [ + f"{val}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.move_relative = val + assert axis.move_relative == val + + +@given(val=st.integers().filter(lambda x: not -2147483648 <= x <= 2147483647)) +def test_axis_move_relative_value_error(val): + """Raise ValueError if move relative out of range.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + with pytest.raises(ValueError): + axis.move_relative = val + + +def test_axis_position(): + """Query position of an axis.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "1TP?" + ], + [ + "42" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + assert axis.position == 42 + + +@given(val=st.integers(min_value=1, max_value=2000)) +def test_axis_velocity(val): + """Set / get axis velocity, unitful and unitless.""" + val_unit = u.Quantity(val, 1 / u.s) + val_unit_other = val_unit.to(1 / u.hour) + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1QM?", + f"1VA{val}", + f"1QM?", + f"1VA{val}", + "1VA?" + ], + [ + "3", + "3", + f"{val}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.velocity = val + axis.velocity = val_unit_other + assert axis.velocity == val_unit + + +@given(val=st.integers().filter(lambda x: not 1 <= x <= 2000)) +@pytest.mark.parametrize("motor", [0, 1, 3]) +def test_axis_velocity_value_error_regular(val, motor): + """Raise ValueError if velocity is out of range for non-tiny motor.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "1QM?" + ], + [ + f"{motor}" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + with pytest.raises(ValueError): + axis.velocity = val + + +@given(val=st.integers().filter(lambda x: not 1 <= x <= 1750)) +def test_axis_velocity_value_error_tiny(val): + """Raise ValueError if velocity is out of range for tiny motor.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + "1QM?" + ], + [ + "2" + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + with pytest.raises(ValueError): + axis.velocity = val + + +@pytest.mark.parametrize("direction", ["+", "-"]) +def test_axis_move_indefinite(direction): + """Move axis indefinitely.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1MV{direction}" + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.move_indefinite(direction) + + +def test_axis_stop(): + """Stop axis.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1ST" + ], + [ + ], + sep="\r\n" + ) as inst: + axis = inst.axis[0] + axis.stop() + + +# SOME ADDITIONAL TESTS FOR MAIN / SECONDARY CONTROLLER SETUP # + + +def test_multi_controllers(): + """Enable and disable multiple controllers.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + inst.multiple_controllers = True + assert inst.multiple_controllers + inst.multiple_controllers = False + assert not inst.multiple_controllers + + +@given(ax=st.integers(min_value=0, max_value=31*4-1)) +def test_axis_return_multi(ax): + """Return axis properly for multi-controller setup.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + inst.multiple_controllers = True + axis = inst.axis[ax] + assert isinstance(axis, ik.newport.PicoMotorController8742.Axis) + assert axis._parent == inst + assert axis._idx == ax % 4 + 1 + assert axis._address == f"{ax // 4 + 1}>" + + +@given(ax=st.integers(min_value=124)) +def test_axis_return_multi_index_error(ax): + """Raise IndexError if axis out of bounds and in multi controller mode.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + ], + [ + ], + sep="\r\n" + ) as inst: + inst.multiple_controllers = True + with pytest.raises(IndexError): + _ = inst.axis[ax] + + +@given(ax=st.integers(min_value=0, max_value=31*4-1)) +def test_axis_sendcmd_multi(ax): + """Send correct command in multiple axis mode.""" + address = ax // 4 + 1 + axis = ax % 4 + 1 + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"{address}>{axis}CMD" + ], + [ + ], + sep="\r\n" + ) as inst: + inst.multiple_controllers = True + axis = inst.axis[ax] + axis.sendcmd("CMD") + + +@given(ax=st.integers(min_value=0, max_value=31*4-1)) +def test_axis_query_multi(ax): + """Query command in multiple axis mode and strip address routing.""" + address = ax // 4 + 1 + axis = ax % 4 + 1 + answer_expected = f"{axis}ANSWER" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"{address}>{axis}CMD" + ], + [ + f"{address}>{answer_expected}" + ], + sep="\r\n" + ) as inst: + inst.multiple_controllers = True + axis = inst.axis[ax] + assert axis.query("CMD") == answer_expected + + +def test_axis_query_multi_io_error(): + """Raise IOError if query response from wrong controller.""" + with expected_protocol( + ik.newport.PicoMotorController8742, + [ + f"1>1CMD" + ], + [ + f"4>1ANSWER" + ], + sep="\r\n" + ) as inst: + inst.multiple_controllers = True + axis = inst.axis[0] + with pytest.raises(IOError): + axis.query("CMD") From 2d45a625c2702581852405077fdfa9ae39dbb049 Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Wed, 13 Oct 2021 16:30:05 -0400 Subject: [PATCH 2/9] Added a `read_raw` routine to `instrument.py` Reading raw data from the instrument is required for the picomotor control class, however, had so far to be done by directly accessing a private argument of `instrument.py` (not pretty). --- instruments/abstract_instruments/instrument.py | 12 ++++++++++++ instruments/newport/newport_pmc8742.py | 3 +-- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/instruments/abstract_instruments/instrument.py b/instruments/abstract_instruments/instrument.py index a8b0ef2a1..57c258143 100644 --- a/instruments/abstract_instruments/instrument.py +++ b/instruments/abstract_instruments/instrument.py @@ -144,6 +144,18 @@ def read(self, size=-1, encoding="utf-8"): """ return self._file.read(size, encoding) + def read_raw(self, size=-1): + """ + Read the raw last line. + + :param int size: Number of bytes to be read. Default is read until + termination character is found. + :return: The result of the read as returned by the + connected instrument. + :rtype: `str` + """ + return self._file.read_raw(size) + # PROPERTIES # @property diff --git a/instruments/newport/newport_pmc8742.py b/instruments/newport/newport_pmc8742.py index 7645650b7..e56d6220a 100644 --- a/instruments/newport/newport_pmc8742.py +++ b/instruments/newport/newport_pmc8742.py @@ -1152,8 +1152,7 @@ def query(self, cmd, size=-1): https://github.com/cnanders/matlab-newfocus-model-8742 """ self.sendcmd(cmd) - # ToDo: Implement into `instrument.py` - after tests are done - retval = self._file.read_raw(size=size) + retval = self.read_raw(size=size) try: retval = retval.decode("utf-8") except UnicodeDecodeError: From d251c6469e186d9807507e6eed2e271848ac8af8 Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Wed, 13 Oct 2021 18:51:57 -0400 Subject: [PATCH 3/9] Fix writing to USB device, change PMC to 1 termination character Writing works, but has issues with the terminator. need to add it at least once manually... The IK way of doing things is currently not the same as the tutorial? Reading always and only returns 0... might need to switch the control? --- instruments/abstract_instruments/comm/usb_communicator.py | 2 +- instruments/newport/newport_pmc8742.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/instruments/abstract_instruments/comm/usb_communicator.py b/instruments/abstract_instruments/comm/usb_communicator.py index 27770ef27..d2e1595ce 100644 --- a/instruments/abstract_instruments/comm/usb_communicator.py +++ b/instruments/abstract_instruments/comm/usb_communicator.py @@ -124,7 +124,7 @@ def _sendcmd(self, msg): :param str msg: The command message to send to the instrument """ msg += self._terminator - self._conn.sendall(msg) + self._conn.write(bytes(msg, "utf-8")) def _query(self, msg, size=-1): """ diff --git a/instruments/newport/newport_pmc8742.py b/instruments/newport/newport_pmc8742.py index e56d6220a..de1306162 100644 --- a/instruments/newport/newport_pmc8742.py +++ b/instruments/newport/newport_pmc8742.py @@ -85,7 +85,7 @@ def __init__(self, filelike): super(PicoMotorController8742, self).__init__(filelike) # terminator - self.terminator = "\r\n" + self.terminator = "\r" # setup self._multiple_controllers = False From 63a4b9bcc120309695f9cc24f05bb2d71fe0a14b Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Thu, 14 Oct 2021 21:25:33 -0400 Subject: [PATCH 4/9] Revert changes to newport_pmc8742 - termination character change --- instruments/newport/newport_pmc8742.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instruments/newport/newport_pmc8742.py b/instruments/newport/newport_pmc8742.py index de1306162..e56d6220a 100644 --- a/instruments/newport/newport_pmc8742.py +++ b/instruments/newport/newport_pmc8742.py @@ -85,7 +85,7 @@ def __init__(self, filelike): super(PicoMotorController8742, self).__init__(filelike) # terminator - self.terminator = "\r" + self.terminator = "\r\n" # setup self._multiple_controllers = False From f45a3aae6cbbfeb7505bb2291a6ed8c7fd688523 Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Thu, 14 Oct 2021 21:29:45 -0400 Subject: [PATCH 5/9] Rewrote instrument.py and usb communicator to work with Newport PMC8742 Tests are currently failing, but that's okay, these are tests that simply test for the old / not-working usb communicator initalization. SRS830 test is failing because USBCommunicator is used to test some behavior that USBCommunicator is the wrong one. Anyway, probably better to just mock something here in this failing test. --- .../comm/usb_communicator.py | 108 ++++++++++++++---- .../abstract_instruments/instrument.py | 26 +---- 2 files changed, 84 insertions(+), 50 deletions(-) diff --git a/instruments/abstract_instruments/comm/usb_communicator.py b/instruments/abstract_instruments/comm/usb_communicator.py index d2e1595ce..caf451289 100644 --- a/instruments/abstract_instruments/comm/usb_communicator.py +++ b/instruments/abstract_instruments/comm/usb_communicator.py @@ -10,6 +10,9 @@ import io +import usb.core +import usb.util + from instruments.abstract_instruments.comm import AbstractCommunicator # CLASSES ##################################################################### @@ -28,12 +31,46 @@ class USBCommunicator(io.IOBase, AbstractCommunicator): and it is suggested that it is not relied on. """ - def __init__(self, conn): + def __init__(self, dev): super(USBCommunicator, self).__init__(self) # TODO: Check to make sure this is a USB connection - self._conn = conn - self._terminator = "\n" + # follow (mostly) pyusb tutorial + + # set the active configuration. With no arguments, the first + # configuration will be the active one + dev.set_configuration() + + # get an endpoint instance + cfg = dev.get_active_configuration() + intf = cfg[(0, 0)] + + # initialize in and out endpoints + ep_out = usb.util.find_descriptor( + intf, + # match the first OUT endpoint + custom_match= \ + lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_OUT + ) + + ep_in = usb.util.find_descriptor( + intf, + # match the first OUT endpoint + custom_match= \ + lambda e: \ + usb.util.endpoint_direction(e.bEndpointAddress) == \ + usb.util.ENDPOINT_IN + ) + + if (ep_in or ep_out) is None: + raise IOError("USB endpoint not found.") + + self._dev = dev + self._ep_in = ep_in + self._ep_out = ep_out + self._terminator = "\n" # PROPERTIES # @property @@ -57,10 +94,7 @@ def terminator(self): def terminator(self, newval): if not isinstance(newval, str): raise TypeError("Terminator for USBCommunicator must be specified " - "as a single character string.") - if len(newval) > 1: - raise ValueError("Terminator for USBCommunicator must only be 1 " - "character long.") + "as a character string.") self._terminator = newval @property @@ -77,25 +111,47 @@ def close(self): """ Shutdown and close the USB connection """ - try: - self._conn.shutdown() - finally: - self._conn.close() + self._dev.reset() + usb.util.dispose_resources(self._dev) - def read_raw(self, size=-1): - raise NotImplementedError + def read_raw(self, size=1000): + """Read raw string back from device and return. - def read(self, size=-1, encoding="utf-8"): - raise NotImplementedError + String returned is most likely shorter than the size requested. Will + terminate by itself. + Read size of -1 will be transformed into 1000. - def write_raw(self, msg): + :param size: Size to read in bytes + :type size: int """ - Write bytes to the raw usb connection object. + if size == -1: + size = 1000 + read_val = bytes(self._ep_in.read(size)) + return read_val.rstrip(bytes(self._terminator, encoding="utf-8")) + + def read(self, size=1000, encoding="utf-8"): + return self.read_raw(size).decode(encoding=encoding) + + def write_raw(self, msg): + """Write bytes to the raw usb connection object. :param bytes msg: Bytes to be sent to the instrument over the usb connection. """ - self._conn.write(msg) + msg = msg + bytes(self._terminator, encoding="utf-8") + self._ep_out.write(msg) + + def write(self, msg, encoding="utf-8"): + """Write string to usb connection. + + First, message is encoded and then termination character is added. + + :param msg: Message to send to instrument + :type msg: str + :param encoding: Encoding for message + :type encoding: str + """ + self.write_raw(bytes(f"{msg}{self._terminator}", encoding=encoding)) def seek(self, offset): # pylint: disable=unused-argument,no-self-use return NotImplemented @@ -106,11 +162,14 @@ def tell(self): # pylint: disable=no-self-use def flush_input(self): """ Instruct the communicator to flush the input buffer, discarding the - entirety of its contents. - - Not implemented for usb communicator + entirety of its contents. Read 1000 bytes at a time and be done + once a timeout error comes back (which means the buffer is empty). """ - raise NotImplementedError + while True: + try: + self._ep_in.read(1000, 10) + except usb.core.USBTimeoutError: + break # METHODS # @@ -123,10 +182,9 @@ def _sendcmd(self, msg): :param str msg: The command message to send to the instrument """ - msg += self._terminator - self._conn.write(bytes(msg, "utf-8")) + self.write(msg) - def _query(self, msg, size=-1): + def _query(self, msg, size=1000): """ This is the implementation of ``query`` for communicating with raw usb connections. This function is in turn wrapped by the concrete diff --git a/instruments/abstract_instruments/instrument.py b/instruments/abstract_instruments/instrument.py index 57c258143..3222c0cf4 100644 --- a/instruments/abstract_instruments/instrument.py +++ b/instruments/abstract_instruments/instrument.py @@ -16,9 +16,7 @@ from serial import SerialException from serial.tools.list_ports import comports import pyvisa -import usb import usb.core -import usb.util from instruments.abstract_instruments.comm import ( SocketCommunicator, USBCommunicator, VisaCommunicator, FileCommunicator, @@ -684,29 +682,7 @@ def open_usb(cls, vid, pid): if dev is None: raise IOError("No such device found.") - # Use the default configuration offered by the device. - dev.set_configuration() - - # Copied from the tutorial at: - # http://pyusb.sourceforge.net/docs/1.0/tutorial.html - cfg = dev.get_active_configuration() - interface_number = cfg[(0, 0)].bInterfaceNumber - alternate_setting = usb.control.get_interface(dev, interface_number) - intf = usb.util.find_descriptor( - cfg, bInterfaceNumber=interface_number, - bAlternateSetting=alternate_setting - ) - - ep = usb.util.find_descriptor( - intf, - custom_match=lambda e: - usb.util.endpoint_direction(e.bEndpointAddress) == - usb.util.ENDPOINT_OUT - ) - if ep is None: - raise IOError("USB descriptor not found.") - - return cls(USBCommunicator(ep)) + return cls(USBCommunicator(dev)) @classmethod def open_file(cls, filename): From cd8d0969ccc2553a6c437ea7e798e759a400964b Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Fri, 15 Oct 2021 13:49:39 -0400 Subject: [PATCH 6/9] Adjust USBCommunicator such that compatible with other instruments --- .../comm/usb_communicator.py | 41 +++++++++---------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/instruments/abstract_instruments/comm/usb_communicator.py b/instruments/abstract_instruments/comm/usb_communicator.py index caf451289..0fa878904 100644 --- a/instruments/abstract_instruments/comm/usb_communicator.py +++ b/instruments/abstract_instruments/comm/usb_communicator.py @@ -14,6 +14,8 @@ import usb.util from instruments.abstract_instruments.comm import AbstractCommunicator +from instruments.units import ureg as u +from instruments.util_fns import assume_units # CLASSES ##################################################################### @@ -33,7 +35,8 @@ class USBCommunicator(io.IOBase, AbstractCommunicator): def __init__(self, dev): super(USBCommunicator, self).__init__(self) - # TODO: Check to make sure this is a USB connection + if not isinstance(dev, usb.core.Device): + raise TypeError("USBCommunicator must wrap a usb.core.Device object.") # follow (mostly) pyusb tutorial @@ -99,11 +102,18 @@ def terminator(self, newval): @property def timeout(self): - raise NotImplementedError + """ + Gets/sets the communication timeout of the USB communicator. + + :type: `~pint.Quantity` + :units: As specified or assumed to be of units ``seconds`` + """ + return assume_units(self._dev.default_timeout, u.ms).to(u.second) @timeout.setter def timeout(self, newval): - raise NotImplementedError + newval = assume_units(newval, u.second).to(u.ms).magnitude + self._dev.default_timeout = newval # FILE-LIKE METHODS # @@ -126,11 +136,12 @@ def read_raw(self, size=1000): """ if size == -1: size = 1000 + term = self._terminator.encode("utf-8") read_val = bytes(self._ep_in.read(size)) - return read_val.rstrip(bytes(self._terminator, encoding="utf-8")) - - def read(self, size=1000, encoding="utf-8"): - return self.read_raw(size).decode(encoding=encoding) + if term not in read_val: + raise IOError(f"Did not find the terminator in the returned string. " + f"Total size of {size} might not be enough.") + return read_val.rstrip(term) def write_raw(self, msg): """Write bytes to the raw usb connection object. @@ -138,21 +149,8 @@ def write_raw(self, msg): :param bytes msg: Bytes to be sent to the instrument over the usb connection. """ - msg = msg + bytes(self._terminator, encoding="utf-8") self._ep_out.write(msg) - def write(self, msg, encoding="utf-8"): - """Write string to usb connection. - - First, message is encoded and then termination character is added. - - :param msg: Message to send to instrument - :type msg: str - :param encoding: Encoding for message - :type encoding: str - """ - self.write_raw(bytes(f"{msg}{self._terminator}", encoding=encoding)) - def seek(self, offset): # pylint: disable=unused-argument,no-self-use return NotImplemented @@ -167,7 +165,7 @@ def flush_input(self): """ while True: try: - self._ep_in.read(1000, 10) + self._ep_in.read(1000, 10) # try to read until timeout error occurs except usb.core.USBTimeoutError: break @@ -182,6 +180,7 @@ def _sendcmd(self, msg): :param str msg: The command message to send to the instrument """ + msg += self._terminator self.write(msg) def _query(self, msg, size=1000): From c4e46e856e9ec272f67bc2733d0eb5ec3841f4a3 Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Fri, 15 Oct 2021 14:46:14 -0400 Subject: [PATCH 7/9] Fixed failing tests in existing test suite --- .../abstract_instruments/instrument.py | 3 +- instruments/tests/test_base_instrument.py | 54 ++++--------------- instruments/tests/test_srs/test_srs830.py | 6 +-- 3 files changed, 14 insertions(+), 49 deletions(-) diff --git a/instruments/abstract_instruments/instrument.py b/instruments/abstract_instruments/instrument.py index 3222c0cf4..6727e7493 100644 --- a/instruments/abstract_instruments/instrument.py +++ b/instruments/abstract_instruments/instrument.py @@ -672,12 +672,11 @@ def open_usb(cls, vid, pid): method. :param str vid: Vendor ID of the USB device to open. - :param int pid: Product ID of the USB device to open. + :param str pid: Product ID of the USB device to open. :rtype: `Instrument` :return: Object representing the connected instrument. """ - # pylint: disable=no-member dev = usb.core.find(idVendor=vid, idProduct=pid) if dev is None: raise IOError("No such device found.") diff --git a/instruments/tests/test_base_instrument.py b/instruments/tests/test_base_instrument.py index 3862b5142..6102d8877 100644 --- a/instruments/tests/test_base_instrument.py +++ b/instruments/tests/test_base_instrument.py @@ -346,46 +346,23 @@ def test_instrument_open_vxi11(mock_vxi11_comm): mock_vxi11_comm.assert_called_with("string", 1, key1="value") +@mock.patch("instruments.abstract_instruments.instrument.USBCommunicator") @mock.patch("instruments.abstract_instruments.instrument.usb") -def test_instrument_open_usb(mock_usb): +def test_instrument_open_usb(mock_usb, mock_usb_comm): """Open USB device.""" - # mock some behavior - mock_usb.core.find.return_value.__class__ = usb.core.Device # dev - mock_usb.core.find().get_active_configuration.return_value.__class__ = ( - usb.core.Configuration - ) + mock_usb.core.find.return_value.__class__ = usb.core.Device + mock_usb_comm.return_value.__class__ = USBCommunicator - # shortcuts for asserting calls - dev = mock_usb.core.find() - cfg = dev.get_active_configuration() - interface_number = cfg[(0, 0)].bInterfaceNumber - alternate_setting = mock_usb.control.get_interface( - dev, cfg[(0, 0)].bInterfaceNumber - ) + # fake instrument + vid = "0x1000" + pid = "0x1000" + dev = mock_usb.core.find(idVendor=vid, idProduct=pid) # call instrument - inst = ik.Instrument.open_usb("0x1000", 0x1000) - - # assert calls according to manual - dev.set_configuration.assert_called() # check default configuration - dev.get_active_configuration.assert_called() # get active configuration - mock_usb.control.get_interface.assert_called_with(dev, interface_number) - mock_usb.util.find_descriptor.assert_any_call( - cfg, - bInterfaceNumber=interface_number, - bAlternateSetting=alternate_setting - ) - # check the first argument of the `ep =` call - assert mock_usb.util.find_descriptor.call_args_list[1][0][0] == ( - mock_usb.util.find_descriptor( - cfg, - bInterfaceNumber=interface_number, - bAlternateSetting=alternate_setting - ) - ) + inst = ik.Instrument.open_usb(vid, pid) - # assert instrument of correct class assert isinstance(inst._file, USBCommunicator) + mock_usb_comm.assert_called_with(dev) @mock.patch("instruments.abstract_instruments.instrument.usb") @@ -398,17 +375,6 @@ def test_instrument_open_usb_no_device(mock_usb): assert err_msg == "No such device found." -@mock.patch("instruments.abstract_instruments.instrument.usb") -def test_instrument_open_usb_ep_none(mock_usb): - """Raise IOError if endpoint matching returns None.""" - mock_usb.util.find_descriptor.return_value = None - - with pytest.raises(IOError) as err: - _ = ik.Instrument.open_usb(0x1000, 0x1000) - err_msg = err.value.args[0] - assert err_msg == "USB descriptor not found." - - @mock.patch("instruments.abstract_instruments.instrument.USBTMCCommunicator") def test_instrument_open_usbtmc(mock_usbtmc_comm): mock_usbtmc_comm.return_value.__class__ = USBTMCCommunicator diff --git a/instruments/tests/test_srs/test_srs830.py b/instruments/tests/test_srs/test_srs830.py index 6ac052788..c317df577 100644 --- a/instruments/tests/test_srs/test_srs830.py +++ b/instruments/tests/test_srs/test_srs830.py @@ -13,10 +13,10 @@ import instruments as ik from instruments.abstract_instruments.comm import ( + FileCommunicator, GPIBCommunicator, LoopbackCommunicator, SerialCommunicator, - USBCommunicator ) from instruments.optional_dep_finder import numpy from instruments.tests import ( @@ -61,8 +61,8 @@ def test_init_mode_serial_comm(mocker): def test_init_mode_invalid(): - """Test initialization with invalild communicator.""" - comm = USBCommunicator(None) + """Test initialization with invalid communicator.""" + comm = FileCommunicator(None) with pytest.warns(UserWarning) as wrn_info: ik.srs.SRS830(comm) wrn_msg = wrn_info[0].message.args[0] From 082298a0546615116a83903a41f4c918cea93ac1 Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Sun, 17 Oct 2021 19:20:20 -0700 Subject: [PATCH 8/9] Full testing for USBCommunicator, some BFs --- .../tests/test_comm/test_usb_communicator.py | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 instruments/tests/test_comm/test_usb_communicator.py diff --git a/instruments/tests/test_comm/test_usb_communicator.py b/instruments/tests/test_comm/test_usb_communicator.py new file mode 100644 index 000000000..2b3c9ab5d --- /dev/null +++ b/instruments/tests/test_comm/test_usb_communicator.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +""" +Unit tests for the USBTMC communication layer +""" + +# IMPORTS #################################################################### + + +import pytest + +from instruments.abstract_instruments.comm import USBTMCCommunicator +from instruments.tests import unit_eq +from instruments.units import ureg as u +from .. import mock + +# TEST CASES ################################################################# + +# pylint: disable=protected-access,unused-argument,no-member + +patch_path = "instruments.abstract_instruments.comm.usbtmc_communicator.usbtmc" + + +@mock.patch(patch_path) +def test_usbtmccomm_init(mock_usbtmc): + _ = USBTMCCommunicator("foobar", var1=123) + mock_usbtmc.Instrument.assert_called_with("foobar", var1=123) + + +@mock.patch(patch_path, new=None) +def test_usbtmccomm_init_missing_module(): + with pytest.raises(ImportError): + _ = USBTMCCommunicator() + + +@mock.patch(patch_path) +def test_usbtmccomm_terminator_getter(mock_usbtmc): + comm = USBTMCCommunicator() + + term_char = mock.PropertyMock(return_value=10) + type(comm._filelike).term_char = term_char + + assert comm.terminator == "\n" + term_char.assert_called_with() + + +@mock.patch(patch_path) +def test_usbtmccomm_terminator_setter(mock_usbtmc): + comm = USBTMCCommunicator() + + term_char = mock.PropertyMock(return_value="\n") + type(comm._filelike).term_char = term_char + + comm.terminator = "*" + assert comm._terminator == "*" + term_char.assert_called_with(42) + + comm.terminator = b"*" + assert comm._terminator == "*" + term_char.assert_called_with(42) + + +@mock.patch(patch_path) +def test_usbtmccomm_timeout(mock_usbtmc): + comm = USBTMCCommunicator() + + timeout = mock.PropertyMock(return_value=1) + type(comm._filelike).timeout = timeout + + unit_eq(comm.timeout, 1 * u.second) + timeout.assert_called_with() + + comm.timeout = 10 + timeout.assert_called_with(10.0) + + comm.timeout = 1000 * u.millisecond + timeout.assert_called_with(1.0) + + +@mock.patch(patch_path) +def test_usbtmccomm_close(mock_usbtmc): + comm = USBTMCCommunicator() + + comm.close() + comm._filelike.close.assert_called_with() + + +@mock.patch(patch_path) +def test_usbtmccomm_read_raw(mock_usbtmc): + comm = USBTMCCommunicator() + comm._filelike.read_raw = mock.MagicMock(return_value=b"abc") + + assert comm.read_raw() == b"abc" + comm._filelike.read_raw.assert_called_with(num=-1) + assert comm._filelike.read_raw.call_count == 1 + + comm._filelike.read_raw = mock.MagicMock() + comm.read_raw(10) + comm._filelike.read_raw.assert_called_with(num=10) + + +@mock.patch(patch_path) +def test_usbtmccomm_write_raw(mock_usbtmc): + comm = USBTMCCommunicator() + + comm.write_raw(b"mock") + comm._filelike.write_raw.assert_called_with(b"mock") + + +@mock.patch(patch_path) +def test_usbtmccomm_sendcmd(mock_usbtmc): + comm = USBTMCCommunicator() + comm.write = mock.MagicMock() + + comm._sendcmd("mock") + comm.write.assert_called_with("mock") + + +@mock.patch(patch_path) +def test_usbtmccomm_query(mock_usbtmc): + comm = USBTMCCommunicator() + comm._filelike.ask = mock.MagicMock(return_value="answer") + + assert comm._query("mock") == "answer" + comm._filelike.ask.assert_called_with("mock", num=-1, encoding="utf-8") + + comm._query("mock", size=10) + comm._filelike.ask.assert_called_with("mock", num=10, encoding="utf-8") + + +@mock.patch(patch_path) +def test_usbtmccomm_seek(mock_usbtmc): + with pytest.raises(NotImplementedError): + comm = USBTMCCommunicator() + comm.seek(1) + + +@mock.patch(patch_path) +def test_usbtmccomm_tell(mock_usbtmc): + with pytest.raises(NotImplementedError): + comm = USBTMCCommunicator() + comm.tell() + + +@mock.patch(patch_path) +def test_usbtmccomm_flush_input(mock_usbtmc): + comm = USBTMCCommunicator() + comm.flush_input() From b7030ec0cbb37ce9de1167035da4af153cfab1aa Mon Sep 17 00:00:00 2001 From: Reto Trappitsch Date: Mon, 18 Oct 2021 10:38:50 -0700 Subject: [PATCH 9/9] USB Communicator bug fix and test suite... now for real... --- .../comm/usb_communicator.py | 10 +- .../tests/test_comm/test_usb_communicator.py | 260 ++++++++++++------ 2 files changed, 176 insertions(+), 94 deletions(-) diff --git a/instruments/abstract_instruments/comm/usb_communicator.py b/instruments/abstract_instruments/comm/usb_communicator.py index 0fa878904..998712914 100644 --- a/instruments/abstract_instruments/comm/usb_communicator.py +++ b/instruments/abstract_instruments/comm/usb_communicator.py @@ -129,7 +129,7 @@ def read_raw(self, size=1000): String returned is most likely shorter than the size requested. Will terminate by itself. - Read size of -1 will be transformed into 1000. + Read size of -1 will be transformed into 1000 bytes. :param size: Size to read in bytes :type size: int @@ -152,10 +152,10 @@ def write_raw(self, msg): self._ep_out.write(msg) def seek(self, offset): # pylint: disable=unused-argument,no-self-use - return NotImplemented + raise NotImplementedError def tell(self): # pylint: disable=no-self-use - return NotImplemented + raise NotImplementedError def flush_input(self): """ @@ -165,8 +165,8 @@ def flush_input(self): """ while True: try: - self._ep_in.read(1000, 10) # try to read until timeout error occurs - except usb.core.USBTimeoutError: + self._ep_in.read(1000, 10) # read until any exception + except: # pylint: disable=bare-except break # METHODS # diff --git a/instruments/tests/test_comm/test_usb_communicator.py b/instruments/tests/test_comm/test_usb_communicator.py index 2b3c9ab5d..88408280c 100644 --- a/instruments/tests/test_comm/test_usb_communicator.py +++ b/instruments/tests/test_comm/test_usb_communicator.py @@ -1,148 +1,230 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- """ -Unit tests for the USBTMC communication layer +Unit tests for the USB communicator. """ # IMPORTS #################################################################### - +from hypothesis import given, strategies as st import pytest -from instruments.abstract_instruments.comm import USBTMCCommunicator -from instruments.tests import unit_eq +import usb.core +import usb.util + +from instruments.abstract_instruments.comm import USBCommunicator from instruments.units import ureg as u from .. import mock # TEST CASES ################################################################# -# pylint: disable=protected-access,unused-argument,no-member +# pylint: disable=protected-access,unused-argument, redefined-outer-name + +patch_util = "instruments.abstract_instruments.comm.usb_communicator.usb.util" + + +@pytest.fixture() +def dev(): + """Return a usb core device for initialization.""" + dev = mock.MagicMock() + dev.__class__ = usb.core.Device + return dev -patch_path = "instruments.abstract_instruments.comm.usbtmc_communicator.usbtmc" +@pytest.fixture() +@mock.patch(patch_util) +def inst(patch_util, dev): + """Return a USB Communicator instrument.""" + return USBCommunicator(dev) -@mock.patch(patch_path) -def test_usbtmccomm_init(mock_usbtmc): - _ = USBTMCCommunicator("foobar", var1=123) - mock_usbtmc.Instrument.assert_called_with("foobar", var1=123) +@mock.patch(patch_util) +def test_init(usb_util, dev): + """Initialize usb communicator.""" + # mock some behavior of the device required for initializing + dev.find.return_value.__class__ = usb.core.Device # dev + # shortcuts for asserting calls + cfg = dev.get_active_configuration() + interface_number = cfg[(0, 0)].bInterfaceNumber + _ = dev.control.get_interface( + dev, cfg[(0, 0)].bInterfaceNumber + ) -@mock.patch(patch_path, new=None) -def test_usbtmccomm_init_missing_module(): - with pytest.raises(ImportError): - _ = USBTMCCommunicator() + inst = USBCommunicator(dev) + # # assert calls according to manual + dev.set_configuration.assert_called() # check default configuration + dev.get_active_configuration.assert_called() # get active configuration + dev.control.get_interface.assert_called_with(dev, interface_number) + usb_util.find_descriptor.assert_has_calls(cfg) -@mock.patch(patch_path) -def test_usbtmccomm_terminator_getter(mock_usbtmc): - comm = USBTMCCommunicator() + assert isinstance(inst, USBCommunicator) - term_char = mock.PropertyMock(return_value=10) - type(comm._filelike).term_char = term_char + assert inst._dev == dev - assert comm.terminator == "\n" - term_char.assert_called_with() +def test_init_wrong_type(): + """Raise TypeError if initialized with wrong device.""" + with pytest.raises(TypeError) as err: + _ = USBCommunicator(42) + err_msg = err.value.args[0] + assert err_msg == "USBCommunicator must wrap a usb.core.Device object." -@mock.patch(patch_path) -def test_usbtmccomm_terminator_setter(mock_usbtmc): - comm = USBTMCCommunicator() - term_char = mock.PropertyMock(return_value="\n") - type(comm._filelike).term_char = term_char +def test_init_no_endpoints(dev): + """Initialize usb communicator without endpoints.""" + # mock some behavior of the device required for initializing + dev.find.return_value.__class__ = usb.core.Device # dev + + with pytest.raises(IOError) as err: + _ = USBCommunicator(dev) + err_msg = err.value.args[0] + assert err_msg == "USB endpoint not found." + + +def test_address(inst): + """Address of device can not be read, nor written.""" + with pytest.raises(NotImplementedError): + _ = inst.address + + with pytest.raises(ValueError) as err: + inst.address = 42 - comm.terminator = "*" - assert comm._terminator == "*" - term_char.assert_called_with(42) + msg = err.value.args[0] + assert msg == "Unable to change USB target address." - comm.terminator = b"*" - assert comm._terminator == "*" - term_char.assert_called_with(42) +def test_terminator(inst): + """Get / set terminator of instrument.""" + assert inst.terminator == "\n" + inst.terminator = "\r\n" + assert inst.terminator == "\r\n" -@mock.patch(patch_path) -def test_usbtmccomm_timeout(mock_usbtmc): - comm = USBTMCCommunicator() - timeout = mock.PropertyMock(return_value=1) - type(comm._filelike).timeout = timeout +def test_terminator_wrong_type(inst): + """Raise TypeError when setting bad terminator.""" + with pytest.raises(TypeError) as err: + inst.terminator = 42 + msg = err.value.args[0] + assert msg == "Terminator for USBCommunicator must be specified as a " \ + "character string." - unit_eq(comm.timeout, 1 * u.second) - timeout.assert_called_with() - comm.timeout = 10 - timeout.assert_called_with(10.0) +@given(val=st.integers(min_value=1)) +def test_timeout_get(val, inst): + """Get a timeout from device (ms) and turn into s.""" + # mock timeout value of device + inst._dev.default_timeout = val - comm.timeout = 1000 * u.millisecond - timeout.assert_called_with(1.0) + ret_val = inst.timeout + assert ret_val == u.Quantity(val, u.ms).to(u.s) -@mock.patch(patch_path) -def test_usbtmccomm_close(mock_usbtmc): - comm = USBTMCCommunicator() +def test_timeout_set_unitless(inst): + """Set a timeout value from device unitless (s).""" + val = 1000 + inst.timeout = val + set_val = inst._dev.default_timeout + exp_val = 1000 * val + assert set_val == exp_val - comm.close() - comm._filelike.close.assert_called_with() +def test_timeout_set_minutes(inst): + """Set a timeout value from device in minutes.""" + val = 10 + val_to_set = u.Quantity(val, u.min) + inst.timeout = val_to_set + set_val = inst._dev.default_timeout + exp_val = 1000 * 60 * val + assert set_val == exp_val -@mock.patch(patch_path) -def test_usbtmccomm_read_raw(mock_usbtmc): - comm = USBTMCCommunicator() - comm._filelike.read_raw = mock.MagicMock(return_value=b"abc") - assert comm.read_raw() == b"abc" - comm._filelike.read_raw.assert_called_with(num=-1) - assert comm._filelike.read_raw.call_count == 1 +@mock.patch(patch_util) +def test_close(usb_util, inst): + """Close the connection, release instrument.""" + inst.close() + inst._dev.reset.assert_called() + usb_util.dispose_resources.assert_called_with(inst._dev) - comm._filelike.read_raw = mock.MagicMock() - comm.read_raw(10) - comm._filelike.read_raw.assert_called_with(num=10) +def test_read_raw(inst): + """Read raw information from instrument.""" + msg = b"message\n" + msg_exp = b"message" -@mock.patch(patch_path) -def test_usbtmccomm_write_raw(mock_usbtmc): - comm = USBTMCCommunicator() + inst._ep_in.read.return_value = msg - comm.write_raw(b"mock") - comm._filelike.write_raw.assert_called_with(b"mock") + assert inst.read_raw() == msg_exp -@mock.patch(patch_path) -def test_usbtmccomm_sendcmd(mock_usbtmc): - comm = USBTMCCommunicator() - comm.write = mock.MagicMock() +def test_read_raw_size(inst): + """If size is -1, read 1000 bytes.""" + msg = b"message\n" + inst._ep_in.read.return_value = msg - comm._sendcmd("mock") - comm.write.assert_called_with("mock") + _ = inst.read_raw(size=-1) + inst._ep_in.read.assert_called_with(1000) -@mock.patch(patch_path) -def test_usbtmccomm_query(mock_usbtmc): - comm = USBTMCCommunicator() - comm._filelike.ask = mock.MagicMock(return_value="answer") +def test_read_raw_termination_char_not_found(inst): + """Raise IOError if termination character not found.""" + msg = b"message" + inst._ep_in.read.return_value = msg + default_read_size = 1000 - assert comm._query("mock") == "answer" - comm._filelike.ask.assert_called_with("mock", num=-1, encoding="utf-8") + with pytest.raises(IOError) as err: + _ = inst.read_raw() + err_msg = err.value.args[0] + assert err_msg == f"Did not find the terminator in the returned " \ + f"string. Total size of {default_read_size} might " \ + f"not be enough." - comm._query("mock", size=10) - comm._filelike.ask.assert_called_with("mock", num=10, encoding="utf-8") +def test_write_raw(inst): + """Write a message to the instrument.""" + msg = b"message\n" + inst.write_raw(msg) + inst._ep_out.write.assert_called_with(msg) -@mock.patch(patch_path) -def test_usbtmccomm_seek(mock_usbtmc): + +def test_seek(inst): + """Raise NotImplementedError if `seek` is called.""" with pytest.raises(NotImplementedError): - comm = USBTMCCommunicator() - comm.seek(1) + inst.seek(42) -@mock.patch(patch_path) -def test_usbtmccomm_tell(mock_usbtmc): +def test_tell(inst): + """Raise NotImplementedError if `tell` is called.""" with pytest.raises(NotImplementedError): - comm = USBTMCCommunicator() - comm.tell() + inst.tell() + + +def test_flush_input(inst): + """Flush the input out by trying to read until no more available.""" + inst._ep_in.read.side_effect = [b"message\n", usb.core.USBTimeoutError] + inst.flush_input() + inst._ep_in.read.assert_called() + + +def test_sendcmd(inst): + """Send a command.""" + msg = "msg" + msg_to_send = f"msg{inst._terminator}" + + inst.write = mock.MagicMock() + + inst._sendcmd(msg) + inst.write.assert_called_with(msg_to_send) + + +def test_query(inst): + """Query the instrument.""" + msg = "msg" + size = 1000 + inst.sendcmd = mock.MagicMock() + inst.read = mock.MagicMock() -@mock.patch(patch_path) -def test_usbtmccomm_flush_input(mock_usbtmc): - comm = USBTMCCommunicator() - comm.flush_input() + inst._query(msg) + inst.sendcmd.assert_called_with(msg) + inst.read.assert_called_with(size)