This repository has been archived by the owner on Jul 7, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2
/
peer.go
115 lines (106 loc) · 3.5 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package stategate_client_go
import (
"context"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
stategate "github.com/stategate/stategate/gen/grpc/go"
"github.com/stategate/stategate/internal/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
"io"
)
// PeerClient is a stategate PeerService gRPC client.
// NewPeerClient builds it around a single gRPC connection; Close closes that
// connection.
type PeerClient struct {
	// client is the generated PeerService stub that Broadcast and Stream call.
	client stategate.PeerServiceClient
	// conn is the gRPC connection the stub was created from; Close closes it.
	conn *grpc.ClientConn
}
// NewPeerClient dials target and returns a PeerService client bound to the
// resulting connection.
//
// Every opt is applied, in order, to a fresh Options value before dialing:
//   - with no credentials configured the connection is insecure, otherwise the
//     configured transport credentials are used;
//   - the request validator is always the first unary interceptor;
//   - metrics, token-source auth and logging interceptors (plus payload
//     logging for every method, when enabled) are chained after it, in that
//     order.
//
// The dial is performed with grpc.WithBlock. An empty target is rejected with
// an error, and a dial failure is returned wrapped.
func NewPeerClient(ctx context.Context, target string, opts ...Opt) (*PeerClient, error) {
	if len(target) == 0 {
		return nil, errors.New("empty target")
	}

	// Fold all functional options into one configuration value.
	cfg := new(Options)
	for i := range opts {
		opts[i](cfg)
	}

	// Transport security: explicit credentials win, otherwise plaintext.
	var dialOpts []grpc.DialOption
	if cfg.creds != nil {
		dialOpts = append(dialOpts, grpc.WithTransportCredentials(cfg.creds))
	} else {
		dialOpts = append(dialOpts, grpc.WithInsecure())
	}

	// Interceptor chains. Append order is the order they are chained in, so
	// the sequence below must stay: validator, metrics, auth, logging.
	unary := []grpc.UnaryClientInterceptor{grpc_validator.UnaryClientInterceptor()}
	var stream []grpc.StreamClientInterceptor

	if cfg.metrics {
		unary = append(unary, grpc_prometheus.UnaryClientInterceptor)
		stream = append(stream, grpc_prometheus.StreamClientInterceptor)
	}
	if src := cfg.tokenSource; src != nil {
		unary = append(unary, unaryAuth(src, cfg.idtoken))
		stream = append(stream, streamAuth(src, cfg.idtoken))
	}
	if cfg.logging {
		zlog := logger.New(true, zap.Bool("client", true))
		unary = append(unary, grpc_zap.UnaryClientInterceptor(zlog.Zap()))
		stream = append(stream, grpc_zap.StreamClientInterceptor(zlog.Zap()))
		if cfg.logPayload {
			// Payload logging is enabled for every method.
			always := func(context.Context, string) bool { return true }
			unary = append(unary, grpc_zap.PayloadUnaryClientInterceptor(zlog.Zap(), always))
			stream = append(stream, grpc_zap.PayloadStreamClientInterceptor(zlog.Zap(), always))
		}
	}

	dialOpts = append(
		dialOpts,
		grpc.WithChainUnaryInterceptor(unary...),
		grpc.WithChainStreamInterceptor(stream...),
		grpc.WithBlock(),
	)

	conn, err := grpc.DialContext(ctx, target, dialOpts...)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create stategate client")
	}

	peer := &PeerClient{
		conn:   conn,
		client: stategate.NewPeerServiceClient(conn),
	}
	return peer, nil
}
// Broadcast sends in through the PeerService Broadcast RPC so that it reaches
// subscribers (PeerService clients calling Stream). The RPC's response value is
// discarded; only its error, if any, is returned.
func (p PeerClient) Broadcast(ctx context.Context, in *stategate.Message) error {
	if _, err := p.client.Broadcast(ctx, in); err != nil {
		return err
	}
	return nil
}
// Stream consumes messages from producers (PeerService clients calling
// Broadcast) and hands each received message to fn.
//
// It blocks until one of the following happens:
//   - fn returns false: Stream returns nil;
//   - the server ends the stream (io.EOF): Stream returns nil;
//   - ctx is already done when the next receive is about to start: Stream
//     returns nil;
//   - opening the stream or receiving from it fails: that error is returned.
//
// The RPC is opened on a context derived from ctx that is cancelled when
// Stream returns, so stopping early (fn returning false) tears the stream down
// instead of leaving it open.
func (p PeerClient) Stream(ctx context.Context, in *stategate.StreamMessageOpts, fn func(msg *stategate.PeerMessage) bool) error {
	// Derive the cancellable context BEFORE opening the stream. The stream is
	// bound to the context it is created with; cancelling a context derived
	// afterwards would not reach it and an early return would leak the RPC.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := p.client.Stream(ctx, in)
	if err != nil {
		return err
	}

	for {
		// Non-blocking check: a context that ended between two messages is a
		// normal stop, not an error.
		if ctx.Err() != nil {
			return nil
		}

		msg, err := stream.Recv()
		if err != nil {
			// io.EOF marks a clean end of stream from the server side.
			if err == io.EOF {
				return nil
			}
			return err
		}

		// The callback decides whether to keep consuming.
		if !fn(msg) {
			return nil
		}
	}
}
// Close shuts down the client's gRPC connection and returns whatever error the
// connection reports while closing.
func (p *PeerClient) Close() error {
	conn := p.conn
	return conn.Close()
}