Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use EmittingPublisher in OutputStreamPublisher to remove busy waiting #1900

Merged
merged 8 commits into from
Jun 1, 2020

Conversation

danielkec
Copy link
Contributor

@danielkec danielkec commented May 29, 2020

Partially solving #1741, fixing race-condition in EmittingPublisher

Added to IoMulti api

        StringBuilder result = new StringBuilder();

        MultiFromOutputStream osMulti = IoMulti.createOutputStream();

        Single<Void> multiFuture = osMulti
                .map(ByteBuffer::array)
                .map(String::new)
                .forEach(result::append);

        PrintWriter printer = new PrintWriter(osMulti);
        printer.println("test1");
        printer.println("test2");
        printer.println("test3");
        printer.close();

        multiFuture.await();
        assertThat(result.toString(), is(equalTo("test1\ntest2\ntest3\n")));

Signed-off-by: Daniel Kec daniel.kec@oracle.com

@danielkec danielkec changed the title [WIP] Use EmittingPublisher in OutputStreamPublisher to remove busy waiting Use EmittingPublisher in OutputStreamPublisher to remove busy waiting May 29, 2020
@danielkec
Copy link
Contributor Author

There was a lock in write method but I would expect java.io.OutputStream#write(byte[], int, int) should be always presumed as not thread safe(rule 1.3 should be assured by EmittingPublisher anyway)

          synchronized (invocationLock) {
                if (subscriber.isClosed()) {
                    throw new IOException("Output stream already closed.");
                }

                sub.onNext(createBuffer(buffer, offset, length));
            }

@danielkec danielkec self-assigned this May 29, 2020
@danielkec danielkec added reactive Reactive streams and related components webserver labels May 29, 2020
@danielkec danielkec added this to the 2.0.0-RC1 milestone May 29, 2020
@danielkec danielkec force-pushed the reactive-osp-no-busy-waiting branch 2 times, most recently from d3e90dc to 46b0dc3 Compare May 29, 2020 23:00
@danielkec danielkec force-pushed the reactive-osp-no-busy-waiting branch from bc6af96 to 1ab3bbb Compare May 31, 2020 20:54
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>

Tck compliance

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>

Deferred request callbacks after onSubscribe

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>

Forward request callbacks in ready state

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
tomas-langer
tomas-langer previously approved these changes May 31, 2020
Copy link
Member

@tomas-langer tomas-langer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Copy link
Member

@tomas-langer tomas-langer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@danielkec danielkec merged commit 6aaacd4 into helidon-io:master Jun 1, 2020
@olotenko
Copy link

olotenko commented Jun 3, 2020

The changes to BufferedEmittingPublisher are racy.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
reactive Reactive streams and related components webserver
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants