Skip to content

Commit

Permalink
omprog: simplify 'plugin-with-feedback.py' example
Browse files Browse the repository at this point in the history
Make the skeleton easier to understand by removing transaction support.
Also, transaction failures did not work as explained in the skeleton,
because of issue rsyslog#2420. In the future, a 'plugin-with-transactions.py'
example can be added, ideally once the issue is solved.
  • Loading branch information
jsiwrk committed Jun 3, 2018
1 parent 428fa43 commit 7b24781
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 133 deletions.
10 changes: 3 additions & 7 deletions plugins/external/INTERFACE.md
Expand Up @@ -36,7 +36,7 @@ Providing Feedback to Rsyslog
The plugin may convey error information to rsyslog. To do this, set the
`confirmMessages` flag to `on` in the `omprog` action configuration (this flag
is disabled by default). When this flag is enabled, rsyslog will wait for a
confirmation from the plugin after sending every log message to it.
confirmation from the plugin after sending each log message to it.

The plugin must confirm the message by writing a line with the word `OK` to
its standard output. That is, the plugin must write the characters `O`, `K` and
Expand Down Expand Up @@ -89,8 +89,8 @@ Apart from this facility, rsyslog will ignore the plugin's stderr.

Note: When the `output` setting is specified and `confirmMessages` is set to
`off`, rsyslog will capture both the stdout and stderr of the plugin to the
specified file. You can use this to debug your plugin if you think it is not confirming
the messages as expected.
specified file. You can use this to debug your plugin if you think it is not
confirming the messages as expected.

Example implementation
----------------------
Expand Down Expand Up @@ -185,10 +185,6 @@ confirms the log messages within each transaction with `DEFER_COMMIT`:
=> COMMIT TRANSACTION
<= OK

Example implementation
----------------------
For a reference example of a plugin with transaction support, see [this Python
plugin skeleton](skeletons/python/plugin-with-feedback.py).

Threading Model
===============
Expand Down
142 changes: 16 additions & 126 deletions plugins/external/skeletons/python/plugin-with-feedback.py
@@ -1,13 +1,12 @@
#!/usr/bin/env python3
"""A skeleton for a Python rsyslog output plugin with error handling
and transaction support. Requires Python 3.
"""A skeleton for a Python rsyslog output plugin with error handling.
Requires Python 3.
To integrate a plugin based on this skeleton with rsyslog, configure an
'omprog' action like the following:
action(type="omprog"
binary="/usr/bin/myplugin.py"
confirmMessages="on"
useTransactions="on" # or "off" if you don't need transactions
...)
Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -35,13 +34,13 @@


class RecoverableError(Exception):
"""An error that has caused the processing of the current message or
transaction to fail, but does not require restarting the plugin.
"""An error that has caused the processing of the current message to
fail, but does not require restarting the plugin.
An example of such an error would be a temporary loss of connection to
a database or a server. If such an error occurs in the onBeginTransation,
onMessage or onCommitTransaction functions, your plugin should wrap it
in a RecoverableError before raising it. For example:
a database or a server. If such an error occurs in the onMessage function,
your plugin should wrap it in a RecoverableError before raising it.
For example:
try:
# code that connects to a database
Expand All @@ -51,7 +50,7 @@ class RecoverableError(Exception):
Recoverable errors will cause the 'omprog' action to be temporarily
suspended by rsyslog, during a period that can be configured using the
"action.resumeInterval" action parameter. When the action is resumed,
rsyslog will resend the failed message or transaction to your plugin.
rsyslog will resend the failed message to your plugin.
"""


Expand Down Expand Up @@ -84,57 +83,25 @@ def onInit():
# For illustrative purposes, this plugin skeleton appends the received logs
# to a file. When implementing your plugin, remove the following code.
global outfile
outfile = open("/tmp/logfile", "w")


def onBeginTransaction():
"""Begin the processing of a batch of messages.
This function is invoked only when the "useTransactions" parameter is
configured to "on" in the 'omprog' action.
You can implement this function to e.g. start a database transaction.
Raises:
RecoverableError: If a recoverable error occurs. The message or the
transaction will be retried without restarting the plugin.
Exception: If a non-recoverable error occurs. The plugin will be
restarted before retrying the message or the transaction.
"""
logging.debug("onBeginTransaction called")
outfile = open("/tmp/logfile", "w")


def onMessage(msg):
"""Process one log message received from rsyslog (e.g. send it to a
database).
If this function raises an error and the "useTransactions" parameter is
configured to "on" in the 'omprog' action, rsyslog will retry the full
batch of messages. Otherwise, if "useTransactions" is set to "off", only
this message will be retried.
database). If this function raises an error, the message will be retried
by rsyslog.
Args:
msg (str): the log message. Does NOT include a trailing newline.
Raises:
RecoverableError: If a recoverable error occurs. The message or the
transaction will be retried without restarting the plugin.
RecoverableError: If a recoverable error occurs. The message will be
retried without restarting the plugin.
Exception: If a non-recoverable error occurs. The plugin will be
restarted before retrying the message or the transaction.
restarted before retrying the message.
"""
logging.debug("onMessage called")

# It is recommended to check that the "useTransactions" flag is
# appropriately configured in 'omprog'. If your plugin requires
# transactions, you can check that they are enabled as follows:
# global inTransaction
# assert inTransaction, "This plugin requires transactions to be enabled"

# Otherwise, if your plugin does not support transactions, you can check
# that they are disabled as follows:
# global inTransaction
# assert not inTransaction, "This plugin does not support transactions"

# For illustrative purposes, this plugin skeleton appends the received logs
# to a file. When implementing your plugin, remove the following code.
global outfile
Expand All @@ -143,40 +110,6 @@ def onMessage(msg):
outfile.flush()


def onCommitTransaction():
"""Complete the processing of a batch of messages.
This function is invoked only when the "useTransactions" parameter is
configured to "on" in the 'omprog' action.
You can implement this function to e.g. commit a database transaction.
Raises:
RecoverableError: If a recoverable error occurs. The transaction
will be retried without restarting the plugin.
Exception: If a non-recoverable error occurs. The plugin will be
restarted before retrying the transaction.
"""
logging.debug("onCommitTransaction called")


def onRollbackTransaction():
"""Cancel the processing of a batch of messages.
This function is invoked only when the "useTransactions" parameter is
configured to "on" in the 'omprog' action, and when the "onMessage"
function has raised a (recoverable or non-recoverable) error for one of
the messages in the batch. It is also invoked if "onCommitTransaction"
raises an error.
You can implement this function to e.g. rollback a database transaction.
This function should not raise any error. If it does, the error will be
logged as a warning and ignored.
"""
logging.debug("onRollbackTransaction called")


def onExit():
"""Do everything that is needed to finish processing (e.g. close files,
handles, disconnect from systems...). This is being called immediately
Expand All @@ -200,21 +133,6 @@ def onExit():
This is the main loop that receives messages from rsyslog via stdin,
invokes the above entrypoints, and provides status codes to rsyslog
via stdout. In most cases, modifying this code should not be necessary.
You will have to change the code below if you need the following
advanced features:
* Custom begin/end transaction marks: if you have configured the
'omprog' action in rsyslog to use your own marks for transaction
boundaries (instead of the default "BEGIN TRANSACTION" and "COMMIT
TRANSACTION" messages), modify the code below accordingly.
* Partial transaction commits: this skeleton confirms all messages
inside a transaction using the "DEFER_COMMIT" status code. If you
want to return the "PREVIOUS_COMMITED" or "OK" status codes within
transactions, you will need to modify the code below. See
http://www.rsyslog.com/doc/v8-stable/development/dev_oplugins.html
for information about these status codes. Note that rsyslog will not
send the "COMMIT TRANSACTION" mark if the last message in the
transaction is confirmed with an "OK" status code.
"""
try:
onInit()
Expand All @@ -227,41 +145,14 @@ def onExit():
# Tell rsyslog we are ready to start processing messages:
print("OK", flush=True)

inTransaction = False
endedWithError = False
try:
line = sys.stdin.readline()
while line:
line = line.rstrip('\n')
try:
try:
if line == "BEGIN TRANSACTION":
onBeginTransaction()
inTransaction = True
status = "OK"
elif line == "COMMIT TRANSACTION":
onCommitTransaction()
inTransaction = False
status = "OK"
else:
onMessage(line)
status = "DEFER_COMMIT" if inTransaction else "OK"

except Exception:
# If a transaction was in progress, call onRollbackTransaction
# to facilitate cleaning up the transaction state. Note that
# rsyslog does not support this notification. (It probably
# should, to allow the plugin to be notified in case the
# transaction fails in the rsyslog side.)
if inTransaction:
try:
onRollbackTransaction()
except Exception as ignored:
ignored.__suppress_context__ = True
logging.warning("Exception ignored in onRollbackTransaction", exc_info=True)
inTransaction = False
raise

onMessage(line)
status = "OK"
except RecoverableError as e:
# Any line written to stdout that is not a status code will be
# treated as a recoverable error by 'omprog', and cause the action
Expand All @@ -272,7 +163,6 @@ def onExit():
# We also log the complete exception to stderr (or to the logging
# handler(s) configured in doInit, if any).
logging.exception(e)
inTransaction = False

# Send the status code (or the one-line error message) to rsyslog:
print(status, flush=True)
Expand Down

0 comments on commit 7b24781

Please sign in to comment.