forked from plgd-dev/go-coap
/
multicastClient.go
216 lines (186 loc) · 7.05 KB
/
multicastClient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
package coap
// A client implementation.
import (
"context"
"net"
"time"
)
// A ClientConn represents a connection to a COAP server.
type MulticastClientConn struct {
conn *ClientConn
client *MulticastClient
}
// A MulticastClient defines parameters for a COAP client.
type MulticastClient struct {
Net string // "udp" / "udp4" / "udp6"
MaxMessageSize uint32 // Max message size that could be received from peer. If not set it defaults to 1152 B.
DialTimeout time.Duration // set Timeout for dialer
ReadTimeout time.Duration // net.ClientConn.SetReadTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
WriteTimeout time.Duration // net.ClientConn.SetWriteTimeout value for connections, defaults to 1 hour - overridden by Timeout when that value is non-zero
HeartBeat time.Duration // The maximum of time for synchronization go-routines, defaults to 30 seconds - overridden by Timeout when that value is non-zero if it occurs, then it call log.Fatal
Handler HandlerFunc // default handler for handling messages from server
NotifySessionEndFunc func(err error) // if NotifySessionEndFunc is set it is called when TCP/UDP session was ended.
BlockWiseTransfer *bool // Use blockWise transfer for transfer payload (default for UDP it's enabled, for TCP it's disable)
BlockWiseTransferSzx *BlockWiseSzx // Set maximal block size of payload that will be send in fragment
MulticastHopLimit int //sets the hop limit field value for future outgoing multicast packets. default is 2.
multicastHandler *TokenHandler
}
// Dial connects to the address on the named network.
func (c *MulticastClient) dialNet(ctx context.Context, net, address string) (*ClientConn, error) {
if c.multicastHandler == nil {
c.multicastHandler = &TokenHandler{tokenHandlers: make(map[[MaxTokenSize]byte]HandlerFunc)}
}
client := &Client{
Net: net,
MaxMessageSize: c.MaxMessageSize,
DialTimeout: c.DialTimeout,
ReadTimeout: c.ReadTimeout,
WriteTimeout: c.WriteTimeout,
HeartBeat: c.HeartBeat,
Handler: func(w ResponseWriter, r *Request) {
handler := c.Handler
if handler == nil {
handler = HandleFailed
}
c.multicastHandler.Handle(w, r, handler)
},
NotifySessionEndFunc: c.NotifySessionEndFunc,
BlockWiseTransfer: c.BlockWiseTransfer,
BlockWiseTransferSzx: c.BlockWiseTransferSzx,
MulticastHopLimit: c.MulticastHopLimit,
}
return client.DialWithContext(ctx, address)
}
func (c *MulticastClient) Dial(address string) (*MulticastClientConn, error) {
return c.DialWithContext(context.Background(), address)
}
// DialContext connects with context to the address on the named network.
func (c *MulticastClient) DialWithContext(ctx context.Context, address string) (*MulticastClientConn, error) {
var net string
switch c.Net {
case "udp", "udp4", "udp6":
net = c.Net + "-mcast"
case "":
net = "udp-mcast"
default:
return nil, ErrInvalidNetParameter
}
conn, err := c.dialNet(ctx, net, address)
if err != nil {
return nil, err
}
return &MulticastClientConn{
conn: conn,
client: c,
}, nil
}
// LocalAddr implements the networkSession.LocalAddr method.
func (mconn *MulticastClientConn) LocalAddr() net.Addr {
return mconn.conn.LocalAddr()
}
// RemoteAddr implements the networkSession.RemoteAddr method.
func (mconn *MulticastClientConn) RemoteAddr() net.Addr {
return mconn.conn.RemoteAddr()
}
// NewMessage Create message for request
func (mconn *MulticastClientConn) NewMessage(p MessageParams) Message {
return mconn.conn.NewMessage(p)
}
// NewGetRequest creates get request
func (mconn *MulticastClientConn) NewGetRequest(path string) (Message, error) {
return mconn.conn.NewGetRequest(path)
}
// WriteMsg sends a message through the connection co.
func (mconn *MulticastClientConn) WriteMsg(m Message) error {
return mconn.WriteMsgWithContext(context.Background(), m)
}
// WriteContextMsg sends a message with context through the connection co.
func (mconn *MulticastClientConn) WriteMsgWithContext(ctx context.Context, m Message) error {
return mconn.conn.WriteMsgWithContext(ctx, m)
}
// Close close connection
func (mconn *MulticastClientConn) Close() {
mconn.conn.Close()
}
//ResponseWaiter represents subscription to resource on the server
type ResponseWaiter struct {
token []byte
path string
conn *MulticastClientConn
}
// Cancel remove observation from server. For recreate observation use Observe.
func (r *ResponseWaiter) Cancel() error {
return r.conn.client.multicastHandler.Remove(r.token)
}
// Publish subscribes to sever on path. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) Publish(path string, responseHandler func(req *Request)) (*ResponseWaiter, error) {
return mconn.PublishWithContext(context.Background(), path, responseHandler)
}
// PublishContext subscribes with context to sever on path. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) PublishWithContext(ctx context.Context, path string, responseHandler func(req *Request)) (*ResponseWaiter, error) {
req, err := mconn.NewGetRequest(path)
if err != nil {
return nil, err
}
return mconn.PublishMsgWithContext(ctx, req, responseHandler)
}
// PublishMsg subscribes to sever with GET message. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) PublishMsg(req Message, responseHandler func(req *Request)) (*ResponseWaiter, error) {
return mconn.PublishMsgWithContext(context.Background(), req, responseHandler)
}
// PublishMsgWithContext subscribes with context to sever with GET message. After subscription and every change on path,
// server sends immediately response
func (mconn *MulticastClientConn) PublishMsgWithContext(ctx context.Context, req Message, responseHandler func(req *Request)) (*ResponseWaiter, error) {
if req.Code() != GET || req.PathString() == "" {
return nil, ErrInvalidRequest
}
path := req.PathString()
r := &ResponseWaiter{
token: req.Token(),
path: path,
conn: mconn,
}
err := mconn.client.multicastHandler.Add(req.Token(), func(w ResponseWriter, r *Request) {
var err error
switch r.Msg.Code() {
case GET, POST, PUT, DELETE:
//dont serve commands by multicast handler (filter own request)
return
}
needGet := false
resp := r.Msg
if r.Msg.Option(Size2) != nil {
if len(r.Msg.Payload()) != int(r.Msg.Option(Size2).(uint32)) {
needGet = true
}
}
if !needGet {
if block, ok := r.Msg.Option(Block2).(uint32); ok {
_, _, more, err := UnmarshalBlockOption(block)
if err != nil {
return
}
needGet = more
}
}
if needGet {
resp, err = r.Client.GetWithContext(ctx, path)
if err != nil {
return
}
}
responseHandler(&Request{Msg: resp, Client: r.Client, Ctx: ctx, Sequence: r.Client.Sequence()})
})
if err != nil {
return nil, err
}
err = mconn.WriteMsgWithContext(ctx, req)
if err != nil {
mconn.client.multicastHandler.Remove(r.token)
return nil, err
}
return r, nil
}