Skip to content

Commit

Permalink
Fix handling grpX and chnX multiplexing sessions.
Browse files Browse the repository at this point in the history
1. When both grpX and chnX are attached to topic, send
   messages to grpX session only.
2. Set Original field in proxy -> master leave requests
   for correct handling of leave events.
  • Loading branch information
aforge committed Aug 23, 2022
1 parent 29279f3 commit 6e803e5
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
3 changes: 2 additions & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,9 +824,10 @@ func (c *Cluster) routeToTopicMaster(reqType ProxyReqType, msg *ClientComMessage
// Cluster may be nil due to shutdown.
return nil
}
if sess != nil {
if sess != nil && reqType != ProxyReqLeave {
if atomic.LoadInt32(&sess.terminating) > 0 {
// The session is terminating.
// Do not forward any requests except "leave" to the topic master.
return nil
}
}
Expand Down
20 changes: 17 additions & 3 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,18 +1279,32 @@ func (t *Topic) broadcastToSessions(msg *ServerComMessage) {
continue
}
}
} else {
// If it's a chnX multiplexing session, check if there's a corresponding
// grpX multiplexing session as we don't want to send the message to both.
if pssd.isChanSub && types.IsChannel(sess.sid) {
grpSid := types.ChnToGrp(sess.sid)
if grpSess := globals.sessionStore.Get(grpSid); grpSess != nil && grpSess.isMultiplex() {
// If grpX multiplexing session's attached to topic, skip this chnX session
// (message will be routed to the topic proxy via the grpX session).
if _, attached := t.sessions[grpSess]; attached {
continue
}
}
}
}

// Topic name may be different depending on the user to which the `sess` belongs.
t.maybeFixTopicName(msg, pssd.uid, pssd.isChanSub)

msgCopy := msg.copy()
// Send channel messages anonymously.
if pssd.isChanSub && msg.Data != nil {
msg.Data.From = ""
if pssd.isChanSub && msgCopy.Data != nil {
msgCopy.Data.From = ""
}
// Send message to session.
// Make a copy of msg since messages sent to sessions differ.
if !sess.queueOut(msg.copy()) {
if !sess.queueOut(msgCopy) {
logs.Warn.Printf("topic[%s]: connection stuck, detaching - %s", t.name, sess.sid)
dropSessions = append(dropSessions, sess)
}
Expand Down
17 changes: 16 additions & 1 deletion server/topic_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,29 @@ func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.T
// Remove the session from the topic without waiting for a response from the master node
// because by the time the response arrives this session may be already gone from the session store
// and we won't be able to find and remove it by its sid.
_, result := t.remSession(msg.sess, asUid)
pssd, result := t.remSession(msg.sess, asUid)
if !msg.init {
// Explicitly specify the uid because the master multiplex session needs to know which
// of its multiple hosted sessions to delete.
msg.AsUser = asUid.UserId()
msg.Leave = &MsgClientLeave{}
msg.init = true
}
// Make sure we set the Original field if it's empty (e.g. when session is terminating altogether).
if msg.Original == "" {
if t.cat == types.TopicCatGrp && t.isChan {
// It's a channel topic. Original topic name depends the subscription type.
var toriginal string
if result && pssd.isChanSub {
toriginal = types.GrpToChn(t.xoriginal)
} else {
toriginal = t.xoriginal
}
msg.Original = toriginal
} else {
msg.Original = t.original(asUid)
}
}

if err := globals.cluster.routeToTopicMaster(ProxyReqLeave, msg, t.name, msg.sess); err != nil {
logs.Warn.Println("proxy topic: route leave request from proxy to master failed:", err)
Expand Down

0 comments on commit 6e803e5

Please sign in to comment.