diff --git a/CMakeLists.txt b/CMakeLists.txt index 0570cac..9a7de1c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ if(${tntver} VERSION_LESS 1.7.4.291) message(FATAL_ERROR "Tarantool >= 1.7.4-291 is required") endif() -if(STATIC_BUILD) +if(STATIC_BUILD OR $ENV{STATIC_BUILD}) include(buildRdKafka) buildrdkafka() else() diff --git a/Makefile b/Makefile index ab346d0..65a3e5b 100644 --- a/Makefile +++ b/Makefile @@ -97,11 +97,11 @@ tests-dep: tests-run: cd ./tests && \ . venv/bin/activate && \ - pytest -vv && \ + pytest -W ignore -vv && \ deactivate test-run-with-docker: tests-dep docker-run-all - sleep 5 + sleep 10 docker run \ --net=${NETWORK} \ @@ -145,6 +145,12 @@ test-run-with-docker: tests-dep docker-run-all kafka-topics --create --topic test_multi_consume_2 --partitions 1 --replication-factor 1 \ --if-not-exists --zookeeper zookeeper:2181 + docker run \ + --net=${NETWORK} \ + --rm confluentinc/cp-kafka:5.0.0 \ + kafka-topics --create --topic test_consuming_from_last_committed_offset --partitions 1 --replication-factor 1 \ + --if-not-exists --zookeeper zookeeper:2181 + cd ./tests && \ python3 -m venv venv && \ . venv/bin/activate && \ @@ -153,7 +159,7 @@ test-run-with-docker: tests-dep docker-run-all cd ./tests && \ . venv/bin/activate && \ - pytest -vv && \ + pytest -W ignore -vv && \ deactivate ####################################################################### diff --git a/README.md b/README.md index efeed1c..545a638 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,9 @@ tarantoolctl rocks STATIC_BUILD=ON install kafka local log_callback = function(fac, str, level) log.info("got log: %d - %s - %s", level, fac, str) end + local rebalance_callback = function(msg) + log.info("got rebalance msg: %s", json.encode(msg)) + end local consumer, err = tnt_kafka.Consumer.create({ brokers = "localhost:9092", -- brokers for bootstrap @@ -61,6 +64,7 @@ tarantoolctl rocks STATIC_BUILD=ON install kafka }, -- options for librdkafka error_callback = error_callback, -- optional callback for errors log_callback = log_callback, -- optional callback for logs and debug messages + rebalance_callback = rebalance_callback, -- optional callback for rebalance messages default_topic_options = { ["auto.offset.reset"] = "earliest", }, -- optional default topic options @@ -126,6 +130,9 @@ tarantoolctl rocks STATIC_BUILD=ON install kafka local log_callback = function(fac, str, level) log.info("got log: %d - %s - %s", level, fac, str) end + local rebalance_callback = function(msg) + log.info("got rebalance msg: %s", json.encode(msg)) + end local consumer, err = tnt_kafka.Consumer.create({ brokers = "localhost:9092", -- brokers for bootstrap @@ -137,6 +144,7 @@ tarantoolctl rocks STATIC_BUILD=ON install kafka }, -- options for librdkafka error_callback = error_callback, -- optional callback for errors log_callback = log_callback, -- optional callback for logs and debug messages + rebalance_callback = rebalance_callback, -- optional callback for rebalance messages default_topic_options = { ["auto.offset.reset"] = "earliest", }, -- optional default topic options @@ -316,7 +324,7 @@ because `rd_kafka_destroy` sometimes hangs forever. 
### Async -Result: over 150000 produced messages per second on macbook pro 2016 +Result: over 160000 produced messages per second on macbook pro 2016 Local run in docker: ```bash @@ -340,7 +348,7 @@ Local run in docker: ### Auto offset store enabled -Result: over 140000 consumed messages per second on macbook pro 2016 +Result: over 190000 consumed messages per second on macbook pro 2016 Local run in docker: ```bash @@ -351,7 +359,7 @@ Local run in docker: ### Manual offset store -Result: over 140000 consumed messages per second on macbook pro 2016 +Result: over 190000 consumed messages per second on macbook pro 2016 Local run in docker: ```bash diff --git a/benchmarks/sync_producer.lua b/benchmarks/sync_producer.lua index 351c4e6..59490ee 100644 --- a/benchmarks/sync_producer.lua +++ b/benchmarks/sync_producer.lua @@ -17,7 +17,7 @@ local function produce() local producer, err = tnt_kafka.Producer.create({ brokers = "kafka:9092", options = { - ["queue.buffering.max.ms"] = "100", + ["queue.buffering.max.ms"] = "50", } }) if err ~= nil then @@ -27,7 +27,7 @@ local function produce() local before = clock.monotonic64() local input_ch = fiber.channel(); - for i = 1, 12000 do + for i = 1, 10000 do fiber.create(function() while true do if input_ch:is_closed() then @@ -57,6 +57,9 @@ local function produce() for i = 1, 10000000 do input_ch:put(i) + if i % 10000 == 0 then + fiber.yield() + end end input_ch:close() diff --git a/docker/Dockerfile b/docker/Dockerfile index 70e0c3b..84da555 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -2,12 +2,19 @@ FROM tarantool/tarantool:1.x-centos7 RUN yum update -y -RUN yum install -y librdkafka librdkafka-devel cmake gcc tarantool-devel +RUN yum install -y cmake \ + gcc \ + gcc-c++ \ + cyrus-sasl-lib \ + openssl-libs \ + tarantool-devel ADD . /opt/tarantool WORKDIR /opt/tarantool +ENV STATIC_BUILD ON + RUN cmake . 
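+# (cmake above picks STATIC_BUILD up from the environment, see CMakeLists.txt,
+#  and builds the vendored librdkafka instead of linking a system package)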
RUN make
diff --git a/kafka/CMakeLists.txt b/kafka/CMakeLists.txt
index c4417f6..4feac69 100644
--- a/kafka/CMakeLists.txt
+++ b/kafka/CMakeLists.txt
@@ -9,7 +9,7 @@ endif(APPLE)
 
 target_link_libraries(tntkafka pthread)
 
-if (STATIC_BUILD)
+if (STATIC_BUILD OR $ENV{STATIC_BUILD})
     add_dependencies(tntkafka rdkafka)
     target_link_libraries(tntkafka ${CMAKE_SOURCE_DIR}/librdkafka/src/librdkafka.a)
 else()
diff --git a/kafka/callbacks.c b/kafka/callbacks.c
index f397c8c..47986d0 100644
--- a/kafka/callbacks.c
+++ b/kafka/callbacks.c
@@ -1,6 +1,7 @@
 #include
 #include
 #include
+#include <pthread.h>
 
 #include
 #include
@@ -17,6 +18,10 @@
  * Common callbacks handling
  */
 
+/**
+ * Handle logs from RDKafka
+ */
+
 log_msg_t *
 new_log_msg(int level, const char *fac, const char *buf) {
     log_msg_t *msg = malloc(sizeof(log_msg_t));
@@ -55,6 +60,10 @@ log_callback(const rd_kafka_t *rd_kafka, int level, const char *fac, const char
     }
 }
 
+/**
+ * Handle errors from RDKafka
+ */
+
 error_msg_t *
 new_error_msg(int err, const char *reason) {
     error_msg_t *msg = malloc(sizeof(error_msg_t));
@@ -88,6 +97,10 @@ error_callback(rd_kafka_t *UNUSED(rd_kafka), int err, const char *reason, void *
     }
 }
 
+/**
+ * Handle message delivery reports from RDKafka
+ */
+
 dr_msg_t *
 new_dr_msg(int dr_callback, int err) {
     dr_msg_t *dr_msg;
@@ -116,6 +129,162 @@ msg_delivery_callback(rd_kafka_t *UNUSED(producer), const rd_kafka_message_t *ms
     }
 }
 
+/**
+ * Handle rebalance callbacks from RDKafka
+ */
+
+rebalance_msg_t *new_rebalance_revoke_msg(rd_kafka_topic_partition_list_t *revoked) {
+    rebalance_msg_t *msg = malloc(sizeof(rebalance_msg_t));
+    if (msg == NULL) {
+        return NULL;
+    }
+
+    if (pthread_mutex_init(&msg->lock, NULL) != 0) {
+        free(msg);
+        return NULL;
+    }
+
+    if (pthread_cond_init(&msg->sync, NULL) != 0) {
+        pthread_mutex_destroy(&msg->lock);
+        free(msg);
+        return NULL;
+    }
+
+    msg->revoked = revoked;
+    msg->assigned = NULL;
+    msg->err = RD_KAFKA_RESP_ERR_NO_ERROR;
+    return msg;
+}
+
+rebalance_msg_t *new_rebalance_assign_msg(rd_kafka_topic_partition_list_t *assigned) {
+    rebalance_msg_t *msg = malloc(sizeof(rebalance_msg_t));
+    if (msg == NULL) {
+        return NULL;
+    }
+
+    if (pthread_mutex_init(&msg->lock, NULL) != 0) {
+        free(msg);
+        return NULL;
+    }
+
+    if (pthread_cond_init(&msg->sync, NULL) != 0) {
+        pthread_mutex_destroy(&msg->lock);
+        free(msg);
+        return NULL;
+    }
+
+    msg->revoked = NULL;
+    msg->assigned = assigned;
+    msg->err = RD_KAFKA_RESP_ERR_NO_ERROR;
+    return msg;
+}
+
+rebalance_msg_t *new_rebalance_error_msg(rd_kafka_resp_err_t err) {
+    rebalance_msg_t *msg = malloc(sizeof(rebalance_msg_t));
+    if (msg == NULL) {
+        return NULL;
+    }
+
+    if (pthread_mutex_init(&msg->lock, NULL) != 0) {
+        free(msg);
+        return NULL;
+    }
+
+    if (pthread_cond_init(&msg->sync, NULL) != 0) {
+        pthread_mutex_destroy(&msg->lock);
+        free(msg);
+        return NULL;
+    }
+
+    msg->revoked = NULL;
+    msg->assigned = NULL;
+    msg->err = err;
+    return msg;
+}
+
+void
+destroy_rebalance_msg(rebalance_msg_t *rebalance_msg) {
+    pthread_mutex_destroy(&rebalance_msg->lock);
+    pthread_cond_destroy(&rebalance_msg->sync);
+    free(rebalance_msg);
+}
+
+void
+rebalance_callback(rd_kafka_t *consumer, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque) {
+    event_queues_t *event_queues = opaque;
+    rebalance_msg_t *msg = NULL;
+    switch (err)
+    {
+        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+            msg = new_rebalance_assign_msg(partitions);
+            if (msg != NULL) {
+
+                pthread_mutex_lock(&msg->lock);
+
+                if
(queue_push(event_queues->rebalance_queue, msg) == 0) { + // waiting while main TX thread invokes rebalance callback + pthread_cond_wait(&msg->sync, &msg->lock); + } + + pthread_mutex_unlock(&msg->lock); + + destroy_rebalance_msg(msg); + } + rd_kafka_assign(consumer, partitions); + break; + + case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: + rd_kafka_commit(consumer, partitions, 0); // sync commit + + msg = new_rebalance_revoke_msg(partitions); + if (msg != NULL) { + + pthread_mutex_lock(&msg->lock); + + if (queue_push(event_queues->rebalance_queue, msg) == 0) { + // waiting while main TX thread invokes rebalance callback + pthread_cond_wait(&msg->sync, &msg->lock); + } + + pthread_mutex_unlock(&msg->lock); + + destroy_rebalance_msg(msg); + } + + rd_kafka_assign(consumer, NULL); + break; + + default: + msg = new_rebalance_error_msg(err); + if (msg != NULL) { + + pthread_mutex_lock(&msg->lock); + + if (queue_push(event_queues->rebalance_queue, msg) == 0) { + // waiting while main TX thread invokes rebalance callback + pthread_cond_wait(&msg->sync, &msg->lock); + } + + pthread_mutex_unlock(&msg->lock); + + destroy_rebalance_msg(msg); + } + rd_kafka_assign(consumer, NULL); + break; + } +} + +/** + * Structure which contains all queues for communication between main TX thread and + * RDKafka callbacks from background threads + */ + event_queues_t *new_event_queues() { event_queues_t *event_queues = malloc(sizeof(event_queues_t)); event_queues->error_queue = NULL; @@ -123,10 +292,24 @@ event_queues_t *new_event_queues() { event_queues->log_queue = NULL; event_queues->log_cb_ref = LUA_REFNIL; event_queues->delivery_queue = NULL; + event_queues->rebalance_cb_ref = LUA_REFNIL; + event_queues->rebalance_queue = NULL; + event_queues->consume_queue = NULL; return event_queues; } void destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) { + if (event_queues->consume_queue != NULL) { + rd_kafka_message_t *msg = NULL; + while (true) { + msg = queue_pop(event_queues->consume_queue); + if (msg == NULL) { + break; + } + rd_kafka_message_destroy(msg); + } + destroy_queue(event_queues->consume_queue); + } if (event_queues->log_queue != NULL) { log_msg_t *msg = NULL; while (true) { @@ -160,6 +343,20 @@ void destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) { } destroy_queue(event_queues->delivery_queue); } + if (event_queues->rebalance_queue != NULL) { + rebalance_msg_t *msg = NULL; + while (true) { + msg = queue_pop(event_queues->rebalance_queue); + if (msg == NULL) { + break; + } + pthread_mutex_lock(&msg->lock); + // allowing background thread proceed rebalancing + pthread_cond_signal(&msg->sync); + pthread_mutex_unlock(&msg->lock); + } + destroy_queue(event_queues->rebalance_queue); + } if (event_queues->error_cb_ref != LUA_REFNIL) { luaL_unref(L, LUA_REGISTRYINDEX, event_queues->error_cb_ref); } @@ -167,5 +364,8 @@ void destroy_event_queues(struct lua_State *L, event_queues_t *event_queues) { if (event_queues->log_cb_ref != LUA_REFNIL) { luaL_unref(L, LUA_REGISTRYINDEX, event_queues->log_cb_ref); } + if (event_queues->rebalance_cb_ref != LUA_REFNIL) { + luaL_unref(L, LUA_REGISTRYINDEX, event_queues->rebalance_cb_ref); + } free(event_queues); } diff --git a/kafka/callbacks.h b/kafka/callbacks.h index 331e344..b474fc6 100644 --- a/kafka/callbacks.h +++ b/kafka/callbacks.h @@ -1,6 +1,8 @@ #ifndef TNT_KAFKA_CALLBACKS_H #define TNT_KAFKA_CALLBACKS_H +#include + #include #include #include @@ -14,6 +16,10 @@ * Common callbacks handling */ +/** + * Handle logs from RDKafka + 
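+ * Log lines are copied into log_msg_t values on the librdkafka thread and
+ * drained from the TX thread via the poll_logs bindings.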
*/ + typedef struct { int level; char *fac; @@ -26,6 +32,11 @@ void destroy_log_msg(log_msg_t *msg); void log_callback(const rd_kafka_t *rd_kafka, int level, const char *fac, const char *buf); + +/** + * Handle errors from RDKafka + */ + typedef struct { int err; char *reason; @@ -37,6 +48,11 @@ void destroy_error_msg(error_msg_t *msg); void error_callback(rd_kafka_t *UNUSED(rd_kafka), int err, const char *reason, void *opaque); + +/** + * Handle message delivery reports from RDKafka + */ + typedef struct { int dr_callback; int err; @@ -48,12 +64,43 @@ void destroy_dr_msg(dr_msg_t *dr_msg); void msg_delivery_callback(rd_kafka_t *UNUSED(producer), const rd_kafka_message_t *msg, void *opaque); + +/** + * Handle rebalance callbacks from RDKafka + */ + +typedef struct { + pthread_mutex_t lock; + pthread_cond_t sync; + rd_kafka_topic_partition_list_t *revoked; + rd_kafka_topic_partition_list_t *assigned; + rd_kafka_resp_err_t err; +} rebalance_msg_t; + +rebalance_msg_t *new_rebalance_revoke_msg(rd_kafka_topic_partition_list_t *revoked); + +rebalance_msg_t *new_rebalance_assign_msg(rd_kafka_topic_partition_list_t *assigned); + +rebalance_msg_t *new_rebalance_error_msg(rd_kafka_resp_err_t err); + +void destroy_rebalance_msg(rebalance_msg_t *rebalance_msg); + +void rebalance_callback(rd_kafka_t *consumer, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque); + +/** + * Structure which contains all queues for communication between main TX thread and + * RDKafka callbacks from background threads + */ + typedef struct { + queue_t *consume_queue; queue_t *log_queue; - int log_cb_ref; + int log_cb_ref; queue_t *error_queue; - int error_cb_ref; + int error_cb_ref; queue_t *delivery_queue; + queue_t *rebalance_queue; + int rebalance_cb_ref; } event_queues_t; event_queues_t *new_event_queues(); diff --git a/kafka/common.c b/kafka/common.c index 876a65d..e09a450 100644 --- a/kafka/common.c +++ b/kafka/common.c @@ -31,16 +31,3 @@ lua_push_error(struct lua_State *L) { lua_insert(L, -2); return 2; } - -// FIXME: suppress warning -//static ssize_t -//kafka_destroy(va_list args) { -// rd_kafka_t *kafka = va_arg(args, rd_kafka_t *); -// -// // waiting in background while garbage collector collects all refs -// sleep(5); -// -// rd_kafka_destroy(kafka); -// while (rd_kafka_wait_destroyed(1000) == -1) {} -// return 0; -//} diff --git a/kafka/consumer.c b/kafka/consumer.c index cad123b..b124006 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -1,3 +1,4 @@ +#include #include #include @@ -12,6 +13,104 @@ #include "consumer.h" //////////////////////////////////////////////////////////////////////////////////////////////////// + +/** + * Consumer poll thread + */ + +void * +consumer_poll_loop(void *arg) { + consumer_poller_t *poller = arg; + event_queues_t *event_queues = rd_kafka_opaque(poller->rd_consumer); + rd_kafka_message_t *rd_msg = NULL; + int count = 0; + int should_stop = 0; + + while (true) { + { + pthread_mutex_lock(&poller->lock); + + should_stop = poller->should_stop; + + pthread_mutex_unlock(&poller->lock); + + if (should_stop) { + break; + } + } + + { + rd_msg = rd_kafka_consumer_poll(poller->rd_consumer, 1000); + if (rd_msg != NULL) { + if (rd_msg->err != RD_KAFKA_RESP_ERR_NO_ERROR) { + // FIXME: push errors to error queue? 
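+                    // NOTE: such error events are currently destroyed and dropped here;
+                    // only successfully fetched messages reach the consume queue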
+ rd_kafka_message_destroy(rd_msg); + } else { + pthread_mutex_lock(&event_queues->consume_queue->lock); + + queue_lockfree_push(event_queues->consume_queue, rd_msg); + count = event_queues->consume_queue->count; + + pthread_mutex_unlock(&event_queues->consume_queue->lock); + + if (count >= 50000) { + // throttling calls with 100ms sleep when there are too many messages pending in queue + usleep(100000); + } + } + } else { + // throttling calls with 100ms sleep + usleep(100000); + } + } + } + + pthread_exit(NULL); +} + +consumer_poller_t * +new_consumer_poller(rd_kafka_t *rd_consumer) { + consumer_poller_t *poller = NULL; + poller = malloc(sizeof(consumer_poller_t)); + if (poller == NULL) { + return NULL; + } + poller->rd_consumer = rd_consumer; + poller->should_stop = 0; + + pthread_mutex_init(&poller->lock, NULL); + pthread_attr_init(&poller->attr); + pthread_attr_setdetachstate(&poller->attr, PTHREAD_CREATE_JOINABLE); + pthread_create(&poller->thread, &poller->attr, consumer_poll_loop, (void *)poller); + + return poller; +} + +static ssize_t +stop_poller(va_list args) { + consumer_poller_t *poller = va_arg(args, consumer_poller_t *); + pthread_mutex_lock(&poller->lock); + + poller->should_stop = 1; + + pthread_mutex_unlock(&poller->lock); + + pthread_join(poller->thread, NULL); + + return 0; +} + +void +destroy_consumer_poller(consumer_poller_t *poller) { + // stopping polling thread + coio_call(stop_poller, poller); + + pthread_attr_destroy(&poller->attr); + pthread_mutex_destroy(&poller->lock); + + free(poller); +} + /** * Consumer */ @@ -116,26 +215,6 @@ lua_consumer_tostring(struct lua_State *L) { return 1; } -static ssize_t -consumer_poll(va_list args) { - rd_kafka_t *rd_consumer = va_arg(args, rd_kafka_t *); - rd_kafka_poll(rd_consumer, 1000); - return 0; -} - -int -lua_consumer_poll(struct lua_State *L) { - if (lua_gettop(L) != 1) - luaL_error(L, "Usage: err = consumer:poll()"); - - consumer_t *consumer = lua_check_consumer(L, 1); - if (coio_call(consumer_poll, consumer->rd_consumer) == -1) { - lua_pushstring(L, "unexpected error on consumer poll"); - return 1; - } - return 0; -} - int lua_consumer_poll_msg(struct lua_State *L) { if (lua_gettop(L) != 2) @@ -144,32 +223,27 @@ lua_consumer_poll_msg(struct lua_State *L) { consumer_t *consumer = lua_check_consumer(L, 1); int counter = 0; int msgs_limit = lua_tonumber(L, 2); - rd_kafka_event_t *event = NULL; - lua_createtable(L, msgs_limit, 0); + lua_createtable(L, msgs_limit, 0); + rd_kafka_message_t *rd_msg = NULL; while (msgs_limit > counter) { - event = rd_kafka_queue_poll(consumer->rd_msg_queue, 0); - if (event == NULL) { + rd_msg = queue_pop(consumer->event_queues->consume_queue); + if (rd_msg == NULL) { break; } - if (rd_kafka_event_type(event) == RD_KAFKA_EVENT_FETCH) { - counter += 1; + counter += 1; - msg_t *msg; - msg = malloc(sizeof(msg_t)); - msg->rd_message = rd_kafka_event_message_next(event); - msg->rd_event = event; + msg_t *msg; + msg = malloc(sizeof(msg_t)); + msg->rd_message = rd_msg; - msg_t **msg_p = (msg_t **)lua_newuserdata(L, sizeof(msg)); - *msg_p = msg; + msg_t **msg_p = (msg_t **)lua_newuserdata(L, sizeof(msg)); + *msg_p = msg; - luaL_getmetatable(L, consumer_msg_label); - lua_setmetatable(L, -2); + luaL_getmetatable(L, consumer_msg_label); + lua_setmetatable(L, -2); - lua_rawseti(L, -2, counter); - } else { - rd_kafka_event_destroy(event); - } + lua_rawseti(L, -2, counter); } return 1; } @@ -276,6 +350,150 @@ lua_consumer_poll_errors(struct lua_State *L) { return 2; } +int 
+lua_prepare_rebalance_callback_args_on_stack(struct lua_State *L, rebalance_msg_t *msg) { + rd_kafka_topic_partition_t *tp = NULL; + // creating main table + lua_createtable(L, 0, 3); // main table + if (msg->assigned != NULL) { + lua_pushstring(L, "assigned"); // "assigned" > main table + // creating table for assigned topics + lua_createtable(L, 0, 10); // table with topics > "assigned" > main table + + for (int i = 0; i < msg->assigned->cnt; i++) { + tp = &msg->assigned->elems[i]; + + lua_pushstring(L, tp->topic); // topic name > table with topics > "assigned" > main table + + // get value from table + lua_pushstring(L, tp->topic); // topic name > topic name > table with topics > "assigned" > main table + lua_gettable(L, -3); // table with partitions or nil > topic name > table with topics > "assigned" > main table + + if (lua_isnil(L, -1) == 1) { + // removing nil from top of stack + lua_pop(L, 1); // topic name > table with topics > "assigned" > main table + // creating table for assigned partitions on topic + lua_createtable(L, 0, 10); // table with partitions > topic name > table with topics > "assigned" > main table + } + + // add partition to table + lua_pushinteger(L, tp->partition); // key > table with partitions > topic name > table with topics > "assigned" > main table + luaL_pushint64(L, tp->offset); // value > key > table with partitions > topic name > table with topics > "assigned" > main table + lua_settable(L, -3); // table with partitions > topic name > table with topics > "assigned" > main table + + // add table with partitions to table with topics + lua_settable(L, -3); // table with topics > "assigned" > main table + } + + lua_settable(L, -3); // main table + } + else if (msg->revoked != NULL) { + lua_pushstring(L, "revoked"); // "revoked" > main table + // creating table for revoked topics + lua_createtable(L, 0, 10); // table with topics > "revoked" > main table + + for (int i = 0; i < msg->revoked->cnt; i++) { + tp = &msg->revoked->elems[i]; + + lua_pushstring(L, tp->topic); // topic name > table with topics > "revoked" > main table + + // get value from table + lua_pushstring(L, tp->topic); // topic name > topic name > table with topics > "revoked" > main table + lua_gettable(L, -3); // table with partitions or nil > topic name > table with topics > "revoked" > main table + + if (lua_isnil(L, -1) == 1) { + // removing nil from top of stack + lua_pop(L, 1); // topic name > table with topics > "revoked" > main table + // creating table for revoked partitions on topic + lua_createtable(L, 0, 10); // table with partitions > topic name > table with topics > "revoked" > main table + } + + // add partition to table + lua_pushinteger(L, tp->partition); // key > table with partitions > topic name > table with topics > "revoked" > main table + luaL_pushint64(L, tp->offset); // value > key > table with partitions > topic name > table with topics > "revoked" > main table + lua_settable(L, -3); // table with partitions > topic name > table with topics > "revoked" > main table + + // add table with partitions to table with topics + lua_settable(L, -3); // table with topics > "revoked" > main table + } + + lua_settable(L, -3); // main table + } + else if (msg->err != RD_KAFKA_RESP_ERR_NO_ERROR) { + lua_pushstring(L, "error"); // "error" > main table + lua_pushstring(L, rd_kafka_err2str(msg->err)); // msg > "error" > main table + lua_settable(L, -3); // main table + } + else { + return -1; + } + return 0; +} + +int +lua_consumer_poll_rebalances(struct lua_State *L) { + if 
(lua_gettop(L) != 2) + luaL_error(L, "Usage: count, err = consumer:poll_rebalances(limit)"); + + consumer_t *consumer = lua_check_consumer(L, 1); + if (consumer->event_queues == NULL || consumer->event_queues->rebalance_queue == NULL || consumer->event_queues->rebalance_cb_ref == LUA_REFNIL) { + lua_pushnumber(L, 0); + int fail = safe_pushstring(L, "Consumer poll rebalances error: callback for rebalance is not set"); + if (fail) { + return lua_push_error(L); + } + return 2; + } + + int limit = lua_tonumber(L, 2); + rebalance_msg_t *msg = NULL; + int count = 0; + char *err_str = NULL; + + while (count < limit) { + msg = queue_pop(consumer->event_queues->rebalance_queue); + if (msg == NULL) { + break; + } + count++; + + pthread_mutex_lock(&msg->lock); + + // push callback on stack + lua_rawgeti(L, LUA_REGISTRYINDEX, consumer->event_queues->rebalance_cb_ref); + + // push rebalance args on stack + if (lua_prepare_rebalance_callback_args_on_stack(L, msg) == 0) { + /* do the call (1 arguments, 0 result) */ + if (lua_pcall(L, 1, 0, 0) != 0) { + err_str = (char *)lua_tostring(L, -1); + } + } else { + err_str = "unknown error on rebalance callback args processing"; + } + + // allowing background thread proceed rebalancing + pthread_cond_signal(&msg->sync); + + pthread_mutex_unlock(&msg->lock); + + if (err_str != NULL) { + break; + } + } + + lua_pushnumber(L, (double)count); + if (err_str != NULL) { + int fail = safe_pushstring(L, err_str); + if (fail) { + return lua_push_error(L); + } + } else { + lua_pushnil(L); + } + return 2; +} + int lua_consumer_store_offset(struct lua_State *L) { if (lua_gettop(L) != 2) @@ -293,20 +511,33 @@ lua_consumer_store_offset(struct lua_State *L) { return 0; } +static ssize_t +wait_consumer_close(va_list args) { + rd_kafka_t *rd_consumer = va_arg(args, rd_kafka_t *); + rd_kafka_commit(rd_consumer, NULL, 0); // sync commit of current offsets + if (rd_kafka_consumer_close(rd_consumer) != RD_KAFKA_RESP_ERR_NO_ERROR) { + // FIXME: maybe send errors to error queue? + return -1; + } + + return 0; +} + static rd_kafka_resp_err_t -consumer_close(struct lua_State *L, consumer_t *consumer) { +consumer_destroy(struct lua_State *L, consumer_t *consumer) { rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; if (consumer->topics != NULL) { rd_kafka_topic_partition_list_destroy(consumer->topics); } - if (consumer->rd_msg_queue != NULL) { - rd_kafka_queue_destroy(consumer->rd_msg_queue); + // trying to close in background until success + while (coio_call(wait_consumer_close, consumer->rd_consumer) == -1) { + // FIXME: maybe send errors to error queue? } - if (consumer->rd_consumer != NULL) { - err = rd_kafka_consumer_close(consumer->rd_consumer); + if (consumer->poller != NULL) { + destroy_consumer_poller(consumer->poller); } if (consumer->event_queues != NULL) { @@ -332,18 +563,16 @@ lua_consumer_close(struct lua_State *L) { return 1; } - rd_kafka_resp_err_t err = consumer_close(L, *consumer_p); - if (err) { - lua_pushboolean(L, 1); + // trying to close in background until success + while (coio_call(wait_consumer_close, (*consumer_p)->rd_consumer) == -1) { + // FIXME: maybe send errors to error queue? + } - const char *const_err_str = rd_kafka_err2str(err); - char err_str[512]; - strcpy(err_str, const_err_str); - int fail = safe_pushstring(L, err_str); - return fail ? 
lua_push_error(L): 2; + if ((*consumer_p)->poller != NULL) { + destroy_consumer_poller((*consumer_p)->poller); + (*consumer_p)->poller = NULL; } - *consumer_p = NULL; lua_pushboolean(L, 1); return 1; } @@ -352,7 +581,7 @@ int lua_consumer_gc(struct lua_State *L) { consumer_t **consumer_p = (consumer_t **)luaL_checkudata(L, 1, consumer_label); if (consumer_p && *consumer_p) { - consumer_close(L, *consumer_p); + consumer_destroy(L, *consumer_p); } if (consumer_p) *consumer_p = NULL; @@ -409,6 +638,7 @@ lua_create_consumer(struct lua_State *L) { rd_kafka_conf_set_default_topic_conf(rd_config, topic_conf); event_queues_t *event_queues = new_event_queues(); + event_queues->consume_queue = new_queue(); lua_pushstring(L, "error_callback"); lua_gettable(L, -2 ); @@ -430,6 +660,16 @@ lua_create_consumer(struct lua_State *L) { lua_pop(L, 1); } + lua_pushstring(L, "rebalance_callback"); + lua_gettable(L, -2 ); + if (lua_isfunction(L, -1)) { + event_queues->rebalance_cb_ref = luaL_ref(L, LUA_REGISTRYINDEX); + event_queues->rebalance_queue = new_queue(); + rd_kafka_conf_set_rebalance_cb(rd_config, rebalance_callback); + } else { + lua_pop(L, 1); + } + rd_kafka_conf_set_opaque(rd_config, event_queues); lua_pushstring(L, "options"); @@ -474,14 +714,17 @@ lua_create_consumer(struct lua_State *L) { return fail ? lua_push_error(L): 2; } - rd_kafka_queue_t *rd_msg_queue = rd_kafka_queue_get_consumer(rd_consumer); + rd_kafka_poll_set_consumer(rd_consumer); + + // creating background thread for polling consumer + consumer_poller_t *poller = new_consumer_poller(rd_consumer); consumer_t *consumer; consumer = malloc(sizeof(consumer_t)); consumer->rd_consumer = rd_consumer; consumer->topics = NULL; consumer->event_queues = event_queues; - consumer->rd_msg_queue = rd_msg_queue; + consumer->poller = poller; consumer_t **consumer_p = (consumer_t **)lua_newuserdata(L, sizeof(consumer)); *consumer_p = consumer; diff --git a/kafka/consumer.h b/kafka/consumer.h index f317933..9949dec 100644 --- a/kafka/consumer.h +++ b/kafka/consumer.h @@ -17,11 +17,19 @@ * Consumer */ +typedef struct { + rd_kafka_t *rd_consumer; + pthread_t thread; + pthread_attr_t attr; + int should_stop; + pthread_mutex_t lock; +} consumer_poller_t; + typedef struct { rd_kafka_t *rd_consumer; rd_kafka_topic_partition_list_t *topics; - rd_kafka_queue_t *rd_msg_queue; event_queues_t *event_queues; + consumer_poller_t *poller; } consumer_t; int lua_consumer_subscribe(struct lua_State *L); @@ -30,14 +38,14 @@ int lua_consumer_unsubscribe(struct lua_State *L); int lua_consumer_tostring(struct lua_State *L); -int lua_consumer_poll(struct lua_State *L); - int lua_consumer_poll_msg(struct lua_State *L); int lua_consumer_poll_logs(struct lua_State *L); int lua_consumer_poll_errors(struct lua_State *L); +int lua_consumer_poll_rebalances(struct lua_State *L); + int lua_consumer_store_offset(struct lua_State *L); int lua_consumer_close(struct lua_State *L); diff --git a/kafka/consumer_msg.c b/kafka/consumer_msg.c index e6e74ba..750bf3a 100644 --- a/kafka/consumer_msg.c +++ b/kafka/consumer_msg.c @@ -22,12 +22,8 @@ lua_check_consumer_msg(struct lua_State *L, int index) { int lua_consumer_msg_topic(struct lua_State *L) { msg_t *msg = lua_check_consumer_msg(L, 1); - - const char *const_topic = rd_kafka_topic_name(msg->rd_message->rkt); - char topic[sizeof(const_topic)]; - strcpy(topic, const_topic); - int fail = safe_pushstring(L, topic); - return fail ? 
lua_push_error(L): 1;
+    lua_pushstring(L, rd_kafka_topic_name(msg->rd_message->rkt));
+    return 1;
 }
 
 int
@@ -112,7 +108,9 @@ int
 lua_consumer_msg_gc(struct lua_State *L) {
     msg_t **msg_p = (msg_t **)luaL_checkudata(L, 1, consumer_msg_label);
     if (msg_p && *msg_p) {
-        rd_kafka_event_destroy((*msg_p)->rd_event);
+        if ((*msg_p)->rd_message != NULL) {
+            rd_kafka_message_destroy((*msg_p)->rd_message);
+        }
         free(*msg_p);
     }
     if (msg_p)
diff --git a/kafka/consumer_msg.h b/kafka/consumer_msg.h
index ad007f0..36c138b 100644
--- a/kafka/consumer_msg.h
+++ b/kafka/consumer_msg.h
@@ -12,8 +12,7 @@
  * Consumer Message
  */
 typedef struct {
-    const rd_kafka_message_t *rd_message;
-    rd_kafka_event_t *rd_event;
+    rd_kafka_message_t *rd_message;
 } msg_t;
 
 msg_t *lua_check_consumer_msg(struct lua_State *L, int index);
diff --git a/kafka/init.lua b/kafka/init.lua
index e402f1d..2c1d3e3 100644
--- a/kafka/init.lua
+++ b/kafka/init.lua
@@ -23,10 +23,6 @@ function Consumer.create(config)
     }
     setmetatable(new, Consumer)
 
-    new._poll_fiber = fiber.create(function()
-        new:_poll()
-    end)
-
     new._poll_msg_fiber = fiber.create(function()
         new:_poll_msg()
     end)
@@ -43,6 +39,12 @@ function Consumer.create(config)
         end)
     end
 
+    if config.rebalance_callback ~= nil then
+        new._poll_rebalances_fiber = fiber.create(function()
+            new:_poll_rebalances()
+        end)
+    end
+
     return new, nil
 end
 
@@ -114,21 +116,49 @@ end
 
 jit.off(Consumer._poll_errors)
 
+function Consumer:_poll_rebalances()
+    local count, err
+    while true do
+        count, err = self._consumer:poll_rebalances(1)
+        if err ~= nil then
+            log.error("Consumer poll rebalances error: %s", err)
+            -- throttling poll
+            fiber.sleep(0.1)
+        elseif count > 0 then
+            fiber.yield()
+        else
+            -- throttling poll
+            fiber.sleep(0.5)
+        end
+    end
+end
+
+jit.off(Consumer._poll_rebalances)
+
 function Consumer:close()
     self._poll_msg_fiber:cancel()
-    self._poll_fiber:cancel()
+    self._output_ch:close()
+
+    fiber.yield()
+
+    local ok, err = self._consumer:close()
+    if err ~= nil then
+        return ok, err
+    end
+
     if self._poll_logs_fiber ~= nil then
         self._poll_logs_fiber:cancel()
     end
     if self._poll_errors_fiber ~= nil then
         self._poll_errors_fiber:cancel()
     end
-    self._output_ch:close()
+    if self._poll_rebalances_fiber ~= nil then
+        self._poll_rebalances_fiber:cancel()
+    end
 
-    local ok, err = self._consumer:close()
     self._consumer = nil
 
-    return err
+    return ok, err
 end
 
 function Consumer:subscribe(topics)
@@ -169,10 +199,6 @@ function Producer.create(config)
     }
     setmetatable(new, Producer)
 
-    new._poll_fiber = fiber.create(function()
-        new:_poll()
-    end)
-
     new._msg_delivery_poll_fiber = fiber.create(function()
         new:_msg_delivery_poll()
     end)
@@ -192,18 +218,6 @@ function Producer.create(config)
     return new, nil
 end
 
-function Producer:_poll()
-    local err
-    while true do
-        err = self._producer:poll()
-        if err ~= nil then
-            log.error(err)
-        end
-    end
-end
-
-jit.off(Producer._poll)
-
 function Producer:_msg_delivery_poll()
     local count, err
     while true do
@@ -289,7 +303,8 @@ function Producer:produce(msg)
 end
 
 function Producer:close()
-    self._poll_fiber:cancel()
+    local ok, err = self._producer:close()
+
     self._msg_delivery_poll_fiber:cancel()
     if self._poll_logs_fiber ~= nil then
         self._poll_logs_fiber:cancel()
@@ -298,10 +313,9 @@ function Producer:close()
         self._poll_errors_fiber:cancel()
     end
 
-    local ok, err = self._producer:close()
     self._producer = nil
 
-    return err
+    return ok, err
 end
 
 return {
diff --git a/kafka/producer.c b/kafka/producer.c
index b4665fe..3bb7375 100644
--- a/kafka/producer.c
+++ b/kafka/producer.c
@@ -1,3 +1,4 @@
+#include <pthread.h>
#include #include @@ -11,6 +12,85 @@ #include "producer.h" //////////////////////////////////////////////////////////////////////////////////////////////////// + +/** + * Producer poll thread + */ + +void * +producer_poll_loop(void *arg) { + producer_poller_t *poller = arg; + int count = 0; + int should_stop = 0; + + while (true) { + { + pthread_mutex_lock(&poller->lock); + + should_stop = poller->should_stop; + + pthread_mutex_unlock(&poller->lock); + + if (should_stop) { + break; + } + } + + { + count = rd_kafka_poll(poller->rd_producer, 1000); + if (count == 0) { + // throttling calls with 100ms sleep + usleep(100000); + } + } + } + + pthread_exit(NULL); +} + +producer_poller_t * +new_producer_poller(rd_kafka_t *rd_producer) { + producer_poller_t *poller = NULL; + poller = malloc(sizeof(producer_poller_t)); + if (poller == NULL) { + return NULL; + } + poller->rd_producer = rd_producer; + poller->should_stop = 0; + + pthread_mutex_init(&poller->lock, NULL); + pthread_attr_init(&poller->attr); + pthread_attr_setdetachstate(&poller->attr, PTHREAD_CREATE_JOINABLE); + pthread_create(&poller->thread, &poller->attr, producer_poll_loop, (void *)poller); + + return poller; +} + +static ssize_t +stop_poller(va_list args) { + producer_poller_t *poller = va_arg(args, producer_poller_t *); + pthread_mutex_lock(&poller->lock); + + poller->should_stop = 1; + + pthread_mutex_unlock(&poller->lock); + + pthread_join(poller->thread, NULL); + + return 0; +} + +void +destroy_producer_poller(producer_poller_t *poller) { + // stopping polling thread + coio_call(stop_poller, poller); + + pthread_attr_destroy(&poller->attr); + pthread_mutex_destroy(&poller->lock); + + free(poller); +} + /** * Producer */ @@ -72,7 +152,7 @@ static inline producer_t * lua_check_producer(struct lua_State *L, int index) { producer_t **producer_p = (producer_t **)luaL_checkudata(L, index, producer_label); if (producer_p == NULL || *producer_p == NULL) - luaL_error(L, "Kafka consumer fatal error: failed to retrieve producer from lua stack!"); + luaL_error(L, "Kafka producer fatal error: failed to retrieve producer from lua stack!"); return *producer_p; } @@ -83,26 +163,6 @@ lua_producer_tostring(struct lua_State *L) { return 1; } -static ssize_t -producer_poll(va_list args) { - rd_kafka_t *rd_producer = va_arg(args, rd_kafka_t *); - rd_kafka_poll(rd_producer, 1000); - return 0; -} - -int -lua_producer_poll(struct lua_State *L) { - if (lua_gettop(L) != 1) - luaL_error(L, "Usage: err = producer:poll()"); - - producer_t *producer = lua_check_producer(L, 1); - if (coio_call(producer_poll, producer->rd_producer) == -1) { - lua_pushstring(L, "unexpected error on producer poll"); - return 1; - } - return 0; -} - int lua_producer_msg_delivery_poll(struct lua_State *L) { if (lua_gettop(L) != 2) @@ -348,14 +408,16 @@ producer_flush(va_list args) { return 0; } -static rd_kafka_resp_err_t -producer_close(struct lua_State *L, producer_t *producer) { - rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR; - +void +destroy_producer(struct lua_State *L, producer_t *producer) { if (producer->rd_producer != NULL) { coio_call(producer_flush, producer->rd_producer); } + if (producer->poller != NULL) { + destroy_producer_poller(producer->poller); + } + if (producer->topics != NULL) { destroy_producer_topics(producer->topics); } @@ -365,13 +427,12 @@ producer_close(struct lua_State *L, producer_t *producer) { } if (producer->rd_producer != NULL) { - // FIXME: if instance of consumer exists then kafka_destroy always hangs forever + // FIXME: if instance of 
producer exists then kafka_destroy always hangs forever
         /* Destroy handle */
         // coio_call(kafka_destroy, producer->rd_producer);
     }
 
     free(producer);
-    return err;
 }
 
 int
@@ -382,18 +443,14 @@ lua_producer_close(struct lua_State *L) {
         return 1;
     }
 
-    rd_kafka_resp_err_t err = producer_close(L, *producer_p);
-    if (err) {
-        lua_pushboolean(L, 1);
-
-        const char *const_err_str = rd_kafka_err2str(err);
-        char err_str[512];
-        strcpy(err_str, const_err_str);
-        int fail = safe_pushstring(L, err_str);
-        return fail ? lua_push_error(L): 2;
+    if ((*producer_p)->rd_producer != NULL) {
+        coio_call(producer_flush, (*producer_p)->rd_producer);
     }
 
-    *producer_p = NULL;
+    if ((*producer_p)->poller != NULL) {
+        destroy_producer_poller((*producer_p)->poller);
+        (*producer_p)->poller = NULL;
+    }
 
     lua_pushboolean(L, 1);
     return 1;
 }
@@ -402,7 +459,7 @@ int
 lua_producer_gc(struct lua_State *L) {
     producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
     if (producer_p && *producer_p) {
-        producer_close(L, *producer_p);
+        destroy_producer(L, *producer_p);
     }
     if (producer_p)
         *producer_p = NULL;
@@ -527,11 +584,15 @@ lua_create_producer(struct lua_State *L) {
         return fail ? lua_push_error(L): 2;
     }
 
+    // creating background thread for polling producer
+    producer_poller_t *poller = new_producer_poller(rd_producer);
+
     producer_t *producer;
     producer = malloc(sizeof(producer_t));
     producer->rd_producer = rd_producer;
     producer->topics = new_producer_topics(256);
     producer->event_queues = event_queues;
+    producer->poller = poller;
 
     producer_t **producer_p = (producer_t **)lua_newuserdata(L, sizeof(producer));
     *producer_p = producer;
diff --git a/kafka/producer.h b/kafka/producer.h
index ef947f2..662e558 100644
--- a/kafka/producer.h
+++ b/kafka/producer.h
@@ -14,6 +14,14 @@
  * Producer
  */
 
+typedef struct {
+    rd_kafka_t *rd_producer;
+    pthread_t thread;
+    pthread_attr_t attr;
+    int should_stop;
+    pthread_mutex_t lock;
+} producer_poller_t;
+
 typedef struct {
     rd_kafka_topic_t **elements;
     int32_t count;
@@ -32,12 +40,11 @@ typedef struct {
     rd_kafka_t *rd_producer;
     producer_topics_t *topics;
     event_queues_t *event_queues;
+    producer_poller_t *poller;
 } producer_t;
 
 int lua_producer_tostring(struct lua_State *L);
 
-int lua_producer_poll(struct lua_State *L);
-
 int lua_producer_msg_delivery_poll(struct lua_State *L);
 
 int lua_producer_poll_logs(struct lua_State *L);
diff --git a/kafka/queue.c b/kafka/queue.c
index 5d3d5d3..8b40f64 100644
--- a/kafka/queue.c
+++ b/kafka/queue.c
@@ -28,6 +28,8 @@ queue_lockfree_pop(queue_t *queue) {
         if (queue->head == NULL) {
             queue->tail = NULL;
         }
+
+        queue->count -= 1;
     }
 
     return output;
@@ -44,14 +46,20 @@ queue_pop(queue_t *queue) {
     return output;
 }
 
+/**
+ * Push without locking mutex.
+ * Caller must lock and unlock queue mutex by itself.
+ * Use with caution!
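+ * (The consumer poll thread relies on this: it takes the queue lock once,
+ * pushes a fetched message and reads queue->count under that same lock.)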
+ * @param queue + * @param value + * @return + */ int -queue_push(queue_t *queue, void *value) { +queue_lockfree_push(queue_t *queue, void *value) { if (value == NULL || queue == NULL) { return -1; } - pthread_mutex_lock(&queue->lock); - queue_node_t *new_node; new_node = malloc(sizeof(queue_node_t)); if (new_node == NULL) { @@ -70,11 +78,26 @@ queue_push(queue_t *queue, void *value) { queue->head = new_node; } - pthread_mutex_unlock(&queue->lock); + queue->count += 1; return 0; } +int +queue_push(queue_t *queue, void *value) { + if (value == NULL || queue == NULL) { + return -1; + } + + pthread_mutex_lock(&queue->lock); + + int output = queue_lockfree_push(queue, value); + + pthread_mutex_unlock(&queue->lock); + + return output; +} + queue_t * new_queue() { queue_t *queue = malloc(sizeof(queue_t)); @@ -90,6 +113,7 @@ new_queue() { queue->lock = lock; queue->head = NULL; queue->tail = NULL; + queue->count = 0; return queue; } diff --git a/kafka/queue.h b/kafka/queue.h index a9c63be..905812c 100644 --- a/kafka/queue.h +++ b/kafka/queue.h @@ -14,9 +14,10 @@ typedef struct queue_node_t { } queue_node_t; typedef struct { - pthread_mutex_t lock; - queue_node_t *head; - queue_node_t *tail; + pthread_mutex_t lock; + queue_node_t *head; + queue_node_t *tail; + int count; } queue_t; /** @@ -30,6 +31,16 @@ void *queue_lockfree_pop(queue_t *queue); void *queue_pop(queue_t *queue); +/** + * Push without locking mutex. + * Caller must lock and unlock queue mutex by itself. + * Use with caution! + * @param queue + * @param value + * @return + */ +int queue_lockfree_push(queue_t *queue, void *value); + int queue_push(queue_t *queue, void *value); queue_t *new_queue(); diff --git a/kafka/tnt_kafka.c b/kafka/tnt_kafka.c index c467654..3afc933 100644 --- a/kafka/tnt_kafka.c +++ b/kafka/tnt_kafka.c @@ -19,10 +19,10 @@ luaopen_kafka_tntkafka(lua_State *L) { static const struct luaL_Reg consumer_methods [] = { {"subscribe", lua_consumer_subscribe}, {"unsubscribe", lua_consumer_unsubscribe}, - {"poll", lua_consumer_poll}, {"poll_msg", lua_consumer_poll_msg}, {"poll_logs", lua_consumer_poll_logs}, {"poll_errors", lua_consumer_poll_errors}, + {"poll_rebalances", lua_consumer_poll_rebalances}, {"store_offset", lua_consumer_store_offset}, {"close", lua_consumer_close}, {"__tostring", lua_consumer_tostring}, @@ -58,7 +58,6 @@ luaopen_kafka_tntkafka(lua_State *L) { lua_pop(L, 1); static const struct luaL_Reg producer_methods [] = { - {"poll", lua_producer_poll}, {"produce", lua_producer_produce}, {"msg_delivery_poll", lua_producer_msg_delivery_poll}, {"poll_logs", lua_producer_poll_logs}, diff --git a/librdkafka b/librdkafka index 8695b9d..8681f88 160000 --- a/librdkafka +++ b/librdkafka @@ -1 +1 @@ -Subproject commit 8695b9d63ac0fe1b891b511d5b36302ffc84d4e2 +Subproject commit 8681f884020e880a4c6cda3cfc672f0669e1f38e diff --git a/tests/consumer.lua b/tests/consumer.lua index c6ec7e6..609fe72 100644 --- a/tests/consumer.lua +++ b/tests/consumer.lua @@ -1,4 +1,5 @@ local box = require("box") +local json = require("json") local log = require("log") local os = require("os") local fiber = require('fiber') @@ -7,11 +8,13 @@ local tnt_kafka = require('kafka') local consumer = nil local errors = {} local logs = {} +local rebalances = {} local function create(brokers, additional_opts) local err errors = {} logs = {} + rebalances = {} local error_callback = function(err) log.error("got error: %s", err) table.insert(errors, err) @@ -21,6 +24,11 @@ local function create(brokers, additional_opts) table.insert(logs, 
string.format("got log: %d - %s - %s", level, fac, str)) end + local rebalance_callback = function(msg) + log.info("got rebalance msg: %s", json.encode(msg)) + table.insert(rebalances, msg) + end + local options = { ["enable.auto.offset.store"] = "false", ["group.id"] = "test_consumer", @@ -38,6 +46,7 @@ local function create(brokers, additional_opts) options = options, error_callback = error_callback, log_callback = log_callback, + rebalance_callback = rebalance_callback, default_topic_options = { ["auto.offset.reset"] = "earliest", }, @@ -112,6 +121,10 @@ local function get_logs() return logs end +local function get_rebalances() + return rebalances +end + local function close() log.info("closing consumer") local exists, err = consumer:close() @@ -130,4 +143,5 @@ return { close = close, get_errors = get_errors, get_logs = get_logs, + get_rebalances = get_rebalances, } \ No newline at end of file diff --git a/tests/producer.lua b/tests/producer.lua index 6f04459..2484ace 100644 --- a/tests/producer.lua +++ b/tests/producer.lua @@ -66,7 +66,7 @@ local function get_logs() end local function close() - local err = producer:close() + local ok, err = producer:close() if err ~= nil then log.error("got err %s", err) box.error{code = 500, reason = err} diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 22065fe..f64f702 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1,5 +1,6 @@ import time import asyncio +from contextlib import contextmanager from aiokafka import AIOKafkaProducer import tarantool @@ -15,6 +16,16 @@ def get_server(): connect_now=True) +@contextmanager +def create_consumer(server, *args): + try: + server.call("consumer.create", args) + yield + + finally: + server.call("consumer.close", []) + + def write_into_kafka(topic, messages): loop = asyncio.get_event_loop() @@ -59,19 +70,16 @@ def test_consumer_should_consume_msgs(): server = get_server() - server.call("consumer.create", ["kafka:9092"]) + with create_consumer(server, "kafka:9092", {"group.id": "should_consume_msgs"}): + server.call("consumer.subscribe", [["test_consume"]]) - server.call("consumer.subscribe", [["test_consume"]]) + response = server.call("consumer.consume", [10]) - response = server.call("consumer.consume", [3]) - - assert set(*response) == { - "test1", - "test2", - "test3" - } - - server.call("consumer.close", []) + assert set(*response) == { + "test1", + "test2", + "test3" + } def test_consumer_should_consume_msgs_from_multiple_topics(): @@ -95,19 +103,16 @@ def test_consumer_should_consume_msgs_from_multiple_topics(): server = get_server() - server.call("consumer.create", ["kafka:9092"]) + with create_consumer(server, "kafka:9092", {"group.id": "should_consume_msgs_from_multiple_topics"}): + server.call("consumer.subscribe", [["test_multi_consume_1", "test_multi_consume_2"]]) - server.call("consumer.subscribe", [["test_multi_consume_1", "test_multi_consume_2"]]) + response = server.call("consumer.consume", [10]) - response = server.call("consumer.consume", [3]) - - assert set(*response) == { - "test1", - "test2", - "test3" - } - - server.call("consumer.close", []) + assert set(*response) == { + "test1", + "test2", + "test3" + } def test_consumer_should_completely_unsubscribe_from_topics(): @@ -130,26 +135,23 @@ def test_consumer_should_completely_unsubscribe_from_topics(): server = get_server() - server.call("consumer.create", ["kafka:9092"]) - - server.call("consumer.subscribe", [["test_unsubscribe"]]) + with create_consumer(server, "kafka:9092", {"group.id": 
"should_completely_unsubscribe_from_topics"}): + server.call("consumer.subscribe", [["test_unsubscribe"]]) - response = server.call("consumer.consume", [3]) + response = server.call("consumer.consume", [10]) - assert set(*response) == { - "test1", - "test2", - } - - server.call("consumer.unsubscribe", [["test_unsubscribe"]]) + assert set(*response) == { + "test1", + "test2", + } - write_into_kafka("test_unsubscribe", (message3, )) + server.call("consumer.unsubscribe", [["test_unsubscribe"]]) - response = server.call("consumer.consume", [3]) + write_into_kafka("test_unsubscribe", (message3, )) - assert set(*response) == set() + response = server.call("consumer.consume", [10]) - server.call("consumer.close", []) + assert set(*response) == set() def test_consumer_should_partially_unsubscribe_from_topics(): @@ -175,58 +177,115 @@ def test_consumer_should_partially_unsubscribe_from_topics(): server = get_server() - server.call("consumer.create", ["kafka:9092"]) + with create_consumer(server, "kafka:9092", {"group.id": "should_partially_unsubscribe_from_topics"}): + server.call("consumer.subscribe", [["test_unsub_partially_1", "test_unsub_partially_2"]]) - server.call("consumer.subscribe", [["test_unsub_partially_1", "test_unsub_partially_2"]]) + write_into_kafka("test_unsub_partially_1", (message1, )) + write_into_kafka("test_unsub_partially_2", (message2, )) - write_into_kafka("test_unsub_partially_1", (message1, )) - write_into_kafka("test_unsub_partially_2", (message2, )) + # waiting up to 30 seconds + response = server.call("consumer.consume", [30]) - # waiting up to 30 seconds - response = server.call("consumer.consume", [30]) + assert set(*response) == { + "test1", + "test2", + } - assert set(*response) == { - "test1", - "test2", - } + server.call("consumer.unsubscribe", [["test_unsub_partially_1"]]) - server.call("consumer.unsubscribe", [["test_unsub_partially_1"]]) + write_into_kafka("test_unsub_partially_1", (message3, )) + write_into_kafka("test_unsub_partially_2", (message4, )) - write_into_kafka("test_unsub_partially_1", (message3, )) - write_into_kafka("test_unsub_partially_2", (message4, )) + response = server.call("consumer.consume", [30]) - response = server.call("consumer.consume", [3]) + assert set(*response) == {"test4"} - assert set(*response) == {"test4"} - server.call("consumer.close", []) +def test_consumer_should_log_errors(): + server = get_server() + with create_consumer(server, "kafka:9090"): + time.sleep(2) -def test_consumer_should_log_errors(): + response = server.call("consumer.get_errors", []) + + assert len(response.data[0]) > 0 + + +def test_consumer_should_log_debug(): server = get_server() - server.call("consumer.create", ["kafka:9090"]) + with create_consumer(server, "kafka:9092", {"debug": "consumer,cgrp,topic,fetch"}): + time.sleep(2) - time.sleep(2) + response = server.call("consumer.get_logs", []) - response = server.call("consumer.get_errors", []) + assert len(response.data[0]) > 0 - assert len(response) > 0 - assert len(response[0]) > 0 - server.call("consumer.close", []) +def test_consumer_should_log_rebalances(): + server = get_server() + with create_consumer(server, "kafka:9092"): + time.sleep(2) + + server.call("consumer.subscribe", [["test_unsub_partially_1"]]) + + time.sleep(10) + + response = server.call("consumer.get_rebalances", []) + + assert len(response.data[0]) > 0 + + +def test_consumer_should_continue_consuming_from_last_committed_offset(): + message1 = { + "key": "test1", + "value": "test1" + } + + message2 = { + "key": "test1", + 
"value": "test2" + } + + message3 = { + "key": "test1", + "value": "test3" + } + + message4 = { + "key": "test1", + "value": "test4" + } -def test_consumer_should_log_debug(): server = get_server() - server.call("consumer.create", ["kafka:9092", {"debug": "consumer,cgrp,topic,fetch"}]) + with create_consumer(server, "kafka:9092", {"group.id": "should_continue_consuming_from_last_committed_offset"}): + server.call("consumer.subscribe", [["test_consuming_from_last_committed_offset"]]) + + write_into_kafka("test_consuming_from_last_committed_offset", (message1, )) + write_into_kafka("test_consuming_from_last_committed_offset", (message2, )) + + # waiting up to 30 seconds + response = server.call("consumer.consume", [30]) + + assert set(*response) == { + "test1", + "test2", + } time.sleep(2) - response = server.call("consumer.get_logs", []) + with create_consumer(server, "kafka:9092", {"group.id": "should_continue_consuming_from_last_committed_offset"}): + server.call("consumer.subscribe", [["test_consuming_from_last_committed_offset"]]) + + write_into_kafka("test_consuming_from_last_committed_offset", (message3, )) + write_into_kafka("test_consuming_from_last_committed_offset", (message4, )) - assert len(response) > 0 - assert len(response[0]) > 0 + response = server.call("consumer.consume", [30]) - server.call("consumer.close", []) + assert set(*response) == { + "test3", + "test4", + }