Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Changelog
v0.4.6
======
- fire_and_forget now only removes the stream id when the future denoting the frame was sent, is done
- API documentation auto generated at rsocket.readthedocs.io

v0.4.5
======
Expand Down
42 changes: 28 additions & 14 deletions docs/api.rst
Original file line number Diff line number Diff line change
@@ -1,30 +1,44 @@
API Reference
-------------
Core API Reference
==================

Core
----
Server
------

.. automodule:: rsocket.rsocket_server
:members:
:inherited-members:

Client
------

.. automodule:: rsocket.rsocket_client
:members:
:inherited-members:

Helpers
--------
Handler
-------

Requester
~~~~~~~~~
.. automodule:: rsocket.request_handler
:members:

Responder
~~~~~~~~~

Clients
-------
Interfaces
----------

Publisher
~~~~~~~~~

.. automodule:: rsocket.awaitable.awaitable_rsocket
.. automodule:: reactivestreams.publisher
:members:

.. automodule:: rsocket.rx_support.rx_rsocket
Subscriber
~~~~~~~~~~

.. automodule:: reactivestreams.subscriber
:members:

Subscription
~~~~~~~~~~~~

.. automodule:: reactivestreams.subscription
:members:
18 changes: 11 additions & 7 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
# import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
import os
import sys

sys.path.insert(0, os.path.abspath('..'))


# -- Project information -----------------------------------------------------
import os

project = 'rsocket'
copyright = '2022, gabis@precog.co'
author = 'gabis@precog.co'
project = 'RSocket python'
copyright = '2021, jellofishi@pm.me'
author = 'jellofishi@pm.me'

# The full version, including alpha/beta/rc tags
release = '0.3'
release = '0.4.6'

# -- General configuration ---------------------------------------------------

Expand Down Expand Up @@ -70,5 +71,8 @@

autodoc_default_flags = ['members']
autosummary_generate = True
autodoc_inherit_docstrings = False
autodoc_typehints = 'description'
autodoc_typehints_format = 'short'
autodoc_member_order = 'bysource'
master_doc = 'index'
53 changes: 53 additions & 0 deletions docs/extensions.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
Extensions
==========

Transports
----------

TCP
~~~

.. automodule:: rsocket.transports.tcp
:members:

Websocket
~~~~~~~~~

aiohttp
+++++++

.. automodule:: rsocket.transports.aiohttp_websocket
:members:

quart
+++++

.. automodule:: rsocket.transports.quart_websocket
:members:

quic
~~~~

.. automodule:: rsocket.transports.aioquic_transport
:members:

http3
~~~~~

.. automodule:: rsocket.transports.http3_transport
:members:

Routing
-------

RequestRouter
~~~~~~~~~~~~~

.. automodule:: rsocket.routing.request_router
:members:

RoutingRequestHandler
~~~~~~~~~~~~~~~~~~~~~

.. automodule:: rsocket.routing.routing_request_handler
:members:
8 changes: 8 additions & 0 deletions docs/guide.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Guide
=====

.. autosummary::
:toctree: generated

A detailed getting started guide is available at https://rsocket.io/guides/rsocket-py/tutorial/

14 changes: 9 additions & 5 deletions docs/index.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
.. Dataclasses Serialization documentation master file, created by
sphinx-quickstart on Fri Apr 22 11:01:09 2022.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
.. rsocket documentation master file

RSocket
=======
Expand All @@ -10,14 +7,21 @@ RSocket
:maxdepth: 1

quickstart
guide
api
extensions
changelog

.. autosummary::
:toctree: generated


RSocket ...
The python rsocket package implements the 1.0 version of the RSocket protocol (excluding "resume" functionality)
and is designed for use in python >= 3.8 using asyncio.

.. note::
The python package API is not stable. There may be changes until version 1.0.0.


Indices and tables
==================
Expand Down
7 changes: 5 additions & 2 deletions docs/quickstart.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
Quick start
===========

.. automodule:: rsocket.rsocket
:members:
.. autosummary::
:toctree: generated


A quick getting started guide is available at https://rsocket.io/guides/rsocket-py/simple
4 changes: 4 additions & 0 deletions reactivestreams/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@


class Publisher(metaclass=abc.ABCMeta):
"""
Handles event for subscription to a subscriber
"""

@abc.abstractmethod
def subscribe(self, subscriber: Subscriber):
...
Expand Down
4 changes: 4 additions & 0 deletions reactivestreams/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@


class Subscriber(metaclass=ABCMeta):
"""
Handles stream events.
"""

@abstractmethod
def on_subscribe(self, subscription: Subscription):
...
Expand Down
4 changes: 4 additions & 0 deletions reactivestreams/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@


class Subscription(metaclass=ABCMeta):
"""
Backpressure stream control.
"""

@abstractmethod
def request(self, n: int):
...
Expand Down
26 changes: 21 additions & 5 deletions rsocket/request_handler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from abc import ABCMeta, abstractmethod
from datetime import timedelta
from typing import Tuple, Optional
Expand All @@ -13,6 +12,9 @@


class RequestHandler(metaclass=ABCMeta):
"""
An interface which defines handler for all rsocket interactions, and some other events (e.g. on_setup).
"""

@abstractmethod
async def on_setup(self,
Expand All @@ -39,12 +41,16 @@ async def request_fire_and_forget(self, payload: Payload):
...

@abstractmethod
async def request_response(self, payload: Payload) -> asyncio.Future:
...
async def request_response(self, payload: Payload) -> Awaitable[Payload]:
"""
Handle request-response interaction
"""

@abstractmethod
async def request_stream(self, payload: Payload) -> Publisher:
...
"""
Handle request-stream interaction
"""

@abstractmethod
async def on_error(self, error_code: ErrorCode, payload: Payload):
Expand Down Expand Up @@ -72,6 +78,13 @@ def _parse_composite_metadata(self, metadata: bytes) -> CompositeMetadata:


class BaseRequestHandler(RequestHandler):
"""
Default implementation of :class:`RequestHandler <rsocket.request_handler.RequestHandler>` to simplify
implementing handlers.

For each request handler, the implementation will raise a RuntimeError. For :meth:`request_fire_and_forget` and
:meth:`on_metadata_push` the request will be ignored.
"""

async def on_setup(self,
data_encoding: bytes,
Expand All @@ -80,10 +93,13 @@ async def on_setup(self,
"""Nothing to do on setup by default"""

async def request_channel(self, payload: Payload) -> Tuple[Optional[Publisher], Optional[Subscriber]]:
"""
Raise RuntimeError by default if not implemented.
"""
raise RuntimeError('Not implemented')

async def request_fire_and_forget(self, payload: Payload):
"""The requester isn't listening for errors. Nothing to do."""
"""Ignored by default"""

async def on_metadata_push(self, payload: Payload):
"""Nothing by default"""
Expand Down
7 changes: 7 additions & 0 deletions rsocket/routing/request_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ def decorator(function: decorated_method):


class RequestRouter:
"""
Used to define routes for RSocket endpoints.

Pass this to :class:`RoutingRequestHandler <rsocket.routing.routing_request_handler.RoutingRequestHandler>`
to instantiate a handler using these routes.
"""

__slots__ = (
'_channel_routes',
'_stream_routes',
Expand Down
5 changes: 5 additions & 0 deletions rsocket/routing/routing_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@


class RoutingRequestHandler(BaseRequestHandler):
"""
Handler implementation which uses a :class:`RequestRouter <rsocket.routing.request_router.RequestRouter>`
to handle requests based on route information provided in the payload metadata.
"""

__slots__ = (
'router',
'data_encoding',
Expand Down
20 changes: 20 additions & 0 deletions rsocket/rsocket_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,13 +481,21 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()

def request_response(self, payload: Payload) -> Awaitable[Payload]:
"""
Initiate a request-response interaction.
"""

logger().debug('%s: request-response: %s', self._log_identifier(), payload)

requester = RequestResponseRequester(self, payload)
self.register_new_stream(requester).setup()
return requester.run()

def fire_and_forget(self, payload: Payload) -> Awaitable[None]:
"""
Initiate a fire-and-forget interaction.
"""

logger().debug('%s: fire-and-forget: %s', self._log_identifier(), payload)

stream_id = self._allocate_stream()
Expand All @@ -497,6 +505,10 @@ def fire_and_forget(self, payload: Payload) -> Awaitable[None]:
return frame.sent_future

def request_stream(self, payload: Payload) -> Union[BackpressureApi, Publisher]:
"""
Initiate a request-stream interaction.
"""

logger().debug('%s: request-stream: %s', self._log_identifier(), payload)

requester = RequestStreamRequester(self, payload)
Expand All @@ -507,12 +519,20 @@ def request_channel(
payload: Payload,
publisher: Optional[Publisher] = None,
sending_done: Optional[asyncio.Event] = None) -> Union[BackpressureApi, Publisher]:
"""
Initiate a request-channel interaction.
"""

logger().debug('%s: request-channel: %s', self._log_identifier(), payload)

requester = RequestChannelRequester(self, payload, publisher, sending_done)
return self.register_new_stream(requester)

def metadata_push(self, metadata: bytes) -> Awaitable[None]:
"""
Initiate a metadata-push interaction.
"""

logger().debug('%s: metadata-push: %s', self._log_identifier(), metadata)

frame = to_metadata_push_frame(metadata)
Expand Down
Loading