/
UdpServerSession.go
146 lines (124 loc) · 3.15 KB
/
UdpServerSession.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package zNet
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/pzqf/zUtil/zAes"
)
//type SessionIdType int64
type UdpServerSession struct {
conn *net.UDPConn
sid SessionIdType // session ID
addr *net.UDPAddr
sendChan chan *NetPacket
wg sync.WaitGroup
lastHeartBeat time.Time
ctxCancel context.CancelFunc
aesKey []byte
}
func (s *UdpServerSession) Init(conn *net.UDPConn, sid SessionIdType, aesKey []byte) {
s.conn = conn
s.sid = sid
s.sendChan = make(chan *NetPacket, GConfig.ChanSize)
s.lastHeartBeat = time.Now()
}
func (s *UdpServerSession) Start() {
if s.conn == nil {
return
}
ctx, ctxCancel := context.WithCancel(context.Background())
s.ctxCancel = ctxCancel
go s.process(ctx)
return
}
func (s *UdpServerSession) Close() {
s.ctxCancel()
s.wg.Wait()
}
func (s *UdpServerSession) Receive(data []byte) {
netPacket := NetPacket{}
if err := netPacket.UnmarshalHead(data[:NetPacketHeadSize]); err != nil {
LogPrint("Receive NetPacket,Unmarshal head error", err, len(data))
return
}
if netPacket.DataSize > 0 {
netPacket.Data = data[NetPacketHeadSize : NetPacketHeadSize+int(netPacket.DataSize)]
}
if netPacket.ProtoId < 0 {
LogPrint("receive NetPacket ProtoId less than 0")
return
}
if netPacket.DataSize > maxPacketDataSize {
LogPrint(fmt.Sprintf("Receive NetPacket, Data size over max size, protoid:%d, data size:%d, max size: %d",
netPacket.ProtoId, netPacket.DataSize, maxPacketDataSize))
return
}
if netPacket.ProtoId == HeartbeatProtoId {
return
}
if netPacket.DataSize > 0 && s.aesKey != nil {
netPacket.Data = zAes.DecryptCBC(netPacket.Data, s.aesKey)
}
err := Dispatcher(s, &netPacket)
if err != nil {
LogPrint(fmt.Sprintf("Dispatcher NetPacket error,%v, ProtoId:%d", err, netPacket.ProtoId))
}
}
func (s *UdpServerSession) process(ctx context.Context) {
s.wg.Add(1)
defer s.wg.Done()
defer Recover()
running := true
for {
select {
case sendPacket := <-s.sendChan:
_, err := s.conn.WriteToUDP(sendPacket.Marshal(), s.addr)
if err != nil {
LogPrint(fmt.Sprintf("Send NetPacket error,%v, ProtoId:%d", err, sendPacket.ProtoId))
}
case <-ctx.Done():
for {
if len(s.sendChan) > 0 {
sendPacket := <-s.sendChan
_, err := s.conn.WriteToUDP(sendPacket.Marshal(), s.addr)
if err != nil {
LogPrint(err)
break
}
continue
}
break
}
running = false
}
if !running {
break
}
}
}
func (s *UdpServerSession) Send(protoId int32, data []byte) error {
netPacket := NetPacket{
ProtoId: protoId,
}
if s.aesKey != nil {
netPacket.Data = zAes.EncryptCBC(data, s.aesKey)
} else {
netPacket.Data = data
}
netPacket.DataSize = int32(len(netPacket.Data))
if netPacket.ProtoId <= 0 && netPacket.DataSize < 0 {
return errors.New("send packet illegal")
}
if netPacket.DataSize > maxPacketDataSize {
return errors.New(fmt.Sprintf("send NetPacket, Data size over max size, data size :%d, max size: %d, protoId:%d",
netPacket.DataSize, maxPacketDataSize, protoId))
}
s.sendChan <- &netPacket
return nil
}
func (s *UdpServerSession) GetSid() SessionIdType {
return s.sid
}