/
server.go
128 lines (116 loc) · 3.36 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package comet
import (
"context"
"math/rand"
"time"
logic "github.com/youminxue/euler/api/logic/grpc"
"github.com/youminxue/euler/internal/comet/conf"
log "github.com/golang/glog"
"github.com/zhenjl/cityhash"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/keepalive"
)
const (
minServerHeartbeat = time.Minute * 10
maxServerHeartbeat = time.Minute * 30
// grpc options
grpcInitialWindowSize = 1 << 24
grpcInitialConnWindowSize = 1 << 24
grpcMaxSendMsgSize = 1 << 24
grpcMaxCallMsgSize = 1 << 24
grpcKeepAliveTime = time.Second * 10
grpcKeepAliveTimeout = time.Second * 3
grpcBackoffMaxDelay = time.Second * 3
)
func newLogicClient(c *conf.RPCClient) logic.LogicClient {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(c.Dial))
defer cancel()
conn, err := grpc.DialContext(ctx, "discovery://default/goim.logic",
[]grpc.DialOption{
grpc.WithInsecure(),
grpc.WithInitialWindowSize(grpcInitialWindowSize),
grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcMaxCallMsgSize)),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(grpcMaxSendMsgSize)),
grpc.WithBackoffMaxDelay(grpcBackoffMaxDelay),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: grpcKeepAliveTime,
Timeout: grpcKeepAliveTimeout,
PermitWithoutStream: true,
}),
grpc.WithBalancerName(roundrobin.Name),
}...)
if err != nil {
panic(err)
}
return logic.NewLogicClient(conn)
}
// Server is comet server.
type Server struct {
c *conf.Config
round *Round // accept round store
buckets []*Bucket // subkey bucket
bucketIdx uint32
serverID string
rpcClient logic.LogicClient
}
// NewServer returns a new Server.
func NewServer(c *conf.Config) *Server {
s := &Server{
c: c,
round: NewRound(c),
rpcClient: newLogicClient(c.RPCClient),
}
// init bucket
s.buckets = make([]*Bucket, c.Bucket.Size)
s.bucketIdx = uint32(c.Bucket.Size)
for i := 0; i < c.Bucket.Size; i++ {
s.buckets[i] = NewBucket(c.Bucket)
}
s.serverID = c.Env.Host
go s.onlineproc()
return s
}
// Buckets return all buckets.
func (s *Server) Buckets() []*Bucket {
return s.buckets
}
// Bucket get the bucket by subkey.
func (s *Server) Bucket(subKey string) *Bucket {
idx := cityhash.CityHash32([]byte(subKey), uint32(len(subKey))) % s.bucketIdx
if conf.Conf.Debug {
log.Infof("%s hit channel bucket index: %d use cityhash", subKey, idx)
}
return s.buckets[idx]
}
// RandServerHearbeat rand server heartbeat.
func (s *Server) RandServerHearbeat() time.Duration {
return (minServerHeartbeat + time.Duration(rand.Int63n(int64(maxServerHeartbeat-minServerHeartbeat))))
}
// Close close the server.
func (s *Server) Close() (err error) {
return
}
func (s *Server) onlineproc() {
for {
var (
allRoomsCount map[string]int32
err error
)
roomCount := make(map[string]int32)
for _, bucket := range s.buckets {
for roomID, count := range bucket.RoomsCount() {
roomCount[roomID] += count
}
}
if allRoomsCount, err = s.RenewOnline(context.Background(), s.serverID, roomCount); err != nil {
time.Sleep(time.Second)
continue
}
for _, bucket := range s.buckets {
bucket.UpRoomsCount(allRoomsCount)
}
time.Sleep(time.Second * 10)
}
}