11#include " Aims.hpp"
22
3- // {{{ macros
4-
5- #define STOP_KAFKA_CONSUMER (name ) do {\
6- } while (0 )
7-
8- #define INIT_KAFKA_PRODUCER (name ) do {\
9- _kafkaProducerCallback##name = new aims::kafka::Producer##name (_context);\
10- _kafkaProducer##name = new adbase::kafka::Producer (_configure->brokerListProducer ##name,\
11- _configure->queueLength ##name, _configure->debug ##name);\
12- _kafkaProducer##name->setSendHandler (std::bind (&aims::kafka::Producer##name::send,\
13- _kafkaProducerCallback##name,\
14- std::placeholders::_1, std::placeholders::_2,\
15- std::placeholders::_3, std::placeholders::_4));\
16- _kafkaProducer##name->setAckHandler (std::bind (&aims::kafka::Producer##name::ackCallback, \
17- _kafkaProducerCallback##name, std::placeholders::_1));\
18- _kafkaProducer##name->setErrorHandler (std::bind (&aims::kafka::Producer##name::errorCallback, \
19- _kafkaProducerCallback##name, std::placeholders::_1));\
20- } while (0 )
21- #define START_KAFKA_PRODUCER (name ) do {\
22- _kafkaProducer##name->start ();\
23- } while (0 )
24- #define STOP_KAFKA_PRODUCER (name ) do {\
25- if (_kafkaProducer##name != nullptr ) {\
26- _kafkaProducer##name->stop ();\
27- }\
28- if (_kafkaProducerCallback##name != nullptr ) {\
29- delete _kafkaProducerCallback##name;\
30- _kafkaProducerCallback##name = nullptr ;\
31- }\
32- } while (0 )
33-
34- // }}}
353// {{{ Aims::Aims()
364
375Aims::Aims (AimsContext* context) :
@@ -52,10 +20,15 @@ Aims::~Aims() {
5220void Aims::run () {
5321 // 初始化 server
5422 init ();
23+ _kafka->start ();
24+ }
5525
56- for (auto &t : _kafkas) {
57- t.second ->start ();
58- }
26+ // }}}
27+ // {{{ void Aims::reload()
28+
29+ void Aims::reload () {
30+ std::vector<std::string> topicNames = adbase::explode (_configure->topicNameConsumerOut , ' ,' , true );
31+ _kafka->setTopics (topicNames);
5932}
6033
6134// }}}
@@ -69,11 +42,10 @@ void Aims::init() {
6942// {{{ void Aims::stop()
7043
7144void Aims::stop () {
72- for (auto &t : _kafkas) {
73- if (t.second != nullptr ) {
74- t.second ->stop ();
75- }
76- }
45+ if (_kafka != nullptr ) {
46+ _kafka->stop ();
47+ delete _kafka;
48+ }
7749 if (_kafkaConsumerCallbackOut != nullptr ) {
7850 delete _kafkaConsumerCallbackOut;
7951 _kafkaConsumerCallbackOut = nullptr ;
@@ -86,25 +58,19 @@ void Aims::stop() {
8658void Aims::initKafkaConsumer () {
8759 _kafkaConsumerCallbackOut = new aims::kafka::ConsumerOut (_context);
8860 std::vector<std::string> topicNames = adbase::explode (_configure->topicNameConsumerOut , ' ,' , true );
89- for (auto &t : topicNames) {
90- adbase::kafka::Consumer* consumer = new adbase::kafka::Consumer (t, _configure->groupIdOut ,
91- _configure->brokerListConsumerOut );
92- consumer->setMessageHandler (std::bind (&aims::kafka::ConsumerOut::pull,
93- _kafkaConsumerCallbackOut,
94- std::placeholders::_1, std::placeholders::_2,
95- std::placeholders::_3, std::placeholders::_4));
96- consumer->setStatCallback (std::bind (&aims::kafka::ConsumerOut::stat,
97- _kafkaConsumerCallbackOut,
98- std::placeholders::_1, std::placeholders::_2));
99- consumer->setKafkaDebug (_configure->kafkaDebugOut );
100- consumer->setOffsetStorePath (_configure->offsetPathOut );
101- consumer->setKafkaStatInterval (_configure->statIntervalOut );
102- if (_configure->isNewConsumerOut ) {
103- consumer->setIsNewConsumer (true );
104- consumer->setOffsetStoreMethod (" broker" );
105- }
106- _kafkas[t] = consumer;
107- }
61+ LOG_INFO << " Topic list:" << _configure->topicNameConsumerOut ;
62+
63+ _kafka = new adbase::kafka::ConsumerBatch (topicNames, _configure->groupIdOut ,
64+ _configure->brokerListConsumerOut );
65+ _kafka->setMessageHandler (std::bind (&aims::kafka::ConsumerOut::pull,
66+ _kafkaConsumerCallbackOut,
67+ std::placeholders::_1, std::placeholders::_2,
68+ std::placeholders::_3, std::placeholders::_4));
69+ _kafka->setStatCallback (std::bind (&aims::kafka::ConsumerOut::stat,
70+ _kafkaConsumerCallbackOut,
71+ std::placeholders::_1, std::placeholders::_2));
72+ _kafka->setKafkaDebug (_configure->kafkaDebugOut );
73+ _kafka->setKafkaStatInterval (_configure->statIntervalOut );
10874}
10975
11076// }}}
0 commit comments