Skip to content

Commit

Permalink
Initialize type specific encoders and add close logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu committed Jan 22, 2019
1 parent 5f99eb3 commit 183ca68
Showing 1 changed file with 35 additions and 3 deletions.
38 changes: 35 additions & 3 deletions persist/fs/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package fs
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"os"
"time"
Expand All @@ -20,6 +21,11 @@ import (
xbytes "github.com/xichen2020/eventdb/x/bytes"
)

// Errors.
var (
errSegmentWriterClosed = errors.New("segment writer is closed")
)

// segmentWriter is responsible for writing segments to filesystem.
type segmentWriter interface {
// Open opens the writer.
Expand Down Expand Up @@ -61,11 +67,11 @@ type writer struct {
tw encoding.TimeEncoder
values valuesUnion

err error
err error
closed bool
}

// newSegmentWriter creates a new segment writer.
// TODO(xichen): Initialize the type-specific encoders and allow encoding timestamp with precision.
// TODO(xichen): Add encoding hints when encoding raw docs.
// TODO(xichen): Validate the raw doc source field does not conflict with existing field paths.
// TODO(xichen): Investigate the benefit of writing a single field file.
Expand All @@ -78,11 +84,21 @@ func newSegmentWriter(opts *Options) segmentWriter {
timestampPrecision: opts.TimestampPrecision(),
fdWithDigestWriter: digest.NewFdWithDigestWriter(opts.WriteBufferSize()),
info: &infopb.SegmentInfo{},

bw: encoding.NewBoolEncoder(),
iw: encoding.NewIntEncoder(),
dw: encoding.NewDoubleEncoder(),
sw: encoding.NewStringEncoder(),
tw: encoding.NewTimeEncoder(),
}
return w
}

func (w *writer) Open(opts writerOpenOptions) error {
if w.closed {
return errSegmentWriterClosed
}

var (
namespace = opts.Namespace
shard = opts.Shard
Expand All @@ -107,6 +123,10 @@ func (w *writer) Open(opts writerOpenOptions) error {
}

func (w *writer) WriteFields(fields ...indexfield.DocsField) error {
if w.closed {
return errSegmentWriterClosed
}

for _, field := range fields {
if err := w.writeField(field); err != nil {
return err
Expand All @@ -116,6 +136,10 @@ func (w *writer) WriteFields(fields ...indexfield.DocsField) error {
}

func (w *writer) Close() error {
if w.closed {
return errSegmentWriterClosed
}

if w.err != nil {
return w.err
}
Expand All @@ -125,6 +149,14 @@ func (w *writer) Close() error {
w.err = err
return err
}

w.closed = true
w.info = nil
w.bw = nil
w.iw = nil
w.dw = nil
w.sw = nil
w.tw = nil
return nil
}

Expand Down Expand Up @@ -276,7 +308,7 @@ func (w *writer) writeValues(
case field.StringType:
return w.sw.Encode(values.stringValues, writer)
case field.TimeType:
return w.tw.Encode(values.timeValues, writer, encoding.EncodeTimeOptions{Resolution: time.Nanosecond})
return w.tw.Encode(values.timeValues, writer, encoding.EncodeTimeOptions{Resolution: w.timestampPrecision})
default:
return fmt.Errorf("unknown value type: %v", values.valueType)
}
Expand Down

0 comments on commit 183ca68

Please sign in to comment.