Skip to content

Commit

Permalink
Merge pull request #5302 from n2yen/dev-omhttp-patches
Browse files Browse the repository at this point in the history
omhttp patches and updates
  • Loading branch information
rgerhards committed Apr 2, 2024
2 parents a32483d + a67af36 commit 8d384b9
Show file tree
Hide file tree
Showing 11 changed files with 548 additions and 8 deletions.
286 changes: 279 additions & 7 deletions contrib/omhttp/omhttp.c

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -912,16 +912,19 @@ if ENABLE_OMHTTP
TESTS += \
omhttp-auth.sh \
omhttp-basic.sh \
omhttp-basic-ignorecodes.sh \
omhttp-batch-fail-with-400.sh \
omhttp-batch-jsonarray-compress.sh \
omhttp-batch-jsonarray-retry.sh \
omhttp-batch-jsonarray.sh \
omhttp-batch-kafkarest-retry.sh \
omhttp-batch-kafkarest.sh \
omhttp-batch-retry-metadata.sh \
omhttp-batch-lokirest-retry.sh \
omhttp-batch-lokirest.sh \
omhttp-batch-newline.sh \
omhttp-retry.sh \
omhttp-retry-timeout.sh \
omhttp-httpheaderkey.sh \
omhttp-multiplehttpheaders.sh \
omhttp-dynrestpath.sh \
Expand All @@ -930,12 +933,15 @@ if HAVE_VALGRIND
TESTS += \
omhttp-auth-vg.sh \
omhttp-basic-vg.sh \
omhttp-basic-ignorecodes-vg.sh \
omhttp-batch-jsonarray-compress-vg.sh \
omhttp-batch-jsonarray-retry-vg.sh \
omhttp-batch-jsonarray-vg.sh \
omhttp-batch-kafkarest-retry-vg.sh \
omhttp-batch-retry-metadata-vg.sh \
omhttp-batch-lokirest-retry-vg.sh \
omhttp-retry-vg.sh \
omhttp-retry-timeout-vg.sh \
omhttp-batch-lokirest-vg.sh
endif
endif
Expand Down Expand Up @@ -2632,6 +2638,7 @@ EXTRA_DIST= \
omhttp-batch-lokirest-retry-vg.sh \
omhttp-retry-vg.sh \
omhttp_server.py \
omhttp-validate-response.py \
omprog-defaults.sh \
omprog-defaults-vg.sh \
omprog-output-capture.sh \
Expand Down
14 changes: 14 additions & 0 deletions tests/diag.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2476,6 +2476,20 @@ omhttp_get_data() {
> ${RSYSLOG_OUT_LOG}
}

omhttp_validate_metadata_response() {
echo "starting to validate omhttp response metadata."
omhttp_response_validate_py=$srcdir/omhttp-validate-response.py
if [ ! -f $omhttp_response_validate_py ]; then
echo "Cannot find ${omhttp_response_validate_py} for omhttp test"
error_exit 1
fi

$PYTHON ${omhttp_response_validate_py} --error ${RSYSLOG_DYNNAME}/omhttp.error.log --response ${RSYSLOG_DYNNAME}/omhttp.response.log 2>&1
if [ $? -ne 0 ] ; then
printf 'omhttp_validate_metadata_response failed \n'
error_exit 1
fi
}

# prepare MySQL for next test
# each test receives its own database so that we also can run in parallel
Expand Down
3 changes: 3 additions & 0 deletions tests/omhttp-basic-ignorecodes-vg.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhttp-basic-ignorecodes.sh
44 changes: 44 additions & 0 deletions tests/omhttp-basic-ignorecodes.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/bash
# This file is part of the rsyslog project, released under ASL 2.0

# Starting actual testbench
. ${srcdir:=.}/diag.sh init

export NUMMESSAGES=10000

port="$(get_free_port)"
omhttp_start_server $port --fail-with-401-or-403-after 5000

generate_conf
add_conf '
template(name="tpl" type="string"
string="{\"msgnum\":\"%msg:F,58:2%\"}")
module(load="../contrib/omhttp/.libs/omhttp")
if $msg contains "msgnum:" then
action(
# Payload
name="my_http_action"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
batch="off"
httpignorablecodes=["401", "NA", "403"]
# Auth
usehttps="off"
)
'
startup
injectmsg
shutdown_when_empty
wait_shutdown
omhttp_get_data $port my/endpoint
omhttp_stop_server
seq_check 0 4999
exit_test
3 changes: 3 additions & 0 deletions tests/omhttp-batch-retry-metadata-vg.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhttp-batch-retry-metadata.sh
89 changes: 89 additions & 0 deletions tests/omhttp-batch-retry-metadata.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/bin/bash
# This file is part of the rsyslog project, released under ASL 2.0

# Starting actual testbench
. ${srcdir:=.}/diag.sh init

export NUMMESSAGES=50000

port="$(get_free_port)"
omhttp_start_server $port --fail-every 100 --fail-with 207

generate_conf
add_conf '
module(load="../contrib/omhttp/.libs/omhttp")
main_queue(queue.dequeueBatchSize="2048")
template(name="tpl" type="string"
string="{\"msgnum\":\"%msg:F,58:2%\"}")
# Echo message as-is for retry
template(name="tpl_echo" type="string" string="%msg%\n")
# Echo response as-is for retry
template(name="tpl_response" type="string" string="{ \"message\": %msg%, \"response\": %$!omhttp!response% }\n")
ruleset(name="ruleset_omhttp_retry") {
#action(type="omfile" file="'$RSYSLOG_DYNNAME/omhttp.message.log'" template="tpl_echo")
# log the response
action(type="omfile" file="'$RSYSLOG_DYNNAME/omhttp.response.log'" template="tpl_response")
action(
name="action_omhttp"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl_echo"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
batch="on"
batch.maxsize="100"
batch.format="kafkarest"
httpretrycodes=["207","500"]
retry="on"
retry.ruleset="ruleset_omhttp_retry"
retry.addmetadata="on"
# Auth
usehttps="off"
) & stop
}
ruleset(name="ruleset_omhttp") {
action(
name="action_omhttp"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
batch="on"
batch.maxsize="100"
batch.format="kafkarest"
httpretrycodes=["207", "500"]
retry="on"
retry.ruleset="ruleset_omhttp_retry"
retry.addmetadata="on"
# Auth
usehttps="off"
) & stop
}
if $msg contains "msgnum:" then
call ruleset_omhttp
'
startup
injectmsg
shutdown_when_empty
wait_shutdown
omhttp_get_data $port my/endpoint kafkarest
omhttp_stop_server
seq_check
omhttp_validate_metadata_response
exit_test
3 changes: 3 additions & 0 deletions tests/omhttp-retry-timeout-vg.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhttp-retry-timeout.sh
49 changes: 49 additions & 0 deletions tests/omhttp-retry-timeout.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/bin/bash
# This file is part of the rsyslog project, released under ASL 2.0

# Starting actual testbench
. ${srcdir:=.}/diag.sh init

export NUMMESSAGES=10000

port="$(get_free_port)"
omhttp_start_server $port --fail-every 1000 --fail-with-delay-secs 2

generate_conf
add_conf '
module(load="../contrib/omhttp/.libs/omhttp")
main_queue(queue.dequeueBatchSize="2048")
template(name="tpl" type="string"
string="{\"msgnum\":\"%msg:F,58:2%\"}")
if $msg contains "msgnum:" then
action(
# Payload
action.resumeRetryCount="-1"
action.resumeInterval="1"
name="my_http_action"
type="omhttp"
errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'"
template="tpl"
server="localhost"
serverport="'$port'"
restpath="my/endpoint"
restpathtimeout="1000"
checkpath="ping"
batch="off"
# Auth
usehttps="off"
)
'
startup
injectmsg
shutdown_when_empty
wait_shutdown
omhttp_get_data $port my/endpoint
omhttp_stop_server
seq_check
exit_test
34 changes: 34 additions & 0 deletions tests/omhttp-validate-response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import json
import argparse
from collections import defaultdict

if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Archive and delete core app log files')
parser.add_argument('--error', action='store', type=str, help='error')
parser.add_argument('--response', action='store', type=str, help='response')
args = parser.parse_args()

messages = defaultdict(dict)
with open(args.error, "r") as error_f, open(args.response, "r") as response_f:
for line in error_f:
json_obj = json.loads(line)
# postdata contains a json string of records array
records = json.loads(json_obj['request']['postdata'])
if records:
for i, val in enumerate(records['records']):
messages[val['value']['msgnum']]['response'] = json_obj['response']
messages[val['value']['msgnum']]['index'] = i
#print (len(messages), "messages:", messages)

# validate with responses
for line in response_f:
json_obj = json.loads(line)
msgnum = json_obj['message']['msgnum']
code = json_obj['response']['code']
body = json_obj['response']['body']
batch_index = json_obj['response']['batch_index']
#print('msgnum:', msgnum, 'code:', code, 'body:', body, 'batch_index:', batch_index)
assert(msgnum in messages)
assert(messages[msgnum]['response']['status'] == code)
assert(messages[msgnum]['response']['message'] == body)
assert(messages[msgnum]['index'] == batch_index)
24 changes: 23 additions & 1 deletion tests/omhttp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import os
import zlib
import base64
import random
import time

try:
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer # Python 2
Expand Down Expand Up @@ -57,13 +59,27 @@ def do_POST(self):
return

if metadata['fail_with_400_after'] != -1 and metadata['posts'] > metadata['fail_with_400_after']:
if metadata['fail_with_delay_secs']:
print("sleeping for: {0}".format(metadata['fail_with_delay_secs']))
time.sleep(metadata['fail_with_delay_secs'])
self.send_response(400)
self.end_headers()
self.wfile.write(b'BAD REQUEST')
return

if metadata['fail_with_401_or_403_after'] != -1 and metadata['posts'] > metadata['fail_with_401_or_403_after']:
status = random.choice([401, 403])
self.send_response(status)
self.end_headers()
self.wfile.write(b'BAD REQUEST')
return

if metadata['posts'] > 1 and metadata['fail_every'] != -1 and metadata['posts'] % metadata['fail_every'] == 0:
self.send_response(500)
if metadata['fail_with_delay_secs']:
print("sleeping for: {0}".format(metadata['fail_with_delay_secs']))
time.sleep(metadata['fail_with_delay_secs'])
code = metadata['fail_with'] if metadata['fail_with'] else 500
self.send_response(code)
self.end_headers()
self.wfile.write(b'INTERNAL ERROR')
return
Expand Down Expand Up @@ -114,13 +130,19 @@ def do_GET(self):
parser.add_argument('-i', '--interface', action='store', type=str, default='localhost', help='port')
parser.add_argument('--fail-after', action='store', type=int, default=0, help='start failing after n posts')
parser.add_argument('--fail-every', action='store', type=int, default=-1, help='fail every n posts')
parser.add_argument('--fail-with', action='store', type=int, default=500, help='on failure, fail with this code')
parser.add_argument('--fail-with-400-after', action='store', type=int, default=-1, help='fail with 400 after n posts')
parser.add_argument('--fail-with-401-or-403-after', action='store', type=int, default=-1, help='fail with 401 or 403 after n posts')
parser.add_argument('--fail-with-delay-secs', action='store', type=int, default=0, help='fail with n secs of delay')
parser.add_argument('--decompress', action='store_true', default=False, help='decompress posted data')
parser.add_argument('--userpwd', action='store', default='', help='only accept this user:password combination')
args = parser.parse_args()
metadata['fail_after'] = args.fail_after
metadata['fail_every'] = args.fail_every
metadata['fail_with'] = args.fail_with
metadata['fail_with_400_after'] = args.fail_with_400_after
metadata['fail_with_401_or_403_after'] = args.fail_with_401_or_403_after
metadata['fail_with_delay_secs'] = args.fail_with_delay_secs
metadata['decompress'] = args.decompress
metadata['userpwd'] = args.userpwd
server = HTTPServer((args.interface, args.port), MyHandler)
Expand Down

0 comments on commit 8d384b9

Please sign in to comment.