Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 14 additions & 15 deletions kafka/consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,31 +532,30 @@ wait_consumer_destroy(va_list args) {
return 0;
}

static rd_kafka_resp_err_t
void
consumer_destroy(struct lua_State *L, consumer_t *consumer) {
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;

if (consumer->topics != NULL) {
if (consumer->topics != NULL)
rd_kafka_topic_partition_list_destroy(consumer->topics);
}

if (consumer->poller != NULL) {
destroy_consumer_poller(consumer->poller);
}

if (consumer->event_queues != NULL) {
destroy_event_queues(L, consumer->event_queues);
}

/*
* Here we close consumer and only then destroys other stuff.
* Otherwise raise condition is possible when e.g.
* event queue is destroyed but consumer still receives logs, errors, etc.
* Only topics should be destroyed.
*/
if (consumer->rd_consumer != NULL) {
/* Destroy handle */
// FIXME: kafka_destroy hangs forever
coio_call(wait_consumer_destroy, consumer->rd_consumer);
}

free(consumer);
if (consumer->poller != NULL)
destroy_consumer_poller(consumer->poller);

return err;
if (consumer->event_queues != NULL)
destroy_event_queues(L, consumer->event_queues);

free(consumer);
}

int
Expand Down
27 changes: 13 additions & 14 deletions kafka/producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -417,27 +417,26 @@ wait_producer_destroy(va_list args) {

void
destroy_producer(struct lua_State *L, producer_t *producer) {
if (producer->rd_producer != NULL) {
coio_call(producer_flush, producer->rd_producer);
}

if (producer->poller != NULL) {
destroy_producer_poller(producer->poller);
}

if (producer->topics != NULL) {
if (producer->topics != NULL)
destroy_producer_topics(producer->topics);
}

if (producer->event_queues != NULL) {
destroy_event_queues(L, producer->event_queues);
}

/*
* Here we close producer and only then destroys other stuff.
* Otherwise raise condition is possible when e.g.
* event queue is destroyed but producer still receives logs, errors, etc.
* Only topics should be destroyed.
*/
if (producer->rd_producer != NULL) {
/* Destroy handle */
coio_call(wait_producer_destroy, producer->rd_producer);
}

if (producer->poller != NULL)
destroy_producer_poller(producer->poller);

if (producer->event_queues != NULL)
destroy_event_queues(L, producer->event_queues);

free(producer);
}

Expand Down