Skip to content

Commit

Permalink
Preprocess query and store results in parsed raw query for reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jan 15, 2019
1 parent 716ff97 commit adb1706
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 266 deletions.
202 changes: 200 additions & 2 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/xichen2020/eventdb/document/field"
"github.com/xichen2020/eventdb/filter"
"github.com/xichen2020/eventdb/x/convert"
"github.com/xichen2020/eventdb/x/hash"

xtime "github.com/m3db/m3x/time"
)
Expand Down Expand Up @@ -77,7 +78,10 @@ type RawOrderBy struct {

// ParseOptions provide a set of options for parsing a raw query.
type ParseOptions struct {
FieldPathSeparator byte
FieldPathSeparator byte
FieldHashFn hash.StringArrayHashFn
TimestampFieldPath []string
RawDocSourceFieldPath []string
}

// Parse parses the raw query, returning any errors encountered.
Expand Down Expand Up @@ -124,7 +128,8 @@ func (q *RawQuery) Parse(opts ParseOptions) (ParsedQuery, error) {
}
sq.Limit = limit

return sq, nil
err = sq.computeDerived(opts)
return sq, err
}

func (q *RawQuery) parseNamespace() (string, error) {
Expand Down Expand Up @@ -413,6 +418,10 @@ type ParsedQuery struct {
Calculations []Calculation
OrderBy []OrderBy
Limit *int

// Derived fields for raw query.
AllowedFieldTypes map[hash.Hash]FieldMeta
RawResultLessThanFn RawResultLessThanFn
}

// IsRaw returns true if the query is querying raw results (i.e., not grouped), and false otherwise.
Expand All @@ -421,6 +430,195 @@ func (q *ParsedQuery) IsRaw() bool { return len(q.GroupBy) == 0 }
// IsGrouped returns true if the query is querying grouped results, and false otherwise.
func (q *ParsedQuery) IsGrouped() bool { return !q.IsRaw() }

// RawQuery returns the parsed raw query for raw results.
func (q *ParsedQuery) RawQuery() ParsedRawQuery {
	rq := ParsedRawQuery{
		Namespace:           q.Namespace,
		StartNanosInclusive: q.StartTimeNanos,
		EndNanosExclusive:   q.EndTimeNanos,
		Filters:             q.Filters,
		OrderBy:             q.OrderBy,
		Limit:               q.Limit,
	}
	// Carry over the derived fields computed during parsing.
	rq.AllowedFieldTypes = q.AllowedFieldTypes
	rq.RawResultLessThanFn = q.RawResultLessThanFn
	return rq
}

// numFieldsForQuery computes the total number of fields involved in
// executing the query: the timestamp field, the raw doc source field,
// every filter field, and every order-by field.
func (q *ParsedQuery) numFieldsForQuery() int {
	n := 2 // Timestamp field and raw doc source field
	n += len(q.OrderBy)
	for _, fl := range q.Filters {
		n += len(fl.Filters)
	}
	return n
}

// computeDerived computes the derived fields of the parsed query,
// dispatching on whether the query is raw or grouped.
func (q *ParsedQuery) computeDerived(opts ParseOptions) error {
	if q.IsGrouped() {
		return q.computeGroupDerived(opts)
	}
	return q.computeRawDerived(opts)
}

// computeRawDerived computes the derived fields for a raw query: the
// allowed field types and the raw result comparison function.
func (q *ParsedQuery) computeRawDerived(opts ParseOptions) error {
	err := q.computeAllowedFieldTypes(opts)
	if err != nil {
		return err
	}
	return q.computeRawResultLessThan()
}

// computeAllowedFieldTypes computes the set of fields needed to execute the
// query along with the value types allowed for each field, keyed by the hash
// of the field path. Each use of a field is tagged with a source index
// (timestamp, raw doc source, each filter, then each order-by clause, in that
// order) so that different uses of the same field keep their own type
// restrictions after deduplication.
func (q *ParsedQuery) computeAllowedFieldTypes(opts ParseOptions) error {
	// Compute total number of fields involved in executing the query.
	numFieldsForQuery := q.numFieldsForQuery()

	// Collect fields needed for query execution into a map for deduplication.
	fieldMap := make(map[hash.Hash]FieldMeta, numFieldsForQuery)

	// Insert timestamp field, which must be time-typed.
	currIndex := 0
	addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
		FieldPath: opts.TimestampFieldPath,
		AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
			currIndex: {field.TimeType: struct{}{}},
		},
	})

	// Insert raw doc source field, which must be string-typed.
	currIndex++
	addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
		FieldPath: opts.RawDocSourceFieldPath,
		AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
			currIndex: {field.StringType: struct{}{}},
		},
	})

	// Insert filter fields, one source index per filter.
	currIndex++
	for _, fl := range q.Filters {
		for _, f := range fl.Filters {
			allowedFieldTypes, err := f.AllowedFieldTypes()
			if err != nil {
				return err
			}
			addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
				FieldPath: f.FieldPath,
				AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
					currIndex: allowedFieldTypes,
				},
			})
			currIndex++
		}
	}

	// Insert order by fields, which may be of any orderable type.
	for _, ob := range q.OrderBy {
		addQueryFieldToMap(fieldMap, opts.FieldHashFn, FieldMeta{
			FieldPath: ob.FieldPath,
			AllowedTypesBySourceIdx: map[int]field.ValueTypeSet{
				currIndex: field.OrderableTypes.Clone(),
			},
		})
		currIndex++
	}

	q.AllowedFieldTypes = fieldMap
	return nil
}

// addQueryFieldToMap merges a query field meta into the existing field meta
// map, keyed by the hash of the field path. Metas sharing a field path are
// merged together; empty field paths are ignored.
func addQueryFieldToMap(
	fm map[hash.Hash]FieldMeta,
	fieldHashFn hash.StringArrayHashFn,
	newFieldMeta FieldMeta,
) {
	// Do not insert empty fields.
	if len(newFieldMeta.FieldPath) == 0 {
		return
	}
	key := fieldHashFn(newFieldMeta.FieldPath)
	if existing, ok := fm[key]; ok {
		existing.MergeInPlace(newFieldMeta)
		fm[key] = existing
		return
	}
	fm[key] = newFieldMeta
}

// computeRawResultLessThan computes the comparison function used to order
// raw results, derived from the sort order of each order-by clause.
func (q *ParsedQuery) computeRawResultLessThan() error {
	fns := make([]field.ValueCompareFn, 0, len(q.OrderBy))
	for _, ob := range q.OrderBy {
		// NB(xichen): Reverse compare function is needed to keep the top N values. For example,
		// if we need to keep the top 10 values sorted in ascending order, it means we need the
		// 10 smallest values, and as such we keep a max heap and only add values to the heap
		// if the current value is smaller than the max heap value.
		fn, err := ob.SortOrder.ReverseCompareFn()
		if err != nil {
			return fmt.Errorf("error determining the value compare fn for sort order %v: %v", ob.SortOrder, err)
		}
		fns = append(fns, fn)
	}
	q.RawResultLessThanFn = NewLessThanFn(fns)
	return nil
}

// computeGroupDerived computes the derived fields for a grouped query.
// TODO(xichen): Implement this if necessary.
func (q *ParsedQuery) computeGroupDerived(_ ParseOptions) error {
	return errors.New("not implemented")
}

// FieldMeta contains field metadata.
type FieldMeta struct {
	// FieldPath is the list of path segments identifying the field.
	FieldPath []string
	// AllowedTypesBySourceIdx maps a source index (identifying the part of
	// the query that references this field, e.g., a filter or an order-by
	// clause) to the set of value types allowed for that source.
	AllowedTypesBySourceIdx map[int]field.ValueTypeSet
}

// MergeInPlace merges the other field meta into the current field meta.
// Precondition: m.FieldPath == other.FieldPath.
// Precondition: The set of source indices in the two metas don't overlap.
func (m *FieldMeta) MergeInPlace(other FieldMeta) {
	if len(other.AllowedTypesBySourceIdx) == 0 {
		return
	}
	// Lazily initialize the destination map so that merging into a
	// zero-value meta does not panic on a nil map write.
	if m.AllowedTypesBySourceIdx == nil {
		m.AllowedTypesBySourceIdx = make(map[int]field.ValueTypeSet, len(other.AllowedTypesBySourceIdx))
	}
	for idx, types := range other.AllowedTypesBySourceIdx {
		m.AllowedTypesBySourceIdx[idx] = types
	}
}

// ParsedRawQuery represents a validated, sanitized raw query.
type ParsedRawQuery struct {
	// Namespace is the namespace the query runs against.
	Namespace string
	// StartNanosInclusive is the inclusive query start time in nanoseconds.
	StartNanosInclusive int64
	// EndNanosExclusive is the exclusive query end time in nanoseconds.
	EndNanosExclusive int64
	// Filters are the filter lists applied to matching documents.
	Filters []FilterList
	// OrderBy determines the ordering of raw results.
	OrderBy []OrderBy
	// Limit caps the number of results returned; nil means no limit.
	Limit *int

	// Derived fields.
	// AllowedFieldTypes is the set of fields needed for the query and their
	// allowed value types, keyed by field path hash.
	AllowedFieldTypes map[hash.Hash]FieldMeta
	// RawResultLessThanFn compares raw results per the order-by clauses.
	RawResultLessThanFn RawResultLessThanFn
}

// NumFieldsForQuery returns the total number of fields for query: the
// timestamp field, the raw doc source field, every filter field, and
// every order-by field.
func (q *ParsedRawQuery) NumFieldsForQuery() int {
	total := 2 // Timestamp field and raw doc source field
	total += len(q.OrderBy)
	for _, fl := range q.Filters {
		total += len(fl.Filters)
	}
	return total
}

// NewRawResults creates a new raw results from the parsed raw query.
// Results are ordered if and only if the query has order-by clauses.
func (q *ParsedRawQuery) NewRawResults() RawResults {
	var res RawResults
	res.IsOrdered = len(q.OrderBy) > 0
	res.Limit = q.Limit
	return res
}

// Calculation represents a calculation object.
type Calculation struct {
FieldPath []string
Expand Down
21 changes: 11 additions & 10 deletions server/http/handlers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,10 @@ func (s *service) Query(w http.ResponseWriter, r *http.Request) {
}

parseOpts := query.ParseOptions{
FieldPathSeparator: s.dbOpts.FieldPathSeparator(),
FieldPathSeparator: s.dbOpts.FieldPathSeparator(),
FieldHashFn: s.dbOpts.FieldHashFn(),
TimestampFieldPath: s.dbOpts.TimestampFieldPath(),
RawDocSourceFieldPath: s.dbOpts.RawDocSourceFieldPath(),
}
pq, err := q.Parse(parseOpts)
if err != nil {
Expand All @@ -146,13 +149,11 @@ func (s *service) Query(w http.ResponseWriter, r *http.Request) {

ctx := s.contextPool.Get()
defer ctx.Close()
nsBytes := unsafe.ToBytes(pq.Namespace)
res, err := s.db.QueryRaw(
ctx, nsBytes, pq.StartTimeNanos, pq.EndTimeNanos,
pq.Filters, pq.OrderBy, pq.Limit,
)

rq := pq.RawQuery()
res, err := s.db.QueryRaw(ctx, rq)
if err != nil {
err = fmt.Errorf("error performing query %v against database namespace %s: %v", pq, nsBytes, err)
err = fmt.Errorf("error performing query %v against database namespace %s: %v", rq, rq.Namespace, err)
writeErrorResponse(w, err)
return
}
Expand Down Expand Up @@ -228,10 +229,10 @@ func (s *service) newDocumentFromBytes(p jsonparser.Parser, data []byte) ([]byte
}

// Extract document timestamp from JSON.
timestampFieldName := s.dbOpts.TimestampFieldName()
tsVal, ok := v.Get(timestampFieldName)
timestampFieldPath := s.dbOpts.TimestampFieldPath()
tsVal, ok := v.Get(timestampFieldPath...)
if !ok {
err = fmt.Errorf("cannot find timestamp field %s for document %v", timestampFieldName, data)
err = fmt.Errorf("cannot find timestamp field %s for document %v", timestampFieldPath, data)
return nil, document.Document{}, err
}
timeNanos, err := s.timeNanosFn(tsVal)
Expand Down
11 changes: 8 additions & 3 deletions services/eventdb/config/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/xichen2020/eventdb/persist/fs"
"github.com/xichen2020/eventdb/sharding"
"github.com/xichen2020/eventdb/storage"
"github.com/xichen2020/eventdb/x/hash"
"github.com/xichen2020/eventdb/x/pool"

"github.com/m3db/m3cluster/shard"
Expand Down Expand Up @@ -78,16 +79,20 @@ func (c *DatabaseConfiguration) NewOptions(scope tally.Scope) (*storage.Options,
opts = opts.SetFilePathPrefix(*c.FilePathPrefix)
}
if c.FieldPathSeparator != nil {
opts = opts.SetFieldPathSeparator(byte(*c.FieldPathSeparator))
sepByte := byte(*c.FieldPathSeparator)
fieldHashFn := func(fieldPath []string) hash.Hash {
return hash.StringArrayHash(fieldPath, sepByte)
}
opts = opts.SetFieldPathSeparator(sepByte).SetFieldHashFn(fieldHashFn)
}
if c.NamespaceFieldName != nil {
opts = opts.SetNamespaceFieldName(*c.NamespaceFieldName)
}
if c.TimestampFieldName != nil {
opts = opts.SetTimestampFieldName(*c.TimestampFieldName)
opts = opts.SetTimestampFieldPath([]string{*c.TimestampFieldName})
}
if c.RawDocSourceFieldName != nil {
opts = opts.SetRawDocSourceFieldName(*c.RawDocSourceFieldName)
opts = opts.SetRawDocSourceFieldPath([]string{*c.RawDocSourceFieldName})
}
if c.TickMinInterval != nil {
opts = opts.SetTickMinInterval(*c.TickMinInterval)
Expand Down
20 changes: 5 additions & 15 deletions storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/xichen2020/eventdb/query"
"github.com/xichen2020/eventdb/sharding"
"github.com/xichen2020/eventdb/x/hash"
"github.com/xichen2020/eventdb/x/unsafe"

"github.com/m3db/m3x/context"
xerrors "github.com/m3db/m3x/errors"
Expand All @@ -32,11 +33,7 @@ type Database interface {
// certain criteria, with optional filtering, sorting, and limiting applied.
QueryRaw(
ctx context.Context,
namespace []byte,
startNanosInclusive, endNanosExclusive int64,
filters []query.FilterList,
orderBy []query.OrderBy,
limit *int,
q query.ParsedRawQuery,
) (query.RawResults, error)

// Close closes the database.
Expand Down Expand Up @@ -149,20 +146,13 @@ func (d *db) WriteBatch(

func (d *db) QueryRaw(
ctx context.Context,
namespace []byte,
startNanosInclusive, endNanosExclusive int64,
filters []query.FilterList,
orderBy []query.OrderBy,
limit *int,
q query.ParsedRawQuery,
) (query.RawResults, error) {
n, err := d.namespaceFor(namespace)
n, err := d.namespaceFor(unsafe.ToBytes(q.Namespace))
if err != nil {
return query.RawResults{}, err
}
return n.QueryRaw(
ctx, startNanosInclusive, endNanosExclusive,
filters, orderBy, limit,
)
return n.QueryRaw(ctx, q)
}

func (d *db) Close() error {
Expand Down
Loading

0 comments on commit adb1706

Please sign in to comment.