forked from go-chassis/go-chassis
-
Notifications
You must be signed in to change notification settings - Fork 1
/
highway_client.go
executable file
·149 lines (123 loc) · 3.07 KB
/
highway_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package tcp
import (
"errors"
"fmt"
"net/url"
"runtime"
"sync"
"github.com/ServiceComb/go-chassis/core/client"
"github.com/ServiceComb/go-chassis/core/codec"
"github.com/ServiceComb/go-chassis/core/config"
"github.com/ServiceComb/go-chassis/core/lager"
microClient "github.com/ServiceComb/go-chassis/third_party/forked/go-micro/client"
"golang.org/x/net/context"
)
const (
//Name is a variable of type string
Name = "highway"
)
var requestID int
//初始状态时未连接的
//Go SDK 没有老版本问题 默认本地和对端都是支持新的编码方式的
var localSupportLogin = true
type highwayClient struct {
once sync.Once
opts microClient.Options
reqMutex sync.Mutex // protects following
}
func (c *highwayClient) Init(opts ...microClient.Option) error {
for _, o := range opts {
o(&c.opts)
}
return nil
}
func (c *highwayClient) NewRequest(service, schemaID, operationID string, arg interface{}, reqOpts ...microClient.RequestOption) *microClient.Request {
var opts microClient.RequestOptions
for _, o := range reqOpts {
o(&opts)
}
i := µClient.Request{
MicroServiceName: service,
Struct: schemaID,
Method: operationID,
Arg: arg,
}
//计算请求Id
i.ID = requestID
requestID++
if requestID >= ((1 << 31) - 2) {
requestID = 0
}
return i
}
//NewHighwayClient is a function
func NewHighwayClient(options ...microClient.Option) microClient.Client {
opts := microClient.Options{
PoolTTL: microClient.DefaultPoolTTL,
}
for _, o := range options {
o(&opts)
}
if opts.Codecs == nil {
opts.Codecs = codec.GetCodecMap()
}
if len(opts.ContentType) == 0 {
//TODO take effect of that option
opts.ContentType = "application/protobuf"
}
rc := &highwayClient{
once: sync.Once{},
opts: opts,
}
c := microClient.Client(rc)
return c
}
func (c *highwayClient) String() string {
return "highway_client"
}
func (c *highwayClient) Options() microClient.Options {
return c.opts
}
func (c *highwayClient) Call(ctx context.Context, addr string, req *microClient.Request, rsp interface{}, opts ...microClient.CallOption) error {
address := "highway://" + addr
u, err := url.Parse(address)
if err != nil {
lager.Logger.Errorf(err, "highway get host failed")
return err
}
//check worker number in configuration
workerNum := config.GlobalDefinition.Cse.Protocols["highway"].WorkerNumber
if workerNum == 0 {
workerNum = runtime.NumCPU() * 4
}
//check for the existence of workers if not exist create workers
err = jobSchdlr.createWorkerSchedulers(addr, workerNum, c, u.Host)
if err != nil {
return err
}
errCh := make(chan error)
j := &job{
req: req,
resp: rsp,
err: errCh,
ctx: ctx,
}
//schedule the job to workers
scheduleErr := jobSchdlr.scheduleJob(addr, j)
if scheduleErr != nil {
return scheduleErr
}
select {
case err := <-errCh:
return err
case <-ctx.Done():
err = ctx.Err()
e := fmt.Sprintf("request timeout: %v", ctx.Err())
return errors.New(e)
}
//return nil
}
//TODO send(requestHeader)
func init() {
client.InstallPlugin(Name, NewHighwayClient)
}