Skip to content

Commit

Permalink
Segment flushing and persistence part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Nov 28, 2018
1 parent 95660fd commit e47c75c
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 74 deletions.
22 changes: 17 additions & 5 deletions persist/manager.go → persist/persist.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package persist

import "github.com/pilosa/pilosa/roaring"

// Manager manages the internals of persisting data onto storage layer.
type Manager interface {
// StartPersist starts persisting data.
Expand All @@ -16,19 +18,29 @@ type Persister interface {
}

// PrepareOptions provide a set of options for data persistence.
// TODO(xichen): Flesh this out.
//
// NOTE(review): the field meanings below are inferred from their names and
// types only; confirm against the persist manager implementation.
type PrepareOptions struct {
// Namespace is presumably the ID of the namespace being persisted.
Namespace []byte
// Shard is presumably the ID of the shard being persisted.
Shard uint32
// MinTimeNanos and MaxTimeNanos presumably bound the time range of the
// persisted data, in nanoseconds.
MinTimeNanos int64
MaxTimeNanos int64
// NumDocs is presumably the number of documents being persisted.
NumDocs int
}

// Fn is a function that persists an in-memory segment.
// TODO(xichen): Flesh this out.
type Fn func() error
// Fns contains a set of functions that persist document IDs
// and different types of document values for a given field.
//
// Every function receives the field path and the bitmap of document IDs that
// have a value of the corresponding type for that field. All but WriteNulls
// also receive the values themselves.
// NOTE(review): presumably vals is ordered to match the document IDs in the
// bitmap; confirm against the value writers.
type Fns struct {
// WriteNulls persists the IDs of documents whose value for the field is null.
WriteNulls func(fieldPath []string, docIDs *roaring.Bitmap) error
// WriteBools persists document IDs together with their boolean values.
WriteBools func(fieldPath []string, docIDs *roaring.Bitmap, vals []bool) error
// WriteInts persists document IDs together with their integer values.
WriteInts func(fieldPath []string, docIDs *roaring.Bitmap, vals []int) error
// WriteDoubles persists document IDs together with their float64 values.
WriteDoubles func(fieldPath []string, docIDs *roaring.Bitmap, vals []float64) error
// WriteStrings persists document IDs together with their string values.
WriteStrings func(fieldPath []string, docIDs *roaring.Bitmap, vals []string) error
}

// Closer is a function that performs cleanup after persistence.
type Closer func() error

// PreparedPersister is an object that wraps a persist function and a closer.
type PreparedPersister struct {
Persist Fn
Persist Fns
Close Closer
}
30 changes: 30 additions & 0 deletions storage/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"github.com/xichen2020/eventdb/event/field"
"github.com/xichen2020/eventdb/persist"

"github.com/pilosa/pilosa/roaring"
)
Expand Down Expand Up @@ -80,6 +81,35 @@ func (w *fieldWriter) addString(docID int32, val string) {
w.sw.add(docID, val)
}

// flush hands the contents of every value writer that exists for this field
// to the matching persist function, in the fixed order nulls, bools, ints,
// doubles, strings. Value writers that are nil are skipped. The first persist
// error stops the flush and is returned unchanged; nil is returned when every
// call succeeds.
func (w *fieldWriter) flush(persistFns persist.Fns) error {
	if nulls := w.nw; nulls != nil {
		err := persistFns.WriteNulls(w.path, nulls.docIDs)
		if err != nil {
			return err
		}
	}
	if bools := w.bw; bools != nil {
		err := persistFns.WriteBools(w.path, bools.docIDs, bools.vals)
		if err != nil {
			return err
		}
	}
	if ints := w.iw; ints != nil {
		err := persistFns.WriteInts(w.path, ints.docIDs, ints.vals)
		if err != nil {
			return err
		}
	}
	if doubles := w.dw; doubles != nil {
		err := persistFns.WriteDoubles(w.path, doubles.docIDs, doubles.vals)
		if err != nil {
			return err
		}
	}
	if strs := w.sw; strs != nil {
		err := persistFns.WriteStrings(w.path, strs.docIDs, strs.vals)
		if err != nil {
			return err
		}
	}
	return nil
}

// nullValueWriter holds the bitmap of document IDs recorded for a field's
// null values. The bitmap is what fieldWriter.flush passes to WriteNulls.
type nullValueWriter struct {
docIDs *roaring.Bitmap
}
Expand Down
90 changes: 74 additions & 16 deletions storage/flush.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,107 @@
package storage

import (
"errors"
"fmt"
"sync"
"time"

"github.com/xichen2020/eventdb/persist"

"github.com/m3db/m3x/clock"
xerrors "github.com/m3db/m3x/errors"
)

type databaseFlushManager interface {
// ComputeFlushTargets computes the list of namespaces eligible for flushing.
ComputeFlushTargets(namespaces []databaseNamespace) []databaseNamespace

// Flush attempts to flush the database if deemed necessary.
Flush(namespace databaseNamespace) error
// Flush attempts to flush the database.
Flush() error
}

var (
errFlushOperationsInProgress = errors.New("flush operations already in progress")
)

// flushManagerState describes what the flush manager is currently doing. It
// is read and written under the flush manager's mutex.
type flushManagerState int

const (
// flushManagerIdle means no flush is running. Flush only proceeds from this
// state and restores it when it returns.
flushManagerIdle flushManagerState = iota
// flushManagerNotIdle is used to protect the flush manager from concurrent use
// when we haven't begun either a flush or snapshot.
flushManagerNotIdle
// flushManagerFlushInProgress is set by Flush once it has determined that
// there are namespaces to flush, just before it starts flushing them.
flushManagerFlushInProgress
)

type flushManager struct {
sync.Mutex

database database
pm persist.Manager
opts *Options
pm persist.Manager

nowFn clock.NowFn
sleepFn sleepFn
nowFn clock.NowFn
state flushManagerState
}

func newFlushManager(database database, opts *Options) *flushManager {
return &flushManager{
database: database,
// pm: fs.NewPersistManager()
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
opts: opts,
pm: opts.PersistManager(),
nowFn: opts.ClockOptions().NowFn(),
}
}

// Flush flushes the namespaces selected by computeFlushTargets, using a
// persister obtained from the persist manager.
//
// Only one flush may run at a time: if the manager is not idle,
// errFlushOperationsInProgress is returned immediately. Errors from starting
// the persist or from retrieving the owned namespaces are returned as-is. A
// namespace that fails to flush does not abort the run; its error is annotated
// with the namespace ID and all such errors are returned together. Nil is
// returned when there is nothing to flush or every namespace flushed cleanly.
func (m *flushManager) Flush() error {
	// Ensure only a single flush is happening at a time.
	m.Lock()
	if m.state != flushManagerIdle {
		m.Unlock()
		return errFlushOperationsInProgress
	}
	m.state = flushManagerNotIdle
	m.Unlock()

	// Return to idle regardless of which path we leave by.
	defer m.setState(flushManagerIdle)

	// NOTE(review): the persister is acquired before we know whether there is
	// anything to flush and is never explicitly finished in this function.
	// Confirm whether persist.Persister requires a completion call.
	persister, err := m.pm.StartPersist()
	if err != nil {
		return err
	}

	namespaces, err := m.database.GetOwnedNamespaces()
	if err != nil {
		return err
	}

	// Determine which namespaces are in scope for flushing.
	toFlush := m.computeFlushTargets(namespaces)
	if len(toFlush) == 0 {
		// Nothing to do.
		return nil
	}

	m.setState(flushManagerFlushInProgress)

	var multiErr xerrors.MultiError
	// Flush only the selected targets, not every owned namespace, so that the
	// selection made by computeFlushTargets is actually honored.
	for _, n := range toFlush {
		// NB(xichen): we still want to proceed if a namespace fails to flush its data.
		// Probably want to emit a counter here, but for now just collect the error.
		if err := n.Flush(persister); err != nil {
			detailedErr := fmt.Errorf("namespace %s failed to flush data: %v", n.ID(), err)
			multiErr = multiErr.Add(detailedErr)
		}
	}

	return multiErr.FinalError()
}

// NB: This is just a no-op implementation for now.
// In reality, this should be determined based on memory usage of each namespace.
func (mgr *flushManager) ComputeFlushTargets(namespaces []databaseNamespace) []databaseNamespace {
func (m *flushManager) computeFlushTargets(namespaces []databaseNamespace) []databaseNamespace {
return namespaces
}

func (mgr *flushManager) Flush(namespace databaseNamespace) error {
return namespace.Flush(mgr.pm)
// setState records the flush manager's new state while holding the manager's
// mutex.
func (m *flushManager) setState(state flushManagerState) {
	m.Lock()
	defer m.Unlock()

	m.state = state
}
37 changes: 2 additions & 35 deletions storage/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ package storage
import (
"errors"
"sync"
"time"

"github.com/m3db/m3x/clock"
xerrors "github.com/m3db/m3x/errors"
"github.com/uber-go/tally"
)

Expand All @@ -16,8 +13,7 @@ type databaseFileSystemManager interface {
}

var (
errEmptyNamespaces = errors.New("empty namespaces")
errRunInProgress = errors.New("another run is in progress")
errRunInProgress = errors.New("another run is in progress")
)

type runStatus int
Expand Down Expand Up @@ -47,8 +43,6 @@ type fileSystemManager struct {

status runStatus
metrics fileSystemManagerMetrics
nowFn clock.NowFn
sleepFn sleepFn
}

func newFileSystemManager(database database, opts *Options) *fileSystemManager {
Expand All @@ -58,8 +52,6 @@ func newFileSystemManager(database database, opts *Options) *fileSystemManager {
database: database,
databaseFlushManager: newFlushManager(database, opts),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
metrics: newFileSystemManagerMetrics(scope),
}
}
Expand All @@ -72,30 +64,5 @@ func (mgr *fileSystemManager) Run() error {
return errRunInProgress
}

namespaces, err := mgr.database.GetOwnedNamespaces()
if err != nil {
return err
}
if len(namespaces) == 0 {
return errEmptyNamespaces
}

// Determine which namespaces are in scope for flushing.
toFlush := mgr.ComputeFlushTargets(namespaces)
if len(toFlush) == 0 {
// Nothing to do.
return nil
}

var (
start = mgr.nowFn()
multiErr xerrors.MultiError
)
for _, n := range namespaces {
multiErr = multiErr.Add(mgr.Flush(n))
}

took := mgr.nowFn().Sub(start)
mgr.metrics.runDuration.Record(took)
return multiErr.FinalError()
return mgr.Flush()
}
52 changes: 46 additions & 6 deletions storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,27 @@ import (
"fmt"
"sync"

"github.com/uber-go/tally"

"github.com/xichen2020/eventdb/event"
"github.com/xichen2020/eventdb/persist"
"github.com/xichen2020/eventdb/sharding"

"github.com/m3db/m3x/clock"
xerrors "github.com/m3db/m3x/errors"
"github.com/m3db/m3x/instrument"
)

// databaseNamespace is a database namespace.
type databaseNamespace interface {
// ID returns the ID of the namespace.
ID() []byte

// Write writes an event within the namespace.
Write(ev event.Event) error

// Flush performs a flush against the namespace.
Flush(pm persist.Manager) error
Flush(ps persist.Persister) error

// Close closes the namespace.
Close() error
Expand All @@ -28,15 +35,27 @@ var (
errNamespaceAlreadyClosed = errors.New("namespace already closed")
)

// databaseNamespaceMetrics groups the metrics emitted by a database namespace.
type databaseNamespaceMetrics struct {
// flush holds the method metrics for namespace flushes; dbNamespace.Flush
// reports its result and duration to it.
flush instrument.MethodMetrics
}

// newDatabaseNamespaceMetrics builds the namespace metrics, creating the
// "flush" method metrics on the given scope with the given sampling rate.
func newDatabaseNamespaceMetrics(scope tally.Scope, samplingRate float64) databaseNamespaceMetrics {
	var metrics databaseNamespaceMetrics
	metrics.flush = instrument.NewMethodMetrics(scope, "flush", samplingRate)
	return metrics
}

type dbNamespace struct {
sync.RWMutex

id []byte
shardSet sharding.ShardSet
opts *Options

closed bool
shards []databaseShard
closed bool
shards []databaseShard
metrics databaseNamespaceMetrics
nowFn clock.NowFn
}

func newDatabaseNamespace(
Expand All @@ -47,15 +66,22 @@ func newDatabaseNamespace(
idClone := make([]byte, len(id))
copy(idClone, id)

instrumentOpts := opts.InstrumentOptions()
scope := instrumentOpts.MetricsScope()
samplingRate := instrumentOpts.MetricsSamplingRate()
n := &dbNamespace{
id: id,
shardSet: shardSet,
opts: opts,
metrics: newDatabaseNamespaceMetrics(scope, samplingRate),
nowFn: opts.ClockOptions().NowFn(),
}
n.initShards()
return n
}

// ID returns the namespace's ID. The stored slice is returned directly, not a
// copy.
func (n *dbNamespace) ID() []byte {
	return n.id
}

func (n *dbNamespace) Write(ev event.Event) error {
shard, err := n.shardFor(ev.ID)
if err != nil {
Expand All @@ -64,8 +90,22 @@ func (n *dbNamespace) Write(ev event.Event) error {
return shard.Write(ev)
}

func (n *dbNamespace) Flush(pm persist.Manager) error {
return fmt.Errorf("not implemented")
// Flush flushes every shard owned by the namespace through ps.
//
// A failing shard does not stop the remaining shards from being flushed. Each
// failure is annotated with the shard ID and all failures are returned as a
// single combined error (nil when every shard succeeds). The outcome and the
// elapsed time are reported to the namespace's flush method metrics.
func (n *dbNamespace) Flush(ps persist.Persister) error {
	var (
		start = n.nowFn()
		errs  = xerrors.NewMultiError()
	)
	for _, s := range n.getOwnedShards() {
		err := s.Flush(ps)
		if err == nil {
			continue
		}
		// Keep going: one bad shard should not block the others.
		errs = errs.Add(fmt.Errorf("shard %d failed to flush data: %v", s.ID(), err))
	}

	finalErr := errs.FinalError()
	elapsed := n.nowFn().Sub(start)
	n.metrics.flush.ReportSuccessOrError(finalErr, elapsed)
	return finalErr
}

func (n *dbNamespace) Close() error {
Expand All @@ -90,7 +130,7 @@ func (n *dbNamespace) initShards() {
shards := n.shardSet.AllIDs()
dbShards := make([]databaseShard, n.shardSet.Max()+1)
for _, shard := range shards {
dbShards[shard] = newDatabaseShard(shard, n.opts)
dbShards[shard] = newDatabaseShard(n.ID(), shard, n.opts)
}
n.shards = dbShards
}
Expand Down
Loading

0 comments on commit e47c75c

Please sign in to comment.