In [1]:
import numpy as np
from pydrake.common.containers import namedview
from pydrake.common.value import Value
from pydrake.math import RigidTransform, RotationMatrix
from pydrake.systems.analysis import Simulator
from pydrake.systems.framework import BasicVector, LeafSystem
from pydrake.trajectories import PiecewisePolynomial

In [2]:
class MyAdder(LeafSystem):
    """Leaf system that adds and subtracts two 2-element vector inputs.

    Input ports:  "a" and "b" (vector-valued, size 2).
    Output ports: "sum" (a + b) and "difference" (a - b), both size 2.
    """

    def __init__(self):
        super().__init__()
        # Keep the input port handles so the calc callbacks can evaluate them.
        self._a_port = self.DeclareVectorInputPort(name="a", size=2)
        self._b_port = self.DeclareVectorInputPort(name="b", size=2)
        self.DeclareVectorOutputPort(name="sum", size=2, calc=self.CalcSum)
        self.DeclareVectorOutputPort(
            name="difference", size=2, calc=self.CalcDifference)

    def _EvalInputs(self, context):
        """Return the current values of ports "a" and "b" as a pair."""
        return self._a_port.Eval(context), self._b_port.Eval(context)

    def CalcSum(self, context, output):
        """Output callback for "sum": write a + b into `output`."""
        lhs, rhs = self._EvalInputs(context)
        output.SetFromVector(lhs + rhs)

    def CalcDifference(self, context, output):
        """Output callback for "difference": write a - b into `output`."""
        lhs, rhs = self._EvalInputs(context)
        output.SetFromVector(lhs - rhs)

# Build the adder and a default context to evaluate it in.
system = MyAdder()
context = system.CreateDefaultContext()

# Pin both input ports to constant vectors.
system.GetInputPort("a").FixValue(context, [3, 4])
system.GetInputPort("b").FixValue(context, [1, 2])

# Pull each output through its port, then report it.
sum_value = system.GetOutputPort('sum').Eval(context)
difference_value = system.GetOutputPort('difference').Eval(context)
print(f"sum: {sum_value}")
print(f"difference: {difference_value}")

sum: [4. 6.]
difference: [2. 2.]


## Leaf system that uses RigidTransform in its input and output ports

In [4]:
class RotateAboutZ(LeafSystem):
    """Leaf system whose abstract ports carry a RigidTransform.

    Input port "in":   a RigidTransform.
    Output port "out": the input pre-multiplied by a rotation of pi/2 about
                       the z-axis.
    """

    def __init__(self):
        super().__init__()  # Don't forget to initialize the base class.
        self.DeclareAbstractInputPort(name="in",
                                      model_value=Value(RigidTransform()))
        self.DeclareAbstractOutputPort(
            name="out",
            alloc=lambda: Value(RigidTransform()),
            calc=self.CalcOutput)

    def CalcOutput(self, context, output):
        """Output callback: write the rotated input transform into `output`.

        Args:
            context: Context holding the value of the input port.
            output: Abstract value that receives the resulting RigidTransform.
        """
        # Evaluate *this* system's input port. Going through `self` (rather
        # than a module-level variable) keeps the callback correct no matter
        # what name the instance is bound to, or how many instances exist.
        X_1 = self.get_input_port().Eval(context)

        X_2 = RigidTransform(RotationMatrix.MakeZRotation(np.pi / 2)) @ X_1

        # Set the output RigidTransform.
        output.set_value(X_2)


# Build the system and a default context for it.
system = RotateAboutZ()
context = system.CreateDefaultContext()

# Hold the input port at a default-constructed RigidTransform.
system.get_input_port().FixValue(context, RigidTransform())

# Evaluate the output port and show the result.
rotated = system.get_output_port(0).Eval(context)
print(f"output: {rotated}")

RigidTransform(
  R=RotationMatrix([
    [1.0, 0.0, 0.0],
    [0.0, 1.0, 0.0],
    [0.0, 0.0, 1.0],
  ]),
  p=[0.0, 0.0, 0.0],
)
output: RigidTransform(
  R=RotationMatrix([
    [6.123233995736766e-17, -1.0, 0.0],
    [1.0, 6.123233995736766e-17, 0.0],
    [0.0, 0.0, 1.0],
  ]),
  p=[0.0, 0.0, 0.0],
)


In [9]:
class AbstractStateSystem(LeafSystem):
    """Leaf system that stores a PiecewisePolynomial as abstract state.

    A periodic unrestricted update (period 1.0 s, offset 0.0 s) replaces the
    stored trajectory with a new first-order hold starting at the current
    time.
    """

    def __init__(self):
        super().__init__()

        # Remember the index of the abstract state so Update() can find it.
        initial_traj = Value(PiecewisePolynomial())
        self._traj_index = self.DeclareAbstractState(initial_traj)

        self.DeclarePeriodicUnrestrictedUpdateEvent(
            period_sec=1.0, offset_sec=0.0, update=self.Update)

    def Update(self, context, state):
        """Update callback: overwrite the stored trajectory.

        The new trajectory is a first-order hold over [t, t + 1], where t is
        the current context time.
        """
        now = context.get_time()
        breaks = [now, now + 1]
        # One row per trajectory dimension, one column per break.
        samples = np.array([[-np.pi / 2.0 + 1., -np.pi / 2.0 - 1.],
                            [-2., 2.]])
        new_traj = PiecewisePolynomial.FirstOrderHold(breaks, samples)

        # Write the new trajectory into the abstract state slot.
        slot = state.get_mutable_abstract_state(int(self._traj_index))
        slot.set_value(new_traj)



system = AbstractStateSystem()
simulator = Simulator(system)
context = simulator.get_mutable_context()

# Give abstract state 0 a default-constructed trajectory as its initial
# condition.
context.SetAbstractState(0, PiecewisePolynomial())

# Simulate up to t = 4.0, then read back whatever trajectory is stored.
simulator.AdvanceTo(4.0)
traj = context.get_abstract_state(0).get_value()
final_time = context.get_time()
print(f"breaks: {traj.get_segment_times()}")
print(f"traj({final_time}) = {traj.value(final_time)}")

breaks: [3.0, 4.0]
traj(4.0) = [[-2.57079633]
 [ 2.        ]]


Parameters are like state, except that they are constant through the lifetime of a simulation. Parameters in the systems framework are declared and accessed almost identically to state, but they are never updated. Once again, we have vector-valued (declared with DeclareNumericParameter) and abstract-valued (via DeclareAbstractParameter) parameters.

In [10]:
class SystemWithParameters(LeafSystem):
    """Leaf system that declares one numeric and one abstract parameter.

    Output port "numeric" copies numeric parameter 0 (a 2-element vector);
    output port "abstract" copies abstract parameter 0 (a RigidTransform).
    """

    def __init__(self):
        super().__init__()  # Don't forget to initialize the base class.

        # Numeric parameter 0: a 2-element vector.
        self.DeclareNumericParameter(BasicVector([1.2, 3.4]))

        # Abstract parameter 0: a transform rotated by pi/6 about the x-axis.
        tilt = RigidTransform(RotationMatrix.MakeXRotation(np.pi / 6))
        self.DeclareAbstractParameter(Value(tilt))

        # The output ports exist only to show how system methods read the
        # parameters back out of the context.
        self.DeclareVectorOutputPort(
            name="numeric", size=2, calc=self.OutputNumeric)
        self.DeclareAbstractOutputPort(
            name="abstract",
            alloc=lambda: Value(RigidTransform()),
            calc=self.OutputAbstract)

    def OutputNumeric(self, context, output):
        """Output callback: copy numeric parameter 0 into `output`."""
        numeric_param = context.get_numeric_parameter(0)
        output.SetFromVector(numeric_param.get_value())

    def OutputAbstract(self, context, output):
        """Output callback: copy abstract parameter 0 into `output`."""
        abstract_param = context.get_abstract_parameter(0)
        output.set_value(abstract_param.get_value())

# Build the system and a default context for it.
system = SystemWithParameters()
context = system.CreateDefaultContext()

# Evaluate both output ports (0: numeric, 1: abstract) and report them.
numeric_value = system.get_output_port(0).Eval(context)
abstract_value = system.get_output_port(1).Eval(context)
print(f"numeric: {numeric_value}")
print(f"abstract: {abstract_value}")

numeric: [1.2 3.4]
abstract: RigidTransform(
  R=RotationMatrix([
    [1.0, 0.0, 0.0],
    [0.0, 0.8660254037844387, -0.49999999999999994],
    [0.0, 0.49999999999999994, 0.8660254037844387],
  ]),
  p=[0.0, 0.0, 0.0],
)


In [14]:
class MyPublishingSystem(LeafSystem):
    """Leaf system that prints the context time whenever it publishes.

    The same `Publish` callback is registered for both forced publish events
    and a periodic publish event (period 1 s, offset 0 s).
    """

    def __init__(self):
        super().__init__()

        # A call to `ForcedPublish()` on this system triggers the callback.
        self.DeclareForcedPublishEvent(self.Publish)

        # The callback also fires once every second of simulated time.
        self.DeclarePeriodicPublishEvent(
            period_sec=1, offset_sec=0, publish=self.Publish)

    def Publish(self, context):
        """Publish callback: print the current context time."""
        now = context.get_time()
        print(f"Publish() called at time={now}")

system = MyPublishingSystem()
simulator = Simulator(system)

# Advance the simulation; the periodic publish event fires along the way.
simulator.AdvanceTo(5.3)

# We can also "force" a publish at an arbitrary time.
print("\ncalling ForcedPublish:")
final_context = simulator.get_context()
system.ForcedPublish(final_context)


Publish() called at time=0.0
Publish() called at time=1.0
Publish() called at time=2.0
Publish() called at time=3.0
Publish() called at time=4.0
Publish() called at time=5.0

calling ForcedPublish:
Publish() called at time=5.3
