forked from VolantMQ/volantmq
/
base.go
100 lines (84 loc) · 2.4 KB
/
base.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package transport
import (
"errors"
"sync"
"github.com/VolantMQ/volantmq/auth"
"github.com/VolantMQ/volantmq/systree"
"go.uber.org/zap"
)
// Config is base configuration object used by all transports
type Config struct {
// AuthManager
AuthManager *auth.Manager
Host string
// Port tcp port to listen on
Port string
}
// InternalConfig used by server implementation to configure internal specific needs
type InternalConfig struct {
Handler
Metric systree.Metric
}
type baseConfig struct {
InternalConfig
config Config
onConnection sync.WaitGroup // nolint: structcheck
onceStop sync.Once // nolint: structcheck
quit chan struct{} // nolint: structcheck
log *zap.SugaredLogger
protocol string
}
// Provider is interface that all of transports must implement
type Provider interface {
Protocol() string
Serve() error
Close() error
Port() string
Ready() error
Alive() error
}
var (
ErrListenerIsOff = errors.New("listener is off")
)
// Port return tcp port used by transport
func (c *baseConfig) Port() string {
return c.config.Port
}
// Protocol return protocol name used by transport
func (c *baseConfig) Protocol() string {
return c.protocol
}
func (c *baseConfig) baseReady() error {
select {
case <-c.quit:
return ErrListenerIsOff
default:
}
return nil
}
// handleConnection is for the broker to handle an incoming connection from a client
func (c *baseConfig) handleConnection(conn Conn) {
if c == nil {
c.log.Error("Invalid connection type")
return
}
var err error
defer func() {
if err != nil {
conn.Close()
}
}()
// To establish a connection, we must
// 1. Read and decode the message.ConnectMessage from the wire
// 2. If no decoding errors, then authenticate using username and password.
// Otherwise, write out to the wire message.ConnackMessage with
// appropriate error.
// 3. If authentication is successful, then either create a new session or
// retrieve existing session
// 4. Write out to the wire a successful message.ConnackMessage message
// Read the CONNECT message from the wire, if error, then check to see if it's
// a CONNACK error. If it's CONNACK error, send the proper CONNACK error back
// to client. Exit regardless of error type.
//conn.Conn.SetReadDeadline(time.Now().Add(time.Second * time.Duration(c.ConnectTimeout))) // nolint: errcheck, gas
err = c.OnConnection(conn, c.config.AuthManager)
}