Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

LeakDetector: use it to find which resource was created but not closed (leaked) #990

Merged
merged 20 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 27 additions & 23 deletions common/dbg/experiments.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,25 +193,45 @@ func BigRwTxKb() uint {
}

var (
slowCommit time.Duration
getSlowCommit sync.Once
slowCommit time.Duration
slowCommitOnce sync.Once
)

func SlowCommit() time.Duration {
getSlowCommit.Do(func() {
v, _ := os.LookupEnv("DEBUG_SLOW_COMMIT_MS")
slowCommitOnce.Do(func() {
v, _ := os.LookupEnv("SLOW_COMMIT")
if v != "" {
i, err := strconv.Atoi(v)
var err error
slowCommit, err = time.ParseDuration(v)
if err != nil {
panic(err)
}
slowCommit = time.Duration(i) * time.Millisecond
log.Info("[Experiment]", "DEBUG_BIG_RW_TX_KB", slowCommit)
log.Info("[Experiment]", "SLOW_COMMIT", slowCommit.String())
}
})
return slowCommit
}

var (
slowTx time.Duration
slowTxOnce sync.Once
)

func SlowTx() time.Duration {
slowTxOnce.Do(func() {
v, _ := os.LookupEnv("SLOW_TX")
if v != "" {
var err error
slowTx, err = time.ParseDuration(v)
if err != nil {
panic(err)
}
log.Info("[Experiment]", "SLOW_TX", slowTx.String())
}
})
return slowTx
}

var (
stopBeforeStage string
stopBeforeStageFlag sync.Once
Expand Down Expand Up @@ -261,19 +281,3 @@ func StopAfterReconst() bool {
})
return stopAfterReconst
}

var (
traceAgg bool
traceAggOnce sync.Once
)

func TraceAgg() bool {
traceAggOnce.Do(func() {
v, _ := os.LookupEnv("TRACE_AGG")
if v == "true" {
traceAgg = true
log.Info("[Experiment]", "TRACE_AGG", traceAgg)
}
})
return traceAgg
}
105 changes: 105 additions & 0 deletions common/dbg/leak_detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package dbg

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/ledgerwatch/log/v3"
)

// LeakDetector - use it to find which resource was created but not closed (leaked)
// periodically does print in logs resources which living longer than 1min with their creation stack trace
// For example db transactions can call Add/Del from Begin/Commit/Rollback methods
type LeakDetector struct {
enabled atomic.Bool
slowThreshold atomic.Pointer[time.Duration]
autoIncrement atomic.Uint64

list map[uint64]LeakDetectorItem
listLock sync.Mutex
}

type LeakDetectorItem struct {
stack string
started time.Time
}

func NewLeakDetector(name string, slowThreshold time.Duration) *LeakDetector {
enabled := slowThreshold > 0
if !enabled {
return nil
}
d := &LeakDetector{list: map[uint64]LeakDetectorItem{}}
d.SetSlowThreshold(slowThreshold)

if enabled {
go func() {
logEvery := time.NewTicker(60 * time.Second)
defer logEvery.Stop()

for {
select {
case <-logEvery.C:
if list := d.slowList(); len(list) > 0 {
log.Info(fmt.Sprintf("[dbg.%s] long living resources", name), "list", strings.Join(d.slowList(), ", "))
}
}
}
}()
}
return d
}

func (d *LeakDetector) slowList() (res []string) {
if d == nil || !d.Enabled() {
return res
}
slowThreshold := *d.slowThreshold.Load()

d.listLock.Lock()
defer d.listLock.Unlock()
i := 0
for key, value := range d.list {
living := time.Since(value.started)
if living > slowThreshold {
res = append(res, fmt.Sprintf("%d(%s): %s", key, living, value.stack))
}
i++
if i > 10 { // protect logs from too many output
break
}
}
return res
}

func (d *LeakDetector) Del(id uint64) {
if d == nil || !d.Enabled() {
return
}
d.listLock.Lock()
defer d.listLock.Unlock()
delete(d.list, id)
}
func (d *LeakDetector) Add() uint64 {
if d == nil || !d.Enabled() {
return 0
}
ac := LeakDetectorItem{
stack: StackSkip(2),
started: time.Now(),
}
id := d.autoIncrement.Add(1)
d.listLock.Lock()
defer d.listLock.Unlock()
d.list[id] = ac
return id
}

func (d *LeakDetector) Enabled() bool { return d.enabled.Load() }
func (d *LeakDetector) SetSlowThreshold(t time.Duration) {
d.slowThreshold.Store(&t)
d.enabled.Store(t > 0)
}
3 changes: 3 additions & 0 deletions common/dbg/log_panic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ import (
func Stack() string {
return stack2.Trace().TrimBelow(stack2.Caller(1)).String()
}
func StackSkip(skip int) string {
return stack2.Trace().TrimBelow(stack2.Caller(skip)).String()
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ require (
github.com/stretchr/testify v1.8.2
github.com/tidwall/btree v1.6.0
golang.org/x/crypto v0.8.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/sync v0.1.0
golang.org/x/sys v0.7.0
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
golang.org/x/sync v0.2.0
golang.org/x/sys v0.8.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.54.0
google.golang.org/grpc v1.55.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0
google.golang.org/protobuf v1.30.0
)
Expand Down
Loading
Loading