Skip to content

Commit

Permalink
Merge 1cfd08a into b74e856
Browse files Browse the repository at this point in the history
  • Loading branch information
coreyhulen committed Apr 18, 2017
2 parents b74e856 + 1cfd08a commit af017a3
Showing 1 changed file with 79 additions and 13 deletions.
92 changes: 79 additions & 13 deletions app/web_hub.go
Expand Up @@ -8,7 +8,10 @@ import (
"hash/fnv"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync/atomic"
"time"

l4g "github.com/alecthomas/log4go"

Expand All @@ -17,37 +20,44 @@ import (
"github.com/mattermost/platform/utils"
)

// Tuning constants for the hub broadcast queue and the periodic deadlock check.
const (
// BROADCAST_QUEUE_SIZE is the buffer capacity given to each hub's broadcast channel.
BROADCAST_QUEUE_SIZE = 4096
// DEADLOCK_TICKER is the interval between inspections of the broadcast queues.
DEADLOCK_TICKER = 15 * time.Second // check every 15 seconds
// DEADLOCK_WARN is 99% of BROADCAST_QUEUE_SIZE (4055 with integer division);
// a hub whose queue length is at or above this value is reported as possibly deadlocked.
DEADLOCK_WARN = (BROADCAST_QUEUE_SIZE * 99) / 100 // number of buffered messages before printing stack trace
)

// Hub groups a slice of websocket connections with the channels used to
// register and unregister connections, queue events for broadcast, pass user
// ids for invalidation, and request that the hub stop.
type Hub struct {
connections []*WebConn
count int64
register chan *WebConn
unregister chan *WebConn
broadcast chan *model.WebSocketEvent
stop chan string
invalidateUser chan string
ExplicitStop bool
connections []*WebConn
// connectionCount mirrors len(connections); it is stored and loaded with
// sync/atomic so it can be read from outside the hub's own goroutine.
connectionCount int64
// connectionIndex is this hub's position in the package-level hubs slice,
// assigned by HubStart.
connectionIndex int
register chan *WebConn
unregister chan *WebConn
broadcast chan *model.WebSocketEvent
stop chan string
invalidateUser chan string
ExplicitStop bool
// goroutineId is the id of the goroutine running the hub loop, recorded when
// Start's loop begins and used to pick that goroutine's trace out of a stack dump.
goroutineId int
}

// hubs is the package-level list of hubs; HubStart fills each slot with a new Hub.
var hubs []*Hub = make([]*Hub, 0)

// stopCheckingForDeadlock tells the deadlock-check goroutine launched by
// HubStart to return. It is created inside that goroutine and signalled by HubStop.
var stopCheckingForDeadlock chan bool

// NewWebHub returns a Hub with unbuffered register, unregister, stop and
// invalidateUser channels, a buffered broadcast channel, an empty connections
// slice with capacity model.SESSION_CACHE_SIZE, and ExplicitStop set to false.
// It does not start the hub; callers invoke Start separately.
func NewWebHub() *Hub {
return &Hub{
register: make(chan *WebConn),
unregister: make(chan *WebConn),
connections: make([]*WebConn, 0, model.SESSION_CACHE_SIZE),
broadcast: make(chan *model.WebSocketEvent, 4096),
broadcast: make(chan *model.WebSocketEvent, BROADCAST_QUEUE_SIZE),
stop: make(chan string),
invalidateUser: make(chan string),
ExplicitStop: false,
}
}

func TotalWebsocketConnections() int {
// This is racy, but it's only used for reporting information
// so it's probably OK
count := int64(0)
for _, hub := range hubs {
count = count + atomic.LoadInt64(&hub.count)
count = count + atomic.LoadInt64(&hub.connectionCount)
}

return int(count)
Expand All @@ -61,13 +71,54 @@ func HubStart() {

for i := 0; i < len(hubs); i++ {
hubs[i] = NewWebHub()
hubs[i].connectionIndex = i
hubs[i].Start()
}

go func() {
ticker := time.NewTicker(DEADLOCK_TICKER)

defer func() {
ticker.Stop()
}()

stopCheckingForDeadlock = make(chan bool, 1)

for {
select {
case <-ticker.C:
for _, hub := range hubs {
if len(hub.broadcast) >= DEADLOCK_WARN {
l4g.Error("Hub processing might be deadlock on hub %v goroutine %v with %v events in the buffer", hub.connectionIndex, hub.goroutineId, len(hub.broadcast))
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
output := fmt.Sprintf("%s", buf)
splits := strings.Split(output, "goroutine ")

for _, part := range splits {
if strings.Index(part, fmt.Sprintf("%v", hub.goroutineId)) > -1 {
l4g.Error("Trace for possible deadlock goroutine %v", part)
}
}
}
}

case <-stopCheckingForDeadlock:
return
}
}
}()
}

func HubStop() {
l4g.Info(utils.T("api.web_hub.start.stopping.debug"))

select {
case stopCheckingForDeadlock <- true:
default:
l4g.Warn("We appear to have already sent the stop checking for deadlocks command")
}

for _, hub := range hubs {
hub.Stop()
}
Expand Down Expand Up @@ -236,6 +287,17 @@ func (h *Hub) InvalidateUser(userId string) {
h.invalidateUser <- userId
}

// getGoroutineId returns the numeric id of the calling goroutine, or -1 when
// the id cannot be determined.
//
// The id is parsed from the start of the caller's own stack trace as written
// by runtime.Stack: the "goroutine " prefix is stripped and the first
// whitespace-separated field is converted to an int. Only the first 64 bytes
// of the trace are captured, which is enough to hold that header.
func getGoroutineId() int {
	var buf [64]byte
	n := runtime.Stack(buf[:], false)

	fields := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))
	if len(fields) == 0 {
		// An empty or whitespace-only trace would otherwise panic on fields[0];
		// report it the same way as an unparsable id.
		return -1
	}

	id, err := strconv.Atoi(fields[0])
	if err != nil {
		return -1
	}
	return id
}

// Stop sends the string "all" on the hub's stop channel. NewWebHub creates
// that channel unbuffered, so this call blocks until the value is received
// (presumably by the hub's run loop, which is not fully visible here).
func (h *Hub) Stop() {
h.stop <- "all"
}
Expand All @@ -246,11 +308,15 @@ func (h *Hub) Start() {
var doRecover func()

doStart = func() {

h.goroutineId = getGoroutineId()
l4g.Debug("Hub for index %v is starting with goroutine %v", h.connectionIndex, h.goroutineId)

for {
select {
case webCon := <-h.register:
h.connections = append(h.connections, webCon)
atomic.StoreInt64(&h.count, int64(len(h.connections)))
atomic.StoreInt64(&h.connectionCount, int64(len(h.connections)))

case webCon := <-h.unregister:
userId := webCon.UserId
Expand Down

0 comments on commit af017a3

Please sign in to comment.