diff --git a/adapters/repos/db/docid/deleted_tracker.go b/adapters/repos/db/docid/deleted_tracker.go
index e3be31c7cf4..a6738ba7f0c 100644
--- a/adapters/repos/db/docid/deleted_tracker.go
+++ b/adapters/repos/db/docid/deleted_tracker.go
@@ -62,8 +62,6 @@ func (t *InMemDeletedTracker) Remove(id uint32) {
 	delete(t.ids, id)
 }
 
-// TODO: @Marcin use this method to get all the IDs which should be cleaned up
-//
 // GetAll is a thread-safe way to retrieve all entries, it uses a ReadLock for
 // concurrent reading
 func (t *InMemDeletedTracker) GetAll() []uint32 {
@@ -80,9 +78,6 @@ func (t *InMemDeletedTracker) GetAll() []uint32 {
 	return out
 }
 
-// TODO: @Marcin use this method to ultimately remove the docs ids you clenaed
-// up, so the in-mem model doesn't keep on growing forever
-//
 // BulkRemove is a thread-safe way to remove multiple ids, it locks only once,
 // for the entire duration of the deletion
 func (t *InMemDeletedTracker) BulkRemove(ids []uint32) {
diff --git a/adapters/repos/db/index.go b/adapters/repos/db/index.go
index 11e400de7c1..7d98ac934ee 100644
--- a/adapters/repos/db/index.go
+++ b/adapters/repos/db/index.go
@@ -66,6 +66,12 @@ func NewIndex(config IndexConfig, sg schemaUC.SchemaGetter,
 	}
 
 	index.Shards["single"] = singleShard
+
+	err = index.startPeriodicCleanup()
+	if err != nil {
+		return nil, errors.Wrapf(err, "start periodic index %s cleanup", index.ID())
+	}
+
 	return index, nil
 }
@@ -248,3 +254,12 @@ func (i *Index) drop() error {
 	}
 	return nil
 }
+
+func (i *Index) startPeriodicCleanup() error {
+	shard := i.Shards["single"]
+	err := shard.startPeriodicCleanup()
+	if err != nil {
+		return errors.Wrapf(err, "shard periodic cleanup %s", shard.ID())
+	}
+	return nil
+}
diff --git a/adapters/repos/db/inverted/cleaner.go b/adapters/repos/db/inverted/cleaner.go
new file mode 100644
index 00000000000..055c6a7babb
--- /dev/null
+++ b/adapters/repos/db/inverted/cleaner.go
@@ -0,0 +1,97 @@
+package inverted
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+
+	"github.com/boltdb/bolt"
+	"github.com/pkg/errors"
+	"github.com/semi-technologies/weaviate/adapters/repos/db/helpers"
+	"github.com/semi-technologies/weaviate/entities/models"
+	"github.com/semi-technologies/weaviate/entities/schema"
+)
+
+type deleteFn func(b *bolt.Bucket, item Countable, docIDs []uint32, hasFrequency bool) error
+
+type Cleaner struct {
+	db            *bolt.DB
+	class         *models.Class
+	deletedDocIDs []uint32
+	deleteFn      deleteFn
+}
+
+func NewCleaner(db *bolt.DB, class *models.Class, deletedDocIDs []uint32, deleteFn deleteFn) *Cleaner {
+	return &Cleaner{db, class, deletedDocIDs, deleteFn}
+}
+
+func (c *Cleaner) getDocumentKey(documentID uint32) []byte {
+	keyBuf := bytes.NewBuffer(make([]byte, 4))
+	binary.Write(keyBuf, binary.LittleEndian, &documentID)
+	key := keyBuf.Bytes()
+	return key
+}
+
+func (c *Cleaner) propHasFrequency(p *models.Property) bool {
+	for i := range p.DataType {
+		if schema.DataType(p.DataType[i]) == schema.DataTypeString || schema.DataType(p.DataType[i]) == schema.DataTypeText {
+			return true
+		}
+	}
+	return false
+}
+
+func (c *Cleaner) cleanupProperty(tx *bolt.Tx, p *models.Property) error {
+	hasFrequency := c.propHasFrequency(p)
+	id := helpers.BucketFromPropName(p.Name)
+	propsBucket := tx.Bucket(id)
+	if propsBucket == nil {
+		return nil
+	}
+	err := propsBucket.ForEach(func(item, data []byte) error {
+		return c.deleteFn(propsBucket, Countable{Data: item}, c.deletedDocIDs, hasFrequency)
+	})
+	if err != nil {
+		return errors.Wrapf(err, "cleanup property %s row", p.Name)
+	}
+	return nil
+}
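+
+// Note on the assumed data layout: every inverted-index row handed to deleteFn
+// is expected to start with an 8-byte header (a 4-byte checksum followed by a
+// 4-byte little-endian doc count), then one entry per document: a 4-byte
+// little-endian doc ID, plus a 4-byte frequency if the property has one. The
+// keys produced by getDocumentKey above are 8 bytes long: the 4 zero bytes the
+// buffer is initialized with, followed by the little-endian uint32 document ID.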
+
+func (c *Cleaner) deleteDocument(tx *bolt.Tx, documentID uint32) bool {
+	key := c.getDocumentKey(documentID)
+	docsBucket := tx.Bucket(helpers.DocIDBucket)
+	if docsBucket == nil {
+		return false
+	}
+	err := docsBucket.Delete(key)
+	if err != nil {
+		return false
+	}
+	fmt.Printf("deleted document: %v\n", documentID)
+	return true
+}
+
+// Cleanup removes the deleted doc IDs from all of the class's property rows,
+// then deletes the doc ID entries themselves. It returns the IDs that were
+// actually removed.
+func (c *Cleaner) Cleanup() ([]uint32, error) {
+	performedDeletion := make([]uint32, 0)
+	err := c.db.Update(func(tx *bolt.Tx) error {
+		// clean up properties
+		for _, p := range c.class.Properties {
+			err := c.cleanupProperty(tx, p)
+			if err != nil {
+				return err
+			}
+		}
+		for _, documentID := range c.deletedDocIDs {
+			// delete document
+			if c.deleteDocument(tx, documentID) {
+				performedDeletion = append(performedDeletion, documentID)
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return performedDeletion, err
+	}
+	return performedDeletion, nil
+}
diff --git a/adapters/repos/db/shard.go b/adapters/repos/db/shard.go
index c2a33733808..db2c5d1599a 100644
--- a/adapters/repos/db/shard.go
+++ b/adapters/repos/db/shard.go
@@ -13,6 +13,7 @@ package db
 
 import (
 	"context"
+	"encoding/binary"
 	"fmt"
 	"os"
 	"time"
@@ -43,6 +44,7 @@ type Shard struct {
 	metrics          *Metrics
 	propertyIndices  propertyspecific.Indices
 	deletedDocIDs    *docid.InMemDeletedTracker
+	cleanupInterval  time.Duration
 }
 
 func NewShard(shardName string, index *Index) (*Shard, error) {
@@ -52,6 +54,7 @@ func NewShard(shardName string, index *Index) (*Shard, error) {
 		invertedRowCache: inverted.NewRowCacher(50 * 1024 * 1024),
 		metrics:          NewMetrics(index.logger),
 		deletedDocIDs:    docid.NewInMemDeletedTracker(),
+		cleanupInterval:  60 * time.Second,
 	}
 
 	vi, err := hnsw.New(hnsw.Config{
@@ -89,7 +92,9 @@ func NewShard(shardName string, index *Index) (*Shard, error) {
 		return nil, errors.Wrapf(err, "init shard %q: init per property indices", s.ID())
 	}
 
-	// TODO: @Marcin init doc id deleted tracker with deleted ids from db
+	if err := s.findDeletedDocs(); err != nil {
+		return nil, errors.Wrapf(err, "init shard %q: find deleted documents", s.ID())
+	}
 
 	return s, nil
 }
@@ -191,3 +196,50 @@ func (s *Shard) addProperty(ctx context.Context, prop *models.Property) error {
 
 	return nil
 }
+
+func (s *Shard) findDeletedDocs() error {
+	err := s.db.View(func(tx *bolt.Tx) error {
+		docIDs := tx.Bucket(helpers.DocIDBucket)
+		if docIDs == nil {
+			return nil
+		}
+
+		err := docIDs.ForEach(func(documentID, v []byte) error {
+			lookup, err := docid.LookupFromBinary(v)
+			if err != nil {
+				return errors.Wrap(err, "lookup from binary")
+			}
+			if lookup.Deleted {
+				s.deletedDocIDs.Add(binary.LittleEndian.Uint32(documentID[4:]))
+			}
+			return nil
+		})
+		if err != nil {
+			return errors.Wrap(err, "search for deleted documents")
+		}
+
+		return nil
+	})
+	if err != nil {
+		return errors.Wrap(err, "find deleted ids")
+	}
+
+	return nil
+}
+
+func (s *Shard) startPeriodicCleanup() error {
+	t := time.Tick(s.cleanupInterval)
+	batchCleanupInterval := 5 * time.Second
+	batchSize := 10
+	go func(batchSize int, batchCleanupInterval time.Duration) {
+		for {
+			<-t
+			err := s.periodicCleanup(batchSize, batchCleanupInterval)
+			if err != nil {
+				fmt.Printf("periodic cleanup error: %v\n", err)
+			}
+		}
+	}(batchSize, batchCleanupInterval)
+
+	return nil
+}
diff --git a/adapters/repos/db/shard_write_delete.go b/adapters/repos/db/shard_write_delete.go
index 99e64e0e29c..ccd7438c4f7 100644
--- a/adapters/repos/db/shard_write_delete.go
+++ b/adapters/repos/db/shard_write_delete.go
@@ -13,6 +13,7 @@ package db
 
 import (
( "context" + "time" "github.com/boltdb/bolt" "github.com/go-openapi/strfmt" @@ -20,7 +21,9 @@ import ( "github.com/pkg/errors" "github.com/semi-technologies/weaviate/adapters/repos/db/docid" "github.com/semi-technologies/weaviate/adapters/repos/db/helpers" + "github.com/semi-technologies/weaviate/adapters/repos/db/inverted" "github.com/semi-technologies/weaviate/adapters/repos/db/storobj" + "github.com/semi-technologies/weaviate/entities/schema" ) func (s *Shard) deleteObject(ctx context.Context, id strfmt.UUID) error { @@ -45,21 +48,6 @@ func (s *Shard) deleteObject(ctx context.Context, id strfmt.UUID) error { return errors.Wrap(err, "get existing doc id from object binary") } - oldObj, err := storobj.FromBinary(existing) - if err != nil { - return errors.Wrap(err, "unmarshal existing doc") - } - - invertedPointersToDelete, err := s.analyzeObject(oldObj) - if err != nil { - return errors.Wrap(err, "analyze object") - } - - err = s.deleteFromInvertedIndices(tx, invertedPointersToDelete, docID) - if err != nil { - return errors.Wrap(err, "delete pointers from inverted index") - } - err = bucket.Delete(idBytes) if err != nil { return errors.Wrap(err, "delete object from bucket") @@ -86,6 +74,55 @@ func (s *Shard) deleteObject(ctx context.Context, id strfmt.UUID) error { return nil } +func (s *Shard) periodicCleanup(batchSize int, batchCleanupInterval time.Duration) error { + batchCleanupTicker := time.Tick(batchCleanupInterval) + docIDs := s.deletedDocIDs.GetAll() + if len(docIDs) > 0 { + batches := (len(docIDs) / batchSize) + if len(docIDs)%batchSize > 0 { + batches = batches + 1 + } + for indx := 0; indx < batches; indx++ { + start := indx * batchSize + end := start + batchSize + if end > len(docIDs) { + end = len(docIDs) + } + err := s.performCleanup(docIDs[start:end]) + if err != nil { + return err + } + <-batchCleanupTicker + } + } + return nil +} + +func (s *Shard) performCleanup(deletedDocIDs []uint32) error { + className := s.index.Config.ClassName + schemaModel := s.index.getSchema.GetSchemaSkipAuth().Things + class, err := schema.GetClassByName(schemaModel, className.String()) + if err != nil { + return errors.Wrapf(err, "get class %s", className) + } + + cleaner := inverted.NewCleaner( + s.db, + class, + deletedDocIDs, + func(b *bolt.Bucket, item inverted.Countable, docIDs []uint32, hasFrequency bool) error { + return s.tryDeleteFromInvertedIndicesProp(b, item, docIDs, hasFrequency) + }, + ) + deletedIDs, err := cleaner.Cleanup() + if err != nil { + return errors.Wrapf(err, "perform cleanup %v", deletedDocIDs) + } + + s.deletedDocIDs.BulkRemove(deletedIDs) + return nil +} + // func (s *Shard) deleteIndexIDLookup(tx *bolt.Tx, docID uint32) error { // keyBuf := bytes.NewBuffer(make([]byte, 4)) // binary.Write(keyBuf, binary.LittleEndian, &docID) diff --git a/adapters/repos/db/shard_write_delete_integration_test.go b/adapters/repos/db/shard_write_delete_integration_test.go new file mode 100644 index 00000000000..f71993d6229 --- /dev/null +++ b/adapters/repos/db/shard_write_delete_integration_test.go @@ -0,0 +1,174 @@ +package db + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "math/rand" + "os" + "testing" + "time" + + "github.com/boltdb/bolt" + "github.com/go-openapi/strfmt" + "github.com/google/uuid" + "github.com/semi-technologies/weaviate/adapters/repos/db/helpers" + "github.com/semi-technologies/weaviate/adapters/repos/db/storobj" + "github.com/semi-technologies/weaviate/entities/models" + "github.com/semi-technologies/weaviate/entities/schema" + 
"github.com/semi-technologies/weaviate/entities/schema/kind" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPerformCleanupIndexWithFrequencyProp(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + dirName := fmt.Sprintf("./testdata/%d", rand.Intn(10000000)) + os.MkdirAll(dirName, 0o777) + defer func() { + err := os.RemoveAll(dirName) + fmt.Println(err) + }() + logger := logrus.New() + testClassName := "deletetest" + testPropName := "name" + testClass := &models.Class{ + Class: testClassName, + Properties: []*models.Property{ + &models.Property{ + Name: testPropName, + DataType: []string{"string"}, + }, + }, + } + fakeSchema := schema.Schema{ + Things: &models.Schema{ + Classes: []*models.Class{ + testClass, + }, + }, + } + // create index with data + index, err := NewIndex(IndexConfig{ + RootPath: dirName, + Kind: kind.Thing, + ClassName: schema.ClassName(testClassName), + }, &fakeSchemaGetter{schema: fakeSchema}, nil, logger) + require.Nil(t, err) + shard, err := NewShard("extend_invert_benchmark", index) + require.Nil(t, err) + + var productsIds = []strfmt.UUID{ + "1295c052-263d-4aae-99dd-920c5a370d06", + "1295c052-263d-4aae-99dd-920c5a370d07", + } + + products := []map[string]interface{}{ + {"name": "one"}, + {"name": "two one"}, + } + + err = shard.addProperty(context.TODO(), &models.Property{ + Name: "uuid", + DataType: []string{"string"}, + }) + require.Nil(t, err) + + err = shard.addProperty(context.TODO(), &models.Property{ + Name: testPropName, + DataType: []string{"string"}, + }) + require.Nil(t, err) + + for i, p := range products { + thing := models.Thing{ + Class: testClass.Class, + ID: productsIds[i], + Schema: p, + } + + err := shard.putObject(context.TODO(), storobj.FromThing(&thing, []float32{0.1, 0.2, 0.01, 0.2})) + require.Nil(t, err) + } + + productToDeleteID := productsIds[1] + existsBeforeDelete, err := shard.exists(context.TODO(), strfmt.UUID(productToDeleteID)) + require.Nil(t, err) + + idBytes1, err := uuid.MustParse(strfmt.UUID(productsIds[0]).String()).MarshalBinary() + require.Nil(t, err) + + idBytes2, err := uuid.MustParse(strfmt.UUID(productsIds[1]).String()).MarshalBinary() + require.Nil(t, err) + + var beforeOne []byte + var beforeTwo []byte + var beforeObjID1 []byte + var beforeObjID2 []byte + err = shard.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(helpers.BucketFromPropName(testPropName)) + beforeOne = bucket.Get([]byte("one")) + beforeTwo = bucket.Get([]byte("two")) + bucket = tx.Bucket(helpers.ObjectsBucket) + beforeObjID1 = bucket.Get([]byte(idBytes1)) + bucket = tx.Bucket(helpers.ObjectsBucket) + beforeObjID2 = bucket.Get([]byte(idBytes2)) + return nil + }) + require.Nil(t, err) + + err = shard.deleteObject(context.TODO(), strfmt.UUID(productToDeleteID)) + require.Nil(t, err) + + existsAfterDelete, err := shard.exists(context.TODO(), strfmt.UUID(productToDeleteID)) + require.Nil(t, err) + + beforeDeletedIDs := shard.deletedDocIDs.GetAll() + + err = shard.periodicCleanup(10, 1*time.Millisecond) + require.Nil(t, err) + + afterDeletedIDs := shard.deletedDocIDs.GetAll() + + var afterOne []byte + var afterTwo []byte + var afterObjID1 []byte + var afterObjID2 []byte + err = shard.db.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket(helpers.BucketFromPropName(testPropName)) + afterOne = bucket.Get([]byte("one")) + afterTwo = bucket.Get([]byte("two")) + bucket = tx.Bucket(helpers.ObjectsBucket) + afterObjID1 = bucket.Get([]byte(idBytes1)) + bucket = 
+		afterObjID2 = bucket.Get([]byte(idBytes2))
+		return nil
+	})
+	require.Nil(t, err)
+
+	var updatedDocOneCount uint32
+	r := bytes.NewReader(afterOne[4:])
+	err = binary.Read(r, binary.LittleEndian, &updatedDocOneCount)
+	require.Nil(t, err)
+
+	var updatedDocTwoCount uint32
+	r = bytes.NewReader(afterTwo[4:])
+	err = binary.Read(r, binary.LittleEndian, &updatedDocTwoCount)
+	require.Nil(t, err)
+
+	assert.Equal(t, true, existsBeforeDelete)
+	assert.Equal(t, false, existsAfterDelete)
+	assert.Equal(t, 1, len(beforeDeletedIDs))
+	assert.Equal(t, 0, len(afterDeletedIDs))
+	assert.Equal(t, len(beforeObjID1), len(afterObjID1))
+	assert.NotEqual(t, len(beforeObjID2), len(afterObjID2))
+	assert.Equal(t, 0, len(afterObjID2))
+	assert.Equal(t, 2, len(beforeOne[8:])/8)
+	assert.Equal(t, 1, len(afterOne[8:])/8)
+	assert.Equal(t, 1, len(beforeTwo[8:])/8)
+	assert.Equal(t, 0, len(afterTwo[8:]))
+	assert.Equal(t, uint32(1), updatedDocOneCount)
+	assert.Equal(t, uint32(0), updatedDocTwoCount)
+}
diff --git a/adapters/repos/db/shard_write_inverted.go b/adapters/repos/db/shard_write_inverted.go
index 05c9049b4ad..d2cf6bb9b3b 100644
--- a/adapters/repos/db/shard_write_inverted.go
+++ b/adapters/repos/db/shard_write_inverted.go
@@ -82,27 +82,47 @@ func (s *Shard) extendInvertedIndices(tx *bolt.Tx, props []inverted.Property,
 	return nil
 }
 
-func (s *Shard) deleteFromInvertedIndices(tx *bolt.Tx, props []inverted.Property,
-	docID uint32) error {
-	for _, prop := range props {
-		b := tx.Bucket(helpers.BucketFromPropName(prop.Name))
-		if b == nil {
-			return fmt.Errorf("no bucket for prop '%s' found", prop.Name)
-		}
+func (s *Shard) tryDeleteFromInvertedIndicesProp(b *bolt.Bucket,
+	item inverted.Countable, docIDs []uint32, hasFrequency bool) error {
+	data := b.Get(item.Data)
+	if len(data) == 0 {
+		// we want to delete from an empty row. Nothing to do
+		return nil
+	}
+
+	deletedDocIDs := map[uint32]struct{}{}
+	for _, id := range docIDs {
+		deletedDocIDs[id] = struct{}{}
+	}
 
-		for _, item := range prop.Items {
-			err := s.deleteFromInvertedIndicesProp(b, item, docID, prop.HasFrequency)
-			if err != nil {
-				return errors.Wrapf(err, "clean up prop %q", prop.Name)
+	performDelete := false
+	// a row with at least one entry is longer than 11 bytes:
+	// 8 header bytes plus at least one 4-byte doc id
+	if len(data) > 11 {
+		propDocIDs := data[8:]
+		divider := 4
+		if hasFrequency {
+			divider = 8
+		}
+		numberOfPropDocIDs := len(propDocIDs) / divider
+		for i := 0; i < numberOfPropDocIDs; i++ {
+			idx := i * divider
+			propDocID := binary.LittleEndian.Uint32(propDocIDs[idx : idx+4])
+			_, foundDeleted := deletedDocIDs[propDocID]
+			if foundDeleted {
+				performDelete = true
+				break
 			}
 		}
 	}
+	if performDelete {
+		return s.deleteFromInvertedIndicesProp(b, item, deletedDocIDs, hasFrequency)
+	}
+	return nil
 }
 
 func (s *Shard) deleteFromInvertedIndicesProp(b *bolt.Bucket,
-	item inverted.Countable, docID uint32, hasFrequency bool) error {
+	item inverted.Countable, docIDs map[uint32]struct{}, hasFrequency bool) error {
 	data := b.Get(item.Data)
 	if len(data) == 0 {
 		// we want to delete from an empty row. Nothing to do
@@ -142,13 +162,14 @@ func (s *Shard) deleteFromInvertedIndicesProp(b *bolt.Bucket,
 		}
 	}
 
-		newDocCount++
-		if nextDocID == docID {
+		_, isDeleted := docIDs[nextDocID]
+		if isDeleted {
 			// we have found the one we want to delete, i.e. do not copy it into
 			// the updated list
 			continue
 		}
 
+		newDocCount++
 		if _, err := newRow.Write(nextDocIDBytes); err != nil {
 			return errors.Wrap(err, "write doc")
 		}
diff --git a/adapters/repos/db/shard_write_inverted_integration_test.go b/adapters/repos/db/shard_write_inverted_integration_test.go
index 3becbcb460e..ff6749fc0dd 100644
--- a/adapters/repos/db/shard_write_inverted_integration_test.go
+++ b/adapters/repos/db/shard_write_inverted_integration_test.go
@@ -29,6 +29,16 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+// getDocumentFrequencyValue builds a single row entry: a little-endian doc ID,
+// optionally followed by its frequency bytes
+func getDocumentFrequencyValue(id uint32, frequency []byte) []byte {
+	keyBuf := bytes.NewBuffer(nil)
+	binary.Write(keyBuf, binary.LittleEndian, &id)
+	if frequency != nil {
+		binary.Write(keyBuf, binary.LittleEndian, &frequency)
+	}
+	return keyBuf.Bytes()
+}
+
 func TestExtendInvertedIndexWithFrequency(t *testing.T) {
 	rand.Seed(time.Now().UnixNano())
 	dirName := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
@@ -213,3 +223,366 @@ func TestExtendInvertedIndexWithOutFrequency(t *testing.T) {
 
 	assert.Equal(t, uint32(32), newDocID)
 }
+
+func TestCleanupInvertedIndexWithPropWithoutFrequency(t *testing.T) {
+	rand.Seed(time.Now().UnixNano())
+	dirName := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
+	os.MkdirAll(dirName, 0o777)
+	defer func() {
+		err := os.RemoveAll(dirName)
+		fmt.Println(err)
+	}()
+	index, err := NewIndex(IndexConfig{
+		RootPath: dirName, Kind: kind.Thing, ClassName: "Test",
+	}, &fakeSchemaGetter{}, nil, nil)
+	require.Nil(t, err)
+	shard, err := NewShard("extend_invert_benchmark", index)
+	require.Nil(t, err)
+
+	documentID := uint32(15)
+
+	prop := []byte("testprop")
+	var before []byte
+
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket, err := tx.CreateBucketIfNotExists([]byte("testbucket"))
+		if err != nil {
+			return err
+		}
+
+		b := bytes.NewBuffer(nil)
+
+		// checksum
+		_, err = b.Write([]uint8{0, 0, 0, 0})
+		if err != nil {
+			return err
+		}
+
+		fakeEntries := 2
+		// doc count
+		count := uint32(fakeEntries)
+		err = binary.Write(b, binary.LittleEndian, &count)
+		if err != nil {
+			return err
+		}
+
+		// doc id
+		_, err = b.Write(getDocumentFrequencyValue(10, nil))
+		if err != nil {
+			return err
+		}
+
+		// doc id
+		_, err = b.Write(getDocumentFrequencyValue(documentID, nil))
+		if err != nil {
+			return err
+		}
+
+		before = b.Bytes()
+		return bucket.Put(prop, before)
+	})
+	require.Nil(t, err)
+
+	var after []byte
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte("testbucket"))
+		err := shard.tryDeleteFromInvertedIndicesProp(bucket, inverted.Countable{Data: prop}, []uint32{documentID}, false)
+		if err != nil {
+			return err
+		}
+
+		after = bucket.Get(prop)
+
+		return nil
+	})
+	require.Nil(t, err)
+
+	var updatedDocCount uint32
+	r := bytes.NewReader(after[4:])
+	err = binary.Read(r, binary.LittleEndian, &updatedDocCount)
+	require.Nil(t, err)
+
+	afterDocIDs := after[8:]
+	expectedDocIDs := getDocumentFrequencyValue(10, nil)
+
+	assert.Equal(t, uint32(1), updatedDocCount)
+	assert.Equal(t, expectedDocIDs, afterDocIDs)
+}
+
+func TestCleanupInvertedIndexWithFrequencyProp(t *testing.T) {
+	rand.Seed(time.Now().UnixNano())
+	dirName := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
+	os.MkdirAll(dirName, 0o777)
+	defer func() {
+		err := os.RemoveAll(dirName)
+		fmt.Println(err)
+	}()
+	index, err := NewIndex(IndexConfig{
+		RootPath: dirName, Kind: kind.Thing, ClassName: "Test",
+	}, &fakeSchemaGetter{}, nil, nil)
+	require.Nil(t, err)
+	shard, err := NewShard("extend_invert_benchmark", index)
+	require.Nil(t, err)
+
+	documentID := uint32(15)
+	frequency := []uint8{1, 2, 3, 4}
+
+	prop := []byte("testprop")
+	var before []byte
+
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket, err := tx.CreateBucketIfNotExists([]byte("testbucket"))
+		if err != nil {
+			return err
+		}
+
+		b := bytes.NewBuffer(nil)
+
+		// checksum
+		_, err = b.Write([]uint8{0, 0, 0, 0})
+		if err != nil {
+			return err
+		}
+
+		fakeEntries := 3
+		// doc count
+		count := uint32(fakeEntries)
+		err = binary.Write(b, binary.LittleEndian, &count)
+		if err != nil {
+			return err
+		}
+
+		// doc id with frequency
+		_, err = b.Write(getDocumentFrequencyValue(10, frequency))
+		if err != nil {
+			return err
+		}
+
+		// doc id with frequency
+		_, err = b.Write(getDocumentFrequencyValue(documentID, frequency))
+		if err != nil {
+			return err
+		}
+
+		// doc id with frequency
+		_, err = b.Write(getDocumentFrequencyValue(11, frequency))
+		if err != nil {
+			return err
+		}
+
+		before = b.Bytes()
+		return bucket.Put(prop, before)
+	})
+	require.Nil(t, err)
+
+	var after []byte
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte("testbucket"))
+		err := shard.tryDeleteFromInvertedIndicesProp(bucket, inverted.Countable{Data: prop}, []uint32{documentID}, true)
+		if err != nil {
+			return err
+		}
+
+		after = bucket.Get(prop)
+
+		return nil
+	})
+	require.Nil(t, err)
+
+	var updatedDocCount uint32
+	r := bytes.NewReader(after[4:])
+	err = binary.Read(r, binary.LittleEndian, &updatedDocCount)
+	require.Nil(t, err)
+
+	afterDocIDs := after[8:]
+	expectedDocIDsBuffer := bytes.NewBuffer(nil)
+	binary.Write(expectedDocIDsBuffer, binary.LittleEndian, getDocumentFrequencyValue(10, frequency))
+	binary.Write(expectedDocIDsBuffer, binary.LittleEndian, getDocumentFrequencyValue(11, frequency))
+	expectedDocIDs := expectedDocIDsBuffer.Bytes()
+
+	assert.Equal(t, uint32(2), updatedDocCount)
+	assert.Equal(t, expectedDocIDs, afterDocIDs)
+}
+
+func TestCleanupInvertedIndexDeleteAllDocumentIDs(t *testing.T) {
+	rand.Seed(time.Now().UnixNano())
+	dirName := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
+	os.MkdirAll(dirName, 0o777)
+	defer func() {
+		err := os.RemoveAll(dirName)
+		fmt.Println(err)
+	}()
+	index, err := NewIndex(IndexConfig{
+		RootPath: dirName, Kind: kind.Thing, ClassName: "Test",
+	}, &fakeSchemaGetter{}, nil, nil)
+	require.Nil(t, err)
+	shard, err := NewShard("extend_invert_benchmark", index)
+	require.Nil(t, err)
+
+	documentID1 := uint32(11)
+	documentID2 := uint32(15)
+
+	prop := []byte("testprop")
+	var before []byte
+
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket, err := tx.CreateBucketIfNotExists([]byte("testbucket"))
+		if err != nil {
+			return err
+		}
+
+		b := bytes.NewBuffer(nil)
+
+		// checksum
+		_, err = b.Write([]uint8{0, 0, 0, 0})
+		if err != nil {
+			return err
+		}
+
+		fakeEntries := 2
+		// doc count
+		count := uint32(fakeEntries)
+		err = binary.Write(b, binary.LittleEndian, &count)
+		if err != nil {
+			return err
+		}
+
+		// doc id
+		_, err = b.Write(getDocumentFrequencyValue(documentID1, nil))
+		if err != nil {
+			return err
+		}
+
+		// doc id
+		_, err = b.Write(getDocumentFrequencyValue(documentID2, nil))
+		if err != nil {
+			return err
+		}
+
+		before = b.Bytes()
+		return bucket.Put(prop, before)
+	})
+	require.Nil(t, err)
+
+	var after []byte
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte("testbucket"))
+		err := shard.tryDeleteFromInvertedIndicesProp(bucket, inverted.Countable{Data: prop}, []uint32{documentID1, documentID2}, false)
+		if err != nil {
+			return err
+		}
+
+		after = bucket.Get(prop)
+
+		return nil
+	})
+	require.Nil(t, err)
+
+	var updatedDocCount uint32
+	r := bytes.NewReader(after[4:])
+	err = binary.Read(r, binary.LittleEndian, &updatedDocCount)
+	require.Nil(t, err)
+
+	afterDocIDs := after[8:]
+
+	assert.Equal(t, uint32(0), updatedDocCount)
+	assert.Equal(t, []byte{}, afterDocIDs)
+}
+
+func TestCleanupInvertedIndexWithNoPropsToClean(t *testing.T) {
+	rand.Seed(time.Now().UnixNano())
+	dirName := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
+	os.MkdirAll(dirName, 0o777)
+	defer func() {
+		err := os.RemoveAll(dirName)
+		fmt.Println(err)
+	}()
+	index, err := NewIndex(IndexConfig{
+		RootPath: dirName, Kind: kind.Thing, ClassName: "Test",
+	}, &fakeSchemaGetter{}, nil, nil)
+	require.Nil(t, err)
+	shard, err := NewShard("extend_invert_benchmark", index)
+	require.Nil(t, err)
+
+	documentID1 := uint32(11)
+	documentID2 := uint32(15)
+	documentIDNotInRow := uint32(20)
+
+	prop := []byte("testprop")
+	var before []byte
+
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket, err := tx.CreateBucketIfNotExists([]byte("testbucket"))
+		if err != nil {
+			return err
+		}
+
+		b := bytes.NewBuffer(nil)
+
+		// checksum
+		_, err = b.Write([]uint8{0, 0, 0, 0})
+		if err != nil {
+			return err
+		}
+
+		fakeEntries := 2
+		// doc count
+		count := uint32(fakeEntries)
+		err = binary.Write(b, binary.LittleEndian, &count)
+		if err != nil {
+			return err
+		}
+
+		// doc id
+		_, err = b.Write(getDocumentFrequencyValue(documentID1, nil))
+		if err != nil {
+			return err
+		}
+
+		// doc id
+		_, err = b.Write(getDocumentFrequencyValue(documentID2, nil))
+		if err != nil {
+			return err
+		}
+
+		before = b.Bytes()
+		return bucket.Put(prop, before)
+	})
+	require.Nil(t, err)
+
+	var after []byte
+	err = shard.db.Update(func(tx *bolt.Tx) error {
+		bucket := tx.Bucket([]byte("testbucket"))
+		err := shard.tryDeleteFromInvertedIndicesProp(bucket, inverted.Countable{Data: prop}, []uint32{documentIDNotInRow}, false)
+		if err != nil {
+			return err
+		}
+
+		after = bucket.Get(prop)
+
+		return nil
+	})
+	require.Nil(t, err)
+
+	var updatedDocCount uint32
+	r := bytes.NewReader(after[4:])
+	err = binary.Read(r, binary.LittleEndian, &updatedDocCount)
+	require.Nil(t, err)
+
+	afterDocIDs := after[8:]
+	expectedDocIDsBuffer := bytes.NewBuffer(nil)
+	binary.Write(expectedDocIDsBuffer, binary.LittleEndian, getDocumentFrequencyValue(documentID1, nil))
+	binary.Write(expectedDocIDsBuffer, binary.LittleEndian, getDocumentFrequencyValue(documentID2, nil))
+	expectedDocIDs := expectedDocIDsBuffer.Bytes()
+
+	assert.Equal(t, uint32(2), updatedDocCount)
+	assert.Equal(t, expectedDocIDs, afterDocIDs)
+}
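+
+// Note: the cleanup tests above construct rows by hand and rely on the assumed
+// layout of [4-byte checksum][4-byte little-endian doc count][doc entries]. A
+// row whose entries were all deleted keeps its 8-byte header with a count of
+// zero, while a row containing none of the passed doc IDs is expected to be
+// left untouched by tryDeleteFromInvertedIndicesProp.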