Skip to content

Commit

Permalink
Merge pull request #102 from rsocket/disposable
Browse files Browse the repository at this point in the history
initial implementation disposable streams
  • Loading branch information
jell-o-fishi committed Dec 19, 2022
2 parents f577b39 + 744568f commit 9635e1c
Show file tree
Hide file tree
Showing 28 changed files with 267 additions and 160 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changelog
---------

v0.4.7
======
- Send **cancel** to responders when requester disconnects and **error** to requesters when requester disconnects
- Fix guide examples to properly cancel responders which use asyncio Task as value source
- Rewrote guide statistics example to use generator instead of task

v0.4.6
======
- fire_and_forget now only removes the stream id when the future denoting the frame was sent, is done
Expand Down
4 changes: 2 additions & 2 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import os

project = 'RSocket python'
copyright = '2021, jellofishi@pm.me'
copyright = '2022, jellofishi@pm.me'
author = 'jellofishi@pm.me'

# The full version, including alpha/beta/rc tags
release = '0.4.6'
release = '0.4.7'

# -- General configuration ---------------------------------------------------

Expand Down
8 changes: 6 additions & 2 deletions examples/tutorial/step3/chat_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import uuid
from asyncio import Queue
from asyncio import Queue, Task
from dataclasses import dataclass, field
from typing import Dict, Optional, Awaitable
from weakref import WeakValueDictionary
Expand Down Expand Up @@ -85,9 +85,13 @@ async def messages_incoming() -> Publisher:
class MessagePublisher(DefaultPublisher, DefaultSubscription):
def __init__(self, session: UserSessionData):
self._session = session
self._sender: Optional[Task] = None

def cancel(self):
self._sender.cancel()
if self._sender is not None:
logging.info('Canceling message sender task')
self._sender.cancel()
self._sender = None

def subscribe(self, subscriber: Subscriber):
super(MessagePublisher, self).subscribe(subscriber)
Expand Down
8 changes: 6 additions & 2 deletions examples/tutorial/step4/chat_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import uuid
from asyncio import Queue
from asyncio import Queue, Task
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Awaitable
Expand Down Expand Up @@ -138,9 +138,13 @@ async def messages_incoming() -> Publisher:
class MessagePublisher(DefaultPublisher, DefaultSubscription):
def __init__(self, session: UserSessionData):
self._session = session
self._sender: Optional[Task] = None

def cancel(self):
self._sender.cancel()
if self._sender is not None:
logging.info('Canceling message sender task')
self._sender.cancel()
self._sender = None

def subscribe(self, subscriber: Subscriber):
super(MessagePublisher, self).subscribe(subscriber)
Expand Down
8 changes: 6 additions & 2 deletions examples/tutorial/step5/chat_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import uuid
from asyncio import Queue
from asyncio import Queue, Task
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Awaitable
Expand Down Expand Up @@ -170,9 +170,13 @@ async def messages_incoming() -> Publisher:
class MessagePublisher(DefaultPublisher, DefaultSubscription):
def __init__(self, session: UserSessionData):
self._session = session
self._sender: Optional[Task] = None

def cancel(self):
self._sender.cancel()
if self._sender is not None:
logging.info('Canceling message sender task')
self._sender.cancel()
self._sender = None

def subscribe(self, subscriber: Subscriber):
super(MessagePublisher, self).subscribe(subscriber)
Expand Down
3 changes: 3 additions & 0 deletions examples/tutorial/step6/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ def on_next(self, payload: Payload, is_complete=False):
if is_complete:
self.done.set()

def on_error(self, exception: Exception):
raise exception

def cancel(self):
self.subscription.cancel()

Expand Down
57 changes: 22 additions & 35 deletions examples/tutorial/step6/chat_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import uuid
from asyncio import Queue
from asyncio import Queue, Task
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Awaitable, Tuple
Expand All @@ -21,6 +21,7 @@
from rsocket.routing.routing_request_handler import RoutingRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.streams.empty_stream import EmptyStream
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
from rsocket.streams.stream_from_generator import StreamFromGenerator
from rsocket.transports.tcp import TransportTCP

Expand Down Expand Up @@ -104,6 +105,7 @@ class ChatUserSession:

def __init__(self):
self._session: Optional[UserSessionData] = None
self._requested_statistics = ServerStatisticsRequest()

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
Expand Down Expand Up @@ -162,45 +164,28 @@ async def receive_statistics(statistics: ClientStatistics):
@router.channel('statistics')
async def send_statistics() -> Tuple[Optional[Publisher], Optional[Subscriber]]:

class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):
async def statistics_generator():
while True:
try:
await asyncio.sleep(self._requested_statistics.period_seconds)
next_message = new_statistics_data(self._requested_statistics)

def __init__(self, session: UserSessionData):
super().__init__()
self._session = session
self._requested_statistics = ServerStatisticsRequest()

def cancel(self):
self._sender.cancel()

def subscribe(self, subscriber: Subscriber):
super().subscribe(subscriber)
subscriber.on_subscribe(self)
self._sender = asyncio.create_task(self._statistics_sender())

async def _statistics_sender(self):
while True:
try:
await asyncio.sleep(self._requested_statistics.period_seconds)
next_message = new_statistics_data(self._requested_statistics)

self._subscriber.on_next(dataclass_to_payload(next_message))
except Exception:
logging.error('Statistics', exc_info=True)

def on_next(self, payload: Payload, is_complete=False):
request = decode_dataclass(payload.data, ServerStatisticsRequest)
yield dataclass_to_payload(next_message), False
except Exception:
logging.error('Statistics', exc_info=True)

logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
def on_next(payload: Payload, is_complete=False):
request = decode_dataclass(payload.data, ServerStatisticsRequest)

if request.ids is not None:
self._requested_statistics.ids = request.ids
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')

if request.period_seconds is not None:
self._requested_statistics.period_seconds = request.period_seconds
if request.ids is not None:
self._requested_statistics.ids = request.ids

response = StatisticsChannel(self._session)
if request.period_seconds is not None:
self._requested_statistics.period_seconds = request.period_seconds

return response, response
return StreamFromAsyncGenerator(statistics_generator), DefaultSubscriber(on_next=on_next)

@router.response('message')
async def send_message(message: Message) -> Awaitable[Payload]:
Expand All @@ -223,7 +208,9 @@ def __init__(self, session: UserSessionData):
self._session = session

def cancel(self):
self._sender.cancel()
if self._sender is not None:
logging.info('Canceling message sender task')
self._sender.cancel()

def subscribe(self, subscriber: Subscriber):
super(MessagePublisher, self).subscribe(subscriber)
Expand Down
3 changes: 3 additions & 0 deletions examples/tutorial/step7/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def on_next(self, payload: Payload, is_complete=False):
if is_complete:
self.done.set()

def on_error(self, exception: Exception):
raise exception

def cancel(self):
self.subscription.cancel()

Expand Down
81 changes: 35 additions & 46 deletions examples/tutorial/step7/chat_server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import logging
import uuid
from asyncio import Queue
from asyncio import Queue, Task
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Optional, Set, Awaitable, Tuple
Expand All @@ -21,6 +21,7 @@
from rsocket.routing.routing_request_handler import RoutingRequestHandler
from rsocket.rsocket_server import RSocketServer
from rsocket.streams.empty_stream import EmptyStream
from rsocket.streams.stream_from_async_generator import StreamFromAsyncGenerator
from rsocket.streams.stream_from_generator import StreamFromGenerator
from rsocket.transports.tcp import TransportTCP

Expand Down Expand Up @@ -88,10 +89,22 @@ def find_username_by_session(session_id: SessionId) -> Optional[str]:
return session.username


def new_statistics_data(statistics_request: ServerStatisticsRequest):
statistics_data = {}

if 'users' in statistics_request.ids:
statistics_data['user_count'] = len(chat_data.user_session_by_id)

if 'channels' in statistics_request.ids:
statistics_data['channel_count'] = len(chat_data.channel_messages)

return ServerStatistics(**statistics_data)

class ChatUserSession:

def __init__(self):
self._session: Optional[UserSessionData] = None
self._requested_statistics = ServerStatisticsRequest()

def router_factory(self):
router = RequestRouter(payload_mapper=decode_payload)
Expand Down Expand Up @@ -150,56 +163,28 @@ async def receive_statistics(statistics: ClientStatistics):
@router.channel('statistics')
async def send_statistics() -> Tuple[Optional[Publisher], Optional[Subscriber]]:

class StatisticsChannel(DefaultPublisher, DefaultSubscriber, DefaultSubscription):

def __init__(self, session: UserSessionData):
super().__init__()
self._session = session
self._requested_statistics = ServerStatisticsRequest()

def cancel(self):
self._sender.cancel()

def subscribe(self, subscriber: Subscriber):
super().subscribe(subscriber)
subscriber.on_subscribe(self)
self._sender = asyncio.create_task(self._statistics_sender())

async def _statistics_sender(self):
while True:
try:
await asyncio.sleep(self._requested_statistics.period_seconds)
next_message = self.new_statistics_data()

self._subscriber.on_next(dataclass_to_payload(next_message))
except Exception:
logging.error('Statistics', exc_info=True)

def new_statistics_data(self):
statistics_data = {}

if 'users' in self._requested_statistics.ids:
statistics_data['user_count'] = len(chat_data.user_session_by_id)

if 'channels' in self._requested_statistics.ids:
statistics_data['channel_count'] = len(chat_data.channel_messages)

return ServerStatistics(**statistics_data)
async def statistics_generator():
while True:
try:
await asyncio.sleep(self._requested_statistics.period_seconds)
next_message = new_statistics_data(self._requested_statistics)

def on_next(self, payload: Payload, is_complete=False):
request = decode_dataclass(payload.data, ServerStatisticsRequest)
yield dataclass_to_payload(next_message), False
except Exception:
logging.error('Statistics', exc_info=True)

logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')
def on_next(payload: Payload, is_complete=False):
request = decode_dataclass(payload.data, ServerStatisticsRequest)

if request.ids is not None:
self._requested_statistics.ids = request.ids
logging.info(f'Received statistics request {request.ids}, {request.period_seconds}')

if request.period_seconds is not None:
self._requested_statistics.period_seconds = request.period_seconds
if request.ids is not None:
self._requested_statistics.ids = request.ids

response = StatisticsChannel(self._session)
if request.period_seconds is not None:
self._requested_statistics.period_seconds = request.period_seconds

return response, response
return StreamFromAsyncGenerator(statistics_generator), DefaultSubscriber(on_next=on_next)

@router.response('message')
async def send_message(message: Message) -> Awaitable[Payload]:
Expand All @@ -220,9 +205,13 @@ async def messages_incoming() -> Publisher:
class MessagePublisher(DefaultPublisher, DefaultSubscription):
def __init__(self, session: UserSessionData):
self._session = session
self._sender: Optional[Task] = None

def cancel(self):
self._sender.cancel()
if self._sender is not None:
logging.info('Canceling message sender task')
self._sender.cancel()
self._sender = None

def subscribe(self, subscriber: Subscriber):
super(MessagePublisher, self).subscribe(subscriber)
Expand Down
3 changes: 3 additions & 0 deletions examples/tutorial/step8/chat_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def on_next(self, payload: Payload, is_complete=False):
if is_complete:
self.done.set()

def on_error(self, exception: Exception):
raise exception

def cancel(self):
self.subscription.cancel()

Expand Down
Loading

0 comments on commit 9635e1c

Please sign in to comment.