Skip to content

Commit

Permalink
enhance: add log interceptor
Browse files Browse the repository at this point in the history
Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
  • Loading branch information
ThreadDao committed May 27, 2024
1 parent 815742b commit 4966947
Showing 1 changed file with 47 additions and 4 deletions.
51 changes: 47 additions & 4 deletions tests/go_client/base/milvus_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package base

import (
"context"

"github.com/milvus-io/milvus/pkg/log"

"encoding/json"
"github.com/milvus-io/milvus/client/v2/entity"
"github.com/milvus-io/milvus/pkg/log"
"strings"

"go.uber.org/zap"

Expand All @@ -28,12 +28,54 @@ func postResponse(funcName string, err error, res ...interface{}) {
}
}


func LoggingUnaryInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
maxLogLength := 200
_method := strings.Split(method, "/")
_methodShotName := _method[len(_method) - 1]
// Marshal req to json str
reqJSON, err := json.Marshal(req)
if err != nil {
log.Error("Failed to marshal request", zap.Error(err))
reqJSON = []byte("could not marshal request")
}
reqStr := string(reqJSON)
if len(reqStr) > maxLogLength {
reqStr = reqStr[:maxLogLength] + "..."
}

// log before
log.Info("Request", zap.String("method", _methodShotName), zap.Any("reqs", reqStr))

// invoker
errResp := invoker(ctx, method, req, reply, cc, opts...)

// Marshal reply to json str
respJSON, err := json.Marshal(reply)
if err != nil {
log.Error("Failed to marshal response", zap.Error(err))
respJSON = []byte("could not marshal response")
}
respStr := string(respJSON)
if len(respStr) > maxLogLength {
respStr = respStr[:maxLogLength] + "..."
}

// log after
log.Info("Response", zap.String("method", _methodShotName), zap.Any("resp", respStr))
return errResp
}
}


type MilvusClient struct {
mClient *clientv2.Client
}

func NewMilvusClient(ctx context.Context, cfg *clientv2.ClientConfig) (*MilvusClient, error) {
preRequest("NewClient", ctx, cfg)
cfg.DialOptions = append(cfg.DialOptions, grpc.WithUnaryInterceptor(LoggingUnaryInterceptor()))
mClient, err := clientv2.New(ctx, cfg)
postResponse("NewClient", err, mClient)
return &MilvusClient{
Expand All @@ -50,7 +92,7 @@ func (mc *MilvusClient) Close(ctx context.Context) error {

// -- database --

// ListDatabases list all database in milvus cluster.
// UsingDatabase list all database in milvus cluster.
func (mc *MilvusClient) UsingDatabase(ctx context.Context, option clientv2.UsingDatabaseOption) error {
preRequest("UsingDatabase", ctx, option)
err := mc.mClient.UsingDatabase(ctx, option)
Expand All @@ -66,6 +108,7 @@ func (mc *MilvusClient) ListDatabases(ctx context.Context, option clientv2.ListD
return databaseNames, err
}


// CreateDatabase create database with the given name.
func (mc *MilvusClient) CreateDatabase(ctx context.Context, option clientv2.CreateDatabaseOption, callOptions ...grpc.CallOption) error {
preRequest("CreateDatabase", ctx, option, callOptions)
Expand Down

0 comments on commit 4966947

Please sign in to comment.