Agent stateful modules redesign #2
Define layout (v1)

Possible directory structure (TBD):

fim_project/
├── CMakeLists.txt
├── src/
│ ├── CMakeLists.txt
│ ├── fim/
│ │ ├── FIM.cpp
│ │ ├── FIM.h
│ │ └── ...
│ ├── dbsync/
│ │ ├── DBsync.cpp
│ │ ├── DBsync.h
│ │ └── ...
│ ├── persistence/
│ │ ├── Persistence.cpp
│ │ ├── Persistence.h
│ │ └── ...
│ └── main.cpp
├── include/
│ ├── fim/
│ │ ├── FIM.h
│ │ └── ...
│ ├── dbsync/
│ │ ├── DBsync.h
│ │ └── ...
│ └── persistence/
│ ├── Persistence.h
│ └── ...
├── tests/
│ ├── CMakeLists.txt
│ ├── test_fim.cpp
│ ├── test_dbsync.cpp
│ └── test_persistence.cpp
└── docs/
├── design.md
└── usage.md

Steps to Define the Layout
Possible CMake
# --- Root CMakeLists.txt ---
cmake_minimum_required(VERSION 3.10)
project(FIM_Project)
set(CMAKE_CXX_STANDARD 17)
# Add subdirectories
add_subdirectory(src)
add_subdirectory(tests)
# Include directories
include_directories(include)

# --- src/CMakeLists.txt ---

# Add the FIM library
add_library(fim STATIC
fim/FIM.cpp
fim/FIM.h
# Add other FIM source files if needed
)
# Add the DBsync library
add_library(dbsync STATIC
dbsync/DBsync.cpp
dbsync/DBsync.h
# Add other DBsync source files if needed
)
# Add the Persistence library
add_library(persistence STATIC
persistence/Persistence.cpp
persistence/Persistence.h
# Add other Persistence source files if needed
)
# Add include directories for the libraries
target_include_directories(fim PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/fim)
target_include_directories(dbsync PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/dbsync)
target_include_directories(persistence PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/persistence)
# Create the main executable
add_executable(main main.cpp)
# Link the libraries to the main executable
target_link_libraries(main fim dbsync persistence)

# --- tests/CMakeLists.txt ---

# Find the GTest package
find_package(GTest REQUIRED)
include_directories(${GTEST_INCLUDE_DIRS})
# Add test executables
add_executable(test_fim test_fim.cpp)
target_link_libraries(test_fim ${GTEST_LIBRARIES} pthread fim)
add_executable(test_dbsync test_dbsync.cpp)
target_link_libraries(test_dbsync ${GTEST_LIBRARIES} pthread dbsync)
add_executable(test_persistence test_persistence.cpp)
target_link_libraries(test_persistence ${GTEST_LIBRARIES} pthread persistence)
# Enable testing
enable_testing()
# Add tests
add_test(NAME test_fim COMMAND test_fim)
add_test(NAME test_dbsync COMMAND test_dbsync)
add_test(NAME test_persistence COMMAND test_persistence)
Approval Section

The team must review and approve the proposed design, or iterate with changes until the final design is approved.
Define layout (v2)

After analyzing the previous proposal with the team, we decided to stick to the structure defined for the wazuh-agent project, making some small additions in this branch only for the present spike.

Steps to Define the Layout
wazuh-agent/
├── CMakeLists.txt
├── modules/
│ ├── fim/ (former syscheck)
│ │ ├── CMakeLists.txt
│ │ ├── main.cpp
│ │ ├── include/
│ │ │ ├── FIM.h
│ │ │ └── ...
│ │ ├── src/
│ │ │ ├── FIM.cpp
│ │ │ ├── FIM.h
│ │ │ └── ...
│ │ └── tests/
│ │ ├── CMakeLists.txt
│ │ └── test_fim.cpp
│ └── [additional modules...]
├── common/
│ ├── dbsync/
│ └── [additional modules...]
└── build/
└── [build output...]

Possible CMake
# --- Root CMakeLists.txt ---
cmake_minimum_required(VERSION 3.10)
project(WazuhAgent)
set(CMAKE_CXX_STANDARD 17)
# Add subdirectories
add_subdirectory(modules/fim)
add_subdirectory(common/dbsync)
# Enable testing
enable_testing()
# Add subdirectories for tests
add_subdirectory(modules/fim/tests)
add_subdirectory(common/dbsync/tests)
# Include directories
include_directories(modules/fim/include)
include_directories(common/dbsync/include)

# --- modules/fim/CMakeLists.txt ---

# Add the FIM library
add_library(fim STATIC
src/FIM.cpp
include/FIM.h
# Add other FIM source files if needed
)
# Add include directories for the FIM library
target_include_directories(fim PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
# Create the main executable
add_executable(fim_main main.cpp)
# Link the libraries to the main executable
target_link_libraries(fim_main fim dbsync)

# --- modules/fim/tests/CMakeLists.txt ---

# Find the GTest package
find_package(GTest REQUIRED)
include_directories(${GTEST_INCLUDE_DIRS})
# Add test executables
add_executable(test_fim test_fim.cpp)
target_link_libraries(test_fim ${GTEST_LIBRARIES} pthread fim)
# Enable testing
enable_testing()
# Add tests
add_test(NAME test_fim COMMAND test_fim)

# --- common/dbsync/CMakeLists.txt ---

# Add the DBsync library
add_library(dbsync STATIC
src/DBsync.cpp
include/DBsync.h
# Add other DBsync source files if needed
)
# Add include directories for the DBsync library
target_include_directories(dbsync PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
Some basic steps needed after a quick catch-up with the issue (WIP):
├── syscheckd
│ ├── build
│ ├── CMakeLists.txt
│ ├── coverage_report
│ ├── include
│ └── src
if(CMAKE_SYSTEM_NAME STREQUAL "Windows")
add_library(wazuh-syscheckd STATIC ${SYSCHECKD_SRC})
add_library(wazuh-syscheckd-event STATIC ${SYSCHECKD_SRC})
target_compile_definitions(wazuh-syscheckd PUBLIC -D_WIN32_WINNT=0x600)
target_compile_definitions(wazuh-syscheckd-event PUBLIC -D_WIN32_WINNT=0x600 -DEVENTCHANNEL_SUPPORT)
else()
# Create a shared library
add_library(wazuh-syscheckd SHARED ${SYSCHECKD_SRC})
# Optionally, create a static library as well
add_library(wazuh-syscheckd-static STATIC ${SYSCHECKD_SRC})
endif(CMAKE_SYSTEM_NAME STREQUAL "Windows")
FIM Module Transformation

Convert the File Integrity Monitoring (FIM) module into a library:
Include necessary dependencies for the FIM library:
Define layout (v3)

After analyzing the layout v2 with the team, @LucioDonda's proposal here, and the redefinition layout here, we determined that the new structure will be as follows:

wazuh-agent/
├── src/
│ ├── CMakeLists.txt
│ ├── modules/
│ │ ├── fim/ (former syscheck)
│ │ │ ├── CMakeLists.txt
│ │ │ ├── build/
│ │ │ │ └── [build output...]
│ │ │ ├── main.cpp
│ │ │ ├── include/
│ │ │ │ ├── fim.h
│ │ │ │ └── ...
│ │ │ ├── src/
│ │ │ │ ├── fim.cpp
│ │ │ │ ├── fim.h
│ │ │ │ └── ...
│ │ │ └── tests/
│ │ │ ├── CMakeLists.txt
│ │ │ └── test_fim.cpp
│ │ └── [additional modules...]
│ ├── common/
│ │ ├── dbsync/
│ │ └── [additional modules...]
│ └── build/
│ └── [build output...]
├── etc/
│ ├── config/
│ ├── selinux/
│ └── ruleset/
│ ├── sca/
│ └── rootcheck/
├── packages/
│ └── installers/
│ ├── unix/ (former init folder, including upgrade.sh and install.sh)
│ └── win32/
└── bump-version.sh
DBsync Integration

Import DBsync and utilize it as the engine for state difference calculations. Below is the proposed sequence diagram:

sequenceDiagram
participant Agent as Agent
participant Queue as Queue
participant Fim as FIM
participant OS as os
participant Dbsync as dbsync
participant FimDB as fim.db
loop scan (each frequency seconds)
loop each directory
Fim ->> OS: get data
OS -->> Fim: info
Fim ->> Dbsync: File state
Dbsync ->> FimDB: Update DB
FimDB -->> Dbsync: result
Dbsync -->> Fim: File state changed
Fim -->> Queue: message (/stateful)
Agent ->> Queue: poll (/stateful)
Queue -->> Agent: event (/stateful)
Fim -->> Queue: message (/stateless)
Agent ->> Queue: poll (/stateless)
Queue -->> Agent: event (/stateless)
end
end
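The diff step in the diagram can be sketched in C++. Note this is only an illustrative stand-in, not the real dbsync API: `FileState` and `DbSyncStub` below are hypothetical names, and a `std::map` plays the role of fim.db.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for the attributes FIM collects from the OS.
struct FileState {
    std::string path;
    std::string checksum;
};

// Hypothetical stand-in for dbsync backed by fim.db: updates the stored
// state and reports whether the file state changed since the last scan,
// which is what would trigger a stateful message toward the queue.
class DbSyncStub {
public:
    // Mirrors the "File state" -> "Update DB" -> "File state changed" steps.
    bool updateAndCheckChanged(const FileState& s) {
        auto it = db_.find(s.path);
        const bool changed = (it == db_.end()) || (it->second != s.checksum);
        db_[s.path] = s.checksum;
        return changed;
    }

private:
    std::map<std::string, std::string> db_;  // in-memory stand-in for fim.db
};
```

On the first scan every file reports a change (new entry in the database); later scans only report files whose stored state differs from what the OS returns.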
As for the persistence of the FIM database, it is necessary to create an instance of DBSync in 'persistent' mode, giving DBSync the capability of DB persistence:

/**
* @brief Creates a new DBSync instance (wrapper)
*
* @param host_type Dynamic library host type to be used.
* @param db_type Database type to be used (currently only supported SQLITE3)
* @param path Path where the local database will be created.
* @param sql_statement SQL sentence to create tables in a SQL engine.
* @param upgrade_statements SQL sentences to upgrade tables in a SQL engine.
*
* @note db_management will be DbManagement::PERSISTENT as default.
*
* @return Handle instance to be used for common sql operations (cannot be used by more than 1 thread).
*/
EXPORTED DBSYNC_HANDLE dbsync_create_persistent(const HostType host_type,
const DbEngineType db_type,
const char* path,
const char* sql_statement,
const char** upgrade_statements);
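This is what 'persistent' mode means in practice: state is written through to disk so that an instance created after a restart sees the previous run's data. Below is a minimal, self-contained sketch of that idea; it is not the dbsync implementation, and `PersistentKv` and its file format are hypothetical.

```cpp
#include <fstream>
#include <map>
#include <string>

// Hypothetical key/value store illustrating DbManagement::PERSISTENT:
// the constructor reloads whatever a previous run left behind, and every
// write is flushed to disk.
class PersistentKv {
public:
    explicit PersistentKv(const std::string& path) : path_(path) {
        std::ifstream in(path_);
        std::string key, value;
        while (in >> key >> value) db_[key] = value;  // reload persisted state
    }

    void put(const std::string& key, const std::string& value) {
        db_[key] = value;
        flush();  // write-through, so state survives a restart
    }

    std::string get(const std::string& key) const {
        auto it = db_.find(key);
        return it == db_.end() ? std::string() : it->second;
    }

private:
    void flush() const {
        std::ofstream out(path_, std::ios::trunc);
        for (const auto& [key, value] : db_) out << key << ' ' << value << '\n';
    }

    std::string path_;
    std::map<std::string, std::string> db_;
};
```

Destroying one instance and constructing another on the same path simulates the agent restart described in this spike: the second instance starts with the first one's state already loaded.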
3. Persistence/Storage System (Queue)

PoC Objective

Develop a simple test that meets the following points.
Code layout

└── queue
├── build
├── CMakeLists.txt
├── include
│ └── json.hpp
├── main.cpp
├── queue.cpp
├── queue.hpp
└── sealed_bucket_persistence.json

Test Description

The purpose of this test is to evaluate the functionality and performance of the Queue system, which manages messages with a fixed capacity and ensures persistence. The test involves multiple producer and consumer threads handling different types of messages (STATEFUL and STATELESS).

Test Setup:
Code

#include "queue.hpp"
#include <iostream>
#include <thread>
#include <chrono>
/* Initialization sets a limit of 30 messages for the bucket, ensuring that the queue will block inserts if this limit is reached */
#define SEALED_BUCKET_MAX 30
#define NUMBER_OF_STATEFUL_MSG_TO_GENERATE 50
#define NUMBER_OF_STATELESS_MSG_TO_GENERATE 20
void stateful_producer(SealedBucket& bucket) {
for (int i = 0; i < NUMBER_OF_STATEFUL_MSG_TO_GENERATE; ++i) {
Message msg{MessageType::STATEFUL, "Stateful message " + std::to_string(i)};
bucket.insert(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void stateless_producer(SealedBucket& bucket) {
for (int i = 0; i < NUMBER_OF_STATELESS_MSG_TO_GENERATE; ++i) {
Message msg{MessageType::STATELESS, "Stateless message " + std::to_string(i)};
bucket.insert(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void stateful_consumer(SealedBucket& bucket) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait for some messages to be produced
while (true) {
auto messages = bucket.poll(MessageType::STATEFUL, 5);
if (messages.empty()) {
break;
}
for (const auto& msg : messages) {
std::cout << "Consumer 1, Polled: " << msg.content << std::endl;
}
bucket.acknowledge(messages);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
void stateless_consumer(SealedBucket& bucket) {
std::this_thread::sleep_for(std::chrono::seconds(1)); // Wait for some messages to be produced
while (true) {
auto messages = bucket.poll(MessageType::STATELESS, 5);
if (messages.empty()) {
break;
}
for (const auto& msg : messages) {
std::cout << "Consumer 2, Polled: " << msg.content << std::endl;
}
bucket.acknowledge(messages);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
int main() {
SealedBucket bucket(SEALED_BUCKET_MAX);
std::thread stateful_prod_thread(stateful_producer, std::ref(bucket));
std::thread stateless_prod_thread(stateless_producer, std::ref(bucket));
std::thread stateful_cons_thread(stateful_consumer, std::ref(bucket));
std::thread stateless_cons_thread(stateless_consumer, std::ref(bucket));
stateful_prod_thread.join();
stateless_prod_thread.join();
stateful_cons_thread.join();
stateless_cons_thread.join();
return 0;
}
Sample Output:

Consumer 1, Polled: Stateful message 0
Consumer 2, Polled: Stateless message 0

This test ensures that the SealedBucket handles concurrent access, maintains persistence, and properly blocks when full, providing a robust solution for message queue management.

How to test it
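queue.hpp itself is not shown in this comment; judging from the calls in the test (`insert`, `poll`, `acknowledge`), the interface it assumes could look roughly like the in-memory sketch below. This is a reconstruction, not the PoC code, and the persistence to sealed_bucket_persistence.json is omitted for brevity.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <vector>

enum class MessageType { STATEFUL, STATELESS };

struct Message {
    MessageType type;
    std::string content;
};

// In-memory sketch of the SealedBucket interface the test exercises.
class SealedBucket {
public:
    explicit SealedBucket(std::size_t max) : max_(max) {}

    // Blocks while the bucket is at capacity, matching the blocking
    // behavior described in the test setup.
    void insert(const Message& msg) {
        std::unique_lock<std::mutex> lock(mtx_);
        not_full_.wait(lock, [this] { return queue_.size() < max_; });
        queue_.push_back(msg);
    }

    // Returns up to 'count' messages of the requested type without
    // removing them; removal only happens on acknowledge().
    std::vector<Message> poll(MessageType type, std::size_t count) {
        std::lock_guard<std::mutex> lock(mtx_);
        std::vector<Message> out;
        for (const auto& m : queue_)
            if (m.type == type && out.size() < count) out.push_back(m);
        return out;
    }

    // Removes acknowledged messages and wakes any blocked producers.
    void acknowledge(const std::vector<Message>& msgs) {
        std::lock_guard<std::mutex> lock(mtx_);
        for (const auto& m : msgs)
            for (auto it = queue_.begin(); it != queue_.end(); ++it)
                if (it->type == m.type && it->content == m.content) {
                    queue_.erase(it);
                    break;
                }
        not_full_.notify_all();
    }

private:
    std::size_t max_;
    std::deque<Message> queue_;
    std::mutex mtx_;
    std::condition_variable not_full_;
};
```

The poll-then-acknowledge split gives at-least-once delivery: a consumer that crashes between the two calls leaves its messages in the bucket for the next poll.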
FIM Module Transformation process to follow (WIP):
Pseudo-code

int start_fim()
{
// Options and configurations
#ifndef WIN32
/* Set the name */
/* Change current working directory */
/* Check if the group given is valid */
/* Privilege separation */
#endif
/* Initialize error logging for shared modulesd and dbsync */
// Options and configurations
/* Read internal options */
/* Check if the configuration is present */
/* Read syscheck config */
/* Rootcheck config */
#ifdef USE_MAGIC
/* Setup libmagic */ --> is this still necessary?
#endif
// Communication
#ifndef WIN32
/* Start signal handling */
// Start com request thread
/* Create pid */
/* Connect to the queue */
/* Start up message */
#else
/* Start up message */
#endif
// Print Information
/* Print directories to be monitored, ignores, sregex ignores, files with no diff. */
/* WIN registry ignores*/
/* Check directories set for real time */
#ifndef WIN32
if (start_realtime == 1) {
realtime_start();
}
// Audit events thread -> whodata
#else
foreach(syscheck.directories) {
if (dir_it->options & REALTIME_ACTIVE) {
realtime_start();
}
}
#endif
fim_initialize();
start_daemon();
return (0);
}
The simplest approach would be to move all the declarations to their respective headers and include those in the
Details
classDiagram
class wmodule~ConfigType~ {
<<Template>>
-thread: std::thread
-context: wm_context*
-data: ConfigType*
+wmodule(ctx: wm_context&, d: ConfigType*)
~wmodule()
}
class wm_context {
-name: std::string
-start: std::function~void()~
-destroy: std::function~void(ConfigType*)~
-dump: std::function~nlohmann::json(const ConfigType*)~
-sync: std::function~int(const std::string&)~
-stop: std::function~void(ConfigType*)~
-query: std::function~void(ConfigType*, nlohmann::json&)~
}
wmodule *-- wm_context
class SysCollectorConfig {
<<ConfigType>>
- diff
- directories
- disabled
- ignore
- max_eps
- max_files_per_second
- registry_ignore
- scan_x
- skip_x
- synchronization
- whodata
- windows_registry
}
class SysCollector {
- wm_sys_main()
- wm_sys_destroy(void*)
- wm_sys_dump(const void*): nlohmann::json
- wm_sync_message(const std::string&): int
- wm_sys_stop(void*)
- wm_sys_query(void*, nlohmann::json&)
}
SysCollector ..> SysCollectorConfig : uses
SysCollector ..> wmodule~SysCollectorConfig~ : instantiates
note for SysCollector "Example instantiation\nof wmodule with\nSysCollectorConfig"
In this section the basic idea would be to:
classDiagram
class wmodule~ConfigType~ {
<<Template>>
-thread: std::thread
-context: wm_context*
-data: ConfigType*
+wmodule(ctx: wm_context&, d: ConfigType*)
~wmodule()
}
class wm_context {
-name: std::string
-start: std::function~void()~
-destroy: std::function~void(ConfigType*)~
-dump: std::function~nlohmann::json(const ConfigType*)~
-sync: std::function~int(const std::string&)~
-stop: std::function~void(ConfigType*)~
-query: std::function~void(ConfigType*, nlohmann::json&)~
}
wmodule *-- wm_context
class SysCollectorConfig {
<<ConfigType>>
# Configuration data members
}
class SysCollector {
<<Instantiation>>
- wm_sys_main()
- wm_sys_destroy(void*)
- wm_sys_dump(const void*): nlohmann::json
- wm_sync_message(const std::string&): int
- wm_sys_stop(void*)
- wm_sys_query(void*, nlohmann::json&)
}
SysCollector ..> SysCollectorConfig : uses
SysCollector ..> wmodule~SysCollectorConfig~ : instantiates
note for SysCollector "Example instantiation\nof wmodule with\nSysCollectorConfig"
And a basic code for this would look like:

#include <functional>
#include <nlohmann/json.hpp>
#include <string>
#include <thread>
// Module context and main module structure
template <typename ConfigType>
class wmodule {
public:
// Module context
struct wm_context {
std::string name; // Name for module
std::function<void()> start; // Main function
std::function<void(ConfigType*)> destroy; // Free module configuration data
std::function<nlohmann::json(const ConfigType*)> dump; // Dump current configuration
std::function<int(const std::string&)> sync; // Sync
std::function<void(ConfigType*)> stop; // Module destructor
std::function<void(ConfigType*, nlohmann::json&)> query; // Run a query
};
std::thread thread; // Thread
const wm_context* context; // Context (common structure)
ConfigType* data; // Data (module-dependent structure)
// Constructor
wmodule(const wm_context& ctx, ConfigType* d)
: context(&ctx), data(d) {}
// Destructor
~wmodule() {
context->destroy(data);
}
};
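To show how a module would instantiate the template, here is a simplified, self-contained variant of the same pattern. It is a sketch, not the proposed code: `std::string` stands in for `nlohmann::json` to avoid the external dependency, the callback set is trimmed down, and `DemoConfig` is a hypothetical config in the spirit of SysCollectorConfig.

```cpp
#include <functional>
#include <string>

// Simplified wm_context/wmodule pair (std::string replaces nlohmann::json).
template <typename ConfigType>
class wmodule {
public:
    struct wm_context {
        std::string name;                                   // Module name
        std::function<void(ConfigType*)> start;             // Main function
        std::function<std::string(const ConfigType*)> dump; // Dump configuration
        std::function<void(ConfigType*)> destroy;           // Free config data
    };

    wmodule(const wm_context& ctx, ConfigType* d) : context_(&ctx), data_(d) {}
    ~wmodule() { context_->destroy(data_); }

    void run() { context_->start(data_); }
    std::string dump() const { return context_->dump(data_); }

private:
    const wm_context* context_; // Context (common structure)
    ConfigType* data_;          // Data (module-dependent structure)
};

// Hypothetical configuration, standing in for something like SysCollectorConfig.
struct DemoConfig {
    bool disabled = false;
    int max_eps = 100;
};
```

A module then builds a `wm_context` from lambdas (or free functions such as `wm_sys_main`) and constructs `wmodule<DemoConfig>` with it, which is the instantiation relationship the class diagram depicts.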
Parent issue:
Description
In this spike we're going to write a program which will test the persistence model within the agent. We will transform the FIM module into a library which will be used to detect file changes.
FIM will ensure the state generated is persisted across program restarts. On a restart, differences between the saved state and new changes will be calculated locally, generating change events.
We will design how these events will reach the Wazuh servers in:
Agent comms API
endpoint client #1

This will give us a vision of the current persistence capabilities, as we want to expand this model to the rest of the modules.
During this exploration we will investigate how this model could be applied to other modules such as SCA and Syscollector.
The following diagrams present an overall design for the agent, but these will change during the development of the spike.
Component diagram
Data flow
Implementation Restrictions
Plan
Define Layout:
FIM Module Transformation:
Persistence/Storage System:
DBsync Integration:
Proof of Concept (nice-to-have):