Skip to content

Commit

Permalink
Consolidate the 2 functions into 1 and add specific test
Browse files Browse the repository at this point in the history
There was actually a bug in the close tracing whereby a Close()
on the last member of a non durable queue subscription would be
traced as a "Suspended" instead of "Removed".
  • Loading branch information
kozlovic committed Jul 31, 2017
1 parent 1eecb3d commit 6da4c2d
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 62 deletions.
105 changes: 43 additions & 62 deletions server/server.go
Expand Up @@ -543,9 +543,7 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {

sub.Lock()
subject := sub.subject
inbox := sub.Inbox
clientID := sub.ClientID
durName := sub.DurableName
sub.removed = true
sub.clearAckTimer()
durableKey := ""
Expand Down Expand Up @@ -732,46 +730,11 @@ func (ss *subStore) Remove(c *channel, sub *subState, unsubscribe bool) {
}

if log != nil {
traceCloseOrUnsubscribeRequest(log, clientID, subject, inbox, qgroup, durName, subid,
queueGroupIsEmpty, isDurable, unsubscribe)
strace := subTrace{clientID: clientID, isRemove: true, isUnsubscribe: unsubscribe, isGroupEmpty: queueGroupIsEmpty}
traceSubStartCloseOrUnsubscribe(log, sub, &strace)
}
}

func traceCloseOrUnsubscribeRequest(log logger.Logger, clientID, subject, inbox, qgroup, durName string,
subID uint64, queueGroupIsEmpty, isDurable, unsubscribe bool) {

var (
action string
durable string
specific string
)
if isDurable {
durable = "durable "
}
if qgroup != "" {
if queueGroupIsEmpty {
if unsubscribe {
action = fmt.Sprintf("Removed %squeue ", durable)
} else {
action = fmt.Sprintf("Suspended %squeue ", durable)
}
} else {
action = fmt.Sprintf("Removed member from %squeue ", durable)
}
specific = fmt.Sprintf(" queue=%s,", qgroup)
} else {
if unsubscribe || !isDurable {
action = fmt.Sprintf("Removed %s", durable)
} else {
action = "Suspended durable "
}
if isDurable {
specific = fmt.Sprintf(" durable=%s,", durName)
}
}
log.Debugf("[Client:%s] %ssubscription, subject=%s, inbox=%s,%s subid=%d", clientID, action, subject, inbox, specific, subID)
}

// Lookup by durable name.
func (ss *subStore) LookupByDurable(durableName string) *subState {
ss.RLock()
Expand Down Expand Up @@ -3251,7 +3214,8 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
return
}
if s.debug {
traceSubscriptionRequest(s.log, sr, subIsNew, isDurable, subStartTrace, sub.ID)
strace := subTrace{clientID: sr.ClientID, isNew: subIsNew, startTrace: subStartTrace}
traceSubStartCloseOrUnsubscribe(s.log, sub, &strace)
}

s.monMu.Lock()
Expand Down Expand Up @@ -3293,43 +3257,60 @@ func (s *StanServer) processSubscriptionRequest(m *nats.Msg) {
s.subStartCh <- &subStartInfo{c: c, sub: sub, qs: qs, isDurable: isDurable}
}

func traceSubscriptionRequest(log logger.Logger, sr *pb.SubscriptionRequest,
subIsNew, isDurable bool, subStartTrace string, subID uint64) {
type subTrace struct {
clientID string
isRemove bool
isNew bool
isUnsubscribe bool
isGroupEmpty bool
startTrace string
}

func traceSubStartCloseOrUnsubscribe(log logger.Logger, sub *subState, trace *subTrace) {

This comment has been minimized.

Copy link
@petemiron

petemiron Jul 31, 2017

Contributor

LGTM. Only comment is maybe traceSubStateChange or just traceSubState.

sub.RLock()
defer sub.RUnlock()
var (
action string
specific string
durable string
sending string
queue string
prefix string
)
if isDurable {
if sub.IsDurable {
durable = "durable "
}
if sr.QGroup != "" {
// subStartTrace not empty means that subIsNew is true and that this is
// the very first member creating the queue subscription.
if subStartTrace != "" {
action = fmt.Sprintf("Started new %squeue ", durable)
} else if subIsNew {
action = fmt.Sprintf("Added member to %squeue ", durable)
if sub.QGroup != "" {
queue = "queue "
}
if trace.isRemove {
if (trace.isUnsubscribe || !sub.IsDurable) && (sub.QGroup == "" || trace.isGroupEmpty) {
prefix = "Removed"
} else if sub.QGroup != "" && !trace.isGroupEmpty {
prefix = "Removed member from"
} else {
action = fmt.Sprintf("Resumed %squeue ", durable)
prefix = "Suspended"
}
specific = fmt.Sprintf(" queue=%s,", sr.QGroup)
} else {
if subIsNew {
action = "Started new " + durable
} else if isDurable {
action = "Resumed durable "
}
if sr.DurableName != "" {
specific = fmt.Sprintf(" durable=%s,", sr.DurableName)
if trace.startTrace != "" {
prefix = "Started new"
} else if sub.QGroup != "" && trace.isNew {
prefix = "Added member to"
} else if sub.IsDurable {
prefix = "Resumed"
}
}
if subStartTrace != "" {
subStartTrace = ", sending " + subStartTrace
action = fmt.Sprintf("%s %s%s", prefix, durable, queue)
if sub.QGroup != "" {
specific = fmt.Sprintf(" queue=%s,", sub.QGroup)
} else if sub.IsDurable {
specific = fmt.Sprintf(" durable=%s,", sub.DurableName)
}
if !trace.isRemove && trace.startTrace != "" {
sending = ", sending " + trace.startTrace
}
log.Debugf("[Client:%s] %ssubscription, subject=%s, inbox=%s,%s subid=%d%s",
sr.ClientID, action, sr.Subject, sr.Inbox, specific, subID, subStartTrace)
trace.clientID, action, sub.subject, sub.Inbox, specific, sub.ID, sending)
}

// createAckInboxAndSubject returns an AckInbox.
Expand Down
194 changes: 194 additions & 0 deletions server/server_test.go
Expand Up @@ -7105,3 +7105,197 @@ func TestAckForUnknownChannel(t *testing.T) {
t.Fatalf("Server did not log error about not finding channel")
}
}

func TestTraceSubCreateCloseUnsubscribeRequests(t *testing.T) {
// This logger captures any log.XXX() call into a single logger.msg string.
// Will use that to compare the expected debug trace.
logger := &dummyLogger{}
opts := GetDefaultOptions()
opts.CustomLogger = logger
s, err := RunServerWithOpts(opts, nil)
if err != nil {
t.Fatalf("Error running server: %v", err)
}
defer s.Shutdown()

sc := NewDefaultConnection(t)
defer sc.Close()

type optAndText struct {
opt stan.SubscriptionOption
txt string
suffix bool
}
subOpts := []optAndText{
optAndText{stan.StartAt(pb.StartPosition_NewOnly), "new-only, seq=1", true},
optAndText{stan.StartWithLastReceived(), "last message, seq=1", true},
optAndText{stan.StartAtSequence(10), "from sequence, asked_seq=10 actual_seq=1", true},
optAndText{stan.StartAt(pb.StartPosition_First), "from beginning, seq=1", true},
optAndText{stan.StartAtTimeDelta(time.Hour), "from time time=", false},
}
for _, o := range subOpts {
sub, err := sc.Subscribe("foo", func(_ *stan.Msg) {}, o.opt)
if err != nil {
t.Fatalf("Error on subscribe: %v", err)
}
waitForNumSubs(t, s, clientName, 1)
logger.Lock()
msg := logger.msg
logger.Unlock()
if o.suffix {
if !strings.HasSuffix(msg, o.txt) {
t.Fatalf("Execpected suffix %q, got %q", o.txt, msg)
}
} else {
if !strings.Contains(msg, o.txt) {
t.Fatalf("Execpected to contain %q, got %q", o.txt, msg)
}
if !strings.HasSuffix(msg, "seq=1") {
t.Fatalf("Execpected suffix \"seq=1\", got %q", msg)
}
}
if err := sub.Unsubscribe(); err != nil {
t.Fatalf("Error on unsubscribe: %v", err)
}
}
checkTrace := func(trace string) {
logger.Lock()
msg := logger.msg
logger.Unlock()
trace = "[Client:me] " + trace
if !strings.Contains(msg, trace) {
stackFatalf(t, "Expected trace %q, got %q", trace, msg)
}
}
type startSub struct {
start func() (stan.Subscription, error)
startTrace string
end func(sub stan.Subscription) error
endTrace string
}
ssubs := []startSub{
// New plain subscription followed by Unsubscribe should remove the subscription
startSub{
start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}) },
startTrace: "Started new subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed subscription",
},
// New plain subscription followed by Close should remove the subscription
startSub{
start: func() (stan.Subscription, error) { return sc.Subscribe("foo", func(_ *stan.Msg) {}) },
startTrace: "Started new subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Removed subscription",
},
// New durable subscription followed by Close should suspend the subscription
startSub{
start: func() (stan.Subscription, error) {
return sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Started new durable subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Suspended durable subscription",
},
// Resuming the durable subscription, followed by Unsubscribe should removed the subscription
startSub{
start: func() (stan.Subscription, error) {
return sc.Subscribe("foo", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Resumed durable subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed durable subscription",
},
// Non durable queue subscribption
startSub{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) },
startTrace: "Started new queue subscription",
end: func(sub stan.Subscription) error { return nil }, endTrace: "",
},
// Adding a member followed by Unsubscribe should simply remove this member.
startSub{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) },
startTrace: "Added member to queue subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed member from queue subscription",
},
// Adding a member followed by Close should simply remove this member.
startSub{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}) },
startTrace: "Added member to queue subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Removed member from queue subscription",
},
// New queue subscription followed by Unsubscribe should remove the queue subscription
startSub{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}) },
startTrace: "Started new queue subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed queue subscription",
},
// New queue subscription followed by Close should remove the queue subscription
startSub{
start: func() (stan.Subscription, error) { return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}) },
startTrace: "Started new queue subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Removed queue subscription",
},
// New durable queue subscription followed by Close should suspend the subscription
startSub{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Started new durable queue subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Suspended durable queue subscription",
},
// Resuming durable queue subscription
startSub{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Resumed durable queue subscription",
end: func(sub stan.Subscription) error { return nil }, endTrace: "",
},
// Adding a member followed by Close should remove this member only
startSub{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Added member to durable queue subscription",
end: func(sub stan.Subscription) error { return sub.Close() },
endTrace: "Removed member from durable queue subscription",
},
// Adding a member followed by Unsubscribe should remove this member only
startSub{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Added member to durable queue subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed member from durable queue subscription",
},
// New durable subscription followed by Unsubscribe should remove the subscription
startSub{
start: func() (stan.Subscription, error) {
return sc.QueueSubscribe("foo", "queue2", func(_ *stan.Msg) {}, stan.DurableName("dur"))
},
startTrace: "Started new durable queue subscription",
end: func(sub stan.Subscription) error { return sub.Unsubscribe() },
endTrace: "Removed durable queue subscription",
},
}
for _, s := range ssubs {
sub, err := s.start()
if err != nil {
t.Fatalf("Error starting subscription: %v", err)
}
checkTrace(s.startTrace)
if err := s.end(sub); err != nil {
t.Fatalf("Error ending subscription: %v", err)
}
if s.endTrace != "" {
checkTrace(s.endTrace)
}
}
}

0 comments on commit 6da4c2d

Please sign in to comment.