-
Notifications
You must be signed in to change notification settings - Fork 1
/
ping.go
130 lines (110 loc) · 3.06 KB
/
ping.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package service
import (
"context"
"fmt"
"io"
"net"
"net/http"
"sync"
"time"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
pb "github.com/mchmarny/grpc-lab/pkg/api/v1"
"github.com/mchmarny/grpc-lab/pkg/format"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
// NewPingService builds a PingService that will serve gRPC on the given
// listener. The remaining fields are left at their zero values.
func NewPingService(list net.Listener) *PingService {
	svc := new(PingService)
	svc.grpcListener = list
	return svc
}
// PingService represents the server that responds to pings.
// It serves gRPC on grpcListener and counts every request it processes.
type PingService struct {
// Embedded generated server type from the pb package.
// NOTE(review): presumably supplies default implementations of the service methods; confirm against the generated code.
pb.UnimplementedServiceServer
// messageCount is the number of requests processed so far; it is incremented while lock is held.
messageCount int64
// lock guards updates to messageCount.
lock sync.Mutex
// grpcListener is the listener the gRPC server accepts connections on; its address is also passed as the endpoint when the HTTP handler is registered.
grpcListener net.Listener
}
// Start starts the ping service as a gRPC server on the listener supplied to
// NewPingService and blocks until the server stops serving.
//
// The gRPC reflection service is registered alongside the ping service. When
// ctx is canceled the server is stopped gracefully, which causes Serve (and
// therefore Start) to return. The error returned is whatever Serve reports.
func (s *PingService) Start(ctx context.Context) error {
	opts := []grpc.ServerOption{}
	grpcServer := grpc.NewServer(opts...)
	reflection.Register(grpcServer)
	pb.RegisterServiceServer(grpcServer, s)

	// Watch for cancellation so the caller can stop the server through ctx.
	// The done channel releases the watcher when Serve returns on its own,
	// so the goroutine never outlives this call.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			grpcServer.GracefulStop()
		case <-done:
		}
	}()

	log.Infof("starting gRPC server at: %s", s.grpcListener.Addr().String())
	return grpcServer.Serve(s.grpcListener)
}
// StartHTTP starts the ping service as a HTTP server listening on addr.
//
// Incoming REST calls are handled by a grpc-gateway mux that forwards them to
// the gRPC endpoint at the address of the service's gRPC listener, so the gRPC
// server is expected to be reachable there. The call blocks until the HTTP
// server fails or ctx is canceled. On cancellation the server is shut down
// gracefully (bounded by a short timeout) and the shutdown result is returned.
func (s *PingService) StartHTTP(ctx context.Context, addr string) error {
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		// Wrapf already appends the cause, so it is not repeated in the message.
		return errors.Wrapf(err, "error creating listener on %s", addr)
	}
	// Covers the early-return paths; closing an already closed listener is harmless.
	defer lis.Close()

	mux := runtime.NewServeMux()
	opts := []grpc.DialOption{grpc.WithInsecure()}

	// The derived context is canceled on return, which releases the gateway's
	// connection to the gRPC endpoint.
	cancelCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	endpoint := s.grpcListener.Addr().String()
	if err := pb.RegisterServiceHandlerFromEndpoint(cancelCtx, mux, endpoint, opts); err != nil {
		return errors.Wrap(err, "error registering HTTP handler")
	}

	log.Infof("starting REST server at %s", lis.Addr().String())

	srv := &http.Server{Handler: mux}

	// Serve in the background so cancellation can be observed here. The
	// channel is buffered so the goroutine can always deliver its result.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- srv.Serve(lis)
	}()

	select {
	case err := <-serveErr:
		return err
	case <-ctx.Done():
		// Stop accepting requests once the caller cancels; without this the
		// server would keep serving requests it can no longer forward.
		shutdownCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
		defer stop()
		return errors.Wrap(srv.Shutdown(shutdownCtx), "error shutting down REST server")
	}
}
// Stream reads ping requests from the stream and answers each one on the same
// stream. It returns nil once Recv reports io.EOF, and an error when the
// stream context is canceled or expired, or when receiving or sending fails.
func (s *PingService) Stream(stream pb.Service_StreamServer) error {
	for {
		// Give up before reading if the stream context has already ended.
		if ctxErr := contextError(stream.Context()); ctxErr != nil {
			return ctxErr
		}

		in, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			log.Debug("no more data")
			return nil
		case recvErr != nil:
			return errors.Wrap(recvErr, "error receiving stream")
		}

		if sendErr := stream.Send(s.processReq(in)); sendErr != nil {
			return errors.Wrap(sendErr, "error sending stream response")
		}
	}
}
// Ping handles a single ping request. A nil request is rejected with an
// error; any other request is processed and its response returned.
func (s *PingService) Ping(ctx context.Context, req *pb.PingRequest) (res *pb.PingResponse, err error) {
	if req != nil {
		return s.processReq(req), nil
	}
	return nil, errors.New("nil request")
}
// processReq logs the request, bumps the service-wide message counter and
// builds the response for it.
//
// The response carries the request's content ID, the counter value after this
// request was counted, the current UTC time in Unix nanoseconds, and a detail
// string holding the request data reversed.
//
// NOTE(review): req and req.Content are dereferenced without checks; callers
// are assumed to pass a populated request.
func (s *PingService) processReq(req *pb.PingRequest) *pb.PingResponse {
	log.Infof("%+v", req)

	// Snapshot the counter while the lock is held. Reading s.messageCount
	// after Unlock would race with concurrent requests and could report a
	// count that belongs to a different request.
	s.lock.Lock()
	s.messageCount++
	count := s.messageCount
	s.lock.Unlock()

	return &pb.PingResponse{
		MessageID:    req.Content.Id,
		MessageCount: count,
		Processed:    time.Now().UTC().UnixNano(),
		Detail:       fmt.Sprintf("Reversed: %s", format.ReverseString(string(req.Content.Data))),
	}
}
// contextError translates the state of ctx into an error: a canceled context
// and an expired deadline each map to their own message, and any other state
// yields nil.
func contextError(ctx context.Context) error {
	cause := ctx.Err()
	if cause == context.Canceled {
		return errors.New("request is canceled")
	}
	if cause == context.DeadlineExceeded {
		return errors.New("deadline is exceeded")
	}
	return nil
}