Skip to content

Commit

Permalink
correct fix?
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed May 23, 2023
1 parent 7880d2d commit 0e25ed4
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
3 changes: 2 additions & 1 deletion src/micro.c
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,13 @@ on_error(natsConnection *nc, natsSubscription *sub, natsStatus s, void *closure)
{
if (micro_match_endpoint_subject(ep->subject, subject))
{
our_subject = true;
break;
}
}
micro_unlock_service(m);

if (m->cfg->ErrHandler != NULL)
if (m->cfg->ErrHandler != NULL && our_subject)
{
(*m->cfg->ErrHandler)(m, ep, s);
}
Expand Down
2 changes: 1 addition & 1 deletion src/micro_error.c
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ micro_ErrorFromStatus(natsStatus s)
err = NATS_CALLOC(1, sizeof(microError) + message_len + 1);
if (err == NULL)
return &_errorOutOfMemory;

err->message = (char *)(err + 1);
err->status = s;
memcpy(err->message, message, message_len + 1);
return err;
Expand Down
51 changes: 32 additions & 19 deletions test/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -33073,34 +33073,50 @@ void micro_async_error_handler(microService *m, microEndpoint *ep, natsStatus s)
struct threadArg *arg = (struct threadArg*) microService_GetState(m);

natsMutex_Lock(arg->m);

if (arg->sum == 1)
{
natsMutex_Unlock(arg->m);
return;
}

arg->sum = 1;

// unblock the blocked sub
// release the pending test request that caused the error
arg->closed = true;

// set the data to verify
arg->status = s;
arg->string = nats_GetLastError(NULL);
natsCondition_Broadcast(arg->c);
natsMutex_Unlock(arg->m);
}

microError *
micro_async_error_request_handler(microRequest *req)
{
struct threadArg *arg = microRequest_GetServiceState(req);

natsMutex_Lock(arg->m);

arg->msgReceived = true;
natsCondition_Signal(arg->c);

while (!arg->closed)
natsCondition_Wait(arg->c, arg->m);

natsMutex_Unlock(arg->m);

return NULL;
}

static void
test_MicroAsyncErrorHandler(void)
{
natsStatus s;
struct threadArg arg;
natsConnection *nc = NULL;
natsOptions *opts = NULL;
natsSubscription *sub = NULL;
natsPid serverPid = NATS_INVALID_PID;
struct threadArg arg;
microService *m = NULL;
microService *m = NULL;
microEndpoint *ep = NULL;
microEndpointConfig ep_cfg = {
.Name = "do",
.Subject = "async_test",
.Handler = micro_async_error_request_handler,
};
microServiceConfig cfg = {
.Name = "test",
.Version = "1.0.0",
Expand All @@ -33115,7 +33131,6 @@ test_MicroAsyncErrorHandler(void)
s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetURL(opts, NATS_DEFAULT_URL));
IFOK(s, natsOptions_SetMaxPendingMsgs(opts, 10));

if (s != NATS_OK)
FAIL("Unable to create options for test AsyncErrHandler");

Expand All @@ -33131,12 +33146,11 @@ test_MicroAsyncErrorHandler(void)
test("Test microservice is running: ");
testCond(!microService_IsStopped(m))

test("Subscribe to async_test: ");
testCond(NATS_OK == natsConnection_Subscribe(&sub, nc, "async_test", _recvTestString, (void*) &arg));
test("Add test endpoint: ");
testCond(NULL == micro_add_endpoint(&ep, m, NULL, &ep_cfg, false));

natsMutex_Lock(arg.m);
arg.status = NATS_OK;
arg.control= 7;
natsMutex_Unlock(arg.m);

test("Cause an error by sending too many messages: ");
Expand All @@ -33150,11 +33164,10 @@ test_MicroAsyncErrorHandler(void)

test("Wait for async err callback: ");
natsMutex_Lock(arg.m);
while ((s != NATS_TIMEOUT) && (arg.status != NATS_SLOW_CONSUMER))
while ((s != NATS_TIMEOUT) && !arg.closed)
s = natsCondition_TimedWait(arg.c, arg.m, 1000);
natsMutex_Unlock(arg.m);
testCond((s == NATS_OK)
&& (arg.status == NATS_SLOW_CONSUMER));
testCond((s == NATS_OK) && arg.closed && (arg.status == NATS_SLOW_CONSUMER));

microService_Destroy(m);
natsOptions_Destroy(opts);
Expand Down

0 comments on commit 0e25ed4

Please sign in to comment.