
Commit

kafka(ticdc): open-protocol do not build previous messages to fix callback mismatch (#9778)

close #9775
3AceShowHand committed Sep 25, 2023
1 parent 32f07ef commit 4c02e2b
Showing 13 changed files with 250 additions and 197 deletions.
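The heart of the fix: when claim-check was enabled and a row was too large, the open-protocol encoder used to flush all previously batched messages (`tryBuildCallback`) and then emit the large row through a separate side path, so the number of built messages and the callbacks attached to them could drift apart. After this commit the encoder uploads the oversized payload to external storage inline, swaps in a small claim-check location message, and lets that message flow through the normal batching path. Below is a minimal self-contained sketch of the new control flow — not the TiCDC code; every name in it is illustrative:

package main

import "fmt"

// batch accumulates encoded rows. The sink relies on rows and callbacks
// staying aligned one-to-one, which the old out-of-band path broke.
type batch struct {
	rows      [][]byte
	callbacks []func()
}

func (b *batch) append(value []byte, cb func()) {
	b.rows = append(b.rows, value)
	b.callbacks = append(b.callbacks, cb)
}

// appendRow mirrors the new control flow: an oversized row is replaced by a
// small "location" payload and then batched like any other row, instead of
// flushing the batch early and emitting the large row separately.
func appendRow(b *batch, value []byte, maxBytes int, cb func()) {
	if len(value) > maxBytes {
		// In TiCDC this is where the payload is written to external storage
		// and a claim-check location message is built in its place.
		value = []byte(fmt.Sprintf("claim-check-location:%d-byte-object", len(value)))
	}
	b.append(value, cb)
}

func main() {
	var b batch
	appendRow(&b, make([]byte, 1<<20), 1024, func() {})
	appendRow(&b, []byte("small row"), 1024, func() {})
	fmt.Println(len(b.rows), len(b.callbacks)) // 2 2 — callbacks stay aligned
}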
106 changes: 53 additions & 53 deletions metrics/grafana/ticdc.json

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions pkg/sink/codec/canal/canal_json_decoder.go
@@ -20,7 +20,6 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/goccy/go-json"
"github.com/pingcap/errors"
@@ -138,9 +137,6 @@ func (b *batchDecoder) HasNext() (model.MessageType, bool, error) {
 }
 
 func (b *batchDecoder) assembleClaimCheckRowChangedEvent(ctx context.Context, claimCheckLocation string) (*model.RowChangedEvent, error) {
-	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
-	defer cancel()
-
 	_, claimCheckFileName := filepath.Split(claimCheckLocation)
 	data, err := b.storage.ReadFile(ctx, claimCheckFileName)
 	if err != nil {
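Note the removed hard-coded deadline: the claim-check read from external storage is now bounded only by the caller's context. A caller that still wants a cap on the read can impose one itself — a minimal sketch, with the helper name and the 30-second duration purely illustrative:

package claimcheck

import (
	"context"
	"time"
)

// readWithDeadline is a hypothetical caller-side helper: the decoder no longer
// imposes its own 3-second timeout, so any bound now comes from the caller.
func readWithDeadline(
	ctx context.Context,
	read func(context.Context, string) ([]byte, error),
	fileName string,
) ([]byte, error) {
	ctx, cancel := context.WithTimeout(ctx, 30*time.Second) // illustrative duration
	defer cancel()
	return read(ctx, fileName)
}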
2 changes: 1 addition & 1 deletion pkg/sink/codec/canal/canal_json_row_event_encoder.go
@@ -441,7 +441,7 @@ func (c *JSONRowEventEncoder) AppendRowChangedEvent(

 	if c.config.LargeMessageHandle.EnableClaimCheck() {
 		claimCheckFileName := claimcheck.NewFileName()
-		if err := c.claimCheck.WriteMessage(ctx, m, claimCheckFileName); err != nil {
+		if err := c.claimCheck.WriteMessage(ctx, m.Key, m.Value, claimCheckFileName); err != nil {
 			return errors.Trace(err)
 		}

4 changes: 2 additions & 2 deletions pkg/sink/codec/open/open_protocol_decoder.go
@@ -199,13 +199,13 @@ func (b *BatchDecoder) NextRowChangedEvent() (*model.RowChangedEvent, error) {
 		return nil, cerror.ErrOpenProtocolCodecInvalidData.GenWithStack("not found row event message")
 	}
 
-	event := b.nextEvent
 	ctx := context.Background()
 	// claim-check message found
 	if b.nextKey.ClaimCheckLocation != "" {
 		return b.assembleEventFromClaimCheckStorage(ctx)
 	}
 
+	event := b.nextEvent
 	if b.nextKey.OnlyHandleKey {
 		var err error
 		event, err = b.assembleHandleKeyOnlyEvent(ctx, event)
@@ -310,6 +310,7 @@ func (b *BatchDecoder) assembleHandleKeyOnlyEvent(

 func (b *BatchDecoder) assembleEventFromClaimCheckStorage(ctx context.Context) (*model.RowChangedEvent, error) {
 	_, claimCheckFileName := filepath.Split(b.nextKey.ClaimCheckLocation)
+	b.nextKey = nil
 	data, err := b.storage.ReadFile(ctx, claimCheckFileName)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -346,7 +347,6 @@ func (b *BatchDecoder) assembleEventFromClaimCheckStorage(ctx context.Context) (
 	}
 
 	event := msgToRowChange(msgKey, rowMsg)
-	b.nextKey = nil
 
 	return event, nil
 }
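The two moves above are small but deliberate: `event := b.nextEvent` now happens only after the claim-check branch (a claim-check event is rebuilt entirely from external storage, so the in-memory `nextEvent` is irrelevant there), and `b.nextKey = nil` is cleared before the storage read rather than after, so a failed `ReadFile` cannot leave a stale key behind. For context, a consumer-side sketch of the loop this decoder serves, using the API that appears in this patch's own test (`NewBatchDecoder` / `AddKeyValue` / `HasNext` / `NextRowChangedEvent`); treat the exact signatures and package paths as assumptions:

package consumer

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tiflow/cdc/model"
	"github.com/pingcap/tiflow/pkg/sink/codec/common"
	"github.com/pingcap/tiflow/pkg/sink/codec/open"
)

// decodeMessage drains one Kafka key/value pair through the open-protocol
// batch decoder. Claim-check references are resolved transparently inside
// NextRowChangedEvent.
func decodeMessage(ctx context.Context, cfg *common.Config, key, value []byte) ([]*model.RowChangedEvent, error) {
	decoder, err := open.NewBatchDecoder(ctx, cfg, nil)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if err = decoder.AddKeyValue(key, value); err != nil {
		return nil, errors.Trace(err)
	}

	var events []*model.RowChangedEvent
	for {
		tp, hasNext, err := decoder.HasNext()
		if err != nil {
			return nil, errors.Trace(err)
		}
		if !hasNext {
			return events, nil
		}
		if tp != model.MessageTypeRow {
			return nil, errors.Errorf("unexpected message type %v", tp)
		}
		event, err := decoder.NextRowChangedEvent()
		if err != nil {
			return nil, errors.Trace(err)
		}
		events = append(events, event)
	}
}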
116 changes: 46 additions & 70 deletions pkg/sink/codec/open/open_protocol_encoder.go
@@ -117,20 +117,28 @@ func (d *BatchEncoder) AppendRowChangedEvent(
 		return cerror.ErrMessageTooLarge.GenWithStackByArgs()
 	}
 
-	// single message too large, claim check enabled, encode it to a new individual message.
 	if d.config.LargeMessageHandle.EnableClaimCheck() {
-		// build previous batched messages
-		d.tryBuildCallback()
-		if err := d.appendSingleLargeMessage4ClaimCheck(ctx, key, value, e, callback); err != nil {
+		// send the large message to the external storage first, then
+		// create a new message contains the reference of the large message.
+		claimCheckFileName := claimcheck.NewFileName()
+		m := newMessage(key, value)
+		err = d.claimCheck.WriteMessage(ctx, m.Key, m.Value, claimCheckFileName)
+		if err != nil {
 			return errors.Trace(err)
 		}
-		return nil
+
+		key, value, err = d.newClaimCheckLocationMessage(e, claimCheckFileName)
+		if err != nil {
+			return errors.Trace(err)
+		}
 	}
 
-	// it's must that `LargeMessageHandle == LargeMessageHandleOnlyHandleKeyColumns` here.
-	key, value, err = d.buildMessageOnlyHandleKeyColumns(e)
-	if err != nil {
-		return errors.Trace(err)
-	}
+	if d.config.LargeMessageHandle.HandleKeyOnly() {
+		// it's must that `LargeMessageHandle == LargeMessageHandleOnlyHandleKeyColumns` here.
+		key, value, err = d.buildMessageOnlyHandleKeyColumns(e)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}

@@ -172,6 +180,26 @@ func (d *BatchEncoder) AppendRowChangedEvent(
 	return nil
 }
 
+func newMessage(key, value []byte) *common.Message {
+	versionHead := make([]byte, 8)
+	binary.BigEndian.PutUint64(versionHead, codec.BatchVersion1)
+	message := common.NewMsg(config.ProtocolOpen, versionHead, nil, 0, model.MessageTypeRow, nil, nil)
+
+	var (
+		keyLenByte   [8]byte
+		valueLenByte [8]byte
+	)
+	binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
+	binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))
+
+	message.Key = append(message.Key, keyLenByte[:]...)
+	message.Key = append(message.Key, key...)
+	message.Value = append(message.Value, valueLenByte[:]...)
+	message.Value = append(message.Value, value...)
+
+	return message
+}
+
 // EncodeDDLEvent implements the RowEventEncoder interface
 func (d *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error) {
 	keyMsg, valueMsg := ddlEventToMsg(e)
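`newMessage` (moved up here from the bottom of the file, otherwise unchanged) frames a single event in the open-protocol batch layout: the message key is an 8-byte big-endian version header followed by an 8-byte key length and the key bytes, and the message value is an 8-byte value length followed by the value bytes. A hedged sketch of the inverse operation, for a message carrying exactly one event (function and package names are illustrative):

package framing

import (
	"encoding/binary"
	"errors"
)

// splitSingleEvent undoes newMessage's framing for a one-event message:
// Key = [8-byte version][8-byte key length][key bytes],
// Value = [8-byte value length][value bytes].
// Real open-protocol batches may repeat the length-prefixed pairs.
func splitSingleEvent(msgKey, msgValue []byte) (key, value []byte, err error) {
	if len(msgKey) < 16 {
		return nil, nil, errors.New("message key too short")
	}
	// msgKey[:8] holds the batch version (codec.BatchVersion1 on the wire).
	keyLen := binary.BigEndian.Uint64(msgKey[8:16])
	if uint64(len(msgKey)-16) < keyLen {
		return nil, nil, errors.New("truncated event key")
	}
	key = msgKey[16 : 16+keyLen]

	if len(msgValue) < 8 {
		return nil, nil, errors.New("message value too short")
	}
	valueLen := binary.BigEndian.Uint64(msgValue[:8])
	if uint64(len(msgValue)-8) < valueLen {
		return nil, nil, errors.New("truncated event value")
	}
	value = msgValue[8 : 8+valueLen]
	return key, value, nil
}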
@@ -262,31 +290,30 @@ func (d *BatchEncoder) tryBuildCallback() {

 // NewClaimCheckLocationMessage implement the ClaimCheckLocationEncoder interface.
 func (d *BatchEncoder) newClaimCheckLocationMessage(
-	event *model.RowChangedEvent, callback func(), fileName string,
-) (*common.Message, error) {
+	event *model.RowChangedEvent, fileName string,
+) ([]byte, []byte, error) {
 	keyMsg, valueMsg, err := rowChangeToMsg(event, d.config, true)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	keyMsg.OnlyHandleKey = false
-	claimCheckLocation := d.claimCheck.FileNameWithPrefix(fileName)
-	keyMsg.ClaimCheckLocation = claimCheckLocation
+	keyMsg.ClaimCheckLocation = d.claimCheck.FileNameWithPrefix(fileName)
 	key, err := keyMsg.Encode()
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	value, err := valueMsg.encode()
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	value, err = common.Compress(
 		d.config.ChangefeedID, d.config.LargeMessageHandle.LargeMessageHandleCompression, value,
 	)
 	if err != nil {
-		return nil, errors.Trace(err)
+		return nil, nil, errors.Trace(err)
 	}
 
 	// for single message that is longer than max-message-bytes
@@ -298,61 +325,10 @@ func (d *BatchEncoder) newClaimCheckLocationMessage(
zap.Int("maxMessageBytes", d.config.MaxMessageBytes),
zap.Int("length", length),
zap.Any("key", key))
return nil, cerror.ErrMessageTooLarge.GenWithStackByArgs()
}

message := newMessage(key, value)
message.Ts = event.CommitTs
message.Schema = &event.Table.Schema
message.Table = &event.Table.Table
message.IncRowsCount()
if callback != nil {
message.Callback = callback
}
return message, nil
}

func (d *BatchEncoder) appendSingleLargeMessage4ClaimCheck(
ctx context.Context, key, value []byte, e *model.RowChangedEvent, callback func(),
) error {
message := newMessage(key, value)
message.Ts = e.CommitTs
message.Schema = &e.Table.Schema
message.Table = &e.Table.Table
message.IncRowsCount()

claimCheckFileName := claimcheck.NewFileName()
if err := d.claimCheck.WriteMessage(ctx, message, claimCheckFileName); err != nil {
return errors.Trace(err)
}

message, err := d.newClaimCheckLocationMessage(e, callback, claimCheckFileName)
if err != nil {
return errors.Trace(err)
return nil, nil, cerror.ErrMessageTooLarge.GenWithStackByArgs()
}
d.messageBuf = append(d.messageBuf, message)

return nil
}

func newMessage(key, value []byte) *common.Message {
versionHead := make([]byte, 8)
binary.BigEndian.PutUint64(versionHead, codec.BatchVersion1)
message := common.NewMsg(config.ProtocolOpen, versionHead, nil, 0, model.MessageTypeRow, nil, nil)

var (
keyLenByte [8]byte
valueLenByte [8]byte
)
binary.BigEndian.PutUint64(keyLenByte[:], uint64(len(key)))
binary.BigEndian.PutUint64(valueLenByte[:], uint64(len(value)))

message.Key = append(message.Key, keyLenByte[:]...)
message.Key = append(message.Key, key...)
message.Value = append(message.Value, valueLenByte[:]...)
message.Value = append(message.Value, value...)

return message
return key, value, nil
}

type batchEncoderBuilder struct {
5 changes: 1 addition & 4 deletions pkg/sink/codec/open/open_protocol_encoder_test.go
@@ -479,11 +479,8 @@ func TestE2EClaimCheckMessage(t *testing.T) {
 	err = encoder.AppendRowChangedEvent(ctx, topic, largeTestEvent, func() {})
 	require.NoError(t, err)
 
-	err = encoder.AppendRowChangedEvent(ctx, topic, testEvent, func() {})
-	require.NoError(t, err)
-
 	messages := encoder.Build()
-	require.Len(t, messages, 3)
+	require.Len(t, messages, 2)
 
 	claimCheckLocationMessage := messages[1]
 	decoder, err := NewBatchDecoder(ctx, codecConfig, nil)
1 change: 0 additions & 1 deletion pkg/sink/codec/open/open_protocol_message.go
@@ -129,7 +129,6 @@ func rowChangeToMsg(
 		if config.OnlyOutputUpdatedColumns {
 			value.dropNotUpdatedColumns()
 		}
-
 	} else {
 		value.Update = rowChangeColumns2CodecColumns(e.Columns, largeMessageOnlyHandleKeyColumns)
 		if largeMessageOnlyHandleKeyColumns && len(value.Update) == 0 {
6 changes: 3 additions & 3 deletions pkg/sink/kafka/claimcheck/claim_check.go
@@ -80,10 +80,10 @@ func New(ctx context.Context, storageURI string, changefeedID model.ChangeFeedID
 }
 
 // WriteMessage write message to the claim check external storage.
-func (c *ClaimCheck) WriteMessage(ctx context.Context, message *common.Message, fileName string) error {
+func (c *ClaimCheck) WriteMessage(ctx context.Context, key, value []byte, fileName string) error {
 	m := common.ClaimCheckMessage{
-		Key:   message.Key,
-		Value: message.Value,
+		Key:   key,
+		Value: value,
 	}
 	data, err := json.Marshal(m)
 	if err != nil {
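The signature change here is what ripples through the encoders above: callers now hand over the raw key and value instead of a `*common.Message`, and the stored object remains the JSON encoding of that key/value pair. A sketch of the reader side under this format (the struct shape follows the diff; the JSON field tags are an assumption):

package claimcheck

import "encoding/json"

// storedMessage mirrors common.ClaimCheckMessage as used above: the external
// storage object is simply the JSON of the original Kafka key and value.
type storedMessage struct {
	Key   []byte `json:"key"`
	Value []byte `json:"value"`
}

// unpack restores the original key/value from bytes read back from storage.
func unpack(data []byte) (key, value []byte, err error) {
	var m storedMessage
	if err = json.Unmarshal(data, &m); err != nil {
		return nil, nil, err
	}
	return m.Key, m.Value, nil
}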
2 changes: 1 addition & 1 deletion pkg/sink/kafka/cluster_admin_client_mock_impl.go
@@ -38,7 +38,7 @@ const (
 	// default to 1048576, identical to kafka broker's `message.max.bytes` and topic's `max.message.bytes`
 	// see: https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes
 	// see: https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes
-	defaultMaxMessageBytes = "1048576"
+	defaultMaxMessageBytes = "1048588"
 
 	// defaultMinInsyncReplicas specifies the default `min.insync.replicas` for broker and topic.
 	defaultMinInsyncReplicas = "1"
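The new mock value matches Kafka's actual broker default: per the documentation linked in the comment, `message.max.bytes` defaults to 1048588 bytes (commonly explained as 1 MiB of payload plus record-batch overhead), not a flat 1048576, so the mock now reports what a real broker would:

package kafka

// Kafka's documented broker default (see the two URLs in the hunk above):
// message.max.bytes defaults to 1048588, not exactly 1 MiB (1048576).
const defaultBrokerMessageMaxBytes = 1048588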
31 changes: 25 additions & 6 deletions pkg/sink/kafka/options.go
@@ -38,6 +38,13 @@ import (
 const (
 	// defaultPartitionNum specifies the default number of partitions when we create the topic.
 	defaultPartitionNum = 3
+
+	// the `max-message-bytes` is set equal to the topic's `max.message.bytes`, and is used to
+	// check whether a message exceeds the max size limit. It's found that some messages pass the
+	// size limit check at the client side and fail at the broker side, since a message can grow
+	// during network transmission. So we set `max-message-bytes` to a smaller value to avoid this
+	// problem. maxMessageBytesOverhead is the amount by which `max-message-bytes` is reduced.
+	maxMessageBytesOverhead = 128
 )

const (
@@ -596,12 +603,18 @@ func AdjustOptions(
 		return errors.Trace(err)
 	}
 
-	if topicMaxMessageBytes < options.MaxMessageBytes {
+	maxMessageBytes := topicMaxMessageBytes - maxMessageBytesOverhead
+	if topicMaxMessageBytes <= options.MaxMessageBytes {
 		log.Warn("topic's `max.message.bytes` less than the `max-message-bytes`,"+
 			"use topic's `max.message.bytes` to initialize the Kafka producer",
 			zap.Int("max.message.bytes", topicMaxMessageBytes),
-			zap.Int("max-message-bytes", options.MaxMessageBytes))
-		options.MaxMessageBytes = topicMaxMessageBytes
+			zap.Int("max-message-bytes", options.MaxMessageBytes),
+			zap.Int("real-max-message-bytes", maxMessageBytes))
+		options.MaxMessageBytes = maxMessageBytes
+	} else {
+		if maxMessageBytes < options.MaxMessageBytes {
+			options.MaxMessageBytes = maxMessageBytes
+		}
 	}
 
 	// no need to create the topic,
@@ -635,12 +648,18 @@ func AdjustOptions(
 	// it would use broker's `message.max.bytes` to set topic's `max.message.bytes`.
 	// TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than
 	// broker's `message.max.bytes`.
-	if brokerMessageMaxBytes < options.MaxMessageBytes {
+	maxMessageBytes := brokerMessageMaxBytes - maxMessageBytesOverhead
+	if brokerMessageMaxBytes <= options.MaxMessageBytes {
 		log.Warn("broker's `message.max.bytes` less than the `max-message-bytes`,"+
 			"use broker's `message.max.bytes` to initialize the Kafka producer",
 			zap.Int("message.max.bytes", brokerMessageMaxBytes),
-			zap.Int("max-message-bytes", options.MaxMessageBytes))
-		options.MaxMessageBytes = brokerMessageMaxBytes
+			zap.Int("max-message-bytes", options.MaxMessageBytes),
+			zap.Int("real-max-message-bytes", maxMessageBytes))
+		options.MaxMessageBytes = maxMessageBytes
+	} else {
+		if maxMessageBytes < options.MaxMessageBytes {
+			options.MaxMessageBytes = maxMessageBytes
+		}
 	}
 
 	// topic not exists yet, and user does not specify the `partition-num` in the sink uri.
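Both `AdjustOptions` branches now funnel through the same rule: the effective producer limit is kept at least `maxMessageBytesOverhead` (128 bytes) below the broker's or topic's ceiling, so a message that grows slightly in transit still clears the broker-side check. A self-contained sketch of the clamp with a worked example (the function name and shape are illustrative; the arithmetic follows the diff):

package main

import "fmt"

const maxMessageBytesOverhead = 128 // from the const block above

// clamp mirrors the new adjustment rule: never let the producer's limit come
// within maxMessageBytesOverhead of the broker/topic ceiling.
func clamp(ceiling, configured int) int {
	limit := ceiling - maxMessageBytesOverhead
	if ceiling <= configured {
		return limit // user asked for more than the ceiling allows
	}
	if limit < configured {
		return limit // configured fits the ceiling but not the safety margin
	}
	return configured
}

func main() {
	// With Kafka's default ceiling of 1048588 bytes:
	fmt.Println(clamp(1048588, 10*1024*1024)) // 1048460: clamped below the ceiling
	fmt.Println(clamp(1048588, 4096))         // 4096: small settings pass through
}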