This repository has been archived by the owner on May 11, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
operation.go
executable file
·94 lines (84 loc) · 2.57 KB
/
operation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package comet
import (
"context"
"time"
log "github.com/tsingson/zaplogger"
model "github.com/tsingson/ex-goim/api/comet/grpc"
logic "github.com/tsingson/ex-goim/api/logic/grpc"
"github.com/tsingson/ex-goim/pkg/strings"
"google.golang.org/grpc"
"google.golang.org/grpc/encoding/gzip"
)
// Connect connected a connection.
func (s *Server) Connect(c context.Context, p *model.Proto, cookie string) (mid int64, key, rid string, accepts []int32, heartbeat time.Duration, err error) {
reply, err := s.rpcClient.Connect(c, &logic.ConnectReq{
Server: s.serverID,
Cookie: cookie,
Token: p.Body,
})
if err != nil {
return
}
return reply.Mid, reply.Key, reply.RoomID, reply.Accepts, time.Duration(reply.Heartbeat), nil
}
// Disconnect disconnected a connection.
func (s *Server) Disconnect(c context.Context, mid int64, key string) (err error) {
_, err = s.rpcClient.Disconnect(context.Background(), &logic.DisconnectReq{
Server: s.serverID,
Mid: mid,
Key: key,
})
return
}
// Heartbeat heartbeat a connection session.
func (s *Server) Heartbeat(ctx context.Context, mid int64, key string) (err error) {
_, err = s.rpcClient.Heartbeat(ctx, &logic.HeartbeatReq{
Server: s.serverID,
Mid: mid,
Key: key,
})
return
}
// RenewOnline renew room online.
func (s *Server) RenewOnline(ctx context.Context, serverID string, rommCount map[string]int32) (allRoom map[string]int32, err error) {
reply, err := s.rpcClient.RenewOnline(ctx, &logic.OnlineReq{
Server: s.serverID,
RoomCount: rommCount,
}, grpc.UseCompressor(gzip.Name))
if err != nil {
return
}
return reply.AllRoomCount, nil
}
// Receive receive a message.
func (s *Server) Receive(ctx context.Context, mid int64, p *model.Proto) (err error) {
_, err = s.rpcClient.Receive(ctx, &logic.ReceiveReq{Mid: mid, Proto: p})
return
}
// Operate operate.
func (s *Server) Operate(ctx context.Context, p *model.Proto, ch *Channel, b *Bucket) error {
switch p.Op {
case model.OpChangeRoom:
if err := b.ChangeRoom(string(p.Body), ch); err != nil {
log.Errorf("b.ChangeRoom(%s) error(%v)", p.Body, err)
}
p.Op = model.OpChangeRoomReply
case model.OpSub:
if ops, err := strings.SplitInt32s(string(p.Body), ","); err == nil {
ch.Watch(ops...)
}
p.Op = model.OpSubReply
case model.OpUnsub:
if ops, err := strings.SplitInt32s(string(p.Body), ","); err == nil {
ch.UnWatch(ops...)
}
p.Op = model.OpUnsubReply
default:
// TODO ack ok&failed
if err := s.Receive(ctx, ch.Mid, p); err != nil {
log.Errorf("s.Report(%d) op:%d error(%v)", ch.Mid, p.Op, err)
}
p.Body = nil
}
return nil
}