Skip to content

Commit

Permalink
[bugfix] Fix possible race condition in federatingdb (#490)
Browse files Browse the repository at this point in the history
Signed-off-by: kim <grufwub@gmail.com>
  • Loading branch information
NyaaaWhatsUpDoc authored Apr 28, 2022
1 parent 8e80f98 commit cc5f2e9
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 219 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
codeberg.org/gruf/go-errors v1.0.5
codeberg.org/gruf/go-mutexes v1.1.2
codeberg.org/gruf/go-runners v1.2.0
codeberg.org/gruf/go-store v1.3.6
github.com/ReneKroon/ttlcache v1.7.0
Expand Down Expand Up @@ -31,7 +32,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.0
github.com/stretchr/testify v1.7.0
github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a
github.com/superseriousbusiness/activity v1.1.0-gts
github.com/superseriousbusiness/exif-terminator v0.2.0
github.com/superseriousbusiness/oauth2/v4 v4.3.2-SSB
github.com/tdewolff/minify/v2 v2.9.22
Expand All @@ -54,7 +55,6 @@ require (
codeberg.org/gruf/go-fastpath v1.0.2 // indirect
codeberg.org/gruf/go-format v1.0.3 // indirect
codeberg.org/gruf/go-hashenc v1.0.1 // indirect
codeberg.org/gruf/go-mutexes v1.1.2 // indirect
codeberg.org/gruf/go-pools v1.0.2 // indirect
github.com/aymerick/douceur v0.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -535,8 +535,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a h1:tKr18ijUgZ+PCM/n+St6uO2BaPgkReRtM3IJHC/Otf4=
github.com/superseriousbusiness/activity v1.0.1-0.20220405135100-18e8f86a760a/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM=
github.com/superseriousbusiness/activity v1.1.0-gts h1:BSnMzs/84s0Zme7BngE9iJAHV7g1Bv1nhLCP0aJtU3I=
github.com/superseriousbusiness/activity v1.1.0-gts/go.mod h1:AZw0Xb4Oju8rmaJCZ21gc5CPg47MmNgyac+Hx5jo8VM=
github.com/superseriousbusiness/exif-terminator v0.2.0 h1:C21KOUr54E37qTqYS7WJX0J83sNzzCwBEy0KXyDprqU=
github.com/superseriousbusiness/exif-terminator v0.2.0/go.mod h1:DHJuKguXqyOVqB/oyOylutEDIZCbkYsn2GZFNSUDT9E=
github.com/superseriousbusiness/go-jpeg-image-structure/v2 v2.0.0-20220321154430-d89a106fdabe h1:ksl2oCx/Qo8sNDc3Grb8WGKBM9nkvhCm25uvlT86azE=
Expand Down
29 changes: 3 additions & 26 deletions internal/federation/federatingdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ package federatingdb

import (
"context"
"sync"
"time"

"codeberg.org/gruf/go-mutexes"
"github.com/superseriousbusiness/activity/pub"
"github.com/superseriousbusiness/activity/streams/vocab"
"github.com/superseriousbusiness/gotosocial/internal/db"
Expand All @@ -41,39 +40,17 @@ type DB interface {
// FederatingDB uses the underlying DB interface to implement the go-fed pub.Database interface.
// It doesn't care what the underlying implementation of the DB interface is, as long as it works.
type federatingDB struct {
mutex sync.Mutex
locks map[string]*mutex
pool sync.Pool
locks mutexes.MutexMap
db db.DB
typeConverter typeutils.TypeConverter
}

// New returns a DB interface using the given database and config
func New(db db.DB) DB {
fdb := federatingDB{
mutex: sync.Mutex{},
locks: make(map[string]*mutex, 100),
pool: sync.Pool{New: func() interface{} { return &mutex{} }},
locks: mutexes.NewMap(-1, -1), // use defaults
db: db,
typeConverter: typeutils.NewConverter(db),
}
go fdb.cleanupLocks()
return &fdb
}

func (db *federatingDB) cleanupLocks() {
for {
// Sleep for a minute...
time.Sleep(time.Minute)

// Delete unused locks from map
db.mutex.Lock()
for id, mu := range db.locks {
if !mu.inUse() {
delete(db.locks, id)
db.pool.Put(mu)
}
}
db.mutex.Unlock()
}
}
85 changes: 4 additions & 81 deletions internal/federation/federatingdb/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import (
"context"
"errors"
"net/url"
"sync"
"sync/atomic"

"github.com/sirupsen/logrus"
)

// Lock takes a lock for the object at the specified id. If an error
Expand All @@ -39,83 +35,10 @@ import (
// processes require tight loops acquiring and releasing locks.
//
// Used to ensure race conditions in multiple requests do not occur.
func (f *federatingDB) Lock(c context.Context, id *url.URL) error {
// Before any other Database methods are called, the relevant `id`
// entries are locked to allow for fine-grained concurrency.

// Strategy: create a new lock, if stored, continue. Otherwise, lock the
// existing mutex.
if id == nil {
return errors.New("Lock: id was nil")
}
idStr := id.String()

// Acquire map lock
f.mutex.Lock()

// Get mutex, or create new
mu, ok := f.locks[idStr]
if !ok {
mu, ok = f.pool.Get().(*mutex)
if !ok {
logrus.Panic("Lock: pool entry was not a *mutex")
}
f.locks[idStr] = mu
}

// Unlock map, acquire mutex lock
f.mutex.Unlock()
mu.Lock()
return nil
}

// Unlock makes the lock for the object at the specified id available.
// If an error is returned, the lock must have still been freed.
//
// Used to ensure race conditions in multiple requests do not occur.
func (f *federatingDB) Unlock(c context.Context, id *url.URL) error {
// Once Go-Fed is done calling Database methods, the relevant `id`
// entries are unlocked.
func (f *federatingDB) Lock(c context.Context, id *url.URL) (func(), error) {
if id == nil {
return errors.New("Unlock: id was nil")
return nil, errors.New("Lock: id was nil")
}
idStr := id.String()

// Check map for mutex
f.mutex.Lock()
mu, ok := f.locks[idStr]
f.mutex.Unlock()

if !ok {
return errors.New("missing an id in unlock")
}

// Unlock the mutex
mu.Unlock()
return nil
}

// mutex defines a mutex we can check the lock status of.
// this is not perfect, but it's good enough for a semi
// regular mutex cleanup routine
type mutex struct {
mu sync.Mutex
st uint32
}

// inUse returns if the mutex is in use
func (mu *mutex) inUse() bool {
return atomic.LoadUint32(&mu.st) == 1
}

// Lock acquire mutex lock
func (mu *mutex) Lock() {
mu.mu.Lock()
atomic.StoreUint32(&mu.st, 1)
}

// Unlock releases mutex lock
func (mu *mutex) Unlock() {
mu.mu.Unlock()
atomic.StoreUint32(&mu.st, 0)
unlock := f.locks.Lock(id.String())
return unlock, nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cc5f2e9

Please sign in to comment.