Skip to content

Commit

Permalink
kafka sink: fix data race in kafka sink (#565)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro committed May 13, 2020
1 parent 19bbdec commit 99f01ac
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion cdc/sink/mqProducer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
return k, nil
}

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
}

// NewSaramaConfig return the default config and set the according version and metrics
func newSaramaConfig(ctx context.Context, c KafkaConfig) (*sarama.Config, error) {
config := sarama.NewConfig()
Expand All @@ -255,7 +259,6 @@ func newSaramaConfig(ctx context.Context, c KafkaConfig) (*sarama.Config, error)

config.ClientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureID, changefeedID)
config.Version = version
sarama.MaxRequestSize = int32(c.MaxMessageBytes)
config.Producer.Flush.MaxMessages = c.MaxMessageBytes
config.Metadata.Retry.Max = 20
config.Metadata.Retry.Backoff = 500 * time.Millisecond
Expand Down

0 comments on commit 99f01ac

Please sign in to comment.