Plugin compliant with new features. #4

Open
MisterOwlPT opened this issue Apr 6, 2023 · 12 comments

Comments

@MisterOwlPT
Collaborator

Hi @Kazadhum

I implemented core new features to the Rigel framework that will, among other things, support your work (Rigel now supports concurrent execution of multiple plugins and decouples introspection from infrastructure launching).

However, to take advantage of these new features you'll need to change some things in your plugin and the Rigelfile you are using. I leave here a detailed description of everything that changed for your reference (@rarrais included).

Please keep in mind the following:

  • The new features are working but have not yet been tested intensively. For this reason, I created a new branch, flow. This way you'll always have the stable develop branch intact if something goes wrong and you need to revert. I suggest you do something similar with your work: consider creating a branch for your updated plugin and associated Rigelfiles.

  • Next week I'll be on holiday. I'll try to pay attention to GitHub issues and help you if required.

UPDATES TO RIGEL

NOTE: remember to use branch flow !

Rigel plugins

Changes to interface

The run function was renamed to start.

A new function process(self) -> None was introduced.

All other required functions remain the same.

During execution, the following plugin functions are now called in the following order:

  • setup()
  • start()
  • process()
  • stop()

Notice that process is always called after start.

Rigel plugins must be compliant with this new interface.

NOTE: Arguments to the plugin constructor also remain unchanged.
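
A minimal sketch of a plugin that follows the new interface. The class name ExamplePlugin, the calls list and the run_lifecycle helper are hypothetical and exist only for illustration; a real plugin would extend the Rigel plugin base class and keep its usual constructor, neither of which is shown here.

```python
from typing import List


class ExamplePlugin:
    """Hypothetical stand-in for a Rigel plugin (no Rigel imports needed)."""

    def __init__(self) -> None:
        self.calls: List[str] = []  # records the order of the lifecycle calls

    def setup(self) -> None:
        self.calls.append("setup")

    def start(self) -> None:  # formerly named `run`
        self.calls.append("start")

    def process(self) -> None:  # new function, called after `start`
        self.calls.append("process")

    def stop(self) -> None:
        self.calls.append("stop")


def run_lifecycle(plugin: ExamplePlugin) -> List[str]:
    """Call the plugin functions in the order described above."""
    plugin.setup()
    plugin.start()
    plugin.process()
    plugin.stop()
    return plugin.calls


print(run_lifecycle(ExamplePlugin()))
```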

New function process()

Whenever applicable, this function acts as a complement to start and should contain logic that is either:

  • user-dependent (built-in function input, keyboard events, ...)
  • blocking or very long (while loops, time.sleep, ...)
  • analysis of results

All other business logic should remain inside start.

This division allows for concurrent execution of plugins.
Depending on the chosen flow of execution, the function process may or may not be called.
Check the section on executors below for more information.

Data sharing mechanism

Plugins that are executed in sequence can now share data between themselves!

This is done via a class attribute shared_data of type Dict[str, Any]. This attribute is automatically passed between all plugins so that they can store and read data as required.

All plugins automatically have access to this attribute without the need for changes (you can access it via self.shared_data).
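
As a rough illustration of the idea, the sketch below passes one dictionary through two plugins that run in sequence. ProducerPlugin, ConsumerPlugin, their simplified constructors and the stored value are all hypothetical; in Rigel the attribute is provided to the plugin automatically.

```python
from typing import Any, Dict, Optional


class ProducerPlugin:
    """Hypothetical plugin that stores a value for later plugins."""

    def __init__(self, shared_data: Dict[str, Any]) -> None:
        self.shared_data = shared_data

    def start(self) -> None:
        # Example key and made-up value, for illustration only.
        self.shared_data["simulation_hostname"] = "robot-container"


class ConsumerPlugin:
    """Hypothetical plugin that reads what an earlier plugin stored."""

    def __init__(self, shared_data: Dict[str, Any]) -> None:
        self.shared_data = shared_data
        self.hostname: Optional[str] = None

    def start(self) -> None:
        self.hostname = self.shared_data.get("simulation_hostname")


shared_data: Dict[str, Any] = {}
for plugin in (ProducerPlugin(shared_data), ConsumerPlugin(shared_data)):
    plugin.start()
print(shared_data)
```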

New plugin rigel.plugins.core.ComposePlugin

The existing plugin rigel.plugins.core.TestPlugin was split into two plugins. One kept the original name and the other is called rigel.plugins.core.ComposePlugin.

The new ComposePlugin is responsible only for launching a containerized ROS application (think of it as Docker Compose for ROS). After launching the containers it waits forever for user input (CTRL-C / CTRL-Z). Check the following Rigelfile excerpt:

# ...
plugin: "rigel.plugins.core.ComposePlugin"
with:
    components:
    -
        name: "simulation"
        image: "{{ AWS_ECR }}/simulation:latest"
        command: ["roslaunch", "tm_sim_gazebo", "run.launch", "gui:=true"]
        envs:
            DISPLAY: "{{ DISPLAY }}"
        volumes:
            - !!python/tuple ["/tmp/.X11-unix", "/tmp/.X11-unix"]
    -
        name: "robot"
        image: "{{ AWS_ECR }}/robot:latest"
        command: ["roslaunch", "robot_app", "run.launch", "use_rosbridge:=true"]
        envs:
            DISPLAY: "{{ DISPLAY }}"
        volumes:
            - !!python/tuple ["/tmp/.X11-unix", "/tmp/.X11-unix"]
        introspection: True  # <--- This component has a ROS bridge installed
# ...

If a component is declared with introspection: True the plugin will store the name of the container for that component in the shared plugin data (key simulation_hostname). This can be used by other plugins, if applicable. If required, check the plugin model.

The TestPlugin is now responsible only for the introspection of a containerized ROS system. It still requires ROS bridge. Check the following Rigelfile excerpt:

# ...
plugin: "rigel.plugins.core.TestPlugin"
with:
    requirements:
    - 'globally : some /OSPS/TM/HeartBeep [osps_msgs/TMHeartBeep] {publisherId = "friday"}'
    - "globally : some /OSPS/TM/TaskStatus [osps_msgs/TMTaskStatus] {statusCode = 5}"
# ...

If the field hostname or port is not declared, the plugin will automatically look for connection data inside the shared plugin data. If required, check the plugin model.

Updated plugins

All plugins except rigel.plugins.aws.RoboMakerPlugin were updated, are compliant with the new protocol and are ready to use.

Executors, job sequences and execution flows

Until now Rigel was only capable of executing sequential job sequences (via the rigel run sequence command).

Now it supports the following execution flows:

  • sequential
  • concurrent
  • parallel

Sequential job sequences

NOTE: this corresponds to the execution already supported by Rigel.

Consider the following Rigelfile excerpt where an example sequential job sequence example_sequence is declared:

# ...
sequences:
    example_sequence:
        stages:
            jobs: ["a", "b"]  # <-- One or more job ids
# ...

Assume that job a uses plugin PluginA and job b uses plugin PluginB.

Executing the command rigel run sequence example_sequence will trigger the execution of the following functions:

  • PluginA.setup()
  • PluginA.start()
  • PluginA.process()
  • PluginA.stop()

  • PluginB.setup()
  • PluginB.start()
  • PluginB.process()
  • PluginB.stop()

Summary: PluginA executes completely and then PluginB executes completely.

Concurrent job sequences

Consider the following Rigelfile excerpt where an example concurrent job sequence example_sequence is declared:

# ...
sequences:
    example_sequence:
        stages:
            jobs: ["a", "b"]
            dependencies: ["c", "d"]
# ...

Assume that job a uses plugin PluginA, job b uses plugin PluginB, and so on for jobs c and d.

Executing the command rigel run sequence example_sequence will trigger the execution of the following functions:

  • PluginC.setup()
  • PluginC.start()
  • PluginD.setup()
  • PluginD.start()

  • PluginA.setup()
  • PluginA.start()
  • PluginA.process()
  • PluginA.stop()
  • PluginB.setup()
  • PluginB.start()
  • PluginB.process()
  • PluginB.stop()

  • PluginC.stop()
  • PluginD.stop()

Summary: PluginC and PluginD are partially executed (function process is never called). Before the stop function is called, PluginA and PluginB are completely executed and in sequence.
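
The call order above can be sketched as follows. This is only an illustration of the described order, not Rigel's executor: RecordingPlugin and run_concurrent are hypothetical names, and error handling is left out.

```python
from typing import List


class RecordingPlugin:
    """Hypothetical plugin that logs each lifecycle call to a shared list."""

    def __init__(self, name: str, log: List[str]) -> None:
        self.name = name
        self.log = log

    def setup(self) -> None:
        self.log.append(f"{self.name}.setup()")

    def start(self) -> None:
        self.log.append(f"{self.name}.start()")

    def process(self) -> None:
        self.log.append(f"{self.name}.process()")

    def stop(self) -> None:
        self.log.append(f"{self.name}.stop()")


def run_concurrent(jobs: List[RecordingPlugin], dependencies: List[RecordingPlugin]) -> None:
    # Dependencies are only set up and started; `process` is never called.
    for dependency in dependencies:
        dependency.setup()
        dependency.start()
    # Jobs run completely, one after the other.
    for job in jobs:
        job.setup()
        job.start()
        job.process()
        job.stop()
    # Dependencies are stopped once all jobs have finished.
    for dependency in dependencies:
        dependency.stop()


log: List[str] = []
a, b, c, d = (RecordingPlugin(f"Plugin{letter}", log) for letter in "ABCD")
run_concurrent(jobs=[a, b], dependencies=[c, d])
print("\n".join(log))
```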

NOTE: Consider using this execution flow in your work. Use the new rigel.plugins.core.ComposePlugin to launch the containers you require and use your plugin to inspect their execution as required.

Parallel job sequences

Consider the following Rigelfile excerpt where an example parallel job sequence example_sequence is declared:

# ...
sequences:
    example_sequence:
        stages:
            -  # <-- Will be executed in a separate Thread_0
                jobs: ["a", "b"]

            -  # <-- Will be executed in a separate Thread_1
                jobs: ["c"]
                dependencies: ["d"]
# ...

Here, exceptionally, each stage declared within stages must consist of either a sequential or concurrent sequence of jobs.

Assume that job a uses plugin PluginA, job b uses plugin PluginB, and so on for jobs c and d.

Executing the command rigel run sequence example_sequence will trigger the execution of the following functions:

Thread_0             Thread_1
PluginA.setup()      PluginD.setup()
PluginA.start()      PluginD.start()
PluginA.process()    PluginC.setup()
PluginA.stop()       PluginC.start()
PluginB.setup()      PluginC.process()
PluginB.start()      PluginC.stop()
PluginB.process()    PluginD.stop()
PluginB.stop()

Summary: plugins are called according to the type of subsequence. Each subsequence is executed on a different thread.

I hope you find this message useful 😄

Let me know if you have any questions or found any problems.

Happy Easter! 🐰

@Kazadhum
Owner

Kazadhum commented Apr 6, 2023

Hi @MisterOwlPT, thank you for the heads up!

I've done as you suggested and created a new branch for the plugin repo, also called flow and changed both the plugin and Rigelfile to work with it.

Thank you and enjoy your holiday!

@Kazadhum
Owner

Hi @MisterOwlPT!

I don't know why, but I can't get my plugin to run after the compose plugin. Here's my Rigelfile. Thanks in advance :)

@MisterOwlPT
Collaborator Author

Hi @Kazadhum,

What do you mean by "runs after"?

Looking at the Rigelfile you provided, if you do rigel run sequence test the job diogo_introspection should execute after, and only after, the job compose has finished executing.

Is this not the behavior you were expecting?
If so, can you show me the Rigel CLI outputs?

@MisterOwlPT
Collaborator Author

MisterOwlPT commented Apr 19, 2023

Hi @Kazadhum,

I've committed some more changes to Rigel (branch flow).
Here is a description of the changes:

UPDATES TO RIGEL

NOTE: remember to use branch flow !

Deprecated applications field:

NOTE: this is the only mandatory change to your existing Rigelfile.

Previously, jobs were declared within an application, and multiple applications were supported.
Look at the following Rigelfile excerpt:

# ...
applications:
  my_ros_application:
    distro: "{{ distro }}"
    jobs:
    # ...

We reached the conclusion that it doesn't make much sense to have multiple ROS applications declared within a single Rigelfile. Each application must have its own Rigelfile.

A new Rigelfile now looks like this:

# ...
application:
  distro: "{{ distro }}"

jobs:
  # ...

Notice that:

  • The field applications was renamed to application. The field application.distro is still required.
  • The field jobs is now expected at root level just like fields vars and sequences (and the new field application).

NOTE: despite the mentioned changes, no changes are required at the plugin level (the constructor is still passed the exact same arguments).

Parallel execution flows fixed behavior:

The way I had implemented parallel execution flows was wrong!

Consider the following example sequence:

# ...
sequences:
    example_sequence:
        stages:
            - 
              parallel:
                -  
                  jobs: ["a", "b"]
                -
                  jobs: ["c", "d"]
# ...

I told you in my last message that if you run rigel run sequence example_sequence, jobs a and b would execute in one thread and jobs c and d would execute in another thread. That was how it worked, but it is not how it should work.

With the new version, the following will happen instead:

Thread 0             Thread 1
PluginA.setup()      PluginA.setup()
PluginA.start()      PluginA.start()
PluginA.process()    PluginA.process()
PluginA.stop()       PluginA.stop()
PluginB.setup()      PluginB.setup()
PluginB.start()      PluginB.start()
PluginB.process()    PluginB.process()
PluginB.stop()       PluginB.stop()
PluginC.setup()      PluginC.setup()
PluginC.start()      PluginC.start()
PluginC.process()    PluginC.process()
PluginC.stop()       PluginC.stop()
PluginD.setup()      PluginD.setup()
PluginD.start()      PluginD.start()
PluginD.process()    PluginD.process()
PluginD.stop()       PluginD.stop()

Notice now that the exact same plugins are called in all the different threads.
Check the next section on matrix to see how this is useful.

Parallel execution flows support matrix field:

Consider the following example sequence:

# ...
sequences:
  drive:
    stages:
      -
        matrix:
          driver: ["Diogo", "Pedro"]
          car: ["Volvo", "Ferrari", "Mitsubishi", "Opel"]  # <-- DISCLAIMER: I don't know much about cars 
        parallel:
        - 
          jobs: ["test_drive"]

Notice the new field matrix of type Dict[str, List[Any]] (i.e., a dictionary of lists). You can have as many fields as you want and each list may have as many elements as you want (notice in the example that the length of driver and car is not the same).

If you run rigel run sequence drive you will see that 8 instances of the job test_drive execute in parallel (each one in a separate thread).
Instance 0 will be passed with data {"driver": "Diogo", "car": "Volvo"}
Instance 1 will be passed with data {"driver": "Diogo", "car": "Ferrari"}
Instance 2 will be passed with data {"driver": "Diogo", "car": "Mitsubishi"}
Instance 3 will be passed with data {"driver": "Diogo", "car": "Opel"}
Instance 4 will be passed with data {"driver": "Pedro", "car": "Volvo"}
Instance 5 will be passed with data {"driver": "Pedro", "car": "Ferrari"}
Instance 6 will be passed with data {"driver": "Pedro", "car": "Mitsubishi"}
Instance 7 will be passed with data {"driver": "Pedro", "car": "Opel"}

Rigel ensures all combinations are tested.
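
One way to picture how the combinations are generated is a Cartesian product over the matrix fields. The sketch below uses itertools.product from the Python standard library; the helper name expand_matrix is hypothetical and this is not necessarily how Rigel implements it.

```python
from itertools import product
from typing import Any, Dict, List


def expand_matrix(matrix: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Return one dictionary per combination of the matrix values."""
    keys = list(matrix)
    return [dict(zip(keys, values)) for values in product(*(matrix[key] for key in keys))]


matrix = {
    "driver": ["Diogo", "Pedro"],
    "car": ["Volvo", "Ferrari", "Mitsubishi", "Opel"],
}
for index, combination in enumerate(expand_matrix(matrix)):
    print(f"Instance {index}: {combination}")
```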

Inside the plugin, you can access the data using the class attribute shared_data (e.g., self.shared_data["driver"]).

NOTE: if you don't provide a matrix only one thread will run.
From now on you need matrix to be able to do parallelism with Rigel.

Parallel execution flows support dynamic decoding of variables:

Before, all template variables (the ones declared with {{ ... }}) were decoded right away before the execution.
This is useful. However, you may also need variables to be decoded at runtime - this is especially useful for parallelism.

Consider the previous example of the cars and drivers.
Assume the following:

  • You have a job filter_drivers associated with an example plugin that receives a list of people and selects only those that are allowed to drive. The names of these people are placed in the shared data field under the key allowed_drivers.
  • Pedro and Diogo are allowed to drive but Ruca is not.

An alternative version of the previous example could then be written as this:

# ...
jobs:
  filter_drivers:
    plugin: "example.Plugin"
    with:
      drivers: ["Diogo", "Ruca", "Pedro"]
# ...
sequences:
  drive:
    stages:
      -
        jobs: ["filter_drivers"]
      -
        matrix:
          driver: "{{ data.allowed_drivers }}"
          car: ["Volvo", "Ferrari", "Mitsubishi", "Opel"]  # <-- DISCLAIMER: I don't know much about cars 
        parallel:
        - 
          jobs: ["test_drive"]

This is what will happen if you run rigel run sequence drive:

  • Job filter_drivers will run completely. As declared in the Rigelfile it receives a list of people and places in the shared data the names of the people allowed to drive.
  • Rigel then sees that you intend to run a sequence of jobs in parallel (in this case consisting only of test_drive). It will prepare the combinations based on the entries of the matrix field.
  • Rigel notices that one field is not a list but instead a string with format {{ data.<KEY> }}.
  • Rigel uses the shared data coming from previous stages and uses the value of <KEY> to retrieve the value (in this case, the list of people who can drive -> ["Diogo", "Pedro"]).
  • Rigel generates combinations.
  • Rigel runs each instance of test_drive in an independent thread, and each thread is passed one of the generated combinations.

Rigel is now programmed to skip template variables of the form {{ data.___ }} during the initial decoding.
All other variables are decoded as usual.

NOTE: Use this mechanism only within the matrix field. Using it elsewhere will lead to undecoded values.

NOTE: Using any header other than data. will cause an error to be thrown.
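
The steps listed above can be sketched roughly as follows, assuming filter_drivers has already placed allowed_drivers in the shared data. All names here (DATA_REFERENCE, resolve_matrix, run_parallel, drive_job) are hypothetical and the code is not taken from Rigel.

```python
import re
import threading
from itertools import product
from typing import Any, Callable, Dict, List, Optional

# Matches a whole-string reference such as "{{ data.allowed_drivers }}".
DATA_REFERENCE = re.compile(r"^\{\{\s*data\.(\w+)\s*\}\}$")


def resolve_matrix(matrix: Dict[str, Any], shared_data: Dict[str, Any]) -> Dict[str, List[Any]]:
    """Replace '{{ data.<KEY> }}' entries with the list stored under <KEY>."""
    resolved: Dict[str, List[Any]] = {}
    for field, value in matrix.items():
        if isinstance(value, str):
            match = DATA_REFERENCE.match(value)
            if match is None:
                raise ValueError(f"Field '{field}' is neither a list nor a data reference")
            value = shared_data[match.group(1)]
        resolved[field] = list(value)
    return resolved


def run_parallel(job: Callable[[Dict[str, Any]], str],
                 combinations: List[Dict[str, Any]]) -> List[Optional[str]]:
    """Run one job instance per combination, each in its own thread."""
    results: List[Optional[str]] = [None] * len(combinations)

    def worker(index: int, combination: Dict[str, Any]) -> None:
        results[index] = job(combination)

    threads = [threading.Thread(target=worker, args=(i, c)) for i, c in enumerate(combinations)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


def drive_job(data: Dict[str, Any]) -> str:
    """Hypothetical stand-in for the test_drive job."""
    return f"{data['driver']} drives the {data['car']}"


shared_data = {"allowed_drivers": ["Diogo", "Pedro"]}  # as left by filter_drivers
matrix = {"driver": "{{ data.allowed_drivers }}", "car": ["Volvo", "Ferrari", "Mitsubishi", "Opel"]}
resolved = resolve_matrix(matrix, shared_data)
keys = list(resolved)
combinations = [dict(zip(keys, values)) for values in product(*resolved.values())]
for line in run_parallel(drive_job, combinations):
    print(line)
```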

And... that's it!
I really hope the examples are not confusing and this message proves useful to you.

Try using these new features to parameterize the parallel execution of jobs based on the output values of preceding jobs. As usual, let me know if you have any questions or suggestions 😃

@MisterOwlPT
Collaborator Author

Hi @Kazadhum,

The Compose plugin now supports a field timeout of type float.
Its default value is 0.0 (seconds).

If the default value is used then the containers will run indefinitely (until CTRL-C/CTRL-Z is pressed).
If another value is used then the containers will be stopped after that amount of time.
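
The timeout semantics can be pictured with the small sketch below, where 0.0 means "wait with no time limit". The function wait_for_stop and the use of threading.Event are assumptions made for illustration only; they do not describe how the Compose plugin is implemented.

```python
import threading


def wait_for_stop(stop_event: threading.Event, timeout: float = 0.0) -> bool:
    """Block until stop_event is set or `timeout` seconds pass.

    A timeout of 0.0 means no time limit. Returns True if the event was set
    (e.g. the user interrupted), False if the timeout expired first.
    """
    return stop_event.wait(None if timeout == 0.0 else timeout)


stop_event = threading.Event()
print(wait_for_stop(stop_event, timeout=0.05))  # timeout expires first
stop_event.set()
print(wait_for_stop(stop_event))  # event already set, returns immediately
```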

Changes committed to branch flow.

Let me know if you have any other questions or doubts about this issue 😃

@Kazadhum
Owner

Hey @MisterOwlPT, thank you! :D

@Kazadhum
Owner

Hello @MisterOwlPT!

Just one more thing! In this version of Rigel, I've noticed that the containers get a different name each time the Compose plugin is run, like:
calibration_evaluation-8561119a-e179-11ed-9e53-47c8f9dfe4e6
calibration_evaluation-4cb00a04-e179-11ed-9e53-47c8f9dfe4e6
calibration_evaluation-1a91df84-e179-11ed-9e53-47c8f9dfe4e6

This is fine, except for when saving artifacts during multiple tests. To illustrate my point, here's the directory structure of the Rigel archives after running some tests with the new version (the folders without the random names are from the previous Rigel version):

.
└── test
    ├── 17-04-2023-12-29-41
    │   └── calibration_evaluation
    ├── 17-04-2023-12-31-56
    │   └── calibration_evaluation
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-31-47
    │   └── calibration_evaluation-86aaa3de-e176-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-34-34
    │   └── calibration_evaluation-ae9ca96e-e176-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-50-00
    │   └── calibration_evaluation-1a91df84-e179-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-51-24
    │   └── calibration_evaluation-4cb00a04-e179-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    ├── 23-04-2023-02-52-59
    │   └── calibration_evaluation-8561119a-e179-11ed-9e53-47c8f9dfe4e6
    │       └── rgb_to_rgb_results.csv
    └── latest -> /home/diogo/.rigel/archives/test/23-04-2023-02-52-59

So the latest tag doesn't work in this version. I need to be able to access the latest .csv file for my introspection plugin. If you could take a look at it I'd appreciate it!

Thank you! 😄

@MisterOwlPT
Collaborator Author

Hello @Kazadhum,

Whoops... I forgot about this "detail".
It's fixed now and committed. Thank you for the feedback!

Please let me know if you still have any problems with this 😄

@MisterOwlPT
Collaborator Author

MisterOwlPT commented May 15, 2023

Hi @Kazadhum,

(@rarrais for reference)

I've committed some more changes to Rigel (branch flow).
These changes should help you with the issue you reported.

Here is a description of the changes:

UPDATES TO RIGEL

New mechanism for data decoding

Previously all template variables (the ones declared using {{ ... }}) were decoded right away.
Although this is useful for passing credentials and initializing providers it limits interaction between plugins.
Therefore a new mechanism for decoding template variables was implemented.

Now two types of template variables are considered:

  • static - declared using {{ vars.XXXX }}
  • dynamic - declared using {{ data.XXX }}

The difference between them is that static template variables are decoded when parsing the Rigelfile (i.e., the existing mechanism).

NOTE: static template variables must still be declared using the vars field via Rigelfile or via environment variables.

Dynamic template variables are decoded at run-time before loading each Plugin. In this case, Rigel uses an internal structure called shared_data (type Dict[str, Any]) to decode the values. This internal structure is shared among all plugins, allowing for data passing.

NOTE: when declaring template variables, if you use any header besides vars or data you'll get an error. Make sure to update existing Rigelfiles according to your needs.
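
To make the two decoding phases concrete, here is a minimal sketch of a decoder that handles one header at a time and rejects unknown headers. The function decode, the regular expression and the example values are hypothetical; Rigel's actual decoder may work differently.

```python
import re
from typing import Any, Dict

# Matches "{{ <header>.<name> }}"; the name may contain dots so that a
# dotted name is looked up as-is instead of being silently skipped.
TEMPLATE = re.compile(r"\{\{\s*(\w+)\.([\w.]+)\s*\}\}")


def decode(text: str, header: str, values: Dict[str, Any]) -> str:
    """Decode only the '{{ <header>.<name> }}' variables found in text."""

    def substitute(match: "re.Match[str]") -> str:
        found_header, name = match.group(1), match.group(2)
        if found_header not in ("vars", "data"):
            raise ValueError(f"Unknown header '{found_header}'")
        if found_header != header:
            return match.group(0)  # left untouched for the other phase
        if name not in values:
            raise KeyError(f"Undeclared variable '{found_header}.{name}'")
        return str(values[name])

    return TEMPLATE.sub(substitute, text)


line = 'image: "{{ vars.AWS_ECR }}/robot:latest" timeout: "{{ data.simulation_duration }}"'
parsed = decode(line, "vars", {"AWS_ECR": "example.registry"})  # when parsing the Rigelfile
final = decode(parsed, "data", {"simulation_duration": 60.0})   # at run-time, before loading the plugin
print(parsed)
print(final)
```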

Plugin constructor changes

All plugins are now passed the shared_data field described earlier.
Therefore, the constructor must reflect this change. Consider the following example:

    # ...
    def __init__(
        self,
        raw_data: PluginRawData,
        global_data: RigelfileGlobalData,
        application: Application,
        providers_data: Dict[str, Any],
        shared_data: Dict[str, Any] = {}  # <-------- add this line
    ) -> None:
        super().__init__(
            raw_data,
            global_data,
            application,
            providers_data,
            shared_data  # <-------- add this line
        )
    # ...

No more changes are required.

NOTE: all core Rigel plugins were updated. Ensure your plugin is also compliant with this change otherwise you'll get an error.

Plugin call changes

Since dynamic template variables are now a thing, all Plugins must be configured explicitly using the Rigelfile.

Take for instance an updated example for the rigel.plugins.core.TestPlugin plugin (the plugin that changed the most):

  test:
    plugin: "rigel.plugins.core.TestPlugin"
    with:
      timeout: "{{ data.simulation_duration }}"
      hostname: "{{ data.simulation_address }}"
      requirements:
        - 'globally : some /OSPS/TM/HeartBeep [osps_msgs/TMHeartBeep] {publisherId = "friday"}'
        - "globally : some /OSPS/TM/TaskStatus [osps_msgs/TMTaskStatus] {statusCode = 5}"

Previously the fields timeout and hostname would be obtained automatically from shared data. The plugin would internally access a hardcoded field and use its value.

Now, all configuration data must be defined explicitly (even if in practice we end up referring to the same shared data as before). This allows for more customization if so required.

NOTE: default values still exist and are supported! Make sure to consult a Plugin model if required.

I hope this message proves useful to you.

Try updating your Rigelfile and external plugin to match these changes.
As usual, let me know if you have any questions or suggestions 😃

@Kazadhum
Owner

Kazadhum commented Jun 2, 2023

Hello @MisterOwlPT ! Sorry for the delay, I've been trying to focus a little more on writing as of late. However, I just got around to updating everything on my flow branch to match these changes and I'm running into a problem.

After making these changes and trying to run the test sequence, I get the following error:

  File "/home/diogo/rigel/rigel/files/decoder.py", line 58, in __aux_decode
    self.__aux_decode_list(data, vars, header, path)
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
  File "/home/diogo/rigel/rigel/files/decoder.py", line 239, in __aux_decode_list
    self.__aux_decode_list(data, vars, header, path)
  File "/home/diogo/rigel/rigel/files/decoder.py", line 239, in __aux_decode_list
  File "/home/diogo/rigel/rigel/files/decoder.py", line 239, in __aux_decode_list
    raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
    raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
    raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
    raise RigelError(f"Field '{new_path}' set to undeclared shared variable '{variable_name}'")
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'
rigel.exceptions.RigelError: Field 'components[1].command[2]' set to undeclared shared variable 'matrix.trans_noise'

I made the changes to my plugin just like you showed me but I'm running into an issue when declaring variables. Maybe I'm getting this wrong, but from what I can gather, if I want to access the value of a variable declared in a matrix in the sequences field (to use with different values in each parallel run) I should use {{ data.matrix.trans_noise }}, for example, right?

@MisterOwlPT
Collaborator Author

Hi @Kazadhum!

It's okay 😃 happy to hear from you again.

No, dynamic data has no internal hierarchy: all fields are placed at the same root level (supporting a hierarchy is something I am considering implementing). All dynamic data can only be accessed via {{ data.<FIELD_NAME> }}.

Therefore you must change {{ data.matrix.trans_noise }} to {{ data.trans_noise }} and everything should work.
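
In other words, the lookup behaves like a flat dictionary access by exact name. A tiny sketch, where the lookup helper and the value 0.1 are made up for illustration:

```python
from typing import Any, Dict


def lookup(shared_data: Dict[str, Any], variable_name: str) -> Any:
    """Look up a dynamic variable by its exact, flat name (no dotted paths)."""
    if variable_name not in shared_data:
        raise KeyError(f"undeclared shared variable '{variable_name}'")
    return shared_data[variable_name]


shared_data = {"trans_noise": 0.1}  # hypothetical value for one matrix combination
print(lookup(shared_data, "trans_noise"))

try:
    lookup(shared_data, "matrix.trans_noise")
except KeyError as error:
    print(f"lookup failed: {error}")
```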

@Kazadhum
Owner

Kazadhum commented Jun 5, 2023

Thanks @MisterOwlPT, it works now! 😃
