Segment sealing and flushing (part 1)
xichen2020 committed Nov 27, 2018
1 parent f2181af commit 03afc39
Showing 8 changed files with 392 additions and 10 deletions.
34 changes: 34 additions & 0 deletions persist/manager.go
@@ -0,0 +1,34 @@
package persist

// Manager manages the internals of persisting data onto storage layer.
type Manager interface {
// StartPersist starts persisting data.
StartPersist() (Persister, error)
}

// Persister is responsible for actually persisting data.
type Persister interface {
// Prepare prepares for data persistence.
Prepare(opts PrepareOptions) (PreparedPersister, error)

// Done marks the persistence as complete.
Done() error
}

// PrepareOptions provide a set of options for data persistence.
// TODO(xichen): Flesh this out.
type PrepareOptions struct {
}

// Fn is a function that persists an in-memory segment.
// TODO(xichen): Flesh this out.
type Fn func() error

// Closer is a function that performs cleanup after persistence.
type Closer func() error

// PreparedPersister is an object that wraps a persist function and a closer.
type PreparedPersister struct {
Persist Fn
Close Closer
}
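
Taken together, the interfaces above describe a prepare/persist/close cycle. Below is a minimal sketch of how a caller might drive them, assuming only the types declared in this file; the example package and the flushOneSegment helper are illustrative and not part of this commit.

package example // illustrative only, not part of this commit

import "github.com/xichen2020/eventdb/persist"

// flushOneSegment sketches the expected call sequence: start a persist cycle,
// prepare a persister, run the persist function, then close and mark done.
func flushOneSegment(pm persist.Manager) error {
	persister, err := pm.StartPersist()
	if err != nil {
		return err
	}
	// Mark the persist cycle as complete regardless of the outcome
	// (the returned error is ignored in this sketch).
	defer persister.Done()

	prepared, err := persister.Prepare(persist.PrepareOptions{})
	if err != nil {
		return err
	}
	if err := prepared.Persist(); err != nil {
		prepared.Close() // best-effort cleanup on failure
		return err
	}
	return prepared.Close()
}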
26 changes: 23 additions & 3 deletions storage/database.go
@@ -26,6 +26,14 @@ type Database interface {
Close() error
}

// database provides internal database APIs.
type database interface {
Database

// GetOwnedNamespaces returns the namespaces owned by the database.
GetOwnedNamespaces() ([]databaseNamespace, error)
}

var (
// errDatabaseNotOpenOrClosed is raised when trying to perform an action that requires
// the database to be open.
@@ -51,6 +59,7 @@ type db struct {
opts *Options

state databaseState
mediator databaseMediator
namespaces map[hash.Hash]databaseNamespace
}

@@ -70,14 +79,16 @@ func NewDatabase(
h := hash.BytesHash(ns)
nss[h] = newDatabaseNamespace(ns, shardSet, opts)
}
return &db{

d := &db{
opts: opts,
shardSet: shardSet,
namespaces: nss,
}
d.mediator = newMediator(d, opts)
return d
}

// TODO(xichen): Start ticking.
func (d *db) Open() error {
d.Lock()
defer d.Unlock()
@@ -86,7 +97,7 @@ func (d *db) Open() error {
return errDatabaseOpenOrClosed
}
d.state = databaseOpen
return nil
return d.mediator.Open()
}

func (d *db) Write(
@@ -121,6 +132,15 @@ func (d *db) Close() error {
return multiErr.FinalError()
}

func (d *db) GetOwnedNamespaces() ([]databaseNamespace, error) {
d.RLock()
defer d.RUnlock()
if d.state != databaseOpen {
return nil, errDatabaseNotOpenOrClosed
}
return d.ownedNamespacesWithLock(), nil
}

func (d *db) namespaceFor(namespace []byte) (databaseNamespace, error) {
h := hash.BytesHash(namespace)
d.RLock()
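
GetOwnedNamespaces above relies on ownedNamespacesWithLock, whose body is collapsed out of this diff view. A plausible sketch, assuming it simply snapshots the namespace map while the caller holds the read lock; the real implementation is not shown here.

// Sketch only: the actual implementation is not included in this diff.
func (d *db) ownedNamespacesWithLock() []databaseNamespace {
	namespaces := make([]databaseNamespace, 0, len(d.namespaces))
	for _, ns := range d.namespaces {
		namespaces = append(namespaces, ns)
	}
	return namespaces
}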
49 changes: 49 additions & 0 deletions storage/flush.go
@@ -0,0 +1,49 @@
package storage

import (
"sync"
"time"

"github.com/xichen2020/eventdb/persist"

"github.com/m3db/m3x/clock"
)

type databaseFlushManager interface {
// ComputeFlushTargets computes the list of namespaces eligible for flushing.
ComputeFlushTargets(namespaces []databaseNamespace) []databaseNamespace

// Flush attempts to flush the database if deemed necessary.
Flush(namespace databaseNamespace) error
}

type flushManager struct {
sync.Mutex

database database
pm persist.Manager
opts *Options

nowFn clock.NowFn
sleepFn sleepFn
}

func newFlushManager(database database, opts *Options) *flushManager {
return &flushManager{
database: database,
// pm: fs.NewPersistManager()
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
}
}

// NB: This is just a no-op implementation for now.
// In reality, this should be determined based on memory usage of each namespace.
func (mgr *flushManager) ComputeFlushTargets(namespaces []databaseNamespace) []databaseNamespace {
return namespaces
}

func (mgr *flushManager) Flush(namespace databaseNamespace) error {
return namespace.Flush(mgr.pm)
}
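
The NB comment above points at a memory-based selection policy as the eventual goal. One way that could look, assuming a hypothetical MemoryUsage() method on databaseNamespace and a configurable byte threshold, neither of which exists in this commit.

// Sketch only: MemoryUsage and maxBytesPerNamespace are hypothetical.
func (mgr *flushManager) computeFlushTargetsByMemory(
	namespaces []databaseNamespace,
	maxBytesPerNamespace int64,
) []databaseNamespace {
	var toFlush []databaseNamespace
	for _, ns := range namespaces {
		if ns.MemoryUsage() >= maxBytesPerNamespace {
			toFlush = append(toFlush, ns)
		}
	}
	return toFlush
}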
101 changes: 101 additions & 0 deletions storage/fs.go
@@ -0,0 +1,101 @@
package storage

import (
"errors"
"sync"
"time"

"github.com/m3db/m3x/clock"
xerrors "github.com/m3db/m3x/errors"
"github.com/uber-go/tally"
)

type databaseFileSystemManager interface {
// Run attempts to flush the database if deemed necessary.
Run() error
}

var (
errEmptyNamespaces = errors.New("empty namespaces")
errRunInProgress = errors.New("another run is in progress")
)

type runStatus int

// nolint:deadcode,megacheck,varcheck
const (
runNotStarted runStatus = iota
runInProgress
)

type fileSystemManagerMetrics struct {
runDuration tally.Timer
}

func newFileSystemManagerMetrics(scope tally.Scope) fileSystemManagerMetrics {
return fileSystemManagerMetrics{
runDuration: scope.Timer("duration"),
}
}

type fileSystemManager struct {
sync.Mutex
databaseFlushManager

database database
opts *Options

status runStatus
metrics fileSystemManagerMetrics
nowFn clock.NowFn
sleepFn sleepFn
}

func newFileSystemManager(database database, opts *Options) *fileSystemManager {
scope := opts.InstrumentOptions().MetricsScope().SubScope("fs")

return &fileSystemManager{
database: database,
databaseFlushManager: newFlushManager(database, opts),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
metrics: newFileSystemManagerMetrics(scope),
}
}

func (mgr *fileSystemManager) Run() error {
mgr.Lock()
defer mgr.Unlock()

if mgr.status == runInProgress {
return errRunInProgress
}

namespaces, err := mgr.database.GetOwnedNamespaces()
if err != nil {
return err
}
if len(namespaces) == 0 {
return errEmptyNamespaces
}

// Determine which namespaces are in scope for flushing.
toFlush := mgr.ComputeFlushTargets(namespaces)
if len(toFlush) == 0 {
// Nothing to do.
return nil
}

var (
start = mgr.nowFn()
multiErr xerrors.MultiError
)
for _, n := range toFlush {
multiErr = multiErr.Add(mgr.Flush(n))
}

took := mgr.nowFn().Sub(start)
mgr.metrics.runDuration.Record(took)
return multiErr.FinalError()
}
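
Note that Run holds the manager lock for the entire flush, so a concurrent caller blocks rather than seeing errRunInProgress, and runInProgress (hence the nolint directive) is never actually set. Below is a sketch of how the status field could be wired in so that the lock only guards status transitions; this is illustrative and not part of this commit.

// Sketch only: guard just the status transition so concurrent callers get
// errRunInProgress instead of blocking on the mutex.
func (mgr *fileSystemManager) runWithStatus(runFn func() error) error {
	mgr.Lock()
	if mgr.status == runInProgress {
		mgr.Unlock()
		return errRunInProgress
	}
	mgr.status = runInProgress
	mgr.Unlock()

	defer func() {
		mgr.Lock()
		mgr.status = runNotStarted
		mgr.Unlock()
	}()

	return runFn()
}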
121 changes: 121 additions & 0 deletions storage/mediator.go
@@ -0,0 +1,121 @@
package storage

import (
"errors"
"sync"
"time"

"github.com/m3db/m3x/clock"
)

// databaseMediator mediates actions among various database managers.
type databaseMediator interface {
// Open opens the mediator.
Open() error

// Close closes the mediator.
Close() error
}

type mediatorState int

const (
runCheckInterval = 5 * time.Second
minRunInterval = time.Minute

mediatorNotOpen mediatorState = iota
mediatorOpen
mediatorClosed
)

var (
errMediatorAlreadyOpen = errors.New("mediator is already open")
errMediatorNotOpen = errors.New("mediator is not open")
errMediatorAlreadyClosed = errors.New("mediator is already closed")
)

type sleepFn func(time.Duration)

type mediator struct {
sync.RWMutex

database Database
databaseFileSystemManager

opts *Options
nowFn clock.NowFn
sleepFn sleepFn
state mediatorState
closedCh chan struct{}
}

func newMediator(database database, opts *Options) databaseMediator {
return &mediator{
database: database,
databaseFileSystemManager: newFileSystemManager(database, opts),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
state: mediatorNotOpen,
closedCh: make(chan struct{}),
}
}

func (m *mediator) Open() error {
m.Lock()
defer m.Unlock()
if m.state != mediatorNotOpen {
return errMediatorAlreadyOpen
}
m.state = mediatorOpen
go m.runLoop()
return nil
}

func (m *mediator) Close() error {
m.Lock()
defer m.Unlock()

if m.state == mediatorNotOpen {
return errMediatorNotOpen
}
if m.state == mediatorClosed {
return errMediatorAlreadyClosed
}
m.state = mediatorClosed
close(m.closedCh)
return nil
}

func (m *mediator) runLoop() {
for {
select {
case <-m.closedCh:
return
default:
m.runOnce()
}
}
}

func (m *mediator) runOnce() {
start := m.nowFn()
if err := m.databaseFileSystemManager.Run(); err == errRunInProgress {
// NB(xichen): if we attempt to run while another run
// is in progress, throttle a little to avoid constantly
// checking whether the ongoing run is finished.
m.sleepFn(runCheckInterval)
} else if err != nil {
// On critical error, we retry immediately.
log := m.opts.InstrumentOptions().Logger()
log.Errorf("error within ongoingTick: %v", err)
} else {
// Otherwise, we make sure the subsequent run is at least
// minRunInterval apart from the last one.
took := m.nowFn().Sub(start)
if took > minRunInterval {
return
}
m.sleepFn(minRunInterval - took)
}
}
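
The pacing logic in runOnce can be read in isolation: a fast run sleeps out the remainder of minRunInterval, a slow run starts the next iteration immediately, and errRunInProgress triggers a short runCheckInterval backoff. Below is a self-contained, runnable sketch of just the spacing rule; the paceRun helper and the durations are illustrative.

package main

import (
	"fmt"
	"time"
)

// paceRun mirrors the tail of runOnce: if the run took less than
// minRunInterval, sleep the remainder so consecutive runs are spaced out.
func paceRun(runDuration, minRunInterval time.Duration, sleepFn func(time.Duration)) {
	if runDuration > minRunInterval {
		return
	}
	sleepFn(minRunInterval - runDuration)
}

func main() {
	var slept time.Duration
	record := func(d time.Duration) { slept = d }

	paceRun(10*time.Second, time.Minute, record)
	fmt.Println(slept) // 50s: a fast run waits out the rest of the minute

	slept = 0
	paceRun(90*time.Second, time.Minute, record)
	fmt.Println(slept) // 0s: a slow run is not delayed further
}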