Skip to content

Commit

Permalink
MM-21012: Revamp websocket implementation (#16620)
Browse files Browse the repository at this point in the history
* MM-21012: Revamp websocket implementation

We replace the old gorilla/websocket implementation with the
gobwas/ws library. The gorilla library was in maintenance mode
and had a high level API due to which we cannot use that for
situations where a large number of concurrent connections needs
to be supported.

The ws library is a very low-level library that allows us
to work with raw net.Conns. We make several improvements:

- We completely remove the reader goroutines, and instead
replace them with a manual epoll implementation which sends off
messages to be read when it receives any data on the connection.
This lets us scale to a much larger number of connections.
- The reader buffer is eliminated, because we directly read
from the connection now.

https://mattermost.atlassian.net/browse/MM-21012

```release-notes
Improved the websocket implementation by using epoll manually
to read from a websocket. As a result, the number of goroutines
is expected to go down by half.
```

* fix tests

* fix shadowing errors

* final changes

* windows support!

* Remove pointer to waitgroup

* Fix edge case

* Trigger CI

* Trigger CI

Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
  • Loading branch information
agnivade and mattermod committed Feb 13, 2021
1 parent 9e561aa commit a246104
Show file tree
Hide file tree
Showing 76 changed files with 9,346 additions and 58 deletions.
27 changes: 18 additions & 9 deletions api4/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package api4

import (
"net/http"
"runtime"
"time"

"github.com/gorilla/websocket"
"github.com/gobwas/ws"

"github.com/mattermost/mattermost-server/v5/model"
)
Expand All @@ -17,23 +19,30 @@ func (api *API) InitWebSocket() {
}

func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
ReadBufferSize: model.SOCKET_MAX_MESSAGE_SIZE_KB,
WriteBufferSize: model.SOCKET_MAX_MESSAGE_SIZE_KB,
CheckOrigin: c.App.OriginChecker(),
fn := c.App.OriginChecker()
if fn != nil && !fn(r) {
c.Err = model.NewAppError("origin_check", "api.web_socket.connect.check_origin.app_error", nil, "", http.StatusBadRequest)
return
}

upgrader := ws.HTTPUpgrader{
Timeout: 5 * time.Second,
}

ws, err := upgrader.Upgrade(w, r, nil)
conn, _, _, err := upgrader.Upgrade(r, w)
if err != nil {
c.Err = model.NewAppError("connect", "api.web_socket.connect.upgrade.app_error", nil, "", http.StatusInternalServerError)
return
}

wc := c.App.NewWebConn(ws, *c.App.Session(), c.App.T, "")

wc := c.App.NewWebConn(conn, *c.App.Session(), c.App.T, "")
if c.App.Session().UserId != "" {
c.App.HubRegister(wc)
}

wc.Pump()
if runtime.GOOS == "windows" {
wc.BlockingPump()
} else {
go wc.Pump()
}
}
4 changes: 2 additions & 2 deletions app/app_iface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions app/opentracing/opentracing_layer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 24 additions & 6 deletions app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/getsentry/sentry-go"
sentryhttp "github.com/getsentry/sentry-go/http"
"github.com/gorilla/mux"
"github.com/mailru/easygo/netpoll"
"github.com/pkg/errors"
"github.com/rs/cors"
"golang.org/x/crypto/acme/autocert"
Expand Down Expand Up @@ -102,8 +103,11 @@ type Server struct {

EmailService *EmailService

hubs []*Hub
hashSeed maphash.Seed
hubs []*Hub
hashSeed maphash.Seed
poller netpoll.Poller
webConnSema chan struct{}
webConnSemaWg sync.WaitGroup

PushNotificationsHub PushNotificationsHub
pushNotificationClient *http.Client // TODO: move this to it's own package
Expand Down Expand Up @@ -226,6 +230,16 @@ func NewServer(options ...Option) (*Server, error) {
mlog.Error("Could not initiate logging", mlog.Err(err))
}

// epoll/kqueue is not available on Windows.
if runtime.GOOS != "windows" {
poller, err := netpoll.New(nil)
if err != nil {
return nil, errors.Wrap(err, "failed to create a netpoll instance")
}
s.poller = poller
s.webConnSema = make(chan struct{}, runtime.NumCPU()*8) // numCPU * 8 is a good amount of concurrency.
}

// This is called after initLogging() to avoid a race condition.
mlog.Info("Server is initializing...", mlog.String("go_version", runtime.Version()))

Expand Down Expand Up @@ -259,9 +273,9 @@ func NewServer(options ...Option) (*Server, error) {
}

if *s.Config().ServiceSettings.EnableOpenTracing {
tracer, err := tracing.New()
if err != nil {
return nil, err
tracer, err2 := tracing.New()
if err2 != nil {
return nil, err2
}
s.tracer = tracer
}
Expand Down Expand Up @@ -1201,7 +1215,7 @@ func (a *App) OriginChecker() func(*http.Request) bool {

return utils.OriginChecker(allowed)
}
return nil
return utils.SameOriginChecker()
}

func (s *Server) checkPushNotificationServerUrl() {
Expand Down Expand Up @@ -1621,6 +1635,10 @@ func (s *Server) SetLog(l *mlog.Logger) {
s.Log = l
}

func (s *Server) Poller() netpoll.Poller {
return s.poller
}

func (a *App) GenerateSupportPacket() []model.FileData {
// If any errors we come across within this function, we will log it in a warning.txt file so that we know why certain files did not get produced if any
var warnings []string
Expand Down
Loading

0 comments on commit a246104

Please sign in to comment.