Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions cmd/mo-service/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,16 @@ func saveProfilesLoop(sigs chan os.Signal) {

quit := false
tk := time.NewTicker(*profileInterval)
logutil.GetGlobalLogger().Info("save profiles loop started", zap.Duration("profile-interval", *profileInterval), zap.Duration("cpuProfileInterval", cpuProfileInterval))
logutil.Info(
"save.profiles.loop.started",
zap.Duration("interval", *profileInterval),
zap.Duration("duration", cpuProfileInterval),
)
for {
select {
case <-tk.C:
logutil.GetGlobalLogger().Info("save profiles start")
saveProfiles()
saveCpuProfile(cpuProfileInterval)
logutil.GetGlobalLogger().Info("save profiles end")
case <-sigs:
quit = true
}
Expand All @@ -435,20 +437,29 @@ func saveProfiles() {
func saveProfile(typ string) string {
name, _ := uuid.NewV7()
profilePath := catalog.BuildProfilePath(globalServiceType, globalNodeId, typ, name.String()) + ".gz"
logutil.GetGlobalLogger().Info("save profiles ", zap.String("path", profilePath))
cnservice.SaveProfile(profilePath, typ, globalEtlFS)
return profilePath
}

func saveCpuProfile(cpuProfileInterval time.Duration) {
// Skip CPU profile if already in use (e.g., started via -cpu-profile flag)
if *cpuProfilePathFlag != "" {
logutil.GetGlobalLogger().Debug("skip cpu profile generation, cpu profiling already enabled via -cpu-profile flag")
return
}

genCpuProfile := func(writer io.Writer) error {
err := pprof.StartCPUProfile(writer)
if err != nil {
// If CPU profiling is already in use, skip silently
if strings.Contains(err.Error(), "cpu profiling already in use") {
return nil
}
return err
}
time.Sleep(cpuProfileInterval)
pprof.StopCPUProfile()
return err
return nil
}
saveProfileWithType("cpu", genCpuProfile)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bootstrap/versions/upgrade_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func CheckTableColumn(txn executor.TxnExecutor,
AND att_database = '%s' and att_relname = '%s' and attname = '%s';`, schema, tableName, columnName)
}

res, err := txn.Exec(sql, executor.StatementOption{}.WithAccountID(accountId))
res, err := txn.Exec(sql, executor.StatementOption{}.WithAccountID(accountId).WithDisableLog())
if err != nil {
return colInfo, err
}
Expand Down Expand Up @@ -292,7 +292,7 @@ var CheckViewDefinition = func(txn executor.TxnExecutor, accountId uint32, schem
sql = fmt.Sprintf("SELECT tbl.rel_createsql AS `VIEW_DEFINITION` FROM mo_catalog.mo_tables tbl LEFT JOIN mo_catalog.mo_user usr ON tbl.creator = usr.user_id WHERE tbl.relkind = 'v' AND account_id = 0 AND tbl.reldatabase = '%s' AND tbl.relname = '%s'", schema, viewName)
}

res, err := txn.Exec(sql, executor.StatementOption{}.WithAccountID(accountId))
res, err := txn.Exec(sql, executor.StatementOption{}.WithAccountID(accountId).WithDisableLog())
if err != nil {
return false, "", err
}
Expand Down Expand Up @@ -330,7 +330,7 @@ var CheckTableDefinition = func(txn executor.TxnExecutor, accountId uint32, sche
AND account_id = 0 AND reldatabase = '%s' AND relname = '%s'`, schema, tableName)
}

res, err := txn.Exec(sql, executor.StatementOption{}.WithAccountID(accountId))
res, err := txn.Exec(sql, executor.StatementOption{}.WithAccountID(accountId).WithDisableLog())
if err != nil {
return false, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cdc/table_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
commonutil "github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/defines"
Expand Down Expand Up @@ -646,7 +647,7 @@ func (s *TableDetector) cleanupOrphanWatermarks(ctx context.Context) {
fields := []zap.Field{
zap.Uint64("deleted-rows", res.AffectedRows),
zap.Duration("duration", duration),
zap.String("sql", sql),
zap.String("sql", commonutil.Abbreviate(sql, 500)),
}
if duration > s.cleanupWarn {
logutil.Warn(
Expand Down
14 changes: 2 additions & 12 deletions pkg/cdc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func extractRowFromVector(ctx context.Context, vec *vector.Vector, i int, row []

switch vec.GetType().Oid { //get col
case types.T_json:
row[i] = types.DecodeJson(copyBytes(vec.GetBytesAt(rowIndex)))
row[i] = types.DecodeJson(vec.CloneBytesAt(rowIndex))
case types.T_bool:
row[i] = vector.GetFixedAtWithTypeCheck[bool](vec, rowIndex)
case types.T_bit:
Expand All @@ -161,7 +161,7 @@ func extractRowFromVector(ctx context.Context, vec *vector.Vector, i int, row []
case types.T_float64:
row[i] = vector.GetFixedAtWithTypeCheck[float64](vec, rowIndex)
case types.T_char, types.T_varchar, types.T_blob, types.T_text, types.T_binary, types.T_varbinary, types.T_datalink:
row[i] = copyBytes(vec.GetBytesAt(rowIndex))
row[i] = vec.CloneBytesAt(rowIndex)
case types.T_array_float32:
// NOTE: Don't merge it with T_varchar. You will get raw binary in the SQL output
//+------------------------------+
Expand Down Expand Up @@ -211,16 +211,6 @@ func extractRowFromVector(ctx context.Context, vec *vector.Vector, i int, row []
return nil
}

func copyBytes(src []byte) []byte {
if len(src) > 0 {
dst := make([]byte, len(src))
copy(dst, src)
return dst
} else {
return []byte{}
}
}

func convertColIntoSql(
ctx context.Context,
data any,
Expand Down
3 changes: 2 additions & 1 deletion pkg/cdc/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
commonutil "github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/bytejson"
"github.com/matrixorigin/matrixone/pkg/container/types"
Expand Down Expand Up @@ -438,7 +439,7 @@ func Test_copyBytes(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, copyBytes(tt.args.src), "copyBytes(%v)", tt.args.src)
assert.Equalf(t, tt.want, commonutil.CloneBytes(tt.args.src), "copyBytes(%v)", tt.args.src)
})
}
}
Expand Down
23 changes: 19 additions & 4 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,15 +999,26 @@ func SaveProfile(profilePath string, profileType string, etlFS fileservice.FileS
}
err := profile.ProfileRuntime(profileType, gzWriter, debug)
if err != nil {
logutil.Errorf("get profile of %s failed. err:%v", profilePath, err)
logutil.Error(
"profile.save.runtime.failed",
zap.String("path", profilePath),
zap.Error(err),
)
return
}
err = gzWriter.Close()
if err != nil {
logutil.Errorf("close gzip write of %s failed. err:%v", profilePath, err)
logutil.Error(
"profile.writer.close.failed",
zap.String("path", profilePath),
zap.Error(err),
)
return
}
logutil.Info("get profile done. save profiles ", zap.String("path", profilePath))
logutil.Info(
"profile.save.get.ok",
zap.String("path", profilePath),
)
writeVec := fileservice.IOVector{
FilePath: profilePath,
Entries: []fileservice.IOEntry{
Expand All @@ -1023,7 +1034,11 @@ func SaveProfile(profilePath string, profileType string, etlFS fileservice.FileS
err = etlFS.Write(ctx, writeVec)
if err != nil {
err = moerr.AttachCause(ctx, err)
logutil.Errorf("save profile %s failed. err:%v", profilePath, err)
logutil.Error(
"profile.save.failed",
zap.String("path", profilePath),
zap.Error(err),
)
return
}
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/cnservice/server_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/morpc"
"github.com/matrixorigin/matrixone/pkg/common/system"
commonUtil "github.com/matrixorigin/matrixone/pkg/common/util"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/fileservice"
Expand Down Expand Up @@ -345,15 +346,9 @@ func copyWaitTxn(src pblock.WaitTxn) *pblock.WaitTxn {
return dst
}

func copyBytes(src []byte) []byte {
dst := make([]byte, len(src))
copy(dst, src)
return dst
}

func copyTxnMeta(src txn.TxnMeta) *txn.TxnMeta {
dst := &txn.TxnMeta{
ID: copyBytes(src.GetID()),
ID: commonUtil.CloneBytes(src.GetID()),
Status: src.GetStatus(),
SnapshotTS: src.GetSnapshotTS(),
PreparedTS: src.GetPreparedTS(),
Expand Down
Loading
Loading