-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
128 lines (119 loc) · 2.85 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package crpc
import (
"sync"
"github.com/ndsky1003/crpc/codec"
"github.com/ndsky1003/crpc/header"
"github.com/ndsky1003/crpc/header/headertype"
"github.com/sirupsen/logrus"
)
//-----------------------------service----------------------------
// service is the server-side state for one connected peer: the codec used to
// talk to it, the server it is registered with, and a lock guarding writes.
type service struct {
name string // peer name taken from the verify request in serve; used as the registration key
done chan struct{} // created by newService; not read or closed in the code visible here
server *server // owning server: supplies the secret, the registry and forwarding
codec codec.Codec // frame reader/writer for this connection
mutex sync.Mutex // reads are single-threaded; writes take this lock
}
// newService returns a service bound to the given server and codec, with a
// freshly allocated done channel. The name is left empty; serve fills it in
// once the peer has sent its verify request.
func newService(server *server, codec codec.Codec) *service {
	return &service{
		done:   make(chan struct{}),
		codec:  codec,
		server: server,
	}
}
// serve drives one connection from start to finish.
//
// Handshake: the first frame must have type headertype.Verify and a body whose
// Secret equals the server's. On success the service registers itself with the
// server under the name from the request and answers with
// verify_res{Success: true}. Any handshake failure closes the codec and
// returns.
//
// Read loop: every following frame is read as header + raw body and dispatched
// on the header type:
//   - Ping: answered with a Pong carrying the same body.
//   - Req, Chunks, Msg: forwarded to the service named by h.ToService; if the
//     forward fails, a Reply_Error carrying the error text is sent back.
//   - Reply_Success, Reply_Error: forwarded to h.FromService.
//   - anything else (e.g. Pong): dropped.
//
// The loop ends on the first read error, after which the service is
// unregistered and the codec closed. A nil receiver is a no-op.
func (this *service) serve() {
	if this == nil {
		return
	}

	h, err := this.codec.ReadHeader()
	if err != nil {
		// The header may be nil when the read failed (the read loop below
		// never touches it on error), so only release one we actually got.
		if h != nil {
			h.Release()
		}
		this.codec.Close()
		logrus.Errorf("first frame header is error:%+v", err)
		return
	}
	if h.Type != headertype.Verify {
		h.Release()
		this.codec.Close()
		logrus.Error("first frame header is error")
		return
	}

	var req verify_req
	if err = this.codec.ReadBody(&req); err != nil {
		h.Release()
		logrus.Errorf("first frame body is error:%v", err)
		this.codec.Close()
		return
	}
	if req.Secret != this.server.Secret {
		h.Release()
		logrus.Errorf("verify is error")
		this.codec.Close()
		return
	}

	this.name = req.Name
	if err = this.server.addService(this.name, this); err != nil {
		// Registration failed, so there is nothing of ours to remove from the
		// server; just drop the connection.
		logrus.Errorf("add map is error:%v", err)
		h.Release()
		this.codec.Close()
		return
	}

	// From here on the service is registered, so other goroutines may write to
	// this connection concurrently: take the write lock for the response.
	this.mutex.Lock()
	err = this.codec.Write(h, verify_res{Success: true})
	this.mutex.Unlock() // always unlock, including on failure
	h.Release()
	if err != nil {
		logrus.Errorf("write verify res is err :%v", err)
		// close() both unregisters the service and closes the codec, so no
		// stale entry is left behind in the server.
		this.close()
		return
	}

	for {
		h, err = this.codec.ReadHeader()
		if err != nil {
			break
		}
		var data []byte
		if err = this.codec.ReadBodyRawData(&data); err != nil {
			h.Release()
			break
		}
		switch h.Type {
		case headertype.Ping:
			// Reuse the header for the reply; WriteRawData releases it.
			h.Type = headertype.Pong
			go this.WriteRawData(h, data)
		case headertype.Req, headertype.Chunks, headertype.Msg: // forward
			if e := this.server.WriteRawData(h.ToService, h, data); e != nil {
				logrus.Error(e)
				// Tell the sender the forward failed; Write releases the header.
				h.Type = headertype.Reply_Error
				go this.Write(h, e.Error())
			}
		case headertype.Reply_Success, headertype.Reply_Error: // back forward
			// NOTE(review): on failure the header is not released here because
			// it is not visible whether server.WriteRawData already did so.
			if e := this.server.WriteRawData(h.FromService, h, data); e != nil {
				logrus.Error(e)
			}
		default: // pong
			// Nobody else takes ownership of the header on this path.
			h.Release()
		}
	}

	this.close()
	logrus.Errorf("service:%s is die,err:%v\n", this.name, err)
}
// close unregisters this service from its server under its current name and
// then closes the codec, returning the codec's close error.
func (this *service) close() error {
	this.server.removeService(this.name)
	closeErr := this.codec.Close()
	return closeErr
}
// WriteRawData sends the header and an already-encoded body through the codec
// while holding the write lock. The header is released after the lock has been
// dropped, whether or not the write succeeded. It returns the codec's error.
func (this *service) WriteRawData(h *header.Header, data []byte) error {
	this.mutex.Lock()
	defer func() {
		this.mutex.Unlock()
		h.Release()
	}()
	return this.codec.WriteRawData(h, data)
}
// Write sends the header and the value v through the codec while holding the
// write lock. The header is released after the lock has been dropped, whether
// or not the write succeeded. It returns the codec's error.
func (this *service) Write(h *header.Header, v any) error {
	this.mutex.Lock()
	defer func() {
		this.mutex.Unlock()
		h.Release()
	}()
	return this.codec.Write(h, v)
}