diff --git a/server/memphis_handlers_consumers.go b/server/memphis_handlers_consumers.go index 370678925..732fc5eca 100644 --- a/server/memphis_handlers_consumers.go +++ b/server/memphis_handlers_consumers.go @@ -201,22 +201,24 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam } } - var newConsumer models.Consumer + newConsumer := models.Consumer{ + Name: name, + StationId: station.ID, + ConnectionId: connectionId, + ConsumersGroup: consumerGroup, + MaxAckTimeMs: int64(maxAckTime), + MaxMsgDeliveries: maxMsgDeliveries, + StartConsumeFromSeq: startConsumeFromSequence, + LastMessages: lastMessages, + TenantName: tenantName, + PartitionsList: station.PartitionsList, + } if strings.HasPrefix(user.Username, "$") { - newConsumer, err = db.InsertNewConsumer(name, station.ID, "connector", connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId) - if err != nil { - serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) - return []int{}, err - } + newConsumer.Type = "connector" } else { - newConsumer, err = db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId) - if err != nil { - serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) - return []int{}, err - } + newConsumer.Type = consumerType } - message := "Consumer " + name + " connected" if consumerGroupExist { if requestVersion == 1 { if newConsumer.StartConsumeFromSeq != consumerFromGroup.StartConsumeFromSeq || newConsumer.LastMessages != consumerFromGroup.LastMessages { @@ -260,7 +262,23 @@ func (s *Server) createConsumerDirectCommon(c *client, consumerName, cStationNam return []int{}, err } } + + if strings.HasPrefix(user.Username, "$") { + newConsumer, err = db.InsertNewConsumer(name, station.ID, "connector", connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId) + if err != nil { + serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) + return []int{}, err + } + } else { + newConsumer, err = db.InsertNewConsumer(name, station.ID, consumerType, connectionId, consumerGroup, maxAckTime, maxMsgDeliveries, startConsumeFromSequence, lastMessages, tenantName, station.PartitionsList, requestVersion, sdkName, appId) + if err != nil { + serv.Errorf("[tenant: %v]createConsumerDirectCommon at InsertNewConsumer: Consumer %v at station %v :%v", user.TenantName, consumerName, cStationName, err.Error()) + return []int{}, err + } + } + var auditLogs []interface{} + message := "Consumer " + name + " connected" newAuditLog := models.AuditLog{ StationName: stationName.Ext(), Message: message,