Skip to content

Commit

Permalink
Persist timestamps to segment files
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Dec 3, 2018
1 parent ba4a0a4 commit 4bf4603
Show file tree
Hide file tree
Showing 12 changed files with 250 additions and 33 deletions.
10 changes: 10 additions & 0 deletions encoding/time_decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package encoding

// TimeDecoder decodes time values.
type TimeDecoder interface {
// Decode decodes times from reader.
Decode(reader Reader) (ForwardTimeIterator, error)

// Reset resets the decoder.
Reset()
}
30 changes: 30 additions & 0 deletions encoding/time_encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package encoding

import (
"errors"
"io"
)

// TimeEncoder encodes times values.
type TimeEncoder interface {
// Encode encodes a collection of time values and writes the encoded bytes to the writer.
// Callers should explicitly call `Reset` before subsequent call to `Encode`.
Encode(writer io.Writer, values ForwardTimeIterator) error

// Reset resets the encoder.
Reset()
}

// TimeEnc is a int encoder.
type TimeEnc struct{}

// NewTimeEncoder creates a new time encoder.
func NewTimeEncoder(writer io.Writer) *TimeEnc { return &TimeEnc{} }

// Encode encodes a collection of time values and writes the encoded bytes to the writer.
func (enc *TimeEnc) Encode(writer io.Writer, values ForwardTimeIterator) error {
return errors.New("not implemented")
}

// Reset resets the encoder.
func (enc *TimeEnc) Reset() { panic("not implemented") }
63 changes: 63 additions & 0 deletions encoding/time_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package encoding

// ForwardTimeIterator performs forward iteration over times.
type ForwardTimeIterator interface {
baseIterator

// Current returns the current value in nanoseconds in the iteration.
Current() int64
}

// RewindableTimeIterator is a forward time iterator that can
// be reset and rewind.
type RewindableTimeIterator interface {
ForwardTimeIterator

// Rewind rewinds the iterator.
Rewind()

// Reset resets the data source of the iterator.
Reset(timeNanos []int64)
}

// ArrayBasedTimeIterator is an array-based time iterator.
type ArrayBasedTimeIterator struct {
timeNanos []int64
idx int
}

// NewArrayBasedTimeIterator is a new array-based time iterator.
// NB: The values are in nanoseconds.
func NewArrayBasedTimeIterator(timeNanos []int64) *ArrayBasedTimeIterator {
return &ArrayBasedTimeIterator{
timeNanos: timeNanos,
idx: -1,
}
}

// Reset resets the values.
func (it *ArrayBasedTimeIterator) Reset(timeNanos []int64) {
it.timeNanos = timeNanos
it.idx = -1
}

// Next returns whether the next value is available.
func (it *ArrayBasedTimeIterator) Next() bool {
if it.idx >= len(it.timeNanos) {
return false
}
it.idx++
return it.idx >= len(it.timeNanos)
}

// Current returns the current time value in nanoseconds.
func (it *ArrayBasedTimeIterator) Current() int64 { return it.timeNanos[it.idx] }

// Err returns error if any.
func (it *ArrayBasedTimeIterator) Err() error { return nil }

// Close closes the iterator.
func (it *ArrayBasedTimeIterator) Close() error { return nil }

// Rewind rewinds the time iterator.
func (it *ArrayBasedTimeIterator) Rewind() { it.idx = -1 }
12 changes: 7 additions & 5 deletions event/field/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ const (
IntType
DoubleType
StringType
TimeType
)

// Value is a value union.
type Value struct {
Type ValueType
BoolVal bool
IntVal int
DoubleVal float64
StringVal string
Type ValueType
BoolVal bool
IntVal int
DoubleVal float64
StringVal string
TimeNanosVal int64
}

// Field is an event field.
Expand Down
12 changes: 8 additions & 4 deletions persist/fs/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func (pm *persistManager) Prepare(opts persist.PrepareOptions) (persist.Prepared
return pm.pp, nil
}

func (pm *persistManager) writeTimestamps(timeNanos []int64) error {
return pm.writer.WriteTimestamps(timeNanos)
}

func (pm *persistManager) writeRawDocs(docs []string) error {
return pm.writer.WriteRawDocs(docs)
}

func (pm *persistManager) writeNullField(fieldPath []string, docIDs *roaring.Bitmap) error {
return pm.writer.WriteNullField(fieldPath, docIDs)
}
Expand All @@ -126,10 +134,6 @@ func (pm *persistManager) writeStringField(fieldPath []string, docIDs *roaring.B
return pm.writer.WriteStringField(fieldPath, docIDs, vals)
}

func (pm *persistManager) writeRawDocs(vals []string) error {
return pm.writer.WriteRawDocs(vals)
}

func (pm *persistManager) close() error {
return pm.writer.Close()
}
Expand Down
17 changes: 17 additions & 0 deletions persist/fs/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var (

// Default raw document source field.
defaultRawDocSourceField = "_source"

// Default timestamp field.
defaultTimestampField = "_timestamp"
)

// Options provide a set of options for data persistence.
Expand All @@ -39,6 +42,7 @@ type Options struct {
writeBufferSize int
fieldPathSeparator byte
rawDocSourceField string
timestampField string
}

// NewOptions provide a new set of options.
Expand All @@ -52,6 +56,7 @@ func NewOptions() *Options {
writeBufferSize: defaultWriterBufferSize,
fieldPathSeparator: defaultFieldPathSeparator,
rawDocSourceField: defaultRawDocSourceField,
timestampField: defaultTimestampField,
}
}

Expand Down Expand Up @@ -150,3 +155,15 @@ func (o *Options) SetRawDocSourceField(v string) *Options {
func (o *Options) RawDocSourceField() string {
return o.rawDocSourceField
}

// SetTimestampField sets the timestamp field.
func (o *Options) SetTimestampField(v string) *Options {
opts := *o
opts.timestampField = v
return &opts
}

// TimestampField returns the timestamp field.
func (o *Options) TimestampField() string {
return o.timestampField
}
54 changes: 39 additions & 15 deletions persist/fs/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ type segmentWriter interface {
// Open opens the writer.
Open(opts writerOpenOptions) error

// WriteTimestamps writes event timestamps.
WriteTimestamps(timeNanos []int64) error

// WriteRawDocs writes raw documents.
WriteRawDocs(docs []string) error

// WriteNullField writes a field with docID set and null values.
WriteNullField(fieldPath []string, docIDs *roaring.Bitmap) error

Expand All @@ -40,9 +46,6 @@ type segmentWriter interface {
// WriteStringField writes a field with docID set and string values.
WriteStringField(fieldPath []string, docIDs *roaring.Bitmap, vals []string) error

// WriteRawDocs writes raw documents.
WriteRawDocs(vals []string) error

// Close closes the writer.
Close() error
}
Expand All @@ -67,6 +70,7 @@ type writer struct {
newDirectoryMode os.FileMode
fieldPathSeparator string
rawDocSourceField string
timestampField string

fdWithDigestWriter digest.FdWithDigestWriter
info *infopb.SegmentInfo
Expand All @@ -79,11 +83,13 @@ type writer struct {
iw encoding.IntEncoder
dw encoding.DoubleEncoder
sw encoding.StringEncoder
tw encoding.TimeEncoder

boolIt encoding.RewindableBoolIterator
intIt encoding.RewindableIntIterator
doubleIt encoding.RewindableDoubleIterator
stringIt encoding.RewindableStringIterator
timeIt encoding.RewindableTimeIterator
valueIt valueIteratorUnion

err error
Expand All @@ -103,6 +109,7 @@ func newSegmentWriter(opts *Options) segmentWriter {
newDirectoryMode: opts.NewDirectoryMode(),
fieldPathSeparator: string(opts.FieldPathSeparator()),
rawDocSourceField: opts.RawDocSourceField(),
timestampField: opts.TimestampField(),
fdWithDigestWriter: digest.NewFdWithDigestWriter(opts.WriteBufferSize()),
info: &infopb.SegmentInfo{},
boolIt: encoding.NewArrayBasedBoolIterator(nil),
Expand All @@ -115,6 +122,7 @@ func newSegmentWriter(opts *Options) segmentWriter {
intIt: w.intIt,
doubleIt: w.doubleIt,
stringIt: w.stringIt,
timeIt: w.timeIt,
}
return w
}
Expand Down Expand Up @@ -144,6 +152,30 @@ func (w *writer) Open(opts writerOpenOptions) error {
return w.writeInfoFile(segmentDir, w.info)
}

func (w *writer) WriteTimestamps(timeNanos []int64) error {
var fieldPath [1]string
fieldPath[0] = w.timestampField
docIDSet := docIDSetUnion{
docIDSetType: schema.FullDocIDSet,
numDocs: w.numDocuments,
}
w.timeIt.Reset(timeNanos)
w.valueIt.valueType = field.TimeType
return w.writeFieldDataFileInternal(w.segmentDir, fieldPath[:], docIDSet, w.valueIt)
}

func (w *writer) WriteRawDocs(docs []string) error {
var fieldPath [1]string
fieldPath[0] = w.rawDocSourceField
docIDSet := docIDSetUnion{
docIDSetType: schema.FullDocIDSet,
numDocs: w.numDocuments,
}
w.stringIt.Reset(docs)
w.valueIt.valueType = field.StringType
return w.writeFieldDataFileInternal(w.segmentDir, fieldPath[:], docIDSet, w.valueIt)
}

func (w *writer) WriteNullField(fieldPath []string, docIDs *roaring.Bitmap) error {
w.valueIt.valueType = field.NullType
return w.writeFieldDataFile(w.segmentDir, fieldPath, docIDs, w.valueIt)
Expand Down Expand Up @@ -173,18 +205,6 @@ func (w *writer) WriteStringField(fieldPath []string, docIDs *roaring.Bitmap, va
return w.writeFieldDataFile(w.segmentDir, fieldPath, docIDs, w.valueIt)
}

func (w *writer) WriteRawDocs(vals []string) error {
var fieldPath [1]string
fieldPath[0] = w.rawDocSourceField
docIDSet := docIDSetUnion{
docIDSetType: schema.FullDocIDSet,
numDocs: w.numDocuments,
}
w.stringIt.Reset(vals)
w.valueIt.valueType = field.StringType
return w.writeFieldDataFileInternal(w.segmentDir, fieldPath[:], docIDSet, w.valueIt)
}

func (w *writer) Close() error {
if w.err != nil {
return w.err
Expand Down Expand Up @@ -353,6 +373,9 @@ func (w *writer) writeValues(
case field.StringType:
w.sw.Reset()
return w.sw.Encode(writer, valueIt.stringIt)
case field.TimeType:
w.tw.Reset()
return w.tw.Encode(writer, valueIt.timeIt)
default:
return fmt.Errorf("unknown value type: %v", valueIt.valueType)
}
Expand Down Expand Up @@ -387,4 +410,5 @@ type valueIteratorUnion struct {
intIt encoding.RewindableIntIterator
doubleIt encoding.RewindableDoubleIterator
stringIt encoding.RewindableStringIterator
timeIt encoding.RewindableTimeIterator
}
3 changes: 2 additions & 1 deletion persist/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ type PrepareOptions struct {
// Fns contains a set of function that persists document IDs
// and different types of document values for a given field.
type Fns struct {
WriteTimestamps func(timeNanos []int64) error
WriteRawDocs func(docs []string) error
WriteNullField func(fieldPath []string, docIDs *roaring.Bitmap) error
WriteBoolField func(fieldPath []string, docIDs *roaring.Bitmap, vals []bool) error
WriteIntField func(fieldPath []string, docIDs *roaring.Bitmap, vals []int) error
WriteDoubleField func(fieldPath []string, docIDs *roaring.Bitmap, vals []float64) error
WriteStringField func(fieldPath []string, docIDs *roaring.Bitmap, vals []string) error
WriteRawDocs func(vals []string) error
}

// Closer is a function that performs cleanup after persistence.
Expand Down
28 changes: 23 additions & 5 deletions server/http/handlers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (

type service struct {
db storage.Database
dbOpts *storage.Options
parserPool *json.ParserPool
idFn IDFn
namespaceFn NamespaceFn
Expand All @@ -44,6 +45,7 @@ func NewService(db storage.Database, opts *Options) Service {
}
return &service{
db: db,
dbOpts: db.Options(),
parserPool: opts.ParserPool(),
idFn: opts.IDFn(),
namespaceFn: opts.NamespaceFn(),
Expand Down Expand Up @@ -89,23 +91,39 @@ func (s *service) Write(w http.ResponseWriter, r *http.Request) {
}

// NB: Perhaps better to specify as a URL param.
namespace, err := s.namespaceFn(v)
// Extract event namespace from JSON.
namespaceFieldName := s.dbOpts.NamespaceFieldName()
namespaceVal, ok := v.Get(namespaceFieldName)
if !ok {
err = fmt.Errorf("cannot find namespace field %s for event %v", namespaceFieldName, data)
writeErrorResponse(w, err)
return
}
namespace, err := s.namespaceFn(namespaceVal)
if err != nil {
err = fmt.Errorf("cannot determine namespace for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

id, err := s.idFn(v)
// Extract event timestamp from JSON.
timestampFieldName := s.dbOpts.TimestampFieldName()
tsVal, ok := v.Get(timestampFieldName)
if !ok {
err = fmt.Errorf("cannot find timestamp field %s for event %v", timestampFieldName, data)
writeErrorResponse(w, err)
return
}
timeNanos, err := s.timeNanosFn(tsVal)
if err != nil {
err = fmt.Errorf("cannot determine ID for event %s: %v", data, err)
err = fmt.Errorf("cannot determine timestamp for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}

timeNanos, err := s.timeNanosFn(v)
id, err := s.idFn(v)
if err != nil {
err = fmt.Errorf("cannot determine timestamp for event %s: %v", data, err)
err = fmt.Errorf("cannot determine ID for event %s: %v", data, err)
writeErrorResponse(w, err)
return
}
Expand Down
Loading

0 comments on commit 4bf4603

Please sign in to comment.