Skip to content

Conversation

robertroeser
Copy link
Member

When streaming data from a channel RSocket would hang. Replaced the processors in the RSocketClient and RSocketServer with ones that can emit when the onNext method is called to prevent a live lock. Basically a Unicast processor with the operator fusion code removed, and a thread-safe queue. Also fixes a bug when an channel receiving an empty publisher wouldn't complete.

Copy link
Member

@yschimke yschimke left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UnboundedProcessor is a tough read, is that from scratch?

}

@Test(timeout = 10000)
@Test//(timeout = 10000)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put some timeout in?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'll add the timeouts back

@yschimke
Copy link
Member

Does this resolve rsocket/rsocket#241

@robertroeser
Copy link
Member Author

robertroeser commented Oct 30, 2017

@yschimke yes it fixes that

Also I did not write the processor from scratch - its based on Reactor-core's UnicastProcessor.

@robertroeser robertroeser merged commit b27c7f6 into rsocket:1.0.x Oct 30, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants