Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

8267990: Revisit some uses of synchronized in the HttpClient API #4275

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -167,7 +167,7 @@ public void cancel() {
private final ConcurrentLinkedDeque<ByteBuffer> queue
= new ConcurrentLinkedDeque<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::flush);
SequentialScheduler.lockingScheduler(this::flush);
final MinimalFuture<Void> whenFinished;
private final Executor executor;
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -628,7 +628,7 @@ final class Http1Publisher implements FlowTube.TubePublisher {
final Http1WriteSubscription subscription = new Http1WriteSubscription();
final Demand demand = new Demand();
final SequentialScheduler writeScheduler =
SequentialScheduler.synchronizedScheduler(new WriteTask());
SequentialScheduler.lockingScheduler(new WriteTask());

@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -1292,7 +1292,7 @@ final class Http2TubeSubscriber implements TubeSubscriber {
private final ConcurrentLinkedQueue<ByteBuffer> queue
= new ConcurrentLinkedQueue<>();
private final SequentialScheduler scheduler =
SequentialScheduler.synchronizedScheduler(this::processQueue);
SequentialScheduler.lockingScheduler(this::processQueue);
private final HttpClientImpl client;

Http2TubeSubscriber(HttpClientImpl client) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -165,7 +165,7 @@ private LineSubscription(Flow.Subscription s,
newline = separator;
upstream = Objects.requireNonNull(subscriber);
cf = Objects.requireNonNull(completion);
scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
scheduler = SequentialScheduler.lockingScheduler(this::loop);
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -583,7 +583,7 @@ public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
AggregateSubscription(List<BodyPublisher> bodies, Flow.Subscriber<? super ByteBuffer> subscriber) {
this.bodies = new ConcurrentLinkedQueue<>(bodies);
this.subscriber = subscriber;
this.scheduler = SequentialScheduler.synchronizedScheduler(this::run);
this.scheduler = SequentialScheduler.lockingScheduler(this::run);
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -36,6 +36,8 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import jdk.internal.net.http.common.BufferSupplier;
@@ -163,16 +165,22 @@ void signalClosed() {
*/
private static class SocketFlowTask implements RestartableTask {
final Runnable task;
private final Object monitor = new Object();
private final Lock lock = new ReentrantLock();
SocketFlowTask(Runnable task) {
this.task = task;
}
@Override
public final void run(DeferredCompleter taskCompleter) {
try {
// non contentious synchronized for visibility.
synchronized(monitor) {
// The logics of the sequential scheduler should ensure that
// the restartable task is running in only one thread at
// a given time: there should never be contention.
boolean locked = lock.tryLock();
assert locked : "contention detected in SequentialScheduler";
try {
task.run();
} finally {
if (locked) lock.unlock();
}
} finally {
taskCompleter.complete();
@@ -101,7 +101,7 @@

final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
final SequentialScheduler sched =
SequentialScheduler.synchronizedScheduler(this::schedule);
SequentialScheduler.lockingScheduler(this::schedule);
final SubscriptionBase userSubscription =
new SubscriptionBase(sched, this::cancel, this::onSubscriptionError);

@@ -840,7 +840,7 @@ class RequestSubscriber implements Flow.Subscriber<ByteBuffer> {
this.contentLength = contentLen;
this.remainingContentLength = contentLen;
this.sendScheduler =
SequentialScheduler.synchronizedScheduler(this::trySend);
SequentialScheduler.lockingScheduler(this::trySend);
}

@Override
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -271,7 +271,7 @@ public void run() {

Reader() {
super();
scheduler = SequentialScheduler.synchronizedScheduler(
scheduler = SequentialScheduler.lockingScheduler(
new ReaderDownstreamPusher());
this.readBuf = ByteBuffer.allocate(1024);
readBuf.limit(0); // keep in read mode
@@ -777,10 +777,10 @@ private void processData() {
try {
if (debugw.on())
debugw.log("processData, writeList remaining:"
+ Utils.remaining(writeList) + ", hsTriggered:"
+ Utils.synchronizedRemaining(writeList) + ", hsTriggered:"
+ hsTriggered() + ", needWrap:" + needWrap());

while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) {
while (Utils.synchronizedRemaining(writeList) > 0 || hsTriggered() || needWrap()) {
ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY);
EngineResult result = wrapBuffers(outbufs);
if (debugw.on())
@@ -823,7 +823,7 @@ private void processData() {
}
}
}
if (completing && Utils.remaining(writeList) == 0) {
if (completing && Utils.synchronizedRemaining(writeList) == 0) {
if (!completed) {
completed = true;
writeList.clear();
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2016, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -27,6 +27,8 @@

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static java.util.Objects.requireNonNull;

@@ -177,6 +179,36 @@ protected void run() {
}
}

/**
* A task that runs its main loop within a block protected by a lock to provide
* memory visibility between runs. Since the main loop can't run concurrently,
* the lock shouldn't be contended and no deadlock should ever be possible.
*/
public static final class LockingRestartableTask
extends CompleteRestartableTask {

private final Runnable mainLoop;
private final Lock lock = new ReentrantLock();

public LockingRestartableTask(Runnable mainLoop) {
this.mainLoop = mainLoop;
}

@Override
protected void run() {
// The logics of the sequential scheduler should ensure that
// the restartable task is running in only one thread at
// a given time: there should never be contention.
boolean locked = lock.tryLock();
assert locked : "contention detected in SequentialScheduler";
try {
mainLoop.run();
} finally {
if (locked) lock.unlock();
}
}
}

private static final int OFFLOAD = 1;
private static final int AGAIN = 2;
private static final int BEGIN = 4;
@@ -359,4 +391,20 @@ public void stop() {
public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) {
return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop));
}

/**
* Returns a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link LockingRestartableTask}.
*
* @apiNote This is equivalent to calling
* {@code new SequentialScheduler(new LockingRestartableTask(mainLoop))}
* The main loop must not perform any blocking operation.
*
* @param mainLoop The main loop of the new sequential scheduler
* @return a new {@code SequentialScheduler} that executes the provided
* {@code mainLoop} from within a {@link LockingRestartableTask}.
*/
public static SequentialScheduler lockingScheduler(Runnable mainLoop) {
return new SequentialScheduler(new LockingRestartableTask(mainLoop));
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -96,7 +96,7 @@ public SubscriberWrapper()
errorCommon(t);
});
this.pushScheduler =
SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
SequentialScheduler.lockingScheduler(new DownstreamPusher());
this.downstreamSubscription = new SubscriptionBase(pushScheduler,
this::downstreamCompletion);
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015, 2020, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2015, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -655,33 +655,33 @@ public static long remaining(ByteBuffer[] bufs) {
}

public static boolean hasRemaining(List<ByteBuffer> bufs) {
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
if (buf.hasRemaining())
return true;
}
for (ByteBuffer buf : bufs) {
if (buf.hasRemaining())
return true;
}
return false;
}

public static long remaining(List<ByteBuffer> bufs) {
long remain = 0;
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
}
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
}
return remain;
}

public static long synchronizedRemaining(List<ByteBuffer> bufs) {
synchronized (bufs) {
return remaining(bufs);
}
}

public static int remaining(List<ByteBuffer> bufs, int max) {
long remain = 0;
synchronized (bufs) {
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
if (remain > max) {
throw new IllegalArgumentException("too many bytes");
}
for (ByteBuffer buf : bufs) {
remain += buf.remaining();
if (remain > max) {
throw new IllegalArgumentException("too many bytes");
}
}
return (int) remain;
@@ -86,7 +86,7 @@
* @library /test/lib http2/server
* @build Http2TestServer LineBodyHandlerTest HttpServerAdapters
* @build jdk.test.lib.net.SimpleSSLContext
* @run testng/othervm LineBodyHandlerTest
* @run testng/othervm -XX:+UnlockDiagnosticVMOptions -XX:DiagnoseSyncOnValueBasedClasses=1 LineBodyHandlerTest
*/

public class LineBodyHandlerTest implements HttpServerAdapters {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -161,7 +161,7 @@ private static class LateBindingTube implements FlowTube {
final ConcurrentLinkedQueue<Consumer<Flow.Subscriber<? super List<ByteBuffer>>>> queue
= new ConcurrentLinkedQueue<>();
AtomicReference<Flow.Subscriber<? super List<ByteBuffer>>> subscriberRef = new AtomicReference<>();
SequentialScheduler scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
SequentialScheduler scheduler = SequentialScheduler.lockingScheduler(this::loop);
AtomicReference<Throwable> errorRef = new AtomicReference<>();
private volatile boolean finished;
private volatile boolean completed;