diff --git a/Makefile b/Makefile index 189b519..4509c4c 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ run-containers: ## run all test containers @cd test_containers && docker-compose up -d && cd .. kill-containers: ## kill all test containers - @cd test_containers && docker-compose stop && cd .. + @cd test_containers && docker-compose down && cd .. setup/mongo: go run scripts/setup_mongo_messages-index.go diff --git a/app/helpers.go b/app/helpers.go index 0bc7489..bf1d019 100644 --- a/app/helpers.go +++ b/app/helpers.go @@ -19,6 +19,9 @@ import ( "github.com/labstack/echo" newrelic "github.com/newrelic/go-agent" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/opentracing/opentracing-go/log" "github.com/spf13/viper" "github.com/topfreegames/mqtt-history/mongoclient" "go.mongodb.org/mongo-driver/bson" @@ -61,6 +64,16 @@ func WithSegment(name string, c echo.Context, f func() error) error { } func findAuthorizedTopics(ctx context.Context, username string, topics []string) ([]ACL, error) { + collection := "mqtt_acl" + span, ctx := opentracing.StartSpanFromContext( + ctx, + "find_authorized_topics", + opentracing.Tags{ + string(ext.DBType): "mongo", + "collection": collection, + }, + ) + defer span.Finish() searchResults := make([]ACL, 0) query := func(c *mongo.Collection) error { opts := options.Find() @@ -69,31 +82,41 @@ func findAuthorizedTopics(ctx context.Context, username string, topics []string) {"username", 1}, {"pubsub", 1}, } - // add sort to match index opts.SetSort(defaultACLSort) - query := bson.M{"username": username, "pubsub": bson.M{"$in": topics}} + + statement := mongoclient.ExtractStatementForTrace(query, defaultACLSort, -1) + span.SetTag(string(ext.DBStatement), statement) + span.SetTag(string(ext.DBInstance), c.Database().Name()) + cursor, err := c.Find(ctx, query) if err != nil { + ext.LogError(span, err, log.Message("Error finding messages in MongoDB")) return err } return cursor.All(ctx, &searchResults) } search := func() error { - mongoCollection, err := mongoclient.GetCollection(ctx, "mqtt_acl") + mongoCollection, err := mongoclient.GetCollection(ctx, collection) if err != nil { + ext.LogError(span, err, log.Message("Error getting collection from MongoDB")) return err } return query(mongoCollection) } err := search() + if err != nil { + ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB")) + } return searchResults, err } // GetTopics get topics func GetTopics(ctx context.Context, username string, _topics []string) ([]string, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "get_topics") + defer span.Finish() if viper.GetBool("mongo.allow_anonymous") { return _topics, nil } @@ -114,13 +137,16 @@ func IsAuthorized(ctx context.Context, app *App, userID string, topics ...string httpAuthEnabled := app.Config.GetBool("httpAuth.enabled") if httpAuthEnabled { - return httpAuthorize(app, userID, topics) + return httpAuthorize(ctx, app, userID, topics) } return mongoAuthorize(ctx, userID, topics) } -func httpAuthorize(app *App, userID string, topics []string) (bool, []string, error) { +func httpAuthorize(ctx context.Context, app *App, userID string, topics []string) (bool, []string, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "http_authorize") + defer span.Finish() + timeout := app.Config.GetDuration("httpAuth.timeout") * time.Second address := app.Config.GetString("httpAuth.requestURL") @@ -147,6 +173,11 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er request.SetBasicAuth(username, password) } + opentracing.GlobalTracer().Inject( + span.Context(), + opentracing.HTTPHeaders, + opentracing.HTTPHeadersCarrier(request.Header)) + response, err := client.Do(request) // discard response body if response != nil && response.Body != nil { @@ -155,6 +186,7 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er } if err != nil { + ext.LogError(span, err, log.Message("Error authorizing user")) return false, nil, err } @@ -168,6 +200,8 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er } func mongoAuthorize(ctx context.Context, userID string, topics []string) (bool, []string, error) { + span, ctx := opentracing.StartSpanFromContext(ctx, "mongo_authorize") + defer span.Finish() for _, topic := range topics { pieces := strings.Split(topic, "/") pieces[len(pieces)-1] = "+" diff --git a/docker-compose.yml b/docker-compose.yml index 6f664a9..166e223 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -23,8 +23,12 @@ services: image: mongo:3.6.23 ports: - "27017:27017" + volumes: + - mongo:/data/db jaeger: image: jaegertracing/all-in-one:1.6 ports: - 6831:6831/udp - 16686:16686 +volumes: + mongo: \ No newline at end of file diff --git a/mongoclient/get_messages.go b/mongoclient/get_messages.go index 762f317..1298cf0 100644 --- a/mongoclient/get_messages.go +++ b/mongoclient/get_messages.go @@ -136,24 +136,15 @@ func GetMessagesPlayerSupportV2WithParameter(ctx context.Context, queryParameter mongoCollection, err := GetCollection(ctx, queryParameters.Collection) if err != nil { - span.SetTag("error", true) - span.LogFields( - log.Event("error"), - log.Message("Error getting collection from MongoDB"), - log.Error(err), - ) + span.SetTag("collection", queryParameters.Collection) + ext.LogError(span, err, log.Message("Error getting collection from MongoDB")) logger.Logger.Warningf("Error getting collection from MongoDB: %s", err.Error()) return []*models.MessageV2{} } rawResults, err := getMessagesPlayerSupportFromCollection(ctx, queryParameters, mongoCollection) if err != nil { - span.SetTag("error", true) - span.LogFields( - log.Event("error"), - log.Message("Error getting messages from MongoDB"), - log.Error(err), - ) + ext.LogError(span, err, log.Message("Error getting messages from MongoDB")) logger.Logger.Warningf("Error getting messages from MongoDB: %s", err.Error()) return []*models.MessageV2{} } @@ -164,12 +155,7 @@ func GetMessagesPlayerSupportV2WithParameter(ctx context.Context, queryParameter searchResults[i], err = convertRawMessageToModelMessage(rawResults[i]) if err != nil { - span.SetTag("error", true) - span.LogFields( - log.Event("error"), - log.Message("Error converting messages from MongoDB"), - log.Error(err), - ) + ext.LogError(span, err, log.Message("Error converting messages from MongoDB")) logger.Logger.Warningf("Error converting messages from MongoDB: %s", err.Error()) return []*models.MessageV2{} } @@ -189,15 +175,16 @@ func getMessagesPlayerSupportFromCollection( {"timestamp", -1}, } - statement := extractStatementForTrace(query, sort, queryParameters.Limit) + statement := ExtractStatementForTrace(query, sort, queryParameters.Limit) span, ctx := opentracing.StartSpanFromContext( ctx, "get_messages_player_support_from_collection", opentracing.Tags{ string(ext.DBStatement): statement, string(ext.DBType): "mongo", - string(ext.DBInstance): database, + string(ext.DBInstance): mongoCollection.Database().Name(), string(ext.DBUser): user, + "collection": mongoCollection.Name(), }, ) defer span.Finish() @@ -208,13 +195,13 @@ func getMessagesPlayerSupportFromCollection( cursor, err := mongoCollection.Find(ctx, query, opts) if err != nil { - span.SetTag("error", true) + ext.LogError(span, err, log.Message("Error finding messages in MongoDB")) return nil, err } rawResults := make([]MongoMessage, 0) if err = cursor.All(ctx, &rawResults); err != nil { - span.SetTag("error", true) + ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB")) return nil, err } @@ -241,18 +228,13 @@ func resolveQuery(queryParameters QueryParameters) bson.M { return query } -func extractStatementForTrace(query bson.M, sort bson.D, limit int64) string { - statementByteArray, err := bson.MarshalExtJSON(query, true, true) - if err == nil { - statementByteArray, _ = bson.MarshalExtJSONAppend( - statementByteArray, - bson.D{ - {"sort", sort}, - {"limit", limit}, - }, - true, - true, - ) +func ExtractStatementForTrace(query bson.M, sort bson.D, limit int64) string { + queryCopy := make(map[string]interface{}, len(query)) + for k, v := range query { + queryCopy[k] = v } + queryCopy["sort"] = sort + queryCopy["limit"] = limit + statementByteArray, _ := bson.MarshalExtJSON(queryCopy, true, true) return string(statementByteArray) } diff --git a/mongoclient/get_messages_v2.go b/mongoclient/get_messages_v2.go index b963884..76dd816 100644 --- a/mongoclient/get_messages_v2.go +++ b/mongoclient/get_messages_v2.go @@ -25,12 +25,8 @@ func GetMessagesV2WithParameter(ctx context.Context, queryParameters QueryParame mongoCollection, err := GetCollection(ctx, queryParameters.Collection) if err != nil { - span.SetTag("error", true) - span.LogFields( - log.Event("error"), - log.Message("Error getting collection from MongoDB"), - log.Error(err), - ) + span.SetTag("collection", queryParameters.Collection) + ext.LogError(span, err, log.Message("Error getting collection from MongoDB")) logger.Logger.Warning("Error getting collection from MongoDB", err) return []*models.MessageV2{} } @@ -47,12 +43,7 @@ func GetMessagesV2WithParameter(ctx context.Context, queryParameters QueryParame searchResults[i], err = convertRawMessageToModelMessage(rawResults[i]) if err != nil { - span.SetTag("error", true) - span.LogFields( - log.Event("error"), - log.Message("Error converting messages from MongoDB to the program domain format"), - log.Error(err), - ) + ext.LogError(span, err, log.Message("Error converting messages from MongoDB to the program domain format")) logger.Logger.Warningf("Error converting messages from MongoDB: %s", err.Error()) return []*models.MessageV2{} } @@ -78,15 +69,16 @@ func getMessagesFromCollection( {"timestamp", -1}, } - statement := extractStatementForTrace(query, sort, queryParameters.Limit) + statement := ExtractStatementForTrace(query, sort, queryParameters.Limit) span, ctx := opentracing.StartSpanFromContext( ctx, "get_messages_from_collection", opentracing.Tags{ string(ext.DBStatement): statement, string(ext.DBType): "mongo", - string(ext.DBInstance): database, + string(ext.DBInstance): mongoCollection.Database().Name(), string(ext.DBUser): user, + "collection": mongoCollection.Name(), }, ) defer span.Finish() @@ -97,23 +89,13 @@ func getMessagesFromCollection( cursor, err := mongoCollection.Find(ctx, query, opts) if err != nil { - span.SetTag("error", true) - span.LogFields( - log.Event("error"), - log.Message("Error finding messages in MongoDB"), - log.Error(err), - ) + ext.LogError(span, err, log.Message("Error finding messages in MongoDB")) return nil, err } rawResults := make([]MongoMessage, 0) if err = cursor.All(ctx, &rawResults); err != nil { - span.SetTag("error", true) - span.LogFields( - log.Event("error"), - log.Message("Error decoding messages of a cursor from MongoDB"), - log.Error(err), - ) + ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB")) return nil, err }