
New Agent comms API endpoint client #1

Closed
Tracked by #22677
TomasTurina opened this issue May 10, 2024 · 14 comments
@TomasTurina

TomasTurina commented May 10, 2024

Description

As detailed in wazuh/wazuh#22677, Wazuh's current communication setup is complex and needs to be refactored.

We want to replace wazuh-agentd, the service currently in charge of communicating with the server, with a new agent. The agent-auth tool will also be replaced by this new agent.

The new agent must be able to perform the following tasks:

  • Registration:
    • Whoever installs an agent has to register it on the server only once.
    • The agent chooses a UUID v7, which will identify it.
    • For the registration request, it uses this UUID.
    • The response includes the agent credentials, which must be stored.
    • The agent uses the Server management API (which requires a login token).
  • Authentication and authorization:
    • To send/receive events, the agent needs a token.
    • For the request, it will use its UUID and credentials.
    • When the token expires, the agent must request a new one, and so on.
    • Proposal: OAuth 2.0 with JWTs.
    • The agent will use the Agent comms API.
  • Communication agent → server:
    • Asynchronous and event driven.
    • Open connection, pass token, send events, close connection.
    • Bulking, batching, and buffering.
    • Proposal: HTTPS, REST API.
    • The agent will use the Agent comms API.
  • Communication server → agent:
    • Connection-oriented.
    • The agent initiates the connection, which must be kept established (passing the token).
    • The agent polls to see whether anything is available (long-polling GET).
    • Proposal: HTTPS, REST API.
    • The agent will use the Agent comms API.
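Since the registration scheme above has the agent choose its own UUID v7, here is a minimal, illustrative sketch of generating one (per RFC 9562: a 48-bit Unix-millisecond timestamp, version and variant bits, and a random tail). This is not the actual implementation, just a dependency-free example of the format:

```cpp
#include <array>
#include <chrono>
#include <cstdint>
#include <random>
#include <string>

// Illustrative UUID v7 generator: 48-bit big-endian Unix-ms timestamp,
// version nibble set to 7, RFC variant bits, remaining bytes random.
std::string generate_uuid_v7() {
    using namespace std::chrono;
    const uint64_t ms = duration_cast<milliseconds>(
        system_clock::now().time_since_epoch()).count();

    std::array<uint8_t, 16> b{};
    for (int i = 0; i < 6; ++i)  // 48-bit timestamp, big-endian
        b[i] = static_cast<uint8_t>(ms >> (8 * (5 - i)));

    std::random_device rd;
    std::mt19937_64 gen(rd());
    std::uniform_int_distribution<int> dist(0, 255);
    for (int i = 6; i < 16; ++i)
        b[i] = static_cast<uint8_t>(dist(gen));

    b[6] = (b[6] & 0x0F) | 0x70;  // version 7
    b[8] = (b[8] & 0x3F) | 0x80;  // RFC variant (10xx)

    static const char* hex = "0123456789abcdef";
    std::string out;
    for (int i = 0; i < 16; ++i) {
        if (i == 4 || i == 6 || i == 8 || i == 10) out.push_back('-');
        out.push_back(hex[b[i] >> 4]);
        out.push_back(hex[b[i] & 0x0F]);
    }
    return out;
}
```

A UUID v7 sorts roughly by creation time, which is convenient if the server indexes agents by it.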

Additionally, these are the API endpoints that the agent will use to communicate with the server:

  • /login
    Authenticate (request token).
  • /events/stateless
    Send events.
  • /events/stateful
    The same as the previous one, but for events that require persistent data.
  • /commands
    In the opposite direction: a request made by the agent to retrieve commands from the manager.

The focus of this issue will be on the following tasks:

  • Initial client design, including technological and library research.
  • PoC implementation for agent.

Implementation restrictions

  • Work with C++ (at least 17) and CMake.
  • Look for a library that has everything: libcurl, boost, gRPC, etc.
  • Everything as standard as possible.
  • Collaborate with the server team to align on communication protocols and API integration.

Plan

  • Conduct initial research on system design and relevant technologies.
  • Explore client-side designs and technologies for seamless interaction with the Agent Comms API.
  • Describe the behavior of the new agent, create UML designs.
  • Define the new build system and the new repository structure: Proposal.
  • Work on a Proof of Concept implementation to validate communication flows between agents and servers.
    • Initially, work with mocks.
    • Establish connection, run endpoints.
    • Upload it to a /poc folder in the new repository.

POC working branch: https://github.com/wazuh/wazuh-agent/tree/1-spike-new-agent-comms-api-endpoint-client

@jr0me

jr0me commented May 21, 2024

Initial libraries investigation

There are several C++ libraries that support our requirements. The ones described in this small list are widely used in the industry and tested. Note that no single library covers all needs: those that support high-level protocols like HTTP and HTTP/2 often lack native support for UDP, while libraries handling low-level protocols may not provide comprehensive support for higher-level abstractions.

According to wazuh/wazuh#23395, the server team is also evaluating a number of libraries; we should encourage sharing the same technology across both the Agent and the Server.

Libraries for Network Connections:

ZeroMQ (ØMQ):
ZeroMQ (ØMQ) is a high-performance asynchronous messaging library designed for distributed and concurrent applications. It supports various messaging patterns like publish/subscribe, request/reply, and client/server over multiple transports, including TCP, UDP, in-process, inter-process, multicast, and WebSocket (does not natively support HTTP/HTTP2). Highly suitable for building scalable and resilient distributed systems. The libzmq library is licensed under the Mozilla Public License 2.0.

https://zeromq.org/get-started/

gRPC:
gRPC is a high-performance open-source RPC framework that facilitates seamless communication between client and server applications, making it ideal for distributed systems. It uses HTTP/2 and Protocol Buffers for efficient serialization (it does not support raw TCP/UDP). gRPC supports multiple languages, allowing interoperability between different systems. It is designed for low-latency and scalable communication, with features like load balancing, tracing, and authentication. It can also be extended to support various data formats like JSON, Protobuf, and others. Its license is Apache 2.0.

https://grpc.io/

Boost.Asio / Boost.Beast:
Boost.Asio provides a comprehensive suite for asynchronous network programming, supporting TCP, UDP, and serial communication. Boost.Beast extends Boost.Asio with HTTP and WebSocket capabilities. These libraries are highly performant and integrate well with other Boost libraries, offering extensive functionality for complex networking tasks.

https://www.boost.org/users/license.html
https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio.html
https://www.boost.org/doc/libs/1_85_0/libs/beast/doc/html/index.html

ASIO (Standalone):
The standalone ASIO library offers asynchronous I/O capabilities without the Boost dependency. It supports TCP, UDP, and serial port communications, and is suitable for applications requiring high performance (requires additional libraries for HTTP/HTTP2). ASIO is lightweight and provides efficient network communication primitives.

https://www.boost.org/LICENSE_1_0.txt
https://think-async.com/Asio/

CURL and http-request:

CURL supports many protocols like HTTP and HTTPS, with built-in SSL/TLS for secure communication. Given that we already have a CURL wrapper http-request, continuing to use CURL with it allows leveraging existing infrastructure and avoiding new dependencies, simplifying maintenance.

While it lacks native WebSocket support, CURL's broad protocol support makes it suitable for HTTP-based event sending and the command endpoints. Compared to the other options, and beyond the added complexity of introducing new dependencies: ZeroMQ offers high-performance messaging but limited protocol support, while gRPC supports HTTP/2 and real-time communication but is more complex given the need for Protocol Buffer definitions. Continuing with CURL leverages existing code and avoids new dependencies.

Note that the wazuh/wazuh-http-request repository requires files from wazuh/wazuh. It currently does not work as a standalone project, and some minor changes need to be made for it to be usable as a library.

https://curl.se/libcurl/
https://github.com/wazuh/wazuh-http-request

Libraries for JWTs:

jwt-cpp:
jwt-cpp is a lightweight library for creating and verifying JSON Web Tokens (JWT). It supports various algorithms for signing and verifying tokens, making it suitable for implementing token-based authentication in C++ applications. jwt-cpp is easy to integrate and provides a straightforward API for handling JWTs. Licensed under MIT.

https://thalhammer.github.io/jwt-cpp/
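Whichever library is chosen, the core of the token-expiry check the agent needs is small. Below is a dependency-free sketch (not jwt-cpp itself, and not a substitute for signature verification): decode the base64url payload of a JWT and compare its `exp` claim to the current time. All names are illustrative:

```cpp
#include <cstdlib>
#include <ctime>
#include <string>

// Decode base64url (no padding handling needed: stray chars are skipped).
static std::string base64url_decode(const std::string& in) {
    auto val = [](char c) -> int {
        if (c >= 'A' && c <= 'Z') return c - 'A';
        if (c >= 'a' && c <= 'z') return c - 'a' + 26;
        if (c >= '0' && c <= '9') return c - '0' + 52;
        if (c == '-') return 62;
        if (c == '_') return 63;
        return -1;
    };
    std::string out;
    int buffer = 0, bits = 0;
    for (char c : in) {
        int v = val(c);
        if (v < 0) continue;
        buffer = (buffer << 6) | v;
        bits += 6;
        if (bits >= 8) {
            bits -= 8;
            out.push_back(static_cast<char>((buffer >> bits) & 0xFF));
        }
    }
    return out;
}

// True if the token's "exp" claim is in the past (or missing/unparsable),
// meaning the agent must POST /login again.
bool token_expired(const std::string& jwt, std::time_t now) {
    auto first = jwt.find('.');
    auto second = jwt.find('.', first + 1);
    if (first == std::string::npos || second == std::string::npos) return true;
    std::string payload =
        base64url_decode(jwt.substr(first + 1, second - first - 1));
    auto pos = payload.find("\"exp\"");
    if (pos == std::string::npos) return true;
    pos = payload.find(':', pos);
    if (pos == std::string::npos) return true;
    long long exp = std::atoll(payload.c_str() + pos + 1);
    return exp <= static_cast<long long>(now);
}
```

In practice jwt-cpp's `decode`/`verify` API would replace the hand-rolled parsing above and also validate the signature.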

@TomasTurina

TomasTurina commented May 23, 2024

New agent

  • Unique binary.
  • CLI to run and manage it:
    • Agent registration.
    • Enable or disable modules.
    • Check the status of the modules.
  • Internal agent API:
    • Choose between TCP socket or HTTP.
    • Everything must be done by API.
  • Communication with the new server:
    • Through the API that the server exposes.
    • Define protocols: HTTP,…
    • Define message format: JSON,…
    • Agent -> manager:
      • Event driven, send and close.
      • Dispatcher and communicator.
      • Bulk of messages:
        • By time or quantity.
        • Compress messages (msgpack).
        • Reuse connections (optional).
      • Batching:
        • Pool of threads.
        • Send message if a thread is available.
      • Buffering:
        • Don't discard messages.
        • Push-back.
        • Rocksdb,...
    • Manager -> agent:
      • Agent starts the connection, manager accepts it.
      • Agent requests data from the manager.
        • Long polling GET waiting for available data.
        • Only one connection.
        • Reopen connection if it times out.
    • Implement statistics in memory.
  • Integrate the modules we have into the new agent.
  • Use common interfaces even if the implementation is different for each OS.
    • Wrappers: libcurl,...
    • jemalloc
    • ...
  • Thinking in the future:
    • There must be a local interface to communicate with the user.
    • Incorporate processing in the agent.
  • Research other products:
    • Vector.
    • Datadog.
    • Osquery.
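The "bulk of messages: by time or quantity" idea above can be sketched as a small batcher that flushes either when the batch reaches a size limit or when the oldest buffered event exceeds a time limit. This is a hypothetical illustration, not the agent's actual code:

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Buffers events and flushes by quantity (max_count) or time (max_age).
// Single-threaded sketch; a real version would add locking and push-back.
class EventBatcher {
public:
    using Clock = std::chrono::steady_clock;
    using FlushFn = std::function<void(const std::vector<std::string>&)>;

    EventBatcher(std::size_t max_count, Clock::duration max_age, FlushFn flush)
        : max_count_(max_count), max_age_(max_age), flush_(std::move(flush)) {}

    void push(std::string event, Clock::time_point now = Clock::now()) {
        if (buffer_.empty()) first_event_ = now;
        buffer_.push_back(std::move(event));
        maybe_flush(now);
    }

    // Called periodically (e.g. from a timer thread) to enforce the time limit.
    void tick(Clock::time_point now = Clock::now()) { maybe_flush(now); }

private:
    void maybe_flush(Clock::time_point now) {
        if (buffer_.empty()) return;
        if (buffer_.size() >= max_count_ || now - first_event_ >= max_age_) {
            flush_(buffer_);  // e.g. POST /events/stateless with the bulk
            buffer_.clear();
        }
    }

    std::size_t max_count_;
    Clock::duration max_age_;
    FlushFn flush_;
    std::vector<std::string> buffer_;
    Clock::time_point first_event_;
};
```

The flush callback is where compression (msgpack) and connection reuse would slot in.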

New repository

  • Select development environment, build tools and dependency management:
    • Work in C++ (at least 17).
    • Build with cmake.
    • For dependencies, use vcpkg or the CMake package manager, seeking to reduce future work.
  • Select common libraries:
    • Logging.
    • Testing.
    • Benchmarking.
    • Concurrency.
  • Scaffolding:
    • README.
    • Directory structure.
    • Be able to separate portable code from non-portable code.
    • Legacy code?
  • First develop the protocol, then bring the rest of the modules and adapt them.
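As a rough illustration of the proposed build setup (C++17, CMake, vcpkg), a hypothetical top-level CMakeLists.txt for the structure above might look like the following. This is a sketch under the proposal's assumptions, not an agreed-upon file:

```cmake
# Hypothetical top-level CMakeLists.txt for the proposed layout
cmake_minimum_required(VERSION 3.22)
project(wazuh-agent LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

# Dependencies declared in vcpkg.json and resolved via the vcpkg toolchain,
# e.g.: cmake -DCMAKE_TOOLCHAIN_FILE=<vcpkg>/scripts/buildsystems/vcpkg.cmake
find_package(CURL REQUIRED)

add_subdirectory(common)
add_subdirectory(modules)

enable_testing()
```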

@jr0me

jr0me commented May 24, 2024

Update

New repo structure

Initial draft considerations:

  • We shouldn't mix folders that contain code with folders that contain, for example, configuration or script files.
  • Separate owned code from external third-party dependencies.
  • When possible, each module folder should generate at most one binary program.
  • Each module (folder) contains its own tests.

Additionally, we should aim to identify functionalities that could be replaced by an external library to offload the maintenance of these components. Furthermore, we need to establish a strategy for common code shared between the agent and server to avoid duplication.

wazuh-agent/
├── CMakeLists.txt
├── vcpkg.json
├── dependencies/ -> may not be necessary
├── modules/
│   ├── agent/
│   │   ├── CMakeLists.txt
│   │   ├── include/
│   │   │   └── [header files...] (include error_messages)
│   │   ├── src/
│   │   │   └── [source files...]
│   │   └── tests/
│   │       ├── CMakeLists.txt
│   │       └── [test files...]
│   ├── active_response/ (former os_execd + active_response)
│   ├── agent_upgrade/
│   ├── aws/
│   ├── azure/
│   ├── docker/
│   ├── gcp/
│   ├── github/
│   ├── ms_graph/
│   ├── office365/
│   ├── logcollector/
│   ├── rootcheck/
│   ├── fim/ (former syscheck)
│   ├── inventory/ (former syscollector)
│   ├── sca/
│   └── [additional modules...]
├── common/
│   ├── data_provider/
│   ├── dbsync/
│   ├── rsync/ -> may not be necessary
│   ├── os_crypto/ -> may not be necessary
│   ├── os_net/ -> may not be necessary
│   ├── os_regex/ -> may not be necessary
│   ├── os_xml/ -> may not be necessary
│   ├── os_zlib/ -> may not be necessary
│   ├── shared/ (include headers)
│   ├── utils/ (from shared_modules)
│   ├── commonDefs.h (from shared_modules/common)
│   └── [additional modules...]
└── build/
    └── [build output...]

For folders that contain non-code assets related to the Agent:

wazuh-agent/
├── ci/
├── etc/
│   ├── config/
│   ├── selinux/
│   └── ruleset/
│       ├── sca/
│       └── rootcheck/
├── packages/
├── tools/
└── installers/
    ├── unix/ (former init folder, including upgrade.sh and install.sh)
    └── win32/

Missing from this initial draft are other files such as .gitignore, .clang-format, .clang-tidy, etc.

Complete structure:

Option 1:

wazuh-agent/
├── CMakeLists.txt
├── vcpkg.json
├── ci/
├── common/
├── dependencies/ -> may not be necessary
├── etc/
├── installers/
├── modules/
├── packages/
└── tools/

Option 2:

wazuh-agent/
├── ci/
├── etc/
├── installers/
├── packages/
├── src/
│   ├── CMakeLists.txt
│   ├── vcpkg.json
│   ├── common/
│   ├── dependencies/ -> may not be necessary
│   └── modules/
└── tools/

@havidarou havidarou changed the title SPIKE - Enhancing agent communication setup SPIKE - New Agent comms API endpoint client May 28, 2024
@sdvendramini

sdvendramini commented May 31, 2024

Communications server -> agent

/commands endpoint

Note

This is a work in progress

  • POC
    • I started investigating SSE connections, to learn how to mock the server and establish the connection through a GET request from the client.

Update: 03/06/2024

  • I did the first server → agent (SSE) communication test using a server based on this example and a client made entirely with libcurl.
  • Checked out the http-request repo to see if it could be used for the PoC. It looks like it might need some changes.
  • I started running some tests with http-request but haven't had success yet.

Update: 04/06/2024

  • I worked on completing the communication over SSE using http-request.
  • The approach was changed from SSE to HTTP long polling.

Update: 05/06/2024

  • I worked on the PoC implementation of the /commands endpoint using HTTP long polling.

Update: 06/06/2024

  • I continued working on the PoC implementation of the /commands endpoint using HTTP long polling. A timeout response mechanism was implemented to simulate real behavior. Now I'm working on the storage of the commands.

Update: 07/06/2024

  • The storage structure for commands is ready, and a thread was created to dispatch the received commands. The command logic still needs to be separated into a new class, and we need to make sure the RocksDB wrapper works for events as well.

Update: 10/06/2024

  • The /commands endpoint logic was separated into a new module. The database name for RocksDB was parameterized.

Update: 11/06/2024

  • Added the /agents request to register new agents.

Update: 13/06/2024

  • Today I started working with the server created by the pyserver team. Our client requires some changes to use the endpoints correctly.

@TomasTurina

TomasTurina commented Jun 3, 2024

Current wazuh-agentd UML diagram


PlantUML code
@startuml

== Initialization ==

module -> module : init module
execd -> execd : init execd
agentd -> agentd : init agentd

loop agent disconnected
agentd -> remoted : connect agent
remoted -> agentd :
end
agentd -> remoted : agent startup
note left
agent connected
end note
agentd -> agentd : update state
note right
sent count
end note
remoted -> agentd : ack
agentd -> remoted : startup event
note left
agent started event
end note
agentd -> agentd : update state
note right
sent count
end note
agentd -> agentd : update state
note right
connection status
end note

agentd -> remoted: keep-alive
note left
first keep-alive
end note
agentd -> agentd : update state
note right
sent and keep-alives count
end note

loop forever

== Keep alives ==

alt server unavailable/force reconnect
agentd -> agentd : update state
note right
connection status
end note
loop agent disconnected
agentd -> remoted : connect agent
remoted -> agentd :
end
agentd -> remoted : agent startup
note left
agent connected
end note
agentd -> agentd : update state
note right
sent count
end note
remoted -> agentd : ack
agentd -> agentd : update state
note right
connection status
end note
end

alt notify_time seconds
agentd -> remoted: keep-alive
note left
keep-alive
end note
agentd -> agentd : update state
note right
sent and keep-alives count
end note
end

== Receive messages ==

remoted -> remoted : action request
remoted -> agentd : send message
note right
AR/request/ack/merged.mg/...
end note
agentd -> agentd : update state
note right
ack count
end note

alt execd header
agentd -> execd : active response

else module header (syscheck/syscollector/sca/...)
agentd -> module : message

else force reconnect
agentd -> agentd : update state
note right
connection status
end note
loop agent disconnected
agentd -> remoted : connect agent
remoted -> agentd :
end
agentd -> remoted : agent startup
note left
agent connected
end note
agentd -> agentd : update state
note right
sent count
end note
remoted -> agentd : ack
agentd -> agentd : update state
note right
connection status
end note

else request header
agentd -> agentd : queue request
note right
requests queue
end note
alt error/udp ack
agentd -> remoted: error/udp ack event
agentd -> agentd : update state
note right
sent count
end note
end

else ack
agentd -> agentd : read ack
note right
keep-alives ack
end note

else file header (merged.mg)
alt open/write
agentd -> agentd : open/write file
else close
agentd -> agentd : close file/verify config
alt error
agentd -> remoted: error event
agentd -> agentd : update state
note right
sent count
end note
else auto restart
agentd -> execd : restart agent
execd -> agentd : response
agentd -> agentd : restart
note right
exit
end note
end
end
end

== Send events ==

module -> module : collect event
module -> agentd : event
agentd -> agentd : update state
note right
messages count
end note

alt buffer enabled
agentd -> agentd : queue event
note right
buffer queue
end note

else buffer disabled
agentd -> remoted : send event
note left
logcollector/syscheck/...
end note
agentd -> agentd : update state
note right
sent count
end note
end

end

== Client buffer (thread) ==

alt buffer enabled
loop every event
alt buffer level change
agentd -> remoted: agent buffer event
agentd -> agentd : update state
note right
sent count
end note
end
agentd -> agentd : get event
note right
buffer queue
end note
agentd -> remoted : send event
note left
logcollector/syscheck/...
end note
agentd -> agentd : update state
note right
sent count
end note
end
end

== Process requests (thread) ==

loop every request
agentd -> agentd : get request
note right
requests queue
end note
alt agent request
agentd -> agentd : getconfig/getstate
else execd request
agentd -> execd : request
execd -> agentd : response
else module request (logcollector/syscheck/wmodules/upgrade/...)
agentd -> module : request
module -> agentd : response
end
agentd -> remoted : send response
note left
request result
end note
agentd -> agentd : update state
note right
sent count
end note
end

== Log rotation (thread) ==

alt rotate log enabled
loop every second
alt condition met
agentd -> agentd : rotate logs
note right
ossec.log/ossec.json
end note
end
end
end

== State main (thread) ==

loop every interval
agentd -> agentd : write state file
note right
wazuh-agent.state
end note
end

== Exit ==

module -> module : exit module
execd -> execd : exit execd
agentd -> remoted : shutdown event
note left
agent disconnected
end note
agentd -> agentd : update state
note right
sent count
end note
agentd -> agentd : exit agentd

@enduml

@TomasTurina

TomasTurina commented Jun 5, 2024

New agent registration mechanism


PlantUML code
@startuml

actor User as user
participant Agent as agent
database AgentDB as agent_database
participant ServerAPI as server_management_API

== Register agent ==

user -> agent++ : register {user, password}
agent -> server_management_API : POST /authenticate {user, password}
server_management_API -> agent : api token
agent -> server_management_API : POST /agents {token, uuid}
server_management_API -> agent : agent credentials
agent -> agent_database : store credentials
agent_database -> agent : ok
agent -> user-- : result

@enduml

@wazuh wazuh deleted a comment from jr0me Jun 5, 2024
@TomasTurina

TomasTurina commented Jun 5, 2024

New agent communication mechanism


PlantUML code
@startuml

!pragma teoz true

actor User as user
participant Module as module
participant AR as active_response
participant Upgrade as upgrade
participant Agent as agent
database AgentDB as agent_database
database EventsDB as events_database
database CommandsDB as commands_database
participant AgentAPI as agent_comms_API

== Start agent ==

user -> agent++ : start agent

agent -> agent_database : read credentials
note right
Cache credentials
end note
agent_database -> agent : credentials

agent -> agent_database : read token
note right
Cache token
end note
agent_database -> agent : token

alt token is invalid
agent -> agent_comms_API : POST /login {credentials, uuid}
agent_comms_API -> agent : api token
agent -> agent_database : store token
note right
Update cache
end note
agent_database -> agent : ok
end

agent -> events_database : read and update events status
note right
Cache events
end note
events_database -> agent : ok
agent -> commands_database : read and update commands status
note right
Cache commands
end note
commands_database -> agent : ok

agent -> agent++-- : launch threads

agent -> module++ : start module
module -> agent : ready
agent -> active_response++ : start module
active_response -> agent : ready
agent -> upgrade++ : start module
upgrade -> agent : ready

loop
agent -> agent : keep threads running
end

== Token expire (thread) ==

loop
alt token is invalid
agent -> agent_comms_API : POST /login {credentials, uuid}
agent_comms_API -> agent : api token
agent -> agent_database : store token
note right
Update cache
end note
agent_database -> agent : ok
end
end

== Process events (threads) ==

loop
module -> module : collect event
module -> agent : send event
end

loop
agent -> agent : read event
alt event available
agent -> agent : set event timestamp
alt fim/inventory/sca
agent -> events_database : store stateful event
note right
Cache event
end note
events_database -> agent : ok
else
agent -> events_database : store stateless event
note right
Cache event
end note
events_database -> agent : ok
end
end
end

== Send events (threads) ==

loop
loop bulk not ready
agent -> events_database : get {stateful|stateless} event and update status
note right
Read/update cache
end note
events_database -> agent : event
agent -> agent : add event to {stateful|stateless} bulk
end

agent -> agent_database : read token
note right
Read cache
end note
agent_database -> agent : token

agent -> agent_comms_API : POST /events/{stateful|stateless} {token, uuid, stateful bulk}
agent_comms_API -> agent : response
alt response ok
agent -> events_database : delete {stateful|stateless} events
note right
Update cache
end note
events_database -> agent : ok
else
agent -> events_database : update {stateful|stateless} events status
note right
Update cache
end note
events_database -> agent : ok
end
end

== Receive commands (thread) ==

loop
agent_comms_API  -> agent_comms_API : collect commands
end

loop
agent -> agent_database : read token
note right
Read cache
end note
agent_database -> agent : token

agent -> agent_comms_API : GET /commands {token, uuid}
agent_comms_API -> agent : response
alt commands available
loop every command
agent -> commands_database : store command
note right
Cache command
end note
commands_database -> agent : ok
end
end
end

== Process commands (threads) ==

loop
agent -> commands_database : get command and update status
note right
Read/update cache
end note
commands_database -> agent : command

alt module
agent -> module : command
module -> agent : result

else active response
agent -> active_response : active response
active_response -> agent : result

else upgrade
agent -> agent : parse parameters
agent -> agent_database : read token
note right
Read cache
end note
agent_database -> agent : token
agent -> agent_comms_API : GET /upgrade {token, uuid, parameters}
agent_comms_API -> agent : wpk file
agent -> upgrade : upgrade
upgrade -> agent : result

else configuration
agent -> agent : parse parameters
agent -> agent_database : read token
note right
Read cache
end note
agent_database -> agent : token
agent -> agent_comms_API : GET /configuration {token, uuid, parameters}
agent_comms_API -> agent : configuration file
agent -> agent : apply configuration

else agent
agent -> agent : command
end

alt result ok
agent -> commands_database : delete command
note right
Update cache
end note
commands_database -> agent : ok
else
agent -> commands_database : update command status
note right
Update cache
end note
commands_database -> agent : ok
end
end

== Stop agent ==

user -> agent : stop agent

agent -> module!! : exit module
agent -> active_response!! : exit module
agent -> upgrade!! : exit module

agent -> agent++-- : stop threads

agent -> agent!! : exit agent

@enduml

@vikman90 vikman90 added the phase/spike Spike label Jun 7, 2024
@vikman90 vikman90 changed the title SPIKE - New Agent comms API endpoint client New Agent comms API endpoint client Jun 7, 2024
@sdvendramini

sdvendramini commented Jun 12, 2024

PoC Summary

After an initial period of research, during which various library variants were evaluated for the Proof of Concept (PoC) implementation (see comment), it was determined that the initial approach for integrating the new agent with the Agent Communications API would use the wazuh-http-request repository as a cURL wrapper.

The PoC aims to address the fundamental aspects of communication and design, based on the component diagrams detailed in this issue. This PoC specifically targets the client module and the submodules that interact with the commander and the queue of that design.

The following requests specified in the description of this issue were successfully implemented:

  • POST /login
  • POST /stateless
  • GET /commands

The POST /stateful request was not implemented, as it is assumed to be analogous to the stateless request.

For data persistence, two basic storage models were developed: one using SQLite and the other using RocksDB. These were implemented with a wrapper to abstract the design from the specific database chosen in the future. Currently, command storage utilizes RocksDB, while event storage employs SQLite.
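The wrapper mentioned above, which abstracts the design from the concrete database, could be as simple as an abstract key-value interface. A hypothetical sketch (the PoC's actual wrapper may differ), with an in-memory stand-in useful for tests:

```cpp
#include <map>
#include <optional>
#include <string>

// Abstract storage interface so agent code does not depend on whether
// RocksDB or SQLite backs it. Names here are illustrative.
class KeyValueStore {
public:
    virtual ~KeyValueStore() = default;
    virtual void put(const std::string& key, const std::string& value) = 0;
    virtual std::optional<std::string> get(const std::string& key) const = 0;
    virtual void remove(const std::string& key) = 0;
};

// In-memory implementation, a stand-in for the RocksDB/SQLite backends.
class InMemoryStore : public KeyValueStore {
public:
    void put(const std::string& key, const std::string& value) override {
        data_[key] = value;
    }
    std::optional<std::string> get(const std::string& key) const override {
        auto it = data_.find(key);
        if (it == data_.end()) return std::nullopt;
        return it->second;
    }
    void remove(const std::string& key) override { data_.erase(key); }

private:
    std::map<std::string, std::string> data_;
};
```

Swapping the backend then only means providing another `KeyValueStore` implementation.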

A server was developed to mock responses, using the Boost library and JWT to provide the client with a token via the /login request.

Regarding events, their structure is defined as follows:

  • id
  • event_data
  • event_type
  • timestamp
  • status

The status of an event can be set to: pending, processing, or dispatched. By default, events are inserted into the database with a pending status.

Events are generated automatically through a Python script. The event_queue_monitor submodule extracts these events from the database, updates their status to "processing," and then accumulates them for a time T or until a count N is reached. Subsequently, it automatically sends a /stateless request with these events to the server. This submodule verifies the success of the request, marking the events as dispatched in the database or resetting their status to pending.

This submodule features a thread that continuously searches for events in the database. For each batch of events, a new thread is launched to handle dispatch.
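The pending → processing → dispatched lifecycle described above (with a reset to pending on failure) can be sketched as a small state machine. All names are hypothetical, not the PoC's actual code:

```cpp
#include <cstddef>
#include <string>
#include <vector>

enum class EventStatus { Pending, Processing, Dispatched };

struct Event {
    int id;
    std::string data;
    EventStatus status = EventStatus::Pending;
};

// Mark up to n pending events as processing and return their indices,
// mirroring the event_queue_monitor picking events for a batch.
std::vector<std::size_t> take_batch(std::vector<Event>& events, std::size_t n) {
    std::vector<std::size_t> batch;
    for (std::size_t i = 0; i < events.size() && batch.size() < n; ++i) {
        if (events[i].status == EventStatus::Pending) {
            events[i].status = EventStatus::Processing;
            batch.push_back(i);
        }
    }
    return batch;
}

// After the /stateless request: dispatched on success, back to pending on failure.
void settle_batch(std::vector<Event>& events,
                  const std::vector<std::size_t>& batch, bool ok) {
    for (auto i : batch)
        events[i].status = ok ? EventStatus::Dispatched : EventStatus::Pending;
}
```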

Regarding commands, their structure is defined as follows:

  • id
  • command_data
  • status

The status can be set to either pending or dispatched.

The command_dispatcher submodule is responsible for making continuous GET requests to the server within a thread. When no commands are available, a Timeout error is returned. If commands are available, they are received and stored in the database. Another thread then retrieves commands with a pending status from the database, marks them as dispatched, and simulates sending them to the commander.
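The dispatcher loop just described can be sketched with the long-polling GET abstracted behind a callable that returns no value on timeout. All names are hypothetical; the real PoC uses the cURL wrapper and runs until the agent stops:

```cpp
#include <functional>
#include <optional>
#include <string>
#include <vector>

// Stand-in for the long-polling GET /commands request: nullopt on timeout,
// otherwise a batch of received commands.
using PollFn = std::function<std::optional<std::vector<std::string>>()>;
using StoreFn = std::function<void(const std::string&)>;

// Run the long-poll loop a fixed number of iterations (a real implementation
// would loop until shutdown) and return how many commands were received.
int poll_commands(PollFn poll, StoreFn store, int iterations) {
    int received = 0;
    for (int i = 0; i < iterations; ++i) {
        auto batch = poll();
        if (!batch) continue;  // timeout: reopen the connection and retry
        for (const auto& cmd : *batch) {
            store(cmd);        // persist (e.g. RocksDB) before dispatching
            ++received;
        }
    }
    return received;
}
```

Keeping the request behind `PollFn` makes the timeout/retry logic testable without a server.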

Finally, for both submodules, a message format already used by Wazuh in the upgrade module was selected (see issue).
For these cases in particular, an example of an event would be as follows:

event_data = {
    "origin": {
        "module": "fim"
    },
    "command": "create_event",
    "parameters": {
        "data": "event # xxx"
    }
}

and a command would be:

command = {
    "origin": {
        "module": "upgrade_module"
    },
    "command": "upgrade_update_status",
    "parameters": {
        "agents": [20],
        "error": 0,
        "data": "Upgrade Successful",
        "status": "Done"
    }
}

The following diagram was made by @TomasTurina for a better understanding:


PlantUML code
@startuml

!pragma teoz true

actor User as user
box Modules
box Executors #lightblue
participant Executor as executor
end box
box Collectors #lightblue
participant Collector as collector
end box
end box
box Agent
box Commander #lightblue
database CommandsDB as commandsdb
participant Commander as commander
end box
box Queue #lightblue
database EventsDB as eventsdb
participant Queue as queue
end box
box Client #lightblue
database ClientDB as clientdb
participant Client as client
end box
end box
box Manager
box AgentCommsAPI #lightblue
participant AgentCommsAPI as agentapi
end box
end box

== Start agent ==

user -> client++ #pink : start agent

client -> clientdb : read credentials
note left
Cache credentials
end note
clientdb -> client : credentials

client -> clientdb : read token
note left
Cache token
end note
clientdb -> client : token

alt token is invalid
client -> agentapi : POST /login {credentials, uuid}
agentapi -> client : api token
client -> clientdb : store token
note left
Update cache
end note
clientdb -> client : ok
end

client -> queue++ #pink : start module
queue -> client : ready
queue -> eventsdb : read and update events status
note left
Cache events
end note
eventsdb -> queue : ok

client -> commander++ #pink : start module
commander -> client : ready
commander -> commandsdb : read and update commands status
note left
Cache commands
end note
commandsdb -> commander : ok

client -> client ++-- : launch threads

client -> collector++ #pink : start module
collector -> client : ready

client -> executor++ #pink : start module
executor -> client : ready

loop
client -> client : keep threads running
end

== Token expire (thread) ==

loop
alt token is invalid
client -> agentapi : POST /login {credentials, uuid}
agentapi -> client : api token
client -> clientdb : store token
note left
Update cache
end note
clientdb -> client : ok
end
end

== Process events (threads) ==

loop
collector -> collector : collect event
collector -> queue : send event
end

loop
queue -> queue : read event
alt event available
queue -> queue : set event timestamp
alt fim/inventory/sca
queue -> eventsdb : store stateful event
note left
Cache event
end note
eventsdb -> queue : ok
else
queue -> eventsdb : store stateless event
note left
Cache event
end note
eventsdb -> queue : ok
end
end
end

== Send events (threads) ==

loop
loop bulk not ready
client -> eventsdb : get {stateful|stateless} event and update status
note left
Read/update cache
end note
eventsdb -> client : event
client -> client : add event to {stateful|stateless} bulk
end

client -> clientdb : read token
note left
Read cache
end note
clientdb -> client : token

client -> agentapi : POST /events/{stateful|stateless} {token, uuid, stateful bulk}
agentapi -> client : response
alt response ok
client -> eventsdb : delete {stateful|stateless} events
note left
Update cache
end note
eventsdb -> client : ok
else
client -> eventsdb : update {stateful|stateless} events status
note left
Update cache
end note
eventsdb -> client : ok
end
end

== Receive commands (thread) ==

loop
agentapi  -> agentapi : collect commands
end

loop
client -> clientdb : read token
note left
Read cache
end note
clientdb -> client : token

client -> agentapi : GET /commands {token, uuid}
agentapi -> client : response
alt commands available
loop every command
client -> commandsdb : store command
note left
Cache command
end note
commandsdb -> client : ok
end
end
end

== Process commands (threads) ==

loop
commander -> commandsdb : get command and update status
note left
Read/update cache
end note
commandsdb -> commander : command

alt agent
commander -> commander : command result

else collector
commander -> collector : command
collector -> commander : result

else executor
commander -> commander : parse parameters
alt download
commander -> clientdb : read token
note right
Read cache
end note
clientdb -> commander : token
commander -> agentapi : GET /download {token, uuid, parameters}
agentapi -> commander : file
end
commander -> executor : command
executor -> commander : result
end

alt result ok
commander -> commandsdb : delete command
note left
Update cache
end note
commandsdb -> commander : ok
else
commander -> commandsdb : update command status
note left
Update cache
end note
commandsdb -> commander : ok
end
end

== Stop agent ==

user -> client : stop agent

client -> executor!! : exit module
client -> collector!! : exit module

client -> client ++-- : stop threads

client -> commander!! : exit module
client -> queue!! : exit module

client -> client!! : exit agent

@enduml
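As a rough illustration of the token-expiration thread in the diagram above, the "token is invalid" check could be a small pure function that decides when to POST /login again. The function name, the leeway margin, and the use of Unix-second timestamps are assumptions for this sketch, not part of the PoC.

```cpp
#include <cassert>
#include <cstdint>

// Assumed safety margin so a token is refreshed before it expires
// mid-request rather than exactly at its expiration time.
constexpr int64_t TOKEN_LEEWAY_S = 30;

// Hypothetical helper: `exp_s` is the JWT expiration (Unix seconds),
// `now_s` the current time. Returns true when /login must be called again.
bool token_needs_refresh(int64_t now_s, int64_t exp_s)
{
    return now_s + TOKEN_LEEWAY_S >= exp_s;
}
```

The loop in the diagram would call this on each iteration and, when it returns true, perform the login request and update the cached token.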
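The "Process events" and "Send events" sections above can likewise be sketched in two pieces: classifying events as stateful or stateless (the fim/inventory/sca split shown in the diagram), and grouping pending events into bulks for POST /events/{stateful|stateless}. Function names and the string-based event representation are illustrative assumptions.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Events from fim, inventory, and sca are stored as stateful,
// everything else as stateless (split taken from the diagram).
bool is_stateful(const std::string& module)
{
    return module == "fim" || module == "inventory" || module == "sca";
}

// Sketch of bulk building: split pending events into batches of at
// most `batch_size` elements, ready to be posted to the API.
std::vector<std::vector<std::string>>
make_bulks(const std::vector<std::string>& events, size_t batch_size)
{
    std::vector<std::vector<std::string>> bulks;
    for (size_t i = 0; i < events.size(); i += batch_size)
    {
        const size_t end = std::min(i + batch_size, events.size());
        bulks.emplace_back(events.begin() + i, events.begin() + end);
    }
    return bulks;
}
```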
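Finally, the commander's dispatch in "Process commands" routes each command to the agent itself, the collector, or the executor. The targets mirror the diagram; the mapping from command type to target (and the type names other than "restart") is an assumption for illustration only.

```cpp
#include <cassert>
#include <string>

// Possible handlers for a command, matching the diagram's alt branches.
enum class Target { Agent, Collector, Executor };

// Hypothetical routing: "restart" is handled by the agent in-process;
// "collect" (assumed name) goes to the collector; anything else, such
// as commands requiring a download, goes to the executor.
Target route_command(const std::string& type)
{
    if (type == "restart") return Target::Agent;
    if (type == "collect") return Target::Collector;
    return Target::Executor;
}
```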


sdvendramini commented Jun 14, 2024

Test client PoC with PyServer PoC - Stop server and run it again 🟢

  • Description: The client can reconnect to the server and continue working normally.
Events

Server

agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:42648 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
^CGracefully stopping... (press Ctrl+C again to force)
Stopping comms_agent_comms_api_1 ... done
root@jellyfish:~/wazuh/comms# docker-compose up
Starting comms_agent_comms_api_1 ... done
Attaching to comms_agent_comms_api_1
agent_comms_api_1  | Generating new command
agent_comms_api_1  | INFO:     Started server process [1]
agent_comms_api_1  | INFO:     Waiting for application startup.
agent_comms_api_1  | INFO:     Application startup complete.
agent_comms_api_1  | INFO:     Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
agent_comms_api_1  | INFO:     172.18.0.1:52012 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:52012 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:52012 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:52012 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:52012 - "POST /api/v1/events/stateless HTTP/1.1" 200 OK

Client

[2024-06-14 06:50:31] [EVENT QUEUE MONITOR] Dispatching event ID: 65973, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 530"}}
[2024-06-14 06:50:31] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:32] [EVENT QUEUE MONITOR] Dispatching event ID: 65974, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 528"}}
[2024-06-14 06:50:32] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:33] [EVENT QUEUE MONITOR] Dispatching event ID: 65975, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 24"}}
[2024-06-14 06:50:33] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:34] [EVENT QUEUE MONITOR] Dispatching event ID: 65976, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 470"}}
[2024-06-14 06:50:34] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:35] [EVENT QUEUE MONITOR] Dispatching event ID: 65977, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 862"}}
[2024-06-14 06:50:35] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:36] [EVENT QUEUE MONITOR] Dispatching event ID: 65978, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 926"}}
[2024-06-14 06:50:36] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:37] [EVENT QUEUE MONITOR] Dispatching event ID: 65979, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 51"}}
[2024-06-14 06:50:37] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:38] [EVENT QUEUE MONITOR] Dispatching event ID: 65980, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 554"}}
[2024-06-14 06:50:38] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:39] [EVENT QUEUE MONITOR] Dispatching event ID: 65980, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 554"}}
[2024-06-14 06:50:39] [EVENT QUEUE MONITOR] Dispatching event ID: 65981, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 714"}}
[2024-06-14 06:50:39] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:40] [EVENT QUEUE MONITOR] Dispatching event ID: 65982, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 906"}}
[2024-06-14 06:50:40] [HTTP REQUEST] [STATELESS RESPONSE] Server returned nothing (no headers, no data) with code -1
[2024-06-14 06:50:41] [EVENT QUEUE MONITOR] Dispatching event ID: 65983, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 837"}}
[2024-06-14 06:50:41] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:42] [EVENT QUEUE MONITOR] Dispatching event ID: 65984, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 320"}}
[2024-06-14 06:50:42] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:43] [EVENT QUEUE MONITOR] Dispatching event ID: 65985, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 656"}}
[2024-06-14 06:50:43] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:44] [EVENT QUEUE MONITOR] Dispatching event ID: 65986, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 280"}}
[2024-06-14 06:50:44] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:45] [EVENT QUEUE MONITOR] Dispatching event ID: 65987, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 269"}}
[2024-06-14 06:50:45] [HTTP REQUEST] [STATELESS RESPONSE] Failure when receiving data from the peer with code -1
[2024-06-14 06:50:46] [EVENT QUEUE MONITOR] Dispatching event ID: 65988, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 438"}}
[2024-06-14 06:50:46] [HTTP REQUEST] [STATELESS RESPONSE] Couldn't connect to server with code -1
[2024-06-14 06:50:47] [EVENT QUEUE MONITOR] Dispatching event ID: 65988, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 438"}}
[2024-06-14 06:50:47] [EVENT QUEUE MONITOR] Dispatching event ID: 65989, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 232"}}
[2024-06-14 06:50:47] [HTTP REQUEST] [STATELESS RESPONSE] Couldn't connect to server with code -1
[2024-06-14 06:50:48] [EVENT QUEUE MONITOR] Dispatching event ID: 65990, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 541"}}
[2024-06-14 06:50:48] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:49] [EVENT QUEUE MONITOR] Dispatching event ID: 65991, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 170"}}
[2024-06-14 06:50:49] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:50] [EVENT QUEUE MONITOR] Dispatching event ID: 65992, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 352"}}
[2024-06-14 06:50:50] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:51] [EVENT QUEUE MONITOR] Dispatching event ID: 65993, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 15"}}
[2024-06-14 06:50:51] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:52] [EVENT QUEUE MONITOR] Dispatching event ID: 65994, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 554"}}
[2024-06-14 06:50:52] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:53] [EVENT QUEUE MONITOR] Dispatching event ID: 65995, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 520"}}
[2024-06-14 06:50:53] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:54] [EVENT QUEUE MONITOR] Dispatching event ID: 65996, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 417"}}
[2024-06-14 06:50:54] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:55] [EVENT QUEUE MONITOR] Dispatching event ID: 65997, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 836"}}
[2024-06-14 06:50:55] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:56] [EVENT QUEUE MONITOR] Dispatching event ID: 65998, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 217"}}
[2024-06-14 06:50:56] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:57] [EVENT QUEUE MONITOR] Dispatching event ID: 65999, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 385"}}
[2024-06-14 06:50:57] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:58] [EVENT QUEUE MONITOR] Dispatching event ID: 66000, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 887"}}
[2024-06-14 06:50:58] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:50:59] [EVENT QUEUE MONITOR] Dispatching event ID: 66001, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 310"}}
[2024-06-14 06:50:59] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:51:00] [EVENT QUEUE MONITOR] Dispatching event ID: 66002, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 948"}}
[2024-06-14 06:51:00] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:51:01] [EVENT QUEUE MONITOR] Dispatching event ID: 66003, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 346"}}
[2024-06-14 06:51:01] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}
[2024-06-14 06:51:02] [EVENT QUEUE MONITOR] Dispatching event ID: 66004, Data: {"origin": {"module": "fim"}, "command": "create_event", "parameters": {"data": "event # 399"}}
[2024-06-14 06:51:02] [HTTP REQUEST] [STATELESS RESPONSE] {"message":"Events received"}

Commands

Server

agent_comms_api_1  | INFO:     172.18.0.1:43346 - "POST /api/v1/login HTTP/1.1" 200 OK
agent_comms_api_1  | INFO:     172.18.0.1:43356 - "GET /api/v1/commands HTTP/1.1" 200 OK
agent_comms_api_1  | Generating new command
agent_comms_api_1  | INFO:     172.18.0.1:43356 - "GET /api/v1/commands HTTP/1.1" 200 OK
^CGracefully stopping... (press Ctrl+C again to force)
root@jellyfish:~/wazuh/comms# docker-compose up
Starting comms_agent_comms_api_1 ... done
Attaching to comms_agent_comms_api_1
agent_comms_api_1  | Generating new command
agent_comms_api_1  | INFO:     Started server process [1]
agent_comms_api_1  | INFO:     Waiting for application startup.
agent_comms_api_1  | INFO:     Application startup complete.
agent_comms_api_1  | INFO:     Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
agent_comms_api_1  | INFO:     172.18.0.1:37982 - "GET /api/v1/commands HTTP/1.1" 200 OK
agent_comms_api_1  | Generating new command
agent_comms_api_1  | INFO:     172.18.0.1:37982 - "GET /api/v1/commands HTTP/1.1" 200 OK
agent_comms_api_1  | Generating new command

Client

root@jellyfish:~/wazuh-agent/poc/curl/build# ./client 
[2024-06-14 06:43:31] [CLIENT] Starting client...
[2024-06-14 06:43:31] [CLIENT] Reading credentials...
[2024-06-14 06:43:31] [HTTP REQUEST] [LOGIN RESPONSE] {"token":"eyJhbGciOiJFUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJ3YXp1aCIsImF1ZCI6IldhenVoIEFnZW50IENvbW1zIEFQSSIsImlhdCI6MTcxODM1ODIxMSwiZXhwIjoxNzE4MzU5MTExLCJ1dWlkIjoiMDE4ZmU0NzctMzFjOC03NTgwLWFlNGEtZTBiMzY3MTNlYjA1In0.9Czf_RTz7f8jIT3nM4AUBMofOJcBqXaBjh9e2DyUPLFYICOPzE2GwzglQ_1VNwn8yHn0uyE5ZAi3DZrWy3oM0w"}
[2024-06-14 06:43:31] [COMMAND DISPATCHER] Starting command dispatcher thread
[2024-06-14 06:43:31] [EVENT QUEUE MONITOR] Starting event queue thread
[2024-06-14 06:43:31] [HTTP REQUEST] [COMMAND RECEIVED] {"commands":[{"id":"1","type":"restart"}]}
[2024-06-14 06:43:31] [COMMAND DISPATCHER] Dispatching command ID: 0, Data: {"id":"1","type":"restart"}
[2024-06-14 06:43:37] [HTTP REQUEST] [COMMAND RECEIVED] {"commands":[{"id":"1","type":"restart"}]}
[2024-06-14 06:43:37] [COMMAND DISPATCHER] Dispatching command ID: 1, Data: {"id":"1","type":"restart"}
[2024-06-14 06:43:47] [HTTP REQUEST] [COMMAND RECEIVED] {"commands":[{"id":"1","type":"restart"}]}
[2024-06-14 06:43:47] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:47] [COMMAND DISPATCHER] Dispatching command ID: 2, Data: {"id":"1","type":"restart"}
[2024-06-14 06:43:48] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:49] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:50] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:51] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:52] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:53] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:54] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:55] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:43:56] [HTTP REQUEST] [COMMAND REQUEST FAILED] Couldn't connect to server with code -1
[2024-06-14 06:43:57] [HTTP REQUEST] [COMMAND REQUEST FAILED] Couldn't connect to server with code -1
[2024-06-14 06:43:58] [HTTP REQUEST] [COMMAND REQUEST FAILED] Couldn't connect to server with code -1
[2024-06-14 06:43:59] [HTTP REQUEST] [COMMAND REQUEST FAILED] Failure when receiving data from the peer with code -1
[2024-06-14 06:44:00] [HTTP REQUEST] [COMMAND RECEIVED] {"commands":[{"id":"1","type":"restart"}]}
[2024-06-14 06:44:00] [COMMAND DISPATCHER] Dispatching command ID: 3, Data: {"id":"1","type":"restart"}


sdvendramini commented Jun 14, 2024

Client Send Events Benchmarks

Introduction

The purpose of these benchmarks is to evaluate the performance of sending pre-loaded pending events using std::thread. The benchmarks create an instance of the client using real TCP requests and a real SQLite database.

Benchmark Results

The results of the benchmarks are categorized based on different parameters to provide a clear and detailed analysis. Each test was run 100 times and the results averaged.

1. Varying Batch Sizes with a Fixed Event Count (1,000 events)

| Batch Size | Time (manual_time) |
| --- | --- |
| 1 | 2572 ms |
| 10 | 1068 ms |
| 100 | 830 ms |
| 1,000 | 856 ms |

Analysis: As the batch size increases, the time required for dispatching events significantly decreases initially and then stabilizes. This demonstrates that larger batch sizes are more efficient for dispatching a high number of events.

2. Varying Event Counts with a Fixed Batch Size (100)

| Event Count | Time (manual_time) |
| --- | --- |
| 1 | 2.66 ms |
| 10 | 10.5 ms |
| 100 | 84.7 ms |
| 1,000 | 746 ms |

Analysis: The time required increases with the number of events, which is expected.

3. Varying Event Data Sizes with a Fixed Event Count (1,000 events) and Batch Size (100)

| Event Data Size | Time (manual_time) |
| --- | --- |
| 1 | 288 ms |
| 10 | 283 ms |
| 100 | 260 ms |
| 1,000 | 682 ms |
| 10,000 | 694 ms |
| 100,000 | 1432 ms |

Analysis: As the data size increases, the time required for dispatching events also increases. Larger payloads make each request take longer, indicating a potential limit for optimization when handling very large data sizes.

Conclusion

Successive versions of the client and the server will need to establish an optimal batch size based on the average or maximum event size. Although a larger batch is more efficient in isolation, once the whole communication stack and the database are involved, times get worse for large volumes of information.


The same tests were repeated with AsioThreadManager instead of std::thread. The results are almost identical; only small differences were found.

1. Varying Batch Sizes with a Fixed Event Count (1,000 events)

| Batch Size | Time (manual_time) |
| --- | --- |
| 1 | 2577 ms |
| 10 | 1048 ms |
| 100 | 807 ms |
| 1,000 | 841 ms |

2. Varying Event Counts with a Fixed Batch Size (100)

| Event Count | Time (manual_time) |
| --- | --- |
| 1 | 2.84 ms |
| 10 | 12.3 ms |
| 100 | 103 ms |
| 1,000 | 852 ms |

3. Varying Event Data Sizes with a Fixed Event Count (1,000 events) and Batch Size (100)

| Event Data Size | Time (manual_time) |
| --- | --- |
| 1 | 296 ms |
| 10 | 286 ms |
| 100 | 272 ms |
| 1,000 | 676 ms |
| 10,000 | 709 ms |
| 100,000 | 1632 ms |


jr0me commented Jun 19, 2024

Update on Event Queue Dispatch Benchmarks

Commit 805436a adds the following benchmark tests to the PoC.

Introduction

The purpose of these benchmarks is to evaluate the performance of dispatching pre-loaded pending events using std::thread. The benchmarks have abstracted away a real database and a real connection to a server to which the events are dispatched. Instead, the focus is solely on measuring the time taken to create threads that batch and send the events. One of our objectives is to assess the efficiency of using std::thread.

Benchmark Results

The results of the benchmarks are categorized based on different parameters to provide a clear and detailed analysis. Each test was run 100 times and the results averaged.

1. Varying Batch Sizes with a Fixed Event Count (200,000 events)

| Batch Size | Time (manual_time) |
| --- | --- |
| 10 | 342 ms |
| 100 | 44.6 ms |
| 1,000 | 16.6 ms |
| 10,000 | 13.9 ms |
| 100,000 | 14.4 ms |
| 200,000 | 14.7 ms |

Analysis: As the batch size increases, the time required for dispatching events significantly decreases initially and then stabilizes. This demonstrates that larger batch sizes are more efficient for dispatching a high number of events.

2. Varying Event Counts with a Fixed Batch Size (100)

| Event Count | Time (manual_time) |
| --- | --- |
| 10 | 0.022 ms |
| 100 | 0.032 ms |
| 1,000 | 0.185 ms |
| 10,000 | 2.01 ms |
| 100,000 | 21.4 ms |
| 200,000 | 43.9 ms |
| 300,000 | 64.1 ms |
| 500,000 | 106 ms |
| 1,000,000 | 213 ms |

Analysis: The time required increases with the number of events, which is expected. However, the increase is more pronounced as the event count grows, indicating a potential area for optimization when handling very large volumes of events.

3. Varying Event Data Sizes with a Fixed Event Count (100,000 events) and Batch Size (100)

| Event Data Size | Time (manual_time) |
| --- | --- |
| 10 | 21.4 ms |
| 100 | 21.0 ms |
| 1,000 | 21.1 ms |
| 10,000 | 20.9 ms |
| 100,000 | 21.1 ms |
| 250,000 | 21.2 ms |
| 500,000 | 21.0 ms |
| 1,000,000 | 20.8 ms |

Analysis: The dispatch time remains relatively constant regardless of the event data size, suggesting that the current implementation efficiently handles varying sizes of event data.

Conclusion

The benchmark results provide valuable insights into the performance characteristics of using std::thread for event dispatching. The use of std::thread demonstrates efficient handling of varying batch sizes, event counts, and event data sizes. Keeping in mind that there is no DB and server connection involved in these benchmarks, the key takeaways include:

  1. Batch Size Efficiency: Larger batch sizes significantly reduce the time required for dispatching a fixed number of events.
  2. Scalability: While the dispatch time increases with the event count, the system manages up to 1,000,000 events with reasonable efficiency.
  3. Data Size Handling: The dispatch time remains stable across different event data sizes.
  4. Thread Management: The current implementation does not limit the number of threads being created. For optimal performance, thread creation should be managed according to the hardware capabilities and other fine-tuning decisions. Future benchmarks and tuning are necessary to determine the ideal number of threads for various environments.

Overall, the results are good and shed a positive light on the use of std::thread for this purpose. These findings support the consideration of std::thread as a viable alternative to other options like libuv or C++20 coroutines for event dispatching tasks. Further optimization and testing, including the management of thread creation, can help enhance performance, particularly for very large event counts.

It's also worth noting that, given these measurement results, the true bottleneck in handling events will more likely come from database transactions and HTTP requests to the server.
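For reference, the measured pattern (one std::thread per batch, all joined at the end, with no database or server involved) might be sketched as follows. The function name is illustrative, and the atomic counter stands in for the real send operation:

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Dispatch `event_count` events in batches of `batch_size`, spawning one
// thread per batch as in the benchmark. Note that, like the benchmarked
// implementation, this does not limit the number of threads created.
size_t dispatch_in_batches(size_t event_count, size_t batch_size)
{
    std::atomic<size_t> sent{0};
    std::vector<std::thread> threads;
    for (size_t i = 0; i < event_count; i += batch_size)
    {
        const size_t n = std::min(batch_size, event_count - i);
        threads.emplace_back([&sent, n] { sent += n; }); // "send" the batch
    }
    for (auto& t : threads)
        t.join();
    return sent;
}
```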


jr0me commented Jun 25, 2024

Report on Event Queue Dispatch Benchmarks

Introduction

The purpose of these benchmarks is to evaluate the performance of dispatching pre-loaded pending events using std::thread and compare it with using Asio's thread pool. The benchmarks have abstracted away a real database and a real connection to a server to which the events are dispatched. Instead, the focus is solely on measuring the time taken to create threads that batch and send the events. One of our objectives is to assess the efficiency of using std::thread in comparison to Asio's thread pool.

Asio's thread pool has been limited to 32 threads (the result of std::thread::hardware_concurrency() on the machine running the tests).
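The key difference from the per-batch std::thread approach is that a pool caps the worker count (for example at std::thread::hardware_concurrency()), so the number of tasks no longer drives thread creation. As a standard-library-only stand-in for asio::thread_pool, a minimal fixed-size pool could look like this (a sketch for illustration, not the benchmarked code):

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal fixed-size thread pool: N workers pull tasks from a shared
// queue; the destructor drains remaining tasks and joins the workers.
class FixedPool
{
public:
    explicit FixedPool(unsigned n)
    {
        for (unsigned i = 0; i < n; ++i)
            workers_.emplace_back([this] { run(); });
    }

    ~FixedPool()
    {
        {
            std::lock_guard<std::mutex> lk(m_);
            done_ = true;
        }
        cv_.notify_all();
        for (auto& w : workers_)
            w.join();
    }

    void post(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lk(m_);
            tasks_.push(std::move(task));
        }
        cv_.notify_one();
    }

private:
    void run()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(m_);
                cv_.wait(lk, [this] { return done_ || !tasks_.empty(); });
                if (tasks_.empty())
                    return; // done_ set and queue drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex m_;
    std::condition_variable cv_;
    bool done_ = false;
};
```

A pool like this would typically be constructed with `std::thread::hardware_concurrency()` workers, mirroring the 32-thread limit used in these tests.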

Benchmark Results

The results of the benchmarks are categorized based on different parameters to provide a clear and detailed analysis.

1. Varying Batch Sizes with a Fixed Event Count (200,000 events)

Using std::thread:

| Batch Size | Time (manual_time) |
| --- | --- |
| 10 | 342 ms |
| 100 | 44.6 ms |
| 1,000 | 16.6 ms |
| 10,000 | 13.9 ms |
| 100,000 | 14.4 ms |
| 200,000 | 14.7 ms |

Using Asio's Thread Pool:

| Batch Size | Time (manual_time) |
| --- | --- |
| 10 | 47.4 ms |
| 100 | 17.0 ms |
| 1,000 | 14.2 ms |
| 10,000 | 13.7 ms |
| 100,000 | 15.3 ms |
| 200,000 | 18.2 ms |

Analysis: Asio's thread pool performs significantly better than std::thread for smaller batch sizes (10 and 100). For larger batch sizes, the performance of Asio's thread pool is comparable to std::thread, with Asio showing slightly better efficiency for batch sizes of 1,000 and 10,000.

2. Varying Event Counts with a Fixed Batch Size (100)

Using std::thread:

| Event Count | Time (manual_time) |
| --- | --- |
| 10 | 0.022 ms |
| 100 | 0.032 ms |
| 1,000 | 0.185 ms |
| 10,000 | 2.01 ms |
| 100,000 | 21.4 ms |
| 200,000 | 43.9 ms |
| 300,000 | 64.1 ms |
| 500,000 | 106 ms |
| 1,000,000 | 213 ms |

Using Asio's Thread Pool:

| Event Count | Time (manual_time) |
| --- | --- |
| 10 | 0.376 ms |
| 100 | 0.269 ms |
| 1,000 | 0.385 ms |
| 10,000 | 1.27 ms |
| 100,000 | 9.01 ms |
| 200,000 | 17.1 ms |
| 300,000 | 25.5 ms |
| 500,000 | 43.4 ms |
| 1,000,000 | 88.3 ms |

Analysis: Asio's thread pool significantly outperforms std::thread across all event counts, showing better scalability and efficiency as the number of events increases.

3. Varying Event Data Sizes with a Fixed Event Count (100,000 events) and Batch Size (100)

Using std::thread:

| Event Data Size | Time (manual_time) |
| --- | --- |
| 10 | 21.4 ms |
| 100 | 21.0 ms |
| 1,000 | 21.1 ms |
| 10,000 | 20.9 ms |
| 100,000 | 21.1 ms |
| 250,000 | 21.2 ms |
| 500,000 | 21.0 ms |
| 1,000,000 | 20.8 ms |

Using Asio's Thread Pool:

| Event Data Size | Time (manual_time) |
| --- | --- |
| 10 | 9.31 ms |
| 100 | 9.52 ms |
| 1,000 | 9.30 ms |
| 10,000 | 9.16 ms |
| 100,000 | 9.79 ms |
| 250,000 | 10.1 ms |
| 500,000 | 9.94 ms |
| 1,000,000 | 9.95 ms |

Analysis: Asio's thread pool demonstrates a more consistent and lower dispatch time across different event data sizes compared to std::thread.

Conclusion

The benchmark results provide valuable insights into the performance characteristics of using std::thread compared to Asio's thread pool for event dispatching. Key takeaways include:

  1. Batch Size Efficiency: Asio's thread pool shows significant performance improvements for smaller batch sizes and maintains competitive efficiency for larger batch sizes.
  2. Scalability: Asio's thread pool scales more efficiently with increasing event counts, outperforming std::thread by a substantial margin.
  3. Data Size Handling: Asio's thread pool provides consistent and lower dispatch times across varying event data sizes, highlighting its superior efficiency in handling different data sizes.
  4. Thread Management: The current implementation using std::thread does not limit the number of threads being created. Proper thread management, according to hardware capabilities and other fine-tuning decisions, is essential for optimal performance. Asio's thread pool inherently manages threads more effectively, leading to better performance.

Overall, Asio's thread pool demonstrates superior performance and scalability compared to manually managed std::threads. These findings support the consideration of Asio's thread pool as a more efficient alternative for event dispatching tasks. Further optimization and testing, including fine-tuning the number of threads and other parameters, can help enhance performance, particularly for very large event counts.


aritosteles commented Jun 25, 2024

Performance comparison between RocksDB and SQLite

The test focused on basic database operations (writing, reading, and updating records) without multi-threading, aiming to provide insights into the efficiency and suitability of each database for potential use in our project.
Default configurations for both RocksDB and SQLite were used.
The test consisted of writing, reading, and updating 10,000 elements.

Results

(Figure: RocksDB vs SQLite benchmark results)

To do

  • Investigate the RocksDB configuration option to write to disk on every operation, which will ensure persistence but may hinder performance.

Update: Performance comparison between RocksDB and SQLite

Using transactions for bulk inserts dramatically improves SQLite performance.

| Events | Time |
| --- | --- |
| 10,000 | 26 ms |
| 100,000 | 143 ms |

There's no meaningful difference in terms of performance between SQLite and RocksDB.

@aritosteles aritosteles self-assigned this Jun 25, 2024
@TomasTurina

Conclusion

  • After analyzing different libraries for sending messages using HTTP, we decided to use the library wazuh-http-request which is a wrapper for libcurl, due to its simplicity and because it covers all the use cases we need.
  • A directory layout was proposed here and implemented in the new Wazuh agent repository.
  • The behavior of the new agent was described and detailed in UML diagrams.
  • A POC was developed and all key concepts were tested, allowing us to validate the initial design and be ready to start working on the MVP. Summary here.
  • The POC was integrated with the server's POC. Everything worked, with only minor changes needed.
  • Benchmark tests were run to validate that the proposed solution will work as fast as we need. Results here and here.
  • Initial tests were run to select a database library. Both SQLite and RocksDB will be considered.

We are ready to move forward with the implementation of the MVP.
