Skip to content
This repository has been archived by the owner on Jun 9, 2019. It is now read-only.

Commit

Permalink
Move to time deltas for time based subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
derekcollison committed Feb 21, 2016
1 parent be55c32 commit 26e69c3
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 55 deletions.
2 changes: 1 addition & 1 deletion TODO.md
@@ -1,7 +1,7 @@

- [ ] Retry limits?
- [ ] Server Store Limits (time, msgs, byte)
- [ ] Change time to deltas
- [X] Change time to deltas
- [X] Server heartbeat, release dead clients.
- [X] Require clientID for published messages, error if not registered.
- [X] Check for need of ackMap (out of order re-delivery to queue subscribers).
Expand Down
56 changes: 28 additions & 28 deletions protocol.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 15 additions & 15 deletions protocol.proto
Expand Up @@ -68,25 +68,25 @@ message ConnectResponse {

// Enum for start position type.
enum StartPosition {
NewOnly = 0;
LastReceived = 1;
TimeStart = 2;
SequenceStart = 3;
First = 4;
NewOnly = 0;
LastReceived = 1;
TimeDeltaStart = 2;
SequenceStart = 3;
First = 4;
}

// Protocol for a client to subscribe
message SubscriptionRequest {
string clientID = 1; // ClientID
string subject = 2; // Formal subject to subscribe to, e.g. foo.bar
string qGroup = 3; // Optional queue group
string inbox = 4; // Inbox subject to deliver messages on
int32 maxInFlight = 5; // Maximum inflight messages without an ack allowed
int32 ackWaitInSecs = 6; // Timeout for receiving an ack from the client
string durableName = 7; // Optional durable name which survives client restarts
StartPosition startPosition = 10; // Start position
uint64 startSequence = 11; // Optional start sequence number
int64 startTime = 12; // Optional start time
string clientID = 1; // ClientID
string subject = 2; // Formal subject to subscribe to, e.g. foo.bar
string qGroup = 3; // Optional queue group
string inbox = 4; // Inbox subject to deliver messages on
int32 maxInFlight = 5; // Maximum inflight messages without an ack allowed
int32 ackWaitInSecs = 6; // Timeout for receiving an ack from the client
string durableName = 7; // Optional durable name which survives client restarts
StartPosition startPosition = 10; // Start position
uint64 startSequence = 11; // Optional start sequence number
int64 startTimeDelta = 12; // Optional start time
}

// Response for SubscriptionRequest and UnsubscribeRequests
Expand Down
9 changes: 5 additions & 4 deletions server.go
Expand Up @@ -1010,8 +1010,9 @@ func (s *stanServer) processSubscriptionRequest(m *nats.Msg) {
}
}
// Check for SequenceTime out of range
if sr.StartPosition == StartPosition_TimeStart {
if !s.startTimeValid(sr.Subject, sr.StartTime) {
if sr.StartPosition == StartPosition_TimeDeltaStart {
startTime := time.Now().UnixNano() - sr.StartTimeDelta
if !s.startTimeValid(sr.Subject, startTime) {
s.sendSubscriptionResponseErr(m.Reply, ErrInvalidTime)
return
}
Expand Down Expand Up @@ -1063,8 +1064,8 @@ func (s *stanServer) processSubscriptionRequest(m *nats.Msg) {
s.sendNewOnly(cs, sub)
case StartPosition_LastReceived:
s.sendLastMessage(cs, sub)
case StartPosition_TimeStart:
s.sendMessagesToSubFromTime(cs, sub, sr.StartTime)
case StartPosition_TimeDeltaStart:
s.sendMessagesToSubFromTime(cs, sub, time.Now().UnixNano()-sr.StartTimeDelta)
case StartPosition_SequenceStart:
s.sendMessagesFromSequence(cs, sub, sr.StartSequence)
case StartPosition_First:
Expand Down
21 changes: 18 additions & 3 deletions stan_test.go
Expand Up @@ -527,15 +527,17 @@ func TestSubscriptionStartAtTime(t *testing.T) {
if err != nil {
t.Fatalf("Expected to connect correctly, got err %v\n", err)
}
// Publish ten messages all together

// Publish first 5
for i := 1; i <= 5; i++ {
data := []byte(fmt.Sprintf("%d", i))
sc.Publish("foo", data)
}
time.Sleep(200 * time.Millisecond)

// Buffer each side so slow tests still work.
time.Sleep(250 * time.Millisecond)
startTime := time.Now()
time.Sleep(250 * time.Millisecond)

// Publish last 5
for i := 6; i <= 10; i++ {
Expand Down Expand Up @@ -572,7 +574,7 @@ func TestSubscriptionStartAtTime(t *testing.T) {

// Check for sub setup
rsub := sub.(*subscription)
if rsub.opts.StartAt != StartPosition_TimeStart {
if rsub.opts.StartAt != StartPosition_TimeDeltaStart {
t.Fatalf("Incorrect StartAt state: %s\n", rsub.opts.StartAt)
}

Expand All @@ -598,6 +600,19 @@ func TestSubscriptionStartAtTime(t *testing.T) {
}
seq++
}

// Now test Ago helper
delta := time.Now().Sub(startTime)

sub, err = sc.Subscribe("foo", mcb, StartAtTimeDelta(delta))
if err != nil {
t.Fatalf("Expected no error on Subscribe, got %v\n", err)
}
defer sub.Unsubscribe()

if err := Wait(ch); err != nil {
t.Fatal("Did not receive our messages")
}
}

func TestSubscriptionStartAtFirst(t *testing.T) {
Expand Down
17 changes: 13 additions & 4 deletions sub.go
Expand Up @@ -108,15 +108,24 @@ func StartAtSequence(seq uint64) SubscriptionOption {
}
}

// StartTime sets the desired start time position and state.
// StartAtTime sets the desired start time position and state.
func StartAtTime(start time.Time) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = StartPosition_TimeStart
o.StartAt = StartPosition_TimeDeltaStart
o.StartTime = start
return nil
}
}

// StartAtTimeDelta sets the desired start time position and state using the delta.
func StartAtTimeDelta(ago time.Duration) SubscriptionOption {
return func(o *SubscriptionOptions) error {
o.StartAt = StartPosition_TimeDeltaStart
o.StartTime = time.Now().Add(-ago)
return nil
}
}

// StartWithLastReceived is a helper function to set start position to last received.
func StartWithLastReceived() SubscriptionOption {
return func(o *SubscriptionOptions) error {
Expand Down Expand Up @@ -204,8 +213,8 @@ func (sc *conn) subscribe(subject, qgroup string, cb MsgHandler, options ...Subs

// Conditionals
switch sr.StartPosition {
case StartPosition_TimeStart:
sr.StartTime = sub.opts.StartTime.UnixNano()
case StartPosition_TimeDeltaStart:
sr.StartTimeDelta = time.Now().UnixNano() - sub.opts.StartTime.UnixNano()
case StartPosition_SequenceStart:
sr.StartSequence = sub.opts.StartSequence
}
Expand Down

0 comments on commit 26e69c3

Please sign in to comment.