Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cherry-pick to 1.1dev: update kinds of abnormal txn behavior #14942

Merged
merged 11 commits into from
Mar 13, 2024
9 changes: 6 additions & 3 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package catalog

import (
"fmt"
"regexp"
"strconv"

"github.com/matrixorigin/matrixone/pkg/compress"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/pb/api"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"regexp"
"strconv"
)

const (
Expand Down Expand Up @@ -463,3 +462,7 @@ func BuildQueryResultPath(accountName, statementId string, blockIdx int) string
// BuildQueryResultMetaPath fills the QueryResultMetaPath pattern with the
// given account name and statement id and returns the resulting path.
func BuildQueryResultMetaPath(accountName, statementId string) string {
	metaPath := fmt.Sprintf(QueryResultMetaPath, accountName, statementId)
	return metaPath
}

// BuildProfilePath returns the location of a profile file under ProfileDir.
// The file name is typ and name joined by an underscore, i.e.
// "<ProfileDir>/<typ>_<name>".
func BuildProfilePath(typ, name string) string {
	const layout = "%s/%s_%s"
	return fmt.Sprintf(layout, ProfileDir, typ, name)
}
3 changes: 3 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,12 +763,15 @@ var (
QueryResultPath string
QueryResultMetaPath string
QueryResultMetaDir string
//ProfileDir holds all profiles dumped by the runtime/pprof
ProfileDir string
)

func init() {
QueryResultPath = fileservice.JoinPath(defines.SharedFileServiceName, "/query_result/%s_%s_%d.blk")
QueryResultMetaPath = fileservice.JoinPath(defines.SharedFileServiceName, "/query_result_meta/%s_%s.blk")
QueryResultMetaDir = fileservice.JoinPath(defines.SharedFileServiceName, "/query_result_meta")
ProfileDir = fileservice.JoinPath(defines.ETLFileServiceName, "/profile")
}

// QueryResultName is a fmt-style file-name pattern that takes two strings and
// an integer, in that order. It matches the final path element of the
// QueryResultPath pattern set in init.
const QueryResultName = "%s_%s_%d.blk"
Expand Down
65 changes: 56 additions & 9 deletions pkg/cnservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ import (
"context"
"encoding/hex"
"fmt"
"os"
"github.com/google/uuid"
"github.com/matrixorigin/matrixone/pkg/catalog"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"go.uber.org/zap"
"io"
"sync"
"time"

Expand Down Expand Up @@ -52,12 +56,10 @@ import (
"github.com/matrixorigin/matrixone/pkg/udf/pythonservice"
"github.com/matrixorigin/matrixone/pkg/util/address"
"github.com/matrixorigin/matrixone/pkg/util/executor"
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"github.com/matrixorigin/matrixone/pkg/util/profile"
"github.com/matrixorigin/matrixone/pkg/util/status"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
"go.uber.org/zap"
)

func NewService(
Expand Down Expand Up @@ -588,17 +590,36 @@ func (s *service) getTxnClient() (c client.TxnClient, err error) {
opts = append(opts,
client.WithEnableRefreshExpression())
}

if s.cfg.Txn.EnableLeakCheck == 1 {
opts = append(opts, client.WithEnableLeakCheck(
s.cfg.Txn.MaxActiveAges.Duration,
func(txnID []byte, createAt time.Time, createBy string) {
// dump all goroutines to stderr
profile.ProfileGoroutine(os.Stderr, 2)
v2.TxnLeakCounter.Inc()
runtime.DefaultRuntime().Logger().Error("found leak txn",
func(txnID []byte, createAt time.Time, createBy string, options txn.TxnOptions) {
name, _ := uuid.NewV7()
profPath := catalog.BuildProfilePath("routine", name.String())

fields := []zap.Field{
zap.String("txn-id", hex.EncodeToString(txnID)),
zap.Time("create-at", createAt),
zap.String("create-by", createBy))
zap.String("create-by", createBy),
zap.String("options", options.String()),
zap.String("profile", profPath),
}
if options.InRunSql {
//the txn runs sql in compile.Run() and does not exit
v2.TxnLongRunningCounter.Inc()
runtime.DefaultRuntime().Logger().Error("found long running txn", fields...)
} else if options.InCommit {
v2.TxnInCommitCounter.Inc()
runtime.DefaultRuntime().Logger().Error("found txn in commit", fields...)
} else if options.InRollback {
v2.TxnInRollbackCounter.Inc()
runtime.DefaultRuntime().Logger().Error("found txn in rollback", fields...)
} else {
v2.TxnLeakCounter.Inc()
runtime.DefaultRuntime().Logger().Error("found leak txn", fields...)
}
s.saveProfile(profPath)
}))
}
if s.cfg.Txn.Limit > 0 {
Expand Down Expand Up @@ -732,6 +753,32 @@ func (s *service) bootstrap() error {
})
}

// saveProfile dumps all goroutines and writes the dump to profilePath through
// s.etlFS.
//
// The dump is streamed: a producer goroutine writes the profile into an
// in-memory pipe while the file service consumes the read side, so the whole
// profile never has to be buffered here. The write is bounded by a three
// minute timeout. Failures are logged and not returned to the caller.
func (s *service) saveProfile(profilePath string) {
	reader, writer := io.Pipe()
	// Always close the read side on exit. If Write returns before draining the
	// pipe (error or timeout), this unblocks the producer goroutine, which
	// would otherwise stay parked in a pipe write forever.
	defer func() { _ = reader.Close() }()

	go func() {
		// dump all goroutines
		err := profile.ProfileGoroutine(writer, 2)
		// Propagate a dump failure to the reader so that a truncated profile
		// surfaces as a failed write instead of being stored silently. With a
		// nil error this is equivalent to a plain Close (reader sees EOF).
		_ = writer.CloseWithError(err)
	}()

	writeVec := fileservice.IOVector{
		FilePath: profilePath,
		Entries: []fileservice.IOEntry{
			{
				Offset:         0,
				ReaderForWrite: reader,
				// NOTE(review): presumably -1 means the size is unknown up
				// front because the content is streamed; confirm against
				// fileservice.IOEntry.
				Size: -1,
			},
		},
	}

	ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*3)
	defer cancel()
	if err := s.etlFS.Write(ctx, writeVec); err != nil {
		logutil.Errorf("save profile %s failed. err:%v", profilePath, err)
	}
}

var (
bootstrapKey = "_mo_bootstrap"
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func (th *TxnHandler) NewTxnOperator() (context.Context, TxnOperator, error) {
panic("context should not be nil")
}
opts = append(opts,
client.WithTxnCreateBy(fmt.Sprintf("frontend-session-%p", th.ses)))
client.WithTxnCreateBy(fmt.Sprintf("frontend-session-%p", th.ses)),
client.WithSessionInfo(th.ses.GetDebugString()))

if th.ses != nil && th.ses.GetFromRealUser() {
opts = append(opts,
Expand Down
Loading
Loading