Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Commit

Permalink
Add support for rd transport header.
Browse files Browse the repository at this point in the history
Closes #324.
  • Loading branch information
abhinav committed Dec 7, 2015
1 parent eb2258c commit 7e9f9c5
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 0 deletions.
3 changes: 3 additions & 0 deletions tchannel/request.py
Expand Up @@ -83,6 +83,7 @@ class TransportHeaders(object):
'claim_at_finish',
'failure_domain',
'retry_flags',
'routing_delegate',
'scheme',
'speculative_exe',
'shard_key',
Expand All @@ -97,6 +98,7 @@ def __init__(self,
scheme=None,
speculative_exe=None,
shard_key=None,
routing_delegate=None,
**kwargs):

if scheme is None:
Expand All @@ -107,6 +109,7 @@ def __init__(self,
self.claim_at_finish = claim_at_finish
self.failure_domain = failure_domain
self.retry_flags = retry_flags
self.routing_delegate = routing_delegate
self.scheme = scheme
self.speculative_exe = speculative_exe
self.shard_key = shard_key
5 changes: 5 additions & 0 deletions tchannel/schemes/json.py
Expand Up @@ -50,6 +50,7 @@ def __call__(
hostport=None,
shard_key=None,
trace=None,
routing_delegate=None,
):
"""Make JSON TChannel Request.
Expand Down Expand Up @@ -86,6 +87,9 @@ def __call__(
:param string hostport:
A 'host:port' value to use when making a request directly to a
TChannel service, bypassing Hyperbahn.
:param routing_delegate:
Name of a service to which the request router should forward the
request instead of the service specified in the call req.
:rtype: Response
"""
Expand All @@ -107,6 +111,7 @@ def __call__(
hostport=hostport,
shard_key=shard_key,
trace=trace,
routing_delegate=routing_delegate,
)

# deserialize
Expand Down
5 changes: 5 additions & 0 deletions tchannel/schemes/raw.py
Expand Up @@ -45,6 +45,7 @@ def __call__(
hostport=None,
shard_key=None,
trace=None,
routing_delegate=None,
):
"""Make a raw TChannel request.
Expand Down Expand Up @@ -84,6 +85,9 @@ def __call__(
:param string hostport:
A 'host:port' value to use when making a request directly to a
TChannel service, bypassing Hyperbahn.
:param routing_delegate:
Name of a service to which the request router should forward the
request instead of the service specified in the call req.
:rtype: Response
"""
Expand All @@ -100,6 +104,7 @@ def __call__(
hostport=hostport,
shard_key=shard_key,
trace=trace,
routing_delegate=routing_delegate,
)

def register(self, endpoint, **kwargs):
Expand Down
6 changes: 6 additions & 0 deletions tchannel/schemes/thrift.py
Expand Up @@ -78,6 +78,7 @@ def __call__(
shard_key=None,
trace=None,
hostport=None,
routing_delegate=None,
):
"""Make a Thrift TChannel request.
Expand All @@ -103,6 +104,10 @@ def __call__(
``tchannel.retry``.
:param string retry_limit:
How many times to retry before
:param routing_delegate:
Name of a service to which the request router should forward the
request instead of the service specified in the call req.
:rtype: Response
"""
if not headers:
Expand Down Expand Up @@ -134,6 +139,7 @@ def __call__(
hostport=hostport or request.hostport,
shard_key=shard_key,
trace=trace,
routing_delegate=routing_delegate,
)

response.headers = serializer.deserialize_header(
Expand Down
3 changes: 3 additions & 0 deletions tchannel/tchannel.py
Expand Up @@ -137,6 +137,7 @@ def call(
timeout=None,
retry_on=None,
retry_limit=None,
routing_delegate=None,
hostport=None,
shard_key=None,
trace=None,
Expand Down Expand Up @@ -184,6 +185,8 @@ def call(
}
if shard_key:
transport_headers[transport.SHARD_KEY] = shard_key
if routing_delegate:
transport_headers[transport.ROUTING_DELEGATE] = routing_delegate

# If we got some parent tracing info we always want to propagate it
# along. Otherwise use the ``trace`` parameter that was passed in. If
Expand Down
2 changes: 2 additions & 0 deletions tchannel/transport.py
Expand Up @@ -27,6 +27,7 @@
CLAIM_AT_FINISH = "caf"
FAILURE_DOMAIN = "fd"
RETRY_FLAGS = "re"
ROUTING_DELEGATE = "rd"
SCHEME = "as"
SHARD_KEY = "sk"
SPECULATIVE_EXE = "se"
Expand All @@ -40,6 +41,7 @@ def to_kwargs(data):
args['claim_at_finish'] = data.get(CLAIM_AT_FINISH)
args['failure_domain'] = data.get(FAILURE_DOMAIN)
args['retry_flags'] = data.get(RETRY_FLAGS)
args['routing_delegate'] = data.get(ROUTING_DELEGATE)
args['scheme'] = data.get(SCHEME)
args['shard_key'] = data.get(SHARD_KEY)
args['speculative_exe'] = data.get(SPECULATIVE_EXE)
Expand Down
48 changes: 48 additions & 0 deletions tests/test_dispatch.py
Expand Up @@ -22,6 +22,7 @@

import pytest

from tchannel import TChannel
from tchannel.errors import FatalProtocolError
from tchannel.messages import CallResponseMessage
from tchannel.serializer.raw import RawSerializer
Expand Down Expand Up @@ -50,3 +51,50 @@ def test_dispatch_unexpected_message():
dispatcher = RequestDispatcher()
with pytest.raises(FatalProtocolError):
dispatcher.handle(CallResponseMessage(), None)


@pytest.mark.gen_test
def test_routing_delegate_is_propagated_raw():
server = TChannel('server')
server.listen()

@server.raw.register('foo')
def handler(request):
assert request.transport.routing_delegate == 'delegate'
return b'success'

client = TChannel('client', known_peers=[server.hostport])
res = yield client.raw('service', 'foo', b'', routing_delegate='delegate')
assert res.body == b'success'


@pytest.mark.gen_test
def test_routing_delegate_is_propagated_json():
server = TChannel('server')
server.listen()

@server.json.register('foo')
def handler(request):
assert request.transport.routing_delegate == 'delegate'
return {'success': True}

client = TChannel('client', known_peers=[server.hostport])
res = yield client.json('service', 'foo', {}, routing_delegate='delegate')
assert res.body == {'success': True}


@pytest.mark.gen_test
def test_routing_delegate_is_propagated_thrift(thrift_module):
server = TChannel('server')
server.listen()

@server.thrift.register(thrift_module.Service)
def healthy(request):
assert request.transport.routing_delegate == 'delegate'
return True

client = TChannel('client', known_peers=[server.hostport])
res = yield client.thrift(
thrift_module.Service.healthy(), routing_delegate='delegate'
)
assert res.body is True

0 comments on commit 7e9f9c5

Please sign in to comment.