Skip to content

Commit

Permalink
Integrating caching and Redis index.
Browse files Browse the repository at this point in the history
  • Loading branch information
dokterbob committed Sep 29, 2022
1 parent c171676 commit e74b324
Show file tree
Hide file tree
Showing 24 changed files with 307 additions and 108 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ For discussing and suggesting features, look at the [issues](https://github.com/
* RabbitMQ / AMQP server
* NodeJS 9.x
* IPFS 0.7
* Redis

## Internal dependencies

Expand Down
4 changes: 2 additions & 2 deletions commands/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ func Crawl(ctx context.Context, cfg *config.Config) error {
ctx, span := i.Tracer.Start(ctx, "commands.Crawl")
defer span.End()

c, err := pool.New(ctx, cfg, i)
pool, err := pool.New(ctx, cfg, i)
if err != nil {
return err
}

c.Start(ctx)
pool.Start(ctx)

// Context closure or panic is the only way to stop crawling
<-ctx.Done()
Expand Down
67 changes: 42 additions & 25 deletions components/index/cache/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ package cache
import (
"context"
"fmt"
"log"
"reflect"

"github.com/ipfs-search/ipfs-search/components/index"
"github.com/ipfs-search/ipfs-search/instr"
)

const debug bool = false

// Index wraps a backing index and caches it using another index.
type Index struct {
cfg *Config
Expand Down Expand Up @@ -70,28 +73,15 @@ func (i *Index) makeCachingProperties(properties interface{}) map[string]interfa
dst := make(map[string]interface{}, len(i.cfg.CachingFields))
fields := reflect.VisibleFields(valueof.Type())

for k, field := range fields {
for _, field := range fields {
if contains(i.cfg.CachingFields, field.Name) {
dst[field.Name] = valueof.Field(k).Interface()
}
}
value := valueof.FieldByName(field.Name).Interface()

if len(dst) == 0 {
panic("no cachable properties found")
}

return dst
}

func (i *Index) allFieldsCachable(fields []string) bool {
for _, field := range fields {
exists := contains(i.cfg.CachingFields, field)
if !exists {
return false
dst[field.Name] = value
}
}

return true
return dst
}

func (i *Index) cacheGet(ctx context.Context, id string, dst interface{}, fields ...string) (bool, error) {
Expand All @@ -100,11 +90,12 @@ func (i *Index) cacheGet(ctx context.Context, id string, dst interface{}, fields
err error
)

if i.allFieldsCachable(fields) {
if found, err = i.cachingIndex.Get(ctx, id, dst, fields...); err != nil {
err = ErrCache{err, fmt.Sprintf("cache error deleting: %e", err)}
}
} else {
// Ignore fields for now; the OpenSearch API uses the json field names
// Ref: https://github.com/ipfs-search/ipfs-search/issues/234

if found, err = i.cachingIndex.Get(ctx, id, dst, fields...); err != nil {
// Ignore context closed
err = ErrCache{err, fmt.Sprintf("cache error in get: %s", err.Error())}
}

return found, err
Expand All @@ -115,8 +106,12 @@ type indexWrite func(context.Context, string, interface{}) error
func (i *Index) cacheWrite(ctx context.Context, id string, properties interface{}, f indexWrite) error {
cachingProperties := i.makeCachingProperties(properties)

if debug {
log.Printf("cache: write %s", id)
}

if err := f(ctx, id, cachingProperties); err != nil {
return ErrCache{err, fmt.Sprintf("cache error in '%v': %e", f, err)}
return ErrCache{err, fmt.Sprintf("cache error in '%v': %s", f, err.Error())}
}

return nil
Expand Down Expand Up @@ -160,9 +155,16 @@ func (i *Index) Delete(ctx context.Context, id string) error {
ctx, span := i.Tracer.Start(ctx, "index.cache.Delete")
defer span.End()

if debug {
log.Printf("cache: delete %s", id)
}

// Delete cache first; maintain consistency as our backing index is the source of truth.
if err := i.cachingIndex.Delete(ctx, id); err != nil {
return ErrCache{err, "error deleting cache"}
return ErrCache{
err,
fmt.Sprintf("error deleting cache: %s", err.Error()),
}
}

if err := i.backingIndex.Delete(ctx, id); err != nil {
Expand All @@ -173,7 +175,6 @@ func (i *Index) Delete(ctx context.Context, id string) error {
}

// Get retrieves *all* fields from document with `id` from the cache, falling back to the backing index.
// `fields` parameter is used to determine whether cache can be used based on configured CachingFields.
// Returns: (exists, err) where err is of type ErrCache if there was (only) an error from the
// caching index.
func (i *Index) Get(ctx context.Context, id string, dst interface{}, fields ...string) (bool, error) {
Expand All @@ -186,21 +187,37 @@ func (i *Index) Get(ctx context.Context, id string, dst interface{}, fields ...s
)

if found, err = i.cacheGet(ctx, id, dst, fields...); found {
// if debug {
log.Printf("cache: hit %s", id)
// }

return found, err
}

if debug {
log.Printf("cache: miss %s", id)
}

var backingErr error
if found, backingErr = i.backingIndex.Get(ctx, id, dst, fields...); backingErr != nil {
// Backing errors overwrite cache errors.
err = backingErr
}

if found {
if debug {
log.Printf("backing: hit %s", id)
}

if indexErr := i.cacheWrite(ctx, id, dst, i.cachingIndex.Index); indexErr != nil {
err = indexErr
}
}

if debug {
log.Printf("backing: miss %s", id)
}

return found, err
}

Expand Down
2 changes: 1 addition & 1 deletion components/index/cache/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *CacheTestSuite) TestString() {
s.Equal(exp, s.i.String())
}

func (s CacheTestSuite) TestContains() {
func (s *CacheTestSuite) TestContains() {
l := []string{"a", "bee", "cee"}

s.True(contains(l, "bee"))
Expand Down
23 changes: 19 additions & 4 deletions components/index/multiget.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@
package index

import (
// "log"
"context"
"errors"
"log"

"golang.org/x/sync/errgroup"
)

const debug bool = false

func contextDone(ctx context.Context, err error) bool {
ctxErr := ctx.Err()
if ctxErr != nil {
return errors.Is(err, ctxErr)
}

return false
}

// MultiGet returns `fields` for the first document with `id` from given `indexes`.
// When the document is not found (nil, nil) is returned.
func MultiGet(ctx context.Context, indexes []Index, id string, dst interface{}, fields ...string) (Index, error) {
Expand All @@ -20,11 +32,14 @@ func MultiGet(ctx context.Context, indexes []Index, id string, dst interface{},
i := i // https://go.dev/doc/faq#closures_and_goroutines

g.Go(func() error {
// log.Printf("MultiGet %s index %s", id, i)
if debug {
log.Printf("MultiGet %s index %s", id, i)
}

found, err := i.Get(groupCtx, id, dst, fields...)

if err != nil {
if err != nil && !contextDone(ctx, err) {
// Ignore context done errors if MultiGet context is canceled.
return err
}

Expand All @@ -33,7 +48,7 @@ func MultiGet(ctx context.Context, indexes []Index, id string, dst interface{},
case <-groupCtx.Done():
return nil
case foundIdx <- i:
cancel() // We're done
cancel() // Found, we're done.
}

}
Expand Down
5 changes: 5 additions & 0 deletions components/index/opensearch/bulkgetter/asyncgetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bulkgetter

import (
"context"
"fmt"
)

// GetRequest represents an item to GET.
Expand All @@ -11,6 +12,10 @@ type GetRequest struct {
Fields []string
}

// String implements fmt.Stringer, rendering the request's index and
// document ID for use in log output.
func (r *GetRequest) String() string {
	const layout = "index: %s, id: %s"

	return fmt.Sprintf(layout, r.Index, r.DocumentID)
}

// GetResponse represents the response from a GetRequest.
type GetResponse struct {
Found bool
Expand Down
20 changes: 17 additions & 3 deletions components/index/opensearch/bulkgetter/bulkgetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/opensearch-project/opensearch-go/v2"
)

const debug bool = false

// BulkGetter allows batching/bulk gets.
type BulkGetter struct {
cfg Config
Expand Down Expand Up @@ -80,22 +82,34 @@ func (bg *BulkGetter) processBatch(ctx context.Context) error {
}

func (bg *BulkGetter) populateBatch(ctx context.Context, queue <-chan reqresp) (*bulkRequest, error) {
// log.Println("Populating BulkGetter batch.")
if debug {
log.Println("Populating BulkGetter batch.")
}

b := newBulkRequest(ctx, bg.cfg.Client, bg.cfg.BatchSize)

for i := 0; i < bg.cfg.BatchSize; i++ {
select {
case <-ctx.Done():
if debug {
log.Printf("bulkgetter: context closed in populateBatch")
}
return b, ctx.Err()
case <-time.After(bg.cfg.BatchTimeout):
// log.Printf("Batch timeout, %d elements", len(b.rrs))
if debug {
log.Printf("bulkgetter: Batch timeout, %d elements", len(b.rrs))
}

return b, nil
case rr := <-queue:
// log.Printf("Batch add, %d elements", len(b.rrs))
if debug {
log.Printf("bulkgetter: Batch add, %d elements", len(b.rrs))
}

if err := b.add(rr); err != nil {
if debug {
log.Printf("bulkgetter: error %s", err.Error())
}
return b, err
}
}
Expand Down
Loading

0 comments on commit e74b324

Please sign in to comment.