Skip to content

Commit

Permalink
datastore: query cache
Browse files Browse the repository at this point in the history
  • Loading branch information
sinkuu committed May 14, 2020
1 parent 8940020 commit c1fc90d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 6 deletions.
47 changes: 41 additions & 6 deletions swift.go
Expand Up @@ -13,11 +13,23 @@ import (
)

type SwiftContainer struct {
conn *swift.Connection
conn *swift.Connection
cache QueryCache

Config
}

type QueryCache struct {
prefix string
index int
name string
}

func (c *QueryCache) Invalidate() {
c.prefix = ""
c.name = ""
}

type Config struct {
swift.Connection
Container string
Expand Down Expand Up @@ -61,10 +73,12 @@ func (s *SwiftContainer) Get(k ds.Key) ([]byte, error) {
}

func (s *SwiftContainer) Delete(k ds.Key) error {
s.cache.Invalidate()
return s.conn.ObjectDelete(s.Container, keyToName(k))
}

func (s *SwiftContainer) Put(k ds.Key, val []byte) error {
s.cache.Invalidate()
return s.conn.ObjectPutBytes(s.Container, keyToName(k), val, "application/octet-stream")
}

Expand Down Expand Up @@ -107,15 +121,21 @@ func (s *SwiftContainer) Query(q dsq.Query) (dsq.Results, error) {
opts := swift.ObjectsOpts{
Prefix: strings.TrimPrefix(q.Prefix, "/"),
// Number of entries to fetch at once
Limit: 1000,
Limit: 10000,
}

end := q.Offset + q.Limit
if end != 0 && end+1 < opts.Limit {
opts.Limit = end + 1
offset := q.Offset

if s.cache.prefix == opts.Prefix && s.cache.name != "" && s.cache.index <= offset {
opts.Marker = s.cache.name
offset = s.cache.index - offset
}

end := offset + q.Limit
if end != 0 && end < opts.Limit {
opts.Limit = end
}

offset := q.Offset
count := 0
names := []string{}
doneFetching := false
Expand Down Expand Up @@ -170,6 +190,19 @@ func (s *SwiftContainer) Query(q dsq.Query) (dsq.Results, error) {
name := names[0]
names = names[1:]

if len(names) == 0 && q.Limit > count && (q.Limit-count) < opts.Limit {
opts.Limit = q.Limit - count
}

// Cache the last item
if q.Limit != 0 && count == q.Limit {
s.cache = QueryCache{
prefix: opts.Prefix,
index: q.Offset + count,
name: name,
}
}

key := "/" + name

if q.KeysOnly {
Expand Down Expand Up @@ -244,6 +277,8 @@ func (b *swiftBatch) Delete(k ds.Key) error {
}

func (b *swiftBatch) Commit() error {
b.s.cache.Invalidate()

if b.tarWriter != nil {
if err := b.tarWriter.Close(); err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions swift_test.go
Expand Up @@ -125,6 +125,27 @@ func TestQuery(t *testing.T) {
"/a/b/d",
"/a/c",
}, rs)

rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 4, Limit: 1})
if err != nil {
t.Fatal(err)
}

expectMatches(t, []string{
"/a/d",
}, rs)

rs, err = d.Query(dsq.Query{Prefix: "/a/", Offset: 1})
if err != nil {
t.Fatal(err)
}

expectMatches(t, []string{
"/a/b/c",
"/a/b/d",
"/a/c",
"/a/d",
}, rs)
}

func TestHas(t *testing.T) {
Expand Down

0 comments on commit c1fc90d

Please sign in to comment.