Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memphis nats logs combined + db connection #251

Merged
merged 5 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 33 additions & 11 deletions analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
)

var configuration = conf.GetConfig()
var systemKeysCollection = db.GetCollection("system_keys")
var systemKeysCollection *mongo.Collection
var ls launcher.Launcher
var loginsCounter metric.Int64Counter
var installationsCounter metric.Int64Counter
Expand All @@ -43,17 +43,8 @@ var disableAnalyticsCounter metric.Int64Counter
var deploymentId string
var analyticsFlag string

func getSystemKey(key string) (models.SystemKey, error) {
filter := bson.M{"key": key}
var systemKey models.SystemKey
err := systemKeysCollection.FindOne(context.TODO(), filter).Decode(&systemKey)
if err != nil {
return systemKey, err
}
return systemKey, nil
}

func InitializeAnalytics() error {
systemKeysCollection = db.GetCollection("system_keys")
deployment, err := getSystemKey("deployment_id")
if err == mongo.ErrNoDocuments {
deploymentId := primitive.NewObjectID().Hex()
Expand Down Expand Up @@ -112,46 +103,77 @@ func InitializeAnalytics() error {
metric.WithUnit("0"),
metric.WithDescription("Counting the number of installations of Memphis"),
)
if err != nil {
return err
}

nextStepsCounter, err = Meter.NewInt64Counter(
"NextSteps",
metric.WithUnit("0"),
metric.WithDescription("Counting the number of users complete the next steps wizard in the UI"),
)
if err != nil {
return err
}

loginsCounter, err = Meter.NewInt64Counter(
"Logins",
metric.WithUnit("0"),
metric.WithDescription("Counting the number of logins to Memphis"),
)
if err != nil {
return err
}

stationsCounter, err = Meter.NewInt64Counter(
"Stations",
metric.WithUnit("0"),
metric.WithDescription("Counting the number of stations"),
)
if err != nil {
return err
}

producersCounter, err = Meter.NewInt64Counter(
"Producers",
metric.WithUnit("0"),
metric.WithDescription("Counting the number of producers"),
)
if err != nil {
return err
}

consumersCounter, err = Meter.NewInt64Counter(
"Consumers",
metric.WithUnit("0"),
metric.WithDescription("Counting the number of consumers"),
)
if err != nil {
return err
}

disableAnalyticsCounter, err = Meter.NewInt64Counter(
"DisableAnalytics",
metric.WithUnit("0"),
metric.WithDescription("Counting the number of disable analytics events"),
)
if err != nil {
return err
}

return nil
}

func getSystemKey(key string) (models.SystemKey, error) {
filter := bson.M{"key": key}
var systemKey models.SystemKey
err := systemKeysCollection.FindOne(context.TODO(), filter).Decode(&systemKey)
if err != nil {
return systemKey, err
}
return systemKey, nil
}

func IncrementInstallationsCounter() {
installationsCounter.Add(context.TODO(), 1, attribute.String("deployment_id", deploymentId))
}
Expand Down
72 changes: 0 additions & 72 deletions background_tasks/logs_consumer.go

This file was deleted.

56 changes: 24 additions & 32 deletions background_tasks/zombie_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"memphis-broker/conf"
"memphis-broker/db"
"memphis-broker/models"
"strconv"
"memphis-broker/server"
"sync"
"time"

Expand All @@ -29,24 +29,32 @@ import (

var configuration = conf.GetConfig()

var connectionsCollection *mongo.Collection = db.GetCollection("connections")
var producersCollection *mongo.Collection = db.GetCollection("producers")
var consumersCollection *mongo.Collection = db.GetCollection("consumers")
var sysLogsCollection *mongo.Collection = db.GetCollection("system_logs")
var poisonMessagesCollection *mongo.Collection = db.GetCollection("poison_messages")
var connectionsCollection *mongo.Collection
var producersCollection *mongo.Collection
var consumersCollection *mongo.Collection
var poisonMessagesCollection *mongo.Collection
var serv *server.Server

func InitializeZombieResources(s *server.Server) {
connectionsCollection = db.GetCollection("connections")
producersCollection = db.GetCollection("producers")
consumersCollection = db.GetCollection("consumers")
poisonMessagesCollection = db.GetCollection("poison_messages")
serv = s
}

func killRelevantConnections() ([]models.Connection, error) {
lastAllowedTime := time.Now().Add(time.Duration(-configuration.PING_INTERVAL_MS-5000) * time.Millisecond)

var connections []models.Connection
cursor, err := connectionsCollection.Find(context.TODO(), bson.M{"is_active": true, "last_ping": bson.M{"$lt": lastAllowedTime}})
if err != nil {
// logger.Error("killRelevantConnections error: " + err.Error())
serv.Errorf("killRelevantConnections error: " + err.Error())
return connections, err
}

if err = cursor.All(context.TODO(), &connections); err != nil {
// logger.Error("killRelevantConnections error: " + err.Error())
serv.Errorf("killRelevantConnections error: " + err.Error())
return connections, err
}

Expand All @@ -55,7 +63,7 @@ func killRelevantConnections() ([]models.Connection, error) {
bson.M{"$set": bson.M{"is_active": false}},
)
if err != nil {
// logger.Error("KillConnections error: " + err.Error())
serv.Errorf("KillConnections error: " + err.Error())
return connections, err
}

Expand All @@ -73,7 +81,7 @@ func killProducersByConnections(connectionIds []primitive.ObjectID) error {
bson.M{"$set": bson.M{"is_active": false}},
)
if err != nil {
// logger.Error("killProducersByConnections error: " + err.Error())
serv.Errorf("killProducersByConnections error: " + err.Error())
return err
}

Expand All @@ -86,22 +94,7 @@ func killConsumersByConnections(connectionIds []primitive.ObjectID) error {
bson.M{"$set": bson.M{"is_active": false}},
)
if err != nil {
// logger.Error("killConsumersByConnections error: " + err.Error())
return err
}

return nil
}

func removeOldLogs() error {
retentionToInt, err := strconv.Atoi(configuration.LOGS_RETENTION_IN_DAYS)
if err != nil {
return err
}
retentionDaysToHours := 24 * retentionToInt
filter := bson.M{"creation_date": bson.M{"$lte": (time.Now().Add(-(time.Hour * time.Duration(retentionDaysToHours))))}}
_, err = sysLogsCollection.DeleteMany(context.TODO(), filter)
if err != nil {
serv.Errorf("killConsumersByConnections error: " + err.Error())
return err
}

Expand All @@ -122,7 +115,7 @@ func KillZombieResources(wg *sync.WaitGroup) {
for range time.Tick(time.Second * 30) {
connections, err := killRelevantConnections()
if err != nil {
// logger.Error("KillZombieResources error: " + err.Error())
serv.Errorf("KillZombieResources error: " + err.Error())
} else if len(connections) > 0 {
var connectionIds []primitive.ObjectID
for _, con := range connections {
Expand All @@ -131,23 +124,22 @@ func KillZombieResources(wg *sync.WaitGroup) {

err = killProducersByConnections(connectionIds)
if err != nil {
// logger.Error("KillZombieResources error: " + err.Error())
serv.Errorf("KillZombieResources error: " + err.Error())
}

err = killConsumersByConnections(connectionIds)
if err != nil {
// logger.Error("KillZombieResources error: " + err.Error())
serv.Errorf("KillZombieResources error: " + err.Error())
}
}

err = removeOldLogs()
if err != nil {
// logger.Error("KillZombieResources error: " + err.Error())
serv.Errorf("KillZombieResources error: " + err.Error())
}

err = removeOldPoisonMsgs()
if err != nil {
// logger.Error("KillZombieResources error: " + err.Error())
serv.Errorf("KillZombieResources error: " + err.Error())
}
}

Expand Down
31 changes: 15 additions & 16 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,23 @@ package db

import (
"memphis-broker/conf"
"memphis-broker/server"

"context"
"log"
"time"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

var configuration = conf.GetConfig()
var logger = log.Default()
var serv *server.Server

const (
dbOperationTimeout = 20
)

func initializeDbConnection() (*mongo.Client, context.Context, context.CancelFunc) {
func InitializeDbConnection(s *server.Server) error {
ctx, cancel := context.WithTimeout(context.TODO(), dbOperationTimeout*time.Second)

var clientOptions *options.ClientOptions
Expand All @@ -48,33 +48,32 @@ func initializeDbConnection() (*mongo.Client, context.Context, context.CancelFun

client, err := mongo.Connect(ctx, clientOptions)
if err != nil {
logger.Print("[Error] Failed to create Mongodb client: " + err.Error())
panic("Failed to create Mongodb client: " + err.Error())
return err
}

err = client.Ping(ctx, nil)
if err != nil {
logger.Print("[Error] Failed to create Mongo DB client: " + err.Error())
panic("Failed to create Mongo DB client: " + err.Error())
return err
}

// logger.Print("[INFO] Established connection with the DB")
return client, ctx, cancel
s.DbClient = client
s.DbCtx = ctx
s.DbCancel = cancel
serv = s
s.Noticef("Established connection with the DB")
return nil
}

func GetCollection(collectionName string) *mongo.Collection {
var collection *mongo.Collection = Client.Database(configuration.DB_NAME).Collection(collectionName)
var collection *mongo.Collection = serv.DbClient.Database(configuration.DB_NAME).Collection(collectionName)
return collection
}

func Close() {
defer Cancel()
defer serv.DbCancel()
defer func() {
if err := Client.Disconnect(Ctx); err != nil {
logger.Print("[Error] Failed to close Mongodb client: " + err.Error())
panic("Failed to close Mongodb client: " + err.Error())
if err := serv.DbClient.Disconnect(serv.DbCtx); err != nil {
serv.Errorf("Failed to close Mongodb client: " + err.Error())
}
}()
}

var Client, Ctx, Cancel = initializeDbConnection()