From c1fc90ddbfc068a2cfb1a7f81e37e065e1df3c0a Mon Sep 17 00:00:00 2001 From: Shotaro Yamada Date: Thu, 14 May 2020 09:07:25 +0900 Subject: [PATCH] datastore: query cache --- swift.go | 47 +++++++++++++++++++++++++++++++++++++++++------ swift_test.go | 21 +++++++++++++++++++++ 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/swift.go b/swift.go index 788b294..2b08852 100644 --- a/swift.go +++ b/swift.go @@ -13,11 +13,23 @@ import ( ) type SwiftContainer struct { - conn *swift.Connection + conn *swift.Connection + cache QueryCache Config } +type QueryCache struct { + prefix string + index int + name string +} + +func (c *QueryCache) Invalidate() { + c.prefix = "" + c.name = "" +} + type Config struct { swift.Connection Container string @@ -61,10 +73,12 @@ func (s *SwiftContainer) Get(k ds.Key) ([]byte, error) { } func (s *SwiftContainer) Delete(k ds.Key) error { + s.cache.Invalidate() return s.conn.ObjectDelete(s.Container, keyToName(k)) } func (s *SwiftContainer) Put(k ds.Key, val []byte) error { + s.cache.Invalidate() return s.conn.ObjectPutBytes(s.Container, keyToName(k), val, "application/octet-stream") } @@ -107,15 +121,21 @@ func (s *SwiftContainer) Query(q dsq.Query) (dsq.Results, error) { opts := swift.ObjectsOpts{ Prefix: strings.TrimPrefix(q.Prefix, "/"), // Number of entries to fetch at once - Limit: 1000, + Limit: 10000, } - end := q.Offset + q.Limit - if end != 0 && end+1 < opts.Limit { - opts.Limit = end + 1 + offset := q.Offset + + if s.cache.prefix == opts.Prefix && s.cache.name != "" && s.cache.index <= offset { + opts.Marker = s.cache.name + offset = s.cache.index - offset + } + + end := offset + q.Limit + if end != 0 && end < opts.Limit { + opts.Limit = end } - offset := q.Offset count := 0 names := []string{} doneFetching := false @@ -170,6 +190,19 @@ func (s *SwiftContainer) Query(q dsq.Query) (dsq.Results, error) { name := names[0] names = names[1:] + if len(names) == 0 && q.Limit > count && (q.Limit-count) < opts.Limit { + opts.Limit = q.Limit - count + } + + // Cache the last item + if q.Limit != 0 && count == q.Limit { + s.cache = QueryCache{ + prefix: opts.Prefix, + index: q.Offset + count, + name: name, + } + } + key := "/" + name if q.KeysOnly { @@ -244,6 +277,8 @@ func (b *swiftBatch) Delete(k ds.Key) error { } func (b *swiftBatch) Commit() error { + b.s.cache.Invalidate() + if b.tarWriter != nil { if err := b.tarWriter.Close(); err != nil { return err diff --git a/swift_test.go b/swift_test.go index c210bf2..425be1e 100644 --- a/swift_test.go +++ b/swift_test.go @@ -125,6 +125,27 @@ func TestQuery(t *testing.T) { "/a/b/d", "/a/c", }, rs) + + rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 4, Limit: 1}) + if err != nil { + t.Fatal(err) + } + + expectMatches(t, []string{ + "/a/d", + }, rs) + + rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 1}) + if err != nil { + t.Fatal(err) + } + + expectMatches(t, []string{ + "/a/b/c", + "/a/b/d", + "/a/c", + "/a/d", + }, rs) } func TestHas(t *testing.T) {