From fe78a6cc5a275c3845aeaf87995e01f38909b4ca Mon Sep 17 00:00:00 2001 From: Andrey Shitov Date: Tue, 23 Jun 2020 11:28:14 +0300 Subject: [PATCH] Added new pause, resume methods to LUA API. --- kafka/consumer.c | 81 +++++++++++++++++++++++++++++++++++++++++++++++ kafka/consumer.h | 4 +++ kafka/init.lua | 8 +++++ kafka/tnt_kafka.c | 2 ++ 4 files changed, 95 insertions(+) diff --git a/kafka/consumer.c b/kafka/consumer.c index b124006..d6d54a8 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -123,6 +123,87 @@ lua_check_consumer(struct lua_State *L, int index) { return *consumer_p; } +int +lua_consumer_pause(struct lua_State *L) { + if (lua_gettop(L) != 2 || !lua_istable(L, 2)) + luaL_error(L, "Usage: err = consumer:pause({'topic'})"); + + consumer_t *consumer = lua_check_consumer(L,1); + + rd_kafka_topic_partition_list_t *list = rd_kafka_topic_partition_list_new(lua_objlen(L, 1)); + + lua_pushnil(L); + + // stack now contains: -1 => nil; -2 => table; -3 => consumer + while (lua_next(L, -2)) { + // stack now contains: -1 => value; -2 => key; -3 => table; -4 => consumer + const char *value = lua_tostring(L, -1); + + rd_kafka_topic_partition_list_add(list, value, RD_KAFKA_PARTITION_UA); + + // pop value, leaving original key + lua_pop(L, 1); + // stack now contains: -1 => key; -2 => table; -3 => consumer + } + + rd_kafka_pause_partitions(consumer->rd_consumer, list); + + + for(int i=0; icnt;i++) { + if (list->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) { + const char *const_err_str = rd_kafka_err2str(list->elems[i].err); + char err_str[512]; + strcpy(err_str, const_err_str); + int fail = safe_pushstring(L, err_str); + rd_kafka_topic_partition_list_destroy(list); + return fail ? lua_push_error(L): 1; + } + } + rd_kafka_topic_partition_list_destroy(list); + return 0; +} + +int +lua_consumer_resume(struct lua_State *L) { + if (lua_gettop(L) != 2 || !lua_istable(L, 2)) + luaL_error(L, "Usage: err = consumer:resume({'topic'})"); + + consumer_t *consumer = lua_check_consumer(L, 1); + + rd_kafka_topic_partition_list_t *list = rd_kafka_topic_partition_list_new(lua_objlen(L, 1)); + + lua_pushnil(L); + // stack now contains: -1 => nil; -2 => table; -3 => consumer + while (lua_next(L, -2)) { + // stack now contains: -1 => value; -2 => key; -3 => table; -4 => consumer + const char *value = lua_tostring(L, -1); + + rd_kafka_topic_partition_list_add(list, value, RD_KAFKA_PARTITION_UA); + // pop value, leaving original key + lua_pop(L, 1); + // stack now contains: -1 => key; -2 => table; -3 => consumer + } + // stack now contains: -1 => table; -2 => consumer + + rd_kafka_resume_partitions(consumer->rd_consumer, list); + + + for(int i=0; icnt;i++) { + if (list->elems[i].err != RD_KAFKA_RESP_ERR_NO_ERROR) { + const char *const_err_str = rd_kafka_err2str(list->elems[i].err); + char err_str[512]; + strcpy(err_str, const_err_str); + int fail = safe_pushstring(L, err_str); + rd_kafka_topic_partition_list_destroy(list); + return fail ? lua_push_error(L): 1; + } + } + + rd_kafka_topic_partition_list_destroy(list); + + return 0; +} + int lua_consumer_subscribe(struct lua_State *L) { if (lua_gettop(L) != 2 || !lua_istable(L, 2)) diff --git a/kafka/consumer.h b/kafka/consumer.h index 9949dec..b1da518 100644 --- a/kafka/consumer.h +++ b/kafka/consumer.h @@ -36,6 +36,10 @@ int lua_consumer_subscribe(struct lua_State *L); int lua_consumer_unsubscribe(struct lua_State *L); +int lua_consumer_pause(struct lua_State *L); + +int lua_consumer_resume(struct lua_State *L); + int lua_consumer_tostring(struct lua_State *L); int lua_consumer_poll_msg(struct lua_State *L); diff --git a/kafka/init.lua b/kafka/init.lua index 2c1d3e3..0fd3a73 100644 --- a/kafka/init.lua +++ b/kafka/init.lua @@ -161,6 +161,14 @@ function Consumer:close() return ok, err end +function Consumer:pause(topics) + return self._consumer:pause(topics) +end + +function Consumer:resume(topics) + return self._consumer.resume(topics) +end + function Consumer:subscribe(topics) return self._consumer:subscribe(topics) end diff --git a/kafka/tnt_kafka.c b/kafka/tnt_kafka.c index 3afc933..52c7393 100644 --- a/kafka/tnt_kafka.c +++ b/kafka/tnt_kafka.c @@ -19,6 +19,8 @@ luaopen_kafka_tntkafka(lua_State *L) { static const struct luaL_Reg consumer_methods [] = { {"subscribe", lua_consumer_subscribe}, {"unsubscribe", lua_consumer_unsubscribe}, + {"pause", lua_consumer_pause}, + {"resume", lua_consumer_resume}, {"poll_msg", lua_consumer_poll_msg}, {"poll_logs", lua_consumer_poll_logs}, {"poll_errors", lua_consumer_poll_errors},