diff --git a/server/memphis_handlers_monitoring.go b/server/memphis_handlers_monitoring.go index a7d8e191c..4c760ca7a 100644 --- a/server/memphis_handlers_monitoring.go +++ b/server/memphis_handlers_monitoring.go @@ -612,10 +612,10 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) { messages, err = stationsHandler.GetMessagesFromPartition(station, fmt.Sprintf("%v$%v", stationName.Intern(), body.PartitionNumber), messagesToFetch, body.PartitionNumber) if err != nil { if IsNatsErr(err, JSStreamNotFoundErr) { - serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) + serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessagesFromPartition: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName) c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"}) } else { - serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error()) + serv.Errorf("GetStationOverviewData at GetMessagesFromPartition: At station " + body.StationName + ": " + err.Error()) c.AbortWithStatusJSON(500, gin.H{"message": "Server error"}) } return diff --git a/server/memphis_helper.go b/server/memphis_helper.go index 97cb3178e..daec0d042 100644 --- a/server/memphis_helper.go +++ b/server/memphis_helper.go @@ -1085,7 +1085,10 @@ func (s *Server) GetMessagesFromPartition(station models.Station, streamName str if !station.IsNative { filterSubj = "" } - + replicas := 1 + if streamInfo.Config.Retention == InterestPolicy { + replicas = streamInfo.Config.Replicas + } msgs, err := s.memphisGetMsgs(station.TenantName, filterSubj, streamName, startSequence, @@ -1093,7 +1096,7 @@ func (s *Server) GetMessagesFromPartition(station models.Station, streamName str 5*time.Second, true, station.RetentionType == "ack_based", - station.Replicas, + replicas, ) if err != nil { @@ -1180,7 +1183,6 @@ func getHdrLastIdxFromRaw(msg []byte) int { func (s *Server) memphisGetMsgs(tenantName, filterSubj, streamName string, startSeq uint64, amount int, timeout time.Duration, findHeader, isAckBasedStation bool, consumerReplicas int) ([]StoredMsg, error) { uid, _ := uuid.NewV4() durableName := "$memphis_fetch_messages_consumer_" + uid.String() - replicas := GetStationReplicas(1) cc := ConsumerConfig{ FilterSubject: filterSubj, @@ -1188,7 +1190,7 @@ func (s *Server) memphisGetMsgs(tenantName, filterSubj, streamName string, start DeliverPolicy: DeliverByStartSequence, Durable: durableName, AckPolicy: AckExplicit, - Replicas: replicas, + Replicas: consumerReplicas, } err := s.memphisAddConsumer(tenantName, streamName, &cc)