This repository has been archived by the owner on May 22, 2019. It is now read-only.

Result of subscribing to topic inaccessible when calling makeSubscription causing race condition #44

Open
jrogers opened this issue Jun 13, 2015 · 2 comments

Comments

@jrogers
Contributor

jrogers commented Jun 13, 2015

Currently when you subscribe to a topic via makeSubscription, there's no way to know if/when the subscription succeeded, as the observable returned only provides the PubSubData when something is published to the topic.

I have the following scenario where I create a "dynamic" topic, i.e. the topic has some filter information associated with it. I subscribe to the topic from my client with a generated topic name, then call a procedure registered on the router, supplying the topic URI and my filter information, which allows the publisher to start publishing data to my dynamic topic. This results in a race, though. Consider this:

  1. Client calls WampClient.makeSubscription with "topic_uri", WampClient sends the subscription request on the event loop thread. Subscription result processed on WampClient scheduler thread.
  2. Client calls "createSubscription" procedure via WampClient.call that has been registered, providing "topic_uri" and some filter information, request sent on the event loop thread.
  3. Other client that provides "createSubscription" immediately publishes information to "topic_uri", pubsubdata received on event loop thread.

Now the published event is received by the subscribed client, but it can be dropped because the subscription on the client has not been fully created: the result of the initial subscription request is processed on another thread in WampClient (the scheduler), where the subscription is added to the map. The event can be received before or after the subscribe result is processed, resulting in a race.
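The ordering problem described above can be reproduced in a small, deterministic simulation. This is only a sketch under stated assumptions: the class, the handler map, and the "rescheduled" queue are hypothetical stand-ins and not jawampa code. It shows that an event arriving between the subscribe result and its deferred registration finds no handler and is dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Simulation only: none of these names are jawampa APIs.
class SubscriptionRaceSketch {
    // Hypothetical subscription map: subscription id -> event handler.
    private final Map<Long, Consumer<String>> handlers = new HashMap<>();
    // Work that was rescheduled instead of being run immediately.
    private final Queue<Runnable> rescheduled = new ArrayDeque<>();
    // Events that arrived while no handler was registered.
    private final List<String> dropped = new ArrayList<>();

    // Models processing of the subscribe result, optionally deferred.
    void onSubscribed(long id, Consumer<String> handler, boolean defer) {
        Runnable register = () -> handlers.put(id, handler);
        if (defer) {
            rescheduled.add(register);
        } else {
            register.run();
        }
    }

    // Models an incoming event for a subscription id.
    void onEvent(long id, String payload) {
        Consumer<String> handler = handlers.get(id);
        if (handler == null) {
            dropped.add(payload);
        } else {
            handler.accept(payload);
        }
    }

    // Runs everything that was rescheduled so far.
    void runRescheduled() {
        Runnable task;
        while ((task = rescheduled.poll()) != null) {
            task.run();
        }
    }

    // Replays the scenario and returns the events that were dropped.
    static List<String> droppedEvents(boolean defer) {
        SubscriptionRaceSketch client = new SubscriptionRaceSketch();
        List<String> delivered = new ArrayList<>();
        client.onSubscribed(1L, delivered::add, defer);
        client.onEvent(1L, "first");   // arrives before deferred registration ran
        client.runRescheduled();
        client.onEvent(1L, "second");  // arrives after registration
        return client.dropped;
    }

    public static void main(String[] args) {
        System.out.println("deferred:  dropped=" + droppedEvents(true));
        System.out.println("immediate: dropped=" + droppedEvents(false));
    }
}
```

With deferred registration the "first" event ends up in the dropped list; with immediate registration nothing is dropped.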

If I knew when the subscription had actually finished successfully, I could wait until then to call "createSubscription" and start getting data, but I can't, because I don't know anything about the status of the subscription internally.
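To make the desired ordering concrete, here is a minimal sketch that uses a plain `CompletableFuture` as a hypothetical "subscription acknowledged" signal. jawampa does not expose such a signal (that is the point of this issue), so the future and the step names are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustration only: the acknowledgement future is hypothetical, not a jawampa API.
class AckThenCallSketch {
    // Runs the desired ordering and returns the recorded steps.
    static List<String> run() {
        List<String> steps = new ArrayList<>();

        // Hypothetical signal that completes once the subscription is registered.
        CompletableFuture<Void> subscribed = new CompletableFuture<>();

        // The procedure call is chained so it only fires after the acknowledgement.
        CompletableFuture<Void> called =
                subscribed.thenRun(() -> steps.add("call createSubscription"));

        steps.add("send subscribe request");
        // Simulate the client having fully processed the subscribe result.
        steps.add("subscription registered");
        subscribed.complete(null);

        called.join();
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The call to "createSubscription" is recorded last, after the subscription has been registered, which is the ordering the scenario needs.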

I also don't really have a good suggestion about how to fix this either.

@Matthias247
Owner

Hmm, those are some interesting points (which also show how difficult it is to build a robust implementation of such a system).

I think you are even pointing out multiple issues here:

  • What is actually not an issue is multithreading or ordering problems inside the WampClient. Because it internally uses a single thread, it can guarantee that all received messages will be processed in order.
  • However, there really does seem to be a small data race here. When a Subscribed message is received, the action is delayed by rescheduling it to the event loop (https://github.com/Matthias247/jawampa/blob/master/src/main/java/ws/wamp/jawampa/WampClient.java#L1300). This has the advantage that it decouples the actions. However, it might lead to the problem you describe: if an Event message is pushed from the transport before the handler is executed, the event will be dropped. The chance of this is really minimal (probably only when both the Subscribed and Event messages are inside the same IP frame), but nevertheless it exists. Removing .subscribeOn should remedy this, but it needs to be checked whether this creates any side effects. For most distributed applications it shouldn't be an issue anyway, because there's always the possibility that the server already sent events that are important to you before you connected and subscribed, and you might want to design your API around that fact. However, this really might require the information whether the subscribe was successful.
  • I agree that being able to retrieve the information that a subscribe succeeded would be helpful in some situations. But I'm currently also not sure what the best way to express this in the API is. One solution would be to have something like Observable<Observable<PubSubData>> makeSubscription(), where the outer stream yields only a single value that signals that the subscription was successful, and that value is the actual stream of events to which you would then need to subscribe. But the disadvantages are that this is very verbose, that the inner observable would need to buffer incoming events (the user would only get the chance to subscribe to the inner Observable after the first events could already have been pushed), and that unsubscribing also gets tricky (depending on the state, you would need to unsubscribe the outer observable, the inner observable, or even both).
    Another idea would be that the API stays as it is and the stream always outputs a special first element that only signals success for the subscription, i.e. some kind of static marker instance of the PubSubData class. But for the overloaded variants of makeSubscription which return Observable<T> instead of Observable<PubSubData> it gets tricky again. I don't see a way to have a marker instance for every possible T type.
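The buffering requirement mentioned for the inner observable can be sketched in plain Java. This is not RxJava and not a proposed jawampa API: `BufferedStream` is a hypothetical stand-in for the inner stream, showing that events pushed before the user attaches have to be queued and replayed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Illustration only: a hypothetical inner stream, not an RxJava or jawampa type.
class BufferingInnerStreamSketch {
    // Holds events until a consumer attaches, then forwards directly.
    static final class BufferedStream<T> {
        private final Queue<T> pending = new ArrayDeque<>();
        private Consumer<T> consumer; // null until the user subscribes

        synchronized void push(T event) {
            if (consumer == null) {
                pending.add(event);      // nobody listening yet: buffer
            } else {
                consumer.accept(event);  // forward immediately
            }
        }

        synchronized void subscribe(Consumer<T> newConsumer) {
            consumer = newConsumer;
            T event;
            while ((event = pending.poll()) != null) {
                newConsumer.accept(event); // replay buffered events in order
            }
        }
    }

    // Pushes two events before the consumer attaches and one after.
    static List<String> run() {
        BufferedStream<String> inner = new BufferedStream<>();
        List<String> received = new ArrayList<>();
        inner.push("early-1");
        inner.push("early-2");
        inner.subscribe(received::add);
        inner.push("late");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Without the queue, the two early events would be lost in the same way as in the original race. The buffer itself is unbounded here, which is part of why this approach is unattractive.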

@jrogers
Contributor Author

jrogers commented Jun 16, 2015

Yes, I meant a data race; I didn't mean to imply that there was a threading issue. In this particular scenario the services are running locally on the same host, which is probably why I'm seeing it more frequently; in a fully distributed setup the likelihood would seem to be low.

As for changing the API or a potential fix, I agree: I don't really like the observable-of-observables approach either. For now I've just forked locally and removed the subscribeOn to work around it, but I don't really think this is the correct fix for what I'm trying to do either. I guess I'll keep looking into a proper solution. I'm relatively new to RxJava as well, so I'm not sure if there's a pattern for this type of use case...
