Skip to content

Commit

Permalink
Merge pull request #5270 from alorbach/pr-issue-5269
Browse files Browse the repository at this point in the history
omazureeventhubs: Corrected handling of transport closed failures
  • Loading branch information
rgerhards committed Dec 6, 2023
2 parents 84539a7 + 193fc7b commit d25d9e4
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 38 deletions.
117 changes: 81 additions & 36 deletions plugins/omazureeventhubs/omazureeventhubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,36 +328,19 @@ static pn_message_t* proton_encode_message(wrkrInstanceData_t *const pWrkrData,
return message;
}

/*
 * Inspect the condition attached to a proton event.
 *
 * event     - proton event the condition was taken from; used for the event
 *             type name in the messages and to find the connection to close
 * cond      - condition to inspect
 * pszReason - short text prepended to the log messages
 *
 * Returns 1 when no condition is set. When a condition is set, it is written
 * to the debug log and the error log, the event's connection is closed, and
 * 0 is returned.
 */
static sbool proton_check_condition(pn_event_t *event, pn_condition_t *cond, const char * pszReason) {
	/* Nothing to report: tell the caller that everything is fine. */
	if (!pn_condition_is_set(cond)) {
		return 1;
	}

	const char *const pszEvent = pn_event_type_name(pn_event_type(event));
	const char *const pszName = pn_condition_get_name(cond);
	const char *const pszDescr = pn_condition_get_description(cond);

	/* Terminate the debug line with a newline like the other DBGPRINTF calls
	 * in this file do, so the next debug message starts on its own line. */
	DBGPRINTF("proton_check_condition: %s %s: %s: %s\n",
		pszReason, pszEvent, pszName, pszDescr);
	LogError(0, RS_RET_ERR, "omazureeventhubs: %s %s: %s: %s",
		pszReason, pszEvent, pszName, pszDescr);

	pn_connection_close(pn_event_connection(event));
	return 0;
}

static rsRetVal
closeProton(wrkrInstanceData_t *const __restrict__ pWrkrData)
{
DEFiRet;
instanceData *const pData = (instanceData *const) pWrkrData->pData;
#ifndef NDEBUG
DBGPRINTF("closeProton[%p]: ENTER\n", pWrkrData);
#endif
if (pWrkrData->pnSender) {
pn_link_close(pWrkrData->pnSender);
DBGPRINTF("closeProton[%p]: pn_link_close\n", pWrkrData);
pn_session_close(pn_link_session(pWrkrData->pnSender));
DBGPRINTF("closeProton[%p]: pn_link_close/pn_session_close Session\n", pWrkrData);
DBGPRINTF("closeProton[%p]: pn_session_close\n", pWrkrData);
}
if (pWrkrData->pnConn) {
DBGPRINTF("closeProton[%p]: pn_connection_close connection\n", pWrkrData);
Expand All @@ -373,6 +356,23 @@ closeProton(wrkrInstanceData_t *const __restrict__ pWrkrData)
pWrkrData->iMsgSeq = 0;
pWrkrData->iMaxMsgSeq = 0;

// Mark all remaining entries as REJECTED
if(pWrkrData->aProtonMsgs != NULL) {
for(unsigned int i = 0 ; i < pWrkrData->nProtonMsgs ; ++i) {
if (pWrkrData->aProtonMsgs[i] != NULL && (
pWrkrData->aProtonMsgs[i]->status == PROTON_UNSUBMITTED ||
pWrkrData->aProtonMsgs[i]->status == PROTON_SUBMITTED)
) {
pWrkrData->aProtonMsgs[i]->status = PROTON_REJECTED;
DBGPRINTF("closeProton[%p]: Setting ProtonMsg %s to PROTON_REJECTED \n",
pWrkrData, pWrkrData->aProtonMsgs[i]->MsgID);
// Increment Stats Counter
STATSCOUNTER_INC(ctrAzureFail, mutCtrAzureFail);
INST_STATSCOUNTER_INC(pData, pData->ctrAzureFail, pData->mutCtrAzureFail);
}
}
}

FINALIZE;
finalize_it:
RETiRet;
Expand All @@ -390,13 +390,14 @@ openProton(wrkrInstanceData_t *const __restrict__ pWrkrData)
#ifndef NDEBUG
DBGPRINTF("openProton[%p]: ENTER\n", pWrkrData);
#endif
if(pWrkrData->bIsConnecting || pWrkrData->bIsConnected)
if(pWrkrData->bIsConnecting == 1 || pWrkrData->bIsConnected == 1)
FINALIZE;
pWrkrData->pnStatus = PN_EVENT_NONE;

pn_proactor_addr(szAddr, sizeof(szAddr),
(const char *) pData->azurehost,
(const char *) pData->azureport);

// Configure a transport for SSL. The transport will be freed by the proactor.
pWrkrData->pnTransport = pn_transport();
DBGPRINTF("openProton[%p]: create transport to '%s:%s'\n",
Expand Down Expand Up @@ -441,21 +442,52 @@ openProton(wrkrInstanceData_t *const __restrict__ pWrkrData)

// Successfully connecting
pWrkrData->bIsConnecting = 1;
pWrkrData->bIsSuspended = 0;
finalize_it:
if(iRet != RS_RET_OK) {
closeProton(pWrkrData); // Make sure to free ressources
}
RETiRet;
}

/*
 * Inspect the condition attached to a proton event and react to a failure.
 *
 * event     - proton event the condition was taken from; only used for the
 *             event type name in the messages
 * pWrkrData - worker instance whose proton state is closed on failure
 * cond      - condition to inspect
 * pszReason - short text prepended to the log messages
 *
 * Returns 1 when no condition is set. When a condition is set, it is written
 * to the debug log and the error log, closeProton() is called for the worker,
 * the worker is flagged as suspended, and 0 is returned.
 */
static sbool
proton_check_condition(	pn_event_t *event,
			wrkrInstanceData_t *const __restrict__ pWrkrData,
			pn_condition_t *cond,
			const char * pszReason) {
	/* Nothing to report: tell the caller that everything is fine. */
	if (!pn_condition_is_set(cond)) {
		return 1;
	}

	const char *const pszEvent = pn_event_type_name(pn_event_type(event));
	const char *const pszName = pn_condition_get_name(cond);
	const char *const pszDescr = pn_condition_get_description(cond);

	/* Terminate the debug line with a newline like the other DBGPRINTF calls
	 * in this file do, so the next debug message starts on its own line. */
	DBGPRINTF("proton_check_condition: %s %s: %s: %s\n",
		pszReason, pszEvent, pszName, pszDescr);
	LogError(0, RS_RET_ERR, "omazureeventhubs: %s %s: %s: %s",
		pszReason, pszEvent, pszName, pszDescr);

	/* Connection can be closed */
	closeProton(pWrkrData);

	/* Flag the worker as suspended only after closeProton() has run, keeping
	 * the original statement order. */
	pWrkrData->bIsSuspended = 1;

	return 0;
}

static rsRetVal
setupProtonHandle(wrkrInstanceData_t *const __restrict__ pWrkrData, int autoclose)
{
DEFiRet;
DBGPRINTF("omazureeventhubs[%p]: setupProtonHandle ENTER\n", pWrkrData);

pthread_rwlock_wrlock(&pWrkrData->pnLock);
if (autoclose == SETUP_PROTON_AUTOCLOSE) {
if (autoclose == SETUP_PROTON_AUTOCLOSE && (pWrkrData->bIsConnected == 1)) {
DBGPRINTF("omazureeventhubs[%p]: setupProtonHandle closeProton\n", pWrkrData);
closeProton(pWrkrData);
}
CHKiRet(openProton(pWrkrData));
Expand Down Expand Up @@ -631,6 +663,7 @@ CODESTARTtryResume
DBGPRINTF("omazureeventhubs[%p]: tryResume ENTER\n", pWrkrData);
#endif
if (pWrkrData->bIsConnecting == 0 && pWrkrData->bIsConnected == 0) {
DBGPRINTF("omazureeventhubs[%p]: tryResume setupProtonHandle\n", pWrkrData);
CHKiRet(setupProtonHandle(pWrkrData, SETUP_PROTON_AUTOCLOSE));
}
finalize_it:
Expand Down Expand Up @@ -661,6 +694,7 @@ CODESTARTcommitTransaction
#ifndef NDEBUG
DBGPRINTF("omazureeventhubs[%p]: commitTransaction [%d msgs] ENTER\n", pWrkrData, nParams);
#endif

// Handle/Expand our proton helper array
if (nParams > pWrkrData->nMaxProtonMsgs) {
// Free old Array
Expand Down Expand Up @@ -712,12 +746,26 @@ CODESTARTcommitTransaction
}
}

if ( pWrkrData->bIsConnected == 1 &&
iNeedSubmission > 0 &&
pn_link_credit(pWrkrData->pnSender) > 0) {
pn_connection_wake(pWrkrData->pnConn);
/* Grant enough credit to bring it up to BATCH: */
// pn_link_flow(pWrkrData->pnSender, iToSubmit);
if (iNeedSubmission > 0) {
if ( pWrkrData->bIsConnected == 1) {
int credits = pn_link_credit(pWrkrData->pnSender);
if (pn_link_credit(pWrkrData->pnSender) > 0) {
DBGPRINTF("omazureeventhubs[%p]: trigger pn_connection_wake\n",
pWrkrData);
pn_connection_wake(pWrkrData->pnConn);
} else {
DBGPRINTF("omazureeventhubs[%p]: warning pn_link_credit returned %d\n",
pWrkrData, credits);
}
} else {
DBGPRINTF("omazureeventhubs[%p]: commitTransaction Suspended=%s Connecting=%s\n",
pWrkrData,
pWrkrData->bIsSuspended == 1 ? "YES" : "NO",
pWrkrData->bIsConnecting == 1 ? "YES" : "NO");
if (pWrkrData->bIsSuspended == 1 && pWrkrData->bIsConnecting == 0) {
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}
}
} while (bDone == 0);
finalize_it:
Expand All @@ -733,7 +781,7 @@ CODESTARTcommitTransaction
}

/* TODO: Suspend Action if broker problems were reported in error callback */
if (pWrkrData->bIsSuspended) {
if (pWrkrData->bIsSuspended == 1) {
DBGPRINTF("omazureeventhubs[%p]: commitTransaction failed to send messages, suspending action\n",
pWrkrData);
iRet = RS_RET_SUSPENDED;
Expand Down Expand Up @@ -1271,26 +1319,23 @@ handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event) {
}
case PN_TRANSPORT_CLOSED:
DBGPRINTF("handleProton: transport closed for %p:%s\n", pWrkrData, pData->azurehost);
proton_check_condition(event, pn_transport_condition(pn_event_transport(event)),
proton_check_condition(event, pWrkrData, pn_transport_condition(pn_event_transport(event)),
"transport closed");
// Disconnected
pWrkrData->bIsConnected = 0;
pWrkrData->bIsConnecting = 0;
break;
case PN_CONNECTION_REMOTE_CLOSE:
DBGPRINTF("handleProton: connection closed for %p:%s\n", pWrkrData, pData->azurehost);
proton_check_condition(event, pn_connection_remote_condition(pn_event_connection(event)),
proton_check_condition(event, pWrkrData, pn_connection_remote_condition(pn_event_connection(event)),
"connection closed");
break;
case PN_SESSION_REMOTE_CLOSE:
DBGPRINTF("handleProton: remote session closed for %p:%s\n", pWrkrData, pData->azurehost);
proton_check_condition(event, pn_session_remote_condition(pn_event_session(event)),
proton_check_condition(event, pWrkrData, pn_session_remote_condition(pn_event_session(event)),
"remote session closed");
break;
case PN_LINK_REMOTE_CLOSE:
case PN_LINK_REMOTE_DETACH:
DBGPRINTF("handleProton: remote link closed for %p:%s\n", pWrkrData, pData->azurehost);
proton_check_condition(event, pn_link_remote_condition(pn_event_link(event)),
proton_check_condition(event, pWrkrData, pn_link_remote_condition(pn_event_link(event)),
"remote link closed");
break;
case PN_PROACTOR_INACTIVE:
Expand Down
10 changes: 8 additions & 2 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -991,10 +991,13 @@ if ENABLE_OMAZUREEVENTHUBS
if ENABLE_OMAZUREEVENTHUBS_TESTS
TESTS += \
omazureeventhubs-basic.sh \
omazureeventhubs-list.sh
omazureeventhubs-list.sh \
omazureeventhubs-stress.sh \
omazureeventhubs-interrupt.sh
if HAVE_VALGRIND
TESTS += \
omazureeventhubs-stress.sh
omazureeventhubs-basic-vg.sh \
omazureeventhubs-interrupt-vg.sh
endif
endif
endif
Expand Down Expand Up @@ -2809,6 +2812,9 @@ EXTRA_DIST= \
omazureeventhubs-basic.sh \
omazureeventhubs-list.sh \
omazureeventhubs-stress.sh \
omazureeventhubs-interrupt.sh \
omazureeventhubs-basic-vg.sh \
omazureeventhubs-interrupt-vg.sh \
mmpstrucdata.sh \
mmpstrucdata-escaping.sh \
mmpstrucdata-case.sh \
Expand Down
19 changes: 19 additions & 0 deletions tests/diag.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1080,6 +1080,16 @@ wait_file_lines() {
count_function="$2"
shift 2
fi
interrupt_connection=NO
if [ "$1" == "--interrupt-connection" ]; then
interrupt_connection="YES"
interrupt_host="$2"
interrupt_port="$3"
interrupt_tick="$4"
shift 4
lastcurrent_time=0
fi

timeout=${3:-$TB_TEST_TIMEOUT}
timeoutbegin=$(date +%s)
timeoutend=$(( timeoutbegin + timeout ))
Expand Down Expand Up @@ -1121,6 +1131,15 @@ wait_file_lines() {
error_exit 1
else
echo $(date +%H:%M:%S) wait_file_lines waiting, expected $waitlines, current $count lines

current_time=$(date +%s)
if [ $interrupt_connection == "YES" ] && [ $current_time -gt $lastcurrent_time ] && [ $((current_time % $interrupt_tick)) -eq 0 ] && [ ${count} -gt 1 ]; then
# Interrupt Connection - requires root and linux kernel >= 4.9 in order to work!
echo wait_file_lines Interrupt Connection on ${interrupt_host}:${interrupt_port}
sudo ss -K dst ${interrupt_host} dport = ${interrupt_port}
fi
lastcurrent_time=$current_time

$TESTTOOL_DIR/msleep $delay
fi
fi
Expand Down
5 changes: 5 additions & 0 deletions tests/omazureeventhubs-interrupt-vg.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Valgrind wrapper for the omazureeventhubs interrupt test: exports the
# valgrind-related settings below and then sources the regular test script
# from ${srcdir} (the current directory when srcdir is unset or empty).
USE_VALGRIND="YES"
RS_TEST_VALGRIND_EXTRA_OPTS="--keep-debuginfo=yes --leak-check=full"
EXTRA_VALGRIND_SUPPRESSIONS="--suppressions=omazureeventhubs.supp"
export USE_VALGRIND RS_TEST_VALGRIND_EXTRA_OPTS EXTRA_VALGRIND_SUPPRESSIONS
. ${srcdir:-.}/omazureeventhubs-interrupt.sh

0 comments on commit d25d9e4

Please sign in to comment.