diff --git a/pkg/sqlreplay/cmd/audit_log_plugin.go b/pkg/sqlreplay/cmd/audit_log_plugin.go index fd4f45bd..707d44ec 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin.go @@ -147,6 +147,8 @@ func (decoder *AuditLogPluginDecoder) Decode(reader LineReader) (*Command, error cmd.Success = true cmd.ConnID = connID cmd.StartTs = startTs + cmd.FileName = filename + cmd.Line = lineIdx } if len(cmds) > 1 { decoder.pendingCmds = cmds[1:] diff --git a/pkg/sqlreplay/cmd/audit_log_plugin_test.go b/pkg/sqlreplay/cmd/audit_log_plugin_test.go index 00f37f40..8686bb8f 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin_test.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin_test.go @@ -636,7 +636,7 @@ func TestDecodeSingleLine(t *testing.T) { for i, test := range tests { decoder := NewAuditLogPluginDecoder() decoder.SetPSCloseStrategy(PSCloseStrategyAlways) - mr := mockReader{data: append([]byte(test.line), '\n')} + mr := mockReader{data: append([]byte(test.line), '\n'), filename: "my/file"} cmds := make([]*Command, 0, 4) var err error for { @@ -654,6 +654,10 @@ func TestDecodeSingleLine(t *testing.T) { } else { require.ErrorIs(t, err, io.EOF, "case %d", i) } + for _, cmd := range test.cmds { + cmd.FileName = "my/file" + cmd.Line = 1 + } require.Equal(t, test.cmds, cmds, "case %d", i) } } @@ -675,6 +679,7 @@ func TestDecodeMultiLines(t *testing.T) { Type: pnet.ComQuery, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("set sql_mode=''")...), StmtType: "Set", + Line: 1, Success: true, }, { @@ -682,6 +687,7 @@ func TestDecodeMultiLines(t *testing.T) { ConnID: 3695181836, StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Line: 3, Success: true, }, { @@ -690,6 +696,7 @@ func TestDecodeMultiLines(t *testing.T) { Type: pnet.ComQuery, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), StmtType: "Select", + Line: 3, Success: true, }, }, @@ -705,6 +712,7 @@ func TestDecodeMultiLines(t *testing.T) { ConnID: 3695181836, StartTs: time.Date(2025, 9, 6, 16, 16, 29, 583942167, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Line: 1, Success: true, }, { @@ -713,6 +721,7 @@ func TestDecodeMultiLines(t *testing.T) { Type: pnet.ComQuery, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("set sql_mode=''")...), StmtType: "Set", + Line: 1, Success: true, }, { @@ -721,6 +730,7 @@ func TestDecodeMultiLines(t *testing.T) { Type: pnet.ComQuery, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), StmtType: "Select", + Line: 3, Success: true, }, }, @@ -735,6 +745,7 @@ func TestDecodeMultiLines(t *testing.T) { ConnID: 3552575570, Payload: []byte{pnet.ComQuit.Byte()}, Type: pnet.ComQuit, + Line: 2, Success: true, }, }, @@ -749,6 +760,7 @@ func TestDecodeMultiLines(t *testing.T) { ConnID: 3695181836, StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Line: 1, Success: true, }, { @@ -758,6 +770,7 @@ func TestDecodeMultiLines(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -767,6 +780,7 @@ func TestDecodeMultiLines(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -776,6 +790,7 @@ func TestDecodeMultiLines(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -785,6 +800,7 @@ func TestDecodeMultiLines(t *testing.T) { CapturedPsID: 2, Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("select \"?\"")...), StmtType: "Select", + Line: 2, Success: true, }, { @@ -794,6 +810,7 @@ func TestDecodeMultiLines(t *testing.T) { CapturedPsID: 2, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{2, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 1, 0, 0, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 2, Success: true, }, { @@ -803,6 +820,7 @@ func TestDecodeMultiLines(t *testing.T) { CapturedPsID: 2, Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{2, 0, 0, 0}...), StmtType: "Select", + Line: 2, Success: true, }, }, @@ -817,6 +835,7 @@ func TestDecodeMultiLines(t *testing.T) { ConnID: 3695181836, StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Line: 1, Success: true, }, { @@ -825,6 +844,7 @@ func TestDecodeMultiLines(t *testing.T) { StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -832,6 +852,7 @@ func TestDecodeMultiLines(t *testing.T) { ConnID: 3695181837, StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Line: 2, Success: true, }, { @@ -840,6 +861,7 @@ func TestDecodeMultiLines(t *testing.T) { StartTs: time.Date(2025, 9, 6, 17, 3, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), StmtType: "Select", + Line: 2, Success: true, }, }, @@ -849,7 +871,7 @@ func TestDecodeMultiLines(t *testing.T) { for i, test := range tests { decoder := NewAuditLogPluginDecoder() decoder.SetPSCloseStrategy(PSCloseStrategyAlways) - mr := mockReader{data: append([]byte(test.lines), '\n')} + mr := mockReader{data: append([]byte(test.lines), '\n'), filename: "my/file"} cmds := make([]*Command, 0, len(test.cmds)) for { cmd, err := decoder.Decode(&mr) @@ -859,6 +881,9 @@ func TestDecodeMultiLines(t *testing.T) { } cmds = append(cmds, cmd) } + for _, cmd := range test.cmds { + cmd.FileName = "my/file" + } require.Equal(t, test.cmds, cmds, "case %d", i) } } @@ -879,6 +904,7 @@ func TestDecodeAuditLogWithCommandStartTime(t *testing.T) { ConnID: 3695181836, StartTs: time.Date(2025, 9, 14, 16, 16, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Line: 3, Success: true, }, { @@ -887,6 +913,7 @@ func TestDecodeAuditLogWithCommandStartTime(t *testing.T) { Type: pnet.ComQuery, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), StmtType: "Select", + Line: 3, Success: true, }, }, @@ -902,6 +929,7 @@ func TestDecodeAuditLogWithCommandStartTime(t *testing.T) { ConnID: 3695181836, StartTs: time.Date(2025, 9, 14, 16, 16, 53, 718663917, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("b")...), + Line: 3, Success: true, }, { @@ -910,6 +938,7 @@ func TestDecodeAuditLogWithCommandStartTime(t *testing.T) { Type: pnet.ComQuery, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select \"[=]\"")...), StmtType: "Select", + Line: 3, Success: true, }, }, @@ -965,6 +994,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { StartTs: time.Date(2025, 9, 18, 17, 48, 20, 613951140, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), CapturedPsID: 0, + Line: 1, Success: true, }, { @@ -974,6 +1004,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -983,6 +1014,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -992,6 +1024,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), StmtType: "Select", + Line: 2, Success: true, }, }, @@ -1008,6 +1041,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { StartTs: time.Date(2025, 9, 18, 17, 48, 20, 613951140, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), CapturedPsID: 0, + Line: 1, Success: true, }, { @@ -1017,6 +1051,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -1026,6 +1061,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -1035,6 +1071,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xf9, 0xe4, 0x01, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 2, Success: true, }, { @@ -1044,6 +1081,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), StmtType: "Select", + Line: 3, Success: true, }, }, @@ -1060,6 +1098,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { StartTs: time.Date(2025, 9, 18, 17, 48, 20, 613951140, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), CapturedPsID: 0, + Line: 1, Success: true, }, { @@ -1069,6 +1108,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -1078,6 +1118,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -1087,6 +1128,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), StmtType: "Select", + Line: 2, Success: true, }, { @@ -1096,6 +1138,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...), StmtType: "Select", + Line: 3, Success: true, }, { @@ -1105,6 +1148,7 @@ func TestDecodeAuditLogInDirectedMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xf9, 0xe4, 0x01, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 3, Success: true, }, }, @@ -1144,6 +1188,7 @@ func TestDecodeAuditLogInNeverMode(t *testing.T) { ConnID: 3807050215081378201, StartTs: time.Date(2025, 9, 18, 17, 48, 20, 613951140, time.FixedZone("", 8*3600+600)), Payload: append([]byte{pnet.ComInitDB.Byte()}, []byte("test")...), + Line: 1, Success: true, }, { @@ -1153,6 +1198,7 @@ func TestDecodeAuditLogInNeverMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT c FROM sbtest1 WHERE id=?")...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -1162,6 +1208,7 @@ func TestDecodeAuditLogInNeverMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 1, Success: true, }, { @@ -1171,6 +1218,7 @@ func TestDecodeAuditLogInNeverMode(t *testing.T) { CapturedPsID: 1, Payload: append([]byte{pnet.ComStmtExecute.Byte()}, []byte{1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 8, 0, 0xe8, 0xaf, 0x07, 0, 0, 0, 0, 0}...), StmtType: "Select", + Line: 2, Success: true, }, }, diff --git a/pkg/sqlreplay/cmd/cmd.go b/pkg/sqlreplay/cmd/cmd.go index f50fd16b..eaedd178 100644 --- a/pkg/sqlreplay/cmd/cmd.go +++ b/pkg/sqlreplay/cmd/cmd.go @@ -63,6 +63,9 @@ type Command struct { StartTs time.Time ConnID uint64 Type pnet.Command + // The place in the traffic file, used to report. + FileName string + Line int // Logged only in audit log. StmtType string // Logged only in native log. diff --git a/pkg/sqlreplay/cmd/mock_test.go b/pkg/sqlreplay/cmd/mock_test.go index 535baf50..ca8ccdf1 100644 --- a/pkg/sqlreplay/cmd/mock_test.go +++ b/pkg/sqlreplay/cmd/mock_test.go @@ -9,8 +9,10 @@ import ( ) type mockReader struct { - data []byte - curIdx int + filename string + data []byte + curLine int + curIdx int } func (mr *mockReader) ReadLine() ([]byte, string, int, error) { @@ -24,7 +26,8 @@ func (mr *mockReader) ReadLine() ([]byte, string, int, error) { idx += mr.curIdx line := mr.data[mr.curIdx:idx] mr.curIdx = idx + 1 - return line, "", 0, nil + mr.curLine++ + return line, mr.filename, mr.curLine, nil } func (mr *mockReader) Read(data []byte) (string, int, error) { @@ -34,7 +37,7 @@ func (mr *mockReader) Read(data []byte) (string, int, error) { } copy(data, mr.data[mr.curIdx:mr.curIdx+n]) mr.curIdx += n - return "", 0, nil + return mr.filename, mr.curLine, nil } func (mr *mockReader) Close() { diff --git a/pkg/sqlreplay/cmd/native.go b/pkg/sqlreplay/cmd/native.go index 1b974d17..03e17eb8 100644 --- a/pkg/sqlreplay/cmd/native.go +++ b/pkg/sqlreplay/cmd/native.go @@ -167,6 +167,8 @@ func (rw *NativeDecoder) Decode(reader LineReader) (c *Command, err error) { if err = c.Validate(filename, lineIdx); err != nil { return nil, err } + c.FileName = filename + c.Line = lineIdx return c, nil } } diff --git a/pkg/sqlreplay/cmd/native_test.go b/pkg/sqlreplay/cmd/native_test.go index 108d98ba..3774faf7 100644 --- a/pkg/sqlreplay/cmd/native_test.go +++ b/pkg/sqlreplay/cmd/native_test.go @@ -148,6 +148,7 @@ select 2 ConnID: 100, Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("select 2")...), StartTs: time.Date(2024, 8, 28, 18, 51, 21, 477067000, time.FixedZone("", 8*3600+600)), + Line: 1, Success: true, }, }, @@ -158,10 +159,11 @@ select 2 for i, test := range tests { decoder := NewCmdDecoder(FormatNative) decoder.SetCommandStartTime(commandStartTime) - mr := mockReader{data: []byte(test.lines)} + mr := mockReader{data: []byte(test.lines), filename: "my/file"} for j, cmd := range test.cmds { newCmd, err := decoder.Decode(&mr) require.NoError(t, err, "case %d-%d", i, j) + cmd.FileName = "my/file" require.True(t, cmd.Equal(newCmd), "case %d-%d", i, j) } } diff --git a/pkg/sqlreplay/report/report_db.go b/pkg/sqlreplay/report/report_db.go index a85fe4c2..d39c3b37 100644 --- a/pkg/sqlreplay/report/report_db.go +++ b/pkg/sqlreplay/report/report_db.go @@ -102,7 +102,7 @@ func (rdb *reportDB) InsertExceptions(startTime time.Time, tp conn.ExceptionType sample := value.sample.(*conn.FailException) command := sample.Command() args = []any{startTime.String(), command.Type.String(), command.Digest(), command.QueryText(), sample.Error(), sample.ConnID(), - command.StartTs.String(), sample.Time().String(), value.count, value.count} + command.FileName, command.Line, command.StartTs.String(), sample.Time().String(), value.count, value.count} case conn.Other: sample := value.sample.(*conn.OtherException) args = []any{startTime.String(), sample.Key(), sample.Error(), sample.Time().String(), value.count, value.count} diff --git a/pkg/sqlreplay/report/report_db_test.go b/pkg/sqlreplay/report/report_db_test.go index 08a9b3f2..99448eed 100644 --- a/pkg/sqlreplay/report/report_db_test.go +++ b/pkg/sqlreplay/report/report_db_test.go @@ -53,8 +53,10 @@ func TestInitDB(t *testing.T) { func TestInsertExceptions(t *testing.T) { now := time.Now() - failSample := conn.NewFailException(errors.New("mock error"), - cmd.NewCommand(append([]byte{pnet.ComQuery.Byte()}, []byte("select 1")...), now, 1)) + cmd := cmd.NewCommand(append([]byte{pnet.ComQuery.Byte()}, []byte("select 1")...), now, 1) + cmd.FileName = "my/file" + cmd.Line = 100 + failSample := conn.NewFailException(errors.New("mock error"), cmd) otherSample1 := conn.NewOtherException(errors.Wrapf(errors.New("mock error"), "wrap"), 1) otherSample2 := conn.NewOtherException(errors.New("mock error"), 1) otherSample3 := conn.NewOtherException(errors.New("another error"), 2) @@ -101,7 +103,7 @@ func TestInsertExceptions(t *testing.T) { }, stmtID: []uint32{1}, args: [][]any{{now.String(), "Query", "e1c71d1661ae46e09b7aaec1c390957f0d6260410df4e4bc71b9c8d681021471", "select 1", "mock error", - uint64(1), now.String(), failSample.Time().String(), uint64(1), uint64(1)}}, + uint64(1), "my/file", 100, now.String(), failSample.Time().String(), uint64(1), uint64(1)}}, }, } diff --git a/pkg/sqlreplay/report/tables.go b/pkg/sqlreplay/report/tables.go index 6cf32ce4..f73e3e5e 100644 --- a/pkg/sqlreplay/report/tables.go +++ b/pkg/sqlreplay/report/tables.go @@ -13,6 +13,8 @@ const ( sample_stmt text, sample_err_msg text, sample_conn_id bigint, + sample_filename text, + sample_fileline int, sample_capture_time timestamp, sample_replay_time timestamp, count bigint, @@ -24,16 +26,20 @@ const ( sample_stmt, sample_err_msg, sample_conn_id, + sample_filename, + sample_fileline, sample_capture_time, sample_replay_time, count) - values(?, ?, ?, ?, ?, ?, ?, ?, ?) on duplicate key update count = count + ?` + values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) on duplicate key update count = count + ?` checkFailTable = `select replay_start_time, cmd_type, digest, sample_stmt, sample_err_msg, sample_conn_id, + sample_filename, + sample_fileline, sample_capture_time, sample_replay_time, count from tiproxy_traffic_replay.fail limit 0` diff --git a/pkg/sqlreplay/store/line_test.go b/pkg/sqlreplay/store/line_test.go index 52d3cdfd..6d5bcc5b 100644 --- a/pkg/sqlreplay/store/line_test.go +++ b/pkg/sqlreplay/store/line_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "testing" @@ -110,7 +111,7 @@ func TestReadLine(t *testing.T) { data, filename, idx, err := l.ReadLine() require.NoError(t, err) require.Equal(t, test.lines[fileIdx][lineIdx], string(data), "case %d file %d line %d", i, fileIdx, lineIdx) - require.Equal(t, fileNames[fileIdx], filename, "case %d file %d", i, fileIdx) + require.Equal(t, path.Join("file:/", dir, fileNames[fileIdx]), filename, "case %d file %d", i, fileIdx) require.Equal(t, lineIdx+1, idx, "case %d file %d", i, fileIdx) } } @@ -213,7 +214,7 @@ func TestRead(t *testing.T) { filename, idx, err := l.Read(data) require.NoError(t, err) require.Equal(t, test.str[j], string(data), "case %d", i) - require.Equal(t, fileNames[test.fileIdx[j]], filename, "case %d", i) + require.Equal(t, path.Join("file:/", dir, fileNames[test.fileIdx[j]]), filename, "case %d", i) require.Equal(t, test.lineIdx[j], idx, "case %d", i) } _, _, err = l.Read(data) diff --git a/pkg/sqlreplay/store/rotate.go b/pkg/sqlreplay/store/rotate.go index 1bed0401..1bce5836 100644 --- a/pkg/sqlreplay/store/rotate.go +++ b/pkg/sqlreplay/store/rotate.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "path" "reflect" "strconv" "strings" @@ -113,6 +114,7 @@ type fileMeta struct { type rotateReader struct { cfg ReaderCfg curFileName string + absolutePath string curFileIdx int64 reader io.Reader externalFile storage.ExternalFileReader @@ -164,7 +166,7 @@ func (r *rotateReader) Read(data []byte) (int, error) { } func (r *rotateReader) CurFile() string { - return r.curFileName + return r.absolutePath } func (r *rotateReader) Close() error { @@ -225,6 +227,7 @@ func (r *rotateReader) nextReader() error { r.reader = fileReader r.curFileIdx = minFileIdx r.curFileName = minFileName + r.absolutePath = path.Join(r.storage.URI(), minFileName) // rotateReader -> encryptReader -> compressReader -> file if strings.HasSuffix(minFileName, fileCompressFormat) { if r.reader, err = newCompressReader(r.reader); err != nil { @@ -235,8 +238,7 @@ func (r *rotateReader) nextReader() error { if err != nil { return err } - r.lg.Info("reading next file", zap.String("prefix", r.storage.URI()), - zap.String("file", minFileName), + r.lg.Info("reading next file", zap.String("file", r.absolutePath), zap.Duration("open_time", time.Since(startTime)), zap.Int("files_in_cache", len(r.fileMetaCache)-r.fileMetaCacheIdx)) return nil