Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate configuration to ossec.conf #22782

Merged
5 changes: 3 additions & 2 deletions framework/scripts/tests/test_wazuh_clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,11 @@ def start(cls):
return 'HAPHELPER_START'


async def gather(first, second, third):
async def gather(first, second):
assert first == 'MASTER_START'
assert second == 'LOCALSERVER_START'
assert third == 'HAPHELPER_START'
# FIXME: (20940) When write UT for the new components.
# assert third == 'HAPHELPER_START'


wazuh_clusterd.cluster_utils = cluster_utils
Expand Down
14 changes: 9 additions & 5 deletions framework/scripts/wazuh_clusterd.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def set_logging(foreground_mode=False, debug_mode=0) -> WazuhLogger:

def print_version():
"""Print Wazuh metadata."""
from wazuh.core.cluster import __version__, __author__, __wazuh_name__, __licence__
from wazuh.core.cluster import __author__, __licence__, __version__, __wazuh_name__
print(f"\n{__wazuh_name__} {__version__} - {__author__}\n\n{__licence__}")


Expand Down Expand Up @@ -81,7 +81,7 @@ async def master_main(args: argparse.Namespace, cluster_config: dict, cluster_it
logger : WazuhLogger
Cluster logger.
"""
from wazuh.core.cluster import master, local_server
from wazuh.core.cluster import local_server, master
from wazuh.core.cluster.hap_helper.hap_helper import HAPHelper

cluster_utils.context_tag.set('Master')
Expand All @@ -96,7 +96,10 @@ async def master_main(args: argparse.Namespace, cluster_config: dict, cluster_it
concurrency_test=args.concurrency_test, node=my_server,
configuration=cluster_config, enable_ssl=args.ssl,
cluster_items=cluster_items)
await asyncio.gather(my_server.start(), my_local_server.start(), HAPHelper.start())
tasks = [my_server, my_local_server]
if not cluster_config.get(cluster_utils.HAPROXY_HELPER, {}).get(cluster_utils.HAPROXY_DISABLED, True):
tasks.append(HAPHelper)
await asyncio.gather(*[task.start() for task in tasks])


#
Expand All @@ -116,8 +119,9 @@ async def worker_main(args: argparse.Namespace, cluster_config: dict, cluster_it
logger : WazuhLogger
Cluster logger.
"""
from wazuh.core.cluster import worker, local_server
from concurrent.futures import ProcessPoolExecutor

from wazuh.core.cluster import local_server, worker
cluster_utils.context_tag.set('Worker')

# Pool is defined here so the child process is not recreated when the connection with master node is broken.
Expand Down Expand Up @@ -254,7 +258,7 @@ def main():

if __name__ == '__main__':
import wazuh.core.cluster.utils as cluster_utils
from wazuh.core import pyDaemonModule, common, configuration
from wazuh.core import common, configuration, pyDaemonModule

cluster_items = cluster_utils.get_cluster_items()
original_sig_handler = signal.signal(signal.SIGTERM, exit_handler)
Expand Down
43 changes: 0 additions & 43 deletions framework/wazuh/core/cluster/hap_helper/configuration.py

This file was deleted.

55 changes: 0 additions & 55 deletions framework/wazuh/core/cluster/hap_helper/data/configuration.yaml

This file was deleted.

This file was deleted.

110 changes: 86 additions & 24 deletions framework/wazuh/core/cluster/hap_helper/hap_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,42 @@
from asyncio import sleep
from math import ceil, floor

from wazuh.core.cluster.hap_helper.configuration import parse_configuration
from wazuh.core.cluster.hap_helper.proxy import Proxy, ProxyAPI, ProxyServerState
from wazuh.core.cluster.hap_helper.wazuh import WazuhAgent, WazuhDAPI
from wazuh.core.cluster.utils import ClusterFilter, context_tag, get_cluster_items
from wazuh.core.cluster.utils import (
AGENT_CHUNK_SIZE,
AGENT_RECONNECTION_STABILITY_TIME,
AGENT_RECONNECTION_TIME,
EXCLUDED_NODES,
FREQUENCY,
HAPROXY_BACKEND,
HAPROXY_PORT,
HAPROXY_PROTOCOL,
HAPROXY_RESOLVER,
IMBALANCE_TOLERANCE,
REMOVE_DISCONNECTED_NODE_AFTER,
ClusterFilter,
context_tag,
get_cluster_items,
read_cluster_config,
)
from wazuh.core.configuration import get_ossec_conf
from wazuh.core.exception import WazuhException, WazuhHAPHelperError

HELPER_DEFAULTS = {
HAPROXY_PORT: 5555,
HAPROXY_PROTOCOL: 'http',
HAPROXY_BACKEND: 'wazuh_cluster',
HAPROXY_RESOLVER: None,
EXCLUDED_NODES: [],
FREQUENCY: 60,
AGENT_CHUNK_SIZE: 120,
AGENT_RECONNECTION_TIME: 5,
AGENT_RECONNECTION_STABILITY_TIME: 60,
IMBALANCE_TOLERANCE: 0.1,
REMOVE_DISCONNECTED_NODE_AFTER: 3,
GGP1 marked this conversation as resolved.
Show resolved Hide resolved
}


class HAPHelper:
"""Helper to balance Wazuh agents through cluster calling HAProxy."""
Expand All @@ -16,18 +46,29 @@ class HAPHelper:
AGENT_STATUS_SYNC_TIME: int = 25 # Default agent notify time + cluster sync + 5s
SERVER_ADMIN_STATE_DELAY: int = 5

def __init__(self, proxy: Proxy, wazuh_dapi: WazuhDAPI, tag: str, options: dict):
def __init__(
self,
proxy: Proxy,
wazuh_dapi: WazuhDAPI,
tag: str,
sleep_time: float,
agent_reconnection_stability_time: int,
agent_reconnection_time: int,
agent_reconnection_chunk_size: int,
agent_tolerance: float,
remove_disconnected_node_after: int,
):
self.tag = tag
self.logger = self._get_logger(self.tag)
self.proxy = proxy
self.wazuh_dapi = wazuh_dapi

self.sleep_time: int = options['sleep_time']
self.agent_reconnection_stability_time: int = options['agent_reconnection_stability_time']
self.agent_reconnection_chunk_size: int = options['agent_reconnection_chunk_size']
self.agent_reconnection_time: int = options['agent_reconnection_time']
self.agent_tolerance: float = options['agent_tolerance']
self.remove_disconnected_node_after: int = options['remove_disconnected_node_after']
self.sleep_time = sleep_time
self.agent_reconnection_stability_time = agent_reconnection_stability_time
self.agent_reconnection_chunk_size = agent_reconnection_chunk_size
self.agent_reconnection_time = agent_reconnection_time
self.agent_tolerance = agent_tolerance
self.remove_disconnected_node_after = remove_disconnected_node_after

@staticmethod
def _get_logger(tag: str) -> logging.Logger:
Expand Down Expand Up @@ -442,33 +483,52 @@ def get_connection_retry() -> int:
@classmethod
async def start(cls):
"""Initialize and run HAPHelper."""
tag = 'HAPHelper'
context_tag.set(tag)
logger = HAPHelper._get_logger(tag)

try:
configuration = parse_configuration()
tag = 'HAPHelper'
helper_config = read_cluster_config()['haproxy_helper']
port_config = get_ossec_conf(section='remote', field='port')
nico-stefani marked this conversation as resolved.
Show resolved Hide resolved

proxy_api = ProxyAPI(
username=configuration['proxy']['api']['user'],
password=configuration['proxy']['api']['password'],
username=helper_config['haproxy_user'],
password=helper_config['haproxy_password'],
tag=tag,
address=configuration['proxy']['api']['address'],
port=configuration['proxy']['api']['port'],
protocol=configuration['proxy']['api']['protocol'],
address=helper_config['haproxy_address'],
port=helper_config.get(HAPROXY_PORT, HELPER_DEFAULTS[HAPROXY_PORT]),
protocol=helper_config.get(HAPROXY_PROTOCOL, HELPER_DEFAULTS[HAPROXY_PROTOCOL]),
)
proxy = Proxy(
wazuh_backend=configuration['proxy']['backend'],
wazuh_connection_port=configuration['wazuh']['connection']['port'],
wazuh_backend=helper_config.get(HAPROXY_BACKEND, HELPER_DEFAULTS[HAPROXY_BACKEND]),
wazuh_connection_port=int(port_config.get('remote')[0].get('port')),
proxy_api=proxy_api,
tag=tag,
resolver=configuration['proxy'].get('resolver', None),
resolver=helper_config.get(HAPROXY_RESOLVER, HELPER_DEFAULTS[HAPROXY_RESOLVER]),
)

wazuh_dapi = WazuhDAPI(
tag=tag,
excluded_nodes=configuration['wazuh']['excluded_nodes'],
excluded_nodes=helper_config.get(EXCLUDED_NODES, HELPER_DEFAULTS[EXCLUDED_NODES]),
)

helper = cls(proxy=proxy, wazuh_dapi=wazuh_dapi, tag=tag, options=configuration['hap_helper'])
helper = cls(
proxy=proxy,
wazuh_dapi=wazuh_dapi,
tag=tag,
sleep_time=int(helper_config.get(FREQUENCY, HELPER_DEFAULTS[FREQUENCY])),
agent_reconnection_stability_time=helper_config.get(
AGENT_RECONNECTION_STABILITY_TIME, HELPER_DEFAULTS[AGENT_RECONNECTION_STABILITY_TIME]
),
agent_reconnection_time=helper_config.get(
AGENT_RECONNECTION_TIME, HELPER_DEFAULTS[AGENT_RECONNECTION_TIME]
),
agent_reconnection_chunk_size=helper_config.get(AGENT_CHUNK_SIZE, HELPER_DEFAULTS[AGENT_CHUNK_SIZE]),
agent_tolerance=helper_config.get(IMBALANCE_TOLERANCE, HELPER_DEFAULTS[IMBALANCE_TOLERANCE]),
remove_disconnected_node_after=helper_config.get(
REMOVE_DISCONNECTED_NODE_AFTER, HELPER_DEFAULTS[REMOVE_DISCONNECTED_NODE_AFTER]
),
)

await helper.initialize_proxy()

Expand All @@ -482,11 +542,13 @@ async def start(cls):
if helper.proxy.hard_stop_after is None:
await helper.set_hard_stop_after()

helper.logger.info('Starting HAProxy Helper')
logger.info('Starting HAProxy Helper')
await helper.manage_wazuh_cluster_nodes()
except KeyError as exc:
logger.error(f'Missing configuration {exc}. The helper cannot start.')
except KeyboardInterrupt:
pass
except Exception as unexpected_exc:
helper.logger.critical(f'Unexpected exception: {unexpected_exc}', exc_info=True)
logger.critical(f'Unexpected exception: {unexpected_exc}', exc_info=True)
finally:
helper.logger.info('Process ended')
logger.info('Process ended')
nico-stefani marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading