Skip to content

Commit

Permalink
Merge pull request #237 from ipfs-search/redis_fixups
Browse files Browse the repository at this point in the history
Redis fixups
  • Loading branch information
dokterbob committed Oct 17, 2022
2 parents 0467d7d + f170b00 commit d211ca6
Show file tree
Hide file tree
Showing 24 changed files with 1,030 additions and 246 deletions.
2 changes: 1 addition & 1 deletion components/crawler/existingitem.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type existingItem struct {
func (c *Crawler) getExistingItem(ctx context.Context, r *t.AnnotatedResource) (*existingItem, error) {
indexes := []index.Index{c.indexes.Files, c.indexes.Directories, c.indexes.Invalids, c.indexes.Partials}

update := new(index_types.Update)
update := &index_types.Update{}

index, err := index.MultiGet(ctx, indexes, r.ID, update, "references", "last-seen")
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion components/crawler/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *Crawler) updateExisting(ctx context.Context, i *existingItem) error {

var isRecent bool
if i.LastSeen == nil {
// No LastSeen set, override isRecent
log.Printf("LastSeen is nil, overriding isRecent.")
isRecent = true
} else {
isRecent = now.Sub(*i.LastSeen) > c.config.MinUpdateAge
Expand Down
6 changes: 0 additions & 6 deletions components/index/cache/config.go

This file was deleted.

97 changes: 58 additions & 39 deletions components/index/cache/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,29 @@ const debug bool = false

// Index wraps a backing index and caches it using another index.
type Index struct {
cfg *Config
backingIndex index.Index
cachingIndex index.Index
cachingType reflect.Type

*instr.Instrumentation
}

// New returns a new index.
func New(backing index.Index, caching index.Index,
cfg *Config, instr *instr.Instrumentation) index.Index {
func New(backing index.Index, caching index.Index, cachingType interface{}, instr *instr.Instrumentation) index.Index {
t := reflect.TypeOf(cachingType)
if t.Kind() == reflect.Pointer {
// Dereference pointer
t = t.Elem()
}

if cfg == nil {
panic("Index.New Config cannot be nil.")
if t.Kind() != reflect.Struct {
panic("caching type should be a struct")
}

index := &Index{
backingIndex: backing,
cachingIndex: caching,
cfg: cfg,
cachingType: t,
Instrumentation: instr,
}

Expand All @@ -41,47 +45,62 @@ func New(backing index.Index, caching index.Index,

// String returns the name of the index, for convenient logging.
func (i *Index) String() string {
return fmt.Sprintf("cache for '%s' through '%s'", i.backingIndex, i.cachingIndex)
return fmt.Sprintf("'%s' through '%s'", i.backingIndex, i.cachingIndex)
}

func contains(s []string, str string) bool {
for _, v := range s {
if v == str {
return true
func matchDstKind(src, dst reflect.Value) reflect.Value {
dKind, sKind := dst.Kind(), src.Kind()

if dKind == reflect.Pointer && sKind != reflect.Pointer {
// dst is pointer, src is not.

if !src.CanAddr() {
panic(fmt.Sprintf("cannot address val %v for field %v", src, dst))
}

return src.Addr()
}

return false
}
if dKind != reflect.Pointer && sKind == reflect.Pointer {
// dst is value, src is pointer.
return src.Elem()
}

func (i *Index) makeCachingProperties(properties interface{}) map[string]interface{} {
// Take care to allocate map for caching properties on the stack.
return src
}

valueof := reflect.ValueOf(properties)
func setFieldVal(src, dst reflect.Value, dstField reflect.StructField) {
// Set dst field to corresponding src value.
// Note: this will panic when a dst field is not present in the src struct.
srcVal := src.FieldByName(dstField.Name)
dstVal := dst.FieldByIndex(dstField.Index)

if valueof.Kind() != reflect.Pointer {
panic(fmt.Sprintf("not called with pointer but %T", properties))
}
srcVal = matchDstKind(srcVal, dstVal)

// Dereference pointer
valueof = valueof.Elem()
dstVal.Set(srcVal)
}

if valueof.Kind() != reflect.Struct {
panic(fmt.Sprintf("not struct pointer but %T", properties))
}
func (i *Index) makeCachingProperties(props interface{}) interface{} {
src := GetStructElem(props)

dst := make(map[string]interface{}, len(i.cfg.CachingFields))
fields := reflect.VisibleFields(valueof.Type())
// Create pointer cache struct
dstPtr := reflect.New(i.cachingType)
dstFields := reflect.VisibleFields(i.cachingType)

for _, field := range fields {
if contains(i.cfg.CachingFields, field.Name) {
value := valueof.FieldByName(field.Name).Interface()
// Get the underlying struct
dst := dstPtr.Elem()

dst[field.Name] = value
}
// Iterate fields of destination
for _, dstField := range dstFields {
setFieldVal(src, dst, dstField)
}

return dst
// if debug {
// log.Printf("makeCachingProperties - src: %s: %v", src.Type(), src)
// log.Printf("makeCachingProperties - dst: %s: %v", dst.Type(), dst)
// }

return dstPtr.Interface()
}

func (i *Index) cacheGet(ctx context.Context, id string, dst interface{}, fields ...string) (bool, error) {
Expand All @@ -107,11 +126,11 @@ func (i *Index) cacheWrite(ctx context.Context, id string, properties interface{
cachingProperties := i.makeCachingProperties(properties)

if debug {
log.Printf("cache: write %s", id)
log.Printf("cache %s: write %s", i, id)
}

if err := f(ctx, id, cachingProperties); err != nil {
return ErrCache{err, fmt.Sprintf("cache error in '%v': %s", f, err.Error())}
return ErrCache{err, fmt.Sprintf("cache error in writing %+v to %s: %s", cachingProperties, id, err.Error())}
}

return nil
Expand Down Expand Up @@ -156,7 +175,7 @@ func (i *Index) Delete(ctx context.Context, id string) error {
defer span.End()

if debug {
log.Printf("cache: delete %s", id)
log.Printf("cache %s: delete %s", i, id)
}

// Delete cache first; maintain consistency as our backing index is the source of truth.
Expand Down Expand Up @@ -188,14 +207,14 @@ func (i *Index) Get(ctx context.Context, id string, dst interface{}, fields ...s

if found, err = i.cacheGet(ctx, id, dst, fields...); found {
// if debug {
log.Printf("cache: hit %s", id)
log.Printf("cache %s: hit %s", i.cachingIndex, id)
// }

return found, err
}

if debug {
log.Printf("cache: miss %s", id)
log.Printf("cache %s: miss %s", i.cachingIndex, id)
}

var backingErr error
Expand All @@ -206,7 +225,7 @@ func (i *Index) Get(ctx context.Context, id string, dst interface{}, fields ...s

if found {
if debug {
log.Printf("backing: hit %s", id)
log.Printf("backing %s: hit %s", i.backingIndex, id)
}

if indexErr := i.cacheWrite(ctx, id, dst, i.cachingIndex.Index); indexErr != nil {
Expand All @@ -215,7 +234,7 @@ func (i *Index) Get(ctx context.Context, id string, dst interface{}, fields ...s
}

if debug {
log.Printf("backing: miss %s", id)
log.Printf("backing %s: miss %s", i.backingIndex, id)
}

return found, err
Expand Down
79 changes: 36 additions & 43 deletions components/index/cache/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/mock"
Expand All @@ -23,19 +24,21 @@ type testStruct struct {
valNope int
}

var cachingFields []string = []string{"ValOne", "ValThree", "ValFour", "neverFound"}
var props testStruct = testStruct{"test", 5, 5.5, struct{ v string }{"h"}, 3}
var cachedProps = map[string]interface{}{
"ValOne": "test",
"ValThree": 5.5,
"ValFour": struct{ v string }{"h"},
}
var emptyCachedProps = map[string]interface{}{
"ValOne": "",
"ValThree": 0.0,
"ValFour": struct{ v string }{""},
type cacheStruct struct {
ValOne *string
ValThree float64
ValFour struct {
v string
}
}

var cachingFields []string = []string{"ValOne", "ValThree", "ValFour", "neverFound"}
var props = testStruct{"test", 5, 5.5, struct{ v string }{"h"}, 3}
var testStr = "test"
var emptyStr = ""
var cachedProps = cacheStruct{&testStr, 5.5, struct{ v string }{"h"}}
var emptyCachedProps = cacheStruct{&emptyStr, 0.0, struct{ v string }{""}}

var testID = "testID"
var testErr = errors.New("errr")

Expand All @@ -57,54 +60,44 @@ func (s *CacheTestSuite) SetupTest() {
s.backingIndex = &index.Mock{}

s.i = &Index{
backingIndex: s.backingIndex,
cachingIndex: s.cachingIndex,
cfg: &Config{
CachingFields: cachingFields,
},
backingIndex: s.backingIndex,
cachingIndex: s.cachingIndex,
cachingType: reflect.TypeOf(cacheStruct{}),
Instrumentation: s.instr,
}
}

func (s *CacheTestSuite) TestNew() {
i := New(s.backingIndex, s.cachingIndex, &Config{
CachingFields: cachingFields,
}, s.instr)
// Allow value.
i := New(s.backingIndex, s.cachingIndex, cacheStruct{}, s.instr)
s.NotNil(i)

// Allow pointer as well.
i = New(s.backingIndex, s.cachingIndex, &cacheStruct{}, s.instr)
s.NotNil(i)
}

func (s *CacheTestSuite) TestString() {
exp := fmt.Sprintf("cache for '%s' through '%s'", s.backingIndex, s.cachingIndex)
exp := fmt.Sprintf("'%s' through '%s'", s.backingIndex, s.cachingIndex)
s.Equal(exp, s.i.String())
}

func (s *CacheTestSuite) TestContains() {
l := []string{"a", "bee", "cee"}

s.True(contains(l, "bee"))
s.False(contains(l, "monkey"))
}

func (s *CacheTestSuite) TestMakeCachingProperties() {
res := s.i.makeCachingProperties(&props)

s.NotNil(res)

s.Len(res, 3)

s.Contains(res, "ValOne")
s.Contains(res, "ValThree")
s.Contains(res, "ValFour")
s.IsType(res, &cacheStruct{})

s.Equal(props.ValOne, res["ValOne"])
s.Equal(props.ValThree, res["ValThree"])
s.Equal(props.ValFour, res["ValFour"])
cacheres := res.(*cacheStruct)
s.Equal(props.ValOne, *cacheres.ValOne)
s.Equal(props.ValThree, cacheres.ValThree)
s.Equal(props.ValFour, cacheres.ValFour)
}

func (s *CacheTestSuite) TestIndexSuccess() {
s.backingIndex.On("Index", mock.Anything, testID, &props).Return(nil).Once()
s.cachingIndex.On("Index", mock.Anything, testID, cachedProps).Return(nil).Once()
s.cachingIndex.On("Index", mock.Anything, testID, &cachedProps).Return(nil).Once()

err := s.i.Index(s.ctx, testID, &props)
s.NoError(err)
Expand All @@ -124,7 +117,7 @@ func (s *CacheTestSuite) TestIndexBackingFail() {

func (s *CacheTestSuite) TestIndexCachingFail() {
s.backingIndex.On("Index", mock.Anything, testID, &props).Return(nil).Once()
s.cachingIndex.On("Index", mock.Anything, testID, cachedProps).Return(testErr).Once()
s.cachingIndex.On("Index", mock.Anything, testID, &cachedProps).Return(testErr).Once()

err := s.i.Index(s.ctx, testID, &props)
s.Error(err)
Expand All @@ -134,15 +127,15 @@ func (s *CacheTestSuite) TestIndexCachingFail() {

func (s *CacheTestSuite) TestUpdateSuccess() {
s.backingIndex.On("Update", mock.Anything, testID, &props).Return(nil).Once()
s.cachingIndex.On("Update", mock.Anything, testID, cachedProps).Return(nil).Once()
s.cachingIndex.On("Update", mock.Anything, testID, &cachedProps).Return(nil).Once()

err := s.i.Update(s.ctx, testID, &props)
s.NoError(err)
}

func (s *CacheTestSuite) TestUpdateBackingFail() {
s.backingIndex.On("Update", mock.Anything, testID, &props).Return(testErr).Once()
s.cachingIndex.On("Update", mock.Anything, testID, cachedProps).Return(nil).Once()
s.cachingIndex.On("Update", mock.Anything, testID, &cachedProps).Return(nil).Once()

err := s.i.Update(s.ctx, testID, &props)
s.Error(err)
Expand All @@ -151,7 +144,7 @@ func (s *CacheTestSuite) TestUpdateBackingFail() {
}

func (s *CacheTestSuite) TestUpdateCachingFail() {
s.cachingIndex.On("Update", mock.Anything, testID, cachedProps).Return(testErr).Once()
s.cachingIndex.On("Update", mock.Anything, testID, &cachedProps).Return(testErr).Once()

err := s.i.Update(s.ctx, testID, &props)
s.Error(err)
Expand Down Expand Up @@ -210,7 +203,7 @@ func (s *CacheTestSuite) TestGetBackingSuccess() {
s.backingIndex.On("Get", mock.Anything, testID, &data, mock.Anything).Return(true, nil).Once()

// If an item is not in the cache but is found in the backing, it will be added to the cache.
s.cachingIndex.On("Index", mock.Anything, testID, emptyCachedProps).Return(nil).Once()
s.cachingIndex.On("Index", mock.Anything, testID, &emptyCachedProps).Return(nil).Once()

found, err := s.i.Get(s.ctx, testID, &data)
s.True(found)
Expand All @@ -224,7 +217,7 @@ func (s *CacheTestSuite) TestGetCacheIndexFail() {
s.backingIndex.On("Get", mock.Anything, testID, &data, mock.Anything).Return(true, nil).Once()

// When there is an error adding things to the cache, we should still get results
s.cachingIndex.On("Index", mock.Anything, testID, emptyCachedProps).Return(testErr).Once()
s.cachingIndex.On("Index", mock.Anything, testID, &emptyCachedProps).Return(testErr).Once()

found, err := s.i.Get(s.ctx, testID, &data)
s.True(found)
Expand All @@ -241,7 +234,7 @@ func (s *CacheTestSuite) TestGetCacheFail() {
s.backingIndex.On("Get", mock.Anything, testID, &data, mock.Anything).Return(true, nil).Once()

// We will still *try* to index it.
s.cachingIndex.On("Index", mock.Anything, testID, emptyCachedProps).Return(nil).Once()
s.cachingIndex.On("Index", mock.Anything, testID, &emptyCachedProps).Return(nil).Once()

found, err := s.i.Get(s.ctx, testID, &data)
s.True(found)
Expand Down

0 comments on commit d211ca6

Please sign in to comment.