Skip to content
Permalink
Browse files
8267990: Revisit some uses of synchronized in the HttpClient API
Reviewed-by: chegar
  • Loading branch information
dfuch committed Jun 1, 2021
1 parent 36dc268 commit 9d8ad2ed62325bd8d813974d5aa1e031ed8bf8da
Show file tree
Hide file tree
Showing 13 changed files with 99 additions and 43 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -167,7 +167,7 @@ public void cancel() {
private final ConcurrentLinkedDeque<ByteBuffer> queue
= new ConcurrentLinkedDeque<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::flush);
SequentialScheduler.lockingScheduler(this::flush);
final MinimalFuture<Void> whenFinished;
private final Executor executor;
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -628,7 +628,7 @@ final class Http1Publisher implements FlowTube.TubePublisher {
final Http1WriteSubscription subscription = new Http1WriteSubscription();
final Demand demand = new Demand();
final SequentialScheduler writeScheduler =
SequentialScheduler.synchronizedScheduler(new WriteTask());
SequentialScheduler.lockingScheduler(new WriteTask());

@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -1292,7 +1292,7 @@ final class Http2TubeSubscriber implements TubeSubscriber {
private final ConcurrentLinkedQueue<ByteBuffer> queue
= new ConcurrentLinkedQueue<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::processQueue);
SequentialScheduler.lockingScheduler(this::processQueue);
private final HttpClientImpl client;

Http2TubeSubscriber(HttpClientImpl client) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -165,7 +165,7 @@ private LineSubscription(Flow.Subscription s,
newline = separator;
upstream = Objects.requireNonNull(subscriber);
cf = Objects.requireNonNull(completion);
scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
scheduler = SequentialScheduler.lockingScheduler(this::loop);
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -583,7 +583,7 @@ public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {
this.bodies = new ConcurrentLinkedQueue<>(bodies);
this.subscriber = subscriber;
this.scheduler = SequentialScheduler.synchronizedScheduler(this::run);
this.scheduler = SequentialScheduler.lockingScheduler(this::run);
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -36,6 +36,8 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import jdk.internal.net.http.common.BufferSupplier;
@@ -163,16 +165,22 @@ void signalClosed() {
*/
private static class SocketFlowTask implements RestartableTask {
final Runnable task;
private final Object monitor = new Object();
private final Lock lock = new ReentrantLock();
SocketFlowTask(Runnable task) {
this.task = task;
}
@Override
public final void run(DeferredCompleter taskCompleter) {
try {
// non contentious synchronized for visibility.
synchronized(monitor) {
// The logics of the sequential scheduler should ensure that
// the restartable task is running in only one thread at
// a given time: there should never be contention.
boolean locked = lock.tryLock();
assert locked : "contention detected in SequentialScheduler";
try {
task.run();
} finally {
if (locked) lock.unlock();
}
} finally {
taskCompleter.complete();
@@ -101,7 +101,7 @@

final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
final SequentialScheduler sched =
SequentialScheduler.synchronizedScheduler(this::schedule);
SequentialScheduler.lockingScheduler(this::schedule);
final SubscriptionBase userSubscription =
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);

@@ -840,7 +840,7 @@ class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
this.contentLength = contentLen;
this.remainingContentLength = contentLen;
this.sendScheduler =
SequentialScheduler.synchronizedScheduler(this::trySend);
SequentialScheduler.lockingScheduler(this::trySend);
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -271,7 +271,7 @@ public void run() {

Reader() {
super();
scheduler = SequentialScheduler.synchronizedScheduler(
scheduler = SequentialScheduler.lockingScheduler(
new ReaderDownstreamPusher());
this.readBuf = ByteBuffer.allocate(1024);
readBuf.limit(0); // keep in read mode
@@ -777,10 +777,10 @@ private void processData() {
try {
if (debugw.on())
debugw.log("processData, writeList remaining:"
+ Utils.remaining(writeList) + ", hsTriggered:"
+ Utils.synchronizedRemaining(writeList) + ", hsTriggered:"
+ hsTriggered() + ", needWrap:" + needWrap());

while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
while (Utils.synchronizedRemaining(writeList) > 0 || hsTriggered() || needWrap()) {
ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
EngineResult result = wrapBuffers(outbufs);
if (debugw.on())
@@ -823,7 +823,7 @@ private void processData() {
}
}
}
if (completing && Utils.remaining(writeList) == 0) {
if (completing && Utils.synchronizedRemaining(writeList) == 0) {
if (!completed) {
completed = true;
writeList.clear();
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -27,6 +27,8 @@

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Objects.requireNonNull;

@@ -177,6 +179,36 @@ protected void run() {
}
}

/**
* A task that runs its main loop within a block protected by a lock to provide
* memory visibility between runs. Since the main loop can't run concurrently,
* the lock shouldn't be contended and no deadlock should ever be possible.
*/
public static final class LockingRestartableTask
extends CompleteRestartableTask {

private final Runnable mainLoop;
private final Lock lock = new ReentrantLock();

public LockingRestartableTask(Runnable mainLoop) {
this.mainLoop = mainLoop;
}

@Override
protected void run() {
// The logics of the sequential scheduler should ensure that
// the restartable task is running in only one thread at
// a given time: there should never be contention.
boolean locked = lock.tryLock();
assert locked : "contention detected in SequentialScheduler";
try {
mainLoop.run();
} finally {
if (locked) lock.unlock();
}
}
}

private static final int OFFLOAD = 1;
private static final int AGAIN = 2;
private static final int BEGIN = 4;
@@ -359,4 +391,20 @@ public void stop() {
public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) {
return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop));
}

/**
* Returns a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link LockingRestartableTask}.
*
* @apiNote This is equivalent to calling
* {@code new SequentialScheduler(new LockingRestartableTask(mainLoop))}
* The main loop must not perform any blocking operation.
*
* @param mainLoop The main loop of the new sequential scheduler
* @return a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link LockingRestartableTask}.
*/
public static SequentialScheduler lockingScheduler(Runnable mainLoop) {
return new SequentialScheduler(new LockingRestartableTask(mainLoop));
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -96,7 +96,7 @@ public SubscriberWrapper()
errorCommon(t);
});
this.pushScheduler =
SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
SequentialScheduler.lockingScheduler(new DownstreamPusher());
this.downstreamSubscription = new SubscriptionBase(pushScheduler,
this::downstreamCompletion);
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -655,33 +655,33 @@ public static long remaining(ByteBuffer[] bufs) {
}

public static boolean hasRemaining(List<ByteBuffer> bufs) {
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
if (buf.hasRemaining())
return true;
}
for (ByteBuffer buf : bufs) {
if (buf.hasRemaining())
return true;
}
return false;
}

public static long remaining(List<ByteBuffer> bufs) {
long remain = 0;
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
}
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
}
return remain;
}

public static long synchronizedRemaining(List<ByteBuffer> bufs) {
synchronized (bufs) {
return remaining(bufs);
}
}

public static int remaining(List<ByteBuffer> bufs, int max) {
long remain = 0;
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
if (remain > max) {
throw new IllegalArgumentException("too many bytes");
}
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
if (remain > max) {
throw new IllegalArgumentException("too many bytes");
}
}
return (int) remain;
@@ -86,7 +86,7 @@
* @library /test/lib http2/server
* @build Http2TestServer LineBodyHandlerTest HttpServerAdapters
* @build jdk.test.lib.net.SimpleSSLContext
* @run testng/othervm LineBodyHandlerTest
* @run testng/othervm -XX:+UnlockDiagnosticVMOptions -XX:DiagnoseSyncOnValueBasedClasses=1 LineBodyHandlerTest
*/

public class LineBodyHandlerTest implements HttpServerAdapters {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -161,7 +161,7 @@ private static class LateBindingTube implements FlowTube {
final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue
= new ConcurrentLinkedQueue<>();
AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
SequentialScheduler scheduler = SequentialScheduler.lockingScheduler(this::loop);
AtomicReference<Throwable> errorRef = new AtomicReference<>();
private volatile boolean finished;
private volatile boolean completed;

1 comment on commit 9d8ad2e

@openjdk-notifier
Copy link

@openjdk-notifier openjdk-notifier bot commented on 9d8ad2e Jun 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.