Skip to content

Commit

Permalink
fix(#4543): BeginRo use semaphore (#520)
Browse files Browse the repository at this point in the history
  • Loading branch information
revitteth committed Jul 13, 2022
1 parent 21c6baf commit d629e31
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
36 changes: 19 additions & 17 deletions kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ import (

"github.com/c2h5oh/datasize"
stack2 "github.com/go-stack/stack"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/log/v3"
"github.com/torquem-ch/mdbx-go/mdbx"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"

"github.com/ledgerwatch/erigon-lib/kv"
)

const NonExistingDBI kv.DBI = 999_999_999
Expand All @@ -57,7 +59,7 @@ type MdbxOpts struct {
syncPeriod time.Duration
augumentLimit uint64
pageSize uint64
roTxsLimiter chan struct{}
roTxsLimiter *semaphore.Weighted
}

func testKVPath() string {
Expand Down Expand Up @@ -86,7 +88,7 @@ func (opts MdbxOpts) Label(label kv.Label) MdbxOpts {
return opts
}

func (opts MdbxOpts) RoTxsLimiter(l chan struct{}) MdbxOpts {
func (opts MdbxOpts) RoTxsLimiter(l *semaphore.Weighted) MdbxOpts {
opts.roTxsLimiter = l
return opts
}
Expand Down Expand Up @@ -254,7 +256,7 @@ func (opts MdbxOpts) Open() (kv.RwDB, error) {
}

if opts.roTxsLimiter == nil {
opts.roTxsLimiter = make(chan struct{}, runtime.GOMAXPROCS(-1))
opts.roTxsLimiter = semaphore.NewWeighted(int64(runtime.GOMAXPROCS(-1)))
}
db := &MdbxKV{
opts: opts,
Expand Down Expand Up @@ -328,7 +330,7 @@ type MdbxKV struct {
buckets kv.TableCfg
opts MdbxOpts
txSize uint64
roTxsLimiter chan struct{} // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
roTxsLimiter *semaphore.Weighted // does limit amount of concurrent Ro transactions - in most casess runtime.NumCPU() is good value for this channel capacity - this channel can be shared with other components (like Decompressor)
closed atomic.Bool
}

Expand Down Expand Up @@ -392,10 +394,16 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
if db.closed.Load() {
return nil, fmt.Errorf("db closed")
}
select {
case <-ctx.Done():

// will return nil err if context is cancelled (may appear to acquire the semaphore)
if semErr := db.roTxsLimiter.Acquire(ctx, 1); semErr != nil {
return nil, semErr
}

// if context cancelled as we acquire the sempahore, it may succeed without blocking
// in this case we should return
if ctx.Err() != nil {
return nil, ctx.Err()
case db.roTxsLimiter <- struct{}{}:
}

defer func() {
Expand All @@ -405,7 +413,7 @@ func (db *MdbxKV) BeginRo(ctx context.Context) (txn kv.Tx, err error) {
if txn == nil {
// on error, or if there is whatever reason that we don't return a tx,
// we need to free up the limiter slot, otherwise it could lead to deadlocks
<-db.roTxsLimiter
db.roTxsLimiter.Release(1)
}
}()

Expand Down Expand Up @@ -770,10 +778,7 @@ func (tx *MdbxTx) Commit() error {
tx.tx = nil
tx.db.wg.Done()
if tx.readOnly {
select {
case <-tx.db.roTxsLimiter:
default:
}
tx.db.roTxsLimiter.Release(1)
} else {
runtime.UnlockOSThread()
}
Expand Down Expand Up @@ -828,10 +833,7 @@ func (tx *MdbxTx) Rollback() {
tx.tx = nil
tx.db.wg.Done()
if tx.readOnly {
select {
case <-tx.db.roTxsLimiter:
default:
}
tx.db.roTxsLimiter.Release(1)
} else {
runtime.UnlockOSThread()
}
Expand Down

0 comments on commit d629e31

Please sign in to comment.