Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broadcasting system for the cluster #13124

Merged
merged 4 commits into from May 20, 2022

Conversation

Selutario
Copy link
Member

@Selutario Selutario commented Apr 13, 2022

Related issue
Closes #12912

Description

This PR adds a new broadcasting system for the master node of the Wazuh cluster, as requested at #12912.

To do this, an asynchronous queue is created for each AbstractServerHandler (there is one for each connected client), to which the functions to be executed are added. Thus, the functions will be executed as soon as the AbstractServerHandler is free, in the most resource-efficient way possible.

schema_1

Broadcast methods

This broadcasting system adds three new methods to make use of it. They are defined inside the AbstractServer class. Therefore, they can be used there or in any inheriting class (such as Master or LocalServer):

  • broadcast(f, *args, **kwargs): Run a function in all server handlers. There is one server handler for each connected client. Nothing is returned and it is not possible to check if the function was run in every server handler or the result.
  • broadcast_add(f, *args, **kwargs): It works as the method above (broadcast) but it does return an identifier (broadcast_id). When this function is used, the result of each server handler is stored and it can be queried with its broadcast_id.
  • broadcast_pop(broadcast_id): Receive broadcast_id as a parameter. It returns False if the request has not yet been run in all expected server handlers. Otherwise, a dict with the response for each one is returned (or True if the broadcast_id is unknown).

Usage example

An example of use could be this development of agent-groups (#10771). Until now, due to the lack of a broadcasting method, a function was called from which each MasterHandler obtained the info:

while True:
try:
before = perf_counter()
sync_object.logger.info("Starting.")
if len(self.agent_groups_control_workers) >= len(self.clients.keys()) > 0:
self.agent_groups_control = await sync_object.retrieve_information()
self.agent_groups_control_workers.clear()
after = perf_counter()
logger.info(f"Finished in {(after - before):.3f}s.")
elif len(self.clients.keys()) == 0:
logger.info("No clients connected. Skipping.")
except Exception as e:
sync_object.logger.error(f"Error getting agent-groups from WDB: {e}")

Now, it could just call broadcast(), specifying a reference to the MasterHandler.send_agent_groups_information method and the info as a keyword:

        while True:
            try:
                before = perf_counter()
                sync_object.logger.info("Starting.")

                if len(self.clients.keys()) > 0:
                    if info := await sync_object.retrieve_information():
                        self.broadcast(MasterHandler.send_agent_groups_information, info)
                    after = perf_counter()
                    logger.info(f"Finished in {(after - before):.3f}s.")
                elif len(self.clients.keys()) == 0:
                    logger.info("No clients connected. Skipping.")
            except Exception as e:
                sync_object.logger.error(f"Error getting agent-groups from WDB: {e}")

There is no way to verify if the function has been run in all MasterHandlers. If that is required, broadcast_add() and broadcast_pop() could be used instead. broadcast_pop would return False until all handlers run the function:

        bc_id = None
        bc_sent = False
        while True:
            try:
                before = perf_counter()
                sync_object.logger.info("Starting.")

                bc_sent = self.broadcast_pop(bc_id)
                if bc_sent and len(self.clients.keys()) > 0:
                    if info := await sync_object.retrieve_information():
                        bc_id = self.broadcast_add(MasterHandler.send_agent_groups_information, info)
                    after = perf_counter()
                    logger.info(f"Finished in {(after - before):.3f}s.")
                elif len(self.clients.keys()) == 0:
                    logger.info("No clients connected. Skipping.")
            except Exception as e:
                sync_object.logger.error(f"Error getting agent-groups from WDB: {e}")

Tests results

As it can be seen below, the coverage for the server.py module is still 100% after this development:

============================================================================================ test session starts ============================================================================================
platform linux -- Python 3.9.5, pytest-7.0.1, pluggy-1.0.0 -- /home/selu/venv/3.9-unittest-env/bin/python3.9
cachedir: .pytest_cache
rootdir: /home/selu/Git/wazuh/framework
plugins: anyio-3.5.0, asyncio-0.18.1, aiohttp-1.0.4, cov-3.0.0
asyncio: mode=auto
collected 30 items                                                                                                                                                                                          

framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_init PASSED                                                                                                             [  3%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_to_dict PASSED                                                                                                          [  6%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_connection_made PASSED                                                                                                  [ 10%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_process_request PASSED                                                                                                  [ 13%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_echo_master PASSED                                                                                                      [ 16%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_hello PASSED                                                                                                            [ 20%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_process_response PASSED                                                                                                 [ 23%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_connection_lost PASSED                                                                                                  [ 26%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_add_request PASSED                                                                                                      [ 30%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServerHandler_broadcast_reader PASSED                                                                                                 [ 33%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_init PASSED                                                                                                                    [ 36%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast PASSED                                                                                                               [ 40%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_ko PASSED                                                                                                            [ 43%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_add PASSED                                                                                                           [ 46%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_add_ko PASSED                                                                                                        [ 50%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_pop[broadcast_results0-False] PASSED                                                                                 [ 53%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_pop[broadcast_results1-False] PASSED                                                                                 [ 56%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_pop[broadcast_results2-True] PASSED                                                                                  [ 60%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_pop[broadcast_results3-expected_response3] PASSED                                                                    [ 63%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_broadcast_pop[broadcast_results4-expected_response4] PASSED                                                                    [ 66%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_to_dict PASSED                                                                                                                 [ 70%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_setup_task_logger PASSED                                                                                                       [ 73%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_get_connected_nodes PASSED                                                                                                     [ 76%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_get_connected_nodes_ko PASSED                                                                                                  [ 80%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_check_clients_keepalive PASSED                                                                                                 [ 83%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_echo PASSED                                                                                                                    [ 86%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_performance_test PASSED                                                                                                        [ 90%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_concurrency_test PASSED                                                                                                        [ 93%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_start PASSED                                                                                                                   [ 96%]
framework/wazuh/core/cluster/tests/test_server.py::test_AbstractServer_start_ko PASSED                                                                                                                [100%]


============================================================================================= warnings summary ==============================================================================================
../../venv/3.9-unittest-env/lib/python3.9/site-packages/pytest_aiohttp/plugin.py:28
  /home/selu/venv/3.9-unittest-env/lib/python3.9/site-packages/pytest_aiohttp/plugin.py:28: DeprecationWarning: The 'asyncio_mode' is 'legacy', switching to 'auto' for the sake of pytest-aiohttp backward compatibility. Please explicitly use 'asyncio_mode=strict' or 'asyncio_mode=auto' in pytest configuration file.
    config.issue_config_time_warning(LEGACY_MODE, stacklevel=2)

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html

----------- coverage: platform linux, python 3.9.5-final-0 -----------
Name                                                Stmts   Miss  Cover
-----------------------------------------------------------------------
api/api/__init__.py                                     0      0   100%
api/api/api_exception.py                               11      5    55%
api/api/configuration.py                               87     42    52%
api/api/constants.py                                   11      0   100%
api/api/validator.py                                  149     50    66%
framework/__init__.py                                   0      0   100%
framework/wazuh/__init__.py                            56     36    36%
framework/wazuh/core/__init__.py                        0      0   100%
framework/wazuh/core/cluster/__init__.py                5      0   100%
framework/wazuh/core/cluster/common.py                445    342    23%
framework/wazuh/core/cluster/server.py                196      0   100%
framework/wazuh/core/cluster/tests/__init__.py          0      0   100%
framework/wazuh/core/cluster/tests/test_server.py     395      5    99%
framework/wazuh/core/cluster/utils.py                 141     98    30%
framework/wazuh/core/common.py                        130     19    85%
framework/wazuh/core/configuration.py                 418    378    10%
framework/wazuh/core/database.py                       55     36    35%
framework/wazuh/core/exception.py                     113     36    68%
framework/wazuh/core/pyDaemonModule.py                 63     52    17%
framework/wazuh/core/results.py                       334    240    28%
framework/wazuh/core/utils.py                         932    779    16%
framework/wazuh/core/wazuh_socket.py                  151    112    26%
framework/wazuh/core/wdb.py                           174    148    15%
framework/wazuh/core/wlogging.py                       66     50    24%
-----------------------------------------------------------------------
TOTAL                                                3932   2428    38%

======================================================================================= 30 passed, 1 warning in 1.00s =======================================================================================

@Selutario Selutario self-assigned this Apr 13, 2022
@Selutario Selutario marked this pull request as draft April 19, 2022 12:26
@Selutario Selutario marked this pull request as ready for review April 19, 2022 12:27
@Selutario Selutario force-pushed the feature/12912-cluster-broadcast-system branch from 6dd550f to 0d502ad Compare April 19, 2022 12:27
framework/wazuh/core/cluster/server.py Outdated Show resolved Hide resolved
framework/wazuh/core/cluster/server.py Outdated Show resolved Hide resolved
framework/wazuh/core/cluster/server.py Outdated Show resolved Hide resolved
framework/wazuh/core/cluster/server.py Outdated Show resolved Hide resolved
framework/wazuh/core/cluster/server.py Outdated Show resolved Hide resolved
@Selutario Selutario force-pushed the feature/12912-cluster-broadcast-system branch from 7ad9ac1 to ba87003 Compare April 29, 2022 10:15
@davidjiglesias davidjiglesias merged commit e590a00 into master May 20, 2022
@davidjiglesias davidjiglesias deleted the feature/12912-cluster-broadcast-system branch May 20, 2022 07:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add broadcast system for the master node
3 participants