Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -541,13 +541,13 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) {
}

if (consumer->event_queues != NULL) {
destroy_event_queues(L, consumer->event_queues);
destroy_event_queues(L, consumer->event_queues); // Potential deadlock with callback in wait_consumer_close?
}

if (consumer->rd_consumer != NULL) {
/* Destroy handle */
// FIXME: kafka_destroy hangs forever
// coio_call(kafka_destroy, consumer->rd_consumer);
rd_kafka_destroy_flags(consumer->rd_consumer, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
}

free(consumer);
Expand All @@ -563,15 +563,7 @@ lua_consumer_close(struct lua_State *L) {
return 1;
}

// trying to close in background until success
while (coio_call(wait_consumer_close, (*consumer_p)->rd_consumer) == -1) {
// FIXME: maybe send errors to error queue?
}

if ((*consumer_p)->poller != NULL) {
destroy_consumer_poller((*consumer_p)->poller);
(*consumer_p)->poller = NULL;
}
consumer_destroy(L,*(consumer_p));

lua_pushboolean(L, 1);
return 1;
Expand All @@ -581,7 +573,7 @@ int
lua_consumer_gc(struct lua_State *L) {
consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label);
if (consumer_p && *consumer_p) {
consumer_destroy(L, *consumer_p);
//consumer_destroy(L, *consumer_p); Manually destruction
}
if (consumer_p)
*consumer_p = NULL;
Expand Down