/
acceptor.go
76 lines (62 loc) · 2.53 KB
/
acceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package gate
import (
"context"
"errors"
"git.golaxy.org/core/util/generic"
"git.golaxy.org/core/util/uid"
"git.golaxy.org/framework/net/gtp/codec"
"git.golaxy.org/framework/util/concurrent"
"net"
)
// _Acceptor accepts incoming network connections on behalf of a gate.
type _Acceptor struct {
	// gate is the owning gate; its context and options are read when
	// accepting connections and creating sessions.
	gate *_Gate
	// encoderCreator is the codec encoder factory held by this acceptor.
	encoderCreator codec.EncoderCreator
	// decoderCreator is the codec decoder factory held by this acceptor.
	decoderCreator codec.DecoderCreator
}
// accept runs the handshake for a newly accepted network connection and
// returns the session produced by it.
//
// If the gate's context is already done, no handshake is attempted and a
// "service shutdown" error is returned. Otherwise the handshake is given a
// context derived from the gate's context that expires after the gate's
// AcceptTimeout option.
func (acc *_Acceptor) accept(conn net.Conn) (*_Session, error) {
	// Non-blocking check so that connections arriving after shutdown has
	// begun are rejected immediately.
	select {
	case <-acc.gate.ctx.Done():
		return nil, errors.New("service shutdown")
	default:
	}

	ctx, cancel := context.WithTimeout(acc.gate.ctx, acc.gate.options.AcceptTimeout)
	// Always release the timeout context; otherwise its timer and its
	// registration on the gate context linger until AcceptTimeout elapses,
	// even when the handshake finishes early.
	// NOTE(review): assumes handshake does not retain ctx after it returns;
	// sessions built by newSession derive their context from acc.gate.ctx,
	// not from this ctx — confirm against handshake.
	defer cancel()

	return acc.handshake(ctx, conn)
}
// newSession builds a session in the SessionState_Birth state bound to conn
// and wires up its options, event dispatcher, transport protocol, control
// protocol and watcher lists. It returns an error when conn is nil.
func (acc *_Acceptor) newSession(conn net.Conn) (*_Session, error) {
	if conn == nil {
		return nil, errors.New("conn is nil")
	}

	gate := acc.gate

	s := &_Session{
		terminatedChan: make(chan struct{}),
		gate:           gate,
		id:             uid.New(),
		state:          SessionState_Birth,
	}

	// The session context is a child of the gate context, so it is cancelled
	// when the gate context is cancelled.
	s.Context, s.terminate = context.WithCancelCause(gate.ctx)
	s.transceiver.Conn = conn

	// Session options: start from the defaults, then apply the channel
	// settings configured on the gate.
	opts := &s.options
	sessionWith.Default()(opts)
	sessionWith.SendDataChanSize(gate.options.SessionSendDataChanSize)(opts)
	sessionWith.RecvDataChanSize(gate.options.SessionRecvDataChanSize, gate.options.SessionRecvDataChanRecyclable)(opts)
	sessionWith.SendEventChanSize(gate.options.SessionSendEventChanSize)(opts)
	sessionWith.RecvEventChanSize(gate.options.SessionRecvEventChanSize)(opts)

	// The dispatcher and both protocols share the session's transceiver and
	// the gate's IO retry setting.
	transceiver := &s.transceiver
	retries := gate.options.IORetryTimes

	// Event dispatcher: events go to the transport protocol, the control
	// protocol, and the session's own receive handlers.
	s.eventDispatcher.Transceiver = transceiver
	s.eventDispatcher.RetryTimes = retries
	s.eventDispatcher.EventHandler = generic.MakeDelegateFunc1(
		s.trans.HandleEvent,
		s.ctrl.HandleEvent,
		s.handleRecvEventChan,
		s.handleRecvEvent,
	)

	// Transport protocol.
	s.trans.Transceiver = transceiver
	s.trans.RetryTimes = retries
	s.trans.PayloadHandler = generic.MakeDelegateFunc1(
		s.handleRecvDataChan,
		s.handleRecvPayload,
	)

	// Control protocol.
	s.ctrl.Transceiver = transceiver
	s.ctrl.RetryTimes = retries
	s.ctrl.HeartbeatHandler = generic.MakeDelegateFunc1(s.handleRecvHeartbeat)

	// Watcher lists start out empty.
	s.dataWatchers = concurrent.MakeLockedSlice[*_DataWatcher](0, 0)
	s.eventWatchers = concurrent.MakeLockedSlice[*_EventWatcher](0, 0)

	return s, nil
}