diff --git a/pytest_splunk_addon/splunk.py b/pytest_splunk_addon/splunk.py index 11d1dcf25..a5ec32185 100644 --- a/pytest_splunk_addon/splunk.py +++ b/pytest_splunk_addon/splunk.py @@ -172,6 +172,15 @@ def pytest_addoption(parser): default="514", help="SC4S Port. default is 514", ) + group.addoption( + "--thread-count", + action="store", + default=20, + dest="thread_count", + help=( + "Thread count for Data ingestion" + ), + ) group.addoption( "--search-index", action="store", @@ -503,7 +512,8 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s): "sc4s_host": sc4s[0], # for sc4s "sc4s_port": sc4s[1][514] # for sc4s } - IngestorHelper.ingest_events(ingest_meta_data, addon_path, config_path) + thread_count = int(request.config.getoption("thread_count")) + IngestorHelper.ingest_events(ingest_meta_data, addon_path, config_path, thread_count) sleep(50) if ("PYTEST_XDIST_WORKER" in os.environ): with open(os.environ.get("PYTEST_XDIST_TESTRUNUID") + "_wait", "w+"): diff --git a/pytest_splunk_addon/standard_lib/event_ingestors/base_event_ingestor.py b/pytest_splunk_addon/standard_lib/event_ingestors/base_event_ingestor.py index e27dadca3..58e85f9a1 100644 --- a/pytest_splunk_addon/standard_lib/event_ingestors/base_event_ingestor.py +++ b/pytest_splunk_addon/standard_lib/event_ingestors/base_event_ingestor.py @@ -11,5 +11,5 @@ def __init__(self, required_configs): pass @abc.abstractmethod - def ingest(self, event_object): + def ingest(self, event_object, thread_count): raise NotImplementedError diff --git a/pytest_splunk_addon/standard_lib/event_ingestors/hec_event_ingestor.py b/pytest_splunk_addon/standard_lib/event_ingestors/hec_event_ingestor.py index a3816920e..10bd26364 100644 --- a/pytest_splunk_addon/standard_lib/event_ingestors/hec_event_ingestor.py +++ b/pytest_splunk_addon/standard_lib/event_ingestors/hec_event_ingestor.py @@ -32,7 +32,7 @@ def __init__(self, required_configs): self.hec_uri = required_configs.get("splunk_hec_uri") self.session_headers = required_configs.get("session_headers") - def ingest(self, events): + def ingest(self, events, thread_count): """ Ingests event and metric data into splunk using HEC token via event endpoint. diff --git a/pytest_splunk_addon/standard_lib/event_ingestors/hec_metric_ingestor.py b/pytest_splunk_addon/standard_lib/event_ingestors/hec_metric_ingestor.py index 1a03b3ae8..b7509700a 100644 --- a/pytest_splunk_addon/standard_lib/event_ingestors/hec_metric_ingestor.py +++ b/pytest_splunk_addon/standard_lib/event_ingestors/hec_metric_ingestor.py @@ -31,7 +31,7 @@ def __init__(self, required_configs): self.hec_uri = required_configs.get('splunk_hec_uri') self.session_headers = required_configs.get('session_headers') - def ingest(self, data): + def ingest(self, data, thread_count): """ Ingests event and metric data into splunk using HEC token via event endpoint. Args: diff --git a/pytest_splunk_addon/standard_lib/event_ingestors/hec_raw_ingestor.py b/pytest_splunk_addon/standard_lib/event_ingestors/hec_raw_ingestor.py index e66e4d908..fce89df5c 100644 --- a/pytest_splunk_addon/standard_lib/event_ingestors/hec_raw_ingestor.py +++ b/pytest_splunk_addon/standard_lib/event_ingestors/hec_raw_ingestor.py @@ -32,7 +32,7 @@ def __init__(self, required_configs): self.hec_uri = required_configs['splunk_hec_uri'] self.session_headers = required_configs['session_headers'] - def ingest(self, events): + def ingest(self, events, thread_count): """ Ingests data into splunk via raw endpoint. diff --git a/pytest_splunk_addon/standard_lib/event_ingestors/ingestor_helper.py b/pytest_splunk_addon/standard_lib/event_ingestors/ingestor_helper.py index 2e32c0e09..97f64f87f 100644 --- a/pytest_splunk_addon/standard_lib/event_ingestors/ingestor_helper.py +++ b/pytest_splunk_addon/standard_lib/event_ingestors/ingestor_helper.py @@ -30,7 +30,7 @@ def get_event_ingestor(cls, input_type, ingest_meta_data): return ingestor @classmethod - def ingest_events(cls, ingest_meta_data, addon_path, config_path): + def ingest_events(cls, ingest_meta_data, addon_path, config_path, thread_count): """ Events are ingested in the splunk. Args: @@ -57,4 +57,4 @@ def ingest_events(cls, ingest_meta_data, addon_path, config_path): for input_type, events in ingestor_dict.items(): event_ingestor = cls.get_event_ingestor(input_type, ingest_meta_data) - event_ingestor.ingest(events) + event_ingestor.ingest(events, thread_count) diff --git a/pytest_splunk_addon/standard_lib/event_ingestors/sc4s_event_ingestor.py b/pytest_splunk_addon/standard_lib/event_ingestors/sc4s_event_ingestor.py index c24cebbc4..4e80c8fcd 100644 --- a/pytest_splunk_addon/standard_lib/event_ingestors/sc4s_event_ingestor.py +++ b/pytest_splunk_addon/standard_lib/event_ingestors/sc4s_event_ingestor.py @@ -5,8 +5,6 @@ import concurrent.futures from .base_event_ingestor import EventIngestor -THREAD_POOL = 20 - class SC4SEventIngestor(EventIngestor): """ @@ -28,7 +26,7 @@ def __init__(self, required_configs): self.sc4s_port = required_configs['sc4s_port'] self.server_address = (required_configs['sc4s_host'], required_configs['sc4s_port']) - def ingest(self, events): + def ingest(self, events, thread_count): """ Ingests events in the splunk via sc4s (Single/Batch of Events) @@ -40,7 +38,7 @@ def ingest(self, events): for event in events: raw_events.extend(event.event.splitlines()) - with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_POOL) as executor: + with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor: _ = list(executor.map(self.ingest_event, raw_events)) def ingest_event(self, event):