-
Notifications
You must be signed in to change notification settings - Fork 681
/
config_update.go
156 lines (131 loc) · 4.02 KB
/
config_update.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package hnsw
import (
"os"
"sync/atomic"
enterrors "github.com/weaviate/weaviate/entities/errors"
"github.com/weaviate/weaviate/usecases/configbase"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/schema/config"
ent "github.com/weaviate/weaviate/entities/vectorindex/hnsw"
)
// ValidateUserConfigUpdate checks that a config update does not modify any
// HNSW setting that is treated as immutable.
//
// Both initial and updated must hold an ent.UserConfig; if either does not,
// an error naming the offending dynamic type is returned. Otherwise the
// immutable settings are compared one by one, in the order listed below, and
// the error for the first setting that differs is returned. A nil result
// means none of the listed settings changed.
func ValidateUserConfigUpdate(initial, updated config.VectorIndexConfig) error {
	before, isUserConfig := initial.(ent.UserConfig)
	if !isUserConfig {
		return errors.Errorf("initial is not UserConfig, but %T", initial)
	}

	after, isUserConfig := updated.(ent.UserConfig)
	if !isUserConfig {
		return errors.Errorf("updated is not UserConfig, but %T", updated)
	}

	checks := []immutableParameter{
		{name: "efConstruction", accessor: func(uc ent.UserConfig) interface{} { return uc.EFConstruction }},
		{name: "maxConnections", accessor: func(uc ent.UserConfig) interface{} { return uc.MaxConnections }},
		// NOTE: There isn't a technical reason for cleanupIntervalSeconds to
		// be immutable, it simply hasn't been implemented yet. It would
		// require stopping the current timer and starting a new one.
		// Certainly possible, but let's see if anyone actually needs this
		// before implementing it.
		{name: "cleanupIntervalSeconds", accessor: func(uc ent.UserConfig) interface{} { return uc.CleanupIntervalSeconds }},
		{name: "distance", accessor: func(uc ent.UserConfig) interface{} { return uc.Distance }},
	}

	for i := range checks {
		if err := validateImmutableField(checks[i], before, after); err != nil {
			return err
		}
	}

	return nil
}
// immutableParameter describes one UserConfig setting that must keep the same
// value across a config update.
type immutableParameter struct {
// accessor extracts the setting's value from a UserConfig so that two
// configs can be compared.
accessor func(c ent.UserConfig) interface{}
// name identifies the setting in the error reported when it changes.
name string
}
// validateImmutableField reads the setting selected by u from both previous
// and next and compares the two values. It returns nil when they are equal
// and otherwise an error that names the setting and shows both values.
func validateImmutableField(u immutableParameter,
	previous, next ent.UserConfig,
) error {
	before, after := u.accessor(previous), u.accessor(next)
	if before == after {
		return nil
	}

	return errors.Errorf("%s is immutable: attempted change from \"%v\" to \"%v\"",
		u.name, before, after)
}
// UpdateUserConfig applies the mutable parts of an updated config to the
// index.
//
// updated must hold an ent.UserConfig, otherwise an error is returned. The
// search-related values (ef, dynamic ef bounds and factor, flat search cutoff)
// are always stored. If neither PQ nor BQ is enabled in the update, nothing
// else happens. Otherwise the PQ config is recorded on the index and, unless
// async indexing is enabled, the index is either upgraded to compressed
// vectors (when it is not compressed yet) or the compressor's cache size is
// adjusted (when it already is).
//
// callback is invoked before returning on every path handled here directly;
// on the upgrade path it is passed to Upgrade, which becomes responsible for
// calling it.
func (h *hnsw) UpdateUserConfig(updated config.VectorIndexConfig, callback func()) error {
	cfg, isUserConfig := updated.(ent.UserConfig)
	if !isUserConfig {
		callback()
		return errors.Errorf("config is not UserConfig, but %T", updated)
	}

	// Store atomically, as a lock here would be very expensive: these values
	// are read on every single user-facing search, which can be highly
	// concurrent.
	atomic.StoreInt64(&h.ef, int64(cfg.EF))
	atomic.StoreInt64(&h.efMin, int64(cfg.DynamicEFMin))
	atomic.StoreInt64(&h.efMax, int64(cfg.DynamicEFMax))
	atomic.StoreInt64(&h.efFactor, int64(cfg.DynamicEFFactor))
	atomic.StoreInt64(&h.flatSearchCutoff, int64(cfg.FlatSearchCutoff))

	if wantsCompression := cfg.PQ.Enabled || cfg.BQ.Enabled; !wantsCompression {
		callback()
		return nil
	}

	h.pqConfig = cfg.PQ

	if asyncEnabled() {
		callback()
		return nil
	}

	if h.compressed.Load() {
		// Already compressed: only the cache size needs to follow the update.
		h.compressor.SetCacheMaxSize(int64(cfg.VectorCacheMaxObjects))
		callback()
		return nil
	}

	// Not compressed yet: the compression will fire the callback once it's
	// complete.
	return h.Upgrade(callback)
}
// asyncEnabled reports whether the ASYNC_INDEXING environment variable holds
// a value that configbase.Enabled accepts as enabled.
func asyncEnabled() bool {
	flag := os.Getenv("ASYNC_INDEXING")
	return configbase.Enabled(flag)
}
// Upgrade starts the switch to compressed vectors using the PQ config
// currently stored on the index.
//
// The stored PQ config is validated first. If validation fails, callback is
// invoked immediately and the validation error is returned. Otherwise the
// compression work is handed to enterrors.GoWrapper and nil is returned; the
// callback is then fired by compressThenCallback.
// NOTE(review): GoWrapper presumably runs the function in its own goroutine,
// so compression would finish after Upgrade returns — confirm.
func (h *hnsw) Upgrade(callback func()) error {
	h.logger.WithField("action", "compress").Info("switching to compressed vectors")

	if err := ent.ValidatePQConfig(h.pqConfig); err != nil {
		callback()
		return err
	}

	compressAndNotify := func() { h.compressThenCallback(callback) }
	enterrors.GoWrapper(compressAndNotify, h.logger)

	return nil
}
// compressThenCallback compresses the index and always invokes callback when
// it is done, whether or not compression succeeded.
//
// The config passed to compress carries the stored PQ config and enables BQ
// exactly when PQ is not enabled. A compression error is logged and not
// returned; success is logged at info level.
func (h *hnsw) compressThenCallback(callback func()) {
	defer callback()

	var target ent.UserConfig
	target.PQ = h.pqConfig
	// BQ is the fallback whenever PQ is not the enabled compression.
	target.BQ.Enabled = !h.pqConfig.Enabled

	err := h.compress(target)
	if err == nil {
		h.logger.WithField("action", "compress").Info("vector compression complete")
		return
	}

	h.logger.Error(err)
}