Subscribing to a stream cancels older subscriptions #60

@JonasVautherin

Currently, at the C++ level, streams like position_async() register a single callback. This means that two callbacks cannot be registered at the same time: a second call overwrites the first.
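To make the problem concrete, here is a minimal sketch of a single callback slot. `SingleCallbackStream`, `subscribe` and `publish` are hypothetical names used only for illustration; this is not the actual C++ implementation, just a stand-in with the same overwrite behavior.

```python
class SingleCallbackStream:
    """Hypothetical stand-in for a stream that keeps one callback slot."""

    def __init__(self):
        self._callback = None

    def subscribe(self, callback):
        # Registering again overwrites whatever was registered before.
        self._callback = callback

    def publish(self, value):
        # Only the most recently registered callback is invoked.
        if self._callback is not None:
            self._callback(value)


first, second = [], []
stream = SingleCallbackStream()
stream.subscribe(first.append)
stream.subscribe(second.append)  # replaces the first subscription
stream.publish("position 1")
```

After this runs, `first` is still empty and only `second` has received the value.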

I believe that this is not the behavior we want in Python. Already in Swift and Java, a stream is shared between the consumers at the language binding level (by using Rx mechanics).

I made a proof of concept for position() (thanks to Samyash for the help), essentially trying to call _stub.SubscribePosition(request) only once and sharing that with all the consumers.

It looks like the code below, which exposes position() as before and instantiates the factory lazily (only when the first consumer registers).

position_factory = None

...

def position(self):
    if not self.position_factory:
        self.position_factory = self._position_factory()

    return self.position_factory()

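def _position_factory(self):
    import collections

    # Open the gRPC stream once; every consumer shares this subscription.
    request = telemetry_pb2.SubscribePositionRequest()
    position_stream = self._stub.SubscribePosition(request)

    deques = []        # one queue of pending events per consumer
    already_gone = []  # every event received so far, replayed to late consumers

    def new_position_generator():
        # Seed the new consumer with the events it missed, then register it.
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)

        async def gen(my_deque):
            while True:
                if not my_deque:
                    # Nothing pending for this consumer: pull the next event
                    # from the shared stream and fan it out to every consumer.
                    rpc_pos = await position_stream.__anext__()
                    new_pos = Position.translate_from_rpc(rpc_pos.position)

                    already_gone.append(new_pos)
                    for d in deques:
                        d.append(new_pos)
                yield my_deque.popleft()

        return gen(new_deque)

    return new_position_generator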
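To try the sharing idea without a running backend, here is a self-contained sketch of the same pattern. `fake_position_stream` and `shared_factory` are hypothetical names, and the fake stream only stands in for `self._stub.SubscribePosition(request)`. The replay of past events is left out to keep it short.

```python
import asyncio
import collections


async def fake_position_stream():
    # Hypothetical stand-in for the gRPC position stream.
    for value in ("p1", "p2", "p3"):
        yield value


def shared_factory(stream):
    deques = []  # one queue of pending events per consumer

    def new_generator():
        my_deque = collections.deque()
        deques.append(my_deque)

        async def gen():
            while True:
                if not my_deque:
                    try:
                        item = await stream.__anext__()
                    except StopAsyncIteration:
                        return  # upstream finished, end this consumer too
                    # Fan the new event out to every registered consumer.
                    for d in deques:
                        d.append(item)
                yield my_deque.popleft()

        return gen()

    return new_generator


async def demo():
    factory = shared_factory(fake_position_stream())
    a, b = factory(), factory()
    seen_a = [await a.__anext__(), await a.__anext__()]
    seen_b = [await b.__anext__(), await b.__anext__()]
    return seen_a, seen_b


seen_a, seen_b = asyncio.run(demo())
```

Both consumers see the same first two events even though the underlying stream is only pulled twice. Note that the consumers here are awaited one after the other. If the shared stream is an async generator and two consumers await it at the same time, the second await may fail, so a real implementation would probably need a lock around the pull or a single pumping task.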

Still to be investigated:

  • This is currently caching all the events, but a new consumer is only interested in the current state. That can be improved thanks to Samyash's suggestion here.
  • What happens when a consumer wants to unregister? It should get erased from the queues, probably.
  • Can we stop the gRPC connection when all the consumers unregister?
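One possible direction for the three points above, as a rough sketch rather than a proposal for the final API: a single task pumps the upstream into one `asyncio.Queue` per consumer, a new consumer is seeded with the latest value only, and the pump is cancelled once the last consumer closes its generator. `SharedStream`, `is_running` and `fake_positions` are hypothetical names, and the fake source stands in for the gRPC stream. End-of-stream handling is omitted.

```python
import asyncio


class SharedStream:
    """Hypothetical helper: one upstream subscription, many consumers."""

    def __init__(self, open_stream):
        self._open_stream = open_stream  # callable returning an async iterator
        self._queues = set()
        self._latest = None
        self._has_latest = False
        self._task = None

    def is_running(self):
        return self._task is not None

    async def _pump(self):
        # The only place that reads from the upstream stream.
        async for item in self._open_stream():
            self._latest, self._has_latest = item, True
            for queue in self._queues:
                queue.put_nowait(item)

    async def subscribe(self):
        queue = asyncio.Queue()
        if self._has_latest:
            queue.put_nowait(self._latest)  # current state only, no history
        self._queues.add(queue)
        if self._task is None:
            self._task = asyncio.create_task(self._pump())
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the consumer closes its generator (aclose()).
            self._queues.discard(queue)
            if not self._queues and self._task is not None:
                self._task.cancel()  # last consumer gone: stop the upstream
                self._task = None
                self._has_latest = False


async def fake_positions():
    # Hypothetical stand-in for the gRPC position stream.
    n = 0
    while True:
        yield n
        n += 1
        await asyncio.sleep(0.05)


async def demo():
    shared = SharedStream(fake_positions)
    first = shared.subscribe()
    first_values = [await first.__anext__() for _ in range(3)]
    second = shared.subscribe()
    replayed = await second.__anext__()  # seeded with the latest value
    await first.aclose()
    await second.aclose()
    return first_values, replayed, shared.is_running()


first_values, replayed, still_running = asyncio.run(demo())
```

The late consumer starts from the most recent value instead of the full history, and closing both generators leaves no pump task behind. Consumers have to call `aclose()` explicitly (or be garbage collected) for the cleanup to run, which is one of the trade-offs of doing this with plain async generators.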
