Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream-stream rpc method send\receive in any order #100

Closed
antonio-antuan opened this issue Jan 14, 2020 · 14 comments
Closed

stream-stream rpc method send\receive in any order #100

antonio-antuan opened this issue Jan 14, 2020 · 14 comments

Comments

@antonio-antuan
Copy link

GRPC docs says:

Bidirectional streaming RPCs where both sides send a sequence of messages using a read-write stream. The two streams operate independently, so clients and servers can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message then write a message, or some other combination of reads and writes. 

Looks like on client-side I can send message to a stream, then read from the stream, then send message, then read and so on....
But current grpclib implementation does not allow it. Is it possible to change that? :)

@vmagamedov
Copy link
Owner

Can you show an example of what is not possible in grpclib?

@antonio-antuan
Copy link
Author

antonio-antuan commented Jan 14, 2020

Sure. Here it is:

from grpclib.client import Channel
from provider_grpc import SubscriptionProviderStub
from provider_pb2 import Request

channel = Channel('0.0.0.0', port=7011)
stub = SubscriptionProviderStub(channel)


async def recv_without_end():
    async with stub.Subscribe.open() as stream:
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                                      obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                                      obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )

        await stream.recv_message()  # grpclib.exceptions.ProtocolError: Outgoing stream was not ended


async def send_after_end():
    async with stub.SubscribeToEvents.open() as stream:
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                                      obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                                      obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )
        await stream.end()
        await stream.recv_message()
        await stream.send_message(
            Request(user_id='ea6f086b-4e5e-4d23-89cc-a10bf5927c17',
                                      obj_ids=['529fa903-d5d6-4f9c-9d2e-518df987e683'])
        )  # grpclib.exceptions.ProtocolError: Stream is ended

proto:

syntax = 'proto3';

service SubscriptionProvider {
    rpc Subscribe (stream Request) returns (stream Data) {}
}

message Request {
    string user_id = 1;
    repeated string obj_ids = 2;
}


message Data {
    repeated string objects = 1;
}

Actually, the code is more complex. Here is what I want to do:

  1. open channel and stream in some kind of proxy-server (asyncio + websockets)
  2. send initial data to stream when websocket channel is opened and first data received from any clieny
  3. send the first data to grpc-server
  4. create two asyncio.Task objects. The first listen to websocket messages and send them to grpc-server. The second listen to grpc-steam and send received data to websocket-client. Something like this:
async def from_ws_to_grpc():
    while True:
        try:
            msg = json.loads(await websocket.recv())
            await stream.send_message(
                Request(user_id=msg['user_id'], obj_ids=msg['obj_ids'])
            )
        finally:
            return


async def from_grpc_to_ws():
    while True:
        try:
            subscription_data = await stream.recv_message()
            await websocket.send(json.dumps([json.loads(ev) for ev in subscription_data.objects]))
        finally:
            return


await from_ws_to_grpc()
_, pending = await asyncio.wait(
    [from_grpc_to_ws(), from_ws_to_grpc()],    
    return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
    task.cancel()

@vmagamedov
Copy link
Owner

grpclib currently assumes that you end() stream explicitly before exiting from the stub.Subscribe.open() context-manager:

async with stub.Subscribe.open() as stream:
    ...
    # exchange messages
    ...
    await asyncio.wait(tasks)
    await stream.end()  # !!!

Maybe this will be improved in the future, but it is possible to make stream-stream requests with grpclib right now.

@vmagamedov
Copy link
Owner

Outgoing stream was not ended error raised not when you call recv_message() but when you exit from the context-manager.

@antonio-antuan
Copy link
Author

I understand it. But my approach is another. Here is working example of bidirectional streaming with purerpc:

async def main():
    async with purerpc.insecure_channel("channel", 81) as channel:
        stub = ServiceStub(channel)
        stream = await stub.Call()
        await stream.send_message(Request(
            params='{"object_ids": ["cc76b6ea-dc92-46c4-ae00-fa7956c0eda8"]}'))
        msg = await stream.receive_message()
        print(msg)
        msg = await stream.receive_message()
        print(msg)
        await stream.send_message(Request(
            params={"object_ids": ["19a6a096-3fa0-47e8-bd05-836a6d33ffb1"]}))
        msg = await stream.receive_message()
        print(msg)
        msg = await stream.receive_message()
        print(msg)

@vmagamedov
Copy link
Owner

Can you show a traceback you have when you try the same approach with grpclib? And can you show the same working example but with grpclib and with my tip about adding await stream.end() in the end?

@antonio-antuan
Copy link
Author

I do not want to mark sending as ended. I want to send and receiive infinitly. Send initial data, waiting for receiving. Then sending again and waiting again. And again and again. Then, maybe, receiving data multiple times, then send and receive again.
If I call stream.end() then I can't send more data to stream.

@vmagamedov
Copy link
Owner

In my example I placed await stream.end() after await asyncio.wait(tasks), that wait() may block as long as you wish, waiting indefinitely for from_ws_to_grpc and from_grpc_to_ws to finish. end() is only needed to properly close the stream, just before an exit from the .open() context-manager.

@malarinv
Copy link

await stream.recv_message() before any stream.send_message(req) is throwing an exception. Is it intentional?

@vmagamedov
Copy link
Owner

vmagamedov commented Jan 19, 2020

@malarinv call await stream.send_request() before any await stream.send_message(...) or await stream.recv_message() and you'll be fine. Yes, this is intentional.

For some reason generated documentation at https://grpclib.readthedocs.io/en/latest/client.html is incomplete. This method is documented, but I don't see any Stream's method in the docs.

@vmagamedov
Copy link
Owner

For some reason generated documentation at https://grpclib.readthedocs.io/en/latest/client.html is incomplete. This method is documented, but I don't see any Stream's method in the docs.

fixed: https://grpclib.readthedocs.io/en/latest/client.html#grpclib.client.Stream.send_request

@antonio-antuan
Copy link
Author

thanks. sorry, missed your last message

@bgervan
Copy link

bgervan commented Jun 25, 2020

@AcLr Can you show a working example of bidirectional stream client? If you succeed.

@vmagamedov
Copy link
Owner

There was a mistake in the docs: 2e9d222

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants