Skip to content

Commit

Permalink
Merge pull request #1871 from semi-technologies/bugfix-gh-1868
Browse files Browse the repository at this point in the history
gh-1868 properly propagate "clear links" across commit logs.
  • Loading branch information
antas-marcin committed Mar 24, 2022
2 parents 91f6209 + 34562e0 commit ea967ef
Show file tree
Hide file tree
Showing 8 changed files with 543 additions and 22 deletions.
70 changes: 57 additions & 13 deletions adapters/repos/db/vector/hnsw/commit_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/sirupsen/logrus"
)

const maxUncondensedCommitLogSize = 500 * 1024 * 1024
const defaultCommitLogSize = 500 * 1024 * 1024

func commitLogFileName(rootPath, indexName, fileName string) string {
return fmt.Sprintf("%s/%s", commitLogDirectory(rootPath, indexName), fileName)
Expand All @@ -39,17 +39,25 @@ func commitLogDirectory(rootPath, name string) string {
}

func NewCommitLogger(rootPath, name string,
maintainenceInterval time.Duration,
logger logrus.FieldLogger) (*hnswCommitLogger, error) {
maintainenceInterval time.Duration, logger logrus.FieldLogger,
opts ...CommitlogOption) (*hnswCommitLogger, error) {
l := &hnswCommitLogger{
cancel: make(chan struct{}),
rootPath: rootPath,
id: name,
maintainenceInterval: maintainenceInterval,
condensor: NewMemoryCondensor2(logger),
logger: logger,
maxSizeIndividual: maxUncondensedCommitLogSize / 5, // TODO: make configurable
maxSizeCombining: maxUncondensedCommitLogSize, // TODO: make configurable

// both can be overwritten using functional options
maxSizeIndividual: defaultCommitLogSize / 5,
maxSizeCombining: defaultCommitLogSize,
}

for _, o := range opts {
if err := o(l); err != nil {
return nil, err
}
}

fd, err := getLatestCommitFileOrCreate(rootPath, name)
Expand Down Expand Up @@ -223,16 +231,28 @@ type condensor interface {
}

type hnswCommitLogger struct {
// protect against concurrent attempts to write in the underlying file or
// buffer
sync.Mutex
cancel chan struct{}
rootPath string
id string
condensor condensor

cancel chan struct{}
rootPath string
id string
condensor condensor
logger logrus.FieldLogger
maxSizeIndividual int64
maxSizeCombining int64
commitLogger *commitlog.Logger

// Generally maintenance is happening from a single goroutine on a read-only
// file, so no locking should be required. However, there is one situation
// where maintenance suddenly becomes concurrent: When a cancel signal is
// received, we need to be able to make sure that cancellation does not
// complete while a maintenance process is still running. This would mean, we
// would return to the caller too early and the files on disk might still
// change due to a maintenance process that was still running undetected
maintenanceLock sync.Mutex
maintainenceInterval time.Duration
logger logrus.FieldLogger
maxSizeIndividual int64
maxSizeCombining int64
commitLogger *commitlog.Logger
}

type HnswCommitType uint8 // 256 options, plenty of room for future extensions
Expand Down Expand Up @@ -359,12 +379,30 @@ func (l *hnswCommitLogger) StartLogging() {
// cancel maintenance jobs on request
go func(cancel ...chan struct{}) {
<-l.cancel

// Once we've received the cancel signal, we must obtain all possible
// locks. Both the one for maintenance as well as the regular one for
// writing. Once we hold all locks, we can be sure that no background
// process is running anymore (as they would themselves require those
// locks) and we can cancel all tasks before a new one could start.
l.maintenanceLock.Lock()
defer l.maintenanceLock.Unlock()
l.Lock()
defer l.Unlock()

for _, c := range cancel {
c <- struct{}{}
}
}(cancelCombineAndCondenseLogs, cancelSwitchLog)
}

// Shutdown waits for ongoing maintenance processes to stop, then cancels their
// scheduling. The caller can be sure that state on disk is immutable after
// calling Shutdown().
func (l *hnswCommitLogger) Shutdown() {
l.cancel <- struct{}{}
}

func (l *hnswCommitLogger) startSwitchLogs() chan struct{} {
cancelSwitchLog := make(chan struct{})

Expand Down Expand Up @@ -470,6 +508,9 @@ func (l *hnswCommitLogger) maintenance() error {
}

func (l *hnswCommitLogger) condenseOldLogs() error {
l.maintenanceLock.Lock()
defer l.maintenanceLock.Lock()

files, err := getCommitFileNames(l.rootPath, l.id)
if err != nil {
return err
Expand Down Expand Up @@ -498,6 +539,9 @@ func (l *hnswCommitLogger) condenseOldLogs() error {
}

func (l *hnswCommitLogger) combineLogs() error {
l.maintenanceLock.Lock()
defer l.maintenanceLock.Lock()

// maxSize is the desired final size, since we assume a lot of redunancy we
// can set the combining threshold higher than the final threshold under the
// assumption that the combined file will be considerably smaller than the
Expand Down
17 changes: 17 additions & 0 deletions adapters/repos/db/vector/hnsw/commit_logger_functional_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package hnsw

type CommitlogOption func(l *hnswCommitLogger) error

func WithCommitlogThreshold(size int64) CommitlogOption {
return func(l *hnswCommitLogger) error {
l.maxSizeIndividual = size
return nil
}
}

func WithCommitlogThresholdForCombining(size int64) CommitlogOption {
return func(l *hnswCommitLogger) error {
l.maxSizeCombining = size
return nil
}
}
4 changes: 2 additions & 2 deletions adapters/repos/db/vector/hnsw/condensor2.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func (c *MemoryCondensor2) Do(fileName string) error {
if res.ReplaceLinks(node.id, uint16(level)) {
if err := c.SetLinksAtLevel(node.id, level, links); err != nil {
return errors.Wrapf(err,
"write links for node %d at level %dto commit log", node.id, level)
"write links for node %d at level %d to commit log", node.id, level)
}
} else {
if err := c.AddLinksAtLevel(node.id, uint16(level), links); err != nil {
return errors.Wrapf(err,
"write links for node %d at level %dto commit log", node.id, level)
"write links for node %d at level %d to commit log", node.id, level)
}
}
}
Expand Down
200 changes: 200 additions & 0 deletions adapters/repos/db/vector/hnsw/condensor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,206 @@ func TestCondensorAppendNodeLinks(t *testing.T) {
})
}

// This test was added as part of
// https://github.com/semi-technologies/weaviate/issues/1868 to rule out that
// replace links broken across two independent commit logs. It turned out that
// this was green and not the cause for the bug. The bug could be reproduced
// with the new test added in index_too_many_links_bug_integration_test.go.
// Nevertheless it makes sense to keep this test around as this might have been
// a potential cause as well and by having this test, we can prevent a
// regression.
func TestCondensorReplaceNodeLinks(t *testing.T) {
rand.Seed(time.Now().UnixNano())
rootPath := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
os.MkdirAll(rootPath, 0o777)
defer func() {
err := os.RemoveAll(rootPath)
fmt.Println(err)
}()

logger, _ := test.NewNullLogger()
uncondensed1, err := NewCommitLogger(rootPath, "uncondensed1", 0, logger)
require.Nil(t, err)

uncondensed2, err := NewCommitLogger(rootPath, "uncondensed2", 0, logger)
require.Nil(t, err)

control, err := NewCommitLogger(rootPath, "control", 0, logger)
require.Nil(t, err)

t.Run("add data to the first log", func(t *testing.T) {
uncondensed1.AddNode(&vertex{id: 0, level: 1})
uncondensed1.AddLinkAtLevel(0, 0, 1)
uncondensed1.AddLinkAtLevel(0, 0, 2)
uncondensed1.AddLinkAtLevel(0, 0, 3)
uncondensed1.AddLinkAtLevel(0, 1, 1)
uncondensed1.AddLinkAtLevel(0, 1, 2)

require.Nil(t, uncondensed1.Flush())
})

t.Run("replace all data from previous log", func(t *testing.T) {
uncondensed2.AddLinkAtLevel(0, 0, 10)
uncondensed2.ReplaceLinksAtLevel(0, 0, []uint64{4, 5, 6})
uncondensed2.AddLinkAtLevel(0, 0, 7)
uncondensed2.ReplaceLinksAtLevel(0, 1, []uint64{8})

require.Nil(t, uncondensed2.Flush())
})

t.Run("create a control log", func(t *testing.T) {
control.AddNode(&vertex{id: 0, level: 1})
control.ReplaceLinksAtLevel(0, 0, []uint64{4, 5, 6, 7})
control.ReplaceLinksAtLevel(0, 1, []uint64{8})

require.Nil(t, control.Flush())
})

t.Run("condense both logs and verify the contents against the control", func(t *testing.T) {
input, ok, err := getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed1", input))
require.Nil(t, err)

input, ok, err = getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed2", input))
require.Nil(t, err)

control, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "control"))
require.Nil(t, err)
require.True(t, ok)

condensed1, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

condensed2, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

assert.True(t, strings.HasSuffix(condensed1, ".condensed"),
"commit log is now saved as condensed")
assert.True(t, strings.HasSuffix(condensed2, ".condensed"),
"commit log is now saved as condensed")

assertIndicesFromCommitLogsMatch(t, commitLogFileName(rootPath, "control", control),
[]string{
commitLogFileName(rootPath, "uncondensed1", condensed1),
commitLogFileName(rootPath, "uncondensed2", condensed2),
})
})
}

// This test was added as part of the investigation and fixing of
// https://github.com/semi-technologies/weaviate/issues/1868. We used the new
// (higher level) test in index_too_many_links_bug_integration_test.go to
// reproduce the problem without knowing what causes it. Eventually we came to
// the conclusion that "ClearLinksAtLevel" was not propagated correctly across
// two independently condensed commit logs. While the higher-level test already
// makes sure that the bug is gone and prevents regressions, this test was
// still added to test the broken (now fixed) behavior in relative isolation.
func TestCondensorClearLinksAtLevel(t *testing.T) {
rand.Seed(time.Now().UnixNano())
rootPath := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
os.MkdirAll(rootPath, 0o777)
defer func() {
err := os.RemoveAll(rootPath)
fmt.Println(err)
}()

logger, _ := test.NewNullLogger()
uncondensed1, err := NewCommitLogger(rootPath, "uncondensed1", 0, logger)
require.Nil(t, err)

uncondensed2, err := NewCommitLogger(rootPath, "uncondensed2", 0, logger)
require.Nil(t, err)

control, err := NewCommitLogger(rootPath, "control", 0, logger)
require.Nil(t, err)

t.Run("add data to the first log", func(t *testing.T) {
uncondensed1.AddNode(&vertex{id: 0, level: 1})
uncondensed1.AddLinkAtLevel(0, 0, 1)
uncondensed1.AddLinkAtLevel(0, 0, 2)
uncondensed1.AddLinkAtLevel(0, 0, 3)
uncondensed1.AddLinkAtLevel(0, 1, 1)
uncondensed1.AddLinkAtLevel(0, 1, 2)

require.Nil(t, uncondensed1.Flush())
})

t.Run("replace all data from previous log", func(t *testing.T) {
uncondensed2.AddLinkAtLevel(0, 0, 10)
uncondensed2.ClearLinksAtLevel(0, 0)
uncondensed2.AddLinkAtLevel(0, 0, 4)
uncondensed2.AddLinkAtLevel(0, 0, 5)
uncondensed2.AddLinkAtLevel(0, 0, 6)
uncondensed2.AddLinkAtLevel(0, 0, 7)
uncondensed2.ClearLinksAtLevel(0, 1)
uncondensed2.AddLinkAtLevel(0, 1, 8)

require.Nil(t, uncondensed2.Flush())
})

t.Run("create a control log", func(t *testing.T) {
control.AddNode(&vertex{id: 0, level: 1})
control.ReplaceLinksAtLevel(0, 0, []uint64{4, 5, 6, 7})
control.ReplaceLinksAtLevel(0, 1, []uint64{8})

require.Nil(t, control.Flush())
})

t.Run("condense both logs and verify the contents against the control", func(t *testing.T) {
input, ok, err := getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed1", input))
require.Nil(t, err)

input, ok, err = getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed2", input))
require.Nil(t, err)

control, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "control"))
require.Nil(t, err)
require.True(t, ok)

condensed1, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

condensed2, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

assert.True(t, strings.HasSuffix(condensed1, ".condensed"),
"commit log is now saved as condensed")
assert.True(t, strings.HasSuffix(condensed2, ".condensed"),
"commit log is now saved as condensed")

assertIndicesFromCommitLogsMatch(t, commitLogFileName(rootPath, "control", control),
[]string{
commitLogFileName(rootPath, "uncondensed1", condensed1),
commitLogFileName(rootPath, "uncondensed2", condensed2),
})
})
}

func TestCondensorWithoutEntrypoint(t *testing.T) {
rand.Seed(time.Now().UnixNano())
rootPath := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
Expand Down

0 comments on commit ea967ef

Please sign in to comment.