Skip to content

Commit

Permalink
Create a subchannel with dedicated hostport per deputy request. (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
herainman committed Jul 20, 2018
1 parent ae4652d commit aaa7011
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 78 deletions.
8 changes: 5 additions & 3 deletions codegen/template_bundle/template_files.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions codegen/templates/tchannel_endpoint.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ func (h *{{$handlerName}}) redirectToDeputy(
"{{.ThriftService}}::{{.Name}}": "{{$methodName}}",
}

h.Deps.Default.Channel.GetSubChannel(serviceName, tchannel.Isolated)
sub := h.Deps.Default.Channel.GetSubChannel(serviceName, tchannel.Isolated)
sub.Peers().Add(hostPort)
client := zanzibar.NewTChannelClient(
h.Deps.Default.Channel,
h.Deps.Default.Logger,
Expand All @@ -233,7 +234,8 @@ func (h *{{$handlerName}}) redirectToDeputy(
},
)

success, respHeaders, err := client.CallToHostPort(ctx, "{{.ThriftService}}", "{{$methodName}}", hostPort, reqHeaders, req, res, false)
success, respHeaders, err := client.Call(ctx, "{{.ThriftService}}", "{{$methodName}}", reqHeaders, req, res)
sub.Peers().Remove(hostPort)
return success, res, respHeaders, err
}
{{end -}}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 6 additions & 70 deletions runtime/tchannel_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
netContext "golang.org/x/net/context"
"sync"
)

// TChannelClientOption is used when creating a new TChannelClient
Expand Down Expand Up @@ -61,8 +60,6 @@ type TChannelClient struct {
ch *tchannel.Channel
sc *tchannel.SubChannel
scAlt *tchannel.SubChannel
selectedPeers map[string]struct{}
mutex sync.RWMutex
serviceName string
ClientID string
methodNames map[string]string
Expand Down Expand Up @@ -118,7 +115,6 @@ func NewTChannelClient(
client := &TChannelClient{
ch: ch,
sc: ch.GetSubChannel(opt.ServiceName),
selectedPeers: map[string]struct{}{},
serviceName: opt.ServiceName,
ClientID: opt.ClientID,
methodNames: opt.MethodNames,
Expand Down Expand Up @@ -154,7 +150,7 @@ func (c *TChannelClient) Call(
metrics: c.metrics[serviceMethod],
}

return c.call(ctx, call, reqHeaders, req, resp, false, "")
return c.call(ctx, call, reqHeaders, req, resp, false)
}

// CallThruAltChannel makes a RPC call using a configured alternate channel
Expand All @@ -175,29 +171,7 @@ func (c *TChannelClient) CallThruAltChannel(
metrics: c.metrics[serviceMethod],
}

return c.call(ctx, call, reqHeaders, req, resp, true, "")
}

// CallToHostPort makes a RPC call using a dedicated hostport
func (c *TChannelClient) CallToHostPort(
ctx context.Context,
thriftService, methodName, hostPort string,
reqHeaders map[string]string,
req, resp RWTStruct,
useAltSubchannel bool,
) (success bool, resHeaders map[string]string, err error) {
serviceMethod := thriftService + "::" + methodName

call := &tchannelOutboundCall{
client: c,
methodName: c.methodNames[serviceMethod],
serviceMethod: serviceMethod,
reqHeaders: reqHeaders,
logger: c.Loggers[serviceMethod],
metrics: c.metrics[serviceMethod],
}

return c.call(ctx, call, reqHeaders, req, resp, useAltSubchannel, hostPort)
return c.call(ctx, call, reqHeaders, req, resp, true)
}

func (c *TChannelClient) call(
Expand All @@ -206,7 +180,6 @@ func (c *TChannelClient) call(
reqHeaders map[string]string,
req, resp RWTStruct,
useAltSubchannel bool,
hostport string,
) (success bool, resHeaders map[string]string, err error) {
defer func() { call.finish(err) }()
call.start()
Expand Down Expand Up @@ -238,47 +211,10 @@ func (c *TChannelClient) call(
sc = c.scAlt
}

if hostport != "" {
var exist bool
c.mutex.Lock()
// if hostport is NOT in original peerlist, add it to pre-selected peers to avoid being picking up by subchannel
peers := sc.Peers().Copy()
if _, exist = peers[hostport]; !exist {
c.selectedPeers[hostport] = struct{}{}
}

p := sc.Peers().Add(hostport)
call.call, cerr = p.BeginCall(ctx, sc.ServiceName(), call.serviceMethod, &tchannel.CallOptions{
Format: tchannel.Thrift,
RequestState: rs,
})

defer func() {
// if hostport is NOT in original peerlist, this hostport is called only by this request
if !exist {
sc.Peers().Remove(hostport)
delete(c.selectedPeers, hostport)
}

c.mutex.Unlock()
}()
} else {
if rs.SelectedPeers == nil {
rs.SelectedPeers = map[string]struct{}{}
}

c.mutex.RLock()
for k, v := range c.selectedPeers {
rs.SelectedPeers[k] = v
}

c.mutex.RUnlock()
call.call, cerr = sc.BeginCall(ctx, call.serviceMethod, &tchannel.CallOptions{
Format: tchannel.Thrift,
RequestState: rs,
})
}

call.call, cerr = sc.BeginCall(ctx, call.serviceMethod, &tchannel.CallOptions{
Format: tchannel.Thrift,
RequestState: rs,
})
if cerr != nil {
LogErrorWarnTimeout(call.logger, err, "Could not begin outbound request")
return errors.Wrapf(
Expand Down
2 changes: 1 addition & 1 deletion runtime/tchannel_client_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,5 @@ func (r *RawTChannelClient) Call(
call.metrics = r.tc.metrics[serviceMethod]
}

return r.tc.call(ctx, call, reqHeaders, req, resp, false, "")
return r.tc.call(ctx, call, reqHeaders, req, resp, false)
}

0 comments on commit aaa7011

Please sign in to comment.