Skip to content

Commit

Permalink
Merge branch 'master' of github.com:vearch/vearch
Browse files Browse the repository at this point in the history
  • Loading branch information
syhao committed Mar 27, 2024
2 parents 7b26c3d + 7fcc76f commit 864794b
Show file tree
Hide file tree
Showing 106 changed files with 6,414 additions and 21,464 deletions.
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -11,7 +11,7 @@ require (
github.com/gin-contrib/cors v1.5.0
github.com/gin-gonic/gin v1.9.1
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/golang/protobuf v1.5.4
github.com/google/flatbuffers v2.0.8+incompatible
github.com/google/uuid v1.3.1
github.com/opentracing/opentracing-go v1.2.0
Expand All @@ -33,6 +33,7 @@ require (
go.uber.org/atomic v1.9.0
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.33.0
gotest.tools v2.1.1-0.20181001141646-317cc193f525+incompatible
)

Expand Down Expand Up @@ -171,7 +172,6 @@ require (
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Expand Up @@ -254,8 +254,8 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
Expand Down Expand Up @@ -1181,8 +1181,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
21 changes: 21 additions & 0 deletions internal/client/master.go
Expand Up @@ -366,6 +366,27 @@ func (m *masterClient) QuerySpaceByName(ctx context.Context, dbID int64, spaceNa
return nil, vearchpb.NewError(vearchpb.ErrorEnum_SPACE_NOTEXISTS, nil)
}

// QueryAliasByName query alias by alias name
func (m *masterClient) QueryAliasByName(ctx context.Context, alias_name string) (*entity.Alias, error) {
alias := &entity.Alias{Name: alias_name}

bs, err := m.client.master.Get(ctx, entity.AliasKey(alias_name))

if err != nil {
return nil, err
}

if bs == nil {
return nil, vearchpb.NewError(vearchpb.ErrorEnum_ALIAS_NOT_EXIST, nil)
}

err = vjson.Unmarshal(bs, alias)
if err != nil {
return nil, fmt.Errorf("get alias:%s value:%s, err:%s", alias.Name, string(bs), err.Error())
}
return alias, nil
}

// KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
// to the channel are not consumed promptly the channel may become full. When full, the lease
// client will continue sending keep alive requests to the etcd server, but will drop responses
Expand Down
125 changes: 116 additions & 9 deletions internal/client/master_cache.go
Expand Up @@ -44,14 +44,15 @@ var (
spaceReloadWorkder sync.Map
partitionReloadWorkder sync.Map
serverReloadWorkder sync.Map
aliasReloadWorkder sync.Map
)

type clientCache struct {
sync.Map
mc *masterClient
cancel context.CancelFunc
lock sync.Mutex
userCache, spaceCache, spaceIDCache, partitionCache, serverCache *cache.Cache
mc *masterClient
cancel context.CancelFunc
lock sync.Mutex
userCache, spaceCache, spaceIDCache, partitionCache, serverCache, aliasCache *cache.Cache
}

func newClientCache(serverCtx context.Context, masterClient *masterClient) (*clientCache, error) {
Expand All @@ -65,6 +66,7 @@ func newClientCache(serverCtx context.Context, masterClient *masterClient) (*cli
spaceIDCache: cache.New(cache.NoExpiration, cache.NoExpiration),
partitionCache: cache.New(cache.NoExpiration, cache.NoExpiration),
serverCache: cache.New(cache.NoExpiration, cache.NoExpiration),
aliasCache: cache.New(cache.NoExpiration, cache.NoExpiration),
}

if err := cc.startCacheJob(ctx); err != nil {
Expand Down Expand Up @@ -186,7 +188,7 @@ func (cliCache *clientCache) reloadSpaceCache(ctx context.Context, sync bool, db

space, err := cliCache.mc.QuerySpaceByName(ctx, dbID, spaceName)
if err != nil {
return fmt.Errorf("can not found db by name:[%s] err:[%s]", db, err.Error())
return fmt.Errorf("can not found space by space name:[%s] and db name:[%s] err:[%s]", spaceName, db, err.Error())
}
if space.ResourceName != config.Conf().Global.ResourceName {
log.Info("space name [%s] resource name don't match [%s], [%s], reloadSpaceCache failed. ",
Expand Down Expand Up @@ -441,7 +443,6 @@ func (cliCache *clientCache) startCacheJob(ctx context.Context) error {
return nil
},
}

userJob.start()

//init space
Expand Down Expand Up @@ -569,6 +570,32 @@ func (cliCache *clientCache) startCacheJob(ctx context.Context) error {
}
serverJob.start()

//init alias
if err := cliCache.initAlias(ctx); err != nil {
return err
}
aliasJob := watcherJob{ctx: ctx, prefix: entity.PrefixAlias, masterClient: cliCache.mc, cache: cliCache.aliasCache,
put: func(value []byte) (err error) {
defer errutil.CatchError(&err)
alias := &entity.Alias{}
if err := vjson.Unmarshal(value, alias); err != nil {
return err
}
log.Debug("[%v] add to alias cache.", *alias)
cliCache.aliasCache.Set(alias.Name, alias, cache.NoExpiration)
return nil
},
delete: func(key string) (err error) {
defer errutil.CatchError(&err)
aliasSplit := strings.Split(key, "/")
alias_name := aliasSplit[len(aliasSplit)-1]
log.Debug("[%s] delete from alias cache.", alias_name)
cliCache.aliasCache.Delete(alias_name)
return nil
},
}
aliasJob.start()

log.Info("cache inited ok use time %v", time.Since(start))

return nil
Expand Down Expand Up @@ -787,6 +814,86 @@ func (w *watcherJob) serverDelete(cacheKey string) (err error) {
return err
}

// find alias from cache
func (cliCache *clientCache) AliasByCache(ctx context.Context, alias_name string) (*entity.Alias, error) {
get, found := cliCache.aliasCache.Get(alias_name)
if found {
return get.(*entity.Alias), nil
}

err := cliCache.reloadAliasCache(ctx, false, alias_name)
vearchlog.LogErrNotNil(err)

if err != nil {
return nil, fmt.Errorf("alias_name:[%s] err:[%s]", alias_name,
vearchpb.NewError(vearchpb.ErrorEnum_ALIAS_NOT_EXIST, nil))
}

for i := 0; i < retryNum; i++ {
time.Sleep(retrySleepTime)
log.Debug("to find alias by key:[%s] ", alias_name)
if get, found = cliCache.aliasCache.Get(alias_name); found {
return get.(*entity.Alias), nil
}
}

return nil, fmt.Errorf("alias_name:[%s] err:[%s]", alias_name, vearchpb.NewError(vearchpb.ErrorEnum_ALIAS_NOT_EXIST, nil))
}

func (cliCache *clientCache) reloadAliasCache(ctx context.Context, sync bool, alias_name string) error {
fun := func() error {
log.Info("to reload alias_name:[%s]", alias_name)

alias, err := cliCache.mc.QueryAliasByName(ctx, alias_name)

if err != nil {
return fmt.Errorf("can not found alias by name:[%s] err:[%s]", alias_name, err.Error())
}
cliCache.aliasCache.Set(alias_name, alias, cache.NoExpiration)
return nil
}

if sync {
return fun()
}
if _, ok := aliasReloadWorkder.LoadOrStore(alias_name, struct{}{}); ok {
return nil
}
go func() {
defer func() {
if r := recover(); r != nil {
vearchlog.LogErrNotNil(fmt.Errorf(cast.ToString(r)))
}
}()
if alias_name == "" {
return
}
defer aliasReloadWorkder.Delete(alias_name)
vearchlog.FunIfNotNil(fun)
}()
return nil
}

func (cliCache *clientCache) initAlias(ctx context.Context) error {
_, values, err := cliCache.mc.PrefixScan(ctx, entity.PrefixAlias)
if err != nil {
log.Error("init server cache err , err:[%s]", err.Error())
return err
}
for _, value := range values {
alias := &entity.Alias{}
err := vjson.Unmarshal(value, alias)
if err != nil {
log.Error("unmarshal alias cache err [%s]", err.Error())
continue
}
if err := cliCache.aliasCache.Add(alias.Name, alias, cache.NoExpiration); err != nil {
log.Error(err.Error())
}
}
return nil
}

func (wj *watcherJob) start() {
go func() {
defer func() {
Expand All @@ -798,10 +905,10 @@ func (wj *watcherJob) start() {
for {
select {
case <-wj.ctx.Done():
log.Debug("watchjob job to stop")
log.Debug("watchjob job to stop %s", wj.prefix)
return
default:
log.Debug("start watcher routine")
log.Debug("start watcher routine %s", wj.prefix)
}

wj.wg.Add(1)
Expand All @@ -816,7 +923,7 @@ func (wj *watcherJob) start() {

select {
case <-wj.ctx.Done():
log.Debug("watchjob job to stop")
log.Debug("watchjob job to stop %s", wj.prefix)
return
default:
}
Expand Down
5 changes: 1 addition & 4 deletions internal/config/config.go
Expand Up @@ -200,7 +200,6 @@ func (ms Masters) Self() *MasterCfg {
}
}
return nil

}

type MasterCfg struct {
Expand Down Expand Up @@ -394,7 +393,6 @@ func LoadConfig(conf *Config, path string) {
// CurrentByMasterNameDomainIp find this machine domain.The main purpose of this function is to find the master from from multiple masters and set it‘s Field:self to true.
// The only criterion for judging is: Is the IP address the same with one of the masters?
func (config *Config) CurrentByMasterNameDomainIp(masterName string) error {

//find local all ip
addrMap := config.addrMap()

Expand Down Expand Up @@ -460,7 +458,6 @@ func (config *Config) addrMap() map[string]bool {
}

func (config *Config) Validate(model Model) error {

switch model {
case Master:
masterNum := 0
Expand All @@ -477,7 +474,7 @@ func (config *Config) Validate(model Model) error {
if config.PS.EngineDWPTNum == 0 {
config.PS.EngineDWPTNum = 1
}
if config.PS.EngineDWPTNum < 0 || config.PS.EngineDWPTNum > 100 {
if config.PS.EngineDWPTNum > 100 {
return fmt.Errorf("EngineDWPTNum need gt 0 and le 100")
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/engine/CMakeLists.txt
Expand Up @@ -99,6 +99,7 @@ file(GLOB_RECURSE ALL_SOURCES
"${CMAKE_CURRENT_SOURCE_DIR}/*.c"
)
list(FILTER ALL_SOURCES EXCLUDE REGEX "${CMAKE_CURRENT_SOURCE_DIR}/third_party/scann-1.2.1/.*")
list(FILTER ALL_SOURCES EXCLUDE REGEX "${CMAKE_CURRENT_SOURCE_DIR}/third_party/flatbuffers-*")
list(FILTER ALL_SOURCES EXCLUDE REGEX "${CMAKE_CURRENT_SOURCE_DIR}/tests/.*")
list(FILTER ALL_SOURCES EXCLUDE REGEX "${CMAKE_CURRENT_SOURCE_DIR}/tools/.*")

Expand Down
22 changes: 12 additions & 10 deletions internal/engine/c_api/api_data/cpp_api.cc
Expand Up @@ -33,7 +33,9 @@

int CPPSearch(void *engine, vearch::Request *request,
vearch::Response *response) {
int ret = static_cast<vearch::Engine *>(engine)->Search(*request, *response);
vearch::Status status;
int ret = static_cast<vearch::Engine *>(engine)->Search(*request, *response,
status);
if (ret) return ret;
response->PackResults(request->Fields());
return 0;
Expand All @@ -44,7 +46,7 @@ int CPPSearch2(void *engine, vearch::VectorResult *result) {

vearch::GammaQuery gamma_query;
PerfTool perf_tool;
gamma_query.condition = new vearch::GammaSearchCondition(&perf_tool);
gamma_query.condition = new vearch::SearchCondition(&perf_tool);

auto vec_manager = static_cast<vearch::Engine *>(engine)->GetVectorManager();

Expand Down Expand Up @@ -86,10 +88,10 @@ int CPPAddOrUpdateDocs(void *engine, vearch::Docs *docs,

void CPPSetNprobe(void *engine, int nprobe, std::string index_type) {
auto index_model = static_cast<vearch::Engine *>(engine)
->GetVectorManager()
->IndexModels()
.begin()
->second;
->GetVectorManager()
->IndexModels()
.begin()
->second;
if (index_type == "IVFPQ") {
vearch::GammaIVFPQIndex *index =
dynamic_cast<vearch::GammaIVFPQIndex *>(index_model);
Expand Down Expand Up @@ -123,10 +125,10 @@ void CPPSetNprobe(void *engine, int nprobe, std::string index_type) {

void CPPSetRerank(void *engine, int rerank, std::string index_type) {
auto index_model = static_cast<vearch::Engine *>(engine)
->GetVectorManager()
->IndexModels()
.begin()
->second;
->GetVectorManager()
->IndexModels()
.begin()
->second;
if (index_type == "IVFPQ") {
vearch::GammaIVFPQIndex *index =
dynamic_cast<vearch::GammaIVFPQIndex *>(index_model);
Expand Down

0 comments on commit 864794b

Please sign in to comment.