/
start.go
120 lines (101 loc) · 2.69 KB
/
start.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package socketmode
import (
"context"
"fmt"
"time"
"github.com/gorilla/websocket"
"golang.org/x/exp/slog"
)
// Default timing values used by the socket mode client in this file.
const (
// defaultWaitTimeBetweenRetries is how long Start waits after a failed
// connection attempt before trying again.
defaultWaitTimeBetweenRetries = 5 * time.Second
// defaultPingDeadline is the write deadline, measured from the moment a
// ping is handled, for sending the pong control message back to the peer.
defaultPingDeadline = 30 * time.Second
// defaultHandlePingInterval is the polling interval intended for the
// handlePings listener.
defaultHandlePingInterval = 5 * time.Second
)
// Start starts the client and keeps it running, reconnecting on failure.
//
// Each iteration calls c.start, which blocks for the lifetime of a single
// websocket connection. When c.start returns an error and c.retries is still
// below c.maxRetries, the retry counter is incremented, the failure is logged
// and Start waits defaultWaitTimeBetweenRetries before trying again. Once the
// retry budget is exhausted the last error is returned.
//
// If ctx is done, Start returns context.Cause(ctx). The wait between retries
// also observes ctx, so cancellation is not delayed by the back-off period.
func (c *Client) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		default:
		}

		err := c.start(ctx)
		if err == nil {
			continue
		}

		// Handle websocket connection error
		if c.retries >= c.maxRetries {
			return err
		}
		c.retries++
		c.logger.Info("failed to initialize connection", slog.String("error", err.Error()))
		c.logger.Info(fmt.Sprintf("connection retry attempt %d", c.retries))

		// Back off before the next attempt, but give up immediately if the
		// caller cancels instead of sleeping through the whole interval.
		backoff := time.NewTimer(defaultWaitTimeBetweenRetries)
		select {
		case <-ctx.Done():
			backoff.Stop()
			return context.Cause(ctx)
		case <-backoff.C:
		}
	}
}
// start runs one connection lifecycle: it connects, launches the listener
// goroutines, and then blocks until the connection-scoped context is done.
//
// The listeners share a context derived from ctx with a cancel-cause function;
// that function is handed to handleErrors (presumably so it can tear the
// connection down with a specific cause — confirm against handleErrors).
//
// After the context is done the websocket connection is closed. If closing
// fails, the close error is returned; otherwise the context's cause is logged
// and returned.
func (c *Client) start(ctx context.Context) error {
	if connectErr := c.connect(ctx); connectErr != nil {
		return connectErr
	}

	connCtx, abort := context.WithCancelCause(ctx)

	// Listeners
	go c.handleRead(connCtx)
	go c.handlePings(connCtx)
	go c.handleSend(connCtx)
	go c.handleErrors(connCtx, abort)

	c.isConnected = true
	c.logger.Info("client started successfully")

	// Block here for as long as the connection is considered alive.
	<-connCtx.Done()

	// Close connection
	closeErr := c.conn.Close()
	if closeErr != nil {
		return closeErr
	}

	cause := context.Cause(connCtx)
	c.logger.Info("connection failed with error", slog.Any("error", cause))
	return cause
}
// connect obtains the websocket URL through the web API
// (c.Api.StartSocketModeContext) and dials it, storing the resulting
// connection on the client.
//
// When c.debugReconnects is set, "&debug_reconnects=true" is appended to the
// URL before dialing. On a successful dial a fresh ping timer is armed with
// c.pingTimeout and the connection's ping handler is installed.
//
// It returns the first error from either the API call or the dial.
func (c *Client) connect(ctx context.Context) error {
	c.logger.Info("initializing websocket connection")

	_, wsURL, err := c.Api.StartSocketModeContext(ctx)
	if err != nil {
		return err
	}

	if c.debugReconnects {
		wsURL = wsURL + "&debug_reconnects=true"
	}

	if c.conn, _, err = c.dialer.Dial(wsURL, nil); err != nil {
		return err
	}

	// The timer must exist before the ping handler is installed, because the
	// handler stops and resets it.
	c.pingTimer = time.NewTimer(c.pingTimeout)
	c.conn.SetPingHandler(pingHandlerFunc(c))

	c.logger.Info("successfully performed websocket handshake")
	return nil
}
// pingHandlerFunc returns the ping handler installed on the client's
// websocket connection.
//
// On every ping the handler:
//   - resets c.retries to 0, since a ping shows the connection is alive;
//   - restarts c.pingTimer with c.pingTimeout;
//   - replies with a pong carrying the ping's payload h, using a write
//     deadline of defaultPingDeadline from now.
//
// It returns the error, if any, from writing the pong.
func pingHandlerFunc(c *Client) func(string) error {
	return func(h string) error {
		c.retries = 0

		if !c.pingTimer.Stop() {
			// The timer already fired. Its value may or may not still be
			// sitting in the channel: handlePings also receives from
			// c.pingTimer.C and may have consumed it. Drain without blocking
			// so the handler can never hang on an empty channel.
			select {
			case <-c.pingTimer.C:
			default:
			}
		}
		c.pingTimer.Reset(c.pingTimeout)

		return c.conn.WriteControl(websocket.PongMessage, []byte(h), time.Now().Add(defaultPingDeadline))
	}
}
// handlePings is a listener that checks to make sure that our connection is
// healthy.
//
// It waits for either ctx to be done, in which case it returns, or for
// c.pingTimer to fire, in which case it reports ErrPingTimeout on c.errCh.
// The select blocks on both events rather than polling, so cancellation and
// ping timeouts are observed as soon as they happen.
//
// The send on c.errCh also observes ctx, so the goroutine cannot be left
// blocked if nothing is receiving from c.errCh once the context is done.
func (c *Client) handlePings(ctx context.Context) {
	defer c.logger.Info("shutting down handlePings listener")
	c.logger.Info("starting handlePings listener")

	for {
		select {
		case <-ctx.Done():
			return
		case <-c.pingTimer.C:
			select {
			case c.errCh <- ErrPingTimeout:
				// Keep listening: the ping handler may re-arm the timer.
			case <-ctx.Done():
				return
			}
		}
	}
}