Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-1868 properly propagate "clear links" across commit logs. #1871

Merged
merged 5 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
70 changes: 57 additions & 13 deletions adapters/repos/db/vector/hnsw/commit_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/sirupsen/logrus"
)

const maxUncondensedCommitLogSize = 500 * 1024 * 1024
const defaultCommitLogSize = 500 * 1024 * 1024

func commitLogFileName(rootPath, indexName, fileName string) string {
return fmt.Sprintf("%s/%s", commitLogDirectory(rootPath, indexName), fileName)
Expand All @@ -39,17 +39,25 @@ func commitLogDirectory(rootPath, name string) string {
}

func NewCommitLogger(rootPath, name string,
maintainenceInterval time.Duration,
logger logrus.FieldLogger) (*hnswCommitLogger, error) {
maintainenceInterval time.Duration, logger logrus.FieldLogger,
opts ...CommitlogOption) (*hnswCommitLogger, error) {
l := &hnswCommitLogger{
cancel: make(chan struct{}),
rootPath: rootPath,
id: name,
maintainenceInterval: maintainenceInterval,
condensor: NewMemoryCondensor2(logger),
logger: logger,
maxSizeIndividual: maxUncondensedCommitLogSize / 5, // TODO: make configurable
maxSizeCombining: maxUncondensedCommitLogSize, // TODO: make configurable

// both can be overwritten using functional options
maxSizeIndividual: defaultCommitLogSize / 5,
maxSizeCombining: defaultCommitLogSize,
}

for _, o := range opts {
if err := o(l); err != nil {
return nil, err
}
}

fd, err := getLatestCommitFileOrCreate(rootPath, name)
Expand Down Expand Up @@ -223,16 +231,28 @@ type condensor interface {
}

type hnswCommitLogger struct {
// protect against concurrent attempts to write in the underlying file or
// buffer
sync.Mutex
cancel chan struct{}
rootPath string
id string
condensor condensor

cancel chan struct{}
rootPath string
id string
condensor condensor
logger logrus.FieldLogger
maxSizeIndividual int64
maxSizeCombining int64
commitLogger *commitlog.Logger

// Generally maintenance is happening from a single goroutine on a read-only
// file, so no locking should be required. However, there is one situation
// where maintenance suddenly becomes concurrent: When a cancel signal is
// received, we need to be able to make sure that cancellation does not
// complete while a maintenance process is still running. This would mean, we
// would return to the caller too early and the files on disk might still
// change due to a maintenance process that was still running undetected
maintenanceLock sync.Mutex
maintainenceInterval time.Duration
logger logrus.FieldLogger
maxSizeIndividual int64
maxSizeCombining int64
commitLogger *commitlog.Logger
}

type HnswCommitType uint8 // 256 options, plenty of room for future extensions
Expand Down Expand Up @@ -359,12 +379,30 @@ func (l *hnswCommitLogger) StartLogging() {
// cancel maintenance jobs on request
go func(cancel ...chan struct{}) {
<-l.cancel

// Once we've received the cancel signal, we must obtain all possible
// locks. Both the one for maintenance as well as the regular one for
// writing. Once we hold all locks, we can be sure that no background
// process is running anymore (as they would themselves require those
// locks) and we can cancel all tasks before a new one could start.
l.maintenanceLock.Lock()
defer l.maintenanceLock.Unlock()
l.Lock()
defer l.Unlock()

for _, c := range cancel {
c <- struct{}{}
}
}(cancelCombineAndCondenseLogs, cancelSwitchLog)
}

// Shutdown waits for ongoing maintenance processes to stop, then cancels their
// scheduling. The caller can be sure that state on disk is immutable after
// calling Shutdown().
func (l *hnswCommitLogger) Shutdown() {
l.cancel <- struct{}{}
}

func (l *hnswCommitLogger) startSwitchLogs() chan struct{} {
cancelSwitchLog := make(chan struct{})

Expand Down Expand Up @@ -470,6 +508,9 @@ func (l *hnswCommitLogger) maintenance() error {
}

func (l *hnswCommitLogger) condenseOldLogs() error {
l.maintenanceLock.Lock()
defer l.maintenanceLock.Lock()

files, err := getCommitFileNames(l.rootPath, l.id)
if err != nil {
return err
Expand Down Expand Up @@ -498,6 +539,9 @@ func (l *hnswCommitLogger) condenseOldLogs() error {
}

func (l *hnswCommitLogger) combineLogs() error {
l.maintenanceLock.Lock()
defer l.maintenanceLock.Lock()

// maxSize is the desired final size, since we assume a lot of redunancy we
// can set the combining threshold higher than the final threshold under the
// assumption that the combined file will be considerably smaller than the
Expand Down
17 changes: 17 additions & 0 deletions adapters/repos/db/vector/hnsw/commit_logger_functional_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package hnsw

type CommitlogOption func(l *hnswCommitLogger) error

func WithCommitlogThreshold(size int64) CommitlogOption {
return func(l *hnswCommitLogger) error {
l.maxSizeIndividual = size
return nil
}
}

func WithCommitlogThresholdForCombining(size int64) CommitlogOption {
return func(l *hnswCommitLogger) error {
l.maxSizeCombining = size
return nil
}
}
4 changes: 2 additions & 2 deletions adapters/repos/db/vector/hnsw/condensor2.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,12 @@ func (c *MemoryCondensor2) Do(fileName string) error {
if res.ReplaceLinks(node.id, uint16(level)) {
if err := c.SetLinksAtLevel(node.id, level, links); err != nil {
return errors.Wrapf(err,
"write links for node %d at level %dto commit log", node.id, level)
"write links for node %d at level %d to commit log", node.id, level)
}
} else {
if err := c.AddLinksAtLevel(node.id, uint16(level), links); err != nil {
return errors.Wrapf(err,
"write links for node %d at level %dto commit log", node.id, level)
"write links for node %d at level %d to commit log", node.id, level)
}
}
}
Expand Down
200 changes: 200 additions & 0 deletions adapters/repos/db/vector/hnsw/condensor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,206 @@ func TestCondensorAppendNodeLinks(t *testing.T) {
})
}

// This test was added as part of
// https://github.com/semi-technologies/weaviate/issues/1868 to rule out that
// replace links broken across two independent commit logs. It turned out that
// this was green and not the cause for the bug. The bug could be reproduced
// with the new test added in index_too_many_links_bug_integration_test.go.
// Nevertheless it makes sense to keep this test around as this might have been
// a potential cause as well and by having this test, we can prevent a
// regression.
func TestCondensorReplaceNodeLinks(t *testing.T) {
rand.Seed(time.Now().UnixNano())
rootPath := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
os.MkdirAll(rootPath, 0o777)
defer func() {
err := os.RemoveAll(rootPath)
fmt.Println(err)
}()

logger, _ := test.NewNullLogger()
uncondensed1, err := NewCommitLogger(rootPath, "uncondensed1", 0, logger)
require.Nil(t, err)

uncondensed2, err := NewCommitLogger(rootPath, "uncondensed2", 0, logger)
require.Nil(t, err)

control, err := NewCommitLogger(rootPath, "control", 0, logger)
require.Nil(t, err)

t.Run("add data to the first log", func(t *testing.T) {
uncondensed1.AddNode(&vertex{id: 0, level: 1})
uncondensed1.AddLinkAtLevel(0, 0, 1)
uncondensed1.AddLinkAtLevel(0, 0, 2)
uncondensed1.AddLinkAtLevel(0, 0, 3)
uncondensed1.AddLinkAtLevel(0, 1, 1)
uncondensed1.AddLinkAtLevel(0, 1, 2)

require.Nil(t, uncondensed1.Flush())
})

t.Run("replace all data from previous log", func(t *testing.T) {
uncondensed2.AddLinkAtLevel(0, 0, 10)
uncondensed2.ReplaceLinksAtLevel(0, 0, []uint64{4, 5, 6})
uncondensed2.AddLinkAtLevel(0, 0, 7)
uncondensed2.ReplaceLinksAtLevel(0, 1, []uint64{8})

require.Nil(t, uncondensed2.Flush())
})

t.Run("create a control log", func(t *testing.T) {
control.AddNode(&vertex{id: 0, level: 1})
control.ReplaceLinksAtLevel(0, 0, []uint64{4, 5, 6, 7})
control.ReplaceLinksAtLevel(0, 1, []uint64{8})

require.Nil(t, control.Flush())
})

t.Run("condense both logs and verify the contents against the control", func(t *testing.T) {
input, ok, err := getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed1", input))
require.Nil(t, err)

input, ok, err = getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed2", input))
require.Nil(t, err)

control, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "control"))
require.Nil(t, err)
require.True(t, ok)

condensed1, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

condensed2, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

assert.True(t, strings.HasSuffix(condensed1, ".condensed"),
"commit log is now saved as condensed")
assert.True(t, strings.HasSuffix(condensed2, ".condensed"),
"commit log is now saved as condensed")

assertIndicesFromCommitLogsMatch(t, commitLogFileName(rootPath, "control", control),
[]string{
commitLogFileName(rootPath, "uncondensed1", condensed1),
commitLogFileName(rootPath, "uncondensed2", condensed2),
})
})
}

// This test was added as part of the investigation and fixing of
// https://github.com/semi-technologies/weaviate/issues/1868. We used the new
// (higher level) test in index_too_many_links_bug_integration_test.go to
// reproduce the problem without knowing what causes it. Eventually we came to
// the conclusion that "ClearLinksAtLevel" was not propagated correctly across
// two independently condensed commit logs. While the higher-level test already
// makes sure that the bug is gone and prevents regressions, this test was
// still added to test the broken (now fixed) behavior in relative isolation.
func TestCondensorClearLinksAtLevel(t *testing.T) {
rand.Seed(time.Now().UnixNano())
rootPath := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
os.MkdirAll(rootPath, 0o777)
defer func() {
err := os.RemoveAll(rootPath)
fmt.Println(err)
}()

logger, _ := test.NewNullLogger()
uncondensed1, err := NewCommitLogger(rootPath, "uncondensed1", 0, logger)
require.Nil(t, err)

uncondensed2, err := NewCommitLogger(rootPath, "uncondensed2", 0, logger)
require.Nil(t, err)

control, err := NewCommitLogger(rootPath, "control", 0, logger)
require.Nil(t, err)

t.Run("add data to the first log", func(t *testing.T) {
uncondensed1.AddNode(&vertex{id: 0, level: 1})
uncondensed1.AddLinkAtLevel(0, 0, 1)
uncondensed1.AddLinkAtLevel(0, 0, 2)
uncondensed1.AddLinkAtLevel(0, 0, 3)
uncondensed1.AddLinkAtLevel(0, 1, 1)
uncondensed1.AddLinkAtLevel(0, 1, 2)

require.Nil(t, uncondensed1.Flush())
})

t.Run("replace all data from previous log", func(t *testing.T) {
uncondensed2.AddLinkAtLevel(0, 0, 10)
uncondensed2.ClearLinksAtLevel(0, 0)
uncondensed2.AddLinkAtLevel(0, 0, 4)
uncondensed2.AddLinkAtLevel(0, 0, 5)
uncondensed2.AddLinkAtLevel(0, 0, 6)
uncondensed2.AddLinkAtLevel(0, 0, 7)
uncondensed2.ClearLinksAtLevel(0, 1)
uncondensed2.AddLinkAtLevel(0, 1, 8)

require.Nil(t, uncondensed2.Flush())
})

t.Run("create a control log", func(t *testing.T) {
control.AddNode(&vertex{id: 0, level: 1})
control.ReplaceLinksAtLevel(0, 0, []uint64{4, 5, 6, 7})
control.ReplaceLinksAtLevel(0, 1, []uint64{8})

require.Nil(t, control.Flush())
})

t.Run("condense both logs and verify the contents against the control", func(t *testing.T) {
input, ok, err := getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed1", input))
require.Nil(t, err)

input, ok, err = getCurrentCommitLogFileName(commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

err = NewMemoryCondensor2(logger).Do(commitLogFileName(rootPath, "uncondensed2", input))
require.Nil(t, err)

control, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "control"))
require.Nil(t, err)
require.True(t, ok)

condensed1, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed1"))
require.Nil(t, err)
require.True(t, ok)

condensed2, ok, err := getCurrentCommitLogFileName(
commitLogDirectory(rootPath, "uncondensed2"))
require.Nil(t, err)
require.True(t, ok)

assert.True(t, strings.HasSuffix(condensed1, ".condensed"),
"commit log is now saved as condensed")
assert.True(t, strings.HasSuffix(condensed2, ".condensed"),
"commit log is now saved as condensed")

assertIndicesFromCommitLogsMatch(t, commitLogFileName(rootPath, "control", control),
[]string{
commitLogFileName(rootPath, "uncondensed1", condensed1),
commitLogFileName(rootPath, "uncondensed2", condensed2),
})
})
}

func TestCondensorWithoutEntrypoint(t *testing.T) {
rand.Seed(time.Now().UnixNano())
rootPath := fmt.Sprintf("./testdata/%d", rand.Intn(10000000))
Expand Down