From fa8b6370624f4b39117c3bd927485d9fecb003f7 Mon Sep 17 00:00:00 2001 From: Corey Hulen Date: Wed, 1 Mar 2017 12:07:09 -0500 Subject: [PATCH] Optimizing weh_hub.Start() for 3.6 (#5579) * Optimizing weh_hub.Start() for 3.6 * Optimizing weh_hub.Start() for 3.6 * Optimizing weh_hub.Start() for 3.6 * Removing config * Removing config --- api/web_conn.go | 25 +++++++++++++++++-------- api/web_hub.go | 41 ++++++++++++++++++++++++----------------- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/api/web_conn.go b/api/web_conn.go index 8bb4e10bb1097..e54080ebfff67 100644 --- a/api/web_conn.go +++ b/api/web_conn.go @@ -29,6 +29,7 @@ type WebConn struct { Send chan model.WebSocketMessage SessionToken string SessionExpiresAt int64 + Session *model.Session UserId string T goi18n.TranslateFunc Locale string @@ -148,6 +149,7 @@ func (webCon *WebConn) InvalidateCache() { webCon.AllChannelMembers = nil webCon.LastAllChannelMembersTime = 0 webCon.SessionExpiresAt = 0 + webCon.Session = nil } func (webCon *WebConn) isAuthenticated() bool { @@ -161,11 +163,13 @@ func (webCon *WebConn) isAuthenticated() bool { if session == nil || session.IsExpired() { webCon.SessionToken = "" webCon.SessionExpiresAt = 0 + webCon.Session = nil return false } webCon.SessionToken = session.Token webCon.SessionExpiresAt = session.ExpiresAt + webCon.Session = session } return true @@ -230,16 +234,21 @@ func (webCon *WebConn) ShouldSendEvent(msg *model.WebSocketEvent) bool { } func (webCon *WebConn) IsMemberOfTeam(teamId string) bool { - session := GetSession(webCon.SessionToken) - if session == nil { - return false - } else { - member := session.GetTeamByTeamId(teamId) - if member != nil { - return true - } else { + if webCon.Session == nil { + session := GetSession(webCon.SessionToken) + if session == nil { return false + } else { + webCon.Session = session } } + + member := webCon.Session.GetTeamByTeamId(teamId) + + if member != nil { + return true + } else { + return false + } } diff --git a/api/web_hub.go b/api/web_hub.go index cf4f71ae4dea4..4538e4963a364 100644 --- a/api/web_hub.go +++ b/api/web_hub.go @@ -89,6 +89,15 @@ func HubUnregister(webConn *WebConn) { } func Publish(message *model.WebSocketEvent) { + + if SkipTypingMessage(message) { + if metrics := einterfaces.GetMetricsInterface(); metrics != nil { + metrics.IncrementWebsocketEvent(message.Event + "_skipped") + } + + return + } + if metrics := einterfaces.GetMetricsInterface(); metrics != nil { metrics.IncrementWebsocketEvent(message.Event) } @@ -266,20 +275,18 @@ func (h *Hub) Start() { } case msg := <-h.broadcast: - if OkToSendTypingMessage(msg) { - for _, webCon := range h.connections { - if webCon.ShouldSendEvent(msg) { - select { - case webCon.Send <- msg: - default: - l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId)) - close(webCon.Send) - for i, webConCandidate := range h.connections { - if webConCandidate == webCon { - h.connections[i] = h.connections[len(h.connections)-1] - h.connections = h.connections[:len(h.connections)-1] - break - } + for _, webCon := range h.connections { + if webCon.ShouldSendEvent(msg) { + select { + case webCon.Send <- msg: + default: + l4g.Error(fmt.Sprintf("webhub.broadcast: cannot send, closing websocket for userId=%v", webCon.UserId)) + close(webCon.Send) + for i, webConCandidate := range h.connections { + if webConCandidate == webCon { + h.connections[i] = h.connections[len(h.connections)-1] + h.connections = h.connections[:len(h.connections)-1] + break } } } @@ -319,13 +326,13 @@ func (h *Hub) Start() { go doRecoverableStart() } -func OkToSendTypingMessage(msg *model.WebSocketEvent) bool { +func SkipTypingMessage(msg *model.WebSocketEvent) bool { // Only broadcast typing messages if less than 1K people in channel if msg.Event == model.WEBSOCKET_EVENT_TYPING { if Srv.Store.Channel().GetMemberCountFromCache(msg.Broadcast.ChannelId) > *utils.Cfg.TeamSettings.MaxNotificationsPerChannel { - return false + return true } } - return true + return false }