forked from grpc-up-and-running/samples
/
main.go
160 lines (135 loc) · 4.56 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package main
import (
"context"
"github.com/golang/protobuf/ptypes/wrappers"
wrapper "github.com/golang/protobuf/ptypes/wrappers"
pb "github.com/grpc-up-and-running/samples/ch05/interceptors/order-service/go/order-service-gen"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"io"
"log"
"net"
"strings"
"time"
)
const (
port = ":50051"
orderBatchSize = 3
)
var orderMap = make(map[string]pb.Order)
type server struct {
orderMap map[string]*pb.Order
}
// Simple RPC
func (s *server) AddOrder(ctx context.Context, orderReq *pb.Order) (*wrappers.StringValue, error) {
orderMap[orderReq.Id] = *orderReq
sleepDuration := 5
log.Println("Sleeping for :", sleepDuration, "s")
time.Sleep(time.Duration(sleepDuration) * time.Second)
if ctx.Err() == context.DeadlineExceeded {
log.Printf("RPC has reached deadline exceeded state : %s", ctx.Err())
return nil, ctx.Err()
}
log.Println("Order : ", orderReq.Id, " -> Added")
return &wrapper.StringValue{Value: "Order Added: " + orderReq.Id}, nil
}
// Simple RPC
func (s *server) GetOrder(ctx context.Context, orderId *wrapper.StringValue) (*pb.Order, error) {
ord := orderMap[orderId.Value]
return &ord, nil
}
// Server-side Streaming RPC
func (s *server) SearchOrders(searchQuery *wrappers.StringValue, stream pb.OrderManagement_SearchOrdersServer) error {
for key, order := range orderMap {
for _, itemStr := range order.Items {
if strings.Contains(itemStr, searchQuery.Value) {
// Send the matching orders in a stream
log.Print("Matching Order Found : " + key, " -> Writing Order to the stream ... ")
stream.Send(&order)
break
}
}
}
return nil
}
// Client-side Streaming RPC
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs : "
for {
order, err := stream.Recv()
if err == io.EOF {
// Finished reading the order stream.
return stream.SendAndClose(&wrapper.StringValue{Value: "Orders processed " + ordersStr})
}
// Update order
orderMap[order.Id] = *order
log.Printf("Order ID ", order.Id, ": Updated")
ordersStr += order.Id + ", "
}
}
// Bi-directional Streaming RPC
func (s *server) ProcessOrders(stream pb.OrderManagement_ProcessOrdersServer) error {
batchMarker := 1
var combinedShipmentMap = make(map[string]pb.CombinedShipment)
for {
orderId, err := stream.Recv()
log.Println("Reading Proc order ... ", orderId)
if err == io.EOF {
// Client has sent all the messages
// Send remaining shipments
log.Println("EOF ", orderId)
for _, comb := range combinedShipmentMap {
stream.Send(&comb)
}
return nil
}
if err != nil {
log.Println(err)
return err
}
destination := orderMap[orderId.GetValue()].Destination
shipment, found := combinedShipmentMap[destination]
if found {
ord := orderMap[orderId.GetValue()]
shipment.OrdersList = append(shipment.OrdersList, &ord)
combinedShipmentMap[destination] = shipment
} else {
comShip := pb.CombinedShipment{Id: "cmb - " + (orderMap[orderId.GetValue()].Destination), Status: "Processed!", }
ord := orderMap[orderId.GetValue()]
comShip.OrdersList = append(shipment.OrdersList, &ord)
combinedShipmentMap[destination] = comShip
log.Print(len(comShip.OrdersList), comShip.GetId())
}
if batchMarker == orderBatchSize {
for _, comb := range combinedShipmentMap {
log.Print("Shipping : " , comb.Id, " -> ", len(comb.OrdersList))
stream.Send(&comb)
}
batchMarker = 0
combinedShipmentMap = make(map[string]pb.CombinedShipment)
} else {
batchMarker++
}
}
}
func main() {
initSampleData()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterOrderManagementServer(s, &server{})
// Register reflection service on gRPC server.
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
func initSampleData() {
orderMap["102"] = pb.Order{Id: "102", Items: []string{"Google Pixel 3A", "Mac Book Pro"}, Destination: "Mountain View, CA", Price: 1800.00}
orderMap["103"] = pb.Order{Id: "103", Items: []string{"Apple Watch S4"}, Destination: "San Jose, CA", Price: 400.00}
orderMap["104"] = pb.Order{Id: "104", Items: []string{"Google Home Mini", "Google Nest Hub"}, Destination: "Mountain View, CA", Price: 400.00}
orderMap["105"] = pb.Order{Id: "105", Items: []string{"Amazon Echo"}, Destination: "San Jose, CA", Price: 30.00}
orderMap["106"] = pb.Order{Id: "106", Items: []string{"Amazon Echo", "Apple iPhone XS"}, Destination: "Mountain View, CA", Price: 30.00}
}