Skip to content

Commit

Permalink
Fix flushing logic and add functional test case. (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu authored and xichen2020 committed Feb 11, 2019
1 parent e5ab829 commit e0d1341
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 56 deletions.
19 changes: 10 additions & 9 deletions persist/fs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewPersistManager(opts *Options) persist.Manager {
Persist: persist.Fns{
WriteFields: pm.writeFields,
},
Close: pm.close,
Close: pm.writer.Finish,
}
return pm
}
Expand Down Expand Up @@ -82,13 +82,13 @@ func (pm *persistManager) Prepare(opts persist.PrepareOptions) (persist.Prepared
return prepared, errPersistManagerCannotPrepareDataNotPersisting
}

writerOpts := writerOpenOptions{
writerOpts := writerStartOptions{
Namespace: opts.Namespace,
Shard: opts.Shard,
NumDocuments: opts.NumDocuments,
SegmentMeta: opts.SegmentMeta,
}
if err := pm.writer.Open(writerOpts); err != nil {
if err := pm.writer.Start(writerOpts); err != nil {
return prepared, err
}

Expand All @@ -99,12 +99,8 @@ func (pm *persistManager) writeFields(fields []field.DocsField) error {
return pm.writer.WriteFields(fields...)
}

func (pm *persistManager) close() error {
return pm.writer.Close()
}

// Done is called to finish the data persistence process.
func (pm *persistManager) Done() error {
// Finish is called to finish the data persistence process.
func (pm *persistManager) Finish() error {
pm.Lock()
defer pm.Unlock()

Expand All @@ -117,3 +113,8 @@ func (pm *persistManager) Done() error {

return nil
}

// Close is a no-op for now.
func (pm *persistManager) Close() error {
return nil
}
2 changes: 1 addition & 1 deletion persist/fs/reader_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func BenchmarkReadField(b *testing.B) {
}
}
docsField := builder.Seal(int32(totDocs))
err := writer.Open(writerOpenOptions{
err := writer.Start(writerStartOptions{
Shard: 0,
NumDocuments: numDocs,
Namespace: namespace,
Expand Down
55 changes: 12 additions & 43 deletions persist/fs/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package fs
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"os"
"time"
Expand All @@ -21,24 +20,20 @@ import (
xbytes "github.com/xichen2020/eventdb/x/bytes"
)

var (
errSegmentWriterClosed = errors.New("segment writer is closed")
)

// segmentWriter is responsible for writing segments to filesystem.
type segmentWriter interface {
// Open opens the writer.
Open(opts writerOpenOptions) error
// Start starts persisting a segment.
Start(opts writerStartOptions) error

// Finish finishes persisting a segment and performs cleanups as necessary.
Finish() error

// WriteFields writes a set of document fields.
WriteFields(fields ...indexfield.DocsField) error

// Close closes the writer.
Close() error
}

// writerOpenOptions provide a set of options for opening a writer.
type writerOpenOptions struct {
// writerStartOptions provide a set of options for opening a writer.
type writerStartOptions struct {
Namespace []byte
Shard uint32
NumDocuments int32
Expand Down Expand Up @@ -66,8 +61,7 @@ type writer struct {
tw encoding.TimeEncoder
values valuesUnion

err error
closed bool
err error
}

// newSegmentWriter creates a new segment writer.
Expand All @@ -93,11 +87,7 @@ func newSegmentWriter(opts *Options) segmentWriter {
return w
}

func (w *writer) Open(opts writerOpenOptions) error {
if w.closed {
return errSegmentWriterClosed
}

func (w *writer) Start(opts writerStartOptions) error {
var (
namespace = opts.Namespace
shard = opts.Shard
Expand All @@ -122,10 +112,6 @@ func (w *writer) Open(opts writerOpenOptions) error {
}

func (w *writer) WriteFields(fields ...indexfield.DocsField) error {
if w.closed {
return errSegmentWriterClosed
}

for _, field := range fields {
if err := w.writeField(field); err != nil {
return err
Expand All @@ -134,29 +120,12 @@ func (w *writer) WriteFields(fields ...indexfield.DocsField) error {
return nil
}

func (w *writer) Close() error {
if w.closed {
return errSegmentWriterClosed
}

func (w *writer) Finish() error {
if w.err != nil {
return w.err
}
// NB(xichen): only write out the checkpoint file if there are no errors
// encountered between calling writer.Open() and writer.Close().
if err := w.writeCheckpointFile(w.segmentDir); err != nil {
w.err = err
return err
}

w.closed = true
w.info = nil
w.bw = nil
w.iw = nil
w.dw = nil
w.sw = nil
w.tw = nil
return nil
w.err = w.writeCheckpointFile(w.segmentDir)
return w.err
}

func (w *writer) writeInfoFile(
Expand Down
7 changes: 5 additions & 2 deletions persist/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ type Persister interface {
// Prepare prepares for data persistence.
Prepare(opts PrepareOptions) (PreparedPersister, error)

// Done marks the persistence as complete.
Done() error
// Finish marks the persistence as complete.
Finish() error

// Close all resources owned by the persist manager.
Close() error
}

// SegmentMetadata contains the metadata for a segment.
Expand Down
3 changes: 3 additions & 0 deletions storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ func (d *db) Close() error {
d.state = databaseClosed

// Close database-level resources.
if err := d.mediator.Close(); err != nil {
return err
}

// Close namespaces.
var multiErr xerrors.MultiError
Expand Down
107 changes: 107 additions & 0 deletions storage/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package storage

import (
"bytes"
"fmt"
"io/ioutil"
"os"
"time"

"github.com/xichen2020/eventdb/document"
"github.com/xichen2020/eventdb/parser/json"
"github.com/xichen2020/eventdb/parser/json/value"
"github.com/xichen2020/eventdb/sharding"

"github.com/m3db/m3cluster/shard"
"github.com/pborman/uuid"
)

const (
testJSONDataFilePath = "./testdata/testdata.json"
testDataPath = "./testdata/eventdb"
testMaxDocsPerSegment = 1 // Keep this # low for flushing purposes.
testDataSeparator = '\n'
)

var (
testNamespace = []byte("testNamespace")
testNumShards = 8
)

type testDatabase struct {
Database
}

func newTestDatabase() (*testDatabase, error) {
namespaces := []NamespaceMetadata{
NamespaceMetadata{
id: testNamespace,
opts: NewNamespaceOptions(),
},
}

shardIDs := make([]uint32, 0, testNumShards)
for i := 0; i < testNumShards; i++ {
shardIDs = append(shardIDs, uint32(i))
}
shards := sharding.NewShards(shardIDs, shard.Available)
hashFn := sharding.DefaultHashFn(testNumShards)
shardSet, err := sharding.NewShardSet(shards, hashFn)
if err != nil {
return nil, err
}

dbOpts := NewOptions().
SetNamespaceFieldName(string(testNamespace)).
SetMaxNumDocsPerSegment(testMaxDocsPerSegment)

db := NewDatabase(namespaces, shardSet, dbOpts)
if err := db.Open(); err != nil {
return nil, err
}

return &testDatabase{db}, nil
}

// Close removes all filesystem resources owned by the test database.
func (tdb *testDatabase) Close() error {
tdb.Database.Close()
return os.RemoveAll(testDataPath)
}

func createTestDocuments() ([]byte, []document.Document, error) {
p := json.NewParser(json.NewOptions())

data, err := ioutil.ReadFile(testJSONDataFilePath)
if err != nil {
return nil, nil, err
}

var (
end = bytes.IndexByte(data, testDataSeparator)
docs = make([]document.Document, 0)
)
if end < 0 {
end = len(data)
}
for end > 0 {
v, err := p.ParseBytes(data[:end])
if err != nil {
err = fmt.Errorf("cannot parse document %s: %v", data, err)
return nil, nil, err
}

fieldIter := value.NewFieldIterator(v)
doc := document.Document{
ID: uuid.NewUUID(),
TimeNanos: time.Now().UnixNano(), // This doesn't need to line up to what's in the raw data.
FieldIter: fieldIter,
RawData: data,
}
docs = append(docs, doc)

data = data[end+1:]
end = bytes.IndexByte(data, testDataSeparator)
}
return []byte(testNamespace), docs, nil
}
2 changes: 1 addition & 1 deletion storage/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *flushManager) Flush() error {
if err != nil {
return err
}
defer persister.Done()
defer persister.Finish()

namespaces, err := m.database.GetOwnedNamespaces()
if err != nil {
Expand Down
26 changes: 26 additions & 0 deletions storage/flush_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package storage

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestFlush(t *testing.T) {
tdb, err := newTestDatabase()
require.Nil(t, err)
defer tdb.Close()

ns, docs, err := createTestDocuments()
require.Nil(t, err)

err = tdb.WriteBatch(ns, docs)
require.Nil(t, err)

// Get a reference to the underlying fs manager and force a flush.
db := tdb.Database.(*db)
mediator := db.mediator.(*mediator)
fsMgr := mediator.databaseFileSystemManager.(*fileSystemManager)
err = fsMgr.databaseFlushManager.Flush()
require.Nil(t, err)
}
Loading

0 comments on commit e0d1341

Please sign in to comment.