Skip to content

Commit

Permalink
cmd (ticdc): a better Kafka consumer (#10990)
Browse files Browse the repository at this point in the history
close #10989
  • Loading branch information
wk989898 committed Jun 19, 2024
1 parent dba5af5 commit 11ae941
Show file tree
Hide file tree
Showing 9 changed files with 1,010 additions and 861 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ endif
# ref: https://github.com/pingcap/tidb/pull/39526#issuecomment-1407952955
OS := "$(shell go env GOOS)"
SED_IN_PLACE ?= $(shell which sed)
IS_ALPINE := $(shell grep -qi Alpine /etc/os-release && echo 1)
ifeq (${OS}, "linux")
CGO := 0
SED_IN_PLACE += -i
Expand All @@ -65,7 +66,13 @@ ifeq ("${ENABLE_FIPS}", "1")
GOEXPERIMENT = GOEXPERIMENT=boringcrypto
CGO = 1
endif

CONSUMER_BUILD_FLAG=
ifeq ("${IS_ALPINE}", "1")
CONSUMER_BUILD_FLAG = -tags musl
endif
GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=$(CGO) $(GO) build $(BUILD_FLAG) -trimpath $(GOVENDORFLAG)
CONSUMER_GOBUILD := $(GOEXPERIMENT) CGO_ENABLED=1 $(GO) build $(CONSUMER_BUILD_FLAG) -trimpath $(GOVENDORFLAG)
GOBUILDNOVENDOR := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG) -trimpath
GOTEST := CGO_ENABLED=1 $(GO) test -p $(P) --race --tags=intest
GOTESTNORACE := CGO_ENABLED=1 $(GO) test -p $(P)
Expand Down Expand Up @@ -165,7 +172,7 @@ cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/cdc

kafka_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/main.go
$(CONSUMER_GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer

storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go
Expand Down
144 changes: 144 additions & 0 deletions cmd/kafka-consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"strings"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/errors"
"go.uber.org/zap"
)

// getPartitionNum asks the Kafka cluster at o.address for the metadata of
// o.topic and returns how many partitions the topic has.
//
// When o.ca is set, the admin client is configured for SSL with o.ca, o.key
// and o.cert. The lookup is attempted up to o.retryTime+1 times:
//   - a transport error is retried with the request timeout grown by 100ms;
//   - a response that does not contain the topic is retried after one second;
//   - any other error is returned immediately.
//
// An error is also returned once every attempt has been used up.
func getPartitionNum(o *option) (int32, error) {
	configMap := &kafka.ConfigMap{
		"bootstrap.servers": strings.Join(o.address, ","),
	}
	if len(o.ca) != 0 {
		_ = configMap.SetKey("security.protocol", "SSL")
		_ = configMap.SetKey("ssl.ca.location", o.ca)
		_ = configMap.SetKey("ssl.key.location", o.key)
		_ = configMap.SetKey("ssl.certificate.location", o.cert)
	}
	admin, err := kafka.NewAdminClient(configMap)
	if err != nil {
		return 0, errors.Trace(err)
	}
	defer admin.Close()

	// Timeout of a single metadata request, in milliseconds.
	timeout := 3000
	for i := 0; i <= o.retryTime; i++ {
		resp, err := admin.GetMetadata(&o.topic, false, timeout)
		if err != nil {
			// Checked assertion: an error that is not a kafka.Error must be
			// returned to the caller rather than crash the process with a
			// type-assertion panic.
			if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTransport {
				log.Info("retry get partition number", zap.Int("retryTime", i), zap.Int("timeout", timeout))
				timeout += 100
				continue
			}
			return 0, errors.Trace(err)
		}
		if topicDetail, ok := resp.Topics[o.topic]; ok {
			numPartitions := int32(len(topicDetail.Partitions))
			log.Info("get partition number of topic",
				zap.String("topic", o.topic),
				zap.Int32("partitionNum", numPartitions))
			return numPartitions, nil
		}
		// The request succeeded but the topic is not in the response yet;
		// wait a little before asking again.
		log.Info("retry get partition number", zap.String("topic", o.topic))
		time.Sleep(1 * time.Second)
	}
	return 0, errors.Errorf("get partition number(%s) timeout", o.topic)
}

// consumer couples a Kafka consumer client with the writer that handles the
// messages read from it.
type consumer struct {
// client reads messages from Kafka and commits them.
client *kafka.Consumer
// writer is handed every message read by client.
writer *writer
}

// newConsumer creates a Kafka consumer client subscribed to the
// comma-separated topics in o.topic and pairs it with a writer built from the
// same options.
//
// If o.partitionNum is zero it is filled in with the partition number reported
// by getPartitionNum. The client starts from the earliest offset when the
// group has no committed offsets, and both automatic offset storing and
// automatic committing are disabled. When o.ca is set the client connects over
// SSL using o.ca, o.key and o.cert.
//
// Any failure (no topic given, metadata lookup, client creation, subscription)
// is reported through log.Panic.
func newConsumer(ctx context.Context, o *option) *consumer {
	// strings.Split never returns an empty slice for a non-empty separator,
	// so a missing topic has to be detected on the raw option, and before any
	// request is sent to the cluster.
	if len(o.topic) == 0 {
		log.Panic("no topic provided for the consumer")
	}
	topics := strings.Split(o.topic, ",")

	partitionNum, err := getPartitionNum(o)
	if err != nil {
		log.Panic("cannot get the partition number", zap.String("topic", o.topic), zap.Error(err))
	}
	if o.partitionNum == 0 {
		o.partitionNum = partitionNum
	}

	configMap := &kafka.ConfigMap{
		"bootstrap.servers": strings.Join(o.address, ","),
		"group.id":          o.groupID,
		// Start reading from the first message of each assigned
		// partition if there are no previously committed offsets
		// for this group.
		"auto.offset.reset": "earliest",
		// Whether we store offsets automatically.
		"enable.auto.offset.store": false,
		"enable.auto.commit":       false,
	}
	if len(o.ca) != 0 {
		_ = configMap.SetKey("security.protocol", "SSL")
		_ = configMap.SetKey("ssl.ca.location", o.ca)
		_ = configMap.SetKey("ssl.key.location", o.key)
		_ = configMap.SetKey("ssl.certificate.location", o.cert)
	}
	client, err := kafka.NewConsumer(configMap)
	if err != nil {
		log.Panic("create kafka consumer failed", zap.Error(err))
	}
	err = client.SubscribeTopics(topics, nil)
	if err != nil {
		log.Panic("subscribe topics failed", zap.Error(err))
	}
	return &consumer{
		writer: newWriter(ctx, o),
		client: client,
	}
}

// Consume reads messages from Kafka in an endless loop, hands each one to the
// writer, and commits the message when the writer reports that a commit is
// needed.
//
// A retriable read or commit error is logged and the loop moves on to the next
// read; any other error is reported through log.Panic. The Kafka client is
// closed when the function unwinds.
//
// NOTE(review): ctx is only forwarded to the writer; the loop itself does not
// observe cancellation, and ReadMessage(-1) waits without a timeout.
func (c *consumer) Consume(ctx context.Context) {
	defer func() {
		if err := c.client.Close(); err != nil {
			log.Panic("close kafka consumer failed", zap.Error(err))
		}
	}()
	for {
		msg, err := c.client.ReadMessage(-1)
		if err != nil {
			// Checked assertion: an error that is not a kafka.Error is treated
			// as non-retriable instead of triggering a type-assertion panic
			// that would hide the original error.
			if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.IsRetriable() {
				log.Error("read message failed, just continue to retry", zap.Error(err))
				continue
			}
			log.Panic("read message failed", zap.Error(err))
		}
		needCommit := c.writer.WriteMessage(ctx, msg)
		if !needCommit {
			continue
		}
		if _, err = c.client.CommitMessage(msg); err != nil {
			if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.IsRetriable() {
				log.Error("commit failed, just continue and retry", zap.Error(err))
				continue
			}
			log.Panic("commit message failed", zap.Error(err))
		}
	}
}
74 changes: 74 additions & 0 deletions cmd/kafka-consumer/event_group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"sort"

"github.com/pingcap/tiflow/cdc/model"
)

// Thresholds checked by eventsGroup.ShouldFlushEvents.
const (
// defaultMaxBufferedBytes is the buffered size (100 MiB) at which a group
// should be flushed.
defaultMaxBufferedBytes = 100 * 1024 * 1024
// defaultMaxBufferedCount is the number of buffered events at which a group
// should be flushed.
defaultMaxBufferedCount = 500
)

// eventsGroup buffers row change events together with a running total of
// their approximate size.
type eventsGroup struct {
// events holds the buffered row change events.
events []*model.RowChangedEvent
// bytes is the accumulated ApproximateBytes() of the buffered events, as
// maintained by Append and Resolve.
bytes int
}

// NewEventsGroup returns a group that holds no events yet; its event slice is
// empty but non-nil and its byte counter is zero.
func NewEventsGroup() *eventsGroup {
	group := new(eventsGroup)
	group.events = []*model.RowChangedEvent{}
	return group
}

// Append buffers e at the end of the group and adds its approximate size to
// the group's byte counter.
func (g *eventsGroup) Append(e *model.RowChangedEvent) {
	g.events = append(g.events, e)
	size := e.ApproximateBytes()
	g.bytes = g.bytes + size
}

// ShouldFlushEvents reports whether the group has buffered enough data, by
// size or by event count, that it should be flushed to reduce memory usage.
func (g *eventsGroup) ShouldFlushEvents() bool {
	if g.bytes >= defaultMaxBufferedBytes {
		return true
	}
	return len(g.events) >= defaultMaxBufferedCount
}

// Resolve removes and returns the buffered events whose CommitTs is less than
// or equal to resolveTs. Events with a larger CommitTs stay in g.
//
// As a side effect the buffered events are sorted by CommitTs. The byte
// counters of both the returned group and g are updated to match the events
// each one holds afterwards.
func (g *eventsGroup) Resolve(resolveTs uint64) *eventsGroup {
	// A stable sort keeps events that share a CommitTs in the order in which
	// they were appended; sort.Slice gives no such guarantee.
	sort.SliceStable(g.events, func(i, j int) bool {
		return g.events[i].CommitTs < g.events[j].CommitTs
	})

	// Index of the first event that is not resolved yet.
	i := sort.Search(len(g.events), func(i int) bool {
		return g.events[i].CommitTs > resolveTs
	})

	// Cap the resolved prefix at its own length: both groups share one
	// backing array, and without the cap an append to the result would
	// overwrite events that are still buffered in g.
	result := &eventsGroup{
		events: g.events[:i:i],
	}
	for _, e := range result.events {
		result.bytes += e.ApproximateBytes()
	}

	g.events = g.events[i:]
	g.bytes -= result.bytes

	return result
}
Loading

0 comments on commit 11ae941

Please sign in to comment.