Skip to content

Commit

Permalink
Merge pull request #227 from memphisdev/poisin-messages
Browse files Browse the repository at this point in the history
Poisin messages
  • Loading branch information
idanasulin2706 committed Jul 11, 2022
2 parents 1be5845 + 17aebcd commit a7b6425
Show file tree
Hide file tree
Showing 21 changed files with 476 additions and 80 deletions.
1 change: 0 additions & 1 deletion background_tasks/logs_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,4 @@ func ConsumeSysLogs(wg *sync.WaitGroup) {
}
}
}

}
25 changes: 25 additions & 0 deletions background_tasks/poison_messages_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2021-2022 The Memphis Authors
// Licensed under the GNU General Public License v3.0 (the “License”);
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an “AS IS” BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package background_tasks

import (
"memphis-broker/broker"
"memphis-broker/handlers"
)

var poisonMessagesHandler = handlers.PoisonMessagesHandler{}

func ListenForPoisonMessages() {
broker.QueueSubscribe("$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.>", "$memphis_poison_messages_listeners_group", poisonMessagesHandler.HandleNewMessage)
}
17 changes: 17 additions & 0 deletions background_tasks/zombie_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var connectionsCollection *mongo.Collection = db.GetCollection("connections")
var producersCollection *mongo.Collection = db.GetCollection("producers")
var consumersCollection *mongo.Collection = db.GetCollection("consumers")
var sysLogsCollection *mongo.Collection = db.GetCollection("system_logs")
var poisonMessagesCollection *mongo.Collection = db.GetCollection("poison_messages")

func killRelevantConnections() ([]models.Connection, error) {
lastAllowedTime := time.Now().Add(time.Duration(-configuration.PING_INTERVAL_MS-5000) * time.Millisecond)
Expand Down Expand Up @@ -108,6 +109,16 @@ func removeOldLogs() error {
return nil
}

func removeOldPoisonMsgs() error {
filter := bson.M{"creation_date": bson.M{"$lte": (time.Now().Add(-(time.Hour * time.Duration(configuration.POISON_MSGS_RETENTION_IN_HOURS))))}}
_, err := poisonMessagesCollection.DeleteMany(context.TODO(), filter)
if err != nil {
return err
}

return nil
}

func KillZombieResources(wg *sync.WaitGroup) {
for range time.Tick(time.Second * 30) {
connections, err := killRelevantConnections()
Expand All @@ -129,10 +140,16 @@ func KillZombieResources(wg *sync.WaitGroup) {
logger.Error("KillZombieResources error: " + err.Error())
}
}

err = removeOldLogs()
if err != nil {
logger.Error("KillZombieResources error: " + err.Error())
}

err = removeOldPoisonMsgs()
if err != nil {
logger.Error("KillZombieResources error: " + err.Error())
}
}

defer wg.Done()
Expand Down
34 changes: 32 additions & 2 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func CreateStream(station models.Station) error {
}

var dedupWindow time.Duration
if station.DedupEnabled {
if station.DedupEnabled && station.DedupWindowInMs >= 100 {
dedupWindow = time.Duration(station.DedupWindowInMs) * time.Millisecond
} else {
dedupWindow = time.Duration(100) * time.Millisecond // can not be 0
Expand Down Expand Up @@ -239,6 +239,15 @@ func CreateConsumer(consumer models.Consumer, station models.Station) error {
return nil
}

func GetCgInfo(stationName, cgName string) (*nats.ConsumerInfo, error) {
info, err := js.ConsumerInfo(stationName, cgName)
if err != nil {
return nil, nil
}

return info, nil
}

func RemoveStream(streamName string) error {
err := js.DeleteStream(streamName)
if err != nil {
Expand Down Expand Up @@ -281,6 +290,14 @@ func GetAvgMsgSizeInStation(station models.Station) (int64, error) {
return int64(streamInfo.State.Bytes / streamInfo.State.Msgs), nil
}

func GetHeaderSizeInBytes(headers nats.Header) int {
bytes := 0
for i, s := range headers {
bytes += len(s[0]) + len(i)
}
return bytes
}

func GetMessages(station models.Station, messagesToFetch int) ([]models.Message, error) {
streamInfo, err := js.StreamInfo(station.Name)
if err != nil {
Expand All @@ -304,7 +321,7 @@ func GetMessages(station models.Station, messagesToFetch int) ([]models.Message,
Message: string(msg.Data),
ProducedBy: msg.Header.Get("producedBy"),
CreationDate: metadata.Timestamp,
Size: len(msg.Subject) + len(msg.Reply) + len(msg.Data) + len(msg.Header),
Size: len(msg.Subject) + len(msg.Data) + GetHeaderSizeInBytes(msg.Header),
})
msg.Ack()
}
Expand All @@ -317,6 +334,15 @@ func GetMessages(station models.Station, messagesToFetch int) ([]models.Message,
return messages, nil
}

func GetMessage(stationName string, messageSeq uint64) (*nats.RawStreamMsg, error) {
msg, err := js.GetMsg(stationName, messageSeq)
if err != nil {
return nil, err
}

return msg, nil
}

func RemoveProducer() error {
// nothing to remove
return nil
Expand Down Expand Up @@ -386,6 +412,10 @@ func CreatePullSubscriber(stream string, durable string) (*nats.Subscription, er
return sub, nil
}

func QueueSubscribe(subject, queue_group_name string, cb func(msg *nats.Msg)) {
broker.QueueSubscribe(subject, queue_group_name, cb)
}

func IsConnectionAlive() bool {
return broker.IsConnected()
}
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Configuration struct {
GITHUB_CLIENT_ID string
GITHUB_CLIENT_SECRET string
SANDBOX_REDIRECT_URI string
POISON_MSGS_RETENTION_IN_HOURS int
}

func GetConfig() Configuration {
Expand Down
3 changes: 2 additions & 1 deletion config/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
"MAX_MESSAGE_SIZE_MB": 12,
"SHOWABLE_ERROR_STATUS_CODE": 666,
"PING_INTERVAL_MS": 60000,
"ANALYTICS_TOKEN": "+KoOqVQu+cSF+qW4owTUdoicAzzRoICI5LcM3o5CMLCyA3cqK6e1yef6r2eqrAiYwJv7Z2r3gByZlBPTAfINPN6vQpRXFaxY60FV8GYi"
"ANALYTICS_TOKEN": "+KoOqVQu+cSF+qW4owTUdoicAzzRoICI5LcM3o5CMLCyA3cqK6e1yef6r2eqrAiYwJv7Z2r3gByZlBPTAfINPN6vQpRXFaxY60FV8GYi",
"POISON_MSGS_RETENTION_IN_HOURS": 12
}
3 changes: 2 additions & 1 deletion config/docker-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@
"MAX_MESSAGE_SIZE_MB": 12,
"SHOWABLE_ERROR_STATUS_CODE": 666,
"PING_INTERVAL_MS": 60000,
"ANALYTICS_TOKEN": "+KoOqVQu+cSF+qW4owTUdoicAzzRoICI5LcM3o5CMLCyA3cqK6e1yef6r2eqrAiYwJv7Z2r3gByZlBPTAfINPN6vQpRXFaxY60FV8GYi"
"ANALYTICS_TOKEN": "+KoOqVQu+cSF+qW4owTUdoicAzzRoICI5LcM3o5CMLCyA3cqK6e1yef6r2eqrAiYwJv7Z2r3gByZlBPTAfINPN6vQpRXFaxY60FV8GYi",
"POISON_MSGS_RETENTION_IN_HOURS": 12
}
112 changes: 51 additions & 61 deletions handlers/consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,40 @@ func validateConsumerType(consumerType string) error {
return nil
}

func isConsumerGroupExist(consumerGroup string, stationId primitive.ObjectID) (bool, error) {
func isConsumerGroupExist(consumerGroup string, stationId primitive.ObjectID) (bool, models.Consumer, error) {
filter := bson.M{"consumers_group": consumerGroup, "station_id": stationId, "is_active": true}
var consumer models.Consumer
err := consumersCollection.FindOne(context.TODO(), filter).Decode(&consumer)
if err == mongo.ErrNoDocuments {
return false, nil
return false, models.Consumer{}, nil
} else if err != nil {
return false, err
return false, models.Consumer{}, err
}
return true, nil
return true, consumer, nil
}

func isConsumersWithoutGroupAndNameLikeGroupExist(consumerGroup string, stationId primitive.ObjectID) (bool, error) {
filter := bson.M{"name": consumerGroup, "consumers_group": "", "station_id": stationId, "is_active": true}
var consumer models.Consumer
err := consumersCollection.FindOne(context.TODO(), filter).Decode(&consumer)
if err == mongo.ErrNoDocuments {
return false, nil
} else if err != nil {
return false, err
func GetConsumerGroupMembers(cgName string, station models.Station) ([]models.CgMember, error) {
var consumers []models.CgMember

cursor, err := consumersCollection.Aggregate(context.TODO(), mongo.Pipeline{
bson.D{{"$match", bson.D{{"consumers_group", cgName}, {"station_id", station.ID}}}},
bson.D{{"$lookup", bson.D{{"from", "connections"}, {"localField", "connection_id"}, {"foreignField", "_id"}, {"as", "connection"}}}},
bson.D{{"$unwind", bson.D{{"path", "$connection"}, {"preserveNullAndEmptyArrays", true}}}},
bson.D{{"$project", bson.D{{"name", 1}, {"created_by_user", 1}, {"is_active", 1}, {"is_deleted", 1}, {"max_ack_time_ms", 1}, {"max_msg_deliveries", 1}, {"client_address", "$connection.client_address"}}}},
bson.D{{"$project", bson.D{{"station", 0}, {"factory", 0}, {"connection", 0}}}},
})

if err != nil {
logger.Error("GetConsumerGroupMembers error: " + err.Error())
return consumers, err
}

if err = cursor.All(context.TODO(), &consumers); err != nil {
logger.Error("GetConsumerGroupMembers error: " + err.Error())
return consumers, err
}
return true, nil

return consumers, nil
}

func (ch ConsumersHandler) CreateConsumer(c *gin.Context) {
Expand All @@ -104,6 +116,8 @@ func (ch ConsumersHandler) CreateConsumer(c *gin.Context) {
c.AbortWithStatusJSON(configuration.SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": err.Error()})
return
}
} else {
consumerGroup = name
}

consumerType := strings.ToLower(body.ConsumerType)
Expand Down Expand Up @@ -188,58 +202,34 @@ func (ch ConsumersHandler) CreateConsumer(c *gin.Context) {
return
}

var consumerGroupExist bool
if consumerGroup != "" {
exist, err = isConsumersWithoutGroupAndNameLikeGroupExist(consumerGroup, station.ID)
if err != nil {
logger.Error("CreateConsumer error: " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
if exist {
logger.Warn("You can not give your consumer group the same name like another active consumer on the same station")
c.AbortWithStatusJSON(configuration.SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "You can not give your consumer group the same name like another active consumer on the same station"})
return
}

consumerGroupExist, err = isConsumerGroupExist(consumerGroup, station.ID)
if err != nil {
logger.Error("CreateConsumer error: " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
} else {
exist, err = isConsumerGroupExist(name, station.ID)
if err != nil {
logger.Error("CreateConsumer error: " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}
if exist {
logger.Warn("You can not give your consumer the same name like another active consumer group name on the same station")
c.AbortWithStatusJSON(configuration.SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "You can not give your consumer the same name like another active consumer group name on the same station"})
return
}
consumerGroupExist, consumerFromGroup, err := isConsumerGroupExist(consumerGroup, station.ID)
if err != nil {
logger.Error("CreateConsumer error: " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}

consumerId := primitive.NewObjectID()
newConsumer := models.Consumer{
ID: consumerId,
Name: name,
StationId: station.ID,
FactoryId: station.FactoryId,
Type: consumerType,
ConnectionId: connectionId,
CreatedByUser: connection.CreatedByUser,
ConsumersGroup: consumerGroup,
MaxAckTimeMs: body.MaxAckTimeMs,
MaxMsgDeliveries: body.MaxMsgDeliveries,
IsActive: true,
CreationDate: time.Now(),
IsDeleted: false,
}

if consumerGroup == "" || !consumerGroupExist {
ID: consumerId,
Name: name,
StationId: station.ID,
FactoryId: station.FactoryId,
Type: consumerType,
ConnectionId: connectionId,
CreatedByUser: connection.CreatedByUser,
ConsumersGroup: consumerGroup,
IsActive: true,
CreationDate: time.Now(),
IsDeleted: false,
}

if consumerGroupExist {
newConsumer.MaxAckTimeMs = consumerFromGroup.MaxAckTimeMs
newConsumer.MaxMsgDeliveries = consumerFromGroup.MaxMsgDeliveries
} else {
newConsumer.MaxAckTimeMs = body.MaxAckTimeMs
newConsumer.MaxMsgDeliveries = body.MaxMsgDeliveries
broker.CreateConsumer(newConsumer, station)
if err != nil {
logger.Error("CreateConsumer error: " + err.Error())
Expand Down
6 changes: 6 additions & 0 deletions handlers/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func removeStations(factoryId primitive.ObjectID) error {
return err
}

err = RemovePoisonMsgsByStation(station.Name)
if err != nil {
logger.Warn("removeStations error: " + err.Error())
}


err = RemoveAllAuditLogsByStation(station.Name)
if err != nil {
logger.Warn("removeStations error: " + err.Error())
Expand Down
1 change: 1 addition & 0 deletions handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var producersCollection *mongo.Collection = db.GetCollection("producers")
var consumersCollection *mongo.Collection = db.GetCollection("consumers")
var systemKeysCollection *mongo.Collection = db.GetCollection("system_keys")
var auditLogsCollection *mongo.Collection = db.GetCollection("audit_logs")
var poisonMessagesCollection *mongo.Collection = db.GetCollection("poison_messages")
var configuration = config.GetConfig()

func getUserDetailsFromMiddleware(c *gin.Context) models.User {
Expand Down
9 changes: 9 additions & 0 deletions handlers/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var stationsHandler = StationsHandler{}
var producersHandler = ProducersHandler{}
var consumersHandler = ConsumersHandler{}
var auditLogsHandler = AuditLogsHandler{}
var poisonMsgsHandler = PoisonMessagesHandler{}

func clientSetConfig() error {
var config *rest.Config
Expand Down Expand Up @@ -276,6 +277,13 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
return
}

poisonMessages, err := poisonMsgsHandler.GetPoisonMsgsByStation(station)
if err != nil {
logger.Error("GetStationOverviewData error: " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
return
}

response := models.StationOverviewData{
ActiveProducers: activeProducers,
KilledProducers: killedProducers,
Expand All @@ -287,6 +295,7 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
AvgMsgSize: avgMsgSize,
AuditLogs: auditLogs,
Messages: messages,
PoisonMessages: poisonMessages,
}

c.IndentedJSON(200, response)
Expand Down

0 comments on commit a7b6425

Please sign in to comment.