Skip to content

Commit

Permalink
*(ticdc): do not print password in cdc log (#9691) (#9725)
Browse files Browse the repository at this point in the history
close #9690
  • Loading branch information
ti-chi-bot committed Sep 11, 2023
1 parent 4547916 commit b49ddbe
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 28 deletions.
11 changes: 3 additions & 8 deletions cdc/api/v1/api.go
Expand Up @@ -304,13 +304,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

infoStr, err := info.Marshal()
if err != nil {
_ = c.Error(err)
return
}

o, err := h.capture.GetOwner()
if err != nil {
_ = c.Error(err)
Expand Down Expand Up @@ -338,7 +331,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr))
log.Info("Create changefeed successfully!",
zap.String("id", changefeedConfig.ID),
zap.String("changefeed", info.String()))
c.Status(http.StatusAccepted)
}

Expand Down
9 changes: 2 additions & 7 deletions cdc/api/v2/changefeed.go
Expand Up @@ -116,13 +116,8 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CAPath: cfg.CAPath,
CertAllowedCN: cfg.CertAllowedCN,
}
infoStr, err := info.Marshal()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
o, err := h.capture.GetOwner()
// cannot create changefeed if there are running lightning/restore tasks
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
Expand All @@ -147,7 +142,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil, true))
Expand Down
17 changes: 11 additions & 6 deletions cdc/model/changefeed.go
Expand Up @@ -46,6 +46,11 @@ type ChangeFeedID struct {
ID string
}

// String implements fmt.Stringer interface
func (c ChangeFeedID) String() string {
return c.Namespace + "/" + c.ID
}

// DefaultChangeFeedID returns `ChangeFeedID` with default namespace
func DefaultChangeFeedID(id string) ChangeFeedID {
return ChangeFeedID{
Expand Down Expand Up @@ -233,9 +238,9 @@ func (info *ChangeFeedInfo) String() (str string) {
return
}

clone.SinkURI, err = util.MaskSinkURI(clone.SinkURI)
if err != nil {
log.Error("failed to marshal changefeed info", zap.Error(err))
clone.SinkURI = util.MaskSensitiveDataInURI(clone.SinkURI)
if clone.Config != nil {
clone.Config.MaskSensitiveData()
}

str, err = clone.Marshal()
Expand Down Expand Up @@ -476,11 +481,11 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
}

func (info *ChangeFeedInfo) updateSinkURIAndConfigProtocol(uri *url.URL, newProtocol string, newQuery url.Values) {
oldRawQuery := uri.RawQuery
newRawQuery := newQuery.Encode()
maskedURI, _ := util.MaskSinkURI(uri.String())
log.Info("handle incompatible protocol from sink URI",
zap.String("oldUriQuery", oldRawQuery),
zap.String("fixedUriQuery", newQuery.Encode()))
zap.String("oldURI", maskedURI),
zap.String("newProtocol", newProtocol))

uri.RawQuery = newRawQuery
fixedSinkURI := uri.String()
Expand Down
4 changes: 2 additions & 2 deletions cdc/owner/changefeed.go
Expand Up @@ -615,7 +615,7 @@ LOOP2:
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.Stringer("info", c.state.Info))
zap.String("info", c.state.Info.String()))

return nil
}
Expand Down Expand Up @@ -688,7 +688,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Any("status", c.state.Status),
zap.Stringer("info", c.state.Info),
zap.String("info", c.state.Info.String()),
zap.Bool("isRemoved", c.isRemoved))
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/codec/avro/schema_registry.go
Expand Up @@ -368,7 +368,7 @@ func (m *schemaManager) ClearRegistry(ctx context.Context, topicName string) err
)
req, err := http.NewRequestWithContext(ctx, "DELETE", uri, nil)
if err != nil {
log.Error("Could not construct request for clearRegistry", zap.String("uri", uri))
log.Error("Could not construct request for clearRegistry", zap.Error(err))
return cerror.WrapError(cerror.ErrAvroSchemaAPIError, err)
}
req.Header.Add(
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mysql/mysql_syncpoint_store.go
Expand Up @@ -144,7 +144,7 @@ func newMySQLSyncPointStore(
return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection")
}

log.Info("Start mysql syncpoint sink")
log.Info("Start mysql syncpoint sink", zap.String("changefeed", id.String()))

return &mysqlSyncPointStore{
db: syncDB,
Expand Down Expand Up @@ -213,7 +213,7 @@ func (s *mysqlSyncPointStore) SinkSyncPoint(ctx context.Context,
var secondaryTs string
err = row.Scan(&secondaryTs)
if err != nil {
log.Info("sync table: get tidb_current_ts err")
log.Info("sync table: get tidb_current_ts err", zap.String("changefeed", id.String()))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write syncpoint table", zap.Error(err))
Expand Down
4 changes: 2 additions & 2 deletions cmd/kafka-consumer/main.go
Expand Up @@ -868,7 +868,7 @@ func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partiti
func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Error("open db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("open db failed", zap.Error(err))
return nil, errors.Trace(err)
}

Expand All @@ -879,7 +879,7 @@ func openDB(ctx context.Context, dsn string) (*sql.DB, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err = db.PingContext(ctx); err != nil {
log.Error("ping db failed", zap.String("dsn", dsn), zap.Error(err))
log.Error("ping db failed", zap.Error(err))
return nil, errors.Trace(err)
}
log.Info("open db success", zap.String("dsn", dsn))
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/consistent.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/util"
)

// ConsistentConfig represents replication consistency config for a changefeed.
Expand Down Expand Up @@ -56,3 +57,8 @@ func (c *ConsistentConfig) ValidateAndAdjust() error {
}
return redo.ValidateStorage(uri)
}

// MaskSensitiveData masks sensitive data in ConsistentConfig
func (c *ConsistentConfig) MaskSensitiveData() {
c.Storage = util.MaskSensitiveDataInURI(c.Storage)
}
10 changes: 10 additions & 0 deletions pkg/config/replica_config.go
Expand Up @@ -279,3 +279,13 @@ func (c *ReplicaConfig) AdjustEnableOldValueAndVerifyForceReplicate(sinkURI *url

return nil
}

// MaskSensitiveData masks sensitive data in ReplicaConfig
func (c *ReplicaConfig) MaskSensitiveData() {
if c.Sink != nil {
c.Sink.MaskSensitiveData()
}
if c.Consistent != nil {
c.Consistent.MaskSensitiveData()
}
}
25 changes: 25 additions & 0 deletions pkg/config/replica_config_test.go
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -253,3 +254,27 @@ func TestAdjustEnableOldValueAndVerifyForceReplicate(t *testing.T) {
require.NoError(t, err)
require.False(t, config.EnableOldValue)
}

func TestMaskSensitiveData(t *testing.T) {
config := ReplicaConfig{
Sink: nil,
Consistent: nil,
}
config.MaskSensitiveData()
require.Nil(t, config.Sink)
require.Nil(t, config.Consistent)
config.Sink = &SinkConfig{}
config.Sink.KafkaConfig = &KafkaConfig{
SASLOAuthTokenURL: aws.String("http://abc.com?password=bacd"),
SASLOAuthClientSecret: aws.String("bacd"),
}
config.Sink.SchemaRegistry = "http://abc.com?password=bacd"
config.Consistent = &ConsistentConfig{
Storage: "http://abc.com?password=bacd",
}
config.MaskSensitiveData()
require.Equal(t, "http://abc.com?password=xxxxx", config.Sink.SchemaRegistry)
require.Equal(t, "http://abc.com?password=xxxxx", config.Consistent.Storage)
require.Equal(t, "http://abc.com?password=xxxxx", *config.Sink.KafkaConfig.SASLOAuthTokenURL)
require.Equal(t, "******", *config.Sink.KafkaConfig.SASLOAuthClientSecret)
}
29 changes: 29 additions & 0 deletions pkg/config/sink.go
Expand Up @@ -18,10 +18,12 @@ import (
"net/url"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/pingcap/errors"
"github.com/pingcap/log"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -154,6 +156,16 @@ type KafkaConfig struct {
LargeMessageHandle *LargeMessageHandleConfig `toml:"large-message-handle" json:"large-message-handle,omitempty"`
}

// MaskSensitiveData masks sensitive data in SinkConfig
func (s *SinkConfig) MaskSensitiveData() {
if s.SchemaRegistry != "" {
s.SchemaRegistry = util.MaskSensitiveDataInURI(s.SchemaRegistry)
}
if s.KafkaConfig != nil {
s.KafkaConfig.MaskSensitiveData()
}
}

// CSVConfig defines a series of configuration items for csv codec.
type CSVConfig struct {
// delimiter between fields
Expand Down Expand Up @@ -273,6 +285,23 @@ type ColumnSelector struct {
Columns []string `toml:"columns" json:"columns"`
}

// CodecConfig represents a MQ codec configuration
type CodecConfig struct {
EnableTiDBExtension *bool `toml:"enable-tidb-extension" json:"enable-tidb-extension,omitempty"`
MaxBatchSize *int `toml:"max-batch-size" json:"max-batch-size,omitempty"`
AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"`
AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"`
AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"`
}

// MaskSensitiveData masks sensitive data in KafkaConfig
func (k *KafkaConfig) MaskSensitiveData() {
k.SASLOAuthClientSecret = aws.String("******")
if k.SASLOAuthTokenURL != nil {
k.SASLOAuthTokenURL = aws.String(util.MaskSensitiveDataInURI(*k.SASLOAuthTokenURL))
}
}

func (s *SinkConfig) validateAndAdjust(sinkURI *url.URL) error {
if err := s.validateAndAdjustSinkURI(sinkURI); err != nil {
return err
Expand Down
29 changes: 29 additions & 0 deletions pkg/util/uri.go
Expand Up @@ -80,3 +80,32 @@ func MaskSinkURI(uri string) (string, error) {
}
return uriParsed.Redacted(), nil
}

var sensitiveQueryParameterNames = []string{
"password",
"sasl-password",
"access-key",
"secret-access-key",
"access_token",
"token",
"secret",
"passwd",
"pwd",
}

// MaskSensitiveDataInURI returns an uri that sensitive infos has been masked.
func MaskSensitiveDataInURI(uri string) string {
uriParsed, err := url.Parse(uri)
if err != nil {
log.Error("failed to parse URI", zap.Error(err))
return ""
}
queries := uriParsed.Query()
for _, secretKey := range sensitiveQueryParameterNames {
if queries.Has(secretKey) {
queries.Set(secretKey, "xxxxx")
}
}
uriParsed.RawQuery = queries.Encode()
return uriParsed.Redacted()
}
34 changes: 34 additions & 0 deletions pkg/util/uri_test.go
Expand Up @@ -88,3 +88,37 @@ func TestMaskSinkURI(t *testing.T) {
require.Equal(t, tt.masked, maskedURI)
}
}

func TestMaskSensitiveDataInURI(t *testing.T) {
tests := []struct {
uri string
masked string
}{
{
"mysql://root:123456@127.0.0.1:3306/?time-zone=c",
"mysql://root:xxxxx@127.0.0.1:3306/?time-zone=c",
},
{
"",
"",
},
{
"abc",
"abc",
},
}
for _, q := range sensitiveQueryParameterNames {
tests = append(tests, struct {
uri string
masked string
}{
"kafka://127.0.0.1:9093/cdc?" + q + "=verysecure",
"kafka://127.0.0.1:9093/cdc?" + q + "=xxxxx",
})
}

for _, tt := range tests {
maskedURI := MaskSensitiveDataInURI(tt.uri)
require.Equal(t, tt.masked, maskedURI)
}
}

0 comments on commit b49ddbe

Please sign in to comment.