Skip to content

Object Type Development

Zdeněk Materna edited this page Jan 7, 2022 · 18 revisions

Object Type (OT) acts as a plugin into the system, that can be dynamically loaded and provides new functionality or e.g. integration with a new type of robot. In order to achieve smooth integration, please follow these practices:

  • Do not use any third-party imports.
    • Only standard Python libraries and stuff from arcor2 is allowed.
    • If anything special is required, it should be 'offloaded' into a separate service with which the OT will communicate e.g. using a REST API.
  • Each OT has to be derived from Generic, or something else that is derived from it.
  • OT class name must be in PascalCase.
  • OTs are treated as abstract if they have any abstract method or are marked as abstract with the _ABSTRACT class property (True by default).
  • Use upload_def function to upload class definition to the Project service. It does some basic checks.
  • OT should not print anything
    • stdin/stdout is used for communication between Execution Service and the main script which runs as its subprocess.
    • You may be interested in Logging.

Built-in base classes

There is a set of built-in abstract base classes defined in arcor2.object_types.abstract - one of those has to be used as an ancestor (directly or indirectly) for any OT.

Generic

The most generic base class (surprise!), which defines basic API (properties as id and name and methods as e.g. cleanup) that is common for all derived classes. Typically, it is used as a base for objects that have no counterpart in the workplace, like databases, cloud APIs, etc. An example could be LogicActions - OT providing logical actions (e.g. for comparing two values). Serves also as a base class for following, more specialized base classes.

Properties and methods

The DYNAMIC_PARAMS class property is a dictionary, providing a mapping between parameter name and method name (plus so-called 'parent' parameters) which could be called in order to get possible values for the parameter. This is useful when parameter values can't be determined in advance and defined e.g. by enumeration. The code might look like this:

class MyObject(Generic):

    def param1_values() -> Set[str]:
        ...  # there could be call to some API to determine the values

    def param2_values(param1: str) -> Set[str]:  # param2 values are dependent on actual param1 value
        ...

    def my_action(param1: str, param2: str, *, an: Optional[str] = None) -> None:
        ...


MyObject.DYNAMIC_PARAMS["param1"] = DynamicParamTuple(MyObject.param1_values.__name__, set())
MyObject.DYNAMIC_PARAMS["param2"] = DynamicParamTuple(MyObject.param2_values.__name__, set(["param1"]))

Please note that dynamic parameters could be only strings.

Another important and possibly useful property is CANCEL_MAPPING, providing a mapping between long-running action and the corresponding method that interrupts that action. This could be used e.g. to stop some procedure if the user is not interested in its outcome anymore. Example:

class MyObject(Generic):

    def stop_counting() -> None:  # method that doesn't require any parameter
        ...

    def count_to_infinity(*, an: Optional[str] = None) -> None:  # arbitrary action
        ...


MyObject.CancelDict[MyObject.count_to_infinity.__name__] = MyObject.stop_counting.__name__

Please note: it does not matter if the interrupted method ends by returning something or by raising an exception (something based on Arcor2Exception).

Object Types can also define their settings - a set of parameters that are used to initialize the object when creating its instance.

@dataclass
class UrlSettings(Settings):

    url: str = "http://"  # only basic types (str, int, float, bool) are supported at the moment

class MyObject(Generic):

    _ABSTRACT = False

    def __init__(self, obj_id: str, name: str, settings: UrlSettings) -> None:
        super(Generic, self).__init__(obj_id, name, settings)

    @property
    def settings(self) -> UrlSettings:  # type: ignore
        return cast(UrlSettings, super(Generic, self).settings)

Finally, there is a cleanup method that is called before the system goes from the online state to offline. When overridden (default one does nothing), it may handle procedures that are required in order to take to object down (e.g. to shut down a robot).

GenericWithPose

This class extends Generic with pose property (position and orientation): there are setter and getter for it, which makes developers able to override them and therefore add custom handling of the change of object's pose (used in CollisionObject, see below). There is also the action update_pose, which allows updating the object's pose during runtime, in the main script (e.g. based on detection).

CollisionObject

Adds the ability to specify a collision model, which might be a primitive or a mesh. The primitive is not specified in the code: one has to create a collision model on the Project Service and then link it with the Object Type. Fortunately, there is a helper function to simplify this:

# my_collision_object.py
class MyCollisionObject(CollisionObject)
    ...

# upload_my_objects.py
from arcor2.data.object_type import Box
from arcor2.object_types.upload import upload_def

upload_def(MyCollisionObject, Box("Box", 0.1, 0.1, 0.1))  # adds Object Type and primitive model and links them together on the Project Service

With a mesh, it is slightly more difficult:

# my_mesh_object.py
class MyMeshObject(CollisionObject)
    mesh_filename = "my_mesh_object.dae"  # should be snake_case form of the class name

# upload_my_objects.py
from arcor2.data.object_type import Mesh
from arcor2.object_types.upload import upload_def

upload_def(
        MyMeshObject,
        Mesh(MyMeshObject.__name__, MyMeshObject.mesh_filename),
        file_to_upload="complete path to the mesh",
    )

During class initialization, its associated collision model is sent to the Scene Service.

Properties and methods

The class overrides pose setter in order to be able to update pose of the collision model on the Scene Service. Moreover, it adds enabled property, which allows to temporarily disable (remove it from the Scene Service) the collision model by setting it to False. The property has also an action wrapper set_enabled so there is possibility to enable/disable collision model within the program.

VirtualCollisionObject

Extends CollisionObject, but is intended only for "dumb" collision objects, without any functionality. Should be used only with primitive collision models.

Robot

Based on GenericWithPose, adds robot-related API. There are some abstract methods, that have to be implemented for each robot. Then, there is a set of methods containing only raise Arcor2NotImplemented("Robot does not support xyz.") - during runtime, ARServer checks for Arcor2NotImplemented and enables or disables those features for user interfaces accordingly. There is also possibility to add a link to the URDF package (class property urdf_package) - works in a similar way as adding a mesh (the file to upload must be a zip file with URDF description and meshes).

Properties and methods

In order to prevent multiple attempts to move the robot at the same time (and also to be able to test whether there is a motion in progress), any method that controls the robot movement should acquire _move_lock mutex:

class MyRobot(Robot):

    def move_action(*, an: Optional[str] = None) -> None:
        with self._move_lock:
            rest.call(...) 

Then there is a property move_in_progress which returns True if there is any motion in progress (if the mutex is not free). There is also the possibility to check whether the movement is possible - check_if_ready_to_move raises an exception if the robot is already moving, or if it is in a hand teaching mode (if the mode is supported).

MultiArmRobot

Extends Robot in order to support multiple arms per robot (e.g. ABB YuMi). All relevant methods have additional and optional parameter arm_id (has to be optional in order to maintain compatibility with the base class, see Liskov substitution principle) - it is the responsibility of the concrete implementation to check whether the arm_id is set. Hand teaching mode can be set for each arm independently. The _move_lock is shared for all arms (however that does not prevent the possibility for an action that moves more than one arm at once).

Camera

The Camera class extends CollisionObject and adds RGB(D) camera-relevant API. There are (so far) two methods - one for getting a color image and one for getting a depth image. The principle is the same as with Robot - ARServer checks whether a method (feature) is implemented and enables or disables related functionality accordingly. At least one of the methods should be implemented.

Actions

If the object's method should be used as an action, it has to follow certain conventions and rules:

  • There should be a keyword-only parameter an (action name), which is then used to uniquely identify action when called from the Main Script.
  • The action should use only supported parameter types, which are at the moment: primitive ones (int, float, bool, str), Pose, enum (based on StrEnum or IntEnum), or ProjectRobotJoints (but please use rather Pose where possible).
    • In the future, the support for parameter types will be handled by dynamically loaded plugins.
    • There is also some support for Image, but not well tested.
  • Method must have __action__ property, which is instance of ActionMetadata.
  • Method must have docstring defined.
  • All parameters (including return values) must be type annotated.

Other important points:

  • Action parameters might or might not have a default value.
  • Ranges for int or float can be defined using assertions.

See an example:

class MyRobot(Robot):

    def take_over_the_world(violently: bool, message: str, delay: int = 0, *, an: Optional[str] = None) -> bool:
        """Initiates the enslavement of humanity.

        :param violently: How it should be done.
        :param message: What should be people told.
        :param delay: How long to wait before starting to ex...
        :return: Success.
        """

        assert 0 <= delay <= 100

    take_over_the_world.__action__ = ActionMetadata()  # type: ignore

Please note: actions can be nested (action can be called within another action), then just do not forget to mark the top-level action as composite one with ActionMetadata(composite=True).

Code Sharing

It is always good to avoid code duplication! If there is a set of OTs that needs to share some code, there are basically two ways...

Common (abstract) ancestor

Put the shared code into the common ancestor so it will be available in all classes derived from it. If the ancestor is abstract, it won't appear in the user interfaces. An example:

class MySharedCode(Generic):  # my_shared_code.py, won't appear in AREditor

    def shared_method() -> None:
        pass

    def shared_action(*, an: Optional[str]=None) -> None:

    shared_action.__action__ = ActionMetadata()  # type: ignore  # needed because of mypy

class ActualObjectType(MySharedCode):  # actual_object_type.py, will appear in AREditor

    _ABSTRACT = False

    def fancy_action(*, an: Optional[str]=None) -> None:
        self.shared_method()  # method from ancestor

    fancy_action.__action__ = ActionMetadata()  # type: ignore

...like this, the ARServer will recognize two actions for ActualObjectType: shared_action (inherited from MySharedCode) and fancy_action.

Mixin

Using mixins is a common Python pattern - those are simple, single-responsibility classes and are 'included' through multiple inheritance. If there is a need to share a code between OTs derived from different bases, using mixins is the proper way to go! The example which would not be possible using the previous approach:

class DbConnectionMixin:  # db_connection_mixin.py, based on object

    def report(what: str) -> None:
        pass

class MyRobot(DbConnectionMixin, Robot):  # my_robot.py, mixins first, something based on Generic last
   
    def move() -> None:
        self.report("moving")
        ...

class MyCamera(DbConnectionMixin, Camera):  # my_camera.py

    def capture() -> None:
        self.report("capturing")
        ...

Please note: Imports from parent OT or from mixin must be relative, otherwise it won't work within the ARServer or Execution.

Logging

When OT needs to log something (for instance debugging information), it should not be done by printing out something. For that purposes, there is LoggingMixin and respective service, see arcor2_logger, where README also provides a code example. The mixin adds methods as info and error that send messages through WebSockets to the Logger Service, which prints them out.