Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WEAVIATE-62 Remove obsolete hnsw files #1875

Merged
merged 9 commits into from
Mar 24, 2022
72 changes: 58 additions & 14 deletions adapters/repos/db/vector/hnsw/commit_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/sirupsen/logrus"
)

const maxUncondensedCommitLogSize = 500 * 1024 * 1024
const defaultCommitLogSize = 500 * 1024 * 1024

func commitLogFileName(rootPath, indexName, fileName string) string {
return fmt.Sprintf("%s/%s", commitLogDirectory(rootPath, indexName), fileName)
Expand All @@ -39,17 +39,25 @@ func commitLogDirectory(rootPath, name string) string {
}

func NewCommitLogger(rootPath, name string,
maintainenceInterval time.Duration,
logger logrus.FieldLogger) (*hnswCommitLogger, error) {
maintainenceInterval time.Duration, logger logrus.FieldLogger,
opts ...CommitlogOption) (*hnswCommitLogger, error) {
l := &hnswCommitLogger{
cancel: make(chan struct{}),
rootPath: rootPath,
id: name,
maintainenceInterval: maintainenceInterval,
condensor: NewMemoryCondensor2(logger),
condensor: NewMemoryCondensor(logger),
logger: logger,
maxSizeIndividual: maxUncondensedCommitLogSize / 5, // TODO: make configurable
maxSizeCombining: maxUncondensedCommitLogSize, // TODO: make configurable

// both can be overwritten using functional options
maxSizeIndividual: defaultCommitLogSize / 5,
maxSizeCombining: defaultCommitLogSize,
}

for _, o := range opts {
if err := o(l); err != nil {
return nil, err
}
}

fd, err := getLatestCommitFileOrCreate(rootPath, name)
Expand Down Expand Up @@ -223,16 +231,28 @@ type condensor interface {
}

type hnswCommitLogger struct {
// protect against concurrent attempts to write in the underlying file or
// buffer
sync.Mutex
cancel chan struct{}
rootPath string
id string
condensor condensor

cancel chan struct{}
rootPath string
id string
condensor condensor
logger logrus.FieldLogger
maxSizeIndividual int64
maxSizeCombining int64
commitLogger *commitlog.Logger

// Generally maintenance is happening from a single goroutine on a read-only
// file, so no locking should be required. However, there is one situation
// where maintenance suddenly becomes concurrent: When a cancel signal is
// received, we need to be able to make sure that cancellation does not
// complete while a maintenance process is still running. This would mean, we
// would return to the caller too early and the files on disk might still
// change due to a maintenance process that was still running undetected
maintenanceLock sync.Mutex
maintainenceInterval time.Duration
logger logrus.FieldLogger
maxSizeIndividual int64
maxSizeCombining int64
commitLogger *commitlog.Logger
}

type HnswCommitType uint8 // 256 options, plenty of room for future extensions
Expand Down Expand Up @@ -359,12 +379,30 @@ func (l *hnswCommitLogger) StartLogging() {
// cancel maintenance jobs on request
go func(cancel ...chan struct{}) {
<-l.cancel

// Once we've received the cancel signal, we must obtain all possible
// locks. Both the one for maintenance as well as the regular one for
// writing. Once we hold all locks, we can be sure that no background
// process is running anymore (as they would themselves require those
// locks) and we can cancel all tasks before a new one could start.
l.maintenanceLock.Lock()
defer l.maintenanceLock.Unlock()
l.Lock()
defer l.Unlock()

for _, c := range cancel {
c <- struct{}{}
}
}(cancelCombineAndCondenseLogs, cancelSwitchLog)
}

// Shutdown signals the cancel goroutine started in StartLogging by sending on
// l.cancel. That goroutine then acquires the maintenance lock and the write
// lock before forwarding the cancel signal to the scheduled maintenance tasks.
// The intent is that state on disk is immutable once shutdown has completed.
//
// NOTE(review): l.cancel is unbuffered, so this send completes as soon as the
// goroutine receives it, i.e. before the locks are acquired. Confirm whether
// callers can rely on on-disk state being immutable at the moment Shutdown
// returns. The send also blocks until a receiver exists, so this presumably
// must only be called after StartLogging, and at most once.
func (l *hnswCommitLogger) Shutdown() {
	var signal struct{}
	l.cancel <- signal
}

func (l *hnswCommitLogger) startSwitchLogs() chan struct{} {
cancelSwitchLog := make(chan struct{})

Expand Down Expand Up @@ -470,6 +508,9 @@ func (l *hnswCommitLogger) maintenance() error {
}

func (l *hnswCommitLogger) condenseOldLogs() error {
l.maintenanceLock.Lock()
defer l.maintenanceLock.Unlock()

files, err := getCommitFileNames(l.rootPath, l.id)
if err != nil {
return err
Expand Down Expand Up @@ -498,6 +539,9 @@ func (l *hnswCommitLogger) condenseOldLogs() error {
}

func (l *hnswCommitLogger) combineLogs() error {
l.maintenanceLock.Lock()
defer l.maintenanceLock.Unlock()

// maxSize is the desired final size, since we assume a lot of redundancy we
// can set the combining threshold higher than the final threshold under the
// assumption that the combined file will be considerably smaller than the
Expand Down
17 changes: 17 additions & 0 deletions adapters/repos/db/vector/hnsw/commit_logger_functional_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package hnsw

// CommitlogOption is a functional option that mutates an hnswCommitLogger.
// NewCommitLogger applies the options in the order given, after setting its
// defaults, and aborts with the option's error if one is returned.
type CommitlogOption func(l *hnswCommitLogger) error

// WithCommitlogThreshold returns an option that sets the logger's
// maxSizeIndividual field to size, replacing the default of
// defaultCommitLogSize / 5 assigned in NewCommitLogger. The returned option
// performs no validation and never returns an error.
func WithCommitlogThreshold(size int64) CommitlogOption {
	apply := func(target *hnswCommitLogger) error {
		target.maxSizeIndividual = size
		return nil
	}
	return apply
}

// WithCommitlogThresholdForCombining returns an option that sets the logger's
// maxSizeCombining field to size, replacing the default of
// defaultCommitLogSize assigned in NewCommitLogger. The returned option
// performs no validation and never returns an error.
func WithCommitlogThresholdForCombining(size int64) CommitlogOption {
	apply := func(target *hnswCommitLogger) error {
		target.maxSizeCombining = size
		return nil
	}
	return apply
}
90 changes: 67 additions & 23 deletions adapters/repos/db/vector/hnsw/condensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"bufio"
"encoding/binary"
"fmt"
"io"
"math"
"os"

Expand All @@ -25,7 +24,7 @@ import (

type MemoryCondensor struct {
newLogFile *os.File
newLog *bufio.Writer
newLog *bufWriter
logger logrus.FieldLogger
}

Expand All @@ -34,10 +33,9 @@ func (c *MemoryCondensor) Do(fileName string) error {
if err != nil {
return errors.Wrap(err, "open commit log to be condensed")
}

fdBuf := bufio.NewReaderSize(fd, 256*1024)

res, _, err := NewDeserializer2(c.logger).Do(fdBuf, nil, false)
res, _, err := NewDeserializer(c.logger).Do(fdBuf, nil, true)
if err != nil {
return errors.Wrap(err, "read commit log to be condensed")
}
Expand All @@ -49,22 +47,35 @@ func (c *MemoryCondensor) Do(fileName string) error {
}

c.newLogFile = newLogFile
c.newLog = bufio.NewWriterSize(c.newLogFile, 1*1024*1024)

c.newLog = NewWriterSize(c.newLogFile, 1*1024*1024)

for _, node := range res.Nodes {
if node == nil {
// nil nodes occur when we've grown, but not inserted anything yet
continue
}

if err := c.AddNode(node); err != nil {
return errors.Wrapf(err, "write node %d to commit log", node.id)
if node.level > 0 {
// nodes are implicitly added when they are first linked, if the level is
// not zero we know this node was new. If the level is zero it doesn't
// matter if it gets added explicitly or implicitly
if err := c.AddNode(node); err != nil {
return errors.Wrapf(err, "write node %d to commit log", node.id)
}
}

for level, links := range node.connections {
if err := c.SetLinksAtLevel(node.id, level, links); err != nil {
return errors.Wrapf(err,
"write links for node %d at level %dto commit log", node.id, level)
if res.ReplaceLinks(node.id, uint16(level)) {
if err := c.SetLinksAtLevel(node.id, level, links); err != nil {
return errors.Wrapf(err,
"write links for node %d at level %d to commit log", node.id, level)
}
} else {
if err := c.AddLinksAtLevel(node.id, uint16(level), links); err != nil {
return errors.Wrapf(err,
"write links for node %d at level %d to commit log", node.id, level)
}
}
}
}
Expand Down Expand Up @@ -98,37 +109,45 @@ func (c *MemoryCondensor) Do(fileName string) error {
return nil
}

func (c *MemoryCondensor) writeUint64(w io.Writer, in uint64) error {
err := binary.Write(w, binary.LittleEndian, &in)
func (c *MemoryCondensor) writeUint64(w *bufWriter, in uint64) error {
toWrite := make([]byte, 8)
binary.LittleEndian.PutUint64(toWrite[0:8], in)
_, err := w.Write(toWrite)
if err != nil {
return errors.Wrap(err, "writing uint64")
return err
}

return nil
}

func (c *MemoryCondensor) writeUint16(w io.Writer, in uint16) error {
err := binary.Write(w, binary.LittleEndian, &in)
func (c *MemoryCondensor) writeUint16(w *bufWriter, in uint16) error {
toWrite := make([]byte, 2)
binary.LittleEndian.PutUint16(toWrite[0:2], in)
_, err := w.Write(toWrite)
if err != nil {
return errors.Wrap(err, "writing uint16")
return err
}

return nil
}

func (c *MemoryCondensor) writeCommitType(w io.Writer, in HnswCommitType) error {
err := binary.Write(w, binary.LittleEndian, &in)
func (c *MemoryCondensor) writeCommitType(w *bufWriter, in HnswCommitType) error {
toWrite := make([]byte, 1)
toWrite[0] = byte(in)
_, err := w.Write(toWrite)
if err != nil {
return errors.Wrap(err, "writing commit type")
return err
}

return nil
}

func (c *MemoryCondensor) writeUint64Slice(w io.Writer, in []uint64) error {
err := binary.Write(w, binary.LittleEndian, &in)
if err != nil {
return errors.Wrap(err, "writing []uint64")
func (c *MemoryCondensor) writeUint64Slice(w *bufWriter, in []uint64) error {
for _, v := range in {
err := c.writeUint64(w, v)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -165,6 +184,31 @@ func (c *MemoryCondensor) SetLinksAtLevel(nodeid uint64, level int, targets []ui
return ec.toError()
}

// AddLinksAtLevel writes a single AddLinksAtLevel record to the new log in one
// Write call.
//
// Record layout (little endian):
//
//	byte  0      commit type (AddLinksAtLevel)
//	bytes 1-8    node id
//	bytes 9-10   level
//	bytes 11-12  number of targets
//	bytes 13...  one uint64 per target
//
// The target count is stored in two bytes, so more than math.MaxUint16 targets
// cannot be represented. Such a call is rejected with an error instead of
// silently writing a truncated count in front of the full target list, which
// would leave the record inconsistent with its own length field.
func (c *MemoryCondensor) AddLinksAtLevel(nodeid uint64, level uint16, targets []uint64) error {
	const headerSize = 13

	if len(targets) > math.MaxUint16 {
		return fmt.Errorf("add links at level: %d targets exceed the maximum of %d",
			len(targets), math.MaxUint16)
	}

	toWrite := make([]byte, headerSize+len(targets)*8)
	toWrite[0] = byte(AddLinksAtLevel)
	binary.LittleEndian.PutUint64(toWrite[1:9], nodeid)
	binary.LittleEndian.PutUint16(toWrite[9:11], level)
	binary.LittleEndian.PutUint16(toWrite[11:13], uint16(len(targets)))
	for i, target := range targets {
		offset := headerSize + i*8
		binary.LittleEndian.PutUint64(toWrite[offset:offset+8], target)
	}

	_, err := c.newLog.Write(toWrite)
	return err
}

// AddLinkAtLevel appends one AddLinkAtLevel record to the new log: the commit
// type, the node id, the level and the single link target, in that order.
// All four writes are attempted even if an earlier one fails; their errors
// are collected and returned together through the errorCompounder.
func (c *MemoryCondensor) AddLinkAtLevel(nodeid uint64, level uint16, target uint64) error {
	// Calls inside the literal are evaluated left to right, so the fields
	// reach the log in record order.
	results := [...]error{
		c.writeCommitType(c.newLog, AddLinkAtLevel),
		c.writeUint64(c.newLog, nodeid),
		c.writeUint16(c.newLog, level),
		c.writeUint64(c.newLog, target),
	}

	ec := &errorCompounder{}
	for _, err := range results {
		ec.add(err)
	}

	return ec.toError()
}

func (c *MemoryCondensor) SetEntryPointWithMaxLayer(id uint64, level int) error {
ec := &errorCompounder{}
ec.add(c.writeCommitType(c.newLog, SetEntryPointMaxLevel))
Expand Down