Skip to content

Commit

Permalink
perf: Instrument auth{n,z} calls and simplify existing tracing code
Browse files Browse the repository at this point in the history
  • Loading branch information
Sonia Melgaço committed Oct 10, 2022
1 parent b028dac commit 383fea0
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 66 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ run-containers: ## run all test containers
@cd test_containers && docker-compose up -d && cd ..

kill-containers: ## kill all test containers
@cd test_containers && docker-compose stop && cd ..
@cd test_containers && docker-compose down && cd ..

setup/mongo:
go run scripts/setup_mongo_messages-index.go
Expand Down
44 changes: 39 additions & 5 deletions app/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (

"github.com/labstack/echo"
newrelic "github.com/newrelic/go-agent"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/log"
"github.com/spf13/viper"
"github.com/topfreegames/mqtt-history/mongoclient"
"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -61,6 +64,16 @@ func WithSegment(name string, c echo.Context, f func() error) error {
}

func findAuthorizedTopics(ctx context.Context, username string, topics []string) ([]ACL, error) {
collection := "mqtt_acl"
span, ctx := opentracing.StartSpanFromContext(
ctx,
"find_authorized_topics",
opentracing.Tags{
string(ext.DBType): "mongo",
"collection": collection,
},
)
defer span.Finish()
searchResults := make([]ACL, 0)
query := func(c *mongo.Collection) error {
opts := options.Find()
Expand All @@ -69,31 +82,41 @@ func findAuthorizedTopics(ctx context.Context, username string, topics []string)
{"username", 1},
{"pubsub", 1},
}

// add sort to match index
opts.SetSort(defaultACLSort)

query := bson.M{"username": username, "pubsub": bson.M{"$in": topics}}

statement := mongoclient.ExtractStatementForTrace(query, defaultACLSort, -1)
span.SetTag(string(ext.DBStatement), statement)
span.SetTag(string(ext.DBInstance), c.Database().Name())

cursor, err := c.Find(ctx, query)
if err != nil {
ext.LogError(span, err, log.Message("Error finding messages in MongoDB"))
return err
}

return cursor.All(ctx, &searchResults)
}
search := func() error {
mongoCollection, err := mongoclient.GetCollection(ctx, "mqtt_acl")
mongoCollection, err := mongoclient.GetCollection(ctx, collection)
if err != nil {
ext.LogError(span, err, log.Message("Error getting collection from MongoDB"))
return err
}
return query(mongoCollection)
}
err := search()
if err != nil {
ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB"))
}
return searchResults, err
}

// GetTopics get topics
func GetTopics(ctx context.Context, username string, _topics []string) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "get_topics")
defer span.Finish()
if viper.GetBool("mongo.allow_anonymous") {
return _topics, nil
}
Expand All @@ -114,13 +137,16 @@ func IsAuthorized(ctx context.Context, app *App, userID string, topics ...string
httpAuthEnabled := app.Config.GetBool("httpAuth.enabled")

if httpAuthEnabled {
return httpAuthorize(app, userID, topics)
return httpAuthorize(ctx, app, userID, topics)
}

return mongoAuthorize(ctx, userID, topics)
}

func httpAuthorize(app *App, userID string, topics []string) (bool, []string, error) {
func httpAuthorize(ctx context.Context, app *App, userID string, topics []string) (bool, []string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "http_authorize")
defer span.Finish()

timeout := app.Config.GetDuration("httpAuth.timeout") * time.Second
address := app.Config.GetString("httpAuth.requestURL")

Expand All @@ -147,6 +173,11 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er
request.SetBasicAuth(username, password)
}

opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(request.Header))

response, err := client.Do(request)
// discard response body
if response != nil && response.Body != nil {
Expand All @@ -155,6 +186,7 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er
}

if err != nil {
ext.LogError(span, err, log.Message("Error authorizing user"))
return false, nil, err
}

Expand All @@ -168,6 +200,8 @@ func httpAuthorize(app *App, userID string, topics []string) (bool, []string, er
}

func mongoAuthorize(ctx context.Context, userID string, topics []string) (bool, []string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "mongo_authorize")
defer span.Finish()
for _, topic := range topics {
pieces := strings.Split(topic, "/")
pieces[len(pieces)-1] = "+"
Expand Down
4 changes: 4 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ services:
image: mongo:3.6.23
ports:
- "27017:27017"
volumes:
- mongo:/data/db
jaeger:
image: jaegertracing/all-in-one:1.6
ports:
- 6831:6831/udp
- 16686:16686
volumes:
mongo:
50 changes: 16 additions & 34 deletions mongoclient/get_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,24 +136,15 @@ func GetMessagesPlayerSupportV2WithParameter(ctx context.Context, queryParameter

mongoCollection, err := GetCollection(ctx, queryParameters.Collection)
if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error getting collection from MongoDB"),
log.Error(err),
)
span.SetTag("collection", queryParameters.Collection)
ext.LogError(span, err, log.Message("Error getting collection from MongoDB"))
logger.Logger.Warningf("Error getting collection from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}

rawResults, err := getMessagesPlayerSupportFromCollection(ctx, queryParameters, mongoCollection)
if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error getting messages from MongoDB"),
log.Error(err),
)
ext.LogError(span, err, log.Message("Error getting messages from MongoDB"))
logger.Logger.Warningf("Error getting messages from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}
Expand All @@ -164,12 +155,7 @@ func GetMessagesPlayerSupportV2WithParameter(ctx context.Context, queryParameter
searchResults[i], err = convertRawMessageToModelMessage(rawResults[i])

if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error converting messages from MongoDB"),
log.Error(err),
)
ext.LogError(span, err, log.Message("Error converting messages from MongoDB"))
logger.Logger.Warningf("Error converting messages from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}
Expand All @@ -189,15 +175,16 @@ func getMessagesPlayerSupportFromCollection(
{"timestamp", -1},
}

statement := extractStatementForTrace(query, sort, queryParameters.Limit)
statement := ExtractStatementForTrace(query, sort, queryParameters.Limit)
span, ctx := opentracing.StartSpanFromContext(
ctx,
"get_messages_player_support_from_collection",
opentracing.Tags{
string(ext.DBStatement): statement,
string(ext.DBType): "mongo",
string(ext.DBInstance): database,
string(ext.DBInstance): mongoCollection.Database().Name(),
string(ext.DBUser): user,
"collection": mongoCollection.Name(),
},
)
defer span.Finish()
Expand All @@ -208,13 +195,13 @@ func getMessagesPlayerSupportFromCollection(

cursor, err := mongoCollection.Find(ctx, query, opts)
if err != nil {
span.SetTag("error", true)
ext.LogError(span, err, log.Message("Error finding messages in MongoDB"))
return nil, err
}

rawResults := make([]MongoMessage, 0)
if err = cursor.All(ctx, &rawResults); err != nil {
span.SetTag("error", true)
ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB"))
return nil, err
}

Expand All @@ -241,18 +228,13 @@ func resolveQuery(queryParameters QueryParameters) bson.M {
return query
}

func extractStatementForTrace(query bson.M, sort bson.D, limit int64) string {
statementByteArray, err := bson.MarshalExtJSON(query, true, true)
if err == nil {
statementByteArray, _ = bson.MarshalExtJSONAppend(
statementByteArray,
bson.D{
{"sort", sort},
{"limit", limit},
},
true,
true,
)
func ExtractStatementForTrace(query bson.M, sort bson.D, limit int64) string {
queryCopy := make(map[string]interface{}, len(query))
for k, v := range query {
queryCopy[k] = v
}
queryCopy["sort"] = sort
queryCopy["limit"] = limit
statementByteArray, _ := bson.MarshalExtJSON(queryCopy, true, true)
return string(statementByteArray)
}
34 changes: 8 additions & 26 deletions mongoclient/get_messages_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@ func GetMessagesV2WithParameter(ctx context.Context, queryParameters QueryParame

mongoCollection, err := GetCollection(ctx, queryParameters.Collection)
if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error getting collection from MongoDB"),
log.Error(err),
)
span.SetTag("collection", queryParameters.Collection)
ext.LogError(span, err, log.Message("Error getting collection from MongoDB"))
logger.Logger.Warning("Error getting collection from MongoDB", err)
return []*models.MessageV2{}
}
Expand All @@ -47,12 +43,7 @@ func GetMessagesV2WithParameter(ctx context.Context, queryParameters QueryParame
searchResults[i], err = convertRawMessageToModelMessage(rawResults[i])

if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error converting messages from MongoDB to the program domain format"),
log.Error(err),
)
ext.LogError(span, err, log.Message("Error converting messages from MongoDB to the program domain format"))
logger.Logger.Warningf("Error converting messages from MongoDB: %s", err.Error())
return []*models.MessageV2{}
}
Expand All @@ -78,15 +69,16 @@ func getMessagesFromCollection(
{"timestamp", -1},
}

statement := extractStatementForTrace(query, sort, queryParameters.Limit)
statement := ExtractStatementForTrace(query, sort, queryParameters.Limit)
span, ctx := opentracing.StartSpanFromContext(
ctx,
"get_messages_from_collection",
opentracing.Tags{
string(ext.DBStatement): statement,
string(ext.DBType): "mongo",
string(ext.DBInstance): database,
string(ext.DBInstance): mongoCollection.Database().Name(),
string(ext.DBUser): user,
"collection": mongoCollection.Name(),
},
)
defer span.Finish()
Expand All @@ -97,23 +89,13 @@ func getMessagesFromCollection(

cursor, err := mongoCollection.Find(ctx, query, opts)
if err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error finding messages in MongoDB"),
log.Error(err),
)
ext.LogError(span, err, log.Message("Error finding messages in MongoDB"))
return nil, err
}

rawResults := make([]MongoMessage, 0)
if err = cursor.All(ctx, &rawResults); err != nil {
span.SetTag("error", true)
span.LogFields(
log.Event("error"),
log.Message("Error decoding messages of a cursor from MongoDB"),
log.Error(err),
)
ext.LogError(span, err, log.Message("Error decoding messages of a cursor from MongoDB"))
return nil, err
}

Expand Down

0 comments on commit 383fea0

Please sign in to comment.