diff --git a/kafka/consumer.c b/kafka/consumer.c index 5f68b4c..8c6ed10 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -532,31 +532,30 @@ wait_consumer_destroy(va_list args) { return 0; } -static rd_kafka_resp_err_t +void consumer_destroy(struct lua_State *L, consumer_t *consumer) { - rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; - - if (consumer->topics != NULL) { + if (consumer->topics != NULL) rd_kafka_topic_partition_list_destroy(consumer->topics); - } - - if (consumer->poller != NULL) { - destroy_consumer_poller(consumer->poller); - } - - if (consumer->event_queues != NULL) { - destroy_event_queues(L, consumer->event_queues); - } + /* + * Here we close consumer and only then destroys other stuff. + * Otherwise raise condition is possible when e.g. + * event queue is destroyed but consumer still receives logs, errors, etc. + * Only topics should be destroyed. + */ if (consumer->rd_consumer != NULL) { /* Destroy handle */ // FIXME: kafka_destroy hangs forever coio_call(wait_consumer_destroy, consumer->rd_consumer); } - free(consumer); + if (consumer->poller != NULL) + destroy_consumer_poller(consumer->poller); - return err; + if (consumer->event_queues != NULL) + destroy_event_queues(L, consumer->event_queues); + + free(consumer); } int diff --git a/kafka/producer.c b/kafka/producer.c index 4334f0f..bcd1a67 100644 --- a/kafka/producer.c +++ b/kafka/producer.c @@ -417,27 +417,26 @@ wait_producer_destroy(va_list args) { void destroy_producer(struct lua_State *L, producer_t *producer) { - if (producer->rd_producer != NULL) { - coio_call(producer_flush, producer->rd_producer); - } - - if (producer->poller != NULL) { - destroy_producer_poller(producer->poller); - } - - if (producer->topics != NULL) { + if (producer->topics != NULL) destroy_producer_topics(producer->topics); - } - - if (producer->event_queues != NULL) { - destroy_event_queues(L, producer->event_queues); - } + /* + * Here we close producer and only then destroys other stuff. + * Otherwise raise condition is possible when e.g. + * event queue is destroyed but producer still receives logs, errors, etc. + * Only topics should be destroyed. + */ if (producer->rd_producer != NULL) { /* Destroy handle */ coio_call(wait_producer_destroy, producer->rd_producer); } + if (producer->poller != NULL) + destroy_producer_poller(producer->poller); + + if (producer->event_queues != NULL) + destroy_event_queues(L, producer->event_queues); + free(producer); }