Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions splunk_eventgen/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def parse_args():
help="Use multiprocesing instead of threading")
generate_subparser.add_argument("--profiler", action="store_true", help="Turn on cProfiler")
generate_subparser.add_argument("--log-path", type=str, default="{0}/logs".format(FILE_LOCATION))
generate_subparser.add_argument(
"--generator-queue-size", type=int, default=500, help="the max queue size for the "
"generator queue, timer object puts all the generator tasks into this queue, default max size is 500")
# Build subparser
build_subparser = subparsers.add_parser('build', help="Will build different forms of sa-eventgen")
build_subparser.add_argument("--mode", type=str, default="splunk-app",
Expand Down
10 changes: 8 additions & 2 deletions splunk_eventgen/eventgen_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,15 @@ def __init__(self, args=None):
# attach to the logging queue
self.logger.info("Logging Setup Complete.")

self._generator_queue_size = getattr(self.args, 'generator_queue_size', 500)
if self._generator_queue_size < 0:
self._generator_queue_size = 0
self.logger.info("set generator queue size to %d", self._generator_queue_size)

if self.args and 'configfile' in self.args and self.args.configfile:
self._load_config(self.args.configfile, args=args)


def _load_config(self, configfile, **kwargs):
'''
This method will use a configfile and set self.config as a processed config object,
Expand Down Expand Up @@ -225,11 +231,11 @@ def _create_generator_pool(self, workercount=20):
self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread")
self.logging_pool.start()
# since we're now in multiprocess, we need to use better queues.
self.workerQueue = multiprocessing.JoinableQueue(maxsize=500)
self.workerQueue = multiprocessing.JoinableQueue(maxsize=self._generator_queue_size)
self.genconfig = self.manager.dict()
self.genconfig["stopping"] = False
else:
self.workerQueue = Queue(maxsize=500)
self.workerQueue = Queue(maxsize=self._generator_queue_size)
worker_threads = workercount
if hasattr(self.config, 'outputCounter') and self.config.outputCounter:
self.output_counters = []
Expand Down
19 changes: 13 additions & 6 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ def predict_event_size(self):
except TypeError:
self.logger.error("Error loading sample file for sample '%s'" % self.sample.name)
return
return len(self.sample.sampleDict[0]['_raw'])
total_len = sum([len(e['_raw']) for e in self.sample.sampleDict])
sample_count = len(self.sample.sampleDict)
if sample_count == 0:
return 0
else:
return total_len/sample_count

def run(self):
"""
Expand Down Expand Up @@ -135,9 +140,9 @@ def real_run(self):
elif char != "-":
backfillletter += char
backfillearliest = timeParserTimeMath(plusminus=mathsymbol, num=backfillnumber, unit=backfillletter,
ret=realtime)
ret=realtime)
while backfillearliest < realtime:
if self.executions == int(self.end):
if self.end and self.executions == int(self.end):
self.logger.info("End executions %d reached, ending generation of sample '%s'" % (int(
self.end), self.sample.name))
break
Expand All @@ -148,11 +153,13 @@ def real_run(self):
genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue)
genPlugin.updateCounts(count=count, start_time=et, end_time=lt)
try:
self.generatorQueue.put(genPlugin)
self.generatorQueue.put(genPlugin, True, 3)
self.executions += 1
backfillearliest = lt
except Full:
self.logger.warning("Generator Queue Full. Skipping current generation.")
backfillearliest = lt
self.logger.warning("Generator Queue Full. Reput the backfill generator task later. %d backfill generators are dispatched.", self.executions)
backfillearliest = et
realtime = self.sample.now(realnow=True)

self.sample.backfilldone = True
else:
Expand Down
3 changes: 3 additions & 0 deletions splunk_eventgen/lib/outputplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ def run(self):
if self.output_counter is not None:
self.output_counter.collect(len(self.events), sum([len(e['_raw']) for e in self.events]))
self.events = None
self._output_end()

def _output_end(self):
pass
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this callback hook is added as an exit hook point for the output plugin.
This hook is used in the counter output plugin to collect information.
The counter plugin is used for debug and issue triaging scenario.


def load():
return OutputPlugin
49 changes: 49 additions & 0 deletions splunk_eventgen/lib/plugins/output/counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
import datetime
import pprint
import sys

from outputplugin import OutputPlugin


class CounterOutputPlugin(OutputPlugin):
    """Debug/triage output plugin that counts generated events instead of sending them.

    Enabled with ``outputMode = counter`` in an eventgen conf stanza.  Each
    flushed batch is folded into per-day histograms of event count and raw
    data size, and the histograms are printed to stderr when output ends
    (via the ``_output_end`` hook).  Accumulators live on the class so the
    totals aggregate across every plugin instance in the process.
    """
    name = 'counter'
    MAXQUEUELENGTH = 1000
    useOutputQueue = True

    # Class-level (shared) accumulators: day string ('%Y-%m-%d') -> totals.
    dataSizeHistogram = {}
    eventCountHistogram = {}
    flushCount = 0
    # flushCount value at the time of the last histogram print-out.
    lastPrintAt = 0

    def __init__(self, sample, output_counter=None):
        OutputPlugin.__init__(self, sample, output_counter)

    def flush(self, q):
        """Accumulate a batch of events into the per-day histograms.

        Each event is bucketed by the calendar day of its ``_time`` field;
        the length of ``_raw`` contributes to the data-size histogram and
        each event adds one to the event-count histogram.
        """
        CounterOutputPlugin.flushCount += 1
        for event in q:
            day = datetime.datetime.fromtimestamp(int(event['_time'])).strftime('%Y-%m-%d')
            text = event['_raw']
            CounterOutputPlugin.dataSizeHistogram[day] = \
                CounterOutputPlugin.dataSizeHistogram.get(day, 0) + len(text)
            CounterOutputPlugin.eventCountHistogram[day] = \
                CounterOutputPlugin.eventCountHistogram.get(day, 0) + 1

    def _output_end(self):
        """Print the accumulated histograms to stderr.

        Only prints when new batches were flushed since the last report, so
        repeated end-hook invocations do not emit identical histograms.
        """
        if CounterOutputPlugin.flushCount - CounterOutputPlugin.lastPrintAt > 0:
            self._print_info('----- print the output histogram -----')
            self._print_info('--- data size histogram ---')
            self._print_info(pprint.pformat(CounterOutputPlugin.dataSizeHistogram))
            self._print_info('--- event count histogram ---')
            self._print_info(pprint.pformat(CounterOutputPlugin.eventCountHistogram))
            CounterOutputPlugin.lastPrintAt = CounterOutputPlugin.flushCount

    def _print_info(self, msg):
        # sys.stderr.write works under both Python 2 and 3, unlike the
        # original Python-2-only ``print >> sys.stderr`` statement syntax,
        # which is a SyntaxError on Python 3.  Output is byte-identical.
        sys.stderr.write('{} {}\n'.format(datetime.datetime.now(), msg))

    def _setup_logging(self):
        # Use a dedicated logger name so counter output can be filtered.
        self.logger = logging.getLogger('eventgen_counter_out')


def load():
    """Return the plugin class for the eventgen plugin loader."""
    return CounterOutputPlugin