Skip to content

Commit

Permalink
wip1
Browse files Browse the repository at this point in the history
  • Loading branch information
aliszka committed May 16, 2023
1 parent aff9c8a commit a5872bf
Show file tree
Hide file tree
Showing 26 changed files with 372 additions and 265 deletions.
71 changes: 60 additions & 11 deletions adapters/repos/db/shard.go
Expand Up @@ -80,7 +80,12 @@ type Shard struct {
// being enabled, only searchable bucket exists
fallbackToSearchable bool

geoCommitlogMaintenanceCycle cyclemanager.CycleManager
vectorCommitlogMaintenanceCycle cyclemanager.CycleManager
vectorTombstoneCleanupCycle cyclemanager.CycleManager

geoPropsCommitlogMaintenanceCycle cyclemanager.CycleManager
geoPropsTombstoneCleanupCycle cyclemanager.CycleManager
geoPropsCyclesLock sync.Mutex // makes sure getPropsCycles are inited just once
}

func NewShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
Expand All @@ -94,11 +99,10 @@ func NewShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
promMetrics: promMetrics,
metrics: NewMetrics(index.logger, promMetrics,
string(index.Config.ClassName), shardName),
deletedDocIDs: docid.NewInMemDeletedTracker(),
stopMetrics: make(chan struct{}),
replicationMap: pendingReplicaTasks{Tasks: make(map[string]replicaTask, 32)},
centralJobQueue: jobQueueCh,
geoCommitlogMaintenanceCycle: cyclemanager.NewMulti(cyclemanager.GeoCommitLoggerCycleTicker()),
deletedDocIDs: docid.NewInMemDeletedTracker(),
stopMetrics: make(chan struct{}),
replicationMap: pendingReplicaTasks{Tasks: make(map[string]replicaTask, 32)},
centralJobQueue: jobQueueCh,
}

s.docIdLock = make([]sync.Mutex, IdLockPoolSize)
Expand Down Expand Up @@ -149,6 +153,14 @@ func (s *Shard) initVectorIndex(
"choose one of [\"cosine\", \"dot\", \"l2-squared\", \"manhattan\",\"hamming\"]", hnswUserConfig.Distance)
}

s.vectorCommitlogMaintenanceCycle = cyclemanager.NewMulti(
cyclemanager.HnswCommitLoggerCycleTicker())
s.vectorTombstoneCleanupCycle = cyclemanager.NewMulti(
cyclemanager.NewFixedIntervalTicker(time.Duration(hnswUserConfig.CleanupIntervalSeconds) * time.Second))

s.vectorCommitlogMaintenanceCycle.Start()
s.vectorTombstoneCleanupCycle.Start()

vi, err := hnsw.New(hnsw.Config{
Logger: s.index.logger,
RootPath: s.index.Config.RootPath,
Expand All @@ -158,7 +170,10 @@ func (s *Shard) initVectorIndex(
PrometheusMetrics: s.promMetrics,
VectorForIDThunk: s.vectorByIndexID,
DistanceProvider: distProv,
}, hnswUserConfig)
MakeCommitLoggerThunk: func() (hnsw.CommitLogger, error) {
return hnsw.NewCommitLogger(s.index.Config.RootPath, s.ID(), s.index.logger, s.vectorCommitlogMaintenanceCycle)
},
}, hnswUserConfig, s.vectorTombstoneCleanupCycle)
if err != nil {
return errors.Wrapf(err, "init shard %q: hnsw index", s.ID())
}
Expand Down Expand Up @@ -265,8 +280,25 @@ func (s *Shard) drop() error {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()

if err := s.geoCommitlogMaintenanceCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop get commitlog maintenance cycle")
if s.vectorCommitlogMaintenanceCycle != nil {
if err := s.vectorCommitlogMaintenanceCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop vector commitlog maintenance cycle")
}
}
if s.vectorTombstoneCleanupCycle != nil {
if err := s.vectorTombstoneCleanupCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop vector tombstone cleanup cycle")
}
}
if s.geoPropsCommitlogMaintenanceCycle != nil {
if err := s.geoPropsCommitlogMaintenanceCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop geo props commitlog maintenance cycle")
}
}
if s.geoPropsTombstoneCleanupCycle != nil {
if err := s.geoPropsTombstoneCleanupCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop geo props tombstone cleanup cycle")
}
}

if err := s.store.Shutdown(ctx); err != nil {
Expand Down Expand Up @@ -534,8 +566,25 @@ func (s *Shard) shutdown(ctx context.Context) error {
return errors.Wrap(err, "shut down vector index")
}

if err := s.geoCommitlogMaintenanceCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop get commitlog maintenance cycle")
if s.vectorCommitlogMaintenanceCycle != nil {
if err := s.vectorCommitlogMaintenanceCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop vector commitlog maintenance cycle")
}
}
if s.vectorTombstoneCleanupCycle != nil {
if err := s.vectorTombstoneCleanupCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop vector tombstone cleanup cycle")
}
}
if s.geoPropsCommitlogMaintenanceCycle != nil {
if err := s.geoPropsCommitlogMaintenanceCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop geo props commitlog maintenance cycle")
}
}
if s.geoPropsTombstoneCleanupCycle != nil {
if err := s.geoPropsTombstoneCleanupCycle.StopAndWait(ctx); err != nil {
return errors.Wrap(err, "stop geo props tombstone cleanup cycle")
}
}

if err := s.store.Shutdown(ctx); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions adapters/repos/db/shard_backup.go
Expand Up @@ -38,9 +38,10 @@ func (s *Shard) beginBackup(ctx context.Context) (err error) {
if err = s.store.FlushMemtables(ctx); err != nil {
return errors.Wrap(err, "flush memtables")
}
if err = s.vectorIndex.PauseMaintenance(ctx); err != nil {
return errors.Wrap(err, "pause maintenance")
}
// TODO common_cycle_manager move to shard
// if err = s.vectorIndex.PauseMaintenance(ctx); err != nil {
// return errors.Wrap(err, "pause maintenance")
// }
if err = s.vectorIndex.SwitchCommitLogs(ctx); err != nil {
return errors.Wrap(err, "switch commit logs")
}
Expand Down Expand Up @@ -71,9 +72,10 @@ func (s *Shard) resumeMaintenanceCycles(ctx context.Context) error {
return s.store.ResumeCompaction(ctx)
})

g.Go(func() error {
return s.vectorIndex.ResumeMaintenance(ctx)
})
// TODO common_cycle_manager move to shard
// g.Go(func() error {
// return s.vectorIndex.ResumeMaintenance(ctx)
// })

if err := g.Wait(); err != nil {
return errors.Wrapf(err,
Expand Down
20 changes: 16 additions & 4 deletions adapters/repos/db/shard_geo_props.go
Expand Up @@ -14,28 +14,40 @@ package db
import (
"context"
"fmt"
"time"

"github.com/pkg/errors"
"github.com/weaviate/weaviate/adapters/repos/db/propertyspecific"
"github.com/weaviate/weaviate/adapters/repos/db/vector/geo"
"github.com/weaviate/weaviate/entities/cyclemanager"
"github.com/weaviate/weaviate/entities/models"
"github.com/weaviate/weaviate/entities/schema"
"github.com/weaviate/weaviate/entities/storagestate"
"github.com/weaviate/weaviate/entities/storobj"
"github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)

func (s *Shard) initGeoProp(prop *models.Property) error {
// safe to call Start() even if cycle is already running
// started only if at least one geo prop is indexed
s.geoCommitlogMaintenanceCycle.Start()
s.geoPropsCyclesLock.Lock()
if s.geoPropsCommitlogMaintenanceCycle == nil &&
s.geoPropsTombstoneCleanupCycle == nil {
s.geoPropsCommitlogMaintenanceCycle = cyclemanager.NewMulti(
cyclemanager.GeoCommitLoggerCycleTicker())
s.geoPropsTombstoneCleanupCycle = cyclemanager.NewMulti(
cyclemanager.NewFixedIntervalTicker(hnsw.DefaultCleanupIntervalSeconds * time.Second))

s.geoPropsCommitlogMaintenanceCycle.Start()
s.geoPropsTombstoneCleanupCycle.Start()
}
s.geoPropsCyclesLock.Unlock()

idx, err := geo.NewIndex(geo.Config{
ID: geoPropID(s.ID(), prop.Name),
RootPath: s.index.Config.RootPath,
CoordinatesForID: s.makeCoordinatesForID(prop.Name),
DisablePersistence: false,
Logger: s.index.logger,
}, s.geoCommitlogMaintenanceCycle)
}, s.geoPropsTombstoneCleanupCycle, s.geoPropsCommitlogMaintenanceCycle)
if err != nil {
return errors.Wrapf(err, "create geo index for prop %q", prop.Name)
}
Expand Down
5 changes: 3 additions & 2 deletions adapters/repos/db/vector/geo/geo.go
Expand Up @@ -57,7 +57,8 @@ type Config struct {
Logger logrus.FieldLogger
}

func NewIndex(config Config, commitlogMaintenanceCycle cyclemanager.CycleManager,
func NewIndex(config Config, tombstoneCleanupCycle cyclemanager.CycleManager,
commitlogMaintenanceCycle cyclemanager.CycleManager,
) (*Index, error) {
vi, err := hnsw.New(hnsw.Config{
VectorForIDThunk: config.CoordinatesForID.VectorForID,
Expand All @@ -69,7 +70,7 @@ func NewIndex(config Config, commitlogMaintenanceCycle cyclemanager.CycleManager
MaxConnections: 64,
EFConstruction: 128,
CleanupIntervalSeconds: hnswent.DefaultCleanupIntervalSeconds,
})
}, tombstoneCleanupCycle)
if err != nil {
return nil, errors.Wrap(err, "underlying hnsw index")
}
Expand Down
2 changes: 1 addition & 1 deletion adapters/repos/db/vector/geo/geo_test.go
Expand Up @@ -43,7 +43,7 @@ func TestGeoJourney(t *testing.T) {
CoordinatesForID: getCoordinates,
DisablePersistence: true,
RootPath: "doesnt-matter-persistence-is-off",
}, cyclemanager.NewNoop())
}, cyclemanager.NewNoop(), cyclemanager.NewNoop())
require.Nil(t, err)

t.Run("importing all", func(t *testing.T) {
Expand Down
98 changes: 50 additions & 48 deletions adapters/repos/db/vector/hnsw/backup.go
Expand Up @@ -27,49 +27,50 @@ import (
//
// If a Delete-Cleanup Cycle is running (TombstoneCleanupCycle), it is aborted,
// as it's not feasible to wait for such a cycle to complete, as it can take hours.
func (h *hnsw) PauseMaintenance(ctx context.Context) error {
maintenanceCycleStop := make(chan error)
cleanupCycleStop := make(chan error)

go func() {
if err := h.commitLogMaintenanceCycle.StopAndWait(ctx); err != nil {
maintenanceCycleStop <- errors.Wrap(ctx.Err(), "long-running commitlog maintenance in progress")
return
}
maintenanceCycleStop <- nil
}()

go func() {
if err := h.tombstoneCleanupCycle.StopAndWait(ctx); err != nil {
cleanupCycleStop <- errors.Wrap(err, "long-running tombstone cleanup in progress")
return
}
cleanupCycleStop <- nil
}()

maintenanceCycleStopErr := <-maintenanceCycleStop
cleanupCycleStopErr := <-cleanupCycleStop

if maintenanceCycleStopErr != nil && cleanupCycleStopErr != nil {
return errors.Errorf("%s, %s", maintenanceCycleStopErr, cleanupCycleStopErr)
}

if maintenanceCycleStopErr != nil {
// restart tombstone cleanup since it was successfully stopped.
// both of these cycles must be either stopped or running.
h.tombstoneCleanupCycle.Start()
return maintenanceCycleStopErr
}

if cleanupCycleStopErr != nil {
// restart commitlog cycle since it was successfully stopped.
// both of these cycles must be either stopped or running.
h.commitLogMaintenanceCycle.Start()
return cleanupCycleStopErr
}

return nil
}
// TODO common_cycle_manager move to shard
// func (h *hnsw) PauseMaintenance(ctx context.Context) error {
// maintenanceCycleStop := make(chan error)
// cleanupCycleStop := make(chan error)

// go func() {
// if err := h.commitLogMaintenanceCycle.StopAndWait(ctx); err != nil {
// maintenanceCycleStop <- errors.Wrap(ctx.Err(), "long-running commitlog maintenance in progress")
// return
// }
// maintenanceCycleStop <- nil
// }()

// go func() {
// if err := h.tombstoneCleanupCycle.StopAndWait(ctx); err != nil {
// cleanupCycleStop <- errors.Wrap(err, "long-running tombstone cleanup in progress")
// return
// }
// cleanupCycleStop <- nil
// }()

// maintenanceCycleStopErr := <-maintenanceCycleStop
// cleanupCycleStopErr := <-cleanupCycleStop

// if maintenanceCycleStopErr != nil && cleanupCycleStopErr != nil {
// return errors.Errorf("%s, %s", maintenanceCycleStopErr, cleanupCycleStopErr)
// }

// if maintenanceCycleStopErr != nil {
// // restart tombstone cleanup since it was successfully stopped.
// // both of these cycles must be either stopped or running.
// h.tombstoneCleanupCycle.Start()
// return maintenanceCycleStopErr
// }

// if cleanupCycleStopErr != nil {
// // restart commitlog cycle since it was successfully stopped.
// // both of these cycles must be either stopped or running.
// h.commitLogMaintenanceCycle.Start()
// return cleanupCycleStopErr
// }

// return nil
// }

// SwitchCommitLogs makes sure that the previously writeable commitlog is
// switched to a new one, thus making the existing file read-only.
Expand Down Expand Up @@ -143,8 +144,9 @@ func (h *hnsw) ListFiles(ctx context.Context) ([]string, error) {

// ResumeMaintenance starts all async cycles. It errors if the operations
// had not been paused prior.
func (h *hnsw) ResumeMaintenance(ctx context.Context) error {
h.tombstoneCleanupCycle.Start()
h.commitLogMaintenanceCycle.Start()
return nil
}
// TODO common_cycle_manager move to shard
// func (h *hnsw) ResumeMaintenance(ctx context.Context) error {
// h.tombstoneCleanupCycle.Start()
// h.commitLogMaintenanceCycle.Start()
// return nil
// }
31 changes: 21 additions & 10 deletions adapters/repos/db/vector/hnsw/backup_integration_test.go
Expand Up @@ -26,23 +26,29 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaviate/weaviate/adapters/repos/db/vector/hnsw/distancer"
"github.com/weaviate/weaviate/entities/cyclemanager"
enthnsw "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)

func TestBackup_Integration(t *testing.T) {
ctx := context.Background()

dirName := t.TempDir()
indexID := "backup-integration-test"

dirName := t.TempDir()
maintenanceCycle := cyclemanager.NewMulti(cyclemanager.HnswCommitLoggerCycleTicker())
maintenanceCycle.Start()

idx, err := New(Config{
RootPath: dirName,
ID: indexID,
Logger: logrus.New(),
DistanceProvider: distancer.NewCosineDistanceProvider(),
VectorForIDThunk: testVectorForID,
}, enthnsw.NewDefaultUserConfig())
MakeCommitLoggerThunk: func() (CommitLogger, error) {
return NewCommitLogger(dirName, indexID, logrus.New(), maintenanceCycle)
},
}, enthnsw.NewDefaultUserConfig(), cyclemanager.NewNoop())
require.Nil(t, err)
idx.PostStartup()

Expand All @@ -59,10 +65,11 @@ func TestBackup_Integration(t *testing.T) {
// based on current timestamp, can differ
time.Sleep(time.Second)

t.Run("pause maintenance", func(t *testing.T) {
err = idx.PauseMaintenance(ctx)
require.Nil(t, err)
})
// TODO common_cycle_manager move to shard
// t.Run("pause maintenance", func(t *testing.T) {
// err = idx.PauseMaintenance(ctx)
// require.Nil(t, err)
// })

t.Run("switch commit logs", func(t *testing.T) {
err = idx.SwitchCommitLogs(ctx)
Expand Down Expand Up @@ -105,11 +112,15 @@ func TestBackup_Integration(t *testing.T) {
})
})

t.Run("resume maintenance", func(t *testing.T) {
err = idx.ResumeMaintenance(ctx)
require.Nil(t, err)
})
// TODO common_cycle_manager move to shard
// t.Run("resume maintenance", func(t *testing.T) {
// err = idx.ResumeMaintenance(ctx)
// require.Nil(t, err)
// })

err = idx.Shutdown(ctx)
require.Nil(t, err)

err = maintenanceCycle.StopAndWait(ctx)
require.Nil(t, err)
}

0 comments on commit a5872bf

Please sign in to comment.