Skip to content

Commit

Permalink
support timestamp update on multiUpdate API
Browse files Browse the repository at this point in the history
Signed-off-by: kevindiu <kevin_diu@yahoo.com.hk>
  • Loading branch information
kevindiu committed Oct 6, 2023
1 parent 2ae0879 commit 76c4e58
Showing 1 changed file with 101 additions and 87 deletions.
188 changes: 101 additions & 87 deletions pkg/agent/core/ngt/handler/grpc/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ
}()

uuids := make([]string, 0, len(reqs.GetRequests()))
vmap := make(map[string][]float32, len(reqs.GetRequests()))
reqMap := make(map[string]*payload.Update_Request, len(reqs.GetRequests()))

// check dimension and remove duplicated ID
for _, req := range reqs.GetRequests() {
vec := req.GetVector()
if len(vec.GetVector()) != s.ngt.GetDimensionSize() {
Expand Down Expand Up @@ -249,100 +251,112 @@ func (s *server) MultiUpdate(ctx context.Context, reqs *payload.Update_MultiRequ
}
return nil, err
}
vmap[vec.GetId()] = vec.GetVector()
reqMap[vec.GetId()] = req
uuids = append(uuids, vec.GetId())
}

err = s.ngt.UpdateMultiple(vmap)
if err != nil {
var attrs []attribute.KeyValue
if notFoundIDs := func() []string {
aids := make([]string, 0, len(uuids))
for _, id := range uuids {
if errors.Is(err, errors.ErrObjectIDNotFound(id)) {
aids = append(aids, id)
}
if len(uuids) == 0 {
return s.newLocations(), nil
}

Check warning on line 260 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L259-L260

Added lines #L259 - L260 were not covered by tests

notFoundIDs := make([]string, 0)
invalidArgumentIDs := make([]string, 0)
alreadyExistsIDs := make([]string, 0)
otherIDs := make([]string, 0)

for _, req := range reqMap {
vec := req.GetVector()
id := vec.GetId()
v := vec.GetVector()

err := s.ngt.UpdateWithTime(id, v, vec.GetTimestamp(), req.GetConfig().GetUpdateTimestampIfExists())
if err != nil {
if errors.Is(err, errors.ErrObjectIDNotFound(id)) {
notFoundIDs = append(notFoundIDs, id)
} else if errors.Is(err, errors.ErrInvalidDimensionSize(len(v), s.ngt.GetDimensionSize())) || errors.Is(err, errors.ErrUUIDNotFound(0)) {
invalidArgumentIDs = append(invalidArgumentIDs, id)
} else if errors.Is(err, errors.ErrUUIDAlreadyExists(id)) {
alreadyExistsIDs = append(alreadyExistsIDs, id)
} else {
otherIDs = append(otherIDs, id)

Check warning on line 281 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L274-L281

Added lines #L274 - L281 were not covered by tests
}
return aids
}(); len(notFoundIDs) != 0 {
err = status.WrapWithNotFound(fmt.Sprintf("MultiUpdate API uuids %v not found", notFoundIDs), err,
&errdetails.RequestInfo{
RequestId: strings.Join(uuids, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiUpdate",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
}
}

var errs error
handleErrIDs := func(ids []string, errFunc func() error, attrsFunc func(string) []attribute.KeyValue) {
if len(ids) != 0 {
err := errFunc()

Check warning on line 289 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L289

Added line #L289 was not covered by tests
log.Warn(err)
attrs = trace.StatusCodeNotFound(err.Error())
} else if invalidDimensionIDs := func() []string {
idis := make([]string, 0, len(uuids))
for id, vec := range vmap {
if errors.Is(err, errors.ErrInvalidDimensionSize(len(vec), s.ngt.GetDimensionSize())) {
idis = append(idis, id)
}
errs = errors.Join(errs, err)

if span != nil {
span.RecordError(err)
span.SetAttributes(attrsFunc(err.Error())...)
span.SetStatus(trace.StatusError, err.Error())

Check warning on line 296 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L291-L296

Added lines #L291 - L296 were not covered by tests
}
return idis
}(); len(invalidDimensionIDs) != 0 || errors.Is(err, errors.ErrUUIDNotFound(0)) {
err = status.WrapWithInvalidArgument(fmt.Sprintf("MultiUpdate API invalid argument for uuids \"%v\" detected", invalidDimensionIDs), err,
&errdetails.RequestInfo{
RequestId: strings.Join(uuids, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Field: "uuid or vector",
Description: err.Error(),
},
}
}

handleErrIDs(notFoundIDs, func() error {
return status.WrapWithNotFound(fmt.Sprintf("MultiUpdate API uuids %v not found", notFoundIDs), err,
&errdetails.RequestInfo{
RequestId: strings.Join(notFoundIDs, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiUpdate",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
}, trace.StatusCodeNotFound)

Check warning on line 311 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L302-L311

Added lines #L302 - L311 were not covered by tests

handleErrIDs(invalidArgumentIDs, func() error {
return status.WrapWithInvalidArgument(fmt.Sprintf("MultiUpdate API invalid argument for uuids \"%v\" detected", invalidArgumentIDs), err,
&errdetails.RequestInfo{
RequestId: strings.Join(invalidArgumentIDs, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.BadRequest{
FieldViolations: []*errdetails.BadRequestFieldViolation{
{
Field: "uuid or vector",
Description: err.Error(),

Check warning on line 323 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L314-L323

Added lines #L314 - L323 were not covered by tests
},
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiUpdate",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
log.Warn(err)
attrs = trace.StatusCodeInvalidArgument(err.Error())
} else if alreadyExistsIDs := func() []string {
aids := make([]string, 0, len(uuids))
for _, id := range uuids {
if errors.Is(err, errors.ErrUUIDAlreadyExists(id)) {
aids = append(aids, id)
}
}
return aids
}(); len(alreadyExistsIDs) != 0 {
err = status.WrapWithAlreadyExists(fmt.Sprintf("MultiUpdate API uuids %v already exists", alreadyExistsIDs), err,
&errdetails.RequestInfo{
RequestId: strings.Join(uuids, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiUpdate",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
log.Warn(err)
attrs = trace.StatusCodeAlreadyExists(err.Error())
} else {
err = status.WrapWithInternal("Update API failed", err,
&errdetails.RequestInfo{
RequestId: strings.Join(uuids, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.Update",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}, info.Get())
log.Error(err)
attrs = trace.StatusCodeInternal(err.Error())
}
if span != nil {
span.RecordError(err)
span.SetAttributes(attrs...)
span.SetStatus(trace.StatusError, err.Error())
}
return nil, err
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiUpdate",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
}, trace.StatusCodeInvalidArgument)

Check warning on line 331 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L326-L331

Added lines #L326 - L331 were not covered by tests

handleErrIDs(alreadyExistsIDs, func() error {
return status.WrapWithAlreadyExists(fmt.Sprintf("MultiUpdate API uuids %v already exists", alreadyExistsIDs), err,
&errdetails.RequestInfo{
RequestId: strings.Join(alreadyExistsIDs, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.MultiUpdate",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
})
}, trace.StatusCodeAlreadyExists)

Check warning on line 343 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L334-L343

Added lines #L334 - L343 were not covered by tests

handleErrIDs(otherIDs, func() error {
return status.WrapWithInternal("Update API failed", err,
&errdetails.RequestInfo{
RequestId: strings.Join(otherIDs, ", "),
ServingData: errdetails.Serialize(reqs),
},
&errdetails.ResourceInfo{
ResourceType: ngtResourceType + "/ngt.Update",
ResourceName: fmt.Sprintf("%s: %s(%s)", apiName, s.name, s.ip),
}, info.Get())
}, trace.StatusCodeInternal)

Check warning on line 355 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L346-L355

Added lines #L346 - L355 were not covered by tests

if errs != nil {
return nil, errs

Check warning on line 358 in pkg/agent/core/ngt/handler/grpc/update.go

View check run for this annotation

Codecov / codecov/patch

pkg/agent/core/ngt/handler/grpc/update.go#L358

Added line #L358 was not covered by tests
}

return s.newLocations(uuids...), nil
}

0 comments on commit 76c4e58

Please sign in to comment.