Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion pytest_splunk_addon/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ def pytest_addoption(parser):
default="514",
help="SC4S Port. default is 514",
)
group.addoption(
"--thread-count",
action="store",
default=20,
dest="thread_count",
help=(
"Thread count for Data ingestion"
),
)
group.addoption(
"--search-index",
action="store",
Expand Down Expand Up @@ -503,7 +512,8 @@ def splunk_ingest_data(request, splunk_hec_uri, sc4s):
"sc4s_host": sc4s[0], # for sc4s
"sc4s_port": sc4s[1][514] # for sc4s
}
IngestorHelper.ingest_events(ingest_meta_data, addon_path, config_path)
thread_count = int(request.config.getoption("thread_count"))
IngestorHelper.ingest_events(ingest_meta_data, addon_path, config_path, thread_count)
sleep(50)
if ("PYTEST_XDIST_WORKER" in os.environ):
with open(os.environ.get("PYTEST_XDIST_TESTRUNUID") + "_wait", "w+"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ def __init__(self, required_configs):
pass

@abc.abstractmethod
def ingest(self, event_object):
def ingest(self, event_object, thread_count):
raise NotImplementedError
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, required_configs):
self.hec_uri = required_configs.get("splunk_hec_uri")
self.session_headers = required_configs.get("session_headers")

def ingest(self, events):
def ingest(self, events, thread_count):
"""
Ingests event and metric data into splunk using HEC token via event endpoint.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, required_configs):
self.hec_uri = required_configs.get('splunk_hec_uri')
self.session_headers = required_configs.get('session_headers')

def ingest(self, data):
def ingest(self, data, thread_count):
"""
Ingests event and metric data into splunk using HEC token via event endpoint.
Args:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, required_configs):
self.hec_uri = required_configs['splunk_hec_uri']
self.session_headers = required_configs['session_headers']

def ingest(self, events):
def ingest(self, events, thread_count):
"""
Ingests data into splunk via raw endpoint.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def get_event_ingestor(cls, input_type, ingest_meta_data):
return ingestor

@classmethod
def ingest_events(cls, ingest_meta_data, addon_path, config_path):
def ingest_events(cls, ingest_meta_data, addon_path, config_path, thread_count):
"""
Events are ingested in the splunk.
Args:
Expand All @@ -57,4 +57,4 @@ def ingest_events(cls, ingest_meta_data, addon_path, config_path):
for input_type, events in ingestor_dict.items():

event_ingestor = cls.get_event_ingestor(input_type, ingest_meta_data)
event_ingestor.ingest(events)
event_ingestor.ingest(events, thread_count)
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import concurrent.futures
from .base_event_ingestor import EventIngestor

THREAD_POOL = 20


class SC4SEventIngestor(EventIngestor):
"""
Expand All @@ -28,7 +26,7 @@ def __init__(self, required_configs):
self.sc4s_port = required_configs['sc4s_port']
self.server_address = (required_configs['sc4s_host'], required_configs['sc4s_port'])

def ingest(self, events):
def ingest(self, events, thread_count):
"""
Ingests events in the splunk via sc4s (Single/Batch of Events)
Expand All @@ -40,7 +38,7 @@ def ingest(self, events):
for event in events:
raw_events.extend(event.event.splitlines())

with concurrent.futures.ThreadPoolExecutor(max_workers=THREAD_POOL) as executor:
with concurrent.futures.ThreadPoolExecutor(max_workers=thread_count) as executor:
_ = list(executor.map(self.ingest_event, raw_events))

def ingest_event(self, event):
Expand Down