Skip to content

Commit

Permalink
Fixes #4058 - Review Locker.
Browse files Browse the repository at this point in the history
Removes the Locker class, replaced by AutoLock.
Removed usages of Locker.isLocked() from the session code
since it was not necessary.
Took the chance to do a little code cleanup.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Sep 9, 2019
1 parent 103a506 commit 692c017
Show file tree
Hide file tree
Showing 16 changed files with 215 additions and 316 deletions.
47 changes: 18 additions & 29 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java
Expand Up @@ -36,7 +36,7 @@
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;

/**
Expand Down Expand Up @@ -68,24 +68,13 @@ public class ByteArrayEndPoint extends AbstractEndPoint

private static final ByteBuffer EOF = BufferUtil.allocate(0);

private final Runnable _runFillable = new Runnable()
{
@Override
public void run()
{
getFillInterest().fillable();
}
};

private final Locker _locker = new Locker();
private final Condition _hasOutput = _locker.newCondition();
private final Runnable _runFillable = () -> getFillInterest().fillable();
private final AutoLock _lock = new AutoLock();
private final Condition _hasOutput = _lock.newCondition();
private final Queue<ByteBuffer> _inQ = new ArrayDeque<>();
private ByteBuffer _out;
private boolean _growOutput;

/**
*
*/
public ByteArrayEndPoint()
{
this(null, 0, null, null);
Expand Down Expand Up @@ -138,7 +127,7 @@ public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input,
public void doShutdownOutput()
{
super.doShutdownOutput();
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
_hasOutput.signalAll();
}
Expand All @@ -148,7 +137,7 @@ public void doShutdownOutput()
public void doClose()
{
super.doClose();
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
_hasOutput.signalAll();
}
Expand Down Expand Up @@ -180,7 +169,7 @@ protected void execute(Runnable task)
@Override
protected void needsFillInterest() throws IOException
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
if (!isOpen())
throw new ClosedChannelException();
Expand All @@ -205,7 +194,7 @@ public void addInputEOF()
public void addInput(ByteBuffer in)
{
boolean fillable = false;
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
if (isEOF(_inQ.peek()))
throw new RuntimeIOException(new EOFException());
Expand Down Expand Up @@ -238,7 +227,7 @@ public void addInput(String s, Charset charset)
public void addInputAndExecute(ByteBuffer in)
{
boolean fillable = false;
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
if (isEOF(_inQ.peek()))
throw new RuntimeIOException(new EOFException());
Expand All @@ -263,7 +252,7 @@ public void addInputAndExecute(ByteBuffer in)
*/
public ByteBuffer getOutput()
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
return _out;
}
Expand Down Expand Up @@ -293,7 +282,7 @@ public ByteBuffer takeOutput()
{
ByteBuffer b;

try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
b = _out;
_out = BufferUtil.allocate(b.capacity());
Expand All @@ -314,7 +303,7 @@ public ByteBuffer waitForOutput(long time, TimeUnit unit) throws InterruptedExce
{
ByteBuffer b;

try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
while (BufferUtil.isEmpty(_out) && !isOutputShutdown())
{
Expand Down Expand Up @@ -351,15 +340,15 @@ public String takeOutputString(Charset charset)
*/
public void setOutput(ByteBuffer out)
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
_out = out;
}
getWriteFlusher().completeWrite();
}

/**
* @return <code>true</code> if there are bytes remaining to be read from the encoded input
* @return {@code true} if there are bytes remaining to be read from the encoded input
*/
public boolean hasMore()
{
Expand All @@ -373,7 +362,7 @@ public boolean hasMore()
public int fill(ByteBuffer buffer) throws IOException
{
int filled = 0;
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
while (true)
{
Expand Down Expand Up @@ -418,7 +407,7 @@ else if (filled < 0)
public boolean flush(ByteBuffer... buffers) throws IOException
{
boolean flushed = true;
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
if (!isOpen())
throw new IOException("CLOSED");
Expand Down Expand Up @@ -467,7 +456,7 @@ public boolean flush(ByteBuffer... buffers) throws IOException
@Override
public void reset()
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
_inQ.clear();
_hasOutput.signalAll();
Expand Down Expand Up @@ -507,7 +496,7 @@ public String toString()
int q;
ByteBuffer b;
String o;
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
q = _inQ.size();
b = _inQ.peek();
Expand Down
Expand Up @@ -52,7 +52,7 @@
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.ThreadPoolBudget;
Expand Down Expand Up @@ -144,8 +144,8 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
{
protected static final Logger LOG = Log.getLogger(AbstractConnector.class);

private final Locker _locker = new Locker();
private final Condition _setAccepting = _locker.newCondition();
private final AutoLock _lock = new AutoLock();
private final Condition _setAccepting = _lock.newCondition();
private final Map<String, ConnectionFactory> _factories = new LinkedHashMap<>(); // Order is important on server side, so we use a LinkedHashMap
private final Server _server;
private final Executor _executor;
Expand Down Expand Up @@ -231,7 +231,7 @@ public void beanRemoved(Container parent, Object bean)
* Get the {@link HttpChannel.Listener}s added to the connector
* as a single combined Listener.
* This is equivalent to a listener that iterates over the individual
* listeners returned from <code>getBeans(HttpChannel.Listener.class);</code>,
* listeners returned from {@code getBeans(HttpChannel.Listener.class);},
* except that: <ul>
* <li>The result is precomputed, so it is more efficient</li>
* <li>The result is ordered by the order added.</li>
Expand Down Expand Up @@ -332,7 +332,7 @@ protected void doStart() throws Exception

protected void interruptAcceptors()
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
for (Thread thread : _acceptors)
{
Expand Down Expand Up @@ -387,7 +387,7 @@ public void join() throws InterruptedException

public void join(long timeout) throws InterruptedException
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
for (Thread thread : _acceptors)
{
Expand All @@ -404,15 +404,15 @@ public void join(long timeout) throws InterruptedException
*/
public boolean isAccepting()
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
return _accepting;
}
}

public void setAccepting(boolean accepting)
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
_accepting = accepting;
_setAccepting.signalAll();
Expand All @@ -422,7 +422,7 @@ public void setAccepting(boolean accepting)
@Override
public ConnectionFactory getConnectionFactory(String protocol)
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
return _factories.get(StringUtil.asciiToLowerCase(protocol));
}
Expand All @@ -431,7 +431,7 @@ public ConnectionFactory getConnectionFactory(String protocol)
@Override
public <T> T getConnectionFactory(Class<T> factoryType)
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
for (ConnectionFactory f : _factories.values())
{
Expand Down Expand Up @@ -683,7 +683,7 @@ public void run()
{
while (isRunning())
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
if (!_accepting && isRunning())
{
Expand Down
Expand Up @@ -57,7 +57,7 @@
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ShutdownThread;
import org.eclipse.jetty.util.thread.ThreadPool;
Expand All @@ -83,8 +83,7 @@ public class Server extends HandlerWrapper implements Attributes
private boolean _dumpBeforeStop = false;
private ErrorHandler _errorHandler;
private RequestLog _requestLog;

private final Locker _dateLocker = new Locker();
private final AutoLock _dateLock = new AutoLock();
private volatile DateField _dateField;

public Server()
Expand Down Expand Up @@ -315,7 +314,7 @@ public HttpField getDateField()

if (df == null || df._seconds != seconds)
{
try (Locker.Lock lock = _dateLocker.lock())
try (AutoLock lock = _dateLock.lock())
{
df = _dateField;
if (df == null || df._seconds != seconds)
Expand Down
Expand Up @@ -48,7 +48,7 @@
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.AutoLock;

/**
* <p>Handler to limit the threads per IP address for DOS protection</p>
Expand Down Expand Up @@ -241,7 +241,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
}
}

protected Remote getRemote(Request baseRequest)
private Remote getRemote(Request baseRequest)
{
Remote remote = (Remote)baseRequest.getAttribute(REMOTE);
if (remote != null)
Expand Down Expand Up @@ -329,11 +329,11 @@ private String getXForwardedFor(Request request)
return (comma >= 0) ? forwardedFor.substring(comma + 1).trim() : forwardedFor;
}

private final class Remote implements Closeable
private static final class Remote implements Closeable
{
private final String _ip;
private final int _limit;
private final Locker _locker = new Locker();
private final AutoLock _lock = new AutoLock();
private int _permits;
private Deque<CompletableFuture<Closeable>> _queue = new ArrayDeque<>();
private final CompletableFuture<Closeable> _permitted = CompletableFuture.completedFuture(this);
Expand All @@ -346,7 +346,7 @@ public Remote(String ip, int limit)

public CompletableFuture<Closeable> acquire()
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
// Do we have available passes?
if (_permits < _limit)
Expand All @@ -358,16 +358,16 @@ public CompletableFuture<Closeable> acquire()
}

// No pass available, so queue a new future
CompletableFuture<Closeable> pass = new CompletableFuture<Closeable>();
CompletableFuture<Closeable> pass = new CompletableFuture<>();
_queue.addLast(pass);
return pass;
}
}

@Override
public void close() throws IOException
public void close()
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
// reduce the allocated passes
_permits--;
Expand Down Expand Up @@ -396,14 +396,14 @@ public void close() throws IOException
@Override
public String toString()
{
try (Locker.Lock lock = _locker.lock())
try (AutoLock lock = _lock.lock())
{
return String.format("R[ip=%s,p=%d,l=%d,q=%d]", _ip, _permits, _limit, _queue.size());
}
}
}

private final class RFC7239 extends QuotedCSV
private static final class RFC7239 extends QuotedCSV
{
String _for;

Expand Down

0 comments on commit 692c017

Please sign in to comment.