Skip to content

Commit

Permalink
enhance: support mark error as user error (#33498)
Browse files Browse the repository at this point in the history
relate: #33492

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
  • Loading branch information
aoiasd committed Jul 1, 2024
1 parent d6afb31 commit 186757e
Show file tree
Hide file tree
Showing 14 changed files with 165 additions and 43 deletions.
36 changes: 29 additions & 7 deletions internal/proxy/accesslog/info/grpc_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,24 +181,46 @@ func (i *GrpcAccessInfo) ErrorCode() string {
return fmt.Sprint(merr.Code(i.err))
}

func (i *GrpcAccessInfo) ErrorMsg() string {
if i.err != nil {
return i.err.Error()
}

func (i *GrpcAccessInfo) respStatus() *commonpb.Status {
baseResp, ok := i.resp.(BaseResponse)
if ok {
status := baseResp.GetStatus()
return status.GetReason()
return baseResp.GetStatus()
}

status, ok := i.resp.(*commonpb.Status)
if ok {
return status
}
return nil
}

func (i *GrpcAccessInfo) ErrorMsg() string {
if i.err != nil {
return i.err.Error()
}

if status := i.respStatus(); status != nil {
return status.GetReason()
}

return Unknown
}

func (i *GrpcAccessInfo) ErrorType() string {
if i.err != nil {
return merr.GetErrorType(i.err).String()
}

if status := i.respStatus(); status.GetCode() > 0 {
if _, ok := status.ExtraInfo[merr.InputErrorFlagKey]; ok {
return merr.InputError.String()
}
return merr.SystemError.String()
}

return ""
}

func (i *GrpcAccessInfo) DbName() string {
name, ok := requestutil.GetDbNameFromRequest(i.req)
if !ok {
Expand Down
16 changes: 16 additions & 0 deletions internal/proxy/accesslog/info/grpc_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,22 @@ func (s *GrpcAccessInfoSuite) TestErrorMsg() {
s.Equal("rpc error: code = Unavailable desc = mock", result[0])
}

func (s *GrpcAccessInfoSuite) TestErrorType() {
s.info.resp = &milvuspb.QueryResults{
Status: merr.Status(nil),
}
result := Get(s.info, "$error_type")
s.Equal("", result[0])

s.info.resp = merr.Status(merr.WrapErrAsInputError(merr.ErrParameterInvalid))
result = Get(s.info, "$error_type")
s.Equal(merr.InputError.String(), result[0])

s.info.err = merr.ErrParameterInvalid
result = Get(s.info, "$error_type")
s.Equal(merr.SystemError.String(), result[0])
}

func (s *GrpcAccessInfoSuite) TestDbName() {
s.info.req = nil
result := Get(s.info, "$database_name")
Expand Down
6 changes: 6 additions & 0 deletions internal/proxy/accesslog/info/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var MetricFuncMap = map[string]getMetricFunc{
"$response_size": getResponseSize,
"$error_code": getErrorCode,
"$error_msg": getErrorMsg,
"$error_type": getErrorType,
"$database_name": getDbName,
"$collection_name": getCollectionName,
"$partition_name": getPartitionName,
Expand Down Expand Up @@ -61,6 +62,7 @@ type AccessInfo interface {
ResponseSize() string
ErrorCode() string
ErrorMsg() string
ErrorType() string
DbName() string
CollectionName() string
PartitionName() string
Expand Down Expand Up @@ -115,6 +117,10 @@ func getErrorMsg(i AccessInfo) string {
return i.ErrorMsg()
}

func getErrorType(i AccessInfo) string {
return i.ErrorType()
}

func getDbName(i AccessInfo) string {
return i.DbName()
}
Expand Down
4 changes: 4 additions & 0 deletions internal/proxy/accesslog/info/restful_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (i *RestfulInfo) ErrorMsg() string {
return fmt.Sprint(message)
}

func (i *RestfulInfo) ErrorType() string {
return Unknown
}

func (i *RestfulInfo) SdkVersion() string {
return "Restful"
}
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2552,7 +2552,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
log.Warn("Failed to enqueue insert task: " + err.Error())
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc()
return constructFailedResponse(err), nil
return constructFailedResponse(merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)), nil
}

log.Debug("Detail of insert request in Proxy")
Expand Down
3 changes: 2 additions & 1 deletion internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
t.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
// nolint
t.result.Status.Reason = fmt.Sprintf("can't find collection[database=%s][collection=%s]", t.GetDbName(), t.GetCollectionName())
t.result.Status.ExtraInfo = map[string]string{merr.InputErrorFlagKey: "true"}
}
return nil
}
Expand Down Expand Up @@ -1440,7 +1441,7 @@ func (t *flushTask) Execute(ctx context.Context) error {
for _, collName := range t.CollectionNames {
collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collName)
if err != nil {
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
flushReq := &datapb.FlushRequest{
Base: commonpbutil.UpdateMsgBase(
Expand Down
8 changes: 4 additions & 4 deletions internal/proxy/task_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,13 @@ func (dr *deleteRunner) Init(ctx context.Context) error {

db, err := globalMetaCache.GetDatabaseInfo(ctx, dr.req.GetDbName())
if err != nil {
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrDatabaseNotFound)
}
dr.dbID = db.dbID

dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName)
if err != nil {
return ErrWithLog(log, "Failed to get collection id", err)
return ErrWithLog(log, "Failed to get collection id", merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound))
}

dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName)
Expand Down Expand Up @@ -316,11 +316,11 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
func (dr *deleteRunner) Run(ctx context.Context) error {
plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr())
if err != nil {
return merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err)
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
}

if planparserv2.IsAlwaysTruePlan(plan) {
return merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr())
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
}

isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)
Expand Down
6 changes: 3 additions & 3 deletions internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
if maxInsertSize != -1 && it.insertMsg.Size() > maxInsertSize {
log.Warn("insert request size exceeds maxInsertSize",
zap.Int("request size", it.insertMsg.Size()), zap.Int("maxInsertSize", maxInsertSize))
return merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize")
return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize"))
}

schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
it.schema = schema.CollectionSchema

Expand Down Expand Up @@ -208,7 +208,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {

if err := newValidateUtil(withNANCheck(), withOverflowCheck(), withMaxLenCheck(), withMaxCapCheck()).
Validate(it.insertMsg.GetFieldsData(), schema.CollectionSchema, it.insertMsg.NRows()); err != nil {
return err
return merr.WrapErrAsInputError(err)
}

log.Debug("Proxy Insert PreExecute done")
Expand Down
16 changes: 8 additions & 8 deletions internal/proxy/task_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func createCntPlan(expr string, schemaHelper *typeutil.SchemaHelper) (*planpb.Pl

plan, err := planparserv2.CreateRetrievePlan(schemaHelper, expr)
if err != nil {
return nil, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
return nil, merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err))
}

plan.Node.(*planpb.PlanNode_Query).Query.IsCount = true
Expand All @@ -223,7 +223,7 @@ func (t *queryTask) createPlan(ctx context.Context) error {
if t.plan == nil {
t.plan, err = planparserv2.CreateRetrievePlan(schema.schemaHelper, t.request.Expr)
if err != nil {
return merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err))
}
}

Expand Down Expand Up @@ -291,7 +291,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName)
if err != nil {
log.Warn("Failed to get collection id.", zap.String("collectionName", collectionName), zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}
t.CollectionID = collID
log.Debug("Get collection ID by name", zap.Int64("collectionID", t.CollectionID))
Expand All @@ -302,11 +302,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
return err
}
if t.partitionKeyMode && len(t.request.GetPartitionNames()) != 0 {
return errors.New("not support manually specifying the partition names if partition key mode is used")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("not support manually specifying the partition names if partition key mode is used"))
}
if t.mustUsePartitionKey && !t.partitionKeyMode {
return merr.WrapErrParameterInvalidMsg("must use partition key in the query request " +
"because the mustUsePartitionKey config is true")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the query request " +
"because the mustUsePartitionKey config is true"))
}

for _, tag := range t.request.PartitionNames {
Expand Down Expand Up @@ -363,7 +363,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
t.plan.Node.(*planpb.PlanNode_Query).Query.Limit = t.RetrieveRequest.Limit

if planparserv2.IsAlwaysTruePlan(t.plan) && t.RetrieveRequest.Limit == typeutil.Unlimited {
return fmt.Errorf("empty expression should be used with limit")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("empty expression should be used with limit"))
}

// convert partition names only when requery is false
Expand All @@ -390,7 +390,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {

// count with pagination
if t.plan.GetQuery().GetIsCount() && t.queryParams.limit != typeutil.Unlimited {
return fmt.Errorf("count entities with pagination is not allowed")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("count entities with pagination is not allowed"))
}

t.RetrieveRequest.IsCount = t.plan.GetQuery().GetIsCount()
Expand Down
6 changes: 3 additions & 3 deletions internal/proxy/task_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
t.collectionName = collectionName
collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName)
if err != nil { // err is not nil if collection not exists
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
}

t.SearchRequest.DbID = 0 // todo
Expand All @@ -135,8 +135,8 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
return errors.New("not support manually specifying the partition names if partition key mode is used")
}
if t.mustUsePartitionKey && !t.partitionKeyMode {
return merr.WrapErrParameterInvalidMsg("must use partition key in the search request " +
"because the mustUsePartitionKey config is true")
return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the search request " +
"because the mustUsePartitionKey config is true"))
}

if !t.partitionKeyMode && len(t.request.GetPartitionNames()) > 0 {
Expand Down
4 changes: 2 additions & 2 deletions internal/proxy/task_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,14 +187,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
if err != nil {
log.Warn("check primary field data and hash primary key failed when upsert",
zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
}
// set field ID to insert field data
err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema)
if err != nil {
log.Warn("insert set fieldID to fieldData failed when upsert",
zap.Error(err))
return err
return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
}

if it.partitionKeyMode {
Expand Down
4 changes: 2 additions & 2 deletions internal/proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,10 +646,10 @@ func parsePrimaryFieldData2IDs(fieldData *schemapb.FieldData) (*schemapb.IDs, er
StrId: scalarField.GetStringData(),
}
default:
return nil, errors.New("currently only support DataType Int64 or VarChar as PrimaryField")
return nil, merr.WrapErrParameterInvalidMsg("currently only support DataType Int64 or VarChar as PrimaryField")
}
default:
return nil, errors.New("currently not support vector field as PrimaryField")
return nil, merr.WrapErrParameterInvalidMsg("currently not support vector field as PrimaryField")
}

return primaryData, nil
Expand Down
45 changes: 36 additions & 9 deletions pkg/util/merr/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,22 @@ const (
TimeoutCode int32 = 10001
)

type ErrorType int32

const (
SystemError ErrorType = 0
InputError ErrorType = 1
)

var ErrorTypeName = map[ErrorType]string{
SystemError: "system_error",
InputError: "input_error",
}

func (err ErrorType) String() string {
return ErrorTypeName[err]
}

// Define leaf errors here,
// WARN: take care to add new error,
// check whether you can use the errors below before adding a new one.
Expand Down Expand Up @@ -196,29 +212,40 @@ var (
ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)
)

type errorOption func(*milvusError)

func WithDetail(detail string) errorOption {
return func(err *milvusError) {
err.detail = detail
}
}

func WithErrorType(etype ErrorType) errorOption {
return func(err *milvusError) {
err.errType = etype
}
}

type milvusError struct {
msg string
detail string
retriable bool
errCode int32
errType ErrorType
}

func newMilvusError(msg string, code int32, retriable bool) milvusError {
return milvusError{
func newMilvusError(msg string, code int32, retriable bool, options ...errorOption) milvusError {
err := milvusError{
msg: msg,
detail: msg,
retriable: retriable,
errCode: code,
}
}

func newMilvusErrorWithDetail(msg string, detail string, code int32, retriable bool) milvusError {
return milvusError{
msg: msg,
detail: detail,
retriable: retriable,
errCode: code,
for _, option := range options {
option(&err)
}
return err
}

func (e milvusError) code() int32 {
Expand Down
Loading

0 comments on commit 186757e

Please sign in to comment.