forked from TarsCloud/TarsGo
-
Notifications
You must be signed in to change notification settings - Fork 4
/
tarsprotocol.go
105 lines (95 loc) · 2.97 KB
/
tarsprotocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package tars
import (
"bytes"
"context"
"encoding/binary"
"time"
"tars/util/current"
"tars/protocol/codec"
"tars/protocol/res/basef"
"tars/protocol/res/requestf"
)
type dispatch interface {
Dispatch(context.Context, interface{}, *requestf.RequestPacket, *requestf.ResponsePacket, bool) error
}
//TarsProtocol is struct for dispatch with tars protocol.
type TarsProtocol struct {
dispatcher dispatch
serverImp interface{}
withContext bool
}
//NewTarsProtocol return a Tarsprotocol with dipatcher and implement interface.
//withContext explain using context or not.
func NewTarsProtocol(dispatcher dispatch, imp interface{}, withContext bool) *TarsProtocol {
s := &TarsProtocol{dispatcher: dispatcher, serverImp: imp, withContext: withContext}
return s
}
//Invoke puts the request as []byte and call the dispather, and then return the response as []byte.
func (s *TarsProtocol) Invoke(ctx context.Context, req []byte) (rsp []byte) {
defer checkPanic()
reqPackage := requestf.RequestPacket{}
rspPackage := requestf.ResponsePacket{}
if len(req) <= 4 {
TLOG.Error("len(req)=[%d]", len(req))
return nil
}
req = req[4:]
is := codec.NewReader(req)
reqPackage.ReadFrom(is)
TLOG.Debug("invoke:", reqPackage.IRequestId)
if reqPackage.CPacketType == basef.TARSONEWAY {
defer func() func() {
beginTime := time.Now().UnixNano() / 1000000
return func() {
endTime := time.Now().UnixNano() / 1000000
ReportStatFromServer(reqPackage.SFuncName, "one_way_client", rspPackage.IRet, endTime-beginTime)
}
}()()
}
var err error
if s.withContext {
ok := current.SetRequestStatus(ctx, reqPackage.Status)
if !ok {
TLOG.Error("Set reqeust status in context fail!")
}
ok = current.SetRequestContext(ctx, reqPackage.Context)
if !ok {
TLOG.Error("Set request context in context fail!")
}
}
if allFilters.sf != nil {
err = allFilters.sf(ctx, s.dispatcher.Dispatch, s.serverImp, &reqPackage, &rspPackage, s.withContext)
} else {
err = s.dispatcher.Dispatch(ctx, s.serverImp, &reqPackage, &rspPackage, s.withContext)
}
if err != nil {
rspPackage.IVersion = basef.TARSVERSION
rspPackage.CPacketType = basef.TARSNORMAL
rspPackage.IRequestId = reqPackage.IRequestId
rspPackage.IRet = 1
rspPackage.SResultDesc = err.Error()
}
return s.rsp2Byte(&rspPackage)
}
func (s *TarsProtocol) rsp2Byte(rsp *requestf.ResponsePacket) []byte {
os := codec.NewBuffer()
rsp.WriteTo(os)
bs := os.ToBytes()
sbuf := bytes.NewBuffer(nil)
sbuf.Write(make([]byte, 4))
sbuf.Write(bs)
len := sbuf.Len()
binary.BigEndian.PutUint32(sbuf.Bytes(), uint32(len))
return sbuf.Bytes()
}
//ParsePackage parse the []byte according to the tars protocol.
func (s *TarsProtocol) ParsePackage(buff []byte) (int, int) {
return TarsRequest(buff)
}
//InvokeTimeout indicates how to deal with timeout.
func (s *TarsProtocol) InvokeTimeout(pkg []byte) []byte {
rspPackage := requestf.ResponsePacket{}
rspPackage.IRet = 1
rspPackage.SResultDesc = "server invoke timeout"
return s.rsp2Byte(&rspPackage)
}