-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
69 lines (62 loc) · 1.6 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package harness
import (
"context"
"fmt"
"github.com/panyam/slicer/clients"
"github.com/panyam/slicer/cmd/echosvc"
"github.com/panyam/slicer/protos"
"google.golang.org/grpc"
"io"
"log"
"net"
"time"
)
type Producer struct {
Addr string
ControlAddr string
Prefix string
grpcServer *grpc.Server
Logger *log.Logger
}
func NewProducer(prefix string, addr string, control_addr string, logfile io.Writer) *Producer {
out := Producer{
Prefix: prefix,
Addr: addr,
ControlAddr: control_addr,
grpcServer: grpc.NewServer(
// grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
// grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
),
Logger: log.New(logfile, fmt.Sprintf("PROD[%s] ", addr), log.Ldate|log.Ltime|log.Lshortfile),
}
return &out
}
func (p *Producer) Start() {
clientMgr := clients.NewStaticClientMgr(p.ControlAddr, protos.NewControlServiceClient)
ctrlSvcClient, err := clientMgr.GetClient("")
p.Logger.Println("CC, E: ", ctrlSvcClient, err)
if err != nil {
panic(err)
}
// do the ping
go func() {
t := time.NewTicker(time.Second * 5)
for {
<-t.C
ctrlSvcClient.Client.PingTarget(context.Background(), &protos.PingTargetRequest{
Address: p.Addr,
})
}
}()
echosvc.RegisterEchoServiceServer(p.grpcServer, echosvc.NewEchoService(p.Prefix))
// grpc_prometheus.Register(grpcServer)
p.Logger.Printf("Initializing Echo Server on %s", p.Addr)
lis, err := net.Listen("tcp", p.Addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
p.grpcServer.Serve(lis)
}
func (p *Producer) Stop() {
p.grpcServer.Stop()
}