-
Notifications
You must be signed in to change notification settings - Fork 0
/
ringpop-ping.go
122 lines (106 loc) · 4.41 KB
/
ringpop-ping.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// @generated Code generated by thrift-gen. Do not modify.
package ping
import (
"errors"
"fmt"
"github.com/uber/ringpop-go"
"github.com/uber/ringpop-go/forward"
"github.com/uber/ringpop-go/router"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)
// RingpopPingPongServiceAdapter wraps a TChanPingPongService implementation
// and uses a ringpop router to decide, per call, whether the call is handled
// by the wrapped implementation or by a client obtained from the router.
type RingpopPingPongServiceAdapter struct {
	// impl is the service implementation that handles calls executed in
	// this process.
	impl TChanPingPongService
	// ringpop is the ringpop instance handed to the router on construction.
	ringpop ringpop.Interface
	// ch is the TChannel channel handed to the router on construction.
	ch *tchannel.Channel
	// config holds the per-endpoint forwarding configuration.
	config PingPongServiceConfiguration
	// router looks up the client to use for a given shard key.
	router router.Router
}
// PingPongServiceConfiguration contains the forwarding configuration for the
// PingPongService service. It has one field per endpoint defined in the
// service, in which the endpoint-specific forward configuration can be stored.
// Populating these fields is optional; an endpoint whose field is left nil is
// served by the service implementation local to the process that received the
// call.
type PingPongServiceConfiguration struct {
	// Ping holds the forwarding configuration for the Ping endpoint defined
	// in the service. A nil value disables forwarding for Ping.
	Ping *PingPongServicePingConfiguration
}
// validate checks that every configured endpoint is usable for routing. An
// endpoint that is left unconfigured (nil) is valid; a configured endpoint
// must provide a Key function, otherwise an error is returned.
func (c *PingPongServiceConfiguration) validate() error {
	// A configured Ping endpoint without a Key function cannot be routed.
	if c.Ping != nil && c.Ping.Key == nil {
		return errors.New("configuration for endpoint Ping is missing a Key function")
	}
	return nil
}
// NewRingpopPingPongServiceAdapter creates an implementation of the
// TChanPingPongService interface. This specific implementation will use the
// configuration provided during construction to deterministically route calls
// to nodes from a ringpop cluster. The channel should be the channel on which
// the service exposes its endpoints. Forwarded calls, calls to unconfigured
// endpoints and calls that already were executed on the right machine will be
// passed on to the implementation passed in during construction.
//
// An error is returned, and no adapter is created, when the configuration
// fails validation (for example a configured endpoint without a Key function).
//
// Example usage:
//
//	import "github.com/uber/tchannel-go/thrift"
//
//	var server thrift.Server
//	server = ...
//
//	var handler TChanPingPongService
//	handler = &YourImplementation{}
//
//	adapter, _ := NewRingpopPingPongServiceAdapter(handler, ringpop, channel,
//		PingPongServiceConfiguration{
//			Ping: &PingPongServicePingConfiguration{
//				Key: func(ctx thrift.Context, request *Ping) (shardKey string, err error) {
//					return "calculated-shard-key", nil
//				},
//			},
//		},
//	)
//	server.Register(NewTChanPingPongServiceServer(adapter))
func NewRingpopPingPongServiceAdapter(
	impl TChanPingPongService,
	rp ringpop.Interface,
	ch *tchannel.Channel,
	config PingPongServiceConfiguration,
) (TChanPingPongService, error) {
	// Reject unusable configurations before building anything.
	if err := config.validate(); err != nil {
		return nil, err
	}

	a := &RingpopPingPongServiceAdapter{
		impl:    impl,
		ringpop: rp,
		ch:      ch,
		config:  config,
	}

	// The router needs the adapter itself as its client factory, so it can
	// only be attached once the adapter exists.
	a.router = router.New(rp, a, ch)

	return a, nil
}
// GetLocalClient satisfies the ClientFactory interface of ringpop-go/router.
// It returns the service implementation the adapter was constructed with.
func (a *RingpopPingPongServiceAdapter) GetLocalClient() interface{} {
	local := a.impl
	return local
}
// MakeRemoteClient satisfies the ClientFactory interface of ringpop-go/router.
// It returns the result of NewTChanPingPongServiceClient for the given thrift
// client.
func (a *RingpopPingPongServiceAdapter) MakeRemoteClient(client thrift.TChanClient) interface{} {
	remote := NewTChanPingPongServiceClient(client)
	return remote
}
// PingPongServicePingConfiguration contains the configuration on how to route
// calls to the thrift endpoint PingPongService::Ping.
type PingPongServicePingConfiguration struct {
	// Key is a closure that generates a routable key based on the
	// parameters of the incoming request. A non-nil error returned by Key
	// causes the Ping call to fail instead of being routed.
	Key func(ctx thrift.Context, request *Ping) (string, error)
}
// Ping satisfies the TChanPingPongService interface. It uses the configuration
// for Ping to determine where the call is executed: when no configuration is
// present, or when the call carries the forwarded header, it is handed to the
// local implementation. Otherwise the configured Key function produces a shard
// key, the router supplies the client for that key, and the call is made on
// that client.
//
// An error is returned when the Key function fails (reported as
// "could not get key: ...") or when the router cannot supply a client;
// otherwise the result of the invoked Ping is returned unchanged.
func (a *RingpopPingPongServiceAdapter) Ping(ctx thrift.Context, request *Ping) (r *Pong, err error) {
	// Endpoints without forwarding configuration are always served locally.
	if a.config.Ping == nil {
		return a.impl.Ping(ctx, request)
	}
	// A call that was already forwarded to this node must not be routed again.
	if forward.DeleteForwardedHeader(ctx) {
		return a.impl.Ping(ctx, request)
	}

	// Derive the key to shard on from the request.
	shardKey, keyErr := a.config.Ping.Key(ctx, request)
	if keyErr != nil {
		return nil, fmt.Errorf("could not get key: %q", keyErr)
	}

	// Ask the router which client to use for this key.
	target, remote, routeErr := a.router.GetClient(shardKey)
	if routeErr != nil {
		return nil, routeErr
	}

	pingClient := target.(TChanPingPongService)
	if remote {
		// Mark the outgoing call as forwarded for the given key.
		ctx = forward.SetForwardedHeader(ctx, []string{shardKey})
	}

	return pingClient.Ping(ctx, request)
}