Skip to content

Commit

Permalink
Merge pull request #1453 from memphisdev/bugfix/RND-261-consumer-conf…
Browse files Browse the repository at this point in the history
…ig-replica-count-exceeds-parent-stream

bugfix/RND-261-consumer-config-replica-count-exceeds-parent-stream
  • Loading branch information
shay23b committed Nov 28, 2023
2 parents d5869d0 + 73717c8 commit f00a73d
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
4 changes: 2 additions & 2 deletions server/memphis_handlers_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,10 +612,10 @@ func (mh MonitoringHandler) GetStationOverviewData(c *gin.Context) {
messages, err = stationsHandler.GetMessagesFromPartition(station, fmt.Sprintf("%v$%v", stationName.Intern(), body.PartitionNumber), messagesToFetch, body.PartitionNumber)
if err != nil {
if IsNatsErr(err, JSStreamNotFoundErr) {
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessages: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
serv.Warnf("[tenant: %v][user: %v]GetStationOverviewData at GetMessagesFromPartition: nats error At station %v: does not exist", user.TenantName, user.Username, body.StationName)
c.AbortWithStatusJSON(SHOWABLE_ERROR_STATUS_CODE, gin.H{"message": "Station " + body.StationName + " does not exist"})
} else {
serv.Errorf("GetStationOverviewData at GetMessages: At station " + body.StationName + ": " + err.Error())
serv.Errorf("GetStationOverviewData at GetMessagesFromPartition: At station " + body.StationName + ": " + err.Error())
c.AbortWithStatusJSON(500, gin.H{"message": "Server error"})
}
return
Expand Down
10 changes: 6 additions & 4 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,15 +1085,18 @@ func (s *Server) GetMessagesFromPartition(station models.Station, streamName str
if !station.IsNative {
filterSubj = ""
}

replicas := 1
if streamInfo.Config.Retention == InterestPolicy {
replicas = streamInfo.Config.Replicas
}
msgs, err := s.memphisGetMsgs(station.TenantName, filterSubj,
streamName,
startSequence,
messagesToFetch,
5*time.Second,
true,
station.RetentionType == "ack_based",
station.Replicas,
replicas,
)

if err != nil {
Expand Down Expand Up @@ -1180,15 +1183,14 @@ func getHdrLastIdxFromRaw(msg []byte) int {
func (s *Server) memphisGetMsgs(tenantName, filterSubj, streamName string, startSeq uint64, amount int, timeout time.Duration, findHeader, isAckBasedStation bool, consumerReplicas int) ([]StoredMsg, error) {
uid, _ := uuid.NewV4()
durableName := "$memphis_fetch_messages_consumer_" + uid.String()
replicas := GetStationReplicas(1)

cc := ConsumerConfig{
FilterSubject: filterSubj,
OptStartSeq: startSeq,
DeliverPolicy: DeliverByStartSequence,
Durable: durableName,
AckPolicy: AckExplicit,
Replicas: replicas,
Replicas: consumerReplicas,
}

err := s.memphisAddConsumer(tenantName, streamName, &cc)
Expand Down

0 comments on commit f00a73d

Please sign in to comment.