-
Notifications
You must be signed in to change notification settings - Fork 73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
300 - Fix potential race condition when the flowable runs on another thread #303
300 - Fix potential race condition when the flowable runs on another thread #303
Conversation
…thread When the flowable runs on another thread, `checkStatus` can be called from 2 threads: the reader one (usually an event loop) and the producer one (usually a worker thread). Although this method is synchronized, a race condition can still happen in the `handler.handle(...)` method because producer thread can catch up the execution of the reader one (and the other way around). This is safer then to ensure that only one thread produce elements of this ReadStream. I chose the reader context as the one used to execute every event publications because it's the context we know about at the subscriber's creation.
I know tests are broken, but I wanted a validation before continuing to work on this |
@NilsRenaud I don't think this is the right way to fix it, can you provide a simple reproducer of the case ? |
Okay, here is a reproducer: The idea is to have the readStream in a special state where 2 threads will end up publishing data to the ReadStream handler. import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Test;
import io.reactivex.rxjava3.core.Emitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.core.streams.ReadStream;
import io.vertx.rxjava3.impl.ReadStreamSubscriber;
public class Reproducer {
private int called = 0;
CountDownLatch waitForSecondElement = new CountDownLatch(1);
CountDownLatch consumerLatch = new CountDownLatch(2);
CountDownLatch testLatch = new CountDownLatch(1);
CountDownLatch waitForFirstElementInPending = new CountDownLatch(1);
String total = "";
@Test
public void test() throws InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
Flowable<String> flowableContent = Flowable.generate(this::emit).subscribeOn(Schedulers.io());
ReadStream<String> readStream = ReadStreamSubscriber.asReadStream(flowableContent, obj -> obj);
executor.execute(() -> {
// Set the readStream as if it was paused, with data in its pending queue and more incoming data expected
readStream.pause();
readStream.handler(this::handler); // Fetch elements, ending in pending queue
readStream.endHandler(unused -> testLatch.countDown());
try {
waitForFirstElementInPending.await(); // Wait for the first element to be published in the pending queue
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
readStream.fetch(2); // immediately consume the first element, published on this thread
waitForSecondElement.countDown(); // release the 2nd element to be published on Rx thread.
});
testLatch.await();
assertThat(total).isEqualTo("msg1msg2");
}
// - Emit "msg1",
// - wait for a latch to emit the second element,
// - complete
private void emit(Emitter<String> emitter) throws InterruptedException {
if (called == 0) {
called = 1;
emitter.onNext("msg1");
waitForFirstElementInPending.countDown();
} else if (called == 1) {
called = 2;
waitForSecondElement.await();
emitter.onNext("msg2");
} else {
emitter.onComplete();
}
}
private void handler(final String s) {
waitForSecondElement.countDown();
try {
// Simulate an unsynchronized delay in the consumer by blocking the first message, letting go the 2nd message
if (consumerLatch.getCount() > 1) {
consumerLatch.countDown();
consumerLatch.await();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
total += s;
consumerLatch.countDown(); // release the 1st message
}
} |
thanks for the reproducer I understand better what is going on. I think that is an inherent bug of the read stream subscriber implementation that should be fixed |
@NilsRenaud can you try your original case with public class ReadStreamSubscriber<R, J> implements Subscriber<R>, ReadStream<J> {
private static final Runnable NOOP_ACTION = () -> { };
private static final Throwable DONE_SENTINEL = new Throwable();
public static final int BUFFER_SIZE = 16;
public static <R, J> ReadStream<J> asReadStream(Flowable<R> flowable, Function<R, J> adapter) {
return new ReadStreamSubscriber<>(adapter, flowable::subscribe);
}
public static <R, J> ReadStream<J> asReadStream(Observable<R> observable, Function<R, J> adapter) {
return asReadStream(observable.toFlowable(BackpressureStrategy.BUFFER), adapter);
}
private final Function<R, J> adapter;
private Handler<Void> endHandler;
private Handler<Throwable> exceptionHandler;
private Handler<J> elementHandler;
private long demand = Long.MAX_VALUE;
private Throwable completed;
private ArrayDeque<R> pending = new ArrayDeque<>();
private int requested = 0;
private Subscription subscription;
private Publisher<R> publisher;
private boolean emitting;
public ReadStreamSubscriber(Function<R, J> adapter, Publisher<R> publisher) {
this.adapter = adapter;
this.publisher = publisher;
}
@Override
public ReadStream<J> handler(Handler<J> handler) {
Runnable action;
synchronized (this) {
elementHandler = handler;
if (handler != null) {
action = () -> publisher.subscribe(this);
} else {
Subscription s = subscription;
action = s != null ? s::cancel : NOOP_ACTION;
}
}
action.run();
checkStatus();
return this;
}
@Override
public ReadStream<J> pause() {
synchronized (this) {
demand = 0L;
}
return this;
}
@Override
public ReadStream<J> fetch(long amount) {
if (amount < 0L) {
throw new IllegalArgumentException("Invalid amount: " + amount);
}
synchronized (this) {
demand += amount;
if (demand < 0L) {
demand = Long.MAX_VALUE;
}
}
checkStatus();
return this;
}
@Override
public ReadStream<J> resume() {
return fetch(Long.MAX_VALUE);
}
@Override
public void onSubscribe(Subscription s) {
synchronized (this) {
subscription = s;
}
checkStatus();
}
private void checkStatus() {
synchronized (this) {
if (emitting) {
return;
}
emitting = true;
}
try {
Runnable action = NOOP_ACTION;
while (true) {
J adapted;
Handler<J> handler;
synchronized (this) {
if (demand > 0L && (handler = elementHandler) != null && pending.size() > 0) {
if (demand != Long.MAX_VALUE) {
demand--;
}
requested--;
R item = pending.poll();
adapted = adapter.apply(item);
} else {
if (completed != null) {
if (pending.isEmpty()) {
Handler<Throwable> onError;
Throwable result;
if (completed != DONE_SENTINEL) {
onError = exceptionHandler;
result = completed;
exceptionHandler = null;
} else {
onError = null;
result = null;
}
Handler<Void> onCompleted = endHandler;
endHandler = null;
action = () -> {
try {
if (onError != null) {
onError.handle(result);
}
} finally {
if (onCompleted != null) {
onCompleted.handle(null);
}
}
};
}
} else if (elementHandler != null && requested < BUFFER_SIZE / 2) {
int request = BUFFER_SIZE - requested;
action = () -> subscription.request(request);
requested = BUFFER_SIZE;
}
break;
}
}
handler.handle(adapted);
}
action.run();
} finally {
synchronized (this) {
emitting = false;
}
}
}
@Override
public ReadStream<J> endHandler(Handler<Void> handler) {
synchronized (this) {
if (completed == null || pending.size() > 0) {
endHandler = handler;
} else {
if (handler != null) {
throw new IllegalStateException();
}
}
}
return this;
}
@Override
public ReadStream<J> exceptionHandler(Handler<Throwable> handler) {
synchronized (this) {
if (completed == null || pending.size() > 0) {
exceptionHandler = handler;
} else {
if (handler != null) {
throw new IllegalStateException();
}
}
}
return this;
}
@Override
public void onComplete() {
onError(DONE_SENTINEL);
}
@Override
public void onError(Throwable e) {
synchronized (this) {
if (completed != null) {
return;
}
completed = e;
}
checkStatus();
}
@Override
public void onNext(R item) {
synchronized (this) {
pending.add(item);
}
checkStatus();
}
} |
Thanks a lot for this try, but unfortunately it does not work :/ That's why I think the real solution is to have only 1 thread publishing elements, either the Flowable-side one, or the ReadStream-side one. I would like to know why the FYI here is the original reproducer: import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.buffer.ByteBuf;
import io.reactivex.rxjava3.core.Emitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.vertx.core.http.HttpMethod;
import io.vertx.rxjava3.core.RxHelper;
import io.vertx.rxjava3.core.Vertx;
import io.vertx.rxjava3.core.buffer.Buffer;
import io.vertx.rxjava3.core.http.HttpServer;
public class Reproducer {
private static final String BASE_PATH = "whatever big (~10MB) file";
private static final String NEW_FILE = "whatever name";
private static final Vertx VERTX = Vertx.vertx();
private static String originalChecksum;
public static void main(String[] args) throws InterruptedException, IOException, NoSuchAlgorithmException {
byte[] fileAsBytes = Files.readAllBytes(Path.of(BASE_PATH));
Files.deleteIfExists(Path.of(NEW_FILE));
writeBuffInfo("client", fileAsBytes);
createHttpServerWhichLogsReceivedBufferInfo();
Flowable<Buffer> flowableContent = Flowable
.generate(emitIn8KiBChunks(fileAsBytes))
.subscribeOn(Schedulers.io());
VERTX.createHttpClient()
.rxRequest(HttpMethod.POST, 8080, "localhost", "/")
.flatMap(req -> req.rxSend(flowableContent))
.flatMap(resp -> resp.body())
.blockingSubscribe(body -> System.out.println(body.toString()));
}
private static void createHttpServerWhichLogsReceivedBufferInfo() {
HttpServer server = VERTX.createHttpServer();
server.requestHandler(req -> {
req.bodyHandler(buffer -> {
writeBuffInfo("server", buffer.getBytes());
writeFile(buffer.getBytes());
req.response().end("OK");
server.close();
});
}).listen(8080).blockingGet();
System.out.println("server started.");
}
// Do not pay too much attention to this method, it simply read a byte array by chunks
private static Consumer<Emitter<Buffer>> emitIn8KiBChunks(byte[] bufferToEmit) {
final int chunkSize = 8 * 1024;
final AtomicInteger pos = new AtomicInteger(0);
final int maxPos = bufferToEmit.length - 1;
return emitter -> {
// Adding a Thread.sleep(...) here makes it work.
int currentPos = pos.get();
if (currentPos == maxPos) {
emitter.onComplete();
} else if (currentPos + chunkSize > maxPos) {
emitter.onNext(Buffer.buffer().setBytes(0, bufferToEmit, currentPos, maxPos - currentPos + 1));
pos.set(maxPos);
} else {
emitter.onNext(Buffer.buffer().setBytes(0, bufferToEmit, currentPos, chunkSize));
pos.addAndGet(chunkSize);
}
};
}
private static void writeBuffInfo(String prefix, byte[] buff) {
try {
MessageDigest md5 = MessageDigest.getInstance("md5");
System.out.println(prefix + " - buffer size: " + buff.length);
String checkSum = Base64.getEncoder().encodeToString(md5.digest(buff));
System.out.println(prefix + " - buffer MD5 checksum: " + checkSum);
if (originalChecksum == null) {
originalChecksum = checkSum;
} else {
System.out.println("Checksums: \"" + originalChecksum + "\" vs \"" + checkSum + "\" : "
+ originalChecksum.equals(checkSum));
}
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}
private static void writeFile(byte[] buff) {
try {
Files.write(Path.of(NEW_FILE), buff);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
} |
I would prefer a solution that does not rely on vertx context |
Ok I'm back on the topic. |
…another thread" This reverts commit c736b05.
In this version, only the producer thread produces elements while the Publisher has not yet ended.
@vietj I just pushed a new commit with the above strategy, using only the main producer thread to produce data implemented. I'm sure it could be better, and some tests fails because on this internal change but would you agree on such a solution ? |
Fixes this issue: #300
When the flowable runs on another thread,
checkStatus
can be called from 2 threads: the reader one (usually an event loop) and the producer one (usually a worker thread). Although this method is synchronized, a race condition can still happen in thehandler.handle(...)
method because producer thread can catch up the execution of the reader one (and the other way around). This is safer then to ensure that only one thread produce elements of this ReadStream.I chose the reader context as the one used to execute every event publications because it's the context we know about at the subscriber's creation.