diff --git a/splunk_eventgen/__main__.py b/splunk_eventgen/__main__.py index 0437cc46..63c829ec 100644 --- a/splunk_eventgen/__main__.py +++ b/splunk_eventgen/__main__.py @@ -53,6 +53,9 @@ def parse_args(): help="Use multiprocesing instead of threading") generate_subparser.add_argument("--profiler", action="store_true", help="Turn on cProfiler") generate_subparser.add_argument("--log-path", type=str, default="{0}/logs".format(FILE_LOCATION)) + generate_subparser.add_argument( + "--generator-queue-size", type=int, default=500, help="the max queue size for the " + "generator queue, timer object puts all the generator tasks into this queue, default max size is 500") # Build subparser build_subparser = subparsers.add_parser('build', help="Will build different forms of sa-eventgen") build_subparser.add_argument("--mode", type=str, default="splunk-app", diff --git a/splunk_eventgen/eventgen_core.py b/splunk_eventgen/eventgen_core.py index 7b20bc46..7be8feec 100644 --- a/splunk_eventgen/eventgen_core.py +++ b/splunk_eventgen/eventgen_core.py @@ -75,9 +75,15 @@ def __init__(self, args=None): # attach to the logging queue self.logger.info("Logging Setup Complete.") + self._generator_queue_size = getattr(self.args, 'generator_queue_size', 500) + if self._generator_queue_size < 0: + self._generator_queue_size = 0 + self.logger.info("set generator queue size to %d", self._generator_queue_size) + if self.args and 'configfile' in self.args and self.args.configfile: self._load_config(self.args.configfile, args=args) + def _load_config(self, configfile, **kwargs): ''' This method will use a configfile and set self.confg as a processeded config object, @@ -225,11 +231,11 @@ def _create_generator_pool(self, workercount=20): self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread") self.logging_pool.start() # since we're now in multiprocess, we need to use better queues. - self.workerQueue = multiprocessing.JoinableQueue(maxsize=500) + self.workerQueue = multiprocessing.JoinableQueue(maxsize=self._generator_queue_size) self.genconfig = self.manager.dict() self.genconfig["stopping"] = False else: - self.workerQueue = Queue(maxsize=500) + self.workerQueue = Queue(maxsize=self._generator_queue_size) worker_threads = workercount if hasattr(self.config, 'outputCounter') and self.config.outputCounter: self.output_counters = [] diff --git a/splunk_eventgen/lib/eventgentimer.py b/splunk_eventgen/lib/eventgentimer.py index effcf4c1..095ff6fb 100644 --- a/splunk_eventgen/lib/eventgentimer.py +++ b/splunk_eventgen/lib/eventgentimer.py @@ -80,7 +80,12 @@ def predict_event_size(self): except TypeError: self.logger.error("Error loading sample file for sample '%s'" % self.sample.name) return - return len(self.sample.sampleDict[0]['_raw']) + total_len = sum([len(e['_raw']) for e in self.sample.sampleDict]) + sample_count = len(self.sample.sampleDict) + if sample_count == 0: + return 0 + else: + return total_len/sample_count def run(self): """ @@ -135,9 +140,9 @@ def real_run(self): elif char != "-": backfillletter += char backfillearliest = timeParserTimeMath(plusminus=mathsymbol, num=backfillnumber, unit=backfillletter, - ret=realtime) + ret=realtime) while backfillearliest < realtime: - if self.executions == int(self.end): + if self.end and self.executions == int(self.end): self.logger.info("End executions %d reached, ending generation of sample '%s'" % (int( self.end), self.sample.name)) break @@ -148,11 +153,13 @@ def real_run(self): genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue) genPlugin.updateCounts(count=count, start_time=et, end_time=lt) try: - self.generatorQueue.put(genPlugin) + self.generatorQueue.put(genPlugin, True, 3) self.executions += 1 + backfillearliest = lt except Full: - self.logger.warning("Generator Queue Full. Skipping current generation.") - backfillearliest = lt + self.logger.warning("Generator Queue Full. Reput the backfill generator task later. %d backfill generators are dispatched.", self.executions) + backfillearliest = et + realtime = self.sample.now(realnow=True) self.sample.backfilldone = True else: diff --git a/splunk_eventgen/lib/outputplugin.py b/splunk_eventgen/lib/outputplugin.py index e3bb2fb3..50a0a163 100644 --- a/splunk_eventgen/lib/outputplugin.py +++ b/splunk_eventgen/lib/outputplugin.py @@ -55,7 +55,10 @@ def run(self): if self.output_counter is not None: self.output_counter.collect(len(self.events), sum([len(e['_raw']) for e in self.events])) self.events = None + self._output_end() + def _output_end(self): + pass def load(): return OutputPlugin diff --git a/splunk_eventgen/lib/plugins/output/counter.py b/splunk_eventgen/lib/plugins/output/counter.py new file mode 100755 index 00000000..767bed9a --- /dev/null +++ b/splunk_eventgen/lib/plugins/output/counter.py @@ -0,0 +1,49 @@ +import logging +import datetime +import pprint +import sys + +from outputplugin import OutputPlugin + + +class CounterOutputPlugin(OutputPlugin): + name = 'counter' + MAXQUEUELENGTH = 1000 + useOutputQueue = True + + dataSizeHistogram = {} + eventCountHistogram = {} + flushCount = 0 + lastPrintAt = 0 + + def __init__(self, sample, output_counter=None): + OutputPlugin.__init__(self, sample, output_counter) + + def flush(self, q): + CounterOutputPlugin.flushCount += 1 + for e in q: + ts = datetime.datetime.fromtimestamp(int(e['_time'])) + text = e['_raw'] + day = ts.strftime('%Y-%m-%d') + CounterOutputPlugin.dataSizeHistogram[day] = CounterOutputPlugin.dataSizeHistogram.get(day, 0) + len(text) + CounterOutputPlugin.eventCountHistogram[day] = CounterOutputPlugin.eventCountHistogram.get(day, 0) + 1 + + def _output_end(self): + if CounterOutputPlugin.flushCount - CounterOutputPlugin.lastPrintAt > 0: + self._print_info('----- print the output histogram -----') + self._print_info('--- data size histogram ---') + self._print_info(pprint.pformat(CounterOutputPlugin.dataSizeHistogram)) + self._print_info('--- event count histogram ---') + self._print_info(pprint.pformat(CounterOutputPlugin.eventCountHistogram)) + CounterOutputPlugin.lastPrintAt = CounterOutputPlugin.flushCount + + def _print_info(self, msg): + print >> sys.stderr, '{} {}'.format(datetime.datetime.now(), msg) + + def _setup_logging(self): + self.logger = logging.getLogger('eventgen_counter_out') + + +def load(): + """Returns an instance of the plugin""" + return CounterOutputPlugin