Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Pos() deadlock with file system invalidation #206

Merged
merged 1 commit into from Nov 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 17 additions & 41 deletions db.go
Expand Up @@ -13,7 +13,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,14 +33,13 @@ const (

// DB represents a SQLite database.
type DB struct {
mu sync.Mutex
store *Store // parent store
name string // name of database
path string // full on-disk path
pageSize uint32 // database page size, if known
pageN uint32 // database size, in pages
pos Pos // current tx position
mode DBMode // database journaling mode (rollback, wal)
store *Store // parent store
name string // name of database
path string // full on-disk path
pageSize uint32 // database page size, if known
pageN uint32 // database size, in pages
pos atomic.Value // current tx position (Pos)
mode DBMode // database journaling mode (rollback, wal)

dirtyPageSet map[uint32]struct{}

Expand Down Expand Up @@ -83,6 +82,7 @@ func NewDB(store *Store, name string, path string) *DB {

Now: time.Now,
}
db.pos.Store(Pos{})
db.wal.frameOffsets = make(map[uint32]int64)
return db
}
Expand Down Expand Up @@ -137,23 +137,14 @@ func (db *DB) WALPath() string { return filepath.Join(db.path, "wal") }
// SHMPath returns the path to the underlying shared memory file.
func (db *DB) SHMPath() string { return filepath.Join(db.path, "shm") }

// PageSize returns the page size of the underlying database.
func (db *DB) PageSize() uint32 {
db.mu.Lock()
defer db.mu.Unlock()
return db.pageSize
}

// Pos returns the current transaction position of the database.
func (db *DB) Pos() Pos {
db.mu.Lock()
defer db.mu.Unlock()
return db.pos
return db.pos.Load().(Pos)
}

// setPos sets the current transaction position of the database.
func (db *DB) setPos(pos Pos) error {
db.pos = pos
db.pos.Store(pos)

// Invalidate page cache.
if invalidator := db.store.Invalidator; invalidator != nil {
Expand All @@ -163,7 +154,7 @@ func (db *DB) setPos(pos Pos) error {
}

// Update metrics.
dbTXIDMetricVec.WithLabelValues(db.name).Set(float64(db.pos.TXID))
dbTXIDMetricVec.WithLabelValues(db.name).Set(float64(pos.TXID))

return nil
}
Expand Down Expand Up @@ -491,8 +482,8 @@ func (db *DB) verifyDatabaseFile() error {
}

// Ensure database checksum matches checksum in current position.
if chksum != db.pos.PostApplyChecksum {
return fmt.Errorf("database checksum (%016x) does not match latest LTX checksum (%016x)", chksum, db.pos.PostApplyChecksum)
if pos := db.Pos(); chksum != pos.PostApplyChecksum {
return fmt.Errorf("database checksum (%016x) does not match latest LTX checksum (%016x)", chksum, pos.PostApplyChecksum)
}

return nil
Expand All @@ -513,9 +504,6 @@ func (db *DB) OpenLTXFile(txID uint64) (*os.File, error) {

// WriteDatabase writes data to the main database file.
func (db *DB) WriteDatabase(f *os.File, data []byte, offset int64) error {
db.mu.Lock()
defer db.mu.Unlock()

// Return an error if the current process is not the leader.
if !db.store.IsPrimary() {
return ErrReadOnlyReplica
Expand Down Expand Up @@ -568,9 +556,6 @@ func (db *DB) CreateWAL() (*os.File, error) {
// WriteWAL writes data to the WAL file. On final commit write, an LTX file is
// generated for the transaction.
func (db *DB) WriteWAL(f *os.File, data []byte, offset int64) error {
db.mu.Lock()
defer db.mu.Unlock()

// Return an error if the current process is not the leader.
if !db.store.IsPrimary() {
return ErrReadOnlyReplica
Expand Down Expand Up @@ -703,7 +688,7 @@ func (db *DB) CommitWAL() error {
prevPageN := binary.BigEndian.Uint32(page[SQLITE_DATABASE_SIZE_OFFSET:])

// Determine transaction ID of the in-process transaction.
pos := db.pos
pos := db.Pos()
txID := pos.TXID + 1

// Compute rolling checksum based off previous LTX database checksum.
Expand Down Expand Up @@ -908,9 +893,6 @@ func isByteSliceZero(b []byte) bool {

// CommitJournal deletes the journal file which commits or rolls back the transaction.
func (db *DB) CommitJournal(mode JournalMode) error {
db.mu.Lock()
defer db.mu.Unlock()

// Return an error if the current process is not the leader.
if !db.store.IsPrimary() {
return ErrReadOnlyReplica
Expand All @@ -933,7 +915,7 @@ func (db *DB) CommitJournal(mode JournalMode) error {
}

// Determine transaction ID of the in-process transaction.
pos := db.pos
pos := db.Pos()
txID := pos.TXID + 1

dbFile, err := os.Open(db.DatabasePath())
Expand Down Expand Up @@ -1262,9 +1244,6 @@ func (db *DB) ApplyLTX(ctx context.Context, path string) error {
return fmt.Errorf("sync database file: %w", err)
}

db.mu.Lock()
defer db.mu.Unlock()

db.mode = dbMode

if db.pageSize == 0 {
Expand Down Expand Up @@ -1433,14 +1412,12 @@ func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.He
}

// Determine current position & snapshot overriding WAL frames.
db.mu.Lock()
pos := db.pos
pos := db.Pos()
pageSize, pageN := db.pageSize, db.pageN
walFrameOffsets := make(map[uint32]int64, len(db.wal.frameOffsets))
for k, v := range db.wal.frameOffsets {
walFrameOffsets[k] = v
}
db.mu.Unlock()

// Log transaction ID for the snapshot.
log.Printf("writing snapshot %q @ %s", db.name, ltx.FormatTXID(pos.TXID))
Expand Down Expand Up @@ -1560,7 +1537,6 @@ func (db *DB) EnforceRetention(ctx context.Context, minTime time.Time) error {

type dbVarJSON struct {
Name string `json:"name"`
PageSize uint32 `json:"pageSize"`
TXID string `json:"txid"`
Checksum string `json:"checksum"`

Expand Down
1 change: 0 additions & 1 deletion store.go
Expand Up @@ -737,7 +737,6 @@ func (v *StoreVar) String() string {

dbJSON := &dbVarJSON{
Name: db.Name(),
PageSize: db.PageSize(),
TXID: ltx.FormatTXID(pos.TXID),
Checksum: fmt.Sprintf("%016x", pos.PostApplyChecksum),
}
Expand Down