Skip to content

Commit

Permalink
gh-1285 Clean up deleted doc ids from inverted indices periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
antas-marcin committed Dec 1, 2020
1 parent 32017f2 commit cb1d693
Show file tree
Hide file tree
Showing 8 changed files with 799 additions and 35 deletions.
5 changes: 0 additions & 5 deletions adapters/repos/db/docid/deleted_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ func (t *InMemDeletedTracker) Remove(id uint32) {
delete(t.ids, id)
}

// TODO: @Marcin use this method to get all the IDs which should be cleaned up
//
// GetAll is a thread-safe way to retrieve all entries, it uses a ReadLock for
// concurrent reading
func (t *InMemDeletedTracker) GetAll() []uint32 {
Expand All @@ -80,9 +78,6 @@ func (t *InMemDeletedTracker) GetAll() []uint32 {
return out
}

// TODO: @Marcin use this method to ultimately remove the doc ids you cleaned
// up, so the in-mem model doesn't keep on growing forever
//
// BulkRemove is a thread-safe way to remove multiple ids, it locks only once,
// for the entire duration of the deletion
func (t *InMemDeletedTracker) BulkRemove(ids []uint32) {
Expand Down
15 changes: 15 additions & 0 deletions adapters/repos/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ func NewIndex(config IndexConfig, sg schemaUC.SchemaGetter,
}

index.Shards["single"] = singleShard

err = index.startPeriodicCleanup()
if err != nil {
return nil, errors.Wrapf(err, "start periodic index %s cleanup", index.ID())
}

return index, nil
}

Expand Down Expand Up @@ -248,3 +254,12 @@ func (i *Index) drop() error {
}
return nil
}

// startPeriodicCleanup starts the periodic deleted-doc-id cleanup process
// on every shard of this index. It returns an error if any shard fails to
// start its cleanup loop.
func (i *Index) startPeriodicCleanup() error {
	// iterate all shards rather than assuming the single-shard layout, so
	// this keeps working if sharding is ever extended
	for _, shard := range i.Shards {
		err := shard.startPeriodicCleanup()
		if err != nil {
			return errors.Wrapf(err, "shard periodic cleanup %s", shard.ID())
		}
	}
	return nil
}
97 changes: 97 additions & 0 deletions adapters/repos/db/inverted/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package inverted

import (
"bytes"
"encoding/binary"
"fmt"

"github.com/boltdb/bolt"
"github.com/pkg/errors"
"github.com/semi-technologies/weaviate/adapters/repos/db/helpers"
"github.com/semi-technologies/weaviate/entities/models"
"github.com/semi-technologies/weaviate/entities/schema"
)

// deleteFn removes the given deleted doc ids from a single inverted-index
// row: b is the property's bucket, item.Data holds the row key, and
// hasFrequency indicates whether the row's entries carry a term frequency
// alongside each doc id (see propHasFrequency).
type deleteFn func(b *bolt.Bucket, item Countable, docIDs []uint32, hasFrequency bool) error

// Cleaner purges deleted doc ids from the inverted indices of a class and
// from the doc id lookup bucket. It is constructed per cleanup run via
// NewCleaner and driven through Cleanup.
type Cleaner struct {
	db            *bolt.DB      // bolt store holding the per-property index buckets
	class         *models.Class // schema class whose properties are cleaned
	deletedDocIDs []uint32      // doc ids marked as deleted, to be purged
	deleteFn      deleteFn      // strategy used to delete ids from a single row
}

// NewCleaner returns a Cleaner for the given class which will remove the
// given deleted doc ids using the provided per-row delete strategy.
func NewCleaner(db *bolt.DB, class *models.Class, deletedDocIDs []uint32, deleteFn deleteFn) *Cleaner {
	return &Cleaner{db, class, deletedDocIDs, deleteFn}
}

// getDocumentKey builds the 8-byte bucket key under which a document's
// lookup entry is stored: 4 leading zero bytes followed by the doc id in
// little-endian encoding.
//
// NOTE(review): the 4 leading zero bytes stem from the original
// bytes.NewBuffer(make([]byte, 4)) pre-seeding the buffer before the write.
// That looks accidental, but existing keys in the doc id bucket use this
// layout (readers slice key[4:]), so it must be preserved. The previous
// reflection-based binary.Write call (whose error was silently ignored) is
// replaced with a direct, infallible PutUint32.
func (c *Cleaner) getDocumentKey(documentID uint32) []byte {
	key := make([]byte, 8)
	binary.LittleEndian.PutUint32(key[4:], documentID)
	return key
}

// propHasFrequency reports whether the property's inverted-index rows store
// a term frequency next to each doc id, which is the case for string and
// text data types.
func (c *Cleaner) propHasFrequency(p *models.Property) bool {
	for _, dt := range p.DataType {
		switch schema.DataType(dt) {
		case schema.DataTypeString, schema.DataTypeText:
			return true
		}
	}
	return false
}

// cleanupProperty removes the deleted doc ids from every row of the
// property's inverted-index bucket within the given transaction.
func (c *Cleaner) cleanupProperty(tx *bolt.Tx, p *models.Property) error {
	hasFrequency := c.propHasFrequency(p)
	id := helpers.BucketFromPropName(p.Name)
	propsBucket := tx.Bucket(id)
	if propsBucket == nil {
		// no bucket for this property — nothing to clean (presumably the
		// property was never indexed; TODO confirm)
		return nil
	}
	// visit every row (key = indexed value) and delete the doc ids from it
	//
	// NOTE(review): bolt's documentation forbids modifying the bucket while
	// iterating it with ForEach; deleteFn appears to write to propsBucket —
	// confirm this is safe here, or collect row keys first and mutate after
	err := propsBucket.ForEach(func(item, data []byte) error {
		return c.deleteFn(propsBucket, Countable{Data: item}, c.deletedDocIDs, hasFrequency)
	})
	if err != nil {
		return errors.Wrapf(err, "cleanup property %s row", p.Name)
	}
	return nil
}

// deleteDocument removes the document's entry from the doc id lookup
// bucket. It reports whether the entry was actually deleted; on false the
// id stays in the caller's deleted tracker and is retried on a later run.
func (c *Cleaner) deleteDocument(tx *bolt.Tx, documentID uint32) bool {
	key := c.getDocumentKey(documentID)
	docsBucket := tx.Bucket(helpers.DocIDBucket)
	if docsBucket == nil {
		return false
	}
	err := docsBucket.Delete(key)
	if err != nil {
		// NOTE(review): the error is swallowed by the bool signature; the
		// cause of a failed delete is lost — consider returning the error
		return false
	}
	// NOTE(review): leftover debug print — this writes to stdout for every
	// deleted document; replace with the structured logger or remove
	fmt.Printf("deleted document: %v\n", documentID)
	return true
}

// Cleanup removes the deleted doc ids from the inverted-index rows of all
// of the class's properties and then deletes each doc id's lookup entry,
// all within a single read-write transaction.
//
// It returns the ids whose lookup entry was successfully deleted, so the
// caller can drop exactly those from its in-memory deleted tracker; ids
// that failed to delete remain tracked and are retried on the next run.
func (c *Cleaner) Cleanup() ([]uint32, error) {
	performedDeletion := make([]uint32, 0)
	err := c.db.Update(func(tx *bolt.Tx) error {
		// clean up the inverted-index rows of every property
		for _, p := range c.class.Properties {
			err := c.cleanupProperty(tx, p)
			if err != nil {
				return err
			}
		}
		// then remove the doc id lookup entries themselves
		for _, documentID := range c.deletedDocIDs {
			if c.deleteDocument(tx, documentID) {
				performedDeletion = append(performedDeletion, documentID)
			}
		}
		return nil
	})
	if err != nil {
		return performedDeletion, err
	}
	return performedDeletion, nil
}
54 changes: 53 additions & 1 deletion adapters/repos/db/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package db

import (
"context"
"encoding/binary"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -43,6 +44,7 @@ type Shard struct {
metrics *Metrics
propertyIndices propertyspecific.Indices
deletedDocIDs *docid.InMemDeletedTracker
cleanupInterval time.Duration
}

func NewShard(shardName string, index *Index) (*Shard, error) {
Expand All @@ -52,6 +54,7 @@ func NewShard(shardName string, index *Index) (*Shard, error) {
invertedRowCache: inverted.NewRowCacher(50 * 1024 * 1024),
metrics: NewMetrics(index.logger),
deletedDocIDs: docid.NewInMemDeletedTracker(),
cleanupInterval: 60 * time.Second,
}

vi, err := hnsw.New(hnsw.Config{
Expand Down Expand Up @@ -89,7 +92,9 @@ func NewShard(shardName string, index *Index) (*Shard, error) {
return nil, errors.Wrapf(err, "init shard %q: init per property indices", s.ID())
}

// TODO: @Marcin init doc id deleted tracker with deleted ids from db
if err := s.findDeletedDocs(); err != nil {
return nil, errors.Wrapf(err, "init shard %q: find deleted documents", s.ID())
}

return s, nil
}
Expand Down Expand Up @@ -191,3 +196,50 @@ func (s *Shard) addProperty(ctx context.Context, prop *models.Property) error {

return nil
}

// findDeletedDocs scans the doc id lookup bucket and seeds the in-memory
// deleted tracker with every doc id whose entry is flagged as deleted, so
// the periodic cleanup can pick them up after a restart.
func (s *Shard) findDeletedDocs() error {
	view := func(tx *bolt.Tx) error {
		bucket := tx.Bucket(helpers.DocIDBucket)
		if bucket == nil {
			// no doc id bucket yet — nothing to recover
			return nil
		}

		scanErr := bucket.ForEach(func(key, value []byte) error {
			lookup, err := docid.LookupFromBinary(value)
			if err != nil {
				return errors.Wrap(err, "lookup from binary")
			}
			if lookup.Deleted {
				// keys are 8 bytes: 4 leading zero bytes, then the
				// little-endian doc id
				s.deletedDocIDs.Add(binary.LittleEndian.Uint32(key[4:]))
			}
			return nil
		})
		if scanErr != nil {
			return errors.Wrap(scanErr, "search for deleted documents")
		}

		return nil
	}

	if err := s.db.View(view); err != nil {
		return errors.Wrap(err, "find deleted ids")
	}

	return nil
}

// startPeriodicCleanup spawns a background goroutine which, every
// s.cleanupInterval, purges deleted doc ids from the shard's inverted
// indices in small batches throttled by batchCleanupInterval.
//
// NOTE(review): the goroutine has no stop signal and runs for the process
// lifetime — consider wiring a context or shutdown channel.
func (s *Shard) startPeriodicCleanup() error {
	// time.NewTicker instead of time.Tick so the ticker is at least
	// stoppable should a shutdown path be added later
	ticker := time.NewTicker(s.cleanupInterval)
	batchCleanupInterval := 5 * time.Second
	batchSize := 10
	go func() {
		for range ticker.C {
			err := s.periodicCleanup(batchSize, batchCleanupInterval)
			if err != nil {
				// TODO: use the structured logger instead of stdout
				fmt.Printf("periodic cleanup error: %v\n", err)
			}
		}
	}()

	return nil
}
67 changes: 52 additions & 15 deletions adapters/repos/db/shard_write_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@ package db

import (
"context"
"time"

"github.com/boltdb/bolt"
"github.com/go-openapi/strfmt"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/semi-technologies/weaviate/adapters/repos/db/docid"
"github.com/semi-technologies/weaviate/adapters/repos/db/helpers"
"github.com/semi-technologies/weaviate/adapters/repos/db/inverted"
"github.com/semi-technologies/weaviate/adapters/repos/db/storobj"
"github.com/semi-technologies/weaviate/entities/schema"
)

func (s *Shard) deleteObject(ctx context.Context, id strfmt.UUID) error {
Expand All @@ -45,21 +48,6 @@ func (s *Shard) deleteObject(ctx context.Context, id strfmt.UUID) error {
return errors.Wrap(err, "get existing doc id from object binary")
}

oldObj, err := storobj.FromBinary(existing)
if err != nil {
return errors.Wrap(err, "unmarshal existing doc")
}

invertedPointersToDelete, err := s.analyzeObject(oldObj)
if err != nil {
return errors.Wrap(err, "analyze object")
}

err = s.deleteFromInvertedIndices(tx, invertedPointersToDelete, docID)
if err != nil {
return errors.Wrap(err, "delete pointers from inverted index")
}

err = bucket.Delete(idBytes)
if err != nil {
return errors.Wrap(err, "delete object from bucket")
Expand All @@ -86,6 +74,55 @@ func (s *Shard) deleteObject(ctx context.Context, id strfmt.UUID) error {
return nil
}

// periodicCleanup purges all currently tracked deleted doc ids from the
// shard's inverted indices, processing them in batches of batchSize and
// pausing batchCleanupInterval between consecutive batches to limit load.
//
// batchSize must be positive; callers currently pass a fixed value of 10.
func (s *Shard) periodicCleanup(batchSize int, batchCleanupInterval time.Duration) error {
	docIDs := s.deletedDocIDs.GetAll()
	if len(docIDs) == 0 {
		return nil
	}

	// the previous implementation used time.Tick here, leaking a ticker on
	// every invocation; NewTicker + Stop releases it when we return
	ticker := time.NewTicker(batchCleanupInterval)
	defer ticker.Stop()

	for start := 0; start < len(docIDs); start += batchSize {
		if start > 0 {
			// throttle between batches only — no pointless sleep after the
			// final batch (the old code waited once per batch, including
			// the last)
			<-ticker.C
		}
		end := start + batchSize
		if end > len(docIDs) {
			end = len(docIDs)
		}
		if err := s.performCleanup(docIDs[start:end]); err != nil {
			return err
		}
	}
	return nil
}

// performCleanup removes the given deleted doc ids from all inverted
// indices of the shard's class, then drops the ids that were fully cleaned
// from the in-memory deleted tracker so it does not grow forever.
func (s *Shard) performCleanup(deletedDocIDs []uint32) error {
	className := s.index.Config.ClassName
	schemaModel := s.index.getSchema.GetSchemaSkipAuth().Things
	class, err := schema.GetClassByName(schemaModel, className.String())
	if err != nil {
		return errors.Wrapf(err, "get class %s", className)
	}

	// per-row delete strategy handed to the cleaner
	deleteProp := func(b *bolt.Bucket, item inverted.Countable, docIDs []uint32, hasFrequency bool) error {
		return s.tryDeleteFromInvertedIndicesProp(b, item, docIDs, hasFrequency)
	}

	deletedIDs, err := inverted.NewCleaner(s.db, class, deletedDocIDs, deleteProp).Cleanup()
	if err != nil {
		return errors.Wrapf(err, "perform cleanup %v", deletedDocIDs)
	}

	// only the ids that were actually cleaned are dropped from the tracker;
	// the rest will be retried on the next cycle
	s.deletedDocIDs.BulkRemove(deletedIDs)
	return nil
}

// func (s *Shard) deleteIndexIDLookup(tx *bolt.Tx, docID uint32) error {
// keyBuf := bytes.NewBuffer(make([]byte, 4))
// binary.Write(keyBuf, binary.LittleEndian, &docID)
Expand Down

0 comments on commit cb1d693

Please sign in to comment.