Skip to content

Commit

Permalink
Improved error messages in topic_proxy.
Browse files Browse the repository at this point in the history
  • Loading branch information
aforge committed Aug 25, 2022
1 parent eee19f2 commit 96b0ab3
Showing 1 changed file with 16 additions and 16 deletions.
32 changes: 16 additions & 16 deletions server/topic_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ func (t *Topic) runProxy(hub *Hub) {
join.sess.queueOut(ErrLockedReply(join, types.TimeNow()))
} else if err := globals.cluster.routeToTopicMaster(ProxyReqJoin, join, t.name, join.sess); err != nil {
// Response (ctrl message) will be handled when it's received via the proxy channel.
logs.Warn.Println("proxy topic: route join request from proxy to master failed:", err)
logs.Warn.Printf("proxy topic[%s]: route join request from proxy to master failed - %s", t.name, err)
}
if join.sess.inflightReqs != nil {
join.sess.inflightReqs.Done()
}

case msg := <-t.unreg:
if !t.handleProxyLeaveRequest(msg, killTimer) {
logs.Warn.Println("Failed to update proxy topic state for leave request", msg.sess.sid)
logs.Warn.Printf("proxy topic[%s]: failed to update proxy topic state for leave request - sid %s", t.name, msg.sess.sid)
}
if msg.init && msg.sess.inflightReqs != nil {
// If it's a client initiated request.
Expand All @@ -44,21 +44,21 @@ func (t *Topic) runProxy(hub *Hub) {
case msg := <-t.clientMsg:
// Content message intended for broadcasting to recipients
if err := globals.cluster.routeToTopicMaster(ProxyReqBroadcast, msg, t.name, msg.sess); err != nil {
logs.Warn.Println("proxy topic: route broadcast request from proxy to master failed:", err)
logs.Warn.Printf("topic proxy[%s]: route broadcast request from proxy to master failed - %s", t.name, err)
}

case msg := <-t.serverMsg:
if msg.Info != nil || msg.Pres != nil {
globals.cluster.routeToTopicIntraCluster(t.name, msg, msg.sess)
} else {
// FIXME: should something be done here?
logs.Err.Printf("ERROR!!! Unexpected server-side message in proxy topic %s", msg.describe())
logs.Err.Printf("ERROR!!! topic proxy[%s]: unexpected server-side message in proxy topic %s", t.name, msg.describe())
}

case msg := <-t.meta:
// Request to get/set topic metadata
if err := globals.cluster.routeToTopicMaster(ProxyReqMeta, msg, t.name, msg.sess); err != nil {
logs.Warn.Println("proxy topic: route meta request from proxy to master failed:", err)
logs.Warn.Printf("proxy topic[%s]: route meta request from proxy to master failed - %s", t.name, err)
}

case upd := <-t.supd:
Expand All @@ -70,7 +70,7 @@ func (t *Topic) runProxy(hub *Hub) {
// Subscribed user may not match session user. Find out who is subscribed
pssd, ok := t.sessions[upd.sess]
if !ok {
logs.Warn.Println("proxy topic: sess update request from detached session")
logs.Warn.Printf("proxy topic[%s]: sess update request from detached session - sid %s", t.name, upd.sess.sid)
continue
}
req = ProxyReqBgSession
Expand All @@ -79,7 +79,7 @@ func (t *Topic) runProxy(hub *Hub) {
tmpSess.userAgent = upd.sess.userAgent
}
if err := globals.cluster.routeToTopicMaster(req, nil, t.name, tmpSess); err != nil {
logs.Warn.Println("proxy topic: route sess update request from proxy to master failed:", err)
logs.Warn.Printf("proxy topic[%s]: route sess update request from proxy to master failed - %s", t.name, err)
}

case msg := <-t.proxy:
Expand All @@ -92,7 +92,7 @@ func (t *Topic) runProxy(hub *Hub) {
}

if err := globals.cluster.topicProxyGone(t.name); err != nil {
logs.Warn.Printf("topic proxy shutdown [%s]: failed to notify master - %s", t.name, err)
logs.Warn.Printf("proxy topic[%s] shutdown: failed to notify master - %s", t.name, err)
}

// Report completion back to sender, if 'done' is not nil.
Expand Down Expand Up @@ -122,7 +122,7 @@ func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.T
if pssd, ok := t.sessions[msg.sess]; ok {
asUid = pssd.uid
} else {
logs.Warn.Println("proxy topic: leave request sent for unknown session")
logs.Warn.Printf("proxy topic[%s]: leave request sent for unknown session", t.name)
return false
}
}
Expand Down Expand Up @@ -152,7 +152,7 @@ func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.T
}

if err := globals.cluster.routeToTopicMaster(ProxyReqLeave, msg, t.name, msg.sess); err != nil {
logs.Warn.Println("proxy topic: route leave request from proxy to master failed:", err)
logs.Warn.Printf("proxy topic[%s]: route leave request from proxy to master failed - %s", t.name, err)
}
if len(t.sessions) == 0 {
// No more sessions attached. Start the countdown.
Expand Down Expand Up @@ -185,7 +185,7 @@ func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) {
} else {
sess := globals.sessionStore.Get(msg.OrigSid)
if sess == nil {
logs.Warn.Println("topic_proxy: session not found; already terminated?", msg.OrigSid)
logs.Warn.Printf("proxy topic[%s]: session %s not found; already terminated?", t.name, msg.OrigSid)
}
switch msg.OrigReqType {
case ProxyReqJoin:
Expand Down Expand Up @@ -230,12 +230,12 @@ func (t *Topic) proxyMasterResponse(msg *ClusterResp, killTimer *time.Timer) {
}

default:
logs.Err.Printf("proxy topic [%s] received response referencing unexpected request type %d",
logs.Err.Printf("proxy topic[%s] received response referencing unexpected request type %d",
t.name, msg.OrigReqType)
}

if sess != nil && !sess.queueOut(msg.SrvMsg) {
logs.Err.Println("topic proxy: timeout")
logs.Err.Printf("proxy topic[%s]: timeout in sending response - sid %s", sess.sid)
}
}
}
Expand All @@ -259,7 +259,7 @@ func (t *Topic) proxyCtrlBroadcast(msg *ServerComMessage) {
if msg.Ctrl.Code == http.StatusResetContent && msg.Ctrl.Text == "evicted" {
// We received a ctrl command for evicting a user.
if msg.uid.IsZero() {
logs.Err.Panicf("topic[%s]: proxy received evict message with empty uid", t.name)
logs.Err.Panicf("proxy topic[%s]: proxy received evict message with empty uid", t.name)
}
for sess := range t.sessions {
// Proxy topic may only have ordinary sessions. No multiplexing or proxy sessions here.
Expand Down Expand Up @@ -287,11 +287,11 @@ func (t *Topic) updateAcsFromPresMsg(pres *MsgServerPres) {
pud := t.perUser[uid]
dacs := pres.Acs
if err := pud.modeWant.ApplyMutation(dacs.Want); err != nil {
logs.Warn.Printf("proxy topic[%s]: could not process acs change - want: %+v", t.name, err)
logs.Warn.Printf("proxy topic[%s]: could not process acs change - want: %s", t.name, err)
return
}
if err := pud.modeGiven.ApplyMutation(dacs.Given); err != nil {
logs.Warn.Printf("proxy topic[%s]: could not process acs change - given: %+v", t.name, err)
logs.Warn.Printf("proxy topic[%s]: could not process acs change - given: %s", t.name, err)
return
}
// Update existing or add new.
Expand Down

0 comments on commit 96b0ab3

Please sign in to comment.