forked from xiaonanln/goworld
-
Notifications
You must be signed in to change notification settings - Fork 0
/
DispatcherConnMgr.go
147 lines (125 loc) · 4.75 KB
/
DispatcherConnMgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package dispatcherclient
import (
"time"
"net"
"fmt"
"sync/atomic"
"unsafe"
"github.com/pkg/errors"
"github.com/xiaonanln/goworld/engine/common"
"github.com/xiaonanln/goworld/engine/config"
"github.com/xiaonanln/goworld/engine/consts"
"github.com/xiaonanln/goworld/engine/gwioutil"
"github.com/xiaonanln/goworld/engine/gwlog"
"github.com/xiaonanln/goworld/engine/gwutils"
"github.com/xiaonanln/goworld/engine/netutil"
"github.com/xiaonanln/goworld/engine/proto"
)
const (
_LOOP_DELAY_ON_DISPATCHER_CLIENT_ERROR = time.Second
)
type DispatcherConnMgr struct {
gid uint16 // gateid or gameid
dctype DispatcherClientType
dispid uint16
_dispatcherClient *DispatcherClient
isReconnect, isRestoreGame, isBanBootEntity bool // more properties for Game
delegate IDispatcherClientDelegate
}
var (
errDispatcherNotConnected = errors.New("dispatcher not connected")
)
func NewDispatcherConnMgr(gid uint16, dctype DispatcherClientType, dispid uint16, isRestoreGame, isBanBootEntity bool, delegate IDispatcherClientDelegate) *DispatcherConnMgr {
return &DispatcherConnMgr{
gid: gid,
dctype: dctype,
dispid: dispid,
isRestoreGame: isRestoreGame,
isBanBootEntity: isBanBootEntity,
delegate: delegate,
}
}
func (dcm *DispatcherConnMgr) getDispatcherClient() *DispatcherClient { // atomic
addr := (*uintptr)(unsafe.Pointer(&dcm._dispatcherClient))
return (*DispatcherClient)(unsafe.Pointer(atomic.LoadUintptr(addr)))
}
func (dcm *DispatcherConnMgr) setDispatcherClient(dispatcherClient *DispatcherClient) { // atomic
addr := (*uintptr)(unsafe.Pointer(&dcm._dispatcherClient))
atomic.StoreUintptr(addr, uintptr(unsafe.Pointer(dispatcherClient)))
}
func (dcm *DispatcherConnMgr) String() string {
return fmt.Sprintf("DispatcherConnMgr<%d>", dcm.dispid)
}
func (dcm *DispatcherConnMgr) assureConnected() *DispatcherClient {
//gwlog.Debugf("assureConnected: _dispatcherClient", _dispatcherClient)
var err error
dc := dcm.getDispatcherClient()
for dc == nil || dc.IsClosed() {
dc, err = dcm.connectDispatchClient()
if err != nil {
gwlog.Errorf("Connect to dispatcher failed: %s", err.Error())
time.Sleep(_LOOP_DELAY_ON_DISPATCHER_CLIENT_ERROR)
continue
}
dcm.setDispatcherClient(dc)
if dcm.dctype == GameDispatcherClientType {
dc.SendSetGameID(dcm.gid, dcm.isReconnect, dcm.isRestoreGame, dcm.isBanBootEntity, dcm.delegate.GetEntityIDsForDispatcher(dcm.dispid))
} else {
dc.SendSetGateID(dcm.gid)
}
dcm.isReconnect = true
gwlog.Infof("dispatcher_client: connected to dispatcher: %s", dc)
}
return dc
}
func (dcm *DispatcherConnMgr) connectDispatchClient() (*DispatcherClient, error) {
dispatcherConfig := config.GetDispatcher(dcm.dispid)
conn, err := netutil.ConnectTCP(dispatcherConfig.Ip, dispatcherConfig.Port)
if err != nil {
return nil, err
}
tcpConn := conn.(*net.TCPConn)
tcpConn.SetReadBuffer(consts.DISPATCHER_CLIENT_READ_BUFFER_SIZE)
tcpConn.SetWriteBuffer(consts.DISPATCHER_CLIENT_WRITE_BUFFER_SIZE)
dc := newDispatcherClient(dcm.dctype, conn, dcm.isReconnect, dcm.isRestoreGame)
return dc, nil
}
// IDispatcherClientDelegate defines functions that should be implemented by dispatcher clients
type IDispatcherClientDelegate interface {
HandleDispatcherClientPacket(msgtype proto.MsgType, packet *netutil.Packet)
HandleDispatcherClientDisconnect()
GetEntityIDsForDispatcher(dispid uint16) []common.EntityID
}
// Initialize the dispatcher client, only called by engine
func (dcm *DispatcherConnMgr) Connect() {
dcm.assureConnected()
go gwutils.RepeatUntilPanicless(dcm.serveDispatcherClient) // start the recv routine
}
// GetDispatcherClientForSend returns the current dispatcher client for sending messages
func (dcm *DispatcherConnMgr) GetDispatcherClientForSend() *DispatcherClient {
dispatcherClient := dcm.getDispatcherClient()
return dispatcherClient
}
// serve the dispatcher client, receive RESPs from dispatcher and process
func (dcm *DispatcherConnMgr) serveDispatcherClient() {
gwlog.Debugf("%s.serveDispatcherClient: start serving dispatcher client ...", dcm)
for {
dc := dcm.assureConnected()
var msgtype proto.MsgType
pkt, err := dc.Recv(&msgtype)
if err != nil {
if gwioutil.IsTimeoutError(err) {
continue
}
gwlog.TraceError("serveDispatcherClient: RecvMsgPacket error: %s", err.Error())
dc.Close()
dcm.delegate.HandleDispatcherClientDisconnect()
time.Sleep(_LOOP_DELAY_ON_DISPATCHER_CLIENT_ERROR)
continue
}
if consts.DEBUG_PACKETS {
gwlog.Debugf("%s.RecvPacket: msgtype=%v, payload=%v", dc, msgtype, pkt.Payload())
}
dcm.delegate.HandleDispatcherClientPacket(msgtype, pkt)
}
}