/
rpcxClient.go
67 lines (59 loc) · 1.67 KB
/
rpcxClient.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package rpc
import (
"context"
"fmt"
"strings"
"github.com/cockroachdb/errors"
cmap "github.com/orcaman/concurrent-map"
"github.com/rpcxio/libkv/store"
"github.com/smallnest/rpcx/client"
)
// XClientPool lazily creates rpcx XClient instances and caches them, one per
// (package suffix, service) pair.
type XClientPool struct {
	// pPrefix is joined with a package suffix using "_" to build both the
	// discovery base path and the cache key.
	pPrefix string
	// clients maps a cache key (see key) to the client.XClient created for it.
	clients cmap.ConcurrentMap
	// servers is the address list handed to the ZooKeeper discovery.
	servers []string
}
// NewXClientPool builds an empty pool that will discover services through the
// given ZooKeeper servers, using pPrefix as the first component of every
// discovery base path.
//
// The servers slice is stored as-is (not copied). The returned error is
// currently always nil.
func NewXClientPool(pPrefix string, servers []string) (*XClientPool, error) {
	pool := new(XClientPool)
	pool.pPrefix = pPrefix
	pool.servers = servers
	pool.clients = cmap.New()
	return pool, nil
}
// G returns the XClient for the given package suffix and service, creating and
// caching it on first use.
//
// A new client is backed by a ZooKeeper discovery whose base path is
// pPrefix + "_" + pSuffix, and is configured with client.Failover,
// client.RandomSelect and client.DefaultOption. An error is returned only when
// the discovery cannot be created.
//
// When several goroutines request the same uncached client at once, exactly
// one instance is stored and handed to all of them; every redundant instance
// is closed rather than leaked.
func (s *XClientPool) G(pSuffix string, service string) (client.XClient, error) {
	key := s.key(pSuffix, service)
	if c, ok := s.clients.Get(key); ok {
		return c.(client.XClient), nil
	}

	d, err := client.NewZookeeperDiscovery(s.pPrefix+"_"+pSuffix, service, s.servers, &store.Config{
		PersistConnection: true,
	})
	if err != nil {
		return nil, err
	}
	xclient := client.NewXClient(service, client.Failover, client.RandomSelect, d, client.DefaultOption)

	// Publish atomically. A plain Set here would silently overwrite a client
	// stored by a concurrent caller, leaving that client and its discovery
	// unreachable and never closed.
	for !s.clients.SetIfAbsent(key, xclient) {
		if c, ok := s.clients.Get(key); ok {
			// Another caller won the race: discard our duplicate. The Close
			// error is ignored on purpose; the instance was never exposed and
			// the caller still receives a usable client.
			_ = xclient.Close()
			d.Close()
			return c.(client.XClient), nil
		}
		// The winning entry vanished between SetIfAbsent and Get; try again.
	}
	return xclient, nil
}
// Call invokes a remote method synchronously.
//
// serviceMethod must have the form
// {package suffix}_{service name}_{method name}, for example
// activity_TaskService_Draw. It is split on "_" and must yield exactly three
// parts; otherwise an error is returned without contacting any service. The
// first two parts select the client via G, and the third is the method passed
// to that client's Call together with ctx, args and reply.
func (s *XClientPool) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error {
	parts := strings.Split(serviceMethod, "_")
	if len(parts) != 3 {
		return errors.New("serviceMethod split must be 3")
	}
	pSuffix, service, method := parts[0], parts[1], parts[2]

	xc, err := s.G(pSuffix, service)
	if err != nil {
		return err
	}
	return xc.Call(ctx, method, args, reply)
}
// Must unwraps a (client, error) pair such as the one returned by G: it
// returns c when err is nil and panics with err otherwise.
func (s *XClientPool) Must(c client.XClient, err error) client.XClient {
	if err == nil {
		return c
	}
	panic(err)
}
// key builds the cache key for a client: the pool prefix, the package suffix
// and the service name joined by underscores.
func (s *XClientPool) key(pSuffix string, service string) string {
	const layout = "%s_%s_%s"
	return fmt.Sprintf(layout, s.pPrefix, pSuffix, service)
}