Skip to content

Commit

Permalink
Merge pull request #1642 from memphisdev/bugfix-RND-458-consumre-last…
Browse files Browse the repository at this point in the history
…-messages-start-from-seq-not-working-as-expected

bugfix-RND-458-consumre-last-messages-start-from-seq-not-working-as-expected
  • Loading branch information
shohamroditimemphis committed Jan 18, 2024
2 parents 53f0515 + 00d36d3 commit 8b3d30c
Showing 1 changed file with 67 additions and 30 deletions.
97 changes: 67 additions & 30 deletions server/memphis_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,29 +738,33 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
}
var deliveryPolicy DeliverPolicy
var optStartSeq uint64
lastSeqPerPartition := map[string]uint64{}
// This check for case when the last message is 0 (in case StartConsumeFromSequence > 1 the LastMessages is 0 )
if consumer.LastMessages == 0 && consumer.StartConsumeFromSeq == 1 {
deliveryPolicy = DeliverNew
} else if consumer.LastMessages > 0 {
var streamInfo *StreamInfo
if len(partitionsList) == 1 {
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$1.final")
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$1")
if err != nil {
return err
}
lastSeq := streamInfo.State.LastSeq
lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1
if int(lastMessages) < 1 {
lastMessages = uint64(1)
}
deliveryPolicy = DeliverByStartSequence
optStartSeq = lastMessages
} else {
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+".final")
if err != nil {
return err
for _, pl := range partitionsList {
streamInfo, err = serv.memphisStreamInfo(tenantName, stationName.Intern()+"$"+strconv.Itoa(pl))
if err != nil {
return err
}
lastSeqPerPartition[stationName.Intern()+"$"+strconv.Itoa(pl)] = streamInfo.State.LastSeq
}
}
lastSeq := streamInfo.State.LastSeq
lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1
if int(lastMessages) < 1 {
lastMessages = uint64(1)
}
deliveryPolicy = DeliverByStartSequence
optStartSeq = lastMessages
} else if consumer.StartConsumeFromSeq > 1 {
deliveryPolicy = DeliverByStartSequence
optStartSeq = consumer.StartConsumeFromSeq
Expand All @@ -787,26 +791,59 @@ func (s *Server) CreateConsumer(tenantName string, consumer models.Consumer, sta
err = s.memphisAddConsumer(tenantName, stationName.Intern(), consumerConfig)
return err
} else {
for _, pl := range partitionsList {
consumerConfig := &ConsumerConfig{
Durable: consumerName,
DeliverPolicy: deliveryPolicy,
AckPolicy: AckExplicit,
AckWait: time.Duration(maxAckTimeMs) * time.Millisecond,
MaxDeliver: MaxMsgDeliveries,
FilterSubject: stationName.Intern() + "$" + strconv.Itoa(pl) + ".final",
ReplayPolicy: ReplayInstant,
MaxAckPending: -1,
HeadersOnly: false,
// RateLimit: ,// Bits per sec
// Heartbeat: // time.Duration,
}
if deliveryPolicy == DeliverByStartSequence {
consumerConfig.OptStartSeq = optStartSeq
if consumer.LastMessages > 0 {
for k, v := range lastSeqPerPartition {
lastSeq := v
lastMessages := (lastSeq - uint64(consumer.LastMessages)) + 1
if int(lastMessages) < 1 {
lastMessages = uint64(1)
}
deliveryPolicy = DeliverByStartSequence
optStartSeq = lastMessages

consumerConfig := &ConsumerConfig{
Durable: consumerName,
DeliverPolicy: deliveryPolicy,
AckPolicy: AckExplicit,
AckWait: time.Duration(maxAckTimeMs) * time.Millisecond,
MaxDeliver: MaxMsgDeliveries,
FilterSubject: k + ".final",
ReplayPolicy: ReplayInstant,
MaxAckPending: -1,
HeadersOnly: false,
// RateLimit: ,// Bits per sec
// Heartbeat: // time.Duration,
}
if deliveryPolicy == DeliverByStartSequence {
consumerConfig.OptStartSeq = optStartSeq
}
err = s.memphisAddConsumer(tenantName, k, consumerConfig)
if err != nil {
return err
}
}
err = s.memphisAddConsumer(tenantName, stationName.Intern()+"$"+strconv.Itoa(pl), consumerConfig)
if err != nil {
return err
} else {
for _, pl := range partitionsList {
consumerConfig := &ConsumerConfig{
Durable: consumerName,
DeliverPolicy: deliveryPolicy,
AckPolicy: AckExplicit,
AckWait: time.Duration(maxAckTimeMs) * time.Millisecond,
MaxDeliver: MaxMsgDeliveries,
FilterSubject: stationName.Intern() + "$" + strconv.Itoa(pl) + ".final",
ReplayPolicy: ReplayInstant,
MaxAckPending: -1,
HeadersOnly: false,
// RateLimit: ,// Bits per sec
// Heartbeat: // time.Duration,
}
if deliveryPolicy == DeliverByStartSequence {
consumerConfig.OptStartSeq = optStartSeq
}
err = s.memphisAddConsumer(tenantName, stationName.Intern()+"$"+strconv.Itoa(pl), consumerConfig)
if err != nil {
return err
}
}
}
}
Expand Down

0 comments on commit 8b3d30c

Please sign in to comment.