Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8252374: Add a new factory method to concatenate a sequence of BodyPublisher instances into a single publisher. #57

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 53 additions & 0 deletions src/java.net.http/share/classes/java/net/http/HttpRequest.java
Expand Up @@ -40,8 +40,10 @@
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

import jdk.internal.net.http.HttpRequestBuilderImpl;
import jdk.internal.net.http.RequestPublishers;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
Expand Down Expand Up @@ -654,5 +656,56 @@ public static BodyPublisher ofByteArrays(Iterable<byte[]> iter) {
public static BodyPublisher noBody() {
return new RequestPublishers.EmptyPublisher();
}

/**
* Returns a {@code BodyPublisher} that publishes a request
* body consisting of the concatenation of the request bodies
* published by a sequence of publishers.
*
* <p> If the sequence is empty an {@linkplain #noBody() empty} publisher
* is returned. Otherwise, if the sequence contains a single element,
* that publisher is returned. Otherwise a <em>concatenation publisher</em>
* is returned.
*
* <p> The request body published by a <em>concatenation publisher</em>
* is logically equivalent to the request body that would have
* been published by concatenating all the bytes of each publisher
* in sequence.
*
* <p> Each publisher is lazily subscribed to in turn,
* until all the body bytes are published, an error occurs, or the
* concatenation publisher's subscription is cancelled.
* The concatenation publisher may be subscribed to more than once,
* which in turn may result in the publishers in the sequence being
* subscribed to more than once.
*
* <p> The concatenation publisher has a known content
* length only if all publishers in the sequence have a known content
* length. The {@link BodyPublisher#contentLength() contentLength}
* reported by the concatenation publisher is computed as follows:
* <ul>
* <li> If any of the publishers reports an <em>{@linkplain
* BodyPublisher#contentLength() unknown}</em> content length,
* or if the sum of the known content lengths would exceed
* {@link Long#MAX_VALUE}, the resulting
* content length is <em>unknown</em>.</li>
* <li> Otherwise, the resulting content length is the sum of the
* known content lengths, a number between
* {@code 0} and {@link Long#MAX_VALUE}, inclusive.</li>
* </ul>
*
* @implNote If the concatenation publisher's subscription is
* {@linkplain Flow.Subscription#cancel() cancelled}, or an error occurs
* while publishing the bytes, not all publishers in the sequence may
* be subscribed to.
*
* @param publishers a sequence of publishers.
* @return An aggregate publisher that publishes a request body
* logically equivalent to the concatenation of all bytes published
* by each publisher in the sequence.
*/
public static BodyPublisher concat(BodyPublisher... publishers) {
dfuch marked this conversation as resolved.
Show resolved Hide resolved
return RequestPublishers.concat(Objects.requireNonNull(publishers));
}
}
}
Expand Up @@ -32,6 +32,7 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.http.HttpRequest.BodyPublisher;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Files;
Expand All @@ -47,12 +48,16 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.net.http.HttpRequest.BodyPublisher;

import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.Utils;

public final class RequestPublishers {
Expand Down Expand Up @@ -491,4 +496,194 @@ public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
publisher.subscribe(subscriber);
}
}


public static BodyPublisher concat(BodyPublisher... publishers) {
if (publishers.length == 0) {
return new EmptyPublisher();
} else if (publishers.length == 1) {
return Objects.requireNonNull(publishers[0]);
} else {
return new AggregatePublisher(List.of(publishers));
}
}

/**
* An aggregate publisher acts as a proxy between a subscriber
* and a list of publishers. It lazily subscribes to each publisher
* in sequence in order to publish a request body that is
* composed from all the bytes obtained from each publisher.
* For instance, the following two publishers are equivalent, even
* though they may result in a different count of {@code onNext}
* invocations.
* <pre>{@code
* var bp1 = BodyPublishers.ofString("ab");
* var bp2 = BodyPublishers.concat(BodyPublishers.ofString("a"),
* BodyPublisher.ofByteArray(new byte[] {(byte)'b'}));
* }</pre>
*
*/
private static final class AggregatePublisher implements BodyPublisher {
final List<BodyPublisher> bodies;
AggregatePublisher(List<BodyPublisher> bodies) {
this.bodies = bodies;
}

// -1 must be returned if any publisher returns -1
// Otherwise, we can just sum the contents.
@Override
public long contentLength() {
long length = bodies.stream()
.mapToLong(BodyPublisher::contentLength)
.reduce((a,b) -> a < 0 || b < 0 ? -1 : a + b)
.orElse(0);
// In case of overflow in any operation but the last, length
// will be -1.
// In case of overflow in the last reduce operation, length
// will be negative, but not necessarily -1: in that case,
// return -1
if (length < 0) return -1;
return length;
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
subscriber.onSubscribe(new AggregateSubscription(bodies, subscriber));
}
}

private static final class AggregateSubscription
implements Flow.Subscription, Flow.Subscriber<ByteBuffer> {
final Flow.Subscriber<? super ByteBuffer> subscriber; // upstream
final Queue<BodyPublisher> bodies;
final SequentialScheduler scheduler;
final Demand demand = new Demand(); // from upstream
final Demand demanded = new Demand(); // requested downstream
final AtomicReference<Throwable> error = new AtomicReference<>();
volatile Throwable illegalRequest;
volatile BodyPublisher publisher; // downstream
volatile Flow.Subscription subscription; // downstream
volatile boolean cancelled;
AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {
this.bodies = new ConcurrentLinkedQueue<>(bodies);
this.subscriber = subscriber;
this.scheduler = SequentialScheduler.synchronizedScheduler(this::run);
}

@Override
public void request(long n) {
dfuch marked this conversation as resolved.
Show resolved Hide resolved
if (cancelled || publisher == null && bodies.isEmpty()) {
return;
}
try {
demand.increase(n);
} catch (IllegalArgumentException x) {
illegalRequest = x;
}
scheduler.runOrSchedule();
}

@Override
public void cancel() {
cancelled = true;
scheduler.runOrSchedule();
}

private boolean cancelSubscription() {
Flow.Subscription subscription = this.subscription;
if (subscription != null) {
this.subscription = null;
this.publisher = null;
subscription.cancel();
}
scheduler.stop();
return subscription != null;
}

public void run() {
try {
while (error.get() == null
&& (!demand.isFulfilled()
|| (publisher == null && !bodies.isEmpty()))) {
boolean cancelled = this.cancelled;
BodyPublisher publisher = this.publisher;
Flow.Subscription subscription = this.subscription;
Throwable illegalRequest = this.illegalRequest;
if (cancelled) {
bodies.clear();
cancelSubscription();
return;
}
if (publisher == null && !bodies.isEmpty()) {
this.publisher = publisher = bodies.poll();
publisher.subscribe(this);
subscription = this.subscription;
} else if (publisher == null) {
return;
}
if (illegalRequest != null) {
onError(illegalRequest);
return;
}
if (subscription == null) return;
if (!demand.isFulfilled()) {
long n = demand.decreaseAndGet(demand.get());
demanded.increase(n);
subscription.request(n);
}
}
} catch (Throwable t) {
onError(t);
}
}


@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
scheduler.runOrSchedule();
}

@Override
public void onNext(ByteBuffer item) {
// make sure to cancel the subscription if we receive
// an item after the subscription was cancelled or
// an error was reported.
if (cancelled || error.get() != null) {
cancelSubscription();
return;
}
demanded.tryDecrement();
subscriber.onNext(item);
}

@Override
public void onError(Throwable throwable) {
if (error.compareAndSet(null, throwable)) {
publisher = null;
subscription = null;
subscriber.onError(throwable);
scheduler.stop();
}
}

@Override
public void onComplete() {
if (publisher != null && !bodies.isEmpty()) {
while (!demanded.isFulfilled()) {
demand.increase(demanded.decreaseAndGet(demanded.get()));
}
publisher = null;
subscription = null;
scheduler.runOrSchedule();
} else {
publisher = null;
subscription = null;
if (!cancelled) {
subscriber.onComplete();
}
scheduler.stop();
}
}
}
}
Expand Up @@ -47,7 +47,7 @@ public final class Demand {
*/
public boolean increase(long n) {
if (n <= 0) {
throw new IllegalArgumentException(String.valueOf(n));
throw new IllegalArgumentException("non-positive subscription request: " + String.valueOf(n));
}
long prev = val.getAndAccumulate(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
return prev == 0;
Expand Down