Skip to content

Commit

Permalink
WritableBodyPublisher with callback
Browse files Browse the repository at this point in the history
Closes #54
  • Loading branch information
mizosoft committed Jul 12, 2022
1 parent cd3b580 commit 2c06cc5
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 26 deletions.
Expand Up @@ -22,15 +22,45 @@

package com.github.mizosoft.methanol;

import static java.util.Objects.requireNonNull;

import com.github.mizosoft.methanol.BodyAdapter.Encoder;
import com.github.mizosoft.methanol.function.ThrowingConsumer;
import com.github.mizosoft.methanol.internal.extensions.MimeBodyPublisherAdapter;
import java.io.OutputStream;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.Executor;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Provides additional {@link BodyPublisher} implementations. */
public class MoreBodyPublishers {
private MoreBodyPublishers() {} // non-instantiable

/**
* Returns a {@code BodyPublisher} that reads what's written to the {@code OutputStream} received
* by the given task. When the returned publisher receives a subscriber (i.e. when the HTTP client
* starts sending the request body), the given task is executed by the given executor to write the
* body's content. The latter is asynchronously channeled to the HTTP client.
*/
public static BodyPublisher ofOutputStream(
ThrowingConsumer<? super OutputStream> writerTask, Executor executor) {
return ofBodyWriter(WritableBodyPublisher::outputStream, writerTask, executor);
}

/**
* Returns a {@code BodyPublisher} that reads what's written to the {@code WritableByteChannel}
* received by the given task. When the returned publisher receives a subscriber (i.e. when the
* HTTP client starts sending the request body), the given task is executed by the given executor
* to write the body's content. The latter is asynchronously channeled to the HTTP client.
*/
public static BodyPublisher ofWritableByteChannel(
ThrowingConsumer<? super WritableByteChannel> writerTask, Executor executor) {
return ofBodyWriter(WritableBodyPublisher::byteChannel, writerTask, executor);
}

/**
* Adapts the given {@code BodyPublisher} into a {@link MimeBodyPublisher} with the given media
* type.
Expand Down Expand Up @@ -67,4 +97,35 @@ private static UnsupportedOperationException unsupportedConversion(
}
return new UnsupportedOperationException(message);
}

private static <T extends AutoCloseable> BodyPublisher ofBodyWriter(
Function<WritableBodyPublisher, T> extractor,
ThrowingConsumer<? super T> writerTask,
Executor executor) {
requireNonNull(extractor);
requireNonNull(writerTask);
requireNonNull(executor);
return BodyPublishers.fromPublisher(
subscriber -> {
requireNonNull(subscriber);

var publisher = WritableBodyPublisher.create();
publisher.subscribe(subscriber);

if (!publisher.isClosed()) {
try {
executor.execute(
() -> {
try (var out = extractor.apply(publisher)) {
writerTask.accept(out);
} catch (Throwable t) {
publisher.closeExceptionally(t);
}
});
} catch (Throwable t) {
publisher.closeExceptionally(t);
}
}
});
}
}
Expand Up @@ -50,12 +50,15 @@
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* A {@code BodyPublisher} that exposes a sink for writing the body's content. Sink can be acquired
* either as a {@link WritableByteChannel} or an {@link OutputStream}. After writing is finished,
* the publisher must be closed to complete the request (either by calling {@link #close()} or
* closing one of the returned sinks, or using a try-with-resources construct). Additionally, {@link
* #closeExceptionally(Throwable)} can be used to fail the request in case an error is encountered
* while writing.
* A {@code BodyPublisher} that exposes a sink for writing the body's content. It is recommended to
* use {@link MoreBodyPublishers#ofOutputStream} or {@link MoreBodyPublishers#ofWritableByteChannel}
* instead of directly using on this class.
*
* <p>The sink can be acquired either as a {@link WritableByteChannel} or an {@link OutputStream}.
* After writing is finished, the publisher must be closed to complete the request (either by
* calling {@link #close()} or closing one of the returned sinks, or using a try-with-resources
* construct). Additionally, {@link #closeExceptionally(Throwable)} can be used to fail the request
* in case an error is encountered while writing.
*
* <p>Note that ({@link #contentLength()} always returns {@code -1}). If the content length is known
* prior to writing, {@link BodyPublishers#fromPublisher(Publisher, long)} can be used to attach the
Expand All @@ -66,14 +69,19 @@ public final class WritableBodyPublisher implements BodyPublisher, Flushable, Au

private static final ByteBuffer CLOSED_SENTINEL = ByteBuffer.allocate(0);

private final Lock lock = new ReentrantLock();
private final Lock closeLock = new ReentrantLock();
private final Lock writeLock = new ReentrantLock();
private final ConcurrentLinkedQueue<ByteBuffer> pipe = new ConcurrentLinkedQueue<>();
private final AtomicBoolean subscribed = new AtomicBoolean();
private final int bufferSize;

private volatile @Nullable SubscriptionImpl downstreamSubscription;

/**
* The error received in {@link #closeExceptionally} if a subscriber is yet to arrived while
* calling that method. When a subscriber does arrive, it reads this field and completes the
* subscriber exceptionally right away.
*/
@GuardedBy("lock")
private @MonotonicNonNull Throwable pendingCloseError;

Expand Down Expand Up @@ -117,7 +125,7 @@ public OutputStream outputStream() {
public void closeExceptionally(Throwable error) {
requireNonNull(error);
SubscriptionImpl subscription;
lock.lock();
closeLock.lock();
try {
if (closed) {
FlowSupport.onDroppedError(error);
Expand All @@ -126,11 +134,12 @@ public void closeExceptionally(Throwable error) {
closed = true;
subscription = downstreamSubscription;
if (subscription == null) {
// Record the error to consume it when a subscriber arrives
pendingCloseError = error;
return;
}
} finally {
lock.unlock();
closeLock.unlock();
}

subscription.signalError(error);
Expand All @@ -142,18 +151,18 @@ public void closeExceptionally(Throwable error) {
*/
@Override
public void close() {
lock.lock();
closeLock.lock();
try {
if (closed) {
return;
}
closed = true;
flushInternal();
pipe.offer(CLOSED_SENTINEL);
} finally {
lock.unlock();
closeLock.unlock();
}

flushInternal();
pipe.offer(CLOSED_SENTINEL);
signalDownstream(true);
}

Expand Down Expand Up @@ -189,12 +198,12 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
var subscription = new SubscriptionImpl(subscriber);
if (subscribed.compareAndSet(false, true)) {
Throwable error;
lock.lock();
closeLock.lock();
try {
downstreamSubscription = subscription;
error = this.pendingCloseError;
error = pendingCloseError;
} finally {
lock.unlock();
closeLock.unlock();
}

if (error != null) {
Expand All @@ -205,7 +214,7 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
} else {
FlowSupport.refuse(
subscriber,
new IllegalStateException("already subscribed, multiple subscribers not supported"));
new IllegalStateException("already subscribed (multiple subscribers not supported)"));
}
}

Expand Down Expand Up @@ -359,8 +368,8 @@ protected long emit(Subscriber<? super ByteBuffer> downstream, long emit) {
if (buffer == CLOSED_SENTINEL) {
cancelOnComplete(downstream);
return 0;
} else if (submitted >= emit || buffer == null) { // Exhausted either demand or batches
latestBuffer = buffer; // Save the last polled batch for latter rounds
} else if (submitted >= emit || buffer == null) { // Exhausted either demand or buffers
latestBuffer = buffer; // Save the last polled buffer for later rounds
return submitted;
} else if (submitOnNext(downstream, buffer)) {
submitted++;
Expand All @@ -376,13 +385,9 @@ protected void abort(boolean flowInterrupted) {
pipe.clear();
if (flowInterrupted) {
// If this publisher is cancelled "abnormally" (amid writing), possibly ongoing writers
// should know (by setting closed to true) so they can abort.
lock.lock();
try {
closed = true;
} finally {
lock.unlock();
}
// should know (by setting closed to true) so they can abort writing by throwing an
// exception.
closed = true;
}
}
}
Expand Down
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022 Moataz Abdelnasser
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.github.mizosoft.methanol.function;

/**
* A {@link java.util.function.Consumer Consumer<T>} that permits throwing an arbitrary {@code
* Exception}.
*/
public interface ThrowingConsumer<T> {
void accept(T t) throws Exception;
}
1 change: 1 addition & 0 deletions methanol/src/main/java/module-info.java
Expand Up @@ -36,6 +36,7 @@
exports com.github.mizosoft.methanol;
exports com.github.mizosoft.methanol.decoder;
exports com.github.mizosoft.methanol.adapter;
exports com.github.mizosoft.methanol.function;
exports com.github.mizosoft.methanol.internal.flow to
methanol.adapter.jackson,
methanol.adapter.jackson.flux;
Expand Down
Expand Up @@ -22,13 +22,27 @@

package com.github.mizosoft.methanol;

import static com.github.mizosoft.methanol.testing.ExecutorExtension.ExecutorType.FIXED_POOL;
import static com.github.mizosoft.methanol.testutils.Verification.verifyThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import com.github.mizosoft.methanol.testing.ExecutorExtension;
import com.github.mizosoft.methanol.testing.ExecutorExtension.ExecutorConfig;
import com.github.mizosoft.methanol.testutils.TestException;
import com.github.mizosoft.methanol.testutils.TestSubscriber;
import java.net.http.HttpRequest.BodyPublishers;
import java.nio.CharBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ExecutorExtension.class)
class MoreBodyPublishersTest {
@Test
void ofMediaType() {
Expand All @@ -54,4 +68,75 @@ void ofObject_unsupported() {
.isThrownBy(
() -> MoreBodyPublishers.ofObject("something", MediaType.parse("application/*")));
}

@Test
@ExecutorConfig(FIXED_POOL)
void ofOutputStream_basicWriting(Executor executor) {
verifyThat(
MoreBodyPublishers.ofOutputStream(
out -> out.write("Pikachu".getBytes(UTF_8)), executor))
.succeedsWith("Pikachu");
}

@Test
@ExecutorConfig(FIXED_POOL)
void ofOutputStream_exceptionalCompletion(Executor executor) {
verifyThat(
MoreBodyPublishers.ofOutputStream(
out -> {
throw new TestException();
},
executor))
.failsWith(TestException.class);
}

@Test
@ExecutorConfig(FIXED_POOL)
void ofWritableByteChannel_basicWriting(Executor executor) {
verifyThat(
MoreBodyPublishers.ofWritableByteChannel(
out -> out.write(UTF_8.encode("Pikachu")), executor))
.succeedsWith("Pikachu");
}

@Test
@ExecutorConfig(FIXED_POOL)
void ofWritableByteChannel_exceptionalCompletion(Executor executor) {
verifyThat(
MoreBodyPublishers.ofWritableByteChannel(
out -> {
throw new TestException();
},
executor))
.failsWith(TestException.class);
}

@Test
@ExecutorConfig(FIXED_POOL)
void ofOutputStream_onlySubmitsOnSubscribe(ExecutorService service) throws Exception {
var inWriter = new AtomicBoolean();
MoreBodyPublishers.ofOutputStream(out -> inWriter.set(true), service);
service.shutdown();
assertThat(service.awaitTermination(0, TimeUnit.SECONDS)).isTrue();
assertThat(inWriter).isFalse();
}

@Test
@ExecutorConfig(FIXED_POOL)
void ofOutputStream_onlySubmitsIfNotCancelledOnSubscribe(ExecutorService service) throws Exception {
var inWriter = new AtomicBoolean();
var publisher = MoreBodyPublishers.ofOutputStream(out -> inWriter.set(true), service);
publisher.subscribe(
new TestSubscriber<>() {
@Override
public synchronized void onSubscribe(Subscription subscription) {
super.onSubscribe(subscription);
subscription.cancel();
}
});

service.shutdown();
assertThat(service.awaitTermination(0, TimeUnit.SECONDS)).isTrue();
assertThat(inWriter).isFalse();
}
}

0 comments on commit 2c06cc5

Please sign in to comment.