Skip to content

Commit

Permalink
Merge pull request #1012 from memphisdev/logs_enhancement
Browse files Browse the repository at this point in the history
finished changing the logs format to include the tenant and user name
  • Loading branch information
daniel-davidd committed Jun 21, 2023
2 parents f82869f + 208fa57 commit 9ebbe34
Show file tree
Hide file tree
Showing 18 changed files with 672 additions and 662 deletions.
42 changes: 21 additions & 21 deletions server/background_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *Server) ListenForZombieConnCheckRequests() error {
if len(connectionIds) > 0 { // in case there are connections
bytes, err := json.Marshal(connectionIds)
if err != nil {
s.Errorf("ListenForZombieConnCheckRequests: " + err.Error())
s.Errorf("ListenForZombieConnCheckRequests: %v", err.Error())
} else {
s.sendInternalAccountMsgWithReply(s.GlobalAccount(), reply, _EMPTY_, nil, bytes, true)
}
Expand All @@ -73,7 +73,7 @@ func (s *Server) ListenForIntegrationsUpdateEvents() error {
var integrationUpdate models.CreateIntegrationSchema
err := json.Unmarshal(msg, &integrationUpdate)
if err != nil {
s.Errorf("ListenForIntegrationsUpdateEvents: " + err.Error())
s.Errorf("[tenant: %v]ListenForIntegrationsUpdateEvents: %v", integrationUpdate.TenantName, err.Error())
return
}
switch strings.ToLower(integrationUpdate.Name) {
Expand All @@ -85,7 +85,7 @@ func (s *Server) ListenForIntegrationsUpdateEvents() error {
case "s3":
CacheDetails("s3", integrationUpdate.Keys, integrationUpdate.Properties, integrationUpdate.TenantName)
default:
s.Warnf("ListenForIntegrationsUpdateEvents: %s %s", strings.ToLower(integrationUpdate.Name), "unknown integration")
s.Warnf("[tenant: %v] ListenForIntegrationsUpdateEvents: %s %s", integrationUpdate.TenantName, strings.ToLower(integrationUpdate.Name), "unknown integration")
return
}
}(copyBytes(msg))
Expand All @@ -102,7 +102,7 @@ func (s *Server) ListenForConfigReloadEvents() error {
// reload config
err := s.Reload()
if err != nil {
s.Errorf("Failed reloading: " + err.Error())
s.Errorf("Failed reloading: %v", err.Error())
}
}(copyBytes(msg))
})
Expand All @@ -117,13 +117,13 @@ func (s *Server) ListenForNotificationEvents() error {
go func(msg []byte) {
tenantName, message, err := s.getTenantNameAndMessage(msg)
if err != nil {
s.Errorf("ListenForNotificationEvents: " + err.Error())
s.Errorf("[tenant: %v]ListenForNotificationEvents: %v", tenantName, err.Error())
return
}
var notification models.Notification
err = json.Unmarshal([]byte(message), &notification)
if err != nil {
s.Errorf("ListenForNotificationEvents: " + err.Error())
s.Errorf("[tenant: %v]ListenForNotificationEvents: %v", tenantName, err.Error())
return
}
notificationMsg := notification.Msg
Expand All @@ -147,13 +147,13 @@ func (s *Server) ListenForPoisonMsgAcks() error {
go func(msg []byte) {
tenantName, message, err := s.getTenantNameAndMessage(msg)
if err != nil {
s.Errorf("ListenForPoisonMsgAcks: " + err.Error())
s.Errorf("[tenant: %v]ListenForPoisonMsgAcks: %v", tenantName, err.Error())
return
}
var msgToAck models.PmAckMsg
err = json.Unmarshal([]byte(message), &msgToAck)
if err != nil {
s.Errorf("ListenForPoisonMsgAcks: " + err.Error())
s.Errorf("[tenant: %v]ListenForPoisonMsgAcks: %v", tenantName, err.Error())
return
}
err = db.RemoveCgFromDlsMsg(msgToAck.ID, msgToAck.CgName, tenantName)
Expand Down Expand Up @@ -259,7 +259,7 @@ func (s *Server) StartBackgroundTasks() error {
go s.InitializeThroughputSampling()
go s.UploadTenantUsageToDB()
go s.RefreshFirebaseFunctionsKey()

return nil
}

Expand All @@ -282,15 +282,15 @@ func (s *Server) uploadMsgsToTier2Storage() {
}
err := serv.memphisAddConsumer(globalAccountName, tieredStorageStream, &cc)
if err != nil {
serv.Errorf("Failed add tiered storage consumer: " + err.Error())
serv.Errorf("Failed add tiered storage consumer: %v", err.Error())
return
}
TIERED_STORAGE_CONSUMER_CREATED = true
}
tieredStorageMapLock.Lock()
err := flushMapToTire2Storage()
if err != nil {
serv.Errorf("Failed upload messages to tiered 2 storage: " + err.Error())
serv.Errorf("Failed upload messages to tiered 2 storage: %v", err.Error())
tieredStorageMapLock.Unlock()
continue
}
Expand Down Expand Up @@ -336,7 +336,7 @@ func (s *Server) ConsumeUnackedMsgs() {
}(subject, reply, copyBytes(msg))
})
if err != nil {
s.Errorf("Failed to subscribe to unacked messages: " + err.Error())
s.Errorf("Failed to subscribe to unacked messages: %v", err.Error())
continue
}

Expand Down Expand Up @@ -403,7 +403,7 @@ func (s *Server) ConsumeTieredStorageMsgs() {
}(subject, reply, copyBytes(msg))
})
if err != nil {
s.Errorf("Failed to subscribe to tiered storage messages: " + err.Error())
s.Errorf("Failed to subscribe to tiered storage messages: %v", err.Error())
continue
}

Expand Down Expand Up @@ -445,41 +445,41 @@ func (s *Server) ListenForSchemaverseDlsEvents() error {
go func(msg []byte) {
tenantName, stringMessage, err := s.getTenantNameAndMessage(msg)
if err != nil {
s.Errorf("ListenForNotificationEvents: " + err.Error())
s.Errorf("[tenant: %v]ListenForNotificationEvents: %v", tenantName, err.Error())
return
}
var message models.SchemaVerseDlsMessageSdk
err = json.Unmarshal([]byte(stringMessage), &message)
if err != nil {
serv.Errorf("ListenForSchemaverseDlsEvents: " + err.Error())
serv.Errorf("[tenant: %v]ListenForSchemaverseDlsEvents: %v", tenantName, err.Error())
return
}

exist, station, err := db.GetStationByName(message.StationName, tenantName)
if err != nil {
serv.Errorf("ListenForSchemaverseDlsEvents: " + err.Error())
serv.Errorf("[tenant: %v]ListenForSchemaverseDlsEvents: %v", tenantName, err.Error())
return
}
if !exist {
serv.Warnf("ListenForSchemaverseDlsEvents: station " + message.StationName + " couldn't been found")
serv.Warnf("[tenant: %v]ListenForSchemaverseDlsEvents: station %v couldn't been found", tenantName, message.StationName)
return
}

exist, p, err := db.GetProducerByNameAndConnectionID(message.Producer.Name, message.Producer.ConnectionId)
if err != nil {
serv.Errorf("ListenForSchemaverseDlsEvents: " + err.Error())
serv.Errorf("[tenant: %v]ListenForSchemaverseDlsEvents: %v", tenantName, err.Error())
return
}

if !exist {
serv.Warnf("ListenForSchemaverseDlsEvents: producer " + p.Name + " couldn't been found")
serv.Warnf("[tenant: %v]ListenForSchemaverseDlsEvents: producer %v couldn't been found", tenantName, p.Name)
return
}

message.Message.TimeSent = time.Now()
_, err = db.InsertSchemaverseDlsMsg(station.ID, 0, p.ID, []string{}, models.MessagePayload(message.Message), message.ValidationError, tenantName)
if err != nil {
serv.Errorf("ListenForSchemaverseDlsEvents: " + err.Error())
serv.Errorf("[tenant: %v]ListenForSchemaverseDlsEvents: %v", tenantName, err.Error())
return
}
}(copyBytes(msg))
Expand All @@ -497,7 +497,7 @@ func (s *Server) RemoveOldDlsMsgs() {
configurationTime := time.Now().Add(time.Hour * time.Duration(-s.opts.DlsRetentionHours))
err := db.DeleteOldDlsMessageByRetention(configurationTime)
if err != nil {
serv.Errorf("RemoveOldDlsMsgs: " + err.Error())
serv.Errorf("RemoveOldDlsMsgs: %v", err.Error())
}
}
}
Loading

0 comments on commit 9ebbe34

Please sign in to comment.