Keeping many idle clients alive #92
Which side sends the keepalive message in your code, the client or the server? Usually the client sends the keepalive message and the server responds to it.

```go
func newUpgrader() *websocket.Upgrader {
	u := websocket.NewUpgrader()
	u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
		// echo
		c.WriteMessage(messageType, data)
		c.SetReadDeadline(time.Now().Add(your_keepalive_time))
	})
	u.OnClose(func(c *websocket.Conn, err error) {
		fmt.Println("OnClose:", c.RemoteAddr().String(), err)
	})
	return u
}

func onWebsocket(w http.ResponseWriter, r *http.Request) {
	upgrader := newUpgrader()
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		panic(err)
	}
	wsConn := conn.(*websocket.Conn)
	wsConn.SetReadDeadline(time.Now().Add(your_keepalive_time))
}
```
If you want the server to send the Ping, here is a full websocket example using the latest version of nbio:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/lesismal/nbio/nbhttp"
	"github.com/lesismal/nbio/nbhttp/websocket"
)

var (
	KeepaliveTime    = time.Second * 5
	KeepaliveTimeout = KeepaliveTime + time.Second*3
)

var clientMgr *ClientMgr

type ClientMgr struct {
	mux           sync.Mutex
	chStop        chan struct{}
	clients       map[*websocket.Conn]struct{}
	keepaliveTime time.Duration
}

func NewClientMgr(keepaliveTime time.Duration) *ClientMgr {
	return &ClientMgr{
		chStop:        make(chan struct{}),
		clients:       map[*websocket.Conn]struct{}{},
		keepaliveTime: keepaliveTime,
	}
}

func (cm *ClientMgr) Add(c *websocket.Conn) {
	cm.mux.Lock()
	defer cm.mux.Unlock()
	cm.clients[c] = struct{}{}
}

func (cm *ClientMgr) Delete(c *websocket.Conn) {
	cm.mux.Lock()
	defer cm.mux.Unlock()
	delete(cm.clients, c)
}

func (cm *ClientMgr) Run() {
	ticker := time.NewTicker(cm.keepaliveTime)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			func() {
				cm.mux.Lock()
				defer cm.mux.Unlock()
				for wsConn := range cm.clients {
					wsConn.WriteMessage(websocket.PingMessage, nil)
				}
				fmt.Printf("keepalive: ping %v clients\n", len(cm.clients))
			}()
		case <-cm.chStop:
			return
		}
	}
}

func (cm *ClientMgr) Stop() {
	close(cm.chStop)
}

func onWebsocket(w http.ResponseWriter, r *http.Request) {
	upgrader := websocket.NewUpgrader()
	upgrader.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
		// echo
		c.WriteMessage(messageType, data)
		// update read deadline
		c.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	})
	upgrader.SetPongHandler(func(c *websocket.Conn, s string) {
		// update read deadline
		c.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	})
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		panic(err)
	}
	wsConn := conn.(*websocket.Conn)
	// init read deadline
	wsConn.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	clientMgr.Add(wsConn)
	wsConn.OnClose(func(c *websocket.Conn, err error) {
		clientMgr.Delete(c)
	})
}

func main() {
	clientMgr = NewClientMgr(KeepaliveTime)
	go clientMgr.Run()
	defer clientMgr.Stop()

	mux := &http.ServeMux{}
	mux.HandleFunc("/ws", onWebsocket)
	svr := nbhttp.NewServer(nbhttp.Config{
		Network: "tcp",
		Addrs:   []string{"localhost:8888"},
	}, mux, nil)
	err := svr.Start()
	if err != nil {
		fmt.Printf("nbio.Start failed: %v\n", err)
		return
	}

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	svr.Shutdown(ctx)
}
```
To avoid any misunderstanding: I am not asking you to provide a solution, only asking for your opinion.
I don't understand what you want. Any further information?
Oh wow. Your two replies only became visible after I sent my 2nd msg. |
My current browser has Javascript turned off :-) |
All right, glad to help! |
Rather than using the mutex and map, would this be a Go scenario for using sync.Map?
Of course, as you like. I rarely use sync.Map because its performance is a little worse than a plain mutex-guarded map.
My thoughts on pinging after some consideration. One advantage of my 1-goroutine-per-client solution is that it lets me handle each client individually. Whenever a client interacts with the server, I reset the deadline for its next ping. If a client interacts with the server for a longer period, it will not see any pings during this time. Cutting down on pinging can save bandwidth and CPU load. Mimicking this behavior with a single goroutine would require more complex code. Not saying it can't be done, but for now I am shying away from trying to implement it.
If you want to save payload as much as possible, try this:

```go
// server.go
package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/lesismal/nbio"
	"github.com/lesismal/nbio/nbhttp"
	"github.com/lesismal/nbio/nbhttp/websocket"
)

var (
	KeepaliveTime    = time.Second * 5
	KeepaliveTimeout = KeepaliveTime + time.Second*3

	server *nbhttp.Server
)

func onWebsocket(w http.ResponseWriter, r *http.Request) {
	upgrader := websocket.NewUpgrader()
	upgrader.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
		log.Println("onMessage:", string(data))
		// step 2: reset ping timer
		keepaliveTimer := c.Session().(*nbio.Timer)
		keepaliveTimer.Reset(KeepaliveTime)
		// echo
		c.WriteMessage(messageType, data)
		// update read deadline
		c.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	})
	upgrader.SetPongHandler(func(c *websocket.Conn, s string) {
		log.Println("-- pong")
		// step 3: reset ping timer
		keepaliveTimer := c.Session().(*nbio.Timer)
		keepaliveTimer.Reset(KeepaliveTime)
		// update read deadline
		c.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	})
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		panic(err)
	}
	wsConn := conn.(*websocket.Conn)

	// step 1: start the ping timer and save it
	closed := false
	var ping func()
	ping = func() {
		if closed {
			return
		}
		log.Println("++ ping")
		wsConn.WriteMessage(websocket.PingMessage, nil)
		keepaliveTimer := server.AfterFunc(KeepaliveTime, ping)
		wsConn.SetSession(keepaliveTimer)
	}
	keepaliveTimer := server.AfterFunc(KeepaliveTime, ping)
	wsConn.SetSession(keepaliveTimer)
	wsConn.OnClose(func(c *websocket.Conn, err error) {
		closed = true
		keepaliveTimer := c.Session().(*nbio.Timer)
		keepaliveTimer.Stop()
	})

	// init read deadline
	wsConn.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
}

func main() {
	mux := &http.ServeMux{}
	mux.HandleFunc("/ws", onWebsocket)
	server = nbhttp.NewServer(nbhttp.Config{
		Network: "tcp",
		Addrs:   []string{"localhost:8888"},
	}, mux, nil)
	err := server.Start()
	if err != nil {
		fmt.Printf("nbio.Start failed: %v\n", err)
		return
	}

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	server.Shutdown(ctx)
}
```

```go
// client.go
package main

import (
	"flag"
	"fmt"
	"log"
	"net"
	"net/url"
	"time"

	"github.com/gorilla/websocket"
)

func main() {
	flag.Parse()
	u := url.URL{Scheme: "ws", Host: "localhost:8888", Path: "/ws"}
	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		log.Fatal("dial:", err)
	}
	defer c.Close()

	c.SetPingHandler(func(message string) error {
		log.Println("++ ping")
		err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
		log.Println("-- pong", err)
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

	for i := 0; i < 6; i++ {
		request := fmt.Sprintf("hello %v", i)
		err := c.WriteMessage(websocket.BinaryMessage, []byte(request))
		if err != nil {
			log.Fatalf("write: %v", err)
			return
		}
		receiveType, response, err := c.ReadMessage()
		if err != nil {
			log.Println("ReadMessage failed:", err)
			return
		}
		if receiveType != websocket.BinaryMessage {
			log.Println("received type != websocket.BinaryMessage")
			return
		}
		if string(response) != request {
			log.Printf("'%v' != '%v'", len(response), len(request))
			return
		}
		log.Println("success echo websocket.BinaryMessage:", request)
		time.Sleep(time.Second)
	}

	_, _, err = c.ReadMessage()
	if err != nil {
		log.Println("ReadMessage failed:", err)
		return
	}
}
```
@mehrvarz @acgreek One thing I am sorry about is that my English is poor, I will try my best to practice English 😂. |
This looks like what I am doing, but without goroutines. Very nice! I've had a suspicion it can be done with help from a layer below (AfterFunc). I am not using slack yet. Not really sure I want to. Let me think about it. Btw I upgraded nbio 1.1.9 -> 1.2.2 and it is working flawlessly. |
Usually, a ticker loop in a single goroutine is enough; we don't need to do it like this unless you've got a huge number of online connections.
You can also implement your own timer that runs on one or a few goroutines; both a heap and a time-wheel would be good choices.
Or, more simply, based on my first example, you can save each conn's last-active time in its session and only ping the connections that have been idle:

```go
// server.go
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/lesismal/nbio/nbhttp"
	"github.com/lesismal/nbio/nbhttp/websocket"
)

var (
	KeepaliveTime    = time.Second * 5
	KeepaliveTimeout = KeepaliveTime + time.Second*3
)

var clientMgr *ClientMgr

type ClientMgr struct {
	mux           sync.Mutex
	chStop        chan struct{}
	clients       map[*websocket.Conn]struct{}
	keepaliveTime time.Duration
}

func NewClientMgr(keepaliveTime time.Duration) *ClientMgr {
	return &ClientMgr{
		chStop:        make(chan struct{}),
		clients:       map[*websocket.Conn]struct{}{},
		keepaliveTime: keepaliveTime,
	}
}

func (cm *ClientMgr) Add(c *websocket.Conn) {
	cm.mux.Lock()
	defer cm.mux.Unlock()
	cm.clients[c] = struct{}{}
}

func (cm *ClientMgr) Delete(c *websocket.Conn) {
	cm.mux.Lock()
	defer cm.mux.Unlock()
	delete(cm.clients, c)
}

func (cm *ClientMgr) Run() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			func() {
				cm.mux.Lock()
				defer cm.mux.Unlock()
				mustActive := time.Now().Add(-cm.keepaliveTime)
				nPing := 0
				for wsConn := range cm.clients {
					lastActive := wsConn.Session().(time.Time)
					if lastActive.Before(mustActive) {
						wsConn.WriteMessage(websocket.PingMessage, nil)
						nPing++
					}
				}
				fmt.Printf("keepalive: ping [%v/%v] clients\n", nPing, len(cm.clients))
			}()
		case <-cm.chStop:
			return
		}
	}
}

func (cm *ClientMgr) Stop() {
	close(cm.chStop)
}

func onWebsocket(w http.ResponseWriter, r *http.Request) {
	upgrader := websocket.NewUpgrader()
	upgrader.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
		c.SetSession(time.Now())
		// echo
		c.WriteMessage(messageType, data)
		// update read deadline
		c.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	})
	upgrader.SetPongHandler(func(c *websocket.Conn, s string) {
		c.SetSession(time.Now())
		// update read deadline
		c.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	})
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		panic(err)
	}
	wsConn := conn.(*websocket.Conn)
	// init read deadline
	wsConn.SetReadDeadline(time.Now().Add(KeepaliveTimeout))
	// init last-active time, so Run never sees a nil session
	wsConn.SetSession(time.Now())
	clientMgr.Add(wsConn)
	wsConn.OnClose(func(c *websocket.Conn, err error) {
		clientMgr.Delete(c)
	})
}

func main() {
	clientMgr = NewClientMgr(KeepaliveTime)
	go clientMgr.Run()
	defer clientMgr.Stop()

	mux := &http.ServeMux{}
	mux.HandleFunc("/ws", onWebsocket)
	svr := nbhttp.NewServer(nbhttp.Config{
		Network: "tcp",
		Addrs:   []string{"localhost:8888"},
	}, mux, nil)
	err := svr.Start()
	if err != nil {
		fmt.Printf("nbio.Start failed: %v\n", err)
		return
	}

	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)
	<-interrupt

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	svr.Shutdown(ctx)
}
```

```go
// client.go
package main

import (
	"flag"
	"fmt"
	"log"
	"net"
	"net/url"
	"time"

	"github.com/gorilla/websocket"
)

func main() {
	flag.Parse()

	// client "222": runs forever in a goroutine, echoing every second,
	// so it stays active and should never be pinged by the server
	go func() {
		u := url.URL{Scheme: "ws", Host: "localhost:8888", Path: "/ws"}
		c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
		if err != nil {
			log.Fatal("dial:", err)
		}
		defer c.Close()
		c.SetPingHandler(func(message string) error {
			log.Println("222 ping")
			err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
			log.Println("222 pong", err)
			if err == websocket.ErrCloseSent {
				return nil
			} else if e, ok := err.(net.Error); ok && e.Temporary() {
				return nil
			}
			return err
		})
		for i := 0; true; i++ {
			request := fmt.Sprintf("hello %v", i)
			err := c.WriteMessage(websocket.BinaryMessage, []byte(request))
			if err != nil {
				log.Fatalf("write: %v", err)
				return
			}
			receiveType, response, err := c.ReadMessage()
			if err != nil {
				log.Println("ReadMessage failed:", err)
				return
			}
			if receiveType != websocket.BinaryMessage {
				log.Println("received type != websocket.BinaryMessage")
				return
			}
			if string(response) != request {
				log.Printf("'%v' != '%v'", len(response), len(request))
				return
			}
			log.Println("222 success echo websocket.BinaryMessage:", request)
			time.Sleep(time.Second)
		}
	}()

	// client "111": sends 6 echoes, then goes idle and only answers pings
	u := url.URL{Scheme: "ws", Host: "localhost:8888", Path: "/ws"}
	c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
	if err != nil {
		log.Fatal("dial:", err)
	}
	defer c.Close()
	c.SetPingHandler(func(message string) error {
		log.Println("111 ping")
		err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
		log.Println("111 pong", err)
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

	for i := 0; i < 6; i++ {
		request := fmt.Sprintf("hello %v", i)
		err := c.WriteMessage(websocket.BinaryMessage, []byte(request))
		if err != nil {
			log.Fatalf("write: %v", err)
			return
		}
		receiveType, response, err := c.ReadMessage()
		if err != nil {
			log.Println("ReadMessage failed:", err)
			return
		}
		if receiveType != websocket.BinaryMessage {
			log.Println("received type != websocket.BinaryMessage")
			return
		}
		if string(response) != request {
			log.Printf("'%v' != '%v'", len(response), len(request))
			return
		}
		log.Println("111 success echo websocket.BinaryMessage:", request)
		time.Sleep(time.Second)
	}

	_, _, err = c.ReadMessage()
	if err != nil {
		log.Println("ReadMessage failed:", err)
		return
	}
}
```
This last one is the winner. Best combination of effectiveness and simplicity. |
Looking at `top`, I would say that (with 200K idle test clients) the 1s ticker generates 30% to 50% less CPU load. The number of pings sent over time is basically the same. Before, everything was done by func setPingDeadline. Now the same func looks like this:
You can change the ticker interval to a bigger value and save more. AfterFunc may be better. |
Maybe true. But spreading the pings out evenly is probably not a bad thing.
Sure? I am happy with this now. Thank you very much again.
Because there is no need to traverse all clients every interval; but indeed it is more complex. It's the right choice to keep using this simple solution that already meets your needs. You can compare the CPU cost later if you need better performance.
I finally committed KeepAliveMgr. Let me know how I should credit you :-) |
It may be better:

```go
func (kaMgr *KeepAliveMgr) Run() {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	// here we reuse the slice's backing array across iterations
	myClients := make([]*websocket.Conn, yourMaxOnlineNum)[0:0]
	for {
		<-ticker.C
		if shutdownStarted.Get() {
			break
		}
		kaMgr.mux.RLock()
		timeNow := time.Now()
		for wsConn := range kaMgr.clients {
			pingTime := wsConn.Session()
			if pingTime != nil && timeNow.After(pingTime.(time.Time)) {
				myClients = append(myClients, wsConn)
			}
		}
		kaMgr.mux.RUnlock()
		pingDeadline := timeNow.Add(time.Duration(pingPeriod) * time.Second)
		readDeadline := timeNow.Add(30 * time.Second)
		for _, wsConn := range myClients {
			// set the time for sending the next ping in pingPeriod secs
			wsConn.SetSession(pingDeadline) // here we save more time.Now().Add calls
			// we expect a pong to our ping within max 30 secs from now
			wsConn.SetReadDeadline(readDeadline) // here we save more time.Now().Add calls
			// send the ping
			wsConn.WriteMessage(websocket.PingMessage, nil)
		}
		atomic.AddInt64(&pingSentCounter, int64(len(myClients)))
		myClients = myClients[0:0]
	}
}
```
These are very good ideas. For now I will keep my current implementation. It's a huge improvement, but still easy to understand code. I ran it for hours on my slowest Linux device, a RPi-2B+. I saw 24% cpu load with 60000 remote clients. So basically running 60K clients on 1 (slow) ARM core. Mind boggling. - I'm inclined to play with AfterFunc at some point. |
That's all right. |
That's not using AfterFunc, but it does save more time.Now() calls 😄
In another thread you said:

> My current keep-alive implementation is using 1 goroutine per client to send PingMessages at the perfect frequency. It is using `case <-time.After():`, which is simple (almost elegant) and is working really well. But it creates a lot of goroutines.
> The alternative "1 goroutine for all clients" implementation that I can think of would be way more complex. And I am not certain it would be really so much more efficient. Any thoughts you can share on this?