Skip to content

Commit

Permalink
Add a round trip test for persist package. (#142)
Browse files Browse the repository at this point in the history
* Add a roundtrip test for persist package.

* Fix excess checksum bytes bug.
  • Loading branch information
notbdu authored Feb 28, 2019
1 parent 8e55716 commit f427c43
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 13 deletions.
8 changes: 4 additions & 4 deletions digest/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@ func Checksum(buf []byte) uint32 {

// Validate validates the data in the buffer against its checksum.
// The checksum is at the end of the buffer occupying `numChecksumBytes` bytes.
func Validate(b []byte) error {
func Validate(b []byte) ([]byte, error) {
if len(b) < numChecksumBytes {
return errChecksumMismatch
return nil, errChecksumMismatch
}
checksumStart := len(b) - numChecksumBytes
expectedChecksum := ToBuffer(b[checksumStart:]).ReadDigest()
if Checksum(b[:checksumStart]) != expectedChecksum {
return errChecksumMismatch
return nil, errChecksumMismatch
}
return nil
return b[:checksumStart], nil
}
100 changes: 100 additions & 0 deletions persist/fs/equals_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package fs

import (
"testing"

"github.com/stretchr/testify/require"
indexfield "github.com/xichen2020/eventdb/index/field"
)

// stringFieldEquals reports whether two string fields contain the same
// sequence of values. It fails the test immediately on iterator errors.
//
// NOTE: the previous implementation could report equality when one field
// had exactly one extra value: the loop condition `iter1.Next() && iter2.Next()`
// short-circuits and consumes the extra element, after which both iterators
// appear exhausted. Tracking each Next() result explicitly fixes this.
func stringFieldEquals(t *testing.T, f1, f2 indexfield.StringField) bool {
	iter1, err := f1.Values().Iter()
	require.NoError(t, err)

	iter2, err := f2.Values().Iter()
	require.NoError(t, err)

	for {
		hasNext1 := iter1.Next()
		hasNext2 := iter2.Next()
		if !hasNext1 || !hasNext2 {
			require.NoError(t, iter1.Err())
			require.NoError(t, iter2.Err())
			// Equal only if both iterators are exhausted at the same time;
			// otherwise one field has extra values.
			return hasNext1 == hasNext2
		}
		if iter1.Current() != iter2.Current() {
			return false
		}
	}
}

// intFieldEquals reports whether two int fields contain the same sequence
// of values. It fails the test immediately on iterator errors.
//
// NOTE: the previous implementation could report equality when one field
// had exactly one extra value because the loop condition consumed it via
// short-circuit evaluation. Tracking each Next() result explicitly fixes this.
func intFieldEquals(t *testing.T, f1, f2 indexfield.IntField) bool {
	iter1, err := f1.Values().Iter()
	require.NoError(t, err)

	iter2, err := f2.Values().Iter()
	require.NoError(t, err)

	for {
		hasNext1 := iter1.Next()
		hasNext2 := iter2.Next()
		if !hasNext1 || !hasNext2 {
			require.NoError(t, iter1.Err())
			require.NoError(t, iter2.Err())
			// Equal only if both iterators are exhausted at the same time.
			return hasNext1 == hasNext2
		}
		if iter1.Current() != iter2.Current() {
			return false
		}
	}
}

// boolFieldEquals reports whether two bool fields contain the same sequence
// of values. It fails the test immediately on iterator errors.
//
// NOTE: the previous implementation could report equality when one field
// had exactly one extra value because the loop condition consumed it via
// short-circuit evaluation. Tracking each Next() result explicitly fixes this.
func boolFieldEquals(t *testing.T, f1, f2 indexfield.BoolField) bool {
	iter1, err := f1.Values().Iter()
	require.NoError(t, err)

	iter2, err := f2.Values().Iter()
	require.NoError(t, err)

	for {
		hasNext1 := iter1.Next()
		hasNext2 := iter2.Next()
		if !hasNext1 || !hasNext2 {
			require.NoError(t, iter1.Err())
			require.NoError(t, iter2.Err())
			// Equal only if both iterators are exhausted at the same time.
			return hasNext1 == hasNext2
		}
		if iter1.Current() != iter2.Current() {
			return false
		}
	}
}

// timeFieldEquals reports whether two time fields contain the same sequence
// of values. It fails the test immediately on iterator errors.
//
// NOTE: the previous implementation could report equality when one field
// had exactly one extra value because the loop condition consumed it via
// short-circuit evaluation. Tracking each Next() result explicitly fixes this.
func timeFieldEquals(t *testing.T, f1, f2 indexfield.TimeField) bool {
	iter1, err := f1.Values().Iter()
	require.NoError(t, err)

	iter2, err := f2.Values().Iter()
	require.NoError(t, err)

	for {
		hasNext1 := iter1.Next()
		hasNext2 := iter2.Next()
		if !hasNext1 || !hasNext2 {
			require.NoError(t, iter1.Err())
			require.NoError(t, iter2.Err())
			// Equal only if both iterators are exhausted at the same time.
			return hasNext1 == hasNext2
		}
		if iter1.Current() != iter2.Current() {
			return false
		}
	}
}
154 changes: 154 additions & 0 deletions persist/fs/persist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package fs

import (
"os"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/xichen2020/eventdb/document/field"
indexfield "github.com/xichen2020/eventdb/index/field"
"github.com/xichen2020/eventdb/persist"
)

// Fixed identifiers and settings shared by the persist round trip tests below.
const (
// Shard and segment the test data is written to and read back from.
testShardID = uint32(0)
testSegmentID = "test-segment-id"
// All test files are written under this directory; it is removed at the
// end of the test.
testFilePathPrefix = "./testdata"
// Timestamps are persisted at nanosecond precision so retrieved time
// values compare equal to the originals.
testTimestampPrecision = time.Nanosecond
)

var (
testNamespace = []byte("test-namespace")

// Field paths and value types for the four test fields: a top-level
// timestamp, a top-level string, and two nested fields.
f0FieldPath = []string{"timestamp"}
f0ValueType = field.TimeType
f1FieldPath = []string{"service"}
f1ValueType = field.StringType
f2FieldPath = []string{"fields", "int_field"}
f2ValueType = field.IntType
f3FieldPath = []string{"fields", "bool_field"}
f3ValueType = field.BoolType

now = time.Now().UnixNano()
stepSize = int64(6000)
// f0 holds five monotonically increasing timestamps spaced stepSize
// nanoseconds apart; f1-f3 each hold four values of their respective types.
f0 = createDocsField(f0FieldPath, []field.ValueUnion{
{Type: f0ValueType, TimeNanosVal: now},
{Type: f0ValueType, TimeNanosVal: now + stepSize*1},
{Type: f0ValueType, TimeNanosVal: now + stepSize*2},
{Type: f0ValueType, TimeNanosVal: now + stepSize*3},
{Type: f0ValueType, TimeNanosVal: now + stepSize*4},
})
f1 = createDocsField(f1FieldPath, []field.ValueUnion{
{Type: f1ValueType, StringVal: "foo1"},
{Type: f1ValueType, StringVal: "foo2"},
{Type: f1ValueType, StringVal: "foo3"},
{Type: f1ValueType, StringVal: "foo4"},
})
f2 = createDocsField(f2FieldPath, []field.ValueUnion{
{Type: f2ValueType, IntVal: 1},
{Type: f2ValueType, IntVal: 2},
{Type: f2ValueType, IntVal: 3},
{Type: f2ValueType, IntVal: 4},
})
f3 = createDocsField(f3FieldPath, []field.ValueUnion{
{Type: f3ValueType, BoolVal: true},
{Type: f3ValueType, BoolVal: true},
{Type: f3ValueType, BoolVal: false},
{Type: f3ValueType, BoolVal: false},
})
)

// createDocsField builds a sealed docs field at the given field path,
// assigning document IDs 0..len(values)-1 to the values in order. It panics
// on builder errors since it only constructs package-level test fixtures.
func createDocsField(
	fieldPath []string,
	values []field.ValueUnion,
) indexfield.DocsField {
	builder := indexfield.NewDocsFieldBuilder(fieldPath, nil)
	for i, value := range values {
		if err := builder.Add(int32(i), value); err != nil {
			panic(err)
		}
	}
	return builder.Seal(int32(len(values)))
}

// writeFields persists the given docs fields into a single test segment
// under testFilePathPrefix, returning the first error encountered.
func writeFields(fields []indexfield.DocsField) error {
	opts := NewOptions().
		SetFilePathPrefix(testFilePathPrefix).
		SetTimestampPrecision(testTimestampPrecision)

	ps, err := NewPersistManager(opts).StartPersist()
	if err != nil {
		return err
	}
	defer ps.Finish()

	prepared, err := ps.Prepare(persist.PrepareOptions{
		Namespace: testNamespace,
		Shard:     testShardID,
		SegmentMeta: persist.SegmentMetadata{
			ID: testSegmentID,
		},
	})
	if err != nil {
		return err
	}
	defer prepared.Close()

	return prepared.Persist.WriteFields(fields)
}

// retrieveFields reads back the requested fields from the test segment
// previously written by writeFields.
func retrieveFields(fields []persist.RetrieveFieldOptions) ([]indexfield.DocsField, error) {
	opts := NewOptions().
		SetFilePathPrefix(testFilePathPrefix).
		SetTimestampPrecision(testTimestampPrecision)
	retriever := NewFieldRetriever(opts)
	meta := persist.SegmentMetadata{ID: testSegmentID}
	return retriever.RetrieveFields(testNamespace, testShardID, meta, fields)
}

func TestWriteAndRetrieveFields(t *testing.T) {
err := writeFields([]indexfield.DocsField{f0, f1, f2, f3})
require.NoError(t, err)
defer func() {
// Remove data directory entirely.
err = os.RemoveAll(testFilePathPrefix)
require.NoError(t, err)
}()

fields, err := retrieveFields([]persist.RetrieveFieldOptions{
{FieldPath: f0FieldPath, FieldTypes: field.ValueTypeSet{f0ValueType: struct{}{}}},
{FieldPath: f1FieldPath, FieldTypes: field.ValueTypeSet{f1ValueType: struct{}{}}},
{FieldPath: f2FieldPath, FieldTypes: field.ValueTypeSet{f2ValueType: struct{}{}}},
{FieldPath: f3FieldPath, FieldTypes: field.ValueTypeSet{f3ValueType: struct{}{}}},
})
require.NoError(t, err)

f0Actual, ok := fields[0].TimeField()
require.True(t, ok)
f0Expected, ok := f0.TimeField()
require.True(t, ok)
require.True(t, timeFieldEquals(t, f0Expected, f0Actual))

f1Actual, ok := fields[1].StringField()
require.True(t, ok)
f1Expected, ok := f1.StringField()
require.True(t, ok)
require.True(t, stringFieldEquals(t, f1Expected, f1Actual))

f2Actual, ok := fields[2].IntField()
require.True(t, ok)
f2Expected, ok := f2.IntField()
require.True(t, ok)
require.True(t, intFieldEquals(t, f2Expected, f2Actual))

f3Actual, ok := fields[3].BoolField()
require.True(t, ok)
f3Expected, ok := f3.BoolField()
require.True(t, ok)
require.True(t, boolFieldEquals(t, f3Expected, f3Actual))
}
22 changes: 13 additions & 9 deletions persist/fs/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,11 @@ func (r *reader) readInfoFile(segmentDir string) error {
}
defer fd.Close()

data, err := r.mmapReadAllAndValidateChecksum(fd)
data, munmap, err := r.mmapReadAllAndValidateChecksum(fd)
if err != nil {
return err
}
defer mmap.Munmap(data)
defer munmap()

size, bytesRead, err := io.ReadVarint(data)
if err != nil {
Expand Down Expand Up @@ -360,15 +360,15 @@ func (r *reader) readAndValidateFieldData(fieldPath []string) ([]byte, func(), e
if err != nil {
return nil, nil, err
}
data, err := r.mmapReadAllAndValidateChecksum(fd)
data, munmap, err := r.mmapReadAllAndValidateChecksum(fd)
if err != nil {
fd.Close()
return nil, nil, err
}

cleanup := func() {
fd.Close()
mmap.Munmap(data)
munmap()
}

// Validate magic header.
Expand All @@ -382,19 +382,23 @@ func (r *reader) readAndValidateFieldData(fieldPath []string) ([]byte, func(), e
// readAllAndValidate reads all the data from the given file via mmap and validates
// the contents against its checksum. If the validation passes, it returns the mmaped
// bytes. Otherwise, an error is returned.
func (r *reader) mmapReadAllAndValidateChecksum(fd *os.File) ([]byte, error) {
func (r *reader) mmapReadAllAndValidateChecksum(fd *os.File) ([]byte, func(), error) {
res, err := mmap.File(fd, mmap.Options{Read: true, HugeTLB: r.mmapHugeTLBOpts})
if err != nil {
return nil, err
return nil, nil, err
}
if res.Warning != nil {
r.logger.Warnf("warning during memory mapping info file %s: %s", fd.Name(), res.Warning)
}
if err := digest.Validate(res.Result); err != nil {
validatedBytes, err := digest.Validate(res.Result)
if err != nil {
mmap.Munmap(res.Result)
return nil, nil, err
}
munmap := func() {
mmap.Munmap(res.Result)
return nil, err
}
return res.Result, nil
return validatedBytes, munmap, nil
}

func (r *reader) readDocIDSet(data []byte) (index.DocIDSet, []byte, error) {
Expand Down

0 comments on commit f427c43

Please sign in to comment.