diff --git a/.gitignore b/.gitignore index ed66916a..90bf0f85 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ eventgen_wsgi.conf .cache *.xml !tests/large/splunk/input.xml +!splunk_eventgen/splunk_app/default/data/ui/nav/default.xml dist _book *.result diff --git a/LICENSE b/LICENSE index d6456956..6116df4d 100644 --- a/LICENSE +++ b/LICENSE @@ -187,7 +187,7 @@ same "printed page" as the copyright notice for easier identification within third-party archives. - Copyright [yyyy] [name of copyright owner] + Copyright 2019 Splunk, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -200,3 +200,57 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + +======================================================================= +Splunk Event Generator + +Splunk Event Generator project contains subcomponents with separate copyright +notices and license terms. Your use of the source code for the these +subcomponents is subject to the terms and conditions of the following +licenses. + +======================================================================== +Apache License 2.0 +======================================================================== +The following components are provided under the Apache License 2.0. See project link for details. + + (Apache License 2.0) boto3 (https://github.com/boto/boto3/blob/master/LICENSE) + (Apache License 2.0) requests (https://github.com/kennethreitz/requests/blob/master/LICENSE) + (Apache License 2.0) pyOpenSSL (https://github.com/pyca/pyopenssl/blob/master/LICENSE) + (Apache License 2.0) docker (https://github.com/docker/docker-py/blob/master/LICENSE) + (Apache License 2.0) requests-futures (https://github.com/ross/requests-futures/blob/master/LICENSE) + (Apache License 2.0) nameko (https://github.com/nameko/nameko/blob/master/LICENSE.txt) + +======================================================================== +MIT licenses +======================================================================== +The following components are provided under the MIT License. See project link for details. + + (MIT License) pyyaml (https://github.com/yaml/pyyaml/blob/master/LICENSE) + (MIT License) httplib2 (https://github.com/httplib2/httplib2/blob/master/LICENSE) + (MIT License) urllib3 (https://github.com/urllib3/urllib3/blob/master/LICENSE.txt) + (MIT License) isort (https://github.com/timothycrosley/isort/blob/develop/LICENSE) + (MIT License) flake8 (https://gitlab.com/pycqa/flake8/blob/master/LICENSE) + (MIT License) pytest (https://github.com/pytest-dev/pytest/blob/master/LICENSE) + (MIT License) pytest-mock (https://github.com/pytest-dev/pytest-mock/blob/master/LICENSE) + (MIT License) pytest-xdist (https://github.com/pytest-dev/pytest-xdist/blob/master/LICENSE) + (MIT License) pytest-cov (https://github.com/pytest-dev/pytest-cov/blob/master/LICENSE) + +======================================================================== +BSD-style licenses +======================================================================== + +The following components are provided under a BSD-style license. See project link for details. + + (BSD 2-Clause "Simplified" License) mock (https://github.com/testing-cabal/mock/blob/master/LICENSE.txt) + (BSD 3-Clause) pyrabbit (https://github.com/bkjones/pyrabbit/blob/master/LICENSE) + (BSD 3-Clause) logutils (https://opensource.org/licenses/BSD-3-Clause) + (BSD 3-Clause) jinja2 (https://github.com/pallets/jinja/blob/master/LICENSE) + (BSD 3-Clause) ujson(https://github.com/esnme/ultrajson/blob/master/LICENSE.txt) + +======================================================================== +PSF licenses +======================================================================== + +The following components are provided under a PSF license. See project link for details. + (PSD License) futures (https://github.com/agronholm/pythonfutures/blob/master/LICENSE) diff --git a/README.md b/README.md index d68a6bb8..821f69c3 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Splunk Event Generator (Eventgen) ### Status -[![CircleCI](https://circleci.com/gh/splunk/eventgen/tree/develop.svg?style=svg)](https://circleci.com/gh/splunk/eventgen/tree/develop) +[![CircleCI](https://circleci.com/gh/splunk/eventgen/tree/develop.svg?style=svg&circle-token=15e952a75e368102d8cebc6d9445af87e6c7d57e)](https://circleci.com/gh/splunk/eventgen/tree/develop) ### Introduction diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index f68354bb..265a06b9 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -22,13 +22,13 @@ RUN apk --no-cache upgrade && \ erlang-asn1 \ erlang-inets \ erlang-os-mon \ - erlang-xmerl \ + erlang-xmerl \ erlang-eldap \ erlang-syntax-tools \ pwgen \ xz \ curl \ - bash && \ + bash && \ rm -rf /var/cache/apk/* && \ curl -sL https://www.rabbitmq.com/releases/rabbitmq-server/v${RABBITMQ_VERSION}/rabbitmq-server-generic-unix-${RABBITMQ_VERSION}.tar.xz | tar -xJ -C /usr/local && \ ln -s /usr/local/rabbitmq_server-${RABBITMQ_VERSION}/sbin/rabbitmq-server /usr/sbin/rabbitmq-server && \ diff --git a/docs/CONFIGURE.md b/docs/CONFIGURE.md index ead6ca50..ed26310f 100644 --- a/docs/CONFIGURE.md +++ b/docs/CONFIGURE.md @@ -459,6 +459,9 @@ specifically be supported by all plugins. Plugins that write to files like spool httpeventWaitResponse = * wait for all responses on a generator output before returning the outputter. * Defaults to true. +--- + httpeventAllowFailureCount = + * Number of transmission failure allowed for a certain httpserver before we remove that server from the pool. For example, 100 means that we will no longer include a specific httpserver after 100 failures. Even after some failures, if we see a success for the server, we will reset the count and continue the transmission. ###### spool spoolDir = diff --git a/docs/REFERENCE.md b/docs/REFERENCE.md index 0f089c84..49227a35 100644 --- a/docs/REFERENCE.md +++ b/docs/REFERENCE.md @@ -59,6 +59,7 @@ maxQueueLength = 0 autotimestamps = [ ] autotimestamp = false outputCounter = false +disableLoggingQueue = true [] @@ -105,6 +106,11 @@ useOutputQueue = true | false for instance if you're outputting to a file or to stdout/modular input. * Default value depends on the output plugin being used. +disableLoggingQueue = true | false + * Disable the logging queue for process mode + * In process mode, logs in each process will be collected via a logging queue + * Default is true which will disable the logging queue + ############################# ## OUTPUT RELATED SETTINGS ## ############################# @@ -161,6 +167,9 @@ httpeventWaitResponse = * wait for all responses on a generator output before returning the outputter. * Defaults to true. +httpeventAllowFailureCount = + * Number of transmission failure allowed for a certain httpserver before we remove that server from the pool. For example, 100 means that we will no longer include a specific httpserver after 100 failures. Even after some failures, if we see a success for the server, we will reset the count and continue the transmission. + spoolDir = * Spool directory is the generated files destination directory. * Only valid in spool outputMode. diff --git a/setup.cfg b/setup.cfg index 0a71e5d1..d8f0d78f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,4 +19,4 @@ each_dict_entry_on_separate_line = false [isort] # isort/yapf solutions to below files are not compatible -skip = splunk_eventgen/lib/concurrent,splunk_eventgen/lib/requests_futures \ No newline at end of file +skip = splunk_eventgen/lib/concurrent,splunk_eventgen/lib/requests_futures diff --git a/splunk_eventgen/__main__.py b/splunk_eventgen/__main__.py index 1dd058e9..63c829ec 100644 --- a/splunk_eventgen/__main__.py +++ b/splunk_eventgen/__main__.py @@ -53,6 +53,9 @@ def parse_args(): help="Use multiprocesing instead of threading") generate_subparser.add_argument("--profiler", action="store_true", help="Turn on cProfiler") generate_subparser.add_argument("--log-path", type=str, default="{0}/logs".format(FILE_LOCATION)) + generate_subparser.add_argument( + "--generator-queue-size", type=int, default=500, help="the max queue size for the " + "generator queue, timer object puts all the generator tasks into this queue, default max size is 500") # Build subparser build_subparser = subparsers.add_parser('build', help="Will build different forms of sa-eventgen") build_subparser.add_argument("--mode", type=str, default="splunk-app", @@ -308,6 +311,16 @@ def build_splunk_app(dest, source=os.getcwd(), remove=True): directory_default_dir = os.path.join(directory, 'default', 'eventgen.conf') eventgen_conf = os.path.join(module_path, 'default', 'eventgen.conf') shutil.copyfile(eventgen_conf, directory_default_dir) + + # install 3rd lib dependencies + install_target = os.path.join(directory, 'lib') + install_cmd = "pip install --requirement splunk_eventgen/lib/requirements.txt --upgrade --no-compile " + \ + "--no-binary :all: --target " + install_target + return_code = os.system(install_cmd) + if return_code != 0: + print("Failed to install dependencies via pip. Please check whether pip is installed.") + os.system("rm -rf " + os.path.join(install_target, "*.egg-info")) + make_tarfile(target_file, directory) shutil.rmtree(splunk_app_samples) if remove: diff --git a/splunk_eventgen/default/eventgen.conf b/splunk_eventgen/default/eventgen.conf index d3cb85c5..421158fb 100644 --- a/splunk_eventgen/default/eventgen.conf +++ b/splunk_eventgen/default/eventgen.conf @@ -48,3 +48,4 @@ useOutputQueue = false autotimestamps = [["\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", "%Y-%m-%d %H:%M:%S"], ["\\d{1,2}\\/\\w{3}\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y %H:%M:%S:%f"], ["\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}", "%Y-%m-%dT%H:%M:%S.%f"], ["\\d{1,2}/\\w{3}/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y %H:%M:%S:%f"], ["\\d{1,2}/\\d{2}/\\d{2}\\s\\d{1,2}:\\d{2}:\\d{2}", "%m/%d/%y %H:%M:%S"], ["\\d{2}-\\d{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}", "%m-%d-%Y %H:%M:%S"], ["\\w{3} \\w{3} +\\d{1,2} \\d{2}:\\d{2}:\\d{2}", "%a %b %d %H:%M:%S"], ["\\w{3} \\w{3} \\d{2} \\d{4} \\d{2}:\\d{2}:\\d{2}", "%a %b %d %Y %H:%M:%S"], ["^(\\w{3}\\s+\\d{1,2}\\s\\d{2}:\\d{2}:\\d{2})", "%b %d %H:%M:%S"], ["(\\w{3}\\s+\\d{1,2}\\s\\d{1,2}:\\d{1,2}:\\d{1,2})", "%b %d %H:%M:%S"], ["(\\w{3}\\s\\d{1,2}\\s\\d{1,4}\\s\\d{1,2}:\\d{1,2}:\\d{1,2})", "%b %d %Y %H:%M:%S"], ["\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}", "%Y-%m-%d %H:%M:%S.%f"], ["\\,\\d{2}\\/\\d{2}\\/\\d{2,4}\\s+\\d{2}:\\d{2}:\\d{2}\\s+[AaPp][Mm]\\,", ",%m/%d/%Y %I:%M:%S %p,"], ["^\\w{3}\\s+\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}", "%b %d %H:%M:%S"], ["\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}", "%m/%d/%Y %H:%M:%S"], ["^\\d{2}\\/\\d{2}\\/\\d{2,4}\\s+\\d{2}:\\d{2}:\\d{2}\\s+[AaPp][Mm]", "%m/%d/%Y %I:%M:%S %p"], ["\\d{2}\\/\\d{2}\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}", "%m-%d-%Y %H:%M:%S"], ["\\\"timestamp\\\":\\s\\\"(\\d+)", "%s"], ["\\d{2}\\/\\w+\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{3}", "%d-%b-%Y %H:%M:%S:%f"], ["\\\"created\\\":\\s(\\d+)", "%s"], ["\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}", "%Y-%m-%dT%H:%M:%S"], ["\\d{1,2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y:%H:%M:%S:%f"], ["\\d{1,2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2}", "%d/%b/%Y:%H:%M:%S"]] autotimestamp = false httpeventWaitResponse = true +disableLoggingQueue = true diff --git a/splunk_eventgen/eventgen_core.py b/splunk_eventgen/eventgen_core.py index 05c2c90c..7be8feec 100644 --- a/splunk_eventgen/eventgen_core.py +++ b/splunk_eventgen/eventgen_core.py @@ -75,9 +75,15 @@ def __init__(self, args=None): # attach to the logging queue self.logger.info("Logging Setup Complete.") + self._generator_queue_size = getattr(self.args, 'generator_queue_size', 500) + if self._generator_queue_size < 0: + self._generator_queue_size = 0 + self.logger.info("set generator queue size to %d", self._generator_queue_size) + if self.args and 'configfile' in self.args and self.args.configfile: self._load_config(self.args.configfile, args=args) + def _load_config(self, configfile, **kwargs): ''' This method will use a configfile and set self.confg as a processeded config object, @@ -115,8 +121,13 @@ def _load_config(self, configfile, **kwargs): self.config = Config(configfile, **new_args) self.config.parse() self._reload_plugins() + if "args" in kwargs and getattr(kwargs["args"], "generators"): + generator_worker_count = kwargs["args"].generators + else: + generator_worker_count = self.config.generatorWorkers + # TODO: Probably should destroy pools better so processes are cleaned. - self._setup_pools() + self._setup_pools(generator_worker_count) def _reload_plugins(self): # Initialize plugins @@ -133,7 +144,7 @@ def _reload_plugins(self): os.path.join(file_path, 'lib', 'plugins', 'rater'), self.config.plugins, 'rater') self.config._complexSettings['rater'] = plugins except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) def _load_custom_plugins(self, PluginNotLoadedException): plugintype = PluginNotLoadedException.type @@ -147,7 +158,7 @@ def _load_custom_plugins(self, PluginNotLoadedException): # APPPERF-263: be greedy when scanning plugin dir (eat all the pys) self._initializePlugins(plugindir, pluginsdict, plugintype) - def _setup_pools(self): + def _setup_pools(self, generator_worker_count): ''' This method is an internal method called on init to generate pools needed for processing. @@ -157,7 +168,7 @@ def _setup_pools(self): self._create_generator_pool() self._create_timer_threadpool() self._create_output_threadpool() - self._create_generator_workers() + self._create_generator_workers(generator_worker_count) def _create_timer_threadpool(self, threadcount=100): ''' @@ -212,15 +223,19 @@ def _create_generator_pool(self, workercount=20): if self.args.multiprocess: import multiprocessing self.manager = multiprocessing.Manager() - self.loggingQueue = self.manager.Queue() - self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread") - self.logging_pool.start() + if self.config.disableLoggingQueue: + self.loggingQueue = None + else: + # TODO crash caused by logging Thread https://github.com/splunk/eventgen/issues/217 + self.loggingQueue = self.manager.Queue() + self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread") + self.logging_pool.start() # since we're now in multiprocess, we need to use better queues. - self.workerQueue = multiprocessing.JoinableQueue(maxsize=500) + self.workerQueue = multiprocessing.JoinableQueue(maxsize=self._generator_queue_size) self.genconfig = self.manager.dict() self.genconfig["stopping"] = False else: - self.workerQueue = Queue(maxsize=500) + self.workerQueue = Queue(maxsize=self._generator_queue_size) worker_threads = workercount if hasattr(self.config, 'outputCounter') and self.config.outputCounter: self.output_counters = [] @@ -354,13 +369,13 @@ def _worker_do_work(self, work_queue, logging_queue): startTime = time.time() item.run() totalTime = time.time() - startTime - if totalTime > self.config.interval: + if totalTime > self.config.interval and self.config.end != 1: self.logger.warning("work took longer than current interval, queue/threading throughput limitation") work_queue.task_done() except Empty: pass except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) raise e def _generator_do_work(self, work_queue, logging_queue, output_counter=None): @@ -370,23 +385,27 @@ def _generator_do_work(self, work_queue, logging_queue, output_counter=None): startTime = time.time() item.run(output_counter=output_counter) totalTime = time.time() - startTime - if totalTime > self.config.interval: + if totalTime > self.config.interval and item._sample.end != 1: self.logger.warning("work took longer than current interval, queue/threading throughput limitation") work_queue.task_done() except Empty: pass except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) raise e @staticmethod def _proc_worker_do_work(work_queue, logging_queue, config): genconfig = config stopping = genconfig['stopping'] - qh = logutils.queue.QueueHandler(logging_queue) root = logging.getLogger() root.setLevel(logging.DEBUG) - root.addHandler(qh) + if logging_queue is not None: + # TODO https://github.com/splunk/eventgen/issues/217 + qh = logutils.queue.QueueHandler(logging_queue) + root.addHandler(qh) + else: + root.addHandler(logging.StreamHandler()) while not stopping: try: root.info("Checking for work") @@ -418,7 +437,7 @@ def logger_thread(self, loggingQueue): except Empty: pass except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) raise e def _initializePlugins(self, dirname, plugins, plugintype, name=None): @@ -493,7 +512,7 @@ def _initializePlugins(self, dirname, plugins, plugintype, name=None): self.logger.warn("Could not load plugin: %s, skipping" % mod_name.name) self.logger.exception(ie) except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) raise e return ret @@ -521,21 +540,6 @@ def start(self, join_after_start=True): if join_after_start: self.logger.info("All timers started, joining queue until it's empty.") self.join_process() - # Only need to start timers once - # Every 5 seconds, get values and output basic statistics about our operations - # TODO: Figure out how to do this better... - # generatorsPerSec = (generatorDecrements - generatorQueueCounter) / 5 - # outputtersPerSec = (outputDecrements - outputQueueCounter) / 5 - # outputQueueCounter = outputDecrements - # generatorQueueCounter = generatorDecrements - # self.logger.info('OutputQueueDepth=%d GeneratorQueueDepth=%d GeneratorsPerSec=%d OutputtersPerSec=%d' % - # (self.config.outputQueueSize.value(), self.config.generatorQueueSize.value(), - # generatorsPerSec, outputtersPerSec)) - # kiloBytesPerSec = self.config.bytesSent.valueAndClear() / 5 / 1024 - # gbPerDay = (kiloBytesPerSec / 1024 / 1024) * 60 * 60 * 24 - # eventsPerSec = self.config.eventsSent.valueAndClear() / 5 - # self.logger.info('GlobalEventsPerSec=%s KilobytesPerSec=%1f GigabytesPerDay=%1f' % - # (eventsPerSec, kiloBytesPerSec, gbPerDay)) def join_process(self): ''' @@ -550,7 +554,7 @@ def join_process(self): self.logger.info("All timers have finished, signalling workers to exit.") self.stop() except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) raise e def stop(self): @@ -577,7 +581,7 @@ def stop(self): count += 1 self.logger.info("All generators working/exited, joining output queue until it's empty.") self.outputQueue.join() - self.logger.info("All items fully processed, stopping.") + self.logger.info("All items fully processed. Cleaning up internal processes.") self.started = False self.stopping = False diff --git a/splunk_eventgen/eventgen_nameko_controller.py b/splunk_eventgen/eventgen_nameko_controller.py index 52192859..6ea271ee 100644 --- a/splunk_eventgen/eventgen_nameko_controller.py +++ b/splunk_eventgen/eventgen_nameko_controller.py @@ -74,7 +74,7 @@ def index(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -88,7 +88,7 @@ def status(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -102,7 +102,7 @@ def start(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -116,7 +116,7 @@ def stop(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -130,7 +130,7 @@ def restart(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -144,7 +144,7 @@ def get_conf(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -159,7 +159,7 @@ def set_conf(self, target, data): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -174,7 +174,7 @@ def edit_conf(self, target, data): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -187,7 +187,7 @@ def bundle(self, target, data): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return "500", "Exception: {}".format(e.message) @rpc @@ -198,7 +198,7 @@ def setup(self, target, data): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return "500", "Exception: {}".format(e.message) @rpc @@ -209,7 +209,7 @@ def get_volume(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return "500", "Exception: {}".format(e.message) @rpc @@ -222,7 +222,7 @@ def set_volume(self, target, data): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return "500", "Exception: {}".format(e.message) @rpc @@ -236,7 +236,7 @@ def reset(self, target): self.log.info(msg) return msg except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) # HTTP Methods diff --git a/splunk_eventgen/eventgen_nameko_server.py b/splunk_eventgen/eventgen_nameko_server.py index 91a9b28b..dbcf422a 100644 --- a/splunk_eventgen/eventgen_nameko_server.py +++ b/splunk_eventgen/eventgen_nameko_server.py @@ -166,7 +166,7 @@ def start(self): self.eventgen_dependency.eventgen.start(join_after_start=False) return "Eventgen has successfully started." except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) def stop(self): @@ -177,7 +177,7 @@ def stop(self): return "Eventgen is stopped." return "There is no eventgen process running." except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) def restart(self): @@ -188,7 +188,7 @@ def restart(self): self.start() return "Eventgen restarted." except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) def get_conf(self): @@ -213,7 +213,7 @@ def get_conf(self): self.send_conf_to_controller(server_conf={}) return json.dumps({}, indent=4) except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -254,7 +254,7 @@ def set_conf(self, conf): self.log.info("custom_config_json is {}".format(conf_content)) return self.get_conf() except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) def edit_conf(self, conf): @@ -285,7 +285,7 @@ def edit_conf(self, conf): self.restart() return self.get_conf() except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) def bundle(self, url): @@ -314,7 +314,7 @@ def bundle(self, url): else: return self.get_conf() except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) def setup(self, data): @@ -415,7 +415,7 @@ def get_volume(self): self.send_volume_to_controller(total_volume=self.total_volume) return str(self.total_volume) except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) @rpc @@ -451,7 +451,7 @@ def set_volume(self, volume): self.get_volume() return output except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) def reset(self): @@ -461,7 +461,7 @@ def reset(self): self.eventgen_dependency.refresh_eventgen() return "Eventgen Refreshed" except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '500', "Exception: {}".format(e.message) # Event Handler Methods @@ -645,10 +645,10 @@ def http_bundle(self, request): url = data["url"] return self.bundle(url) except ValueError as e: - self.log.exception(e) + self.log.exception(str(e)) return '400', "Please pass in a valid object with bundle URL" except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '400', "Exception: {}".format(e.message) @http('POST', '/setup') @@ -657,7 +657,7 @@ def http_setup(self, request): try: return self.setup(data) except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '400', "Exception: {}".format(e.message) @http('GET', '/volume') @@ -672,7 +672,7 @@ def http_set_volume(self, request): volume = data["perDayVolume"] return self.set_volume(volume) except Exception as e: - self.log.exception(e) + self.log.exception(str(e)) return '400', "Exception: {}".format(e.message) @http('POST', '/reset') diff --git a/splunk_eventgen/lib/eventgenconfig.py b/splunk_eventgen/lib/eventgenconfig.py index 616eaecf..6db8671b 100644 --- a/splunk_eventgen/lib/eventgenconfig.py +++ b/splunk_eventgen/lib/eventgenconfig.py @@ -88,7 +88,7 @@ class Config(object): 'outputWorkers', 'generator', 'rater', 'generatorWorkers', 'timeField', 'sampleDir', 'threading', 'profiler', 'maxIntervalsBeforeFlush', 'maxQueueLength', 'splunkMethod', 'splunkPort', 'verbosity', 'useOutputQueue', 'seed','end', 'autotimestamps', 'autotimestamp', 'httpeventWaitResponse', - 'outputCounter', 'sequentialTimestamp', 'extendIndexes'] + 'outputCounter', 'sequentialTimestamp', 'extendIndexes', 'disableLoggingQueue'] _validTokenTypes = {'token': 0, 'replacementType': 1, 'replacement': 2} _validHostTokens = {'token': 0, 'replacement': 1} _validReplacementTypes = [ @@ -98,7 +98,7 @@ class Config(object): _floatSettings = ['randomizeCount', 'delay', 'timeMultiple'] _boolSettings = [ 'disabled', 'randomizeEvents', 'bundlelines', 'profiler', 'useOutputQueue', 'autotimestamp', - 'httpeventWaitResponse', 'outputCounter', 'sequentialTimestamp'] + 'httpeventWaitResponse', 'outputCounter', 'sequentialTimestamp', 'disableLoggingQueue'] _jsonSettings = [ 'hourOfDayRate', 'dayOfWeekRate', 'minuteOfHourRate', 'dayOfMonthRate', 'monthOfYearRate', 'autotimestamps'] _defaultableSettings = [ @@ -561,10 +561,14 @@ def parse(self): # Override with real name if s.outputMode == 'spool' and s.spoolFile == self.spoolFile: news.spoolFile = f.split(os.sep)[-1] - if s.outputMode == 'file' and s.fileName is None and s.spoolFile == self.spoolFile: - news.fileName = os.path.join(s.spoolDir, f.split(os.sep)[-1]) - elif s.outputMode == 'file' and s.fileName is None and s.spoolFile is not None: - news.fileName = os.path.join(s.spoolDir, s.spoolFile) + if s.outputMode == 'file' and s.fileName is None: + if self.fileName: + news.fileName = self.fileName + self.logger.debug("Found a global fileName {}. Setting the sample fileName.".format(self.fileName)) + elif s.spoolFile == self.spoolFile: + news.fileName = os.path.join(s.spoolDir, f.split(os.sep)[-1]) + elif s.spoolFile is not None: + news.fileName = os.path.join(s.spoolDir, s.spoolFile) # Override s.name with file name. Usually they'll match unless we've been a regex # 6/22/12 CS Save original name for later matching news._origName = news.name diff --git a/splunk_eventgen/lib/eventgenoutput.py b/splunk_eventgen/lib/eventgenoutput.py index c2b8c1c1..843bb088 100644 --- a/splunk_eventgen/lib/eventgenoutput.py +++ b/splunk_eventgen/lib/eventgenoutput.py @@ -124,14 +124,12 @@ def flush(self, endOfInterval=False): except Full: self.logger.warning("Output Queue full, looping again") else: - tmp = [len(s['_raw']) for s in q] - # TODO: clean out eventsSend and bytesSent if they are not being used in config - # self.config.eventsSent.add(len(tmp)) - # self.config.bytesSent.add(sum(tmp)) - if self.config.splunkEmbedded and len(tmp) > 0: - metrics = logging.getLogger('eventgen_metrics') - metrics.info({ - 'timestamp': datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'), 'sample': - self._sample.name, 'events': len(tmp), 'bytes': sum(tmp)}) - tmp = None + if self.config.splunkEmbedded: + tmp = [len(s['_raw']) for s in q] + if len(tmp) > 0: + metrics = logging.getLogger('eventgen_metrics') + metrics.info({ + 'timestamp': datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'), 'sample': + self._sample.name, 'events': len(tmp), 'bytes': sum(tmp)}) + tmp = None outputer.run() diff --git a/splunk_eventgen/lib/eventgentimer.py b/splunk_eventgen/lib/eventgentimer.py index 92a690fd..095ff6fb 100644 --- a/splunk_eventgen/lib/eventgentimer.py +++ b/splunk_eventgen/lib/eventgentimer.py @@ -32,7 +32,7 @@ def __init__(self, time, sample=None, config=None, genqueue=None, outputqueue=No self.profiler = config.profiler self.config = config self.sample = sample - self.end = getattr(self.sample, "end") if getattr(self.sample, "end") is not None else -1 + self.end = getattr(self.sample, "end", -1) self.endts = getattr(self.sample, "endts", None) self.generatorQueue = genqueue self.outputQueue = outputqueue @@ -80,7 +80,12 @@ def predict_event_size(self): except TypeError: self.logger.error("Error loading sample file for sample '%s'" % self.sample.name) return - return len(self.sample.sampleDict[0]['_raw']) + total_len = sum([len(e['_raw']) for e in self.sample.sampleDict]) + sample_count = len(self.sample.sampleDict) + if sample_count == 0: + return 0 + else: + return total_len/sample_count def run(self): """ @@ -135,9 +140,9 @@ def real_run(self): elif char != "-": backfillletter += char backfillearliest = timeParserTimeMath(plusminus=mathsymbol, num=backfillnumber, unit=backfillletter, - ret=realtime) + ret=realtime) while backfillearliest < realtime: - if self.executions == int(self.end): + if self.end and self.executions == int(self.end): self.logger.info("End executions %d reached, ending generation of sample '%s'" % (int( self.end), self.sample.name)) break @@ -148,12 +153,14 @@ def real_run(self): genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue) genPlugin.updateCounts(count=count, start_time=et, end_time=lt) try: - self.generatorQueue.put(genPlugin) + self.generatorQueue.put(genPlugin, True, 3) self.executions += 1 + backfillearliest = lt except Full: - self.logger.warning("Generator Queue Full. Skipping current generation.") - backfillearliest = lt - + self.logger.warning("Generator Queue Full. Reput the backfill generator task later. %d backfill generators are dispatched.", self.executions) + backfillearliest = et + realtime = self.sample.now(realnow=True) + self.sample.backfilldone = True else: # 12/15/13 CS Moving the rating to a separate plugin architecture @@ -182,17 +189,16 @@ def real_run(self): self.sample.config.generatorWorkers, count)) else: # Spawn workers at the beginning of job rather than wait for next interval - self.logger.info("Start '%d' generatorWorkers for sample '%s'" % + self.logger.info("Starting '%d' generatorWorkers for sample '%s'" % (self.sample.config.generatorWorkers, self.sample.name)) for worker_id in range(self.config.generatorWorkers): # self.generatorPlugin is only an instance, now we need a real plugin. Make a copy of # of the sample in case another generator corrupts it. - copy_sample = copy.copy(self.sample) - copy_tokens = [] - for token in self.sample.tokens: - copy_tokens.append(token.deepcopy(self.sample)) - copy_sample.tokens = copy_tokens - genPlugin = self.generatorPlugin(sample=copy_sample) + # copy_sample = copy.copy(self.sample) + # tokens = copy.deepcopy(self.sample.tokens) + # copy_sample.tokens = tokens + # genPlugin = self.generatorPlugin(sample=copy_sample) + genPlugin = self.generatorPlugin(sample=self.sample) # Adjust queue for threading mode genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue) genPlugin.updateCounts(count=count, start_time=et, end_time=lt) @@ -200,14 +206,14 @@ def real_run(self): try: self.generatorQueue.put(genPlugin) self.executions += 1 - self.logger.info(("Worker# {0}: Put {1} MB of events in queue for sample '{2}'" + - "with et '{3}' and lt '{4}'").format( + self.logger.debug(("Worker# {0}: Put {1} MB of events in queue for sample '{2}'" + + "with et '{3}' and lt '{4}'").format( worker_id, round((count / 1024.0 / 1024), 4), self.sample.name, et, lt)) except Full: self.logger.warning("Generator Queue Full. Skipping current generation.") except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) if self.stopping: end = True pass diff --git a/splunk_eventgen/lib/eventgentoken.py b/splunk_eventgen/lib/eventgentoken.py index d6c402fe..f409d90f 100644 --- a/splunk_eventgen/lib/eventgentoken.py +++ b/splunk_eventgen/lib/eventgentoken.py @@ -46,9 +46,6 @@ def __init__(self, sample=None): self._earliestTime = (None, None) self._latestTime = (None, None) - if sample: - self.sample = sample - def __str__(self): """Only used for debugging, outputs a pretty printed representation of this token""" # Eliminate recursive going back to parent @@ -68,18 +65,10 @@ def __getstate__(self): def __setstate__(self, d): self.__dict__ = d self._setup_logging() - - def deepcopy(self, sample=None): - # temp = dict([(key, value) for (key, value) in token_object.items() if key != 'sample' and key != 'logger']) - cp = Token() - cp.__setstate__(self.__getstate__()) - if sample: - cp.sample = sample - return cp def _setup_logging(self): self.logger = logging.getLogger('eventgen') - + def _match(self, event): """Executes regular expression match and returns the re.Match object""" return re.match(self.token, event) diff --git a/splunk_eventgen/lib/generatorplugin.py b/splunk_eventgen/lib/generatorplugin.py index 6ab1ecac..7959da76 100644 --- a/splunk_eventgen/lib/generatorplugin.py +++ b/splunk_eventgen/lib/generatorplugin.py @@ -196,6 +196,9 @@ def replace_tokens(self, eventsDict, earliest, latest, ignore_tokens=False): eventcount = 0 send_events = [] total_count = len(eventsDict) + index = None + if total_count > 0: + index = random.choice(self._sample.index_list) if len(self._sample.index_list) else eventsDict[0]['index'] for targetevent in eventsDict: event = targetevent["_raw"] # Maintain state for every token in a given event, Hash contains keys for each file name which is @@ -227,7 +230,7 @@ def replace_tokens(self, eventsDict, earliest, latest, ignore_tokens=False): except Exception: time_val = int(time.mktime(self._sample.now().timetuple())) temp_event = { - '_raw': event, 'index': random.choice(self._sample.index_list)if len(self._sample.index_list) else targetevent['index'], 'host': host, 'hostRegex': self._sample.hostRegex, + '_raw': event, 'index': index, 'host': host, 'hostRegex': self._sample.hostRegex, 'source': targetevent['source'], 'sourcetype': targetevent['sourcetype'], '_time': time_val} send_events.append(temp_event) return send_events diff --git a/splunk_eventgen/lib/outputplugin.py b/splunk_eventgen/lib/outputplugin.py index e3bb2fb3..50a0a163 100644 --- a/splunk_eventgen/lib/outputplugin.py +++ b/splunk_eventgen/lib/outputplugin.py @@ -55,7 +55,10 @@ def run(self): if self.output_counter is not None: self.output_counter.collect(len(self.events), sum([len(e['_raw']) for e in self.events])) self.events = None + self._output_end() + def _output_end(self): + pass def load(): return OutputPlugin diff --git a/splunk_eventgen/lib/plugins/generator/default.py b/splunk_eventgen/lib/plugins/generator/default.py index ed106e39..80fc35cd 100644 --- a/splunk_eventgen/lib/plugins/generator/default.py +++ b/splunk_eventgen/lib/plugins/generator/default.py @@ -67,6 +67,5 @@ def gen(self, count, earliest, latest, samplename=None): GeneratorPlugin.build_events(self, eventsDict, startTime, earliest, latest) - def load(): return DefaultGenerator diff --git a/splunk_eventgen/lib/plugins/generator/jinja.py b/splunk_eventgen/lib/plugins/generator/jinja.py index 27de2200..4bf6f8a1 100644 --- a/splunk_eventgen/lib/plugins/generator/jinja.py +++ b/splunk_eventgen/lib/plugins/generator/jinja.py @@ -193,8 +193,7 @@ def gen(self, count, earliest, latest, samplename=None): if self._sample.splunkEmbedded is True: splunk_home = os.environ["SPLUNK_HOME"] app_name = getattr(self._sample, 'app', 'SA-Eventgen') - sample_dir = os.path.join(splunk_home, 'etc', 'apps', app_name, - 'default', self._sample.DEFAULT_SAMPLE_DIR) + sample_dir = os.path.join(splunk_home, 'etc', 'apps', app_name, 'samples') if not hasattr(self._sample, "jinja_template_dir"): template_dir = 'templates' @@ -278,7 +277,7 @@ def gen(self, count, earliest, latest, samplename=None): target_line["index"] = self._sample.index lines_out.append(target_line) except TypeError as e: - self.logger.exception(e) + self.logger.exception(str(e)) self.end_of_cycle = True self._increment_count(lines_out) self._out.bulksend(lines_out) @@ -290,7 +289,7 @@ def gen(self, count, earliest, latest, samplename=None): self.logger.info("Generation of sample '%s' completed in %s seconds." % (self._sample.name, timeDiffFrac)) return 0 except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) return 1 diff --git a/splunk_eventgen/lib/plugins/output/counter.py b/splunk_eventgen/lib/plugins/output/counter.py new file mode 100755 index 00000000..767bed9a --- /dev/null +++ b/splunk_eventgen/lib/plugins/output/counter.py @@ -0,0 +1,49 @@ +import logging +import datetime +import pprint +import sys + +from outputplugin import OutputPlugin + + +class CounterOutputPlugin(OutputPlugin): + name = 'counter' + MAXQUEUELENGTH = 1000 + useOutputQueue = True + + dataSizeHistogram = {} + eventCountHistogram = {} + flushCount = 0 + lastPrintAt = 0 + + def __init__(self, sample, output_counter=None): + OutputPlugin.__init__(self, sample, output_counter) + + def flush(self, q): + CounterOutputPlugin.flushCount += 1 + for e in q: + ts = datetime.datetime.fromtimestamp(int(e['_time'])) + text = e['_raw'] + day = ts.strftime('%Y-%m-%d') + CounterOutputPlugin.dataSizeHistogram[day] = CounterOutputPlugin.dataSizeHistogram.get(day, 0) + len(text) + CounterOutputPlugin.eventCountHistogram[day] = CounterOutputPlugin.eventCountHistogram.get(day, 0) + 1 + + def _output_end(self): + if CounterOutputPlugin.flushCount - CounterOutputPlugin.lastPrintAt > 0: + self._print_info('----- print the output histogram -----') + self._print_info('--- data size histogram ---') + self._print_info(pprint.pformat(CounterOutputPlugin.dataSizeHistogram)) + self._print_info('--- event count histogram ---') + self._print_info(pprint.pformat(CounterOutputPlugin.eventCountHistogram)) + CounterOutputPlugin.lastPrintAt = CounterOutputPlugin.flushCount + + def _print_info(self, msg): + print >> sys.stderr, '{} {}'.format(datetime.datetime.now(), msg) + + def _setup_logging(self): + self.logger = logging.getLogger('eventgen_counter_out') + + +def load(): + """Returns an instance of the plugin""" + return CounterOutputPlugin diff --git a/splunk_eventgen/lib/plugins/output/httpevent.py b/splunk_eventgen/lib/plugins/output/httpevent.py index 81082a1c..f724f34d 100644 --- a/splunk_eventgen/lib/plugins/output/httpevent.py +++ b/splunk_eventgen/lib/plugins/output/httpevent.py @@ -58,7 +58,7 @@ def _setup_REST_workers(self, session=None, workers=10): if not session: session = Session() self.session = FuturesSession(session=session, executor=ThreadPoolExecutor(max_workers=workers)) - self.active_sessions = [] + self.active_session_info = [] @staticmethod def _urlencode(value): @@ -98,12 +98,13 @@ def updateConfig(self, config): 'outputMode httpevent but httpeventServers not specified for sample %s' % self._sample.name) # set default output mode to round robin if hasattr(self.config, 'httpeventOutputMode') and self.config.httpeventOutputMode: - self.httpeventoutputmode = config.httpeventOutputMode + self.httpeventoutputmode = self.config.httpeventOutputMode else: if hasattr(self._sample, 'httpeventOutputMode') and self._sample.httpeventOutputMode: self.httpeventoutputmode = self._sample.httpeventOutputMode else: self.httpeventoutputmode = 'roundrobin' + if hasattr(self.config, 'httpeventMaxPayloadSize') and self.config.httpeventMaxPayloadSize: self.httpeventmaxsize = self.config.httpeventMaxPayloadSize else: @@ -111,6 +112,15 @@ def updateConfig(self, config): self.httpeventmaxsize = self._sample.httpeventMaxPayloadSize else: self.httpeventmaxsize = 10000 + + if hasattr(self.config, 'httpeventAllowFailureCount') and self.config.httpeventAllowFailureCount: + self.httpeventAllowFailureCount = int(self.config.httpeventAllowFailureCount) + else: + if hasattr(self._sample, 'httpeventAllowFailureCount') and self._sample.httpeventAllowFailureCount: + self.httpeventAllowFailureCount = int(self._sample.httpeventAllowFailureCount) + else: + self.httpeventAllowFailureCount = 100 + self.logger.debug("Currentmax size: %s " % self.httpeventmaxsize) if isinstance(config.httpeventServers, str): self.httpeventServers = json.loads(config.httpeventServers) @@ -118,41 +128,29 @@ def updateConfig(self, config): self.httpeventServers = config.httpeventServers self.logger.debug("Setting up the connection pool for %s in %s" % (self._sample.name, self._app)) self.createConnections() - self.logger.debug("Pool created.") - self.logger.debug("Finished init of httpevent plugin.") + self.logger.debug("Pool created and finished init of httpevent plugin.") except Exception as e: - self.logger.exception(e) + self.logger.exception(str(e)) def createConnections(self): self.serverPool = [] if self.httpeventServers: for server in self.httpeventServers.get('servers'): if not server.get('address'): - self.logger.error( - 'requested a connection to a httpevent server, but no address specified for sample %s' % - self._sample.name) raise ValueError( 'requested a connection to a httpevent server, but no address specified for sample %s' % self._sample.name) if not server.get('port'): - self.logger.error( - 'requested a connection to a httpevent server, but no port specified for server %s' % server) raise ValueError( 'requested a connection to a httpevent server, but no port specified for server %s' % server) if not server.get('key'): - self.logger.error( - 'requested a connection to a httpevent server, but no key specified for server %s' % server) raise ValueError( 'requested a connection to a httpevent server, but no key specified for server %s' % server) if not ((server.get('protocol') == 'http') or (server.get('protocol') == 'https')): - self.logger.error( - 'requested a connection to a httpevent server, but no protocol specified for server %s' % - server) raise ValueError( 'requested a connection to a httpevent server, but no protocol specified for server %s' % server) - self.logger.debug( - "Validation Passed, Creating a requests object for server: %s" % server.get('address')) + self.logger.debug("Validation Passed, Creating a requests object for server: %s" % server.get('address')) setserver = {} setserver['url'] = "%s://%s:%s/services/collector" % (server.get('protocol'), server.get('address'), @@ -171,43 +169,40 @@ def _sendHTTPEvents(self, payload): numberevents = len(payload) self.logger.debug("Sending %s events to splunk" % numberevents) for line in payload: - self.logger.debug("line: %s " % line) + self.logger.debugv("line: %s " % line) targetline = json.dumps(line) - self.logger.debug("targetline: %s " % targetline) + self.logger.debugv("targetline: %s " % targetline) targetlinesize = len(targetline) totalbytesexpected += targetlinesize if (int(currentreadsize) + int(targetlinesize)) <= int(self.httpeventmaxsize): stringpayload = stringpayload + targetline currentreadsize = currentreadsize + targetlinesize - self.logger.debug("stringpayload: %s " % stringpayload) + self.logger.debugv("stringpayload: %s " % stringpayload) else: self.logger.debug("Max size for payload hit, sending to splunk then continuing.") - try: - self._transmitEvents(stringpayload) - totalbytessent += len(stringpayload) - currentreadsize = 0 - stringpayload = targetline - except Exception as e: - self.logger.exception(e) - raise e - else: - try: - totalbytessent += len(stringpayload) - self.logger.debug( - "End of for loop hit for sending events to splunk, total bytes sent: %s ---- out of %s -----" % - (totalbytessent, totalbytesexpected)) self._transmitEvents(stringpayload) - except Exception as e: - self.logger.exception(e) - raise e + totalbytessent += len(stringpayload) + currentreadsize = targetlinesize + stringpayload = targetline + + totalbytessent += len(stringpayload) + self.logger.debug( + "End of for loop hit for sending events to splunk, total bytes sent: %s ---- out of %s -----" % + (totalbytessent, totalbytesexpected)) + self._transmitEvents(stringpayload) def _transmitEvents(self, payloadstring): targetServer = [] - self.logger.debug("Transmission called with payloadstring: %s " % payloadstring) + self.logger.debug("Transmission called with payloadstring length: %s " % len(payloadstring)) + + if not self.serverPool: + raise Exception("No available servers exist. Please check your httpServers.") + if self.httpeventoutputmode == "mirror": targetServer = self.serverPool else: targetServer.append(random.choice(self.serverPool)) + for server in targetServer: self.logger.debug("Selected targetServer object: %s" % targetServer) url = server['url'] @@ -215,17 +210,45 @@ def _transmitEvents(self, payloadstring): headers['Authorization'] = server['header'] headers['content-type'] = 'application/json' try: - payloadsize = len(payloadstring) - # response = requests.post(url, data=payloadstring, headers=headers, verify=False) - self.active_sessions.append( - self.session.post(url=url, data=payloadstring, headers=headers, verify=False)) + session_info = list() + session_info.append(url) + session_info.append(self.session.post(url=url, data=payloadstring, headers=headers, verify=False)) + self.active_session_info.append(session_info) except Exception as e: - self.logger.error("Failed for exception: %s" % e) self.logger.error("Failed sending events to url: %s sourcetype: %s size: %s" % - (url, self.lastsourcetype, payloadsize)) - self.logger.debug( - "Failed sending events to url: %s headers: %s payload: %s" % (url, headers, payloadstring)) - raise e + (url, self.lastsourcetype, len(payloadstring))) + + def reset_count(self, url): + try: + self.config.httpeventServersCountdownMap[url] = self.httpeventAllowFailureCount + except: + pass + + def remove_requets_target(self, url): + httpeventServers = json.loads(self.config.httpeventServers) + + # If url fail more than specified count, we completely remove it from the pool. + try: + countdown_map = self.config.httpeventServersCountdownMap + except: + self.config.httpeventServersCountdownMap = {} + for i, server_info in enumerate(self.serverPool): + # URL is in format: https://2.2.2.2:8088/services/collector + self.config.httpeventServersCountdownMap[server_info.get('url', '')] = self.httpeventAllowFailureCount + countdown_map = self.config.httpeventServersCountdownMap + + for i, server_info in enumerate(httpeventServers.get('servers', [])): + target_url = '{}://{}:{}'.format(server_info.get('protocol', ''), server_info.get('address', ''), server_info.get('port', '')) + if target_url in url: + if countdown_map[url] <= 0: + del httpeventServers.get('servers')[i] + self.logger.warning("Cannot reach {}. Removing from the server pool".format(url)) + else: + countdown_map[url] -= 1 + self.logger.debug("Cannot reach {}. Lowering countdown to {}".format(url, countdown_map[url])) + self.config.httpeventServers = json.dumps(httpeventServers) + self._sample.httpeventServers = httpeventServers + self.config.httpeventServersCountdownMap = countdown_map def flush(self, q): self.logger.debug("Flush called on httpevent plugin") @@ -235,56 +258,51 @@ def flush(self, q): payload = [] self.logger.debug("Currently being called with %d events" % len(q)) for event in q: - self.logger.debug("HTTPEvent proccessing event: %s" % event) + self.logger.debugv("HTTPEvent proccessing event: %s" % event) payloadFragment = {} if event.get('_raw') is None or event['_raw'] == "\n": self.logger.error('failure outputting event, does not contain _raw') else: - self.logger.debug("Event contains _raw, attempting to process...") + self.logger.debugv("Event contains _raw, attempting to process...") payloadFragment['event'] = event['_raw'] if event.get('source'): - self.logger.debug("Event contains source, adding to httpevent event") + self.logger.debugv("Event contains source, adding to httpevent event") payloadFragment['source'] = event['source'] if event.get('sourcetype'): - self.logger.debug("Event contains sourcetype, adding to httpevent event") + self.logger.debugv("Event contains sourcetype, adding to httpevent event") payloadFragment['sourcetype'] = event['sourcetype'] self.lastsourcetype = event['sourcetype'] if event.get('host'): - self.logger.debug("Event contains host, adding to httpevent event") + self.logger.debugv("Event contains host, adding to httpevent event") payloadFragment['host'] = event['host'] if event.get('_time'): # make sure _time can be an epoch timestamp try: float(event.get("_time")) - self.logger.debug("Event contains _time, adding to httpevent event") + self.logger.debugv("Event contains _time, adding to httpevent event") payloadFragment['time'] = event['_time'] except: self.logger.error("Timestamp not in epoch format, ignoring event: {0}".format(event)) if event.get('index'): - self.logger.debug("Event contains index, adding to httpevent event") + self.logger.debugv("Event contains index, adding to httpevent event") payloadFragment['index'] = event['index'] - self.logger.debug("Full payloadFragment: %s" % json.dumps(payloadFragment)) + self.logger.debugv("Full payloadFragment: %s" % json.dumps(payloadFragment)) payload.append(payloadFragment) self.logger.debug("Finished processing events, sending all to splunk") self._sendHTTPEvents(payload) - if self.config.httpeventWaitResponse: - for session in self.active_sessions: - response = session.result() - if not response.raise_for_status(): - self.logger.debug("Payload successfully sent to httpevent server.") - else: - self.logger.error("Server returned an error while trying to send, response code: %s" % - response.status_code) - raise BadConnection( - "Server returned an error while sending, response code: %s" % response.status_code) - else: + if not self.config.httpeventWaitResponse: self.logger.debug("Ignoring response from HTTP server, leaving httpevent outputter") + else: + for session_info in self.active_session_info: + url, session = session_info[0], session_info[1] + try: + response = session.result(5) + self.reset_count(url) + self.logger.debug("Payload successfully sent to " + url) + except Exception as e: + self.remove_requets_target(url) except Exception as e: - self.logger.error('failed indexing events, reason: %s ' % e) - - def _setup_logging(self): - self.logger = logging.getLogger('eventgen_httpeventout') - + self.logger.error('Failed sending events, reason: %s ' % e) def load(): """Returns an instance of the plugin""" diff --git a/splunk_eventgen/lib/requirements.txt b/splunk_eventgen/lib/requirements.txt new file mode 100644 index 00000000..4bfb4b0b --- /dev/null +++ b/splunk_eventgen/lib/requirements.txt @@ -0,0 +1,2 @@ +ujson==1.35 +jinja2==2.10.1 diff --git a/splunk_eventgen/splunk_app/README/eventgen.conf.spec b/splunk_eventgen/splunk_app/README/eventgen.conf.spec index 8a1925ce..a4b45cea 100644 --- a/splunk_eventgen/splunk_app/README/eventgen.conf.spec +++ b/splunk_eventgen/splunk_app/README/eventgen.conf.spec @@ -22,7 +22,7 @@ [global] disabled = false debug = false -verbose = false +verbosity = false spoolDir = $SPLUNK_HOME/var/spool/splunk spoolFile = breaker = [^\r\n\s]+ @@ -61,6 +61,7 @@ maxIntervalsBeforeFlush = 3 maxQueueLength = 0 autotimestamps = [ ] autotimestamp = false +disableLoggingQueue = true [] diff --git a/splunk_eventgen/splunk_app/default/data/ui/nav/default.xml b/splunk_eventgen/splunk_app/default/data/ui/nav/default.xml new file mode 100644 index 00000000..e4e189a6 --- /dev/null +++ b/splunk_eventgen/splunk_app/default/data/ui/nav/default.xml @@ -0,0 +1,3 @@ +