Skip to content

Commit

Permalink
bugfix - consumer creation
Browse files Browse the repository at this point in the history
  • Loading branch information
idanasulin2706 committed Jan 23, 2024
1 parent 9fca732 commit f7f089a
Showing 1 changed file with 30 additions and 12 deletions.
42 changes: 30 additions & 12 deletions server/memphis_handlers_consumers.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,22 +201,24 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
}
}

var newConsumer models.Consumer
newConsumer := models.Consumer{
Name: name,
StationId: station.ID,
ConnectionId: connectionId,
ConsumersGroup: consumerGroup,
MaxAckTimeMs: int64(maxAckTime),
MaxMsgDeliveries: maxMsgDeliveries,
StartConsumeFromSeq: startConsumeFromSequence,
LastMessages: lastMessages,
TenantName: tenantName,
PartitionsList: station.PartitionsList,
}
if strings.HasPrefix(user.Username, "$") {
newConsumer, err = db.InsertNewConsumer(name, station.ID, "connector", connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
if err != nil {
serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
newConsumer.Type = "connector"
} else {
newConsumer, err = db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
if err != nil {
serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
newConsumer.Type = consumerType
}

message := "Consumer " + name + " connected"
if consumerGroupExist {
if requestVersion == 1 {
if newConsumer.StartConsumeFromSeq != consumerFromGroup.StartConsumeFromSeq || newConsumer.LastMessages != consumerFromGroup.LastMessages {
Expand Down Expand Up @@ -260,7 +262,23 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam
return []int{}, err
}
}

if strings.HasPrefix(user.Username, "$") {
newConsumer, err = db.InsertNewConsumer(name, station.ID, "connector", connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
if err != nil {
serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
} else {
newConsumer, err = db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId)
if err != nil {
serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error())
return []int{}, err
}
}

var auditLogs []interface{}
message := "Consumer " + name + " connected"
newAuditLog := models.AuditLog{
StationName: stationName.Ext(),
Message: message,
Expand Down

0 comments on commit f7f089a

Please sign in to comment.