Skip to content

Commit

Permalink
Merge pull request #215 from uber/weiqing.addCtx
Browse files Browse the repository at this point in the history
Update tchannel version and add a context option
  • Loading branch information
liweiqing authored Nov 21, 2017
2 parents 08d3997 + 2df96dd commit e73b34d
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 9 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ lint.log
/_venv/
README.md.err
tick-cluster.log
.idea/
12 changes: 12 additions & 0 deletions examples/keyvalue/gen-go/keyvalue/tchan-keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func (c *tchanKeyValueServiceClient) Get(ctx thrift.Context, key string) (string
}
success, err := c.client.Call(ctx, c.thriftService, "Get", &args, &resp)
if err == nil && !success {
switch {
default:
err = fmt.Errorf("received no result or unknown exception for Get")
}
}

return resp.GetSuccess(), err
Expand All @@ -57,6 +61,10 @@ func (c *tchanKeyValueServiceClient) GetAll(ctx thrift.Context, keys []string) (
}
success, err := c.client.Call(ctx, c.thriftService, "GetAll", &args, &resp)
if err == nil && !success {
switch {
default:
err = fmt.Errorf("received no result or unknown exception for GetAll")
}
}

return resp.GetSuccess(), err
Expand All @@ -70,6 +78,10 @@ func (c *tchanKeyValueServiceClient) Set(ctx thrift.Context, key string, value s
}
success, err := c.client.Call(ctx, c.thriftService, "Set", &args, &resp)
if err == nil && !success {
switch {
default:
err = fmt.Errorf("received no result or unknown exception for Set")
}
}

return err
Expand Down
4 changes: 4 additions & 0 deletions examples/ping-thrift-gen/gen-go/ping/tchan-ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (c *tchanPingPongServiceClient) Ping(ctx thrift.Context, request *Ping) (*P
}
success, err := c.client.Call(ctx, c.thriftService, "Ping", &args, &resp)
if err == nil && !success {
switch {
default:
err = fmt.Errorf("received no result or unknown exception for Ping")
}
}

return resp.GetSuccess(), err
Expand Down
8 changes: 8 additions & 0 deletions examples/role-labels/gen-go/role/tchan-role.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (c *tchanRoleServiceClient) GetMembers(ctx thrift.Context, role string) ([]
}
success, err := c.client.Call(ctx, c.thriftService, "GetMembers", &args, &resp)
if err == nil && !success {
switch {
default:
err = fmt.Errorf("received no result or unknown exception for GetMembers")
}
}

return resp.GetSuccess(), err
Expand All @@ -56,6 +60,10 @@ func (c *tchanRoleServiceClient) SetRole(ctx thrift.Context, role string) error
}
success, err := c.client.Call(ctx, c.thriftService, "SetRole", &args, &resp)
if err == nil && !success {
switch {
default:
err = fmt.Errorf("received no result or unknown exception for SetRole")
}
}

return err
Expand Down
2 changes: 2 additions & 0 deletions forward/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Sender interface {

// Options for the creation of a forwarder
type Options struct {
Ctx thrift.Context
MaxRetries int
RerouteRetries bool
RetrySchedule []time.Duration
Expand Down Expand Up @@ -80,6 +81,7 @@ func (f *Forwarder) mergeDefaultOptions(opts *Options) *Options {
merged.RetrySchedule = def.RetrySchedule
}
merged.Headers = opts.Headers
merged.Ctx = opts.Ctx

return &merged
}
Expand Down
43 changes: 43 additions & 0 deletions forward/forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"golang.org/x/net/context"
)

type ContextKey string

type ForwarderTestSuite struct {
suite.Suite
sender *MockSender
Expand Down Expand Up @@ -77,6 +79,16 @@ func (s *ForwarderTestSuite) registerPong(address string, channel *tchannel.Chan

thriftHandler := &pingpong.MockTChanPingPong{}

// successful request with context
thriftHandler.On("Ping", mock.MatchedBy(
func(c thrift.Context) bool {
return true
}), &pingpong.Ping{
Key: "ctxTest",
}).Return(&pingpong.Pong{
Source: address,
}, nil)

// successful request
thriftHandler.On("Ping", mock.Anything, &pingpong.Ping{
Key: "success",
Expand Down Expand Up @@ -194,6 +206,37 @@ func (s *ForwarderTestSuite) TestForwardThrift() {
s.Equal("correct pinging host", response.Success.Source)
}

func (s *ForwarderTestSuite) TestForwardThriftWithCtxOption() {
dest, err := s.sender.Lookup("reachable")
s.NoError(err)

request := &pingpong.PingPongPingArgs{
Request: &pingpong.Ping{
Key: "ctxTest",
},
}

bytes1, err := SerializeThrift(request)
s.NoError(err, "expected ping to be serialized")


k := ContextKey("key")
ctx := thrift.Wrap(context.WithValue(context.Background(), k, "val"))

res, err := s.forwarder.ForwardRequest(bytes1, dest, "test", "PingPong::Ping", []string{"reachable"},
tchannel.Thrift, &Options{
Ctx: ctx,
})
s.NoError(err, "expected request to be forwarded")

var response pingpong.PingPongPingResult

err = DeserializeThrift(res, &response)
s.NoError(err)

s.Equal("correct pinging host", response.Success.Source)
}

func (s *ForwarderTestSuite) TestForwardThriftErrorResponse() {
dest, err := s.sender.Lookup("reachable")
s.NoError(err)
Expand Down
13 changes: 12 additions & 1 deletion forward/request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/uber/ringpop-go/shared"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/raw"
"github.com/uber/tchannel-go/thrift"
)

// errDestinationsDiverged is an error that is returned from AttemptRetry
Expand Down Expand Up @@ -60,6 +61,7 @@ type requestSender struct {
rerouteRetries bool

headers []byte
ctx thrift.Context

startTime, retryStartTime time.Time

Expand Down Expand Up @@ -90,12 +92,21 @@ func newRequestSender(sender Sender, emitter events.EventEmitter, channel shared
retrySchedule: opts.RetrySchedule,
rerouteRetries: opts.RerouteRetries,
headers: opts.Headers,
ctx: opts.Ctx,
logger: logger,
}
}

func (s *requestSender) Send() (res []byte, err error) {
ctx, cancel := shared.NewTChannelContext(s.timeout)
var cancel context.CancelFunc
var ctx thrift.Context
if s.ctx == nil {
ctx, cancel = shared.NewTChannelContext(s.timeout)
} else {
var c context.Context
c, cancel = context.WithTimeout(s.ctx, s.timeout)
ctx = tchannel.Wrap(c)
}
defer cancel()

var forwardError, applicationError error
Expand Down
26 changes: 18 additions & 8 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import:
- require
- package: github.com/uber-common/bark
- package: github.com/uber/tchannel-go
version: 1.8.0
subpackages:
- json
- raw
Expand Down

0 comments on commit e73b34d

Please sign in to comment.