Skip to content
Permalink
Browse files
8252374: Add a new factory method to concatenate a sequence of BodyPu…
…blisher instances into a single publisher.

Reviewed-by: chegar
  • Loading branch information
dfuch committed Oct 12, 2020
1 parent 05459df commit 4184959d8505f85ebbad0c3214d9aabbcc8eeb19
@@ -40,8 +40,10 @@
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

import jdk.internal.net.http.HttpRequestBuilderImpl;
import jdk.internal.net.http.RequestPublishers;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
@@ -654,5 +656,56 @@ public static BodyPublisher ofByteArrays(Iterable<byte[]> iter) {
public static BodyPublisher noBody() {
return new RequestPublishers.EmptyPublisher();
}

/**
* Returns a {@code BodyPublisher} that publishes a request
* body consisting of the concatenation of the request bodies
* published by a sequence of publishers.
*
* <p> If the sequence is empty an {@linkplain #noBody() empty} publisher
* is returned. Otherwise, if the sequence contains a single element,
* that publisher is returned. Otherwise a <em>concatenation publisher</em>
* is returned.
*
* <p> The request body published by a <em>concatenation publisher</em>
* is logically equivalent to the request body that would have
* been published by concatenating all the bytes of each publisher
* in sequence.
*
* <p> Each publisher is lazily subscribed to in turn,
* until all the body bytes are published, an error occurs, or the
* concatenation publisher's subscription is cancelled.
* The concatenation publisher may be subscribed to more than once,
* which in turn may result in the publishers in the sequence being
* subscribed to more than once.
*
* <p> The concatenation publisher has a known content
* length only if all publishers in the sequence have a known content
* length. The {@link BodyPublisher#contentLength() contentLength}
* reported by the concatenation publisher is computed as follows:
* <ul>
* <li> If any of the publishers reports an <em>{@linkplain
* BodyPublisher#contentLength() unknown}</em> content length,
* or if the sum of the known content lengths would exceed
* {@link Long#MAX_VALUE}, the resulting
* content length is <em>unknown</em>.</li>
* <li> Otherwise, the resulting content length is the sum of the
* known content lengths, a number between
* {@code 0} and {@link Long#MAX_VALUE}, inclusive.</li>
* </ul>
*
* @implNote If the concatenation publisher's subscription is
* {@linkplain Flow.Subscription#cancel() cancelled}, or an error occurs
* while publishing the bytes, not all publishers in the sequence may
* be subscribed to.
*
* @param publishers a sequence of publishers.
* @return An aggregate publisher that publishes a request body
* logically equivalent to the concatenation of all bytes published
* by each publisher in the sequence.
*/
public static BodyPublisher concat(BodyPublisher... publishers) {
return RequestPublishers.concat(Objects.requireNonNull(publishers));
}
}
}
@@ -32,6 +32,7 @@
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.http.HttpRequest.BodyPublisher;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.file.Files;
@@ -47,12 +48,16 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.net.http.HttpRequest.BodyPublisher;

import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.SequentialScheduler;
import jdk.internal.net.http.common.Utils;

public final class RequestPublishers {
@@ -491,4 +496,194 @@ public final void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
publisher.subscribe(subscriber);
}
}


public static BodyPublisher concat(BodyPublisher... publishers) {
if (publishers.length == 0) {
return new EmptyPublisher();
} else if (publishers.length == 1) {
return Objects.requireNonNull(publishers[0]);
} else {
return new AggregatePublisher(List.of(publishers));
}
}

/**
* An aggregate publisher acts as a proxy between a subscriber
* and a list of publishers. It lazily subscribes to each publisher
* in sequence in order to publish a request body that is
* composed from all the bytes obtained from each publisher.
* For instance, the following two publishers are equivalent, even
* though they may result in a different count of {@code onNext}
* invocations.
* <pre>{@code
* var bp1 = BodyPublishers.ofString("ab");
* var bp2 = BodyPublishers.concat(BodyPublishers.ofString("a"),
* BodyPublisher.ofByteArray(new byte[] {(byte)'b'}));
* }</pre>
*
*/
private static final class AggregatePublisher implements BodyPublisher {
final List<BodyPublisher> bodies;
AggregatePublisher(List<BodyPublisher> bodies) {
this.bodies = bodies;
}

// -1 must be returned if any publisher returns -1
// Otherwise, we can just sum the contents.
@Override
public long contentLength() {
long length = bodies.stream()
.mapToLong(BodyPublisher::contentLength)
.reduce((a,b) -> a < 0 || b < 0 ? -1 : a + b)
.orElse(0);
// In case of overflow in any operation but the last, length
// will be -1.
// In case of overflow in the last reduce operation, length
// will be negative, but not necessarily -1: in that case,
// return -1
if (length < 0) return -1;
return length;
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
subscriber.onSubscribe(new AggregateSubscription(bodies, subscriber));
}
}

private static final class AggregateSubscription
implements Flow.Subscription, Flow.Subscriber<ByteBuffer> {
final Flow.Subscriber<? super ByteBuffer> subscriber; // upstream
final Queue<BodyPublisher> bodies;
final SequentialScheduler scheduler;
final Demand demand = new Demand(); // from upstream
final Demand demanded = new Demand(); // requested downstream
final AtomicReference<Throwable> error = new AtomicReference<>();
volatile Throwable illegalRequest;
volatile BodyPublisher publisher; // downstream
volatile Flow.Subscription subscription; // downstream
volatile boolean cancelled;
AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {
this.bodies = new ConcurrentLinkedQueue<>(bodies);
this.subscriber = subscriber;
this.scheduler = SequentialScheduler.synchronizedScheduler(this::run);
}

@Override
public void request(long n) {
if (cancelled || publisher == null && bodies.isEmpty()) {
return;
}
try {
demand.increase(n);
} catch (IllegalArgumentException x) {
illegalRequest = x;
}
scheduler.runOrSchedule();
}

@Override
public void cancel() {
cancelled = true;
scheduler.runOrSchedule();
}

private boolean cancelSubscription() {
Flow.Subscription subscription = this.subscription;
if (subscription != null) {
this.subscription = null;
this.publisher = null;
subscription.cancel();
}
scheduler.stop();
return subscription != null;
}

public void run() {
try {
while (error.get() == null
&& (!demand.isFulfilled()
|| (publisher == null && !bodies.isEmpty()))) {
boolean cancelled = this.cancelled;
BodyPublisher publisher = this.publisher;
Flow.Subscription subscription = this.subscription;
Throwable illegalRequest = this.illegalRequest;
if (cancelled) {
bodies.clear();
cancelSubscription();
return;
}
if (publisher == null && !bodies.isEmpty()) {
this.publisher = publisher = bodies.poll();
publisher.subscribe(this);
subscription = this.subscription;
} else if (publisher == null) {
return;
}
if (illegalRequest != null) {
onError(illegalRequest);
return;
}
if (subscription == null) return;
if (!demand.isFulfilled()) {
long n = demand.decreaseAndGet(demand.get());
demanded.increase(n);
subscription.request(n);
}
}
} catch (Throwable t) {
onError(t);
}
}


@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
scheduler.runOrSchedule();
}

@Override
public void onNext(ByteBuffer item) {
// make sure to cancel the subscription if we receive
// an item after the subscription was cancelled or
// an error was reported.
if (cancelled || error.get() != null) {
cancelSubscription();
return;
}
demanded.tryDecrement();
subscriber.onNext(item);
}

@Override
public void onError(Throwable throwable) {
if (error.compareAndSet(null, throwable)) {
publisher = null;
subscription = null;
subscriber.onError(throwable);
scheduler.stop();
}
}

@Override
public void onComplete() {
if (publisher != null && !bodies.isEmpty()) {
while (!demanded.isFulfilled()) {
demand.increase(demanded.decreaseAndGet(demanded.get()));
}
publisher = null;
subscription = null;
scheduler.runOrSchedule();
} else {
publisher = null;
subscription = null;
if (!cancelled) {
subscriber.onComplete();
}
scheduler.stop();
}
}
}
}
@@ -47,7 +47,7 @@
*/
public boolean increase(long n) {
if (n <= 0) {
throw new IllegalArgumentException(String.valueOf(n));
throw new IllegalArgumentException("non-positive subscription request: " + String.valueOf(n));
}
long prev = val.getAndAccumulate(n, (p, i) -> p + i < 0 ? Long.MAX_VALUE : p + i);
return prev == 0;

1 comment on commit 4184959

@bridgekeeper

This comment has been minimized.

Copy link

@bridgekeeper bridgekeeper bot commented on 4184959 Oct 12, 2020

Review

Issues

Please sign in to comment.