Permalink
Browse files

[query cache] cache queries via a per block query cache

This commit adds a per block query cache based on the contents of the
querySpec. The query cache is enabled for table, time series and
distribution queries.

To cache a querySpec, its QueryDetails are hashed into an md5sum, which is
then looked up on disk. If cached results exist for that hash, we can re-use
them for the whole block, saving us some computation time.

To enable the query cache, use the -cache-queries flag. The first query
will be slightly slower because we write to disk, but the subsequent
queries will be faster.

Test Plan: Run a query with and without -cache-queries, plug into a
running snorkel instance and watch dashboards load more quickly.

Tests Added: see TESTPLAN.md
  • Loading branch information...
okay
okay committed Sep 27, 2017
1 parent d916ca2 commit 1835768fba8bc97f1e7a269ebcaac5448155fba6
@@ -19,6 +19,7 @@ advantages
* Lower disk usage through per column compression schemes
* Serverless design with controlled memory usage
* Per table retention policies (specify max age and/or size of tables)
* Per block query cache (optional) that avoids recomputation

disadvantages
-------------
@@ -29,6 +29,12 @@ Test Plan:
* [ ] Open Partial Blocks and re-fill them
* [ ] Auto Digest during ingestion
* [ ] Digestion can fail gracefully
* [ ] Per Block Query Cache
* [x] Gets built
* [x] Is used when supposed to
* [x] Is ignored properly
* [x] Gives consistent results
* [x] Works with Basic Hist

Failure Plans
-------------
@@ -93,7 +93,7 @@ func main() {
first_arg := os.Args[1]
os.Args = os.Args[1:]

sybil.SetDefaults()
sybil.Startup()

handler, ok := CMD_FUNCS[first_arg]
if !ok {
@@ -63,6 +63,8 @@ func addQueryFlags() {
TIME_FORMAT = flag.String("time-format", "", "time format to use")
NO_RECYCLE_MEM = flag.Bool("no-recycle-mem", false, "don't recycle memory slabs (use Go GC instead)")

sybil.FLAGS.CACHED_QUERIES = flag.Bool("cache-queries", false, "Cache query results per block")

}

func RunQueryCmdLine() {
@@ -166,7 +168,8 @@ func RunQueryCmdLine() {
filterSpec := sybil.FilterSpec{Int: *sybil.FLAGS.INT_FILTERS, Str: *sybil.FLAGS.STR_FILTERS, Set: *sybil.FLAGS.SET_FILTERS}
filters := sybil.BuildFilters(t, &loadSpec, filterSpec)

querySpec := sybil.QuerySpec{Groups: groupings, Filters: filters, Aggregations: aggs}
query_params := sybil.QueryParams{Groups: groupings, Filters: filters, Aggregations: aggs}
querySpec := sybil.QuerySpec{QueryParams: query_params}

for _, v := range groups {
switch t.GetColumnType(v) {
@@ -84,7 +84,8 @@ func RunSessionizeCmdLine() {
filters := []sybil.Filter{}
groupings := []sybil.Grouping{}
aggs := []sybil.Aggregation{}
querySpec := sybil.QuerySpec{Groups: groupings, Filters: filters, Aggregations: aggs}
query_params := sybil.QueryParams{Groups: groupings, Filters: filters, Aggregations: aggs}
querySpec := sybil.QuerySpec{QueryParams: query_params}

querySpec.Limit = int16(*sybil.FLAGS.LIMIT)

@@ -186,7 +186,7 @@ func FilterAndAggRecords(querySpec *QuerySpec, recordsPtr *RecordList) int {
case INT_VAL:
val := int64(r.Ints[a.name_id])

hist, ok := added_record.Hists[a.name]
hist, ok := added_record.Hists[a.Name]

if !ok {
if *FLAGS.HDR_HIST && ENABLE_HDR {
@@ -196,7 +196,7 @@ func FilterAndAggRecords(querySpec *QuerySpec, recordsPtr *RecordList) int {
} else {
hist = r.block.table.NewHist(r.block.table.get_int_info(a.name_id))
}
added_record.Hists[a.name] = hist
added_record.Hists[a.Name] = hist
}

hist.RecordValues(val, weight)
@@ -43,7 +43,8 @@ type FlagDefs struct {
PROFILE *bool
PROFILE_MEM *bool

RECYCLE_MEM *bool
RECYCLE_MEM *bool
CACHED_QUERIES *bool

WEIGHT_COL *string

@@ -106,7 +107,7 @@ var FLAGS = FlagDefs{}
var OPTS = OptionDefs{}
var EMPTY = ""

func SetDefaults() {
func setDefaults() {
OPTS.SORT_COUNT = "$COUNT"
OPTS.SAMPLES = false
OPTS.WEIGHT_COL = false
@@ -144,6 +145,7 @@ func SetDefaults() {
FLAGS.LUAFILE = &EMPTY

FLAGS.RECYCLE_MEM = &TRUE
FLAGS.CACHED_QUERIES = &FALSE

FLAGS.HDR_HIST = &FALSE
FLAGS.LOG_HIST = &FALSE
@@ -159,4 +161,5 @@ func SetDefaults() {
}

initLua()

}
@@ -118,7 +118,7 @@ type StrFilter struct {
FieldId int16
Op string
Value string
Regex *regexp.Regexp
regex *regexp.Regexp

table *Table
}
@@ -185,11 +185,11 @@ func (filter StrFilter) Filter(r *Record) bool {
ret, ok = col.RCache[int(val)]
if !ok {
str_val := col.get_string_for_val(int32(val))
ret = filter.Regex.MatchString(str_val)
ret = filter.regex.MatchString(str_val)
}
} else {
str_val := col.get_string_for_val(int32(val))
ret = filter.Regex.MatchString(str_val)
ret = filter.regex.MatchString(str_val)
}

if cardinality < REGEX_CACHE_SIZE && !ok {
@@ -262,7 +262,7 @@ func (t *Table) StrFilter(name string, op string, value string) StrFilter {

var err error
if op == "re" || op == "nre" {
strFilter.Regex, err = regexp.Compile(value)
strFilter.regex, err = regexp.Compile(value)
if err != nil {
Debug("REGEX ERROR", err, "WITH", value)
}
@@ -49,7 +49,7 @@ func testIntLt(test *testing.T) {
aggs := []sybil.Aggregation{}
aggs = append(aggs, nt.Aggregation("age", "avg"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs}}

nt.MatchAndAggregate(&querySpec)

@@ -75,7 +75,7 @@ func testIntGt(test *testing.T) {
aggs := []sybil.Aggregation{}
aggs = append(aggs, nt.Aggregation("age", "avg"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs}}

nt.MatchAndAggregate(&querySpec)

@@ -104,7 +104,7 @@ func testIntNeq(test *testing.T) {
groupings := []sybil.Grouping{}
groupings = append(groupings, nt.Grouping("age"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs, Groups: groupings}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs, Groups: groupings}}

nt.MatchAndAggregate(&querySpec)

@@ -135,7 +135,7 @@ func testIntEq(test *testing.T) {
aggs := []sybil.Aggregation{}
aggs = append(aggs, nt.Aggregation("age", "avg"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs}}

nt.MatchAndAggregate(&querySpec)

@@ -164,7 +164,7 @@ func testStrEq(test *testing.T) {
groupings := []sybil.Grouping{}
groupings = append(groupings, nt.Grouping("age"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs, Groups: groupings}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs, Groups: groupings}}

Debug("QUERY SPEC", querySpec.Results)

@@ -195,7 +195,7 @@ func testStrNeq(test *testing.T) {
groupings := []sybil.Grouping{}
groupings = append(groupings, nt.Grouping("age"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs}}

nt.MatchAndAggregate(&querySpec)

@@ -224,7 +224,7 @@ func testStrRe(test *testing.T) {
groupings := []sybil.Grouping{}
groupings = append(groupings, nt.Grouping("age"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs, Groups: groupings}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs, Groups: groupings}}

nt.MatchAndAggregate(&querySpec)

@@ -255,7 +255,7 @@ func testSetIn(test *testing.T) {
groupings := []sybil.Grouping{}
groupings = append(groupings, nt.Grouping("age"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs, Groups: groupings}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs, Groups: groupings}}

nt.MatchAndAggregate(&querySpec)

@@ -278,7 +278,7 @@ func testSetIn(test *testing.T) {
// TODO: MULTIPLE SET VALUE FILTER
// filters = []sybil.Filter{}
// filters = append(filters, nt.SetFilter("age_set", "in", "20,21,22"))
// querySpec = sybil.QuerySpec{Filters: filters, Aggregations: aggs, Groups: groupings}
// querySpec = sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs, Groups: groupings}}
//
// if len(querySpec.Results) != 3 {
// test.Error("Set Filter for nin returned more (or less) than three results", len(querySpec.Results), querySpec.Results)
@@ -297,7 +297,7 @@ func testSetNin(test *testing.T) {
groupings := []sybil.Grouping{}
groupings = append(groupings, nt.Grouping("age"))

querySpec := sybil.QuerySpec{Filters: filters, Aggregations: aggs, Groups: groupings}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Filters: filters, Aggregations: aggs, Groups: groupings}}

nt.MatchAndAggregate(&querySpec)

@@ -23,7 +23,7 @@ func run_tests(m *testing.M) {
}

func setup_test_vars(chunk_size int) {
sybil.SetDefaults()
sybil.Startup()
sybil.FLAGS.TABLE = &TEST_TABLE_NAME

sybil.TEST_MODE = true
@@ -82,7 +82,7 @@ func new_query_spec() *sybil.QuerySpec {
aggs := []sybil.Aggregation{}
groupings := []sybil.Grouping{}

querySpec := sybil.QuerySpec{Groups: groupings, Filters: filters, Aggregations: aggs}
querySpec := sybil.QuerySpec{QueryParams: sybil.QueryParams{Groups: groupings, Filters: filters, Aggregations: aggs}}

return &querySpec
}
Oops, something went wrong.

0 comments on commit 1835768

Please sign in to comment.