forked from thinkgos/go-iecp5
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_special.go
134 lines (113 loc) · 3 KB
/
server_special.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright 2020 thinkgos (thinkgo@aliyun.com). All rights reserved.
// Use of this source code is governed by a version 3 of the GNU General
// Public License, license that can be found in the LICENSE file.
package cs104
import (
"context"
"errors"
"math/rand"
"sync/atomic"
"time"
"github.com/jiguorui/go-iecp5/asdu"
"github.com/jiguorui/go-iecp5/clog"
)
// ServerSpecial is the interface exposed by the special server created with
// NewServerSpecial. It embeds asdu.Connect and adds lifecycle control, state
// queries, connection event callbacks and logging configuration.
type ServerSpecial interface {
	asdu.Connect

	// Lifecycle control.

	// Start begins the background connection loop.
	Start() error
	// Close requests the background connection loop to stop.
	Close() error

	// State queries.

	// IsConnected reports whether a connection is currently established.
	// NOTE(review): implemented outside this file — confirm exact semantics.
	IsConnected() bool
	// IsClosed reports whether the server is back in its initial state.
	IsClosed() bool

	// Connection event callbacks.

	// SetOnConnectHandler sets the callback stored as the on-connect handler.
	SetOnConnectHandler(f func(c asdu.Connect))
	// SetConnectionLostHandler sets the callback stored as the connection-lost handler.
	SetConnectionLostHandler(f func(c asdu.Connect))

	// Logging configuration.
	// NOTE(review): presumably provided by the embedded logger — confirm.

	// LogMode enables or disables logging.
	LogMode(enable bool)
	// SetLogProvider replaces the log provider.
	SetLogProvider(p clog.LogProvider)
}
// serverSpec is the ServerSpecial implementation returned by NewServerSpecial.
// It layers an outbound connection loop (see running) on top of an embedded
// SrvSession.
type serverSpec struct {
// SrvSession is embedded; the methods in this file reach its fields
// (status, rwMux, conn, config, ...) through promotion.
SrvSession
// option is a by-value copy of the ClientOption handed to NewServerSpecial.
option ClientOption
// closeCancel cancels the context created by running; it is nil until
// running has started and is read by Close under rwMux.
closeCancel context.CancelFunc
}
// NewServerSpecial creates a ServerSpecial that uses handler for the session
// and o for its options.
//
// The options are copied by value into the returned server, while the
// session's config and params pointers refer to the fields of the caller's
// *o. o must not be nil (it is dereferenced here).
func NewServerSpecial(handler ServerHandlerInterface, o *ClientOption) ServerSpecial {
	// Capacity shared by all four session queues.
	const queueSize = 1024

	srv := &serverSpec{option: *o}
	srv.config = &o.config
	srv.params = &o.params
	srv.handler = handler
	srv.rcvASDU = make(chan []byte, queueSize)
	srv.sendASDU = make(chan []byte, queueSize)
	srv.rcvRaw = make(chan []byte, queueSize)
	srv.sendRaw = make(chan []byte, queueSize) // may not block!
	srv.Clog = clog.NewLogger("cs104 serverSpec => ")
	return srv
}
// SetOnConnectHandler stores f as the session's onConnection callback,
// replacing any callback set earlier.
func (sf *serverSpec) SetOnConnectHandler(f func(conn asdu.Connect)) {
	sf.SrvSession.onConnection = f
}
// SetConnectionLostHandler stores f as the session's connectionLost callback,
// replacing any callback set earlier.
func (sf *serverSpec) SetConnectionLostHandler(f func(c asdu.Connect)) {
	sf.SrvSession.connectionLost = f
}
// Start launches the connection loop in a background goroutine and returns
// immediately. It returns an error only when no remote server address is
// configured; a nil result means the loop was started, not that a connection
// has been established.
func (sf *serverSpec) Start() error {
	if remote := sf.option.server; remote == nil {
		return errors.New("empty remote server")
	}
	go sf.running()
	return nil
}
// running is the connection loop started by Start. It dials the configured
// remote server, runs the session on the resulting connection until it ends,
// and then dials again with a pause between attempts.
//
// The loop exits when the context cancelled by Close is done, or when a dial
// fails while autoReconnect is disabled. Only one loop runs at a time: entry
// requires moving status from initial to disconnected, and status is reset to
// initial on exit.
//
// Both pauses (the reconnect interval after a failed dial and the random
// back-off after a disconnect) are interrupted by cancellation, so Close takes
// effect without waiting for a pending pause to elapse.
func (sf *serverSpec) running() {
	sf.rwMux.Lock()
	if !atomic.CompareAndSwapUint32(&sf.status, initial, disconnected) {
		sf.rwMux.Unlock()
		return
	}
	ctx, cancel := context.WithCancel(context.Background())
	sf.closeCancel = cancel
	sf.rwMux.Unlock()

	defer sf.setConnectStatus(initial)
	// Release the context on every exit path, including the one where the
	// loop stops by itself and Close is never called. The local cancel is used
	// (not sf.closeCancel) so a later restart's context is never affected.
	defer cancel()

	// pause waits for d, returning early when ctx is cancelled. It reports
	// whether the full duration elapsed.
	pause := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			return true
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		default:
		}

		sf.Debug("connecting server %+v", sf.option.server)
		conn, err := openConnection(sf.option.server, sf.option.TLSConfig, sf.config.ConnectTimeout0)
		if err != nil {
			sf.Error("connect failed, %v", err)
			if !sf.option.autoReconnect {
				return
			}
			if !pause(sf.option.reconnectInterval) {
				return
			}
			continue
		}
		sf.Debug("connect success")
		sf.conn = conn
		sf.run(ctx)

		sf.Debug("disconnected server %+v", sf.option.server)
		// Retry after a random 500ms-1s delay so that rapid reconnects do not
		// pile up many useless connections on the remote side.
		if !pause(time.Millisecond * time.Duration(500+rand.Intn(500))) {
			return
		}
	}
}
// IsClosed reports whether the server is in the initial state, i.e. the
// connection loop is not running (it was never started or has exited).
func (sf *serverSpec) IsClosed() bool {
	current := sf.connectStatus()
	return current == initial
}
// Close cancels the context of the connection loop, if one has been created.
// It is a no-op when the loop has never been started, and always returns nil.
func (sf *serverSpec) Close() error {
	sf.rwMux.Lock()
	defer sf.rwMux.Unlock()

	if cancel := sf.closeCancel; cancel != nil {
		cancel()
	}
	return nil
}