-
-
Notifications
You must be signed in to change notification settings - Fork 113
/
client.go
111 lines (100 loc) · 2.65 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package tcp
import (
"crypto/tls"
"fmt"
"net"
"time"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/message/pool"
coapNet "github.com/plgd-dev/go-coap/v3/net"
"github.com/plgd-dev/go-coap/v3/net/blockwise"
"github.com/plgd-dev/go-coap/v3/net/monitor/inactivity"
"github.com/plgd-dev/go-coap/v3/options"
client "github.com/plgd-dev/go-coap/v3/tcp/client"
)
// An Option customizes the configuration used by Dial and Client.
//
// Both functions start from a copy of client.DefaultConfig and call
// TCPClientApply on every supplied option, in the order given, so a later
// option can override what an earlier one set.
type Option interface {
	// TCPClientApply receives a pointer to the configuration being built and
	// is expected to apply the option's settings to it.
	TCPClientApply(cfg *client.Config)
}
// Dial creates a client connection to the given target.
//
// The options are applied to a copy of client.DefaultConfig to obtain the
// network (cfg.Net), dialer (cfg.Dialer), context (cfg.Ctx) and optional TLS
// configuration (cfg.TLSCfg). When cfg.TLSCfg is non-nil the connection is
// established over TLS, otherwise a plain connection is dialed. In both cases
// the dial is bound to cfg.Ctx, so cancelling it aborts an in-flight dial
// (including the TLS handshake).
//
// On success the connection is passed to Client together with the caller's
// options followed by options.WithCloseSocket(). On failure the dial error is
// returned unchanged and no connection is created.
func Dial(target string, opts ...Option) (*client.Conn, error) {
	cfg := client.DefaultConfig
	for _, o := range opts {
		o.TCPClientApply(&cfg)
	}

	var (
		conn net.Conn
		err  error
	)
	if cfg.TLSCfg != nil {
		// tls.DialWithDialer always dials with a background context, which
		// would silently ignore cfg.Ctx. tls.Dialer honours the context while
		// still respecting the Timeout/Deadline set on the net.Dialer.
		d := &tls.Dialer{
			NetDialer: cfg.Dialer,
			Config:    cfg.TLSCfg,
		}
		conn, err = d.DialContext(cfg.Ctx, cfg.Net, target)
	} else {
		conn, err = cfg.Dialer.DialContext(cfg.Ctx, cfg.Net, target)
	}
	if err != nil {
		return nil, err
	}

	// Build a fresh slice instead of appending to opts: if the caller passed
	// a sub-slice with spare capacity, append would overwrite their data.
	clientOpts := make([]Option, 0, len(opts)+1)
	clientOpts = append(clientOpts, opts...)
	// NOTE(review): WithCloseSocket presumably makes the returned connection
	// responsible for closing the dialed socket — confirm in the options package.
	clientOpts = append(clientOpts, options.WithCloseSocket())
	return Client(conn, clientOpts...), nil
}
// Client creates a client over an already established tcp/tcp-tls connection.
//
// The options are applied to a copy of client.DefaultConfig. Missing pieces
// are then filled in: a no-op error handler, a nil inactivity monitor and a
// message pool created with pool.New(0, 0). The error handler is wrapped so
// that cancel/close errors are dropped and every other error is reported
// with a "tcp: " prefix.
//
// The returned connection is registered with cfg.PeriodicRunner for
// expiration checks and its Run loop is started in a new goroutine; an error
// returned by Run is reported through the error handler, prefixed with the
// remote address.
func Client(conn net.Conn, opts ...Option) *client.Conn {
	cfg := client.DefaultConfig
	for _, opt := range opts {
		opt.TCPClientApply(&cfg)
	}

	// Fill in defaults for everything the caller left unset.
	if cfg.Errors == nil {
		cfg.Errors = func(error) {
			// default no-op
		}
	}
	if cfg.CreateInactivityMonitor == nil {
		cfg.CreateInactivityMonitor = func() client.InactivityMonitor {
			return inactivity.NewNilMonitor[*client.Conn]()
		}
	}
	if cfg.MessagePool == nil {
		cfg.MessagePool = pool.New(0, 0)
	}

	// Wrap the handler: errors caused by context cancellation or by closing
	// the connection are not reported, everything else gets a "tcp: " prefix.
	report := cfg.Errors
	cfg.Errors = func(err error) {
		if !coapNet.IsCancelOrCloseError(err) {
			report(fmt.Errorf("tcp: %w", err))
		}
	}

	// Choose the block-wise factory: a real one when enabled, otherwise one
	// that always yields nil.
	var newBlockWise func(*client.Conn) *blockwise.BlockWise[*client.Conn]
	if cfg.BlockwiseEnable {
		newBlockWise = func(c *client.Conn) *blockwise.BlockWise[*client.Conn] {
			return blockwise.New(
				c,
				cfg.BlockwiseTransferTimeout,
				cfg.Errors,
				func(token message.Token) (*pool.Message, bool) {
					return c.GetObservationRequest(token)
				},
			)
		}
	} else {
		newBlockWise = func(*client.Conn) *blockwise.BlockWise[*client.Conn] {
			return nil
		}
	}

	netConn := coapNet.NewConn(conn)
	inactivityMonitor := cfg.CreateInactivityMonitor()
	cc := client.NewConnWithOpts(
		netConn,
		&cfg,
		client.WithBlockWise(newBlockWise),
		client.WithInactivityMonitor(inactivityMonitor),
		client.WithRequestMonitor(cfg.RequestMonitor),
	)

	// Keep checking expirations for as long as the connection's context is
	// alive; returning false once the context has an error.
	cfg.PeriodicRunner(func(now time.Time) bool {
		cc.CheckExpirations(now)
		return cc.Context().Err() == nil
	})

	go func() {
		if err := cc.Run(); err != nil {
			cfg.Errors(fmt.Errorf("%v: %w", cc.RemoteAddr(), err))
		}
	}()

	return cc
}