From a3c4960069720ccda79f02582e8f5da3a70d520b Mon Sep 17 00:00:00 2001 From: lxning Date: Thu, 26 Sep 2019 16:24:20 -0700 Subject: [PATCH] replace confluent with samrama (#306) --- CMakeLists.txt | 12 +- controller/models/job.go | 10 +- .../tasks/etcd/ingestion_assignment_test.go | 6 +- rdkafka.cmake | 50 --- subscriber/common/consumer/kafka/kafka.go | 376 ++++++++++++++++++ .../common/consumer/kafka/kafka_confluent.go | 320 --------------- ...{kafka_confluent_test.go => kafka_test.go} | 106 ++--- subscriber/common/job/driver_test.go | 23 ++ .../common/job/streaming_processor_test.go | 84 ++-- subscriber/config/test/jobs/job1-local.json | 6 +- 10 files changed, 522 insertions(+), 471 deletions(-) delete mode 100644 rdkafka.cmake create mode 100644 subscriber/common/consumer/kafka/kafka.go delete mode 100644 subscriber/common/consumer/kafka/kafka_confluent.go rename subscriber/common/consumer/kafka/{kafka_confluent_test.go => kafka_test.go} (57%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 911e2385..0b7eb581 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -202,10 +202,6 @@ add_library(algorithm SHARED target_link_libraries(algorithm mem) ################################ -# rdkafka -################################ -include(rdkafka.cmake) -################################ # Unit Tests ################################ file(GLOB QUERY_UNITTEST_FILES query/*_unittest.cu) @@ -255,13 +251,11 @@ execute_process( ) add_custom_target(lint-all - DEPENDS rdkafka::rdkafka lib COMMAND ./scripts/clang-lint.sh ${ALL_C_SRC} COMMAND ./scripts/golang-lint.sh ${ALL_GO_SRC} VERBATIM) add_custom_target(lint - DEPENDS rdkafka::rdkafka lib COMMAND ./scripts/clang-lint.sh ${CHANGED_C_SRC} COMMAND ./scripts/golang-lint.sh ${CHANGED_GO_SRC} VERBATIM) @@ -280,7 +274,7 @@ add_custom_target(aresbrokerd DEPENDS ${ALL_GO_SRC_LIST} VERBATIM ) -add_custom_target(ares-subscriber DEPENDS rdkafka::rdkafka ${ALL_GO_SRC_LIST} +add_custom_target(ares-subscriber DEPENDS ${ALL_GO_SRC_LIST} COMMAND go build -tags static -o bin/ares-subscriber ./cmd/subscriber/main.go VERBATIM ) @@ -289,7 +283,7 @@ add_custom_target(run_server DEPENDS aresd COMMAND bash -c 'DYLD_LIBRARY_PATH=$$LIBRARY_PATH ./bin/aresd' ) -add_custom_target(test-golang DEPENDS aresd rdkafka::rdkafka +add_custom_target(test-golang COMMAND bash -c 'ARES_ENV=test DYLD_LIBRARY_PATH=$$LIBRARY_PATH ginkgo -r' ) @@ -297,7 +291,7 @@ add_custom_target(test-golang DEPENDS aresd rdkafka::rdkafka # misc ############################### -add_custom_target(travis DEPENDS rdkafka::rdkafka +add_custom_target(travis COMMAND bash -c 'ARES_ENV=test .travis/run_unittest.sh' ) diff --git a/controller/models/job.go b/controller/models/job.go index 81400d20..1c698972 100644 --- a/controller/models/job.go +++ b/controller/models/job.go @@ -44,11 +44,11 @@ type KafkaConfig struct { RestartInterval int `json:"restartInterval,omitempty"` FailureHandler FailureHandler `json:"failureHandler,omitempty"` - // confluent kafka - KafkaBroker string `json:"kafkaBroker" yaml:"kafkaBroker"` - MaxPollIntervalMs int `json:"maxPollIntervalMs" yaml:"maxPollIntervalMs" default:"300000"` - SessionTimeoutNs int `json:"sessionTimeoutNs" yaml:"sessionTimeoutNs" default:"10000"` - ChannelBufferSize uint `json:"channelBufferSize" yaml:"channelBufferSize" default:"256"` + // sarama config + KafkaBroker string `json:"kafkaBroker" yaml:"kafkaBroker"` + SessionTimeoutMs int `json:"sessionTimeoutMs" yaml:"sessionTimeoutMs" default:"10000"` + ChannelBufferSize uint `json:"channelBufferSize" yaml:"channelBufferSize" default:"256"` + ReblanceTimeoutSec int `json:"reblanceTimeoutSec" yaml:"reblanceTimeoutSec" default:"10"` } // JobConfig is job's config diff --git a/controller/tasks/etcd/ingestion_assignment_test.go b/controller/tasks/etcd/ingestion_assignment_test.go index 60c2b538..bb589c6b 100644 --- a/controller/tasks/etcd/ingestion_assignment_test.go +++ b/controller/tasks/etcd/ingestion_assignment_test.go @@ -311,9 +311,9 @@ func TestIngestionAssignmentTask(t *testing.T) { } }, "kafkaBroker": "", - "maxPollIntervalMs": 0, - "sessionTimeoutNs": 0, - "channelBufferSize": 0 + "sessionTimeoutMs": 0, + "channelBufferSize": 0, + "reblanceTimeoutSec": 0 } } ]` diff --git a/rdkafka.cmake b/rdkafka.cmake deleted file mode 100644 index 1c49b994..00000000 --- a/rdkafka.cmake +++ /dev/null @@ -1,50 +0,0 @@ -# Build only the static C library -# Does not build C++ library -# Does not build shared libraries -# Does not run tests -# Does not generate documentation - - -# Protect multiple-inclusion of this CMake file -if( NOT TARGET rdkafka::rdkafka ) - -# librdkafka depends on pthread and zlib -find_package(Threads REQUIRED) -find_package(ZLIB REQUIRED) # sudo apt install zlib1g-dev - -# The librdkafka build is based on tools './configure' and 'make' -include(ExternalProject) -ExternalProject_Add( rdkafka -# DEPENDS Threads::Threads ZLIB::ZLIB - SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}/thirdparty/librdkafka - BUILD_IN_SOURCE 1 -# UPDATE_COMMAND echo "Full clean (make clean)" && make clean && rm -f Makefile.config config.cache config.h config.log config.log.old rdkafka.pc - UPDATE_COMMAND echo "Full clean (make distclean)" && make distclean - CONFIGURE_COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/librdkafka/configure - #--prefix=${CMAKE_BINARY_DIR} - --prefix=${CMAKE_CURRENT_SOURCE_DIR} - --cc=${CMAKE_C_COMPILER} - --cxx=${CMAKE_CXX_COMPILER} -# --arch=${MARCH} - --CFLAGS=${CMAKE_C_FLAGS_${CMAKE_BUILD_TYPE}} # TODO(olibre): Retrieve correct flags set by - --CXXFLAGS=${CMAKE_CXX_FLAGS_${CMAKE_BUILD_TYPE}} # add_compile_options() and add_definitions() - --LDFLAGS=${CMAKE_STATIC_LINKER_FLAGS} - --ARFLAGS=${CMAKE_STATIC_LINKER_FLAGS} - --enable-static - BUILD_COMMAND echo "Build only librdkafka.a (make librdkafka.a -j4) => No librdkafka++ No shared library No check" && - #make -C src librdkafka.a -j4 - make - INSTALL_COMMAND echo "Install only librdkafka.a" && - make install - BUILD_BYPRODUCTS ${CMAKE_BINARY_DIR}/lib/librdkafka.a -) - -# Target 'rdkafka::rdkafka' to define: lib-location, include-dir and dependencies -add_library( rdkafka::rdkafka STATIC IMPORTED GLOBAL ) -add_dependencies( rdkafka::rdkafka rdkafka ) -set_target_properties( rdkafka::rdkafka PROPERTIES - IMPORTED_LOCATION ${CMAKE_CURRENT_SOURCE_DIR}/lib/librdkafka.a - INTERFACE_INCLUDE_DIRECTORIES ${CMAKE_CURRENT_SOURCE_DIR}/thirdparty/librdkafka/src - INTERFACE_LINK_LIBRARIES "Threads::Threads;${ZLIB_LIBRARIES}") #ZLIB::ZLIB -endif() - diff --git a/subscriber/common/consumer/kafka/kafka.go b/subscriber/common/consumer/kafka/kafka.go new file mode 100644 index 00000000..f136a733 --- /dev/null +++ b/subscriber/common/consumer/kafka/kafka.go @@ -0,0 +1,376 @@ +// Copyright (c) 2017-2018 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package kafka + +import ( + "context" + "fmt" + "github.com/Shopify/sarama" + "github.com/uber-go/tally" + "github.com/uber/aresdb/subscriber/common/consumer" + "github.com/uber/aresdb/subscriber/common/rules" + "github.com/uber/aresdb/subscriber/config" + "github.com/uber/aresdb/utils" + "go.uber.org/zap" + "strconv" + "strings" + "sync" + "time" +) + +// KafkaConsumer implements Consumer interface +type KafkaConsumer struct { + sarama.ConsumerGroup + *sarama.Config + sync.Mutex + + group string + topicArray []string + logger *zap.Logger + scope tally.Scope + msgCh chan consumer.Message + + // WARNING: The following channels should not be closed by the lib users + closeAttempted bool + closeCh chan struct{} +} + +// KafkaMessage implements Message interface +type KafkaMessage struct { + *sarama.ConsumerMessage + + consumer consumer.Consumer + clusterName string + session sarama.ConsumerGroupSession +} + +// CGHandler represents a Sarama consumer group handler +type CGHandler struct { + consumer *KafkaConsumer + msgCounter map[string]map[int32]tally.Counter + msgByteCounter map[string]map[int32]tally.Counter + msgOffsetGauge map[string]map[int32]tally.Gauge + msgLagGauge map[string]map[int32]tally.Gauge +} + +// GetConsumerGroupName will return the consumer group name to use or being used +// for given deployment and job name +func GetConsumerGroupName(deployment, jobName string, aresCluster string) string { + return fmt.Sprintf("ares-subscriber_%s_%s_%s_streaming", deployment, jobName, aresCluster) +} + +func getKafkaVersion(v string) sarama.KafkaVersion { + switch v { + case "V0_10_2_0": + return sarama.V0_10_2_0 + case "V0_10_2_1": + return sarama.V0_10_2_1 + case "V0_11_0_0": + return sarama.V0_11_0_0 + case "V0_11_0_1": + return sarama.V0_11_0_1 + case "V0_11_0_2": + return sarama.V0_11_0_2 + case "V1_0_0_0": + return sarama.V1_0_0_0 + case "V1_1_0_0": + return sarama.V1_1_0_0 + case "V1_1_1_0": + return sarama.V1_1_1_0 + case "V2_0_0_0": + return sarama.V2_0_0_0 + case "V2_0_1_0": + return sarama.V2_0_1_0 + case "V2_1_0_0": + return sarama.V2_1_0_0 + case "V2_2_0_0": + return sarama.V2_2_0_0 + default: + return sarama.V0_10_2_0 + } +} + +// NewKafkaConsumer creates kafka consumer +func NewKafkaConsumer(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error) { + cfg := sarama.NewConfig() + if jobConfig.StreamingConfig.SessionTimeoutMs > 0 { + cfg.Consumer.Group.Session.Timeout = time.Duration(jobConfig.StreamingConfig.SessionTimeoutMs) * time.Millisecond + } + offsetReset := sarama.OffsetOldest + if jobConfig.StreamingConfig.LatestOffset { + offsetReset = sarama.OffsetNewest + } + cfg.Consumer.Offsets.Initial = offsetReset + cfg.Consumer.Return.Errors = true + if jobConfig.StreamingConfig.ReblanceTimeoutSec > 0 { + cfg.Consumer.Group.Rebalance.Timeout = time.Duration(jobConfig.StreamingConfig.ReblanceTimeoutSec) * time.Second + } + cfg.Version = getKafkaVersion(jobConfig.StreamingConfig.KafkaVersion) + + serviceConfig.Logger.Info("Kafka consumer", + zap.String("job", jobConfig.Name), + zap.String("broker", jobConfig.StreamingConfig.KafkaBroker), + zap.Any("config", cfg)) + + group := GetConsumerGroupName(serviceConfig.Environment.Deployment, jobConfig.Name, jobConfig.AresTableConfig.Cluster) + c, err := sarama.NewConsumerGroup(strings.Split(jobConfig.StreamingConfig.KafkaBroker, ","), group, cfg) + if err != nil { + return nil, utils.StackError(err, "Unable to initialize Kafka consumer") + } + + logger := serviceConfig.Logger.With( + zap.String("kafkaBroker", jobConfig.StreamingConfig.KafkaBroker), + zap.String("topic", jobConfig.StreamingConfig.Topic), + ) + + scope := serviceConfig.Scope.Tagged(map[string]string{ + "broker": jobConfig.StreamingConfig.KafkaBroker, + }) + + kc := KafkaConsumer{ + ConsumerGroup: c, + Config: cfg, + group: group, + topicArray: []string{jobConfig.StreamingConfig.Topic}, + logger: logger, + scope: scope, + msgCh: make(chan consumer.Message, jobConfig.StreamingConfig.ChannelBufferSize), + closeCh: make(chan struct{}), + } + + cgHandler := CGHandler{ + consumer: &kc, + } + ctx := context.Background() + go kc.startConsuming(ctx, &cgHandler) + + logger.Info("Consumer is up and running") + return &kc, nil +} + +// Name returns the name of this consumer group. +func (c *KafkaConsumer) Name() string { + return c.group +} + +// Topics returns the names of the topics being consumed. +func (c *KafkaConsumer) Topics() []string { + return append([]string(nil), c.topicArray...) +} + +// Errors returns a channel of errors for the topic. To prevent deadlocks, +// users must read from the error channel. +// +// All errors returned from this channel can be safely cast to the +// consumer.Error interface, which allows structured access to the topic +// name and partition number. +func (c *KafkaConsumer) Errors() <-chan error { + return c.ConsumerGroup.Errors() +} + +// Closed returns a channel that unblocks when the consumer successfully shuts +// down. +func (c *KafkaConsumer) Closed() <-chan struct{} { + return c.closeCh +} + +// Messages returns a channel of messages for the topic. +// +// If the consumer is not configured with nonzero buffer size, the Errors() +// channel must be read in conjunction with Messages() to prevent deadlocks. +func (c *KafkaConsumer) Messages() <-chan consumer.Message { + return c.msgCh +} + +// CommitUpTo marks this message and all previous messages in the same partition +// as processed. The last processed offset for each partition is periodically +// flushed to ZooKeeper; on startup, consumers begin processing after the last +// stored offset. +func (c *KafkaConsumer) CommitUpTo(msg consumer.Message) error { + if concreteMsg, ok := msg.(*KafkaMessage); ok { + if concreteMsg.session != nil { + concreteMsg.session.MarkMessage(concreteMsg.ConsumerMessage, "") + } else { + return fmt.Errorf("Session is nil, msg:%v", msg) + } + } else { + return fmt.Errorf("Failed to convert KafkaMessage, msg:%v", msg) + } + return nil +} + +func (c *KafkaConsumer) startConsuming(ctx context.Context, cgHandler *CGHandler) { + c.logger.Info("Start consumption goroutine") + + // those four Metrics are of the format {"":{: , ...}, ...} + cgHandler.msgCounter = make(map[string]map[int32]tally.Counter) + cgHandler.msgByteCounter = make(map[string]map[int32]tally.Counter) + cgHandler.msgOffsetGauge = make(map[string]map[int32]tally.Gauge) + cgHandler.msgLagGauge = make(map[string]map[int32]tally.Gauge) + + // initialize counter map + for _, topic := range c.topicArray { + cgHandler.msgCounter[topic] = make(map[int32]tally.Counter) + cgHandler.msgByteCounter[topic] = make(map[int32]tally.Counter) + cgHandler.msgOffsetGauge[topic] = make(map[int32]tally.Gauge) + cgHandler.msgLagGauge[topic] = make(map[int32]tally.Gauge) + } + + for run := true; run; { + if err := c.ConsumerGroup.Consume(ctx, c.topicArray, cgHandler); err != nil { + c.logger.Error("Received error from consumer", zap.Error(err)) + run = false + } + // check if context was cancelled, signaling that the consumer should stop + if ctx.Err() != nil { + run = false + c.logger.Info("Received close Signal") + } + if !run { + c.Close() + } + } +} + +func (c *KafkaConsumer) processMsg(msg *sarama.ConsumerMessage, cgHandler *CGHandler, + highWaterOffset int64, session sarama.ConsumerGroupSession) { + c.Lock() + defer c.Unlock() + + c.logger.Debug("Received nessage event", zap.Any("message", msg)) + c.msgCh <- &KafkaMessage{ + ConsumerMessage: msg, + consumer: c, + session: session, + } + + topic := msg.Topic + partition := msg.Partition + pncm := cgHandler.msgCounter[topic] + nCounter, ok := pncm[partition] + if !ok { + nCounter = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Counter("messages-count") + pncm[partition] = nCounter + } + nCounter.Inc(1) + + pbcm := cgHandler.msgByteCounter[topic] + bCounter, ok := pbcm[partition] + if !ok { + bCounter = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Counter("message-bytes-count") + pbcm[partition] = bCounter + } + bCounter.Inc(int64(len(msg.Value))) + + pogm := cgHandler.msgOffsetGauge[topic] + oGauge, ok := pogm[partition] + if !ok { + oGauge = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Gauge("latest-offset") + pogm[partition] = oGauge + } + oGauge.Update(float64(msg.Offset)) + + plgm := cgHandler.msgLagGauge[topic] + lGauge, ok := plgm[partition] + if !ok { + lGauge = c.scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Gauge("offset-lag") + } + + if highWaterOffset > int64(msg.Offset) { + lGauge.Update(float64(highWaterOffset - int64(msg.Offset) - 1)) + } else { + lGauge.Update(0) + } +} + +func (c *KafkaConsumer) Close() error { + c.Lock() + defer c.Unlock() + + if c.closeAttempted { + return fmt.Errorf("Close attempted again on consumer group %s", c.group) + } + c.logger.Info("Attempting to close consumer", + zap.String("consumerGroup", c.group)) + err := c.ConsumerGroup.Close() + if err != nil { + c.logger.Error("Failed to close consumer", + zap.String("consumerGroup", c.group), + zap.Error(err)) + } else { + c.logger.Info("Started to close consumer", + zap.String("consumerGroup", c.group)) + } + close(c.closeCh) + c.closeAttempted = true + return err +} + +func (m *KafkaMessage) Key() []byte { + return m.ConsumerMessage.Key +} + +func (m *KafkaMessage) Value() []byte { + return m.ConsumerMessage.Value +} + +func (m *KafkaMessage) Topic() string { + return m.ConsumerMessage.Topic +} + +func (m *KafkaMessage) Partition() int32 { + return m.ConsumerMessage.Partition +} + +func (m *KafkaMessage) Offset() int64 { + return m.ConsumerMessage.Offset +} + +func (m *KafkaMessage) Ack() { + if m.consumer != nil { + m.consumer.CommitUpTo(m) + } +} + +func (m *KafkaMessage) Nack() { + // No op for now since Kafka based DLQ is not implemented +} + +func (m *KafkaMessage) Cluster() string { + return m.clusterName +} + +// Setup is run at the beginning of a new session, before ConsumeClaim +func (h *CGHandler) Setup(sarama.ConsumerGroupSession) error { + return nil +} + +// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited +func (h *CGHandler) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} + +// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). +func (h *CGHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + + // NOTE: + // Do not move the code below to a goroutine. + // The `ConsumeClaim` itself is called within a goroutine, see: + // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29 + for message := range claim.Messages() { + h.consumer.processMsg(message, h, claim.HighWaterMarkOffset(), session) + } + + return nil +} diff --git a/subscriber/common/consumer/kafka/kafka_confluent.go b/subscriber/common/consumer/kafka/kafka_confluent.go deleted file mode 100644 index 29a6cb94..00000000 --- a/subscriber/common/consumer/kafka/kafka_confluent.go +++ /dev/null @@ -1,320 +0,0 @@ -// Copyright (c) 2017-2018 Uber Technologies, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka - -import ( - "fmt" - "sync" - - "github.com/uber/aresdb/subscriber/common/consumer" - - "strconv" - - kafkaConfluent "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/uber-go/tally" - "github.com/uber/aresdb/subscriber/common/rules" - "github.com/uber/aresdb/subscriber/config" - "github.com/uber/aresdb/utils" - "go.uber.org/zap" -) - -// KafkaConsumer implements Consumer interface -type KafkaConsumer struct { - *kafkaConfluent.Consumer - kafkaConfluent.ConfigMap - sync.Mutex - - TopicArray []string - Logger *zap.Logger - Scope tally.Scope - ErrCh chan error - MsgCh chan consumer.Message - - // WARNING: The following channels should not be closed by the lib users - CloseAttempted bool - CloseErr error - CloseCh chan struct{} -} - -// KafkaMessage implements Message interface -type KafkaMessage struct { - *kafkaConfluent.Message - - Consumer consumer.Consumer - ClusterName string -} - -// NewKafkaConsumer creates kafka consumer by using https://github.com/confluentinc/confluent-kafka-go. -func NewKafkaConsumer(jobConfig *rules.JobConfig, serviceConfig config.ServiceConfig) (consumer.Consumer, error) { - offsetReset := "earliest" - if jobConfig.StreamingConfig.LatestOffset { - offsetReset = "latest" - } - cfg := kafkaConfluent.ConfigMap{ - "bootstrap.servers": jobConfig.StreamingConfig.KafkaBroker, - "group.id": GetConsumerGroupName(serviceConfig.Environment.Deployment, jobConfig.Name, jobConfig.AresTableConfig.Cluster), - "max.poll.interval.ms": jobConfig.StreamingConfig.MaxPollIntervalMs, - "session.timeout.ms": jobConfig.StreamingConfig.SessionTimeoutNs, - "go.events.channel.enable": false, - "go.application.rebalance.enable": false, - "enable.partition.eof": true, - "auto.offset.reset": offsetReset, - } - serviceConfig.Logger.Info("Kafka consumer", - zap.String("job", jobConfig.Name), - zap.String("broker", jobConfig.StreamingConfig.KafkaBroker), - zap.Any("config", cfg)) - - c, err := kafkaConfluent.NewConsumer(&cfg) - if err != nil { - return nil, utils.StackError(err, "Unable to initialize Kafka consumer") - } - - err = c.Subscribe(jobConfig.StreamingConfig.Topic, nil) - if err != nil { - return nil, utils.StackError(err, fmt.Sprintf("Unable to subscribe to topic: %s", jobConfig.StreamingConfig.Topic)) - } - - logger := serviceConfig.Logger.With( - zap.String("kafkaBroker", jobConfig.StreamingConfig.KafkaBroker), - zap.String("topic", jobConfig.StreamingConfig.Topic), - ) - - scope := serviceConfig.Scope.Tagged(map[string]string{ - "broker": jobConfig.StreamingConfig.KafkaBroker, - }) - - kc := KafkaConsumer{ - Consumer: c, - ConfigMap: cfg, - TopicArray: []string{jobConfig.StreamingConfig.Topic}, - Logger: logger, - Scope: scope, - ErrCh: make(chan error, jobConfig.StreamingConfig.ChannelBufferSize), - MsgCh: make(chan consumer.Message, jobConfig.StreamingConfig.ChannelBufferSize), - CloseCh: make(chan struct{}), - } - - go kc.startConsuming() - return &kc, nil -} - -// Name returns the name of this consumer group. -func (c *KafkaConsumer) Name() string { - return c.ConfigMap["group.id"].(string) -} - -// Topics returns the names of the topics being consumed. -func (c *KafkaConsumer) Topics() []string { - return append([]string(nil), c.TopicArray...) -} - -// Errors returns a channel of errors for the topic. To prevent deadlocks, -// users must read from the error channel. -// -// All errors returned from this channel can be safely cast to the -// consumer.Error interface, which allows structured access to the topic -// name and partition number. -func (c *KafkaConsumer) Errors() <-chan error { - return c.ErrCh -} - -// Closed returns a channel that unblocks when the consumer successfully shuts -// down. -func (c *KafkaConsumer) Closed() <-chan struct{} { - return c.CloseCh -} - -// Messages returns a channel of messages for the topic. -// -// If the consumer is not configured with nonzero buffer size, the Errors() -// channel must be read in conjunction with Messages() to prevent deadlocks. -func (c *KafkaConsumer) Messages() <-chan consumer.Message { - return c.MsgCh -} - -// CommitUpTo marks this message and all previous messages in the same partition -// as processed. The last processed offset for each partition is periodically -// flushed to ZooKeeper; on startup, consumers begin processing after the last -// stored offset. -func (c *KafkaConsumer) CommitUpTo(msg consumer.Message) error { - if concreteMsg, ok := msg.(*KafkaMessage); ok { - // Just unwrap the underlying message. - c.CommitMessage(concreteMsg.Message) - } else { - topic := msg.Topic() - c.CommitOffsets([]kafkaConfluent.TopicPartition{ - { - Topic: &topic, - Partition: msg.Partition(), - Offset: kafkaConfluent.Offset(msg.Offset()), - Error: nil, - }, - }) - } - return nil -} - -func (c *KafkaConsumer) startConsuming() { - c.Logger.Debug("Start consumption goroutine") - - // those four Metrics are of the format {"":{: , ...}, ...} - msgCounter := make(map[string]map[int32]tally.Counter) - msgByteCounter := make(map[string]map[int32]tally.Counter) - msgOffsetGauge := make(map[string]map[int32]tally.Gauge) - msgLagGauge := make(map[string]map[int32]tally.Gauge) - - // initialize outer map - for _, topic := range c.TopicArray { - msgCounter[topic] = make(map[int32]tally.Counter) - msgByteCounter[topic] = make(map[int32]tally.Counter) - msgOffsetGauge[topic] = make(map[int32]tally.Gauge) - msgLagGauge[topic] = make(map[int32]tally.Gauge) - } - - for run := true; run; { - select { - case _ = <-c.CloseCh: - c.Logger.Info("Received close Signal") - run = false - case event := <-c.Events(): - switch e := event.(type) { - case *kafkaConfluent.Message: - c.processMsg(e, msgCounter, msgByteCounter, msgOffsetGauge, msgLagGauge) - case kafkaConfluent.Error: - c.ErrCh <- e - c.Logger.Error("Received error event", zap.Error(e)) - default: - c.Logger.Info("Ignored consumer event", zap.Any("event", e)) - } - } - } -} - -func (c *KafkaConsumer) processMsg(msg *kafkaConfluent.Message, - msgCounter map[string]map[int32]tally.Counter, - msgByteCounter map[string]map[int32]tally.Counter, - msgOffsetGauge map[string]map[int32]tally.Gauge, - msgLagGauge map[string]map[int32]tally.Gauge) { - - c.Logger.Debug("Received nessage event", zap.Any("message", msg)) - c.MsgCh <- &KafkaMessage{ - Message: msg, - Consumer: c, - } - - topic := *msg.TopicPartition.Topic - partition := msg.TopicPartition.Partition - pncm := msgCounter[topic] - nCounter, ok := pncm[partition] - if !ok { - nCounter = c.Scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Counter("messages-count") - pncm[partition] = nCounter - } - nCounter.Inc(1) - - pbcm := msgByteCounter[topic] - bCounter, ok := pbcm[partition] - if !ok { - bCounter = c.Scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Counter("message-bytes-count") - pbcm[partition] = bCounter - } - bCounter.Inc(int64(len(msg.Value))) - - pogm := msgOffsetGauge[topic] - oGauge, ok := pogm[partition] - if !ok { - oGauge = c.Scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Gauge("latest-offset") - pogm[partition] = oGauge - } - oGauge.Update(float64(msg.TopicPartition.Offset)) - - plgm := msgLagGauge[topic] - lGauge, ok := plgm[partition] - if !ok { - lGauge = c.Scope.Tagged(map[string]string{"topic": topic, "partition": strconv.Itoa(int(partition))}).Gauge("offset-lag") - } - - _, offset, _ := c.Consumer.QueryWatermarkOffsets(topic, partition, 100) - - if offset > int64(msg.TopicPartition.Offset) { - lGauge.Update(float64(offset - int64(msg.TopicPartition.Offset) - 1)) - } else { - lGauge.Update(0) - } -} - -func (c *KafkaConsumer) Close() error { - c.Lock() - defer c.Unlock() - - if c.CloseAttempted { - return fmt.Errorf("Close attempted again on consumer group %s", c.ConfigMap["group.id"].(string)) - } - - c.Logger.Debug("Attempting to close consumer", - zap.String("consumerGroup", c.ConfigMap["group.id"].(string))) - c.CloseErr = c.Consumer.Close() - if c.CloseErr != nil { - c.Logger.With(zap.NamedError("error", c.CloseErr)).Error("Failed to close consumer", - zap.String("consumerGroup", c.ConfigMap["group.id"].(string))) - } else { - c.Logger.Debug("Started to close consumer", - zap.String("consumerGroup", c.ConfigMap["group.id"].(string))) - } - close(c.CloseCh) - c.CloseAttempted = true - return c.CloseErr -} - -func (m *KafkaMessage) Key() []byte { - return m.Message.Key -} - -func (m *KafkaMessage) Value() []byte { - return m.Message.Value -} - -func (m *KafkaMessage) Topic() string { - return *m.TopicPartition.Topic -} - -func (m *KafkaMessage) Partition() int32 { - return m.TopicPartition.Partition -} - -func (m *KafkaMessage) Offset() int64 { - return int64(m.TopicPartition.Offset) -} - -func (m *KafkaMessage) Ack() { - if m.Consumer != nil { - m.Consumer.CommitUpTo(m) - } -} - -func (m *KafkaMessage) Nack() { - // No op for now since Kafka based DLQ is not implemented -} - -func (m *KafkaMessage) Cluster() string { - return m.ClusterName -} - -// GetConsumerGroupName will return the consumer group name to use or being used -// for given deployment and job name -func GetConsumerGroupName(deployment, jobName string, aresCluster string) string { - return fmt.Sprintf("ares-subscriber_%s_%s_%s_streaming", deployment, jobName, aresCluster) -} diff --git a/subscriber/common/consumer/kafka/kafka_confluent_test.go b/subscriber/common/consumer/kafka/kafka_test.go similarity index 57% rename from subscriber/common/consumer/kafka/kafka_confluent_test.go rename to subscriber/common/consumer/kafka/kafka_test.go index 35104af2..c55bfe6e 100644 --- a/subscriber/common/consumer/kafka/kafka_confluent_test.go +++ b/subscriber/common/consumer/kafka/kafka_test.go @@ -15,9 +15,7 @@ package kafka import ( - "os" - - kafkaConfluent "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/Shopify/sarama" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/uber-go/tally" @@ -27,9 +25,11 @@ import ( "github.com/uber/aresdb/subscriber/config" "github.com/uber/aresdb/utils" "go.uber.org/zap" + "os" ) var _ = Describe("KafkaConsumer", func() { + var broker *sarama.MockBroker serviceConfig := config.ServiceConfig{ Environment: utils.EnvironmentContext{ Deployment: "test", @@ -61,6 +61,32 @@ var _ = Describe("KafkaConsumer", func() { jobConfigs["job1"]["dev01"].AresTableConfig.Cluster = "dev01" } + BeforeEach(func() { + // kafka broker mock setup + broker = sarama.NewMockBrokerAddr(serviceConfig.Logger.Sugar(), 1, jobConfigs["job1"]["dev01"].StreamingConfig.KafkaBroker) + mockFetchResponse := sarama.NewMockFetchResponse(serviceConfig.Logger.Sugar(), 1) + for i := 0; i < 10; i++ { + mockFetchResponse.SetMessage("job1-topic", 0, int64(i+1234), sarama.StringEncoder("foo")) + } + + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(serviceConfig.Logger.Sugar()). + SetBroker(broker.Addr(), broker.BrokerID()). + SetLeader("job1-topic", 0, broker.BrokerID()), + "OffsetRequest": sarama.NewMockOffsetResponse(serviceConfig.Logger.Sugar()). + SetOffset("job1-topic", 0, sarama.OffsetOldest, 0). + SetOffset("job1-topic", 0, sarama.OffsetNewest, 2345), + "FetchRequest": mockFetchResponse, + "JoinGroupRequest": sarama.NewMockConsumerMetadataResponse(serviceConfig.Logger.Sugar()). + SetCoordinator("ares-subscriber_test_job1_dev01_streaming", broker), + "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(serviceConfig.Logger.Sugar()), + }) + }) + + AfterEach(func() { + broker.Close() + }) + It("KafkaConsumer functions", func() { kc, err := NewKafkaConsumer(jobConfigs["job1"]["dev01"], serviceConfig) Ω(err).Should(BeNil()) @@ -83,31 +109,30 @@ var _ = Describe("KafkaConsumer", func() { closeCh := kc.Closed() Ω(closeCh).ShouldNot(BeNil()) - go kc.(*KafkaConsumer).startConsuming() - - topic := "topic" - msg := &kafkaConfluent.Message{ - TopicPartition: kafkaConfluent.TopicPartition{ - Topic: &topic, - Partition: int32(0), - Offset: 0, - }, - Value: []byte("value"), - Key: []byte("key"), + msg := sarama.ConsumerMessage{ + Topic: "job1-topic", + Partition: 0, + Value: []byte("foo")} + cgHandler := CGHandler{ + msgCounter: make(map[string]map[int32]tally.Counter), + msgByteCounter: make(map[string]map[int32]tally.Counter), + msgOffsetGauge: make(map[string]map[int32]tally.Gauge), + msgLagGauge: make(map[string]map[int32]tally.Gauge), } - msgCounter := map[string]map[int32]tally.Counter{ - "topic": make(map[int32]tally.Counter), - } - msgByteCounter := map[string]map[int32]tally.Counter{ - "topic": make(map[int32]tally.Counter), - } - msgOffsetGauge := map[string]map[int32]tally.Gauge{ - "topic": make(map[int32]tally.Gauge), - } - msgLagGauge := map[string]map[int32]tally.Gauge{ - "topic": make(map[int32]tally.Gauge), + + cgHandler.msgCounter["job1-topic"] = make(map[int32]tally.Counter) + cgHandler.msgByteCounter["job1-topic"] = make(map[int32]tally.Counter) + cgHandler.msgOffsetGauge["job1-topic"] = make(map[int32]tally.Gauge) + cgHandler.msgLagGauge["job1-topic"] = make(map[int32]tally.Gauge) + + kc.(*KafkaConsumer).processMsg(&msg, &cgHandler, 5632, nil) + + kafkaMsg := KafkaMessage{ + ConsumerMessage: &msg, + consumer: kc, + session: nil, } - kc.(*KafkaConsumer).processMsg(msg, msgCounter, msgByteCounter, msgOffsetGauge, msgLagGauge) + kc.CommitUpTo(&kafkaMsg) err = kc.(*KafkaConsumer).Close() Ω(err).Should(BeNil()) @@ -117,19 +142,17 @@ var _ = Describe("KafkaConsumer", func() { }) It("KafkaMessage functions", func() { - topic := "topic" message := &KafkaMessage{ - &kafkaConfluent.Message{ - TopicPartition: kafkaConfluent.TopicPartition{ - Topic: &topic, - Partition: int32(0), - Offset: 0, - }, - Value: []byte("value"), - Key: []byte("key"), + ConsumerMessage: &sarama.ConsumerMessage{ + Topic: "topic", + Partition: 0, + Offset: 0, + Value: []byte("value"), + Key: []byte("key"), }, - nil, - "kafka-cluster1", + consumer: nil, + clusterName: "kafka-cluster1", + session: nil, } key := message.Key() @@ -141,7 +164,7 @@ var _ = Describe("KafkaConsumer", func() { cluster := message.Cluster() Ω(cluster).Should(Equal("kafka-cluster1")) - topic = message.Topic() + topic := message.Topic() Ω(topic).Should(Equal("topic")) offset := message.Offset() @@ -151,12 +174,5 @@ var _ = Describe("KafkaConsumer", func() { Ω(partition).Should(Equal(int32(0))) message.Ack() - message.Consumer, _ = NewKafkaConsumer(jobConfigs["job1"]["dev01"], serviceConfig) - - message.Ack() - - message.Nack() - - message.Consumer.Close() }) }) diff --git a/subscriber/common/job/driver_test.go b/subscriber/common/job/driver_test.go index 42a58289..a8943d03 100644 --- a/subscriber/common/job/driver_test.go +++ b/subscriber/common/job/driver_test.go @@ -15,6 +15,7 @@ package job import ( + "github.com/Shopify/sarama" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/uber-go/tally" @@ -42,6 +43,7 @@ import ( ) var _ = Describe("driver", func() { + var broker *sarama.MockBroker serviceConfig := config.ServiceConfig{ Environment: utils.EnvironmentContext{ Deployment: "test", @@ -178,10 +180,31 @@ var _ = Describe("driver", func() { serviceConfig.ActiveAresClusters = map[string]config.SinkConfig{ "dev01": sinkConfig, } + + // kafka broker mock setup + broker = sarama.NewMockBrokerAddr(serviceConfig.Logger.Sugar(), 1, jobConfigs["job1"]["dev01"].StreamingConfig.KafkaBroker) + mockFetchResponse := sarama.NewMockFetchResponse(serviceConfig.Logger.Sugar(), 1) + for i := 0; i < 10; i++ { + mockFetchResponse.SetMessage("job1-topic", 0, int64(i+1234), sarama.StringEncoder("foo")) + } + + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(serviceConfig.Logger.Sugar()). + SetBroker(broker.Addr(), broker.BrokerID()). + SetLeader("job1-topic", 0, broker.BrokerID()), + "OffsetRequest": sarama.NewMockOffsetResponse(serviceConfig.Logger.Sugar()). + SetOffset("job1-topic", 0, sarama.OffsetOldest, 0). + SetOffset("job1-topic", 0, sarama.OffsetNewest, 2345), + "FetchRequest": mockFetchResponse, + "JoinGroupRequest": sarama.NewMockConsumerMetadataResponse(serviceConfig.Logger.Sugar()). + SetCoordinator("ares-subscriber_test_job1_dev01_streaming", broker), + "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(serviceConfig.Logger.Sugar()), + }) }) AfterEach(func() { testServer.Close() + broker.Close() }) It("NewDriver", func() { diff --git a/subscriber/common/job/streaming_processor_test.go b/subscriber/common/job/streaming_processor_test.go index 81f5c7bc..a375bff7 100644 --- a/subscriber/common/job/streaming_processor_test.go +++ b/subscriber/common/job/streaming_processor_test.go @@ -16,15 +16,7 @@ package job import ( "encoding/json" - "io/ioutil" - "net/http" - "net/http/httptest" - "os" - "regexp" - "strings" - "time" - - "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/Shopify/sarama" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/uber-go/tally" @@ -32,7 +24,7 @@ import ( "github.com/uber/aresdb/client/mocks" memCom "github.com/uber/aresdb/memstore/common" metaCom "github.com/uber/aresdb/metastore/common" - kafka2 "github.com/uber/aresdb/subscriber/common/consumer/kafka" + "github.com/uber/aresdb/subscriber/common/consumer/kafka" "github.com/uber/aresdb/subscriber/common/message" "github.com/uber/aresdb/subscriber/common/rules" "github.com/uber/aresdb/subscriber/common/sink" @@ -40,9 +32,17 @@ import ( "github.com/uber/aresdb/subscriber/config" "github.com/uber/aresdb/utils" "go.uber.org/zap" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "regexp" + "strings" + "time" ) var _ = Describe("streaming_processor", func() { + var broker *sarama.MockBroker serviceConfig := config.ServiceConfig{ Environment: utils.EnvironmentContext{ Deployment: "test", @@ -104,33 +104,24 @@ var _ = Describe("streaming_processor", func() { JobConfig: jobConfig, } - topic := "topic" - msg := &kafka2.KafkaMessage{ - Message: &kafka.Message{ - TopicPartition: kafka.TopicPartition{ - Topic: &topic, - Partition: int32(0), - Offset: 0, - }, - Value: []byte(`{"project": "ares-subscriber"}`), - Key: []byte("key"), + msg := &kafka.KafkaMessage{ + ConsumerMessage: &sarama.ConsumerMessage{ + Topic: "topic", + Partition: 0, + Offset: 0, + Value: []byte(`{"project": "ares-subscriber"}`), + Key: []byte("key"), }, - Consumer: nil, - ClusterName: "kafka-cluster1", } - errMsg := &kafka2.KafkaMessage{ - Message: &kafka.Message{ - TopicPartition: kafka.TopicPartition{ - Topic: &topic, - Partition: int32(0), - Offset: 0, - }, - Value: []byte(`{project: ares-subscriber}`), - Key: []byte("key"), + errMsg := &kafka.KafkaMessage{ + ConsumerMessage: &sarama.ConsumerMessage{ + Topic: "topic", + Partition: 0, + Offset: 0, + Value: []byte(`{project: ares-subscriber}`), + Key: []byte("key"), }, - Consumer: nil, - ClusterName: "kafka-cluster1", } var address string @@ -230,13 +221,34 @@ var _ = Describe("streaming_processor", func() { })) testServer.Start() address = testServer.Listener.Addr().String() + + // kafka broker mock setup + broker = sarama.NewMockBrokerAddr(serviceConfig.Logger.Sugar(), 1, jobConfigs["job1"]["dev01"].StreamingConfig.KafkaBroker) + mockFetchResponse := sarama.NewMockFetchResponse(serviceConfig.Logger.Sugar(), 1) + for i := 0; i < 10; i++ { + mockFetchResponse.SetMessage("job1-topic", 0, int64(i+1234), sarama.StringEncoder("foo")) + } + + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(serviceConfig.Logger.Sugar()). + SetBroker(broker.Addr(), broker.BrokerID()). + SetLeader("job1-topic", 0, broker.BrokerID()), + "OffsetRequest": sarama.NewMockOffsetResponse(serviceConfig.Logger.Sugar()). + SetOffset("job1-topic", 0, sarama.OffsetOldest, 0). + SetOffset("job1-topic", 0, sarama.OffsetNewest, 2345), + "FetchRequest": mockFetchResponse, + "JoinGroupRequest": sarama.NewMockConsumerMetadataResponse(serviceConfig.Logger.Sugar()). + SetCoordinator("ares-subscriber_test_job1_dev01_streaming", broker), + "OffsetCommitRequest": sarama.NewMockOffsetCommitResponse(serviceConfig.Logger.Sugar()), + }) }) AfterEach(func() { testServer.Close() + broker.Close() }) It("NewStreamingProcessor", func() { - p, err := NewStreamingProcessor(1, jobConfig, nil, sink.NewAresDatabase, kafka2.NewKafkaConsumer, message.NewDefaultDecoder, + p, err := NewStreamingProcessor(1, jobConfig, nil, sink.NewAresDatabase, kafka.NewKafkaConsumer, message.NewDefaultDecoder, make(chan ProcessorError), make(chan int64), serviceConfig) Ω(p).ShouldNot(BeNil()) Ω(err).Should(BeNil()) @@ -248,7 +260,7 @@ var _ = Describe("streaming_processor", func() { serviceConfig.ActiveAresClusters = map[string]config.SinkConfig{ "dev01": sinkConfig, } - p, err = NewStreamingProcessor(1, jobConfig, nil, sink.NewAresDatabase, kafka2.NewKafkaConsumer, message.NewDefaultDecoder, + p, err = NewStreamingProcessor(1, jobConfig, nil, sink.NewAresDatabase, kafka.NewKafkaConsumer, message.NewDefaultDecoder, make(chan ProcessorError), make(chan int64), serviceConfig) Ω(p).ShouldNot(BeNil()) Ω(p.(*StreamingProcessor).highLevelConsumer).ShouldNot(BeNil()) @@ -318,7 +330,7 @@ var _ = Describe("streaming_processor", func() { go p.Run() p.Restart() - p.(*StreamingProcessor).highLevelConsumer.(*kafka2.KafkaConsumer).Close() + p.(*StreamingProcessor).highLevelConsumer.(*kafka.KafkaConsumer).Close() p.(*StreamingProcessor).reInitialize() go p.Run() diff --git a/subscriber/config/test/jobs/job1-local.json b/subscriber/config/test/jobs/job1-local.json index 60634548..4bf508ff 100644 --- a/subscriber/config/test/jobs/job1-local.json +++ b/subscriber/config/test/jobs/job1-local.json @@ -2,9 +2,9 @@ "job": "job1", "streamConfig": { "kafkaBroker": "localhost:17791", - "ChannelBufferSize": 256, - "maxPollIntervalMs": 300000, - "sessionTimeoutNs": 10000, + "channelBufferSize": 256, + "sessionTimeoutMs": 10000, + "reblanceTimeoutSec": 10, "kafkaClusterName": "kafka-cluster1", "kafkaClusterFile": "clusters.yaml", "topic": "job1-topic",