/
client.go
116 lines (107 loc) · 2.84 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package udp
import (
"context"
"fmt"
"net"
"time"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/pool"
coapNet "github.com/plgd-dev/go-coap/v3/net"
"github.com/plgd-dev/go-coap/v3/net/blockwise"
"github.com/plgd-dev/go-coap/v3/net/monitor/inactivity"
"github.com/plgd-dev/go-coap/v3/options"
"github.com/plgd-dev/go-coap/v3/udp/client"
"github.com/plgd-dev/go-coap/v3/udp/server"
)
// An Option customizes the configuration of a UDP client.
//
// Dial and Client start from a copy of client.DefaultConfig and call
// UDPClientApply on every option, in the order given, passing a pointer to
// that copy so the option can modify it.
type Option interface {
// UDPClientApply applies the option to cfg.
UDPClientApply(cfg *client.Config)
}
// Dial creates a client connection to the given target.
//
// The options are first applied to a copy of client.DefaultConfig to obtain
// the dialer, context and network used to open the socket. The dialed
// connection is then handed to Client together with the same options followed
// by options.WithCloseSocket().
// NOTE(review): presumably WithCloseSocket makes the client close the socket
// it dialed itself — confirm against the options package.
//
// A dialing failure is returned unchanged. If the dialer yields anything other
// than a *net.UDPConn, Dial closes that connection and returns an
// "unsupported connection type" error.
func Dial(target string, opts ...Option) (*client.Conn, error) {
	cfg := client.DefaultConfig
	for _, o := range opts {
		o.UDPClientApply(&cfg)
	}

	c, err := cfg.Dialer.DialContext(cfg.Ctx, cfg.Net, target)
	if err != nil {
		return nil, err
	}
	conn, ok := c.(*net.UDPConn)
	if !ok {
		// Nothing else holds a reference to c, so it must be closed here or
		// the socket leaks. The Close error is dropped on purpose: the type
		// mismatch is the failure the caller needs to see.
		if c != nil {
			_ = c.Close()
		}
		return nil, fmt.Errorf("unsupported connection type: %T", c)
	}

	// Copy into a fresh slice instead of appending to opts: when the caller
	// spreads a slice that has spare capacity (Dial(target, shared[:n]...)),
	// an in-place append would overwrite the element following its view.
	clientOpts := make([]Option, 0, len(opts)+1)
	clientOpts = append(clientOpts, opts...)
	clientOpts = append(clientOpts, options.WithCloseSocket())
	return Client(conn, clientOpts...), nil
}
// Client builds a client connection on top of an existing UDP socket.
//
// The options are applied to a copy of client.DefaultConfig. Whichever of
// Errors, CreateInactivityMonitor and MessagePool is still nil afterwards
// receives a fallback: a no-op error callback, a factory returning
// inactivity.NewNilMonitor, and pool.New(0, 0) respectively. The error
// callback is then wrapped so that cancel/close errors are dropped and every
// other error is prefixed with "udp: <remote address>: ".
//
// Before returning, Client hands cfg.PeriodicRunner a callback that calls
// CheckExpirations and reports whether the connection's context is still
// error-free, and it starts Run in a new goroutine, forwarding a non-nil
// result to the error callback.
func Client(conn *net.UDPConn, opts ...Option) *client.Conn {
	cfg := client.DefaultConfig
	for _, opt := range opts {
		opt.UDPClientApply(&cfg)
	}

	// Fill in whatever the options left unset.
	if cfg.Errors == nil {
		cfg.Errors = func(error) {}
	}
	if cfg.CreateInactivityMonitor == nil {
		cfg.CreateInactivityMonitor = func() client.InactivityMonitor {
			return inactivity.NewNilMonitor[*client.Conn]()
		}
	}
	if cfg.MessagePool == nil {
		cfg.MessagePool = pool.New(0, 0)
	}

	// Wrap the error callback: errors produced by context cancellation or by
	// closing the connection are not reported; the rest are tagged with the
	// peer address.
	report := cfg.Errors
	cfg.Errors = func(err error) {
		if !coapNet.IsCancelOrCloseError(err) {
			report(fmt.Errorf("udp: %v: %w", conn.RemoteAddr(), err))
		}
	}

	// remote stays nil when the socket's remote address is not a *net.UDPAddr.
	remote, _ := conn.RemoteAddr().(*net.UDPAddr)

	// The blockwise factory yields nil unless blockwise transfer is enabled.
	var newBlockWise func(*client.Conn) *blockwise.BlockWise[*client.Conn]
	if cfg.BlockwiseEnable {
		newBlockWise = func(c *client.Conn) *blockwise.BlockWise[*client.Conn] {
			return blockwise.New(
				c,
				cfg.BlockwiseTransferTimeout,
				cfg.Errors,
				func(token message.Token) (*pool.Message, bool) {
					return c.GetObservationRequest(token)
				},
			)
		}
	} else {
		newBlockWise = func(*client.Conn) *blockwise.BlockWise[*client.Conn] {
			return nil
		}
	}

	inactivityMon := cfg.CreateInactivityMonitor()
	udpConn := coapNet.NewUDPConn(cfg.Net, conn, coapNet.WithErrors(cfg.Errors))
	sess := server.NewSession(
		cfg.Ctx,
		context.Background(),
		udpConn,
		remote,
		cfg.MaxMessageSize,
		cfg.MTU,
		cfg.CloseSocket,
	)
	cc := client.NewConnWithOpts(
		sess,
		&cfg,
		client.WithBlockWise(newBlockWise),
		client.WithInactivityMonitor(inactivityMon),
		client.WithRequestMonitor(cfg.RequestMonitor),
	)

	// The callback's result is true while the connection's context has no error.
	cfg.PeriodicRunner(func(now time.Time) bool {
		cc.CheckExpirations(now)
		return cc.Context().Err() == nil
	})

	go func() {
		if err := cc.Run(); err != nil {
			cfg.Errors(err)
		}
	}()

	return cc
}