diff --git a/kafka/consumer.c b/kafka/consumer.c index b124006..5c765bb 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -541,13 +541,13 @@ consumer_destroy(struct lua_State *L, consumer_t *consumer) { } if (consumer->event_queues != NULL) { - destroy_event_queues(L, consumer->event_queues); + destroy_event_queues(L, consumer->event_queues); // Potential deadlock with callback in wait_consumer_close? } if (consumer->rd_consumer != NULL) { /* Destroy handle */ // FIXME: kafka_destroy hangs forever -// coio_call(kafka_destroy, consumer->rd_consumer); + rd_kafka_destroy_flags(consumer->rd_consumer, RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE); } free(consumer); @@ -563,15 +563,7 @@ lua_consumer_close(struct lua_State *L) { return 1; } - // trying to close in background until success - while (coio_call(wait_consumer_close, (*consumer_p)->rd_consumer) == -1) { - // FIXME: maybe send errors to error queue? - } - - if ((*consumer_p)->poller != NULL) { - destroy_consumer_poller((*consumer_p)->poller); - (*consumer_p)->poller = NULL; - } + consumer_destroy(L,*(consumer_p)); lua_pushboolean(L, 1); return 1; @@ -581,7 +573,7 @@ int lua_consumer_gc(struct lua_State *L) { consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label); if (consumer_p && *consumer_p) { - consumer_destroy(L, *consumer_p); + //consumer_destroy(L, *consumer_p); Manually destruction } if (consumer_p) *consumer_p = NULL;