Skip to content

Commit

Permalink
Support configuring maximum number of documents per segment
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Dec 9, 2018
1 parent 5381ac8 commit d806688
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 39 deletions.
2 changes: 2 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ database:
fieldPathSeparator: .
namespaceFieldName: service
timestampFieldName: "@timestamp"
minRunInterval: 1m
maxNumCachedSegmentsPerShard: 1
maxNumDocsPerSegment: 1048576
persist:
filePathPrefix: /tmp
writeBufferSize: 65536
Expand Down
8 changes: 8 additions & 0 deletions services/eventdb/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ type DatabaseConfiguration struct {
FieldPathSeparator *separator `yaml:"fieldPathSeparator"`
NamespaceFieldName *string `yaml:"namespaceFieldName"`
TimestampFieldName *string `yaml:"timestampFieldName"`
MinRunInterval *time.Duration `yaml:"minRunInterval"`
MaxNumCachedSegmentsPerShard *int `yaml:"maxNumCachedSegmentsPerShard"`
MaxNumDocsPerSegment *int32 `yaml:"maxNumDocsPerSegment"`
PersistManager *persistManagerConfiguration `yaml:"persist"`
BoolArrayPool *pool.BucketizedBoolArrayPoolConfiguration `yaml:"boolArrayPool"`
IntArrayPool *pool.BucketizedIntArrayPoolConfiguration `yaml:"intArrayPool"`
Expand All @@ -48,9 +50,15 @@ func (c *DatabaseConfiguration) NewOptions(scope tally.Scope) (*storage.Options,
if c.TimestampFieldName != nil {
opts = opts.SetTimestampFieldName(*c.TimestampFieldName)
}
if c.MinRunInterval != nil {
opts = opts.SetMinRunInterval(*c.MinRunInterval)
}
if c.MaxNumCachedSegmentsPerShard != nil {
opts = opts.SetMaxNumCachedSegmentsPerShard(*c.MaxNumCachedSegmentsPerShard)
}
if c.MaxNumDocsPerSegment != nil {
opts = opts.SetMaxNumDocsPerSegment(*c.MaxNumDocsPerSegment)
}
persistManager := c.PersistManager.NewPersistManager(opts.FieldPathSeparator(), opts.TimestampFieldName())
opts = opts.SetPersistManager(persistManager)

Expand Down
27 changes: 14 additions & 13 deletions storage/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type mediatorState int

const (
runCheckInterval = 5 * time.Second
minRunInterval = time.Minute

mediatorNotOpen mediatorState = iota
mediatorOpen
Expand All @@ -38,13 +37,14 @@ type sleepFn func(time.Duration)

type mediator struct {
sync.RWMutex

database Database
databaseFileSystemManager

opts *Options
nowFn clock.NowFn
sleepFn sleepFn
database Database
opts *Options
minRunInterval time.Duration
nowFn clock.NowFn
sleepFn sleepFn

state mediatorState
closedCh chan struct{}
}
Expand All @@ -53,11 +53,12 @@ func newMediator(database database, opts *Options) databaseMediator {
return &mediator{
database: database,
databaseFileSystemManager: newFileSystemManager(database, opts),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
state: mediatorNotOpen,
closedCh: make(chan struct{}),
opts: opts,
minRunInterval: opts.MinRunInterval(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
state: mediatorNotOpen,
closedCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -113,9 +114,9 @@ func (m *mediator) runOnce() {
// Otherwise, we make sure the subsequent run is at least
// minRunInterval apart from the last one.
took := m.nowFn().Sub(start)
if took > minRunInterval {
if took > m.minRunInterval {
return
}
m.sleepFn(minRunInterval - took)
m.sleepFn(m.minRunInterval - took)
}
}
36 changes: 36 additions & 0 deletions storage/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package storage

import (
"time"

"github.com/m3db/m3x/clock"
"github.com/m3db/m3x/instrument"

Expand All @@ -13,7 +15,9 @@ const (
defaultFieldPathSeparator = '.'
defaultNamespaceFieldName = "service"
defaultTimestampFieldName = "@timestamp"
defaultMinRunInterval = time.Minute
defaultMaxNumCachedSegmentsPerShard = 1
defaultMaxNumDocsPerSegment = 1024 * 1024
)

var (
Expand All @@ -28,7 +32,9 @@ type Options struct {
namespaceFieldName string
timeStampFieldName string
persistManager persist.Manager
minRunInterval time.Duration
maxNumCachedSegmentsPerShard int
maxNumDocsPerSegment int32
boolArrayPool *pool.BucketizedBoolArrayPool
intArrayPool *pool.BucketizedIntArrayPool
int64ArrayPool *pool.BucketizedInt64ArrayPool
Expand All @@ -45,7 +51,9 @@ func NewOptions() *Options {
namespaceFieldName: defaultNamespaceFieldName,
timeStampFieldName: defaultTimestampFieldName,
persistManager: defaultPersistManager,
minRunInterval: defaultMinRunInterval,
maxNumCachedSegmentsPerShard: defaultMaxNumCachedSegmentsPerShard,
maxNumDocsPerSegment: defaultMaxNumDocsPerSegment,
}
o.initPools()
return o
Expand Down Expand Up @@ -125,6 +133,22 @@ func (o *Options) TimestampFieldName() string {
return o.timeStampFieldName
}

// SetMinRunInterval sets the minimum interval between consecutive mediator
// runs for performing periodic administrative tasks (e.g., flushing)
// to smoothen the load. It returns a modified copy of the options,
// leaving the receiver untouched (copy-on-write, like the other setters).
func (o *Options) SetMinRunInterval(v time.Duration) *Options {
	cloned := *o
	cloned.minRunInterval = v
	return &cloned
}

// MinRunInterval returns the minimum interval between consecutive mediator
// runs for performing periodic administrative tasks (e.g., flushing)
// to smoothen the load.
// NOTE: the comment previously said "sets" — a copy-paste from the setter;
// this is the getter.
func (o *Options) MinRunInterval() time.Duration {
	return o.minRunInterval
}

// SetMaxNumCachedSegmentsPerShard sets the maximum number of segments cached in
// memory per shard in a namespace.
func (o *Options) SetMaxNumCachedSegmentsPerShard(v int) *Options {
Expand All @@ -139,6 +163,18 @@ func (o *Options) MaxNumCachedSegmentsPerShard() int {
return o.maxNumCachedSegmentsPerShard
}

// SetMaxNumDocsPerSegment sets the maximum number of documents per segment.
// It returns a modified copy of the options, leaving the receiver untouched.
func (o *Options) SetMaxNumDocsPerSegment(v int32) *Options {
	cloned := *o
	cloned.maxNumDocsPerSegment = v
	return &cloned
}

// MaxNumDocsPerSegment returns the maximum number of documents a single
// segment may hold before it is considered full.
func (o *Options) MaxNumDocsPerSegment() int32 {
	return o.maxNumDocsPerSegment
}

// SetBoolArrayPool sets the bool array pool.
func (o *Options) SetBoolArrayPool(v *pool.BucketizedBoolArrayPool) *Options {
opts := *o
Expand Down
45 changes: 32 additions & 13 deletions storage/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ const (
)

var (
// errSegmentIsFull is raised when writing to a segment whose document
// count has reached the maximum threshold.
errSegmentIsFull = errors.New("segment is full")

// errSegmentAlreadySealed is raised when trying to mutate a sealed segment.
errSegmentAlreadySealed = errors.New("segment is already sealed")

Expand Down Expand Up @@ -50,6 +54,10 @@ type immutableDatabaseSegment interface {
type mutableDatabaseSegment interface {
immutableDatabaseSegment

// IsFull returns true if the number of documents in the segment has reached
// the maximum threshold.
IsFull() bool

// Write writes an event to the mutable segment.
Write(ev event.Event) error

Expand All @@ -60,15 +68,14 @@ type mutableDatabaseSegment interface {
Close() error
}

// TODO(xichen): Pool timestamp array and raw docs byte array.
// TODO(xichen): Treat timestamp and rawDocs as normal fields.
type dbSegment struct {
sync.RWMutex

id string
opts *Options
int64ArrayPool *pool.BucketizedInt64ArrayPool
stringArrayPool *pool.BucketizedStringArrayPool
id string
opts *Options
maxNumDocsPerSegment int32
int64ArrayPool *pool.BucketizedInt64ArrayPool
stringArrayPool *pool.BucketizedStringArrayPool

// NB: We refer to an event containing a collection of fields a document
// in conventional information retrieval terminology.
Expand All @@ -88,13 +95,14 @@ func newDatabaseSegment(
int64ArrayPool := opts.Int64ArrayPool()
stringArrayPool := opts.StringArrayPool()
return &dbSegment{
id: uuid.New(),
opts: opts,
int64ArrayPool: int64ArrayPool,
stringArrayPool: stringArrayPool,
timeNanos: int64ArrayPool.Get(defaultInitialNumDocs),
rawDocs: stringArrayPool.Get(defaultInitialNumDocs),
fields: make(map[hash.Hash]*fieldDocValues, defaultInitialNumFields),
id: uuid.New(),
opts: opts,
maxNumDocsPerSegment: opts.MaxNumDocsPerSegment(),
int64ArrayPool: int64ArrayPool,
stringArrayPool: stringArrayPool,
timeNanos: int64ArrayPool.Get(defaultInitialNumDocs),
rawDocs: stringArrayPool.Get(defaultInitialNumDocs),
fields: make(map[hash.Hash]*fieldDocValues, defaultInitialNumFields),
}
}

Expand All @@ -106,12 +114,23 @@ func (s *dbSegment) MaxTimeNanos() int64 { return s.maxTimeNanos }

func (s *dbSegment) NumDocuments() int32 { return s.numDocs }

// IsFull returns true if the number of documents in the segment has reached
// the maximum threshold and the segment should no longer accept writes.
func (s *dbSegment) IsFull() bool {
	s.RLock()
	numDocs := s.numDocs
	s.RUnlock()
	// maxNumDocsPerSegment is set once at construction, so it is safe to
	// read without holding the lock. Compare with >= rather than == so the
	// segment still reports full even if the "never exceeds the cap"
	// invariant is ever broken elsewhere; with == an overshoot would make
	// the segment grow without bound.
	return numDocs >= s.maxNumDocsPerSegment
}

func (s *dbSegment) Write(ev event.Event) error {
s.Lock()
if s.sealed {
s.Unlock()
return errSegmentAlreadySealed
}
if s.numDocs == s.maxNumDocsPerSegment {
s.Unlock()
return errSegmentIsFull
}
if s.minTimeNanos > ev.TimeNanos {
s.minTimeNanos = ev.TimeNanos
}
Expand Down
46 changes: 33 additions & 13 deletions storage/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ var (
type dbShard struct {
sync.RWMutex

namespace []byte
shard uint32
opts *Options
namespace []byte
shard uint32
opts *Options
maxNumCachedSegmentsPerShard int

closed bool
active mutableDatabaseSegment
Expand All @@ -48,7 +49,8 @@ func newDatabaseShard(
namespace: namespace,
shard: shard,
opts: opts,
active: newDatabaseSegment(opts),
maxNumCachedSegmentsPerShard: opts.MaxNumCachedSegmentsPerShard(),
active: newDatabaseSegment(opts),
}
}

Expand All @@ -59,25 +61,29 @@ func (s *dbShard) Write(ev event.Event) error {
segment := s.active
s.RUnlock()

return segment.Write(ev)
err := segment.Write(ev)
if err == errSegmentIsFull {
// Active segment is full, need to seal and rotate the segment.
s.sealAndRotate()

// Retry writing the event.
return s.Write(ev)
}
return err
}

// TODO(xichen): retry logic on segment persistence failure.
func (s *dbShard) Flush(ps persist.Persister) error {
s.Lock()
activeSegment := s.active
s.active = newDatabaseSegment(s.opts)
activeSegment.Seal()
s.sealed = append(s.sealed, activeSegment)
numToFlush := s.opts.MaxNumCachedSegmentsPerShard() - len(s.sealed)
s.RLock()
numToFlush := len(s.sealed) - s.maxNumCachedSegmentsPerShard
if numToFlush <= 0 {
// Nothing to do.
s.Unlock()
s.RUnlock()
return nil
}
segmentsToFlush := make([]immutableDatabaseSegment, numToFlush)
copy(segmentsToFlush, s.sealed[:numToFlush])
s.Unlock()
s.RUnlock()

var (
multiErr xerrors.MultiError
Expand Down Expand Up @@ -108,6 +114,20 @@ func (s *dbShard) Close() error {
return nil
}

// sealAndRotate seals the currently active segment, appends it to the list
// of sealed segments, and installs a fresh active segment in its place.
// Fullness is re-checked under the write lock so that when multiple writers
// race here, only the first one actually performs the rotation.
func (s *dbShard) sealAndRotate() {
	s.Lock()
	defer s.Unlock()

	if !s.active.IsFull() {
		// Another writer got ahead of us and already rotated.
		return
	}
	full := s.active
	s.active = newDatabaseSegment(s.opts)
	full.Seal()
	s.sealed = append(s.sealed, full)
}

func (s *dbShard) flushOne(
ps persist.Persister,
sm immutableDatabaseSegment,
Expand Down

0 comments on commit d806688

Please sign in to comment.