Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ v0.4.3
- Added on_ready callback to RSocketServer. Called when sender/receiver tasks are ready
- Implement ReactiveX (3.0, 4.0) server side handler. Allows to define RequestHandler directly using ReactiveX
- Added sending_done_event argument to request_channel to allow client to wait until sending to server is complete/canceled
- Breaking Change: Removed RSocketBase class dependency from RequestHandler. It is not longer required as an argument to __init__

v0.4.2
======
Expand Down
4 changes: 2 additions & 2 deletions examples/server_with_fragmenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ async def authenticator(route: str, authentication: Authentication):
raise Exception('Unsupported authentication')


def handler_factory(socket):
return RoutingRequestHandler(socket, router, authenticator)
def handler_factory():
return RoutingRequestHandler(router, authenticator)


def handle_client(reader, writer):
Expand Down
4 changes: 2 additions & 2 deletions examples/server_with_lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ async def single_request_response(payload, composite_metadata):
return create_future(Payload(b'single_response'))


def handler_factory(socket):
return RoutingRequestHandler(socket, router)
def handler_factory():
return RoutingRequestHandler(router)


def handle_client(reader, writer):
Expand Down
4 changes: 2 additions & 2 deletions examples/server_with_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ async def authenticator(route: str, authentication: Authentication):
raise Exception('Unsupported authentication')


def handler_factory(socket):
return RoutingRequestHandler(socket, router, authenticator)
def handler_factory():
return RoutingRequestHandler(router, authenticator)


def handle_client(reader, writer):
Expand Down
6 changes: 3 additions & 3 deletions examples/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_quic_client_server(unused_tcp_port):

@pytest.mark.timeout(30)
def test_client_server_with_routing(unused_tcp_port):
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_with_routing.py', str(unused_tcp_port))
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_with_routing.py', '--port', str(unused_tcp_port))

try:
sleep(2)
Expand Down Expand Up @@ -64,7 +64,7 @@ def test_client_java_server_with_routing_and_fragmentation(unused_tcp_port):

@pytest.mark.timeout(30)
def test_rx_client_server_with_routing(unused_tcp_port):
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_with_routing.py', str(unused_tcp_port))
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_with_routing.py', '--port', str(unused_tcp_port))

try:
sleep(2)
Expand Down Expand Up @@ -136,7 +136,7 @@ def run_java_class(java_class: str, unused_tcp_port: int):


def test_java_client_server(unused_tcp_port):
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_with_routing.py', str(unused_tcp_port))
pid = os.spawnlp(os.P_NOWAIT, 'python3', 'python3', './server_with_routing.py', '--port', str(unused_tcp_port))

try:
sleep(2)
Expand Down
4 changes: 2 additions & 2 deletions performance/performance_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ async def authenticator(route: str, authentication: Authentication):
raise Exception('Unsupported authentication')


def handler_factory(socket):
return RoutingRequestHandler(socket, router, authenticator)
def handler_factory():
return RoutingRequestHandler(router, authenticator)


def client_handler_factory(on_ready=None):
Expand Down
4 changes: 0 additions & 4 deletions rsocket/reactivex/reactivex_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@
from rsocket.logger import logger
from rsocket.payload import Payload
from rsocket.reactivex.reactivex_channel import ReactivexChannel
from rsocket.rsocket import RSocket


class ReactivexHandler:
def __init__(self, socket: RSocket):
super().__init__()
self.socket = socket

@abstractmethod
async def on_setup(self,
Expand Down
10 changes: 4 additions & 6 deletions rsocket/reactivex/reactivex_handler_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,18 @@
from rsocket.reactivex.from_rsocket_publisher import RxSubscriberFromObserver
from rsocket.reactivex.reactivex_handler import ReactivexHandler
from rsocket.request_handler import RequestHandler
from rsocket.rsocket import RSocket


def reactivex_handler_factory(handler_factory: Callable[[RSocket], ReactivexHandler]):
def create_handler(socket: RSocket):
return ReactivexHandlerAdapter(handler_factory(socket), socket)
def reactivex_handler_factory(handler_factory: Callable[[], ReactivexHandler]):
def create_handler():
return ReactivexHandlerAdapter(handler_factory())

return create_handler


class ReactivexHandlerAdapter(RequestHandler):

def __init__(self, delegate: ReactivexHandler, socket: RSocket):
super().__init__(socket)
def __init__(self, delegate: ReactivexHandler):
self.delegate = delegate

async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
Expand Down
4 changes: 0 additions & 4 deletions rsocket/request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

class RequestHandler(metaclass=ABCMeta):

def __init__(self, socket):
super().__init__()
self.socket = socket

@abstractmethod
async def on_setup(self,
data_encoding: bytes,
Expand Down
3 changes: 1 addition & 2 deletions rsocket/routing/routing_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ class RoutingRequestHandler(BaseRequestHandler):
)

def __init__(self,
socket,
router: RequestRouter,
authentication_verifier: Optional[
Callable[[str, Authentication], Coroutine[None, None, None]]] = None):
super().__init__(socket)
super().__init__()
self.router = router
self.authentication_verifier = authentication_verifier
self.data_encoding = None
Expand Down
6 changes: 3 additions & 3 deletions rsocket/rsocket_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def on_next(self, value, is_complete=False):
self._socket.send_lease(value)

def __init__(self,
handler_factory: Callable[['RSocketBase'], RequestHandler] = BaseRequestHandler,
handler_factory: Callable[[], RequestHandler] = BaseRequestHandler,
honor_lease=False,
lease_publisher: Optional[Publisher] = None,
request_queue_size: int = 0,
Expand All @@ -82,7 +82,7 @@ def __init__(self,
self._lease_publisher = lease_publisher
self._sender_task = None
self._receiver_task = None
self._handler = self._handler_factory(self)
self._handler = self._handler_factory()
self._responder_lease = None
self._requester_lease = None
self._is_closing = False
Expand Down Expand Up @@ -150,7 +150,7 @@ def _start_task_if_not_closing(self, task_factory: Callable[[], Coroutine]) -> O
return asyncio.create_task(task_factory())

def set_handler_using_factory(self, handler_factory) -> RequestHandler:
self._handler = handler_factory(self)
self._handler = handler_factory()
return self._handler

def _allocate_stream(self) -> int:
Expand Down
4 changes: 0 additions & 4 deletions rsocket/rx_support/rx_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,9 @@
from rsocket.logger import logger
from rsocket.payload import Payload
from rsocket.rx_support.rx_channel import RxChannel
from rsocket.rsocket import RSocket


class RxHandler:
def __init__(self, socket: RSocket):
super().__init__()
self.socket = socket

@abstractmethod
async def on_setup(self,
Expand Down
10 changes: 4 additions & 6 deletions rsocket/rx_support/rx_handler_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,21 @@
from rsocket.error_codes import ErrorCode
from rsocket.payload import Payload
from rsocket.request_handler import RequestHandler
from rsocket.rsocket import RSocket
from rsocket.rx_support.back_pressure_publisher import BackPressurePublisher
from rsocket.rx_support.from_rsocket_publisher import RxSubscriberFromObserver
from rsocket.rx_support.rx_handler import RxHandler


def rx_handler_factory(handler_factory: Callable[[RSocket], RxHandler]):
def create_handler(socket: RSocket):
return RxHandlerAdapter(handler_factory(socket), socket)
def rx_handler_factory(handler_factory: Callable[[], RxHandler]):
def create_handler():
return RxHandlerAdapter(handler_factory())

return create_handler


class RxHandlerAdapter(RequestHandler):

def __init__(self, delegate: RxHandler, socket: RSocket):
super().__init__(socket)
def __init__(self, delegate: RxHandler):
self.delegate = delegate

async def on_setup(self, data_encoding: bytes, metadata_encoding: bytes, payload: Payload):
Expand Down
7 changes: 3 additions & 4 deletions tests/rsocket/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ def assert_no_open_streams(client: RSocketBase, server: RSocketBase):


class IdentifiedHandler(BaseRequestHandler):
def __init__(self, socket, server_id: int, delay=timedelta(0)):
super().__init__(socket)
def __init__(self, server_id: int, delay=timedelta(0)):
self._delay = delay
self._server_id = server_id

Expand All @@ -74,8 +73,8 @@ def __init__(self,
self._server_id = server_id
self._handler_factory = handler_factory

def factory(self, socket) -> BaseRequestHandler:
handler = self._handler_factory(socket, self._server_id, self._delay)
def factory(self) -> BaseRequestHandler:
handler = self._handler_factory(self._server_id, self._delay)
self._on_handler_create(handler)
return handler

Expand Down
6 changes: 2 additions & 4 deletions tests/rsocket/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ async def test_authentication_frame_simple():

async def test_authentication_success_on_setup(lazy_pipe):
class Handler(BaseRequestHandler):
def __init__(self, socket):
super().__init__(socket)
def __init__(self):
self._authenticated = False

async def on_setup(self,
Expand Down Expand Up @@ -100,8 +99,7 @@ async def test_authentication_failure_on_setup(lazy_pipe):
received_error: Optional[tuple] = None

class ServerHandler(BaseRequestHandler):
def __init__(self, socket):
super().__init__(socket)
def __init__(self):
self._authenticated = False

async def on_setup(self,
Expand Down
8 changes: 4 additions & 4 deletions tests/rsocket/test_connection_lost.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@


class ServerHandler(IdentifiedHandler):
def __init__(self, socket, server_id: int, delay=timedelta(0)):
super().__init__(socket, server_id, delay)
def __init__(self, server_id: int, delay=timedelta(0)):
super().__init__(server_id, delay)
self._delay = delay

async def request_response(self, payload: Payload) -> Awaitable[Payload]:
Expand Down Expand Up @@ -285,11 +285,11 @@ async def start_quic_service(waiter: asyncio.Event, container, port: int, genera
is_client=False
)

def handler_factory(*args, **kwargs):
def handler_factory():
return IdentifiedHandlerFactory(
next(index_iterator),
ServerHandler,
delay=timedelta(seconds=1)).factory(*args, **kwargs)
delay=timedelta(seconds=1)).factory()

def on_server_create(server):
container.server = server
Expand Down
15 changes: 7 additions & 8 deletions tests/rsocket/test_fire_and_forget.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@


class FireAndForgetHandler(BaseRequestHandler):
def __init__(self, socket):
super().__init__(socket)
def __init__(self):
self.received = asyncio.Event()
self.received_payload: Optional[Payload] = None

Expand All @@ -20,9 +19,9 @@ async def request_fire_and_forget(self, payload: Payload):
async def test_request_fire_and_forget(lazy_pipe):
handler: Optional[FireAndForgetHandler] = None

def handler_factory(socket):
def handler_factory():
nonlocal handler
handler = FireAndForgetHandler(socket)
handler = FireAndForgetHandler()
return handler

async with lazy_pipe(
Expand All @@ -38,9 +37,9 @@ def handler_factory(socket):
async def test_request_fire_and_forget_wait(lazy_pipe):
handler: Optional[FireAndForgetHandler] = None

def handler_factory(socket):
def handler_factory():
nonlocal handler
handler = FireAndForgetHandler(socket)
handler = FireAndForgetHandler()
return handler

async with lazy_pipe(
Expand All @@ -53,9 +52,9 @@ def handler_factory(socket):
async def test_request_fire_and_forget_awaitable_client(lazy_pipe):
handler: Optional[FireAndForgetHandler] = None

def handler_factory(socket):
def handler_factory():
nonlocal handler
handler = FireAndForgetHandler(socket)
handler = FireAndForgetHandler()
return handler

async with lazy_pipe(
Expand Down
4 changes: 2 additions & 2 deletions tests/rsocket/test_load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def to_response_payload(payload, server_id):


class Handler(IdentifiedHandler):
def __init__(self, socket, server_id: int, delay=timedelta(0)):
super().__init__(socket, server_id, delay)
def __init__(self, server_id: int, delay=timedelta(0)):
super().__init__(server_id, delay)
self.fnf = []
self.metadata = []

Expand Down
15 changes: 7 additions & 8 deletions tests/rsocket/test_metadata_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@


class MetadataPushHandler(BaseRequestHandler):
def __init__(self, socket):
super().__init__(socket)
def __init__(self):
self.received = asyncio.Event()
self.received_payload: Optional[Payload] = None

Expand All @@ -22,9 +21,9 @@ async def on_metadata_push(self, payload: Payload):
async def test_metadata_push(pipe):
handler: Optional[MetadataPushHandler] = None

def handler_factory(socket):
def handler_factory():
nonlocal handler
handler = MetadataPushHandler(socket)
handler = MetadataPushHandler()
return handler

server, client = get_components(pipe)
Expand All @@ -41,9 +40,9 @@ def handler_factory(socket):
async def test_metadata_push_await(pipe):
handler: Optional[MetadataPushHandler] = None

def handler_factory(socket):
def handler_factory():
nonlocal handler
handler = MetadataPushHandler(socket)
handler = MetadataPushHandler()
return handler

server, client = get_components(pipe)
Expand All @@ -55,9 +54,9 @@ def handler_factory(socket):
async def test_metadata_push_awaitable_client(pipe):
handler: Optional[MetadataPushHandler] = None

def handler_factory(socket):
def handler_factory():
nonlocal handler
handler = MetadataPushHandler(socket)
handler = MetadataPushHandler()
return handler

server: RSocketServer = pipe[0]
Expand Down
Loading