Merged
Commits
88 commits
8837be6
Fix jinja template bug under SA-Eventgen app
Mar 5, 2019
3a664d3
Feature timeMultiple (#141)
jmeixensperger Mar 6, 2019
65f06d3
Merge branch 'develop' into bugfix/jinja_template_bug_under_app
li-wu Mar 7, 2019
2cba4ce
Merge pull request #137 from splunk/bugfix/jinja_template_bug_under_app
li-wu Mar 7, 2019
a13e819
Update issue templates
GordonWang Mar 12, 2019
48637f2
Merge pull request #143 from splunk/dev-process-refinement
lephino Mar 12, 2019
571dbd6
changing the stanzas to produce data
Mar 12, 2019
2f64665
Merge pull request #144 from splunk/cleanup
arctan5x Mar 12, 2019
03b7b91
Windbag generator/count + end=0 edge cases (#145)
jmeixensperger Mar 15, 2019
9018b34
Update docs (#146)
jmeixensperger Mar 15, 2019
a3fd0e3
Bump version
jmeixensperger Mar 15, 2019
e1e7929
add python code style lint and format
Mar 20, 2019
eacc92d
Merge pull request #147 from splunk/codeformat
li-wu Mar 21, 2019
c24f655
[Docs] update contribute docs (#148)
GordonWang Mar 22, 2019
9602895
Fix make docs bug and summary anchor link error
Mar 28, 2019
070c6e0
Merge pull request #152 from splunk/gitbookdocs
li-wu Apr 2, 2019
26f0ed2
init unittest
Mar 14, 2019
986d89f
Add more UT for config module
Mar 19, 2019
574c64c
Add more unit tests for config module
Mar 25, 2019
f0cc38b
[Build] add ut for timeparser functions
Mar 15, 2019
5cb93a6
Merge pull request #150 from splunk/unittest
li-wu Apr 4, 2019
710da6e
Pep8 (#151)
jmeixensperger Apr 4, 2019
0279c9c
Update docs
Apr 10, 2019
e3f8b17
Merge pull request #156 from splunk/docupdate
li-wu Apr 14, 2019
1a35eea
Post-PEP8 Fixes (#157)
jmeixensperger Apr 15, 2019
2a416a9
Issue 160 (#163)
arctan5x Apr 18, 2019
b67c295
Fixed timer and token (#162)
arctan5x Apr 18, 2019
ad6b942
add extendIndexes feature (#154)
Yangxulight Apr 18, 2019
da1fb02
Test fix (#168)
arctan5x Apr 18, 2019
7139cb5
Update version.json
arctan5x Apr 18, 2019
420fea1
Merge branch 'master' into develop
arctan5x Apr 18, 2019
8cb2fe0
Fix previous pep8 format regression bug (#171)
li-wu Apr 18, 2019
6e7a379
Merge branch 'master' into develop
li-wu Apr 18, 2019
6cbdf22
Fix merge conflict bug
li-wu Apr 18, 2019
6a3f27e
Delete tool orca related part (#178)
li-wu Apr 24, 2019
c03c0f9
add test for jinja template test (#177)
GordonWang Apr 24, 2019
965f9b9
fix a issue that when setup eventgen with 200+ indexers, any exceptio…
Apr 25, 2019
d04ec50
Merge pull request #180 from splunk/yifeng
Apr 26, 2019
4fa10b7
Fix bug #179 urlparams is not set (#181)
li-wu Apr 27, 2019
c6d9586
fix issue-183
Apr 28, 2019
b9ca648
add modinput ft (#185)
li-wu Apr 30, 2019
5d51d88
Merge branch 'develop' into yifeng
Apr 30, 2019
15988e1
Issue 159 (#186)
arctan5x May 2, 2019
999cd60
Add default pull request reviewers (#190)
li-wu May 4, 2019
be2a2ce
Default to -1 (#189)
li-wu May 5, 2019
f8c6ffe
Changed verbose -> verbosity (#191)
May 5, 2019
6a7e578
Merge branch 'develop' into yifeng
May 5, 2019
0cbc45b
Merge pull request #184 from splunk/yifeng
May 5, 2019
7c56c74
Update README.md (#195)
huntershen May 7, 2019
10b3e41
update doc for friendly reading and add backward capability section. …
li-wu May 8, 2019
1d2df70
Update index.md (#194)
huntershen May 8, 2019
1d2ffb3
change the sampleDir setting in test cases (#196)
May 8, 2019
7341af5
return status=2 when all tasks finished. (#182)
Yangxulight May 8, 2019
6a36192
Change release verion to 6.3.6 (#200)
li-wu May 8, 2019
bed39a8
Fix flaky tests (#204)
li-wu May 10, 2019
e499b45
Merge branch 'master' into develop
li-wu May 10, 2019
09811fd
when hit an exception, make sure egx logs it to files
May 13, 2019
e922db1
Merge pull request #206 from splunk/yifeng
May 14, 2019
e7a9bba
Fix circleci status badges (#208)
li-wu May 15, 2019
ab1363b
Clean up and interval error (#211)
May 16, 2019
e950a83
Fix generatorWorks not work issue (#207)
li-wu May 16, 2019
299432c
Fix navigation error with installed with splunk stream (#214)
li-wu May 20, 2019
6e5577d
add metric_httpevent plugin
Yangxulight May 17, 2019
a0bd53f
update log content in metric_httpevent
Yangxulight May 21, 2019
582893a
add doc for metric_httpevent
Yangxulight May 21, 2019
ce3582e
Add 3rd lib in app (#210)
li-wu May 23, 2019
04d2351
Bugfix/197 multiprocess not working (#218)
li-wu May 23, 2019
e2e9a5e
fix issue 219 (#220)
li-wu May 24, 2019
c13170c
define httpevent_core and add fix eventgen-httpeventout log handler
Yangxulight May 28, 2019
d8f6337
restore samples file
Yangxulight May 28, 2019
b99ed51
restore unit test file
Yangxulight May 28, 2019
f8a59e9
add metric_httpevent plugin
Yangxulight May 17, 2019
4e076de
update log content in metric_httpevent
Yangxulight May 21, 2019
dc3e0e5
add doc for metric_httpevent
Yangxulight May 21, 2019
7584c79
define httpevent_core and add fix eventgen-httpeventout log handler
Yangxulight May 28, 2019
b6a3dba
restore samples file
Yangxulight May 28, 2019
b09272f
restore unit test file
Yangxulight May 28, 2019
1a00f75
resolve conflict
Yangxulight May 28, 2019
c1ab6a7
Add license credits (#222)
li-wu May 29, 2019
4f9b7db
Feature/multi indexes (#224)
Yangxulight May 29, 2019
7af2228
Merge branch 'develop' into metrics_output_plugin
li-wu May 29, 2019
27db6cf
Merge pull request #215 from splunk/metrics_output_plugin
Yangxulight May 30, 2019
bda7720
Revert "Metrics output plugin" (#226)
li-wu May 30, 2019
65051af
[issue 217]disable logging queue in multiprocess mode (#223)
May 30, 2019
a84b4c5
Fixed fileName (#229)
Jun 5, 2019
57f4549
Issue 201 (#221)
arctan5x Jun 5, 2019
cba971c
fix #166 (#192)
GordonWang Jun 5, 2019
134b674
Merge branch 'master' into develop
li-wu Jun 5, 2019
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ eventgen_wsgi.conf
.cache
*.xml
!tests/large/splunk/input.xml
!splunk_eventgen/splunk_app/default/data/ui/nav/default.xml
dist
_book
*.result
56 changes: 55 additions & 1 deletion LICENSE
@@ -187,7 +187,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]
Copyright 2019 Splunk, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -200,3 +200,57 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

=======================================================================
Splunk Event Generator

The Splunk Event Generator project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for these
subcomponents is subject to the terms and conditions of the following
licenses.

========================================================================
Apache License 2.0
========================================================================
The following components are provided under the Apache License 2.0. See project link for details.

(Apache License 2.0) boto3 (https://github.com/boto/boto3/blob/master/LICENSE)
(Apache License 2.0) requests (https://github.com/kennethreitz/requests/blob/master/LICENSE)
(Apache License 2.0) pyOpenSSL (https://github.com/pyca/pyopenssl/blob/master/LICENSE)
(Apache License 2.0) docker (https://github.com/docker/docker-py/blob/master/LICENSE)
(Apache License 2.0) requests-futures (https://github.com/ross/requests-futures/blob/master/LICENSE)
(Apache License 2.0) nameko (https://github.com/nameko/nameko/blob/master/LICENSE.txt)

========================================================================
MIT licenses
========================================================================
The following components are provided under the MIT License. See project link for details.

(MIT License) pyyaml (https://github.com/yaml/pyyaml/blob/master/LICENSE)
(MIT License) httplib2 (https://github.com/httplib2/httplib2/blob/master/LICENSE)
(MIT License) urllib3 (https://github.com/urllib3/urllib3/blob/master/LICENSE.txt)
(MIT License) isort (https://github.com/timothycrosley/isort/blob/develop/LICENSE)
(MIT License) flake8 (https://gitlab.com/pycqa/flake8/blob/master/LICENSE)
(MIT License) pytest (https://github.com/pytest-dev/pytest/blob/master/LICENSE)
(MIT License) pytest-mock (https://github.com/pytest-dev/pytest-mock/blob/master/LICENSE)
(MIT License) pytest-xdist (https://github.com/pytest-dev/pytest-xdist/blob/master/LICENSE)
(MIT License) pytest-cov (https://github.com/pytest-dev/pytest-cov/blob/master/LICENSE)

========================================================================
BSD-style licenses
========================================================================

The following components are provided under a BSD-style license. See project link for details.

(BSD 2-Clause "Simplified" License) mock (https://github.com/testing-cabal/mock/blob/master/LICENSE.txt)
(BSD 3-Clause) pyrabbit (https://github.com/bkjones/pyrabbit/blob/master/LICENSE)
(BSD 3-Clause) logutils (https://opensource.org/licenses/BSD-3-Clause)
(BSD 3-Clause) jinja2 (https://github.com/pallets/jinja/blob/master/LICENSE)
(BSD 3-Clause) ujson (https://github.com/esnme/ultrajson/blob/master/LICENSE.txt)

========================================================================
PSF licenses
========================================================================

The following components are provided under a PSF license. See project link for details.

(PSF License) futures (https://github.com/agronholm/pythonfutures/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
# Splunk Event Generator (Eventgen)

### Status
[![CircleCI](https://circleci.com/gh/splunk/eventgen/tree/develop.svg?style=svg)](https://circleci.com/gh/splunk/eventgen/tree/develop)
[![CircleCI](https://circleci.com/gh/splunk/eventgen/tree/develop.svg?style=svg&circle-token=15e952a75e368102d8cebc6d9445af87e6c7d57e)](https://circleci.com/gh/splunk/eventgen/tree/develop)

### Introduction

4 changes: 2 additions & 2 deletions dockerfiles/Dockerfile
@@ -22,13 +22,13 @@ RUN apk --no-cache upgrade && \
erlang-asn1 \
erlang-inets \
erlang-os-mon \
erlang-xmerl \
erlang-xmerl \
erlang-eldap \
erlang-syntax-tools \
pwgen \
xz \
curl \
bash && \
bash && \
rm -rf /var/cache/apk/* && \
curl -sL https://www.rabbitmq.com/releases/rabbitmq-server/v${RABBITMQ_VERSION}/rabbitmq-server-generic-unix-${RABBITMQ_VERSION}.tar.xz | tar -xJ -C /usr/local && \
ln -s /usr/local/rabbitmq_server-${RABBITMQ_VERSION}/sbin/rabbitmq-server /usr/sbin/rabbitmq-server && \
3 changes: 3 additions & 0 deletions docs/CONFIGURE.md
@@ -459,6 +459,9 @@ specifically be supported by all plugins. Plugins that write to files like spool
httpeventWaitResponse = <bool>
* wait for all responses on a generator output before returning the outputter.
* Defaults to true.
---
httpeventAllowFailureCount = <int>
* Number of transmission failures allowed for a given httpserver before that server is removed from the pool. For example, 100 means a specific httpserver is no longer used after 100 consecutive failures. A successful transmission resets the server's failure count and transmission continues.
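As an illustration, a minimal httpevent stanza using this setting might look like the following; the sample name, server address, and token are placeholders, not values from this PR:

```ini
[my_sample.sample]
outputMode = httpevent
httpeventServers = {"servers": [{"protocol": "https", "address": "hec.example.com", "port": "8088", "key": "YOUR-HEC-TOKEN"}]}
# Stop sending to a server after 100 consecutive failed transmissions;
# a successful send resets that server's failure count.
httpeventAllowFailureCount = 100
```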

###### spool
spoolDir = <spool directory>
9 changes: 9 additions & 0 deletions docs/REFERENCE.md
@@ -59,6 +59,7 @@ maxQueueLength = 0
autotimestamps = [ <jsonlist> ]
autotimestamp = false
outputCounter = false
disableLoggingQueue = true


[<sample file name>]
@@ -105,6 +106,11 @@ useOutputQueue = true | false
for instance if you're outputting to a file or to stdout/modular input.
* Default value depends on the output plugin being used.

disableLoggingQueue = true | false
* Disable the logging queue in multiprocess mode.
* In multiprocess mode, logs from each worker process are collected through a shared logging queue.
* Defaults to true, which disables the logging queue.
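The queue-based logging this setting toggles can be sketched as follows. This is a simplified, single-process illustration using the Python 3 stdlib equivalents (eventgen itself uses `logutils` queue handlers across multiple processes); the handler class and messages here are made up:

```python
import logging
import logging.handlers
import queue

# Worker log records go into a shared queue; a listener drains the queue
# into the real handlers. disableLoggingQueue skips this indirection.
log_queue = queue.Queue()
records = []

class ListHandler(logging.Handler):
    # Illustrative sink that just collects formatted messages.
    def emit(self, record):
        records.append(record.getMessage())

listener = logging.handlers.QueueListener(log_queue, ListHandler())
listener.start()

worker_logger = logging.getLogger("worker")
worker_logger.setLevel(logging.INFO)
worker_logger.addHandler(logging.handlers.QueueHandler(log_queue))

worker_logger.info("generated 100 events")  # enqueued, not handled directly
listener.stop()  # flushes remaining queued records into ListHandler

print(records)  # ['generated 100 events']
```

Issue #217 (referenced in the diff below) motivated making this indirection optional in multiprocess mode.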

#############################
## OUTPUT RELATED SETTINGS ##
#############################
@@ -161,6 +167,9 @@ httpeventWaitResponse = <bool>
* wait for all responses on a generator output before returning the outputter.
* Defaults to true.

httpeventAllowFailureCount = <int>
* Number of transmission failures allowed for a given httpserver before that server is removed from the pool. For example, 100 means a specific httpserver is no longer used after 100 consecutive failures. A successful transmission resets the server's failure count and transmission continues.
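The reset-on-success bookkeeping described above can be sketched roughly like this; the class and method names are hypothetical and only illustrate the semantics, not eventgen's actual httpevent code:

```python
# A pool that drops a server after `allow_failure_count` consecutive
# failures; any success resets that server's counter.
class ServerPool:
    def __init__(self, servers, allow_failure_count=100):
        self.allow_failure_count = allow_failure_count
        self.failures = {server: 0 for server in servers}

    @property
    def active(self):
        # Servers still eligible to receive events.
        return [s for s, n in self.failures.items()
                if n < self.allow_failure_count]

    def record(self, server, success):
        # A success resets the consecutive-failure count.
        self.failures[server] = 0 if success else self.failures[server] + 1

pool = ServerPool(["idx1", "idx2"], allow_failure_count=3)
pool.record("idx2", success=False)
pool.record("idx2", success=False)
pool.record("idx2", success=True)   # reset: idx2 stays in the pool
for _ in range(3):
    pool.record("idx2", success=False)
print(pool.active)  # ['idx1']
```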

spoolDir = <spool directory>
* Spool directory is the generated files destination directory.
* Only valid in spool outputMode.
2 changes: 1 addition & 1 deletion setup.cfg
@@ -19,4 +19,4 @@ each_dict_entry_on_separate_line = false

[isort]
# isort/yapf solutions to below files are not compatible
skip = splunk_eventgen/lib/concurrent,splunk_eventgen/lib/requests_futures
skip = splunk_eventgen/lib/concurrent,splunk_eventgen/lib/requests_futures
13 changes: 13 additions & 0 deletions splunk_eventgen/__main__.py
@@ -53,6 +53,9 @@ def parse_args():
help="Use multiprocesing instead of threading")
generate_subparser.add_argument("--profiler", action="store_true", help="Turn on cProfiler")
generate_subparser.add_argument("--log-path", type=str, default="{0}/logs".format(FILE_LOCATION))
generate_subparser.add_argument(
"--generator-queue-size", type=int, default=500, help="the max queue size for the "
"generator queue, timer object puts all the generator tasks into this queue, default max size is 500")
# Build subparser
build_subparser = subparsers.add_parser('build', help="Will build different forms of sa-eventgen")
build_subparser.add_argument("--mode", type=str, default="splunk-app",
@@ -308,6 +311,16 @@ def build_splunk_app(dest, source=os.getcwd(), remove=True):
directory_default_dir = os.path.join(directory, 'default', 'eventgen.conf')
eventgen_conf = os.path.join(module_path, 'default', 'eventgen.conf')
shutil.copyfile(eventgen_conf, directory_default_dir)

# install 3rd lib dependencies
install_target = os.path.join(directory, 'lib')
install_cmd = "pip install --requirement splunk_eventgen/lib/requirements.txt --upgrade --no-compile " + \
"--no-binary :all: --target " + install_target
return_code = os.system(install_cmd)
if return_code != 0:
print("Failed to install dependencies via pip. Please check whether pip is installed.")
os.system("rm -rf " + os.path.join(install_target, "*.egg-info"))

make_tarfile(target_file, directory)
shutil.rmtree(splunk_app_samples)
if remove:
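The cap set by the new `--generator-queue-size` flag (and the clamping of negative values to 0 in `eventgen_core.py` below) follows the standard Python queue semantics, where a maxsize of 0 means "unbounded". A small sketch of that behavior:

```python
from queue import Full, Queue

# A bounded queue applies backpressure: once maxsize items are waiting,
# a non-blocking put raises Full (a blocking put would wait instead).
bounded = Queue(maxsize=2)
bounded.put("task-1")
bounded.put("task-2")
try:
    bounded.put("task-3", block=False)
except Full:
    print("generator queue full, producer must wait")

# maxsize=0 means no bound, which is why a negative
# --generator-queue-size is clamped to 0 rather than rejected.
unbounded = Queue(maxsize=0)
for i in range(10000):
    unbounded.put(i)
print(unbounded.qsize())  # 10000
```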
1 change: 1 addition & 0 deletions splunk_eventgen/default/eventgen.conf
@@ -48,3 +48,4 @@ useOutputQueue = false
autotimestamps = [["\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", "%Y-%m-%d %H:%M:%S"], ["\\d{1,2}\\/\\w{3}\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y %H:%M:%S:%f"], ["\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}", "%Y-%m-%dT%H:%M:%S.%f"], ["\\d{1,2}/\\w{3}/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y %H:%M:%S:%f"], ["\\d{1,2}/\\d{2}/\\d{2}\\s\\d{1,2}:\\d{2}:\\d{2}", "%m/%d/%y %H:%M:%S"], ["\\d{2}-\\d{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}", "%m-%d-%Y %H:%M:%S"], ["\\w{3} \\w{3} +\\d{1,2} \\d{2}:\\d{2}:\\d{2}", "%a %b %d %H:%M:%S"], ["\\w{3} \\w{3} \\d{2} \\d{4} \\d{2}:\\d{2}:\\d{2}", "%a %b %d %Y %H:%M:%S"], ["^(\\w{3}\\s+\\d{1,2}\\s\\d{2}:\\d{2}:\\d{2})", "%b %d %H:%M:%S"], ["(\\w{3}\\s+\\d{1,2}\\s\\d{1,2}:\\d{1,2}:\\d{1,2})", "%b %d %H:%M:%S"], ["(\\w{3}\\s\\d{1,2}\\s\\d{1,4}\\s\\d{1,2}:\\d{1,2}:\\d{1,2})", "%b %d %Y %H:%M:%S"], ["\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}", "%Y-%m-%d %H:%M:%S.%f"], ["\\,\\d{2}\\/\\d{2}\\/\\d{2,4}\\s+\\d{2}:\\d{2}:\\d{2}\\s+[AaPp][Mm]\\,", ",%m/%d/%Y %I:%M:%S %p,"], ["^\\w{3}\\s+\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}", "%b %d %H:%M:%S"], ["\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}", "%m/%d/%Y %H:%M:%S"], ["^\\d{2}\\/\\d{2}\\/\\d{2,4}\\s+\\d{2}:\\d{2}:\\d{2}\\s+[AaPp][Mm]", "%m/%d/%Y %I:%M:%S %p"], ["\\d{2}\\/\\d{2}\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}", "%m-%d-%Y %H:%M:%S"], ["\\\"timestamp\\\":\\s\\\"(\\d+)", "%s"], ["\\d{2}\\/\\w+\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{3}", "%d-%b-%Y %H:%M:%S:%f"], ["\\\"created\\\":\\s(\\d+)", "%s"], ["\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}", "%Y-%m-%dT%H:%M:%S"], ["\\d{1,2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y:%H:%M:%S:%f"], ["\\d{1,2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2}", "%d/%b/%Y:%H:%M:%S"]]
autotimestamp = false
httpeventWaitResponse = true
disableLoggingQueue = true
72 changes: 38 additions & 34 deletions splunk_eventgen/eventgen_core.py
@@ -75,9 +75,15 @@ def __init__(self, args=None):
# attach to the logging queue
self.logger.info("Logging Setup Complete.")

self._generator_queue_size = getattr(self.args, 'generator_queue_size', 500)
if self._generator_queue_size < 0:
self._generator_queue_size = 0
self.logger.info("set generator queue size to %d", self._generator_queue_size)

if self.args and 'configfile' in self.args and self.args.configfile:
self._load_config(self.args.configfile, args=args)


def _load_config(self, configfile, **kwargs):
'''
This method will use a configfile and set self.config as a processed config object,
@@ -115,8 +121,13 @@ def _load_config(self, configfile, **kwargs):
self.config = Config(configfile, **new_args)
self.config.parse()
self._reload_plugins()
if "args" in kwargs and getattr(kwargs["args"], "generators"):
generator_worker_count = kwargs["args"].generators
else:
generator_worker_count = self.config.generatorWorkers

# TODO: Probably should destroy pools better so processes are cleaned.
self._setup_pools()
self._setup_pools(generator_worker_count)

def _reload_plugins(self):
# Initialize plugins
@@ -133,7 +144,7 @@ def _reload_plugins(self):
os.path.join(file_path, 'lib', 'plugins', 'rater'), self.config.plugins, 'rater')
self.config._complexSettings['rater'] = plugins
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))

def _load_custom_plugins(self, PluginNotLoadedException):
plugintype = PluginNotLoadedException.type
@@ -147,7 +158,7 @@ def _load_custom_plugins(self, PluginNotLoadedException):
# APPPERF-263: be greedy when scanning plugin dir (eat all the pys)
self._initializePlugins(plugindir, pluginsdict, plugintype)

def _setup_pools(self):
def _setup_pools(self, generator_worker_count):
'''
This method is an internal method called on init to generate pools needed for processing.

@@ -157,7 +168,7 @@ def _setup_pools(self):
self._create_generator_pool()
self._create_timer_threadpool()
self._create_output_threadpool()
self._create_generator_workers()
self._create_generator_workers(generator_worker_count)

def _create_timer_threadpool(self, threadcount=100):
'''
@@ -212,15 +223,19 @@ def _create_generator_pool(self, workercount=20):
if self.args.multiprocess:
import multiprocessing
self.manager = multiprocessing.Manager()
self.loggingQueue = self.manager.Queue()
self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread")
self.logging_pool.start()
if self.config.disableLoggingQueue:
self.loggingQueue = None
else:
# TODO crash caused by logging Thread https://github.com/splunk/eventgen/issues/217
self.loggingQueue = self.manager.Queue()
self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread")
self.logging_pool.start()
# since we're now in multiprocess, we need to use better queues.
self.workerQueue = multiprocessing.JoinableQueue(maxsize=500)
self.workerQueue = multiprocessing.JoinableQueue(maxsize=self._generator_queue_size)
self.genconfig = self.manager.dict()
self.genconfig["stopping"] = False
else:
self.workerQueue = Queue(maxsize=500)
self.workerQueue = Queue(maxsize=self._generator_queue_size)
worker_threads = workercount
if hasattr(self.config, 'outputCounter') and self.config.outputCounter:
self.output_counters = []
@@ -354,13 +369,13 @@ def _worker_do_work(self, work_queue, logging_queue):
startTime = time.time()
item.run()
totalTime = time.time() - startTime
if totalTime > self.config.interval:
if totalTime > self.config.interval and self.config.end != 1:
self.logger.warning("work took longer than current interval, queue/threading throughput limitation")
work_queue.task_done()
except Empty:
pass
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

def _generator_do_work(self, work_queue, logging_queue, output_counter=None):
@@ -370,23 +385,27 @@ def _generator_do_work(self, work_queue, logging_queue, output_counter=None):
startTime = time.time()
item.run(output_counter=output_counter)
totalTime = time.time() - startTime
if totalTime > self.config.interval:
if totalTime > self.config.interval and item._sample.end != 1:
self.logger.warning("work took longer than current interval, queue/threading throughput limitation")
work_queue.task_done()
except Empty:
pass
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

@staticmethod
def _proc_worker_do_work(work_queue, logging_queue, config):
genconfig = config
stopping = genconfig['stopping']
qh = logutils.queue.QueueHandler(logging_queue)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
if logging_queue is not None:
# TODO https://github.com/splunk/eventgen/issues/217
qh = logutils.queue.QueueHandler(logging_queue)
root.addHandler(qh)
else:
root.addHandler(logging.StreamHandler())
while not stopping:
try:
root.info("Checking for work")
@@ -418,7 +437,7 @@ def logger_thread(self, loggingQueue):
except Empty:
pass
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

def _initializePlugins(self, dirname, plugins, plugintype, name=None):
@@ -493,7 +512,7 @@ def _initializePlugins(self, dirname, plugins, plugintype, name=None):
self.logger.warn("Could not load plugin: %s, skipping" % mod_name.name)
self.logger.exception(ie)
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e
return ret

@@ -521,21 +540,6 @@ def start(self, join_after_start=True):
if join_after_start:
self.logger.info("All timers started, joining queue until it's empty.")
self.join_process()
# Only need to start timers once
# Every 5 seconds, get values and output basic statistics about our operations
# TODO: Figure out how to do this better...
# generatorsPerSec = (generatorDecrements - generatorQueueCounter) / 5
# outputtersPerSec = (outputDecrements - outputQueueCounter) / 5
# outputQueueCounter = outputDecrements
# generatorQueueCounter = generatorDecrements
# self.logger.info('OutputQueueDepth=%d GeneratorQueueDepth=%d GeneratorsPerSec=%d OutputtersPerSec=%d' %
# (self.config.outputQueueSize.value(), self.config.generatorQueueSize.value(),
# generatorsPerSec, outputtersPerSec))
# kiloBytesPerSec = self.config.bytesSent.valueAndClear() / 5 / 1024
# gbPerDay = (kiloBytesPerSec / 1024 / 1024) * 60 * 60 * 24
# eventsPerSec = self.config.eventsSent.valueAndClear() / 5
# self.logger.info('GlobalEventsPerSec=%s KilobytesPerSec=%1f GigabytesPerDay=%1f' %
# (eventsPerSec, kiloBytesPerSec, gbPerDay))

def join_process(self):
'''
@@ -550,7 +554,7 @@ def join_process(self):
self.logger.info("All timers have finished, signalling workers to exit.")
self.stop()
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

def stop(self):
@@ -577,7 +581,7 @@ def stop(self):
count += 1
self.logger.info("All generators working/exited, joining output queue until it's empty.")
self.outputQueue.join()
self.logger.info("All items fully processed, stopping.")
self.logger.info("All items fully processed. Cleaning up internal processes.")
self.started = False
self.stopping = False
