diff --git a/methanol/src/main/java/com/github/mizosoft/methanol/MoreBodyPublishers.java b/methanol/src/main/java/com/github/mizosoft/methanol/MoreBodyPublishers.java index 670f6944..43e70d1d 100644 --- a/methanol/src/main/java/com/github/mizosoft/methanol/MoreBodyPublishers.java +++ b/methanol/src/main/java/com/github/mizosoft/methanol/MoreBodyPublishers.java @@ -22,15 +22,45 @@ package com.github.mizosoft.methanol; +import static java.util.Objects.requireNonNull; + import com.github.mizosoft.methanol.BodyAdapter.Encoder; +import com.github.mizosoft.methanol.function.ThrowingConsumer; import com.github.mizosoft.methanol.internal.extensions.MimeBodyPublisherAdapter; +import java.io.OutputStream; import java.net.http.HttpRequest.BodyPublisher; +import java.net.http.HttpRequest.BodyPublishers; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.Executor; +import java.util.function.Function; import org.checkerframework.checker.nullness.qual.Nullable; /** Provides additional {@link BodyPublisher} implementations. */ public class MoreBodyPublishers { private MoreBodyPublishers() {} // non-instantiable + /** + * Returns a {@code BodyPublisher} that reads what's written to the {@code OutputStream} received + * by the given task. When the returned publisher receives a subscriber (i.e. when the HTTP client + * starts sending the request body), the given task is executed by the given executor to write the + * body's content. The latter is asynchronously channeled to the HTTP client. + */ + public static BodyPublisher ofOutputStream( + ThrowingConsumer writerTask, Executor executor) { + return ofBodyWriter(WritableBodyPublisher::outputStream, writerTask, executor); + } + + /** + * Returns a {@code BodyPublisher} that reads what's written to the {@code WritableByteChannel} + * received by the given task. When the returned publisher receives a subscriber (i.e. when the + * HTTP client starts sending the request body), the given task is executed by the given executor + * to write the body's content. The latter is asynchronously channeled to the HTTP client. + */ + public static BodyPublisher ofWritableByteChannel( + ThrowingConsumer writerTask, Executor executor) { + return ofBodyWriter(WritableBodyPublisher::byteChannel, writerTask, executor); + } + /** * Adapts the given {@code BodyPublisher} into a {@link MimeBodyPublisher} with the given media * type. @@ -67,4 +97,35 @@ private static UnsupportedOperationException unsupportedConversion( } return new UnsupportedOperationException(message); } + + private static BodyPublisher ofBodyWriter( + Function extractor, + ThrowingConsumer writerTask, + Executor executor) { + requireNonNull(extractor); + requireNonNull(writerTask); + requireNonNull(executor); + return BodyPublishers.fromPublisher( + subscriber -> { + requireNonNull(subscriber); + + var publisher = WritableBodyPublisher.create(); + publisher.subscribe(subscriber); + + if (!publisher.isClosed()) { + try { + executor.execute( + () -> { + try (var out = extractor.apply(publisher)) { + writerTask.accept(out); + } catch (Throwable t) { + publisher.closeExceptionally(t); + } + }); + } catch (Throwable t) { + publisher.closeExceptionally(t); + } + } + }); + } } diff --git a/methanol/src/main/java/com/github/mizosoft/methanol/WritableBodyPublisher.java b/methanol/src/main/java/com/github/mizosoft/methanol/WritableBodyPublisher.java index 16534ec9..891d194a 100644 --- a/methanol/src/main/java/com/github/mizosoft/methanol/WritableBodyPublisher.java +++ b/methanol/src/main/java/com/github/mizosoft/methanol/WritableBodyPublisher.java @@ -50,12 +50,15 @@ import org.checkerframework.checker.nullness.qual.Nullable; /** - * A {@code BodyPublisher} that exposes a sink for writing the body's content. Sink can be acquired - * either as a {@link WritableByteChannel} or an {@link OutputStream}. After writing is finished, - * the publisher must be closed to complete the request (either by calling {@link #close()} or - * closing one of the returned sinks, or using a try-with-resources construct). Additionally, {@link - * #closeExceptionally(Throwable)} can be used to fail the request in case an error is encountered - * while writing. + * A {@code BodyPublisher} that exposes a sink for writing the body's content. It is recommended to + * use {@link MoreBodyPublishers#ofOutputStream} or {@link MoreBodyPublishers#ofWritableByteChannel} + * instead of directly using on this class. + * + *

The sink can be acquired either as a {@link WritableByteChannel} or an {@link OutputStream}. + * After writing is finished, the publisher must be closed to complete the request (either by + * calling {@link #close()} or closing one of the returned sinks, or using a try-with-resources + * construct). Additionally, {@link #closeExceptionally(Throwable)} can be used to fail the request + * in case an error is encountered while writing. * *

Note that ({@link #contentLength()} always returns {@code -1}). If the content length is known * prior to writing, {@link BodyPublishers#fromPublisher(Publisher, long)} can be used to attach the @@ -66,7 +69,7 @@ public final class WritableBodyPublisher implements BodyPublisher, Flushable, Au private static final ByteBuffer CLOSED_SENTINEL = ByteBuffer.allocate(0); - private final Lock lock = new ReentrantLock(); + private final Lock closeLock = new ReentrantLock(); private final Lock writeLock = new ReentrantLock(); private final ConcurrentLinkedQueue pipe = new ConcurrentLinkedQueue<>(); private final AtomicBoolean subscribed = new AtomicBoolean(); @@ -74,6 +77,11 @@ public final class WritableBodyPublisher implements BodyPublisher, Flushable, Au private volatile @Nullable SubscriptionImpl downstreamSubscription; + /** + * The error received in {@link #closeExceptionally} if a subscriber is yet to arrived while + * calling that method. When a subscriber does arrive, it reads this field and completes the + * subscriber exceptionally right away. + */ @GuardedBy("lock") private @MonotonicNonNull Throwable pendingCloseError; @@ -117,7 +125,7 @@ public OutputStream outputStream() { public void closeExceptionally(Throwable error) { requireNonNull(error); SubscriptionImpl subscription; - lock.lock(); + closeLock.lock(); try { if (closed) { FlowSupport.onDroppedError(error); @@ -126,11 +134,12 @@ public void closeExceptionally(Throwable error) { closed = true; subscription = downstreamSubscription; if (subscription == null) { + // Record the error to consume it when a subscriber arrives pendingCloseError = error; return; } } finally { - lock.unlock(); + closeLock.unlock(); } subscription.signalError(error); @@ -142,18 +151,18 @@ public void closeExceptionally(Throwable error) { */ @Override public void close() { - lock.lock(); + closeLock.lock(); try { if (closed) { return; } closed = true; - flushInternal(); - pipe.offer(CLOSED_SENTINEL); } finally { - lock.unlock(); + closeLock.unlock(); } + flushInternal(); + pipe.offer(CLOSED_SENTINEL); signalDownstream(true); } @@ -189,12 +198,12 @@ public void subscribe(Subscriber subscriber) { var subscription = new SubscriptionImpl(subscriber); if (subscribed.compareAndSet(false, true)) { Throwable error; - lock.lock(); + closeLock.lock(); try { downstreamSubscription = subscription; - error = this.pendingCloseError; + error = pendingCloseError; } finally { - lock.unlock(); + closeLock.unlock(); } if (error != null) { @@ -205,7 +214,7 @@ public void subscribe(Subscriber subscriber) { } else { FlowSupport.refuse( subscriber, - new IllegalStateException("already subscribed, multiple subscribers not supported")); + new IllegalStateException("already subscribed (multiple subscribers not supported)")); } } @@ -359,8 +368,8 @@ protected long emit(Subscriber downstream, long emit) { if (buffer == CLOSED_SENTINEL) { cancelOnComplete(downstream); return 0; - } else if (submitted >= emit || buffer == null) { // Exhausted either demand or batches - latestBuffer = buffer; // Save the last polled batch for latter rounds + } else if (submitted >= emit || buffer == null) { // Exhausted either demand or buffers + latestBuffer = buffer; // Save the last polled buffer for later rounds return submitted; } else if (submitOnNext(downstream, buffer)) { submitted++; @@ -376,13 +385,9 @@ protected void abort(boolean flowInterrupted) { pipe.clear(); if (flowInterrupted) { // If this publisher is cancelled "abnormally" (amid writing), possibly ongoing writers - // should know (by setting closed to true) so they can abort. - lock.lock(); - try { - closed = true; - } finally { - lock.unlock(); - } + // should know (by setting closed to true) so they can abort writing by throwing an + // exception. + closed = true; } } } diff --git a/methanol/src/main/java/com/github/mizosoft/methanol/function/ThrowingConsumer.java b/methanol/src/main/java/com/github/mizosoft/methanol/function/ThrowingConsumer.java new file mode 100644 index 00000000..efe1d1b1 --- /dev/null +++ b/methanol/src/main/java/com/github/mizosoft/methanol/function/ThrowingConsumer.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2022 Moataz Abdelnasser + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.github.mizosoft.methanol.function; + +/** + * A {@link java.util.function.Consumer Consumer} that permits throwing an arbitrary {@code + * Exception}. + */ +public interface ThrowingConsumer { + void accept(T t) throws Exception; +} diff --git a/methanol/src/main/java/module-info.java b/methanol/src/main/java/module-info.java index 93b7df88..c15b1515 100644 --- a/methanol/src/main/java/module-info.java +++ b/methanol/src/main/java/module-info.java @@ -36,6 +36,7 @@ exports com.github.mizosoft.methanol; exports com.github.mizosoft.methanol.decoder; exports com.github.mizosoft.methanol.adapter; + exports com.github.mizosoft.methanol.function; exports com.github.mizosoft.methanol.internal.flow to methanol.adapter.jackson, methanol.adapter.jackson.flux; diff --git a/methanol/src/test/java/com/github/mizosoft/methanol/MoreBodyPublishersTest.java b/methanol/src/test/java/com/github/mizosoft/methanol/MoreBodyPublishersTest.java index 8c7da931..3b43f9ac 100644 --- a/methanol/src/test/java/com/github/mizosoft/methanol/MoreBodyPublishersTest.java +++ b/methanol/src/test/java/com/github/mizosoft/methanol/MoreBodyPublishersTest.java @@ -22,13 +22,27 @@ package com.github.mizosoft.methanol; +import static com.github.mizosoft.methanol.testing.ExecutorExtension.ExecutorType.FIXED_POOL; import static com.github.mizosoft.methanol.testutils.Verification.verifyThat; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import com.github.mizosoft.methanol.testing.ExecutorExtension; +import com.github.mizosoft.methanol.testing.ExecutorExtension.ExecutorConfig; +import com.github.mizosoft.methanol.testutils.TestException; +import com.github.mizosoft.methanol.testutils.TestSubscriber; import java.net.http.HttpRequest.BodyPublishers; import java.nio.CharBuffer; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(ExecutorExtension.class) class MoreBodyPublishersTest { @Test void ofMediaType() { @@ -54,4 +68,75 @@ void ofObject_unsupported() { .isThrownBy( () -> MoreBodyPublishers.ofObject("something", MediaType.parse("application/*"))); } + + @Test + @ExecutorConfig(FIXED_POOL) + void ofOutputStream_basicWriting(Executor executor) { + verifyThat( + MoreBodyPublishers.ofOutputStream( + out -> out.write("Pikachu".getBytes(UTF_8)), executor)) + .succeedsWith("Pikachu"); + } + + @Test + @ExecutorConfig(FIXED_POOL) + void ofOutputStream_exceptionalCompletion(Executor executor) { + verifyThat( + MoreBodyPublishers.ofOutputStream( + out -> { + throw new TestException(); + }, + executor)) + .failsWith(TestException.class); + } + + @Test + @ExecutorConfig(FIXED_POOL) + void ofWritableByteChannel_basicWriting(Executor executor) { + verifyThat( + MoreBodyPublishers.ofWritableByteChannel( + out -> out.write(UTF_8.encode("Pikachu")), executor)) + .succeedsWith("Pikachu"); + } + + @Test + @ExecutorConfig(FIXED_POOL) + void ofWritableByteChannel_exceptionalCompletion(Executor executor) { + verifyThat( + MoreBodyPublishers.ofWritableByteChannel( + out -> { + throw new TestException(); + }, + executor)) + .failsWith(TestException.class); + } + + @Test + @ExecutorConfig(FIXED_POOL) + void ofOutputStream_onlySubmitsOnSubscribe(ExecutorService service) throws Exception { + var inWriter = new AtomicBoolean(); + MoreBodyPublishers.ofOutputStream(out -> inWriter.set(true), service); + service.shutdown(); + assertThat(service.awaitTermination(0, TimeUnit.SECONDS)).isTrue(); + assertThat(inWriter).isFalse(); + } + + @Test + @ExecutorConfig(FIXED_POOL) + void ofOutputStream_onlySubmitsIfNotCancelledOnSubscribe(ExecutorService service) throws Exception { + var inWriter = new AtomicBoolean(); + var publisher = MoreBodyPublishers.ofOutputStream(out -> inWriter.set(true), service); + publisher.subscribe( + new TestSubscriber<>() { + @Override + public synchronized void onSubscribe(Subscription subscription) { + super.onSubscribe(subscription); + subscription.cancel(); + } + }); + + service.shutdown(); + assertThat(service.awaitTermination(0, TimeUnit.SECONDS)).isTrue(); + assertThat(inWriter).isFalse(); + } }