Skip to content

Commit

Permalink
Eagerly flush segments on batched writes
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Mar 16, 2019
1 parent 14a8b0d commit d5bd83c
Show file tree
Hide file tree
Showing 29 changed files with 679 additions and 403 deletions.
34 changes: 34 additions & 0 deletions filter/filter_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package filter
import (
"encoding/json"
"fmt"
"math"

"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/generated/proto/servicepb"
"github.com/xichen2020/eventdb/index"
"github.com/xichen2020/eventdb/x/bytes"
"github.com/xichen2020/eventdb/x/convert"
)

// Op represents a filter operator.
Expand Down Expand Up @@ -424,6 +426,38 @@ func (f Op) DoubleMaybeInRange(min, max, filterVal float64) bool {
return true
}

// DoubleMaybeInIntRange returns true if filterVal may produce a match against the int values
// in the int range [min, max].
// If this returns false, it means there can be no value match within the int value range
// [min, max] against the given double filter value.
// If this returns true, it returns the equivalent integer value for more efficient filtering,
// but doesn't guarantee a match against the integer values in range [min, max].
//
// The double filter value is converted to an integer bound that selects exactly the same
// set of integers under the same operator:
//   - Equals: filterVal must itself be convertible to an int, otherwise nothing can match.
//   - LargerThan, SmallerThanOrEqual: the bound is floor(filterVal).
//   - LargerThanOrEqual, SmallerThan: the bound is ceil(filterVal).
//
// An error is returned for any other operator.
//
// NOTE(review): filter values that are NaN or outside the representable range of int are
// not special-cased here; the float-to-int conversion result for such values is
// implementation-dependent in Go — confirm callers never pass them.
func (f Op) DoubleMaybeInIntRange(min, max int, filterVal float64) (int, bool, error) {
	switch f {
	case Equals:
		// The filter value must be an integer in order to match against int values.
		intVal, ok := convert.TryAsInt(filterVal)
		if !ok {
			return 0, false, nil
		}
		return intVal, f.IntMaybeInRange(min, max, intVal), nil
	case LargerThan, SmallerThanOrEqual:
		// For an integer v: v > x <=> v > floor(x), and v <= x <=> v <= floor(x).
		// A plain int(x) conversion truncates toward zero, which is wrong for negative
		// fractional values (e.g. int(-1.5) == -1 whereas floor(-1.5) == -2).
		intVal := int(math.Floor(filterVal))
		return intVal, f.IntMaybeInRange(min, max, intVal), nil
	case LargerThanOrEqual, SmallerThan:
		// For an integer v: v >= x <=> v >= ceil(x), and v < x <=> v < ceil(x).
		intVal := int(math.Ceil(filterVal))
		return intVal, f.IntMaybeInRange(min, max, intVal), nil
	default:
		return 0, false, fmt.Errorf("unexpected filter %v checking if double value %f is in range [%d, %d]", f, filterVal, min, max)
	}
}

// TimeMaybeInRange returns true if filterVal is within the range defined by min and max.
// If this returns false, it means filterVal is definitely not within the value range [min, max].
// If this returns true, it doesn't necessarily mean the filterVal exists in the values that this
Expand Down
67 changes: 36 additions & 31 deletions integration/raw_query_with_filter_orderby_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,26 @@ func TestRawQueryWithFilterOrderByGRPC(t *testing.T) {
}()

rawDocStrs := []string{
`{"service":"testNamespace","@timestamp":"2019-01-22T13:25:42-08:00","st":true,"sid":{"foo":1,"bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:26:42-08:00","st":true,"sid":{"foo":1,"bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:27:42-08:00","st":true,"sid":{"foo":1,"bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:28:42-08:00","st":true,"sid":{"foo":1,"bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:29:42-08:00","st":true,"sid":{"foo":1,"bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:30:42-08:00","st":true,"sid":{"foo":2,"bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:31:42-08:00","st":true,"sid":{"foo":2,"bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:32:42-08:00","st":true,"sid":{"foo":2,"bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:33:42-08:00","st":true,"sid":{"foo":2,"bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:34:42-08:00","st":true,"sid":{"foo":2,"bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:35:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:36:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:37:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:38:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:39:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:40:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:41:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:42:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:43:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:44:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:25:42-08:00","st":true,"sid":{"foo":"a","bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:26:42-08:00","st":true,"sid":{"foo":"a","bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:27:42-08:00","st":true,"sid":{"foo":"a","bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:28:42-08:00","st":true,"sid":{"foo":"a","bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:29:42-08:00","st":true,"sid":{"foo":"a","bar":2},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:30:42-08:00","st":true,"sid":{"foo":"b","bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:31:42-08:00","st":true,"sid":{"foo":"b","bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:32:42-08:00","st":true,"sid":{"foo":"b","bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:33:42-08:00","st":true,"sid":{"foo":"b","bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:34:42-08:00","st":true,"sid":{"foo":"b","bar":4},"tt":"active","tz":-6,"v":1.5}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:35:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:36:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:37:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:38:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:39:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:40:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:41:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:42:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:43:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`,
`{"service":"testNamespace","@timestamp":"2019-01-22T13:44:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`,
}
var (
timestampFieldPath = []string{"@timestamp"}
Expand Down Expand Up @@ -90,7 +90,12 @@ func TestRawQueryWithFilterOrderByGRPC(t *testing.T) {
{
Field: "sid.foo",
Op: filter.Equals,
Value: 3,
Value: "c",
},
{
Field: "st",
Op: filter.Equals,
Value: false,
},
},
FilterCombinator: pFilterCombinator(filter.And),
Expand All @@ -104,11 +109,11 @@ func TestRawQueryWithFilterOrderByGRPC(t *testing.T) {
},
},
expectedSortedResults: [][]byte{
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:35:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:36:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:37:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:38:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:39:42-08:00","st":false,"sid":{"foo":3,"bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:35:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:36:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:37:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:38:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:39:42-08:00","st":false,"sid":{"foo":"c","bar":6},"tt":"inactive","tz":-6,"v":15}`),
},
},
{
Expand Down Expand Up @@ -141,11 +146,11 @@ func TestRawQueryWithFilterOrderByGRPC(t *testing.T) {
},
},
expectedSortedResults: [][]byte{
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:40:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:41:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:42:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:43:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:44:42-08:00","st":false,"sid":{"foo":4,"bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:40:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:41:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:42:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:43:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`),
b(`{"service":"testNamespace","@timestamp":"2019-01-22T13:44:42-08:00","st":false,"sid":{"foo":"d","bar":8},"tt":"inactive","tz":-6,"v":15}`),
},
},
{
Expand Down
12 changes: 6 additions & 6 deletions persist/fs/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"path"

"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/index/segment"
)

Expand Down Expand Up @@ -60,19 +61,18 @@ func fileExists(filePath string) (bool, error) {
func fieldDataFilePath(
segmentDirPath string,
fieldPath []string,
fieldType field.ValueType,
fieldSeparator string,
buf *bytes.Buffer,
) string {
buf.Reset()
for i, p := range fieldPath {
for _, p := range fieldPath {
buf.WriteString(p)
if i < len(fieldPath)-1 {
buf.WriteString(fieldSeparator)
}
buf.WriteString(fieldSeparator)
}
buf.WriteString(fieldType.String())
buf.WriteString(fieldDataFileSuffix)
b := buf.Bytes()
return segmentFilePath(segmentDirPath, string(b))
return segmentFilePath(segmentDirPath, buf.String())
}

// openWritable opens a file for writing and truncating as necessary.
Expand Down
20 changes: 12 additions & 8 deletions persist/fs/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (r *reader) readInfoFile(segmentDir string) error {
}

func (r *reader) readNullField(fieldPath []string) (indexfield.CloseableNullField, error) {
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath)
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath, field.NullType)
if err != nil {
return nil, err
}
Expand All @@ -251,7 +251,7 @@ func (r *reader) readNullField(fieldPath []string) (indexfield.CloseableNullFiel
}

func (r *reader) readBoolField(fieldPath []string) (indexfield.CloseableBoolField, error) {
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath)
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath, field.BoolType)
if err != nil {
return nil, err
}
Expand All @@ -271,7 +271,7 @@ func (r *reader) readBoolField(fieldPath []string) (indexfield.CloseableBoolFiel
}

func (r *reader) readIntField(fieldPath []string) (indexfield.CloseableIntField, error) {
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath)
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath, field.IntType)
if err != nil {
return nil, err
}
Expand All @@ -291,7 +291,7 @@ func (r *reader) readIntField(fieldPath []string) (indexfield.CloseableIntField,
}

func (r *reader) readDoubleField(fieldPath []string) (indexfield.CloseableDoubleField, error) {
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath)
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath, field.DoubleType)
if err != nil {
return nil, err
}
Expand All @@ -311,7 +311,7 @@ func (r *reader) readDoubleField(fieldPath []string) (indexfield.CloseableDouble
}

func (r *reader) readBytesField(fieldPath []string) (indexfield.CloseableBytesField, error) {
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath)
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath, field.BytesType)
if err != nil {
return nil, err
}
Expand All @@ -331,7 +331,7 @@ func (r *reader) readBytesField(fieldPath []string) (indexfield.CloseableBytesFi
}

func (r *reader) readTimeField(fieldPath []string) (indexfield.CloseableTimeField, error) {
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath)
rawData, cleanup, err := r.readAndValidateFieldData(fieldPath, field.TimeType)
if err != nil {
return nil, err
}
Expand All @@ -350,8 +350,11 @@ func (r *reader) readTimeField(fieldPath []string) (indexfield.CloseableTimeFiel
return indexfield.NewCloseableTimeFieldWithCloseFn(docIDSet, values, cleanup), nil
}

func (r *reader) readAndValidateFieldData(fieldPath []string) ([]byte, func(), error) {
filePath := fieldDataFilePath(r.segmentDir, fieldPath, r.fieldPathSeparator, &r.bytesBuf)
func (r *reader) readAndValidateFieldData(
fieldPath []string,
fieldType field.ValueType,
) ([]byte, func(), error) {
filePath := fieldDataFilePath(r.segmentDir, fieldPath, fieldType, r.fieldPathSeparator, &r.bytesBuf)
fd, err := os.Open(filePath)
if err != nil {
return nil, nil, err
Expand All @@ -372,6 +375,7 @@ func (r *reader) readAndValidateFieldData(fieldPath []string) ([]byte, func(), e
cleanup()
return nil, nil, errMagicHeaderMismatch
}

return data[len(magicHeader):], cleanup, nil
}

Expand Down
13 changes: 7 additions & 6 deletions persist/fs/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ func (w *writer) writeField(df indexfield.DocsField) error {
}

// Write bytes values.
if stringField, exists := df.BytesField(); exists {
docIDSet := stringField.DocIDSet()
if bytesField, exists := df.BytesField(); exists {
docIDSet := bytesField.DocIDSet()
w.values.valueType = field.BytesType
w.values.stringValues = stringField.Values()
w.values.bytesValues = bytesField.Values()
if err := w.writeFieldDataFile(w.segmentDir, path, docIDSet, w.values); err != nil {
return err
}
Expand Down Expand Up @@ -224,7 +224,8 @@ func (w *writer) writeFieldDataFile(
if w.err != nil {
return w.err
}
path := fieldDataFilePath(segmentDir, fieldPath, w.fieldPathSeparator, &w.bytesBuf)
path := fieldDataFilePath(segmentDir, fieldPath, values.valueType, w.fieldPathSeparator, &w.bytesBuf)

f, err := w.openWritable(path)
if err != nil {
return err
Expand Down Expand Up @@ -268,7 +269,7 @@ func (w *writer) writeValues(
case field.DoubleType:
return w.dw.Encode(values.doubleValues, writer)
case field.BytesType:
return w.sw.Encode(values.stringValues, writer)
return w.sw.Encode(values.bytesValues, writer)
case field.TimeType:
return w.tw.Encode(values.timeValues, writer, encoding.EncodeTimeOptions{Resolution: w.timestampPrecision})
default:
Expand Down Expand Up @@ -298,6 +299,6 @@ type valuesUnion struct {
boolValues values.BoolValues
intValues values.IntValues
doubleValues values.DoubleValues
stringValues values.BytesValues
bytesValues values.BytesValues
timeValues values.TimeValues
}
29 changes: 24 additions & 5 deletions storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type db struct {

state databaseState
mediator databaseMediator
flushMgr databaseFlushManager
namespaces map[hash.Hash]databaseNamespace

nowFn clock.NowFn
Expand All @@ -125,22 +126,30 @@ func NewDatabase(
if opts == nil {
opts = NewOptions()
}

// Create Flush manager.
instrumentOpts := opts.InstrumentOptions()
scope := instrumentOpts.MetricsScope()
flushManagerScope := scope.SubScope("flush-mgr")
flushManager := newFlushManager(opts.SetInstrumentOptions(instrumentOpts.SetMetricsScope(flushManagerScope)))
opts = opts.setDatabaseFlushManager(flushManager)

nss := make(map[hash.Hash]databaseNamespace, len(namespaces))
for _, ns := range namespaces {
h := hash.BytesHash(ns.ID())
nss[h] = newDatabaseNamespace(ns, opts)
}

instrumentOpts := opts.InstrumentOptions()
scope := instrumentOpts.MetricsScope()
samplingRate := instrumentOpts.MetricsSamplingRate()
d := &db{
opts: opts,
flushMgr: flushManager,
namespaces: nss,
nowFn: opts.ClockOptions().NowFn(),
metrics: newDatabaseMetrics(scope, samplingRate),
}
d.mediator = newMediator(d, opts)

return d
}

Expand All @@ -153,8 +162,14 @@ func (d *db) Open() error {
if d.state != databaseNotOpen {
return errDatabaseOpenOrClosed
}
if err := d.flushMgr.Open(); err != nil {
return err
}
if err := d.mediator.Open(); err != nil {
return err
}
d.state = databaseOpen
return d.mediator.Open()
return nil
}

func (d *db) WriteBatch(
Expand Down Expand Up @@ -226,17 +241,21 @@ func (d *db) Close() error {
d.state = databaseClosed

// Close database-level resources.
var multiErr xerrors.MultiError
if err := d.mediator.Close(); err != nil {
return err
multiErr = multiErr.Add(err)
}
if err := d.flushMgr.Close(); err != nil {
multiErr = multiErr.Add(err)
}

// Close namespaces.
var multiErr xerrors.MultiError
for _, ns := range d.ownedNamespacesWithLock() {
if err := ns.Close(); err != nil {
multiErr = multiErr.Add(err)
}
}

return multiErr.FinalError()
}

Expand Down
Loading

0 comments on commit d5bd83c

Please sign in to comment.