# Concurrent Programming

Sequential programming is a form of computing that executes the same sequence of instructions and always produces the same result
- Execution is deterministic (assuming no deliberate use of randomness)
- The textual order of the source code defines the order of execution

Concurrent programming is a form of computing where threads can run simultaneously

Different executions of a concurrent program may produce different instruction orderings:
- The textual order of the source code doesn't define the order of execution

```
new Thread(() -> computationA()).start();

new Thread(() -> computationB()).start();

new Thread(() -> computationC()).start();
```
computationA(), computationB(), and computationC() can run in any order after their threads start executing.
- Operations are permitted to overlap in time across multiple cores
- Concurrent programming is often used to offload work from the user interface (UI) thread to background thread(s)
- - Background thread(s) can block
- - The UI thread does not block
- - Any mutable state shared between these threads must be protected to avoid concurrency hazards
- - - Motivates the need for various types of synchronizers
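
The ordering point above can be sketched as follows. This is a minimal illustration, not code from the original notes: the class name `OrderingDemo` and the method `runAll()` are hypothetical, and the three computations are stand-ins that just record their own name in a thread-safe list.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class OrderingDemo {
    // Runs three trivial computations in separate threads and returns the
    // order in which they actually executed.
    static List<String> runAll() throws InterruptedException {
        // Thread-safe list, so concurrent add() calls cannot corrupt it.
        List<String> order = new CopyOnWriteArrayList<>();

        Thread a = new Thread(() -> order.add("A"));
        Thread b = new Thread(() -> order.add("B"));
        Thread c = new Thread(() -> order.add("C"));

        // The threads are started in textual order...
        a.start();
        b.start();
        c.start();

        // ...but the scheduler decides when each one runs, so wait for all three.
        a.join();
        b.join();
        c.join();
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        // The printed order may differ from one execution to the next.
        System.out.println(runAll());
    }
}
```

All three entries are always present after the `join()` calls, but their relative order is not fixed by the source code.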

## Concurrency in Java
Concurrency is commonly used to offload work from the user interface (UI) thread to background threads

A Java Thread is an object
- It contains methods and fields

|Class Thread|
|-|
|java.lang.Object java.lang.Thread|
|All Implemented Interfaces: Runnable|
|Direct Known Subclasses: ForkJoinWorkerThread|
|public class Thread extends Object implements Runnable|
|A thread is a thread of execution in a program. The Java Virtual Machine allows an application to have multiple threads of execution running concurrently.|
|Every thread has a priority. Threads with higher priority are executed in preference to threads with lower priority. Each thread may or may not also be marked as a daemon. When code running in some thread creates a new Thread object, the new thread has its priority initially set equal to the priority of the creating thread, and is a daemon thread if and only if the creating thread is a daemon.|

Each Java thread has its own unique stack, registers, thread-specific storage, etc.

<img src="media/threadstack.png"/>

A Java thread can be in one of several states
- A Java thread starts out in the `New` state

<img src="media/threadstate.png"/>

Concurrent Java threads interact via shared objects and/or messages

### Shared objects
- Synchronize concurrent operations on objects to ensure certain properties
- - Mutual Exclusion
- - - Interactions between threads do not corrupt shared mutable data
- - Coordination
- - - Operations occur in the right order at the right time and under the right conditions
<img src="media/sharedobject.png"/>

Examples of Java synchronizers:
- Synchronized statements/methods
- Reentrant locks and intrinsic locks
- Atomic operations
- Semaphores
- Condition objects
- "Compare-and-swap" (CAS) operations in sun.misc.Unsafe

### Message passing
- Send message(s) from producer thread(s) to consumer thread(s) via a thread-safe queue
<img src="media/messagepassing.png"/>

Examples of Java thread-safe queues
- Array and linked blocking queues
- Priority blocking queue
- Synchronous queue
- Concurrent linked queue
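
A minimal sketch of message passing through one of these queues, assuming a single producer and a single consumer. The class name `MessagePassingDemo`, the method `transfer()`, and the `DONE` sentinel are hypothetical names chosen for illustration; `ArrayBlockingQueue`, `put()`, and `take()` are standard `java.util.concurrent` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class MessagePassingDemo {
    // Hypothetical sentinel telling the consumer that no more messages follow.
    private static final int DONE = -1;

    static List<Integer> transfer(int count) throws InterruptedException {
        // A small capacity forces the producer to block when the consumer lags.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        // Touched only by the consumer thread until it has been joined.
        List<Integer> received = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i);          // blocks while the queue is full
                }
                queue.put(DONE);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                // take() blocks while the queue is empty.
                for (int msg = queue.take(); msg != DONE; msg = queue.take()) {
                    received.add(msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();   // after join() the consumer's writes are visible here
        return received;
    }
}
```

The two threads never touch each other's state directly; the queue is the only shared object, and it performs the synchronization internally.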


Java shared objects and message passing are designed to share resources safely and avoid concurrency hazards
- Race conditions: occur when a program depends upon the sequence or timing of threads for it to operate properly
- Memory inconsistencies: errors that occur when different threads have inconsistent views of what should be the same data
- Deadlocks: occur when two or more competing threads are each waiting for the others to finish, and thus none ever do

# Introduction to Threads
Concurrent apps use threads to simultaneously run multiple computations that potentially interact with each other
- Threads are the most basic way of obtaining concurrency in Java
- A Java thread is a unit of computation that runs in the context of a process
- Process: A process is a unit of resource allocation and protection in Java
- A Java thread runs on one or more cores during its lifetime
- Java enables multiple threads to run in multiple processes atop multiple cores
<img src="media/processthread.png"/>

- Java threads running in the same process can communicate with each other via shared objects or message passing

- Java threads in different processes communicate via shared memory or inter-process communication (IPC) mechanisms
<img src="media/interprocesscommunication.png"/>


Each Java thread leverages unique "state" from the underlying OS thread, e.g., a runtime stack, an instruction counter, and other registers.
<img src="media/underlyingosthread.png"/>


## Ways of Giving Code to Java Threads
Java threads must be given code to run
- Do not use the "no argument" Thread constructor directly: the default run() method has nothing to execute, so the thread exits right away
```
Thread t = new Thread();
t.start();
```

There are 2 ways to give code to Java threads (the lambda form in item 3 is a variant of the second)
1. Extend the Thread class
```
public class GCDThread extends Thread {
    public void run() {
        // code to run goes here
    }
}
```
Create and start a thread using a named subclass of Thread
```
Thread gCDThread = new GCDThread();
gCDThread.start();
```
2. Implement the Runnable interface
- Implement the run() hook method of an interface to define the thread's computations
```
public class GCDRunnable implements Runnable {
    public void run() {
        // code to run goes here
    }
}
Runnable gcdRunnable = new GCDRunnable();
new Thread(gcdRunnable).start();
```
Pass the runnable to a new thread object and start it.


Or we can create and start a thread by using an anonymous inner class as the runnable
```
new Thread(new Runnable() {
    public void run(){
        // code to run goes here
    }
}).start();
```
3. Use Java 8 lambda expression (variant of using Runnable interface)
- A lambda expression is an unnamed block of code (with optional parameters) that can be passed around and executed later
```
new Thread(() -> {
    // code to run goes here
}).start();
```
- This approach is unwieldy if the code to run is long, complex, or needs to be used multiple times
- Another approach is to store the runnable in a variable and pass it to the Thread constructor
```
Runnable r = () -> {
    // code to run goes here
};
new Thread(r).start();
```

## Passing Parameters to a Java Thread
- The run() methods defined in Java Thread and Runnable take no parameters
```
<<Java Class>>
(C) Thread
run(): void
```
-----------
```
<<Java Interface>>
(I) Runnable
run():void
```

### Parameters passed to run() can be supplied via one of two other means, e.g.
1. As parameters to a class constructor
- - By passing the parameter(s) when the runnable or thread is created

In [79]:
public class MainActivity {
    public void runnable() {
        System.out.println("MainThread:" + Thread.currentThread());
        new Thread(new GCDRunnable(this)).start();
    }
    
    public void println(String s) {        
        System.out.println(s);
    }
    
}

In [80]:
public class GCDRunnable implements Runnable {
    private final MainActivity mActivity;
    
    public GCDRunnable(MainActivity mainActivity) {
        mActivity = mainActivity;
    }
    
    public void run() {
        final String threadString = "Spin off thread: " + Thread.currentThread().toString(); 
        mActivity.println(threadString);
    }
}

In [81]:
MainActivity main = new MainActivity();
main.runnable();

2. As parameters to `setter` methods

In [148]:
public class MainActivity {
    public void runnable() {
        System.out.println("MainThread:" + Thread.currentThread());
        Thread gCDThread = new GCDThread().setActivity(this);
        gCDThread.start();
    }
    
    public void println(String s) {        
        System.out.println(s);
    }
}

In [149]:
public class GCDThread extends Thread {
    private MainActivity mActivity;
    
    public GCDThread setActivity(MainActivity activity) {
        this.mActivity = activity;
        return this;
    }
    
    public void run() {
        final String threadString = "Spin off thread: " + Thread.currentThread().toString(); 
        mActivity.println(threadString);
    }
}

In [150]:
MainActivity main = new MainActivity();

In [151]:
main.runnable();

MainThread:Thread[IJava-executor-6,5,main]
Spin off thread: Thread[Thread-33,5,main]

Use a fluent interface (a setter that returns `this`) to pass parameter(s) when the runnable or thread is created.

# How Java Threads Run
There are multiple layers involved in creating and starting a thread

|Threading and Synchronization Packages|
|-|
|Java Execution Environment (e.g., JVM, ART, etc)|
|System Libraries|
|Operating System Kernel|
|Cores|

- Creating a new thread object doesn't allocate a run-time call stack of activation records
- The runtime stack and other thread resources are only allocated after the start() method is called
- The Java execution environment calls a thread's run() hook method after start() creates its resources
- Each thread can run concurrently and block independently
- Any code can generally run in a thread
- A thread can live as long as its run() hook method hasn't returned
- - The underlying thread scheduler can suspend and resume a thread many times during its lifecycle
- - - Scheduler operations are largely invisible to the user code, as long as synchronization is performed properly
- For a thread to execute `forever`, its run() hook method needs an infinite loop
```
public void run() {
    while(true) { ... }
}
```
- The thread is dead after run() returns
- - A thread can end normally
- - Or an uncaught exception can be thrown
- The join() method allows one thread to wait for another thread to complete, which is a simple form of `barrier synchronization`
- - Or a thread can simply evaporate!
- - The Java execution environment recycles thread resources
- - - e.g., runtime stack of activation records, thread-specific storage, etc.
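
The lifecycle described above can be observed with `Thread.getState()`. The following is a minimal sketch; the class name `LifecycleDemo` and the method `observe()` are hypothetical, while `getState()`, `start()`, and `join()` are standard `java.lang.Thread` methods.

```java
class LifecycleDemo {
    // Returns the thread's state before start() and after join().
    static Thread.State[] observe() throws InterruptedException {
        Thread t = new Thread(() -> { /* trivial computation */ });

        Thread.State before = t.getState(); // NEW: the object exists but start() has not been called
        t.start();                          // the runtime then calls run() in the new thread
        t.join();                           // block until run() has returned
        Thread.State after = t.getState();  // TERMINATED: the thread is dead

        return new Thread.State[] { before, after };
    }

    public static void main(String[] args) throws InterruptedException {
        Thread.State[] states = observe();
        System.out.println(states[0] + " -> " + states[1]);
    }
}
```
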
<img src="media/runningthreads.png" />

### Key Methods in Java Thread Class
Certain Java Thread class methods are used in many concurrent Java programs, e.g.,
- `void setDaemon(boolean on)`
- - Marks thread as `daemon` (or as a user thread when passed false)
- `void start()`
- - Allocates thread resources and initiates thread execution by calling the run() hook method
- - the start() method can only be called once per thread object
- `void run()`
- - Hook method where user code is supplied
- `void join()`
- - Waits for a thread to finish
- `static void sleep(long millis)`
- - Sleeps for the given time in ms
- `static Thread currentThread()`
- - Object for the current thread
- `void interrupt()`
- - Post an interrupt request to a Thread
- `boolean isInterrupted()`
- - Tests whether a thread has been interrupted
- `static boolean interrupted()`
- - Tests whether the current thread has been interrupted (and clears its interrupt status)
- `void setPriority(int newPriority)` and `int getPriority()`
- - Set and get the priority of a Thread
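
A minimal sketch of how `start()`, `sleep()`, `interrupt()`, and `join()` fit together. The class name `InterruptDemo` and the method `interruptSleepingThread()` are hypothetical; the `CountDownLatch` is only there so the interrupt is posted after the worker is known to be running.

```java
import java.util.concurrent.CountDownLatch;

class InterruptDemo {
    // Returns true if the worker's sleep() was cut short by interrupt().
    static boolean interruptSleepingThread() throws InterruptedException {
        CountDownLatch started = new CountDownLatch(1);
        boolean[] sawInterrupt = { false };   // written by the worker, read after join()

        Thread worker = new Thread(() -> {
            started.countDown();              // the worker is now running
            try {
                Thread.sleep(60_000);         // static method: sleeps the current thread
            } catch (InterruptedException e) {
                sawInterrupt[0] = true;       // sleep() ended early because of interrupt()
            }
        });

        worker.start();
        started.await();     // make sure the worker is alive before interrupting it
        worker.interrupt();  // post an interrupt request to the worker
        worker.join();       // wait for run() to return
        return sawInterrupt[0];
    }
}
```

Blocking calls such as `sleep()` respond to an interrupt request by throwing `InterruptedException`, which is how the worker notices it here.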

## Types of Java Threads
There are two types of threads in Java: user threads and daemon threads

When the JVM starts it contains a single user thread
- Known as the `main thread`

User threads and daemon threads differ in what happens when they exit
- The lifecycle of a user thread can outlive the main thread
- All daemon threads terminate automatically when all user threads terminate 
- The JVM itself exits when all user threads have terminated, i.e., when the only threads left are daemon threads


Java uses daemon threads in utility roles
- e.g., the worker threads of ForkJoinPool (java.util.concurrent) and, optionally, the Timer class (java.util)
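
A small sketch of marking a thread as a daemon and of the inheritance rule quoted earlier from the Thread documentation (a new thread is a daemon if and only if its creator is). The class name `DaemonDemo` and the method `daemonFlags()` are hypothetical.

```java
class DaemonDemo {
    // Returns { parent.isDaemon(), child.isDaemon() }.
    static boolean[] daemonFlags() throws InterruptedException {
        boolean[] flags = new boolean[2];

        Thread parent = new Thread(() -> {
            // A thread created here inherits the daemon status of its creator.
            Thread child = new Thread(() -> { });
            flags[1] = child.isDaemon();
        });
        parent.setDaemon(true);       // must be called before start()
        flags[0] = parent.isDaemon();

        parent.start();
        parent.join();                // after join() the write to flags[1] is visible here
        return flags;
    }
}
```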


#### Java User Threads VS. Daemon Threads

Example that extends the Thread class.

In [106]:
import java.util.Random;
/**
 * This class demonstrates the difference between a user thread and a daemon thread. If its constructor
 * is passed "true" it becomes a daemon thread, which exits when the main thread exits.
 * If it's passed "false" it's a "user" thread, which can continue to run even after the main thread exits.
 *
 * The main() function demonstrates the difference between a Java user thread and a daemon thread. If it's launched with no command-line
 * parameters the main thread creates a user thread, which can outlive the main thread (i.e., it
 * continues to run after the main thread exits). If it's launched with a command-line parameter then it creates a daemon thread, which exits when the main thread exits.
 */
public class UserOrDaemonThread extends Thread {
    /** 
     * Keep track of whether this is a "user" or a "daemon" thread. 
     */
    private final String threadType;
    
    /** 
     * Number of times to iterate, which is 1 billion to ensure the program runs for a while
     */
    private final int MAX_ITERATIONS = 1000000000;
    
    /**
     * Constructor determines what type of thread is being created.
     */
    public UserOrDaemonThread(Boolean daemonThread) {
        if (daemonThread) {
            // Become a daemon thread (setDaemon() obtained from the superclass).
            setDaemon(true);
            threadType = "daemon";
        } else {           
            threadType = "user";
        }
    }
    
    /**
     * Provides a recursive implementation of Euclid's algorithm to compute the "greatest common divisor" (GCD), which is the
     * largest positive integer that divides two integers without a remainder.
     */
    private int computeGCD(int number1,
                           int number2) {
        // Basis case.
        if (number2 == 0) { return number1; }
        // Recursive call.
        return computeGCD(number2, number1 % number2);
    }
    
    /**
     * Hook method that runs for MAX_ITERATIONs.
     */
    @Override
    public void run() {
        final String threadString = " with " + threadType + " thread id " + Thread.currentThread();
        
        System.out.println("Entering run()" + threadString);
        
        /**
         * Create a new Random number generator. We need to allocate a new Random object dynamically since we can't inherit from
         * Random since we already inherit from Thread and Java only allows single inheritance.
         */
        Random random = new Random();
        
        try {
            // Iterate for the given # of iterations.
            for (int i = 0; i < MAX_ITERATIONS; ++i) {
                // Generate two random numbers.
                int number1 = random.nextInt();
                int number2 = random.nextInt();
                
                // Print results every 100 million iterations.
                if ((i % 100000000) == 0) {
                    System.out.println("In run()" + threadString + " the GCD of " + number1 + " and " + number2 + " is " + computeGCD(number1, number2));
                }
            }
        } finally {
            System.out.println("Leaving run() " + threadString);
        }
    }
    
    /**
     * Entry point method into the program's main thread, which creates/starts the desired type of thread (i.e., either "user" or "daemon") and sleeps for 1 second
     * while that thread runs in the background. If a "daemon" thread is created it will only run as long as the main thread runs. Conversely, if a "user" thread is
     * created it will continue to run even after the main thread exits.
     */
    public static void main(String[] args) {
        System.out.println("Entering main()");
        
        // Create a "daemon" thread if any command-line parameter is passed to the program.
        final Boolean daemonThread = args.length > 0;
        System.out.println("Daemon?: " + daemonThread);
        // Create the appropriate type of thread (i.e., "user" or "daemon").
        UserOrDaemonThread thr = new UserOrDaemonThread(daemonThread);
        
        // Start the thread.
        thr.start();
        
        // Sleep for 1 second and then exit.
        try {
            Thread.sleep(1000);
        } catch (InterruptedException x) {}
        
        System.out.println("Leaving main()");
    } 

}

In [107]:
String[] user = {};

In [108]:
String[] daemon = {"daemon"};

In [115]:
UserOrDaemonThread.main(user);

Entering main()
Daemon?: false
Entering run() with user thread id Thread[Thread-38,5,main]
In run() with user thread id Thread[Thread-38,5,main] the GCD of -755717759 and -231356454 is -1
In run() with user thread id Thread[Thread-38,5,main] the GCD of 1315083374 and -1885106546 is 2
Leaving main()


In [116]:
UserOrDaemonThread.main(daemon);

Entering main()
Daemon?: true
Entering run() with daemon thread id Thread[Thread-39,5,main]
In run() with daemon thread id Thread[Thread-39,5,main] the GCD of 2086051485 and -1620330562 is 1
In run() with user thread id Thread[Thread-38,5,main] the GCD of -1128243862 and -1250606574 is -2
Leaving main()
In run() with daemon thread id Thread[Thread-39,5,main] the GCD of -681687412 and 1465773539 is -1


Java doesn't allow multiple inheritance of classes, so a class that needs a different superclass (here, Random) can implement Runnable instead

In [118]:
import java.util.Random;

    /**
     * Computes the greatest common divisor (GCD) of two numbers.
     */
    public class GCDRunnable 
        extends Random // Inherits random number generation capabilities
        implements Runnable {
        /**
         * Keep track of whether this is a "user" or a "daemon" thread.
         */
        private final String threadType;
        
        /**
         * Number of times to iterate, which is 100 million to ensure the program runs for a while.
         */
        private final int MAX_ITERATIONS = 100000000;
        
        /**
         * Constructor determines what type of thread is being created.
         */
        public GCDRunnable(String threadType) {
            this.threadType = threadType;
        }
        
        /**
         * Provides a recursive implementation of Euclid's algorithm to compute the "greatest common divisor"
         * (GCD), which is the largest positive integer that divides two integers without a remainder.
         */
        private int computeGCD(int number1, int number2) {
            // Basis case.
            if (number2 == 0) {
                return number1;
            }
            // Recursive call.
            return computeGCD(number2, number1 % number2);
        }
        
        /**
         * Hook method that runs for MAX_ITERATIONS.
         */
        public void run() {
            final String threadString = " with " + threadType + " thread id " + Thread.currentThread();
            
            System.out.println("Entering run()" + threadString);
            
            try {
                // Iterate for the given number of times.
                for(int i = 0; i < MAX_ITERATIONS; ++i) {
                    // Generate two random numbers (nextInt() obtained from Random superclass).
                    int number1 = nextInt();
                    int number2 = nextInt();
                    
                    // Print results every 10 million iterations.
                    if((i % 10000000) == 0) {
                        System.out.println("In run()" + threadString + " the GCD of " + number1 + " and " 
                                           + number2 + " is " + computeGCD(number1, number2));
                    }
                }
            } finally {
                System.out.println("Leaving run() " + threadString);
            }
        }
    }

In [None]:
/**
 * This program demonstrates the difference between a Java user thread and a daemon thread. If it's launched with no command-line
 * parameters the main thread creates a user thread, which can outlive the main thread (i.e., it continues to run even after the main
 * thread exits). If it's launched with a command-line parameter then it creates a daemon thread, which exits when the main thread exits.
 */
public class TestGCDRunnable {
    /**
     * Entry point method into the program's main thread, which creates/starts the desired type of thread (i.e., either "user"
     * or "daemon") and sleeps for 1 second while that thread runs in the background. If a "daemon" thread is created it will only
     * run as long as the main thread runs. Conversely, if a "user" thread is created it will continue to run even after the main thread exits.
     */
    public static void main(String[] args) {
        System.out.println("Entering main()");
        
        // Creates a "daemon" thread if any command-line parameter is passed to the program.
        final boolean daemonThread = args.length > 0;
        
        // Create the GCD Runnable, passing in the type of thread it runs in (i.e., "user" or "daemon").
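        GCDRunnable runnableCommand = new GCDRunnable(daemonThread ? "daemon" : "user");

        // Create a new thread that runs the GCD Runnable.
        Thread thr = new Thread(runnableCommand);

        // Mark the thread as a daemon if requested (must happen before start()).
        if (daemonThread) {
            thr.setDaemon(true);
        }

        // Start the thread.
        thr.start();

        // Sleep for 1 second and then exit.
        try {
            Thread.sleep(1000);
        } catch (InterruptedException x) {}

        System.out.println("Leaving main()");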
    }
}

### Pros and Cons of Java Thread Programming Models

Pros of extending Thread:
- It's straightforward to extend the Thread super class
- All state and methods are consolidated in one place
- - Enables central allocation and management of the thread
- - This design is useful when the thread must be updated during runtime configuration changes
- - - e.g., interrupting/restarting a running thread and reading/writing its state

Cons with extending Thread:
- A subclass must extend the Thread superclass
- - This is restrictive since Java only allows one superclass per subclass!

Pros of implementing Runnable:
- A subclass can implement multiple interfaces
- - Which enables it to extend a different superclass
- Runnables are flexible since they can be reused in other contexts

Cons of implementing Runnable:
- Yields more "moving parts"
- - e.g., Runnable and Thread are separate entities and must be managed/accessed separately

## Java Memory Models
Java's memory model defines semantics of multi-threaded access to shared memory
- Which instruction reorderings are allowed in memory (out of order)
- - Potential sources of reordering, e.g., the Java compiler, the Just-In-Time (JIT) compiler, processor instruction pipelines, caches, etc.
- - Should not be overly restrictive, to enable hardware optimization
- Which program outputs may occur in a correct JVM implementation
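
One visible consequence of the memory model is that a write made by one thread is only guaranteed to be seen by another thread if the program establishes an ordering between them, for example through a `volatile` field. The sketch below assumes a simple stop flag; the class name `VolatileFlagDemo` and the method `stopWorker()` are hypothetical.

```java
class VolatileFlagDemo {
    // Without 'volatile' the worker would not be guaranteed to ever see the update.
    private volatile boolean stopRequested = false;

    // Returns true if the worker noticed the flag and terminated.
    boolean stopWorker() throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!stopRequested) {
                // Busy-wait; each loop iteration performs a volatile read.
            }
        });

        worker.start();
        Thread.sleep(50);        // let the worker spin briefly
        stopRequested = true;    // volatile write, visible to the worker's later reads
        worker.join(5_000);      // bounded wait in case something goes wrong
        return !worker.isAlive();
    }
}
```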

## Java Synchronizers
A Java synchronizer is an object used to control the flow of cooperating threads based on its state

Java synchronizers ensure interactions between threads obey certain properties
- Don't corrupt shared mutable state
- Occur in the right order, at the right time, and under the right conditions

Pervasiveness of Java Synchronizer Classes

|Applications|
|--|
|Additional Frameworks and Languages|
|Threading and Synchronization Packages|
|Java Execution Environment (e.g., JVM, ART, etc)|
|System Libraries|
|Operating System Kernel|

- Multiple layers of synchronizers are provided on the Java platform
- - (Java Execution Environment) The Java language contains some features that synchronize threads, e.g., volatile variables and built-in monitor objects
- - (Threading and Synchronization Packages) Other synchronizers are provided by the Java Class Library e.g., Java atomics, locks, and other synchronizers

Synchronization complexity arises from coordinating the interactions of entities that run concurrently

## Types of Java Synchronizer Capabilities
Recognize the types of capabilities provided by Java synchronizers

|Category|Definition|
|--|--|
|Atomic operations|An action that effectively happens all at once or not at all|
|Mutual exclusion|Allows concurrent access and updates to shared mutable data without race conditions|
|Coordination|Ensures computations run properly, e.g., in the right order, at the right time, under the right conditions, etc.|
|Barrier synchronization|Ensures that any thread(s) must stop at a certain point and cannot proceed until all other thread(s) reach this barrier, e.g., join()|


### Atomic Ordering
- Ensures an action happens all at once or none at all
- Operations on a field in thread 1 occur all at once with respect to operations on the field in thread2..n
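- **Compound operations on primitive Java types (e.g., `count++`) are not atomic without using synchronizers**
- - Plain reads and writes of non-volatile `long` and `double` fields are also not guaranteed to be atomic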
- Atomic ordering is supported by the Java atomic package
- Atomic ordering is also supported by the Java volatile type qualifier

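The volatile type qualifier ensures that each individual read and write of a variable is atomic and goes to main memory rather than a cached copy, so other threads see the latest value. It does not make a compound read-modify-write sequence (e.g., `field++`) atomic.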

|Thread1|Thread2||L field|
|--|--|--|--|
|initialized|||0|
|read field||<-|0|
|increase field by 1|||0|
|write back||->|1|
||read field|<-|1|
||increase field by 1||1|
||write back|->|2|
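
A minimal sketch of the behavior in the table, using `AtomicInteger` from the Java atomic package so that each increment happens all at once. The class name `AtomicCounterDemo` and the method `countWithAtomics()` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterDemo {
    // Increments a shared counter from several threads and returns the final value.
    static int countWithAtomics(int threads, int incrementsPerThread) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet();   // read, add 1, write back as one atomic step
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }
}
```

Because no increment can interleave with another, the result is always `threads * incrementsPerThread`.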

### Mutual Exclusion
- Prevents simultaneous access to a shared resource in a critical section
- - Race conditions occur when a program depends on the sequence or timing of threads for it to operate properly
- Read/Write conflicts
- - If one thread reads while another thread writes concurrently, the field that's read may be inconsistent

|Thread1|Thread2||L field|
|--|--|--|--|
|initialized|||0|
|read field||<-|0|
|increase field by 1|||0|
|write back|read field|<- or ->|0 or 1?|

- Write/Write conflicts
- - If two threads try to write to same field concurrently, the result may be inconsistent
- - This can yield a "lost update"

|Thread1|Thread2||L field|
|--|--|--|--|
|initialized|||0|
|read field||<-|0|
||read field|<-|0|
|increase field by 2|||0|
||increase field by 1||0|
|write back|write back|->|1 or 2?|

These problems often occur in multi-core processors with "weak" memory ordering due to core caches that allow "out-of-order" load and store operations

- Mutual exclusion is supported by the Java locks package
- - e.g., ReentrantLock, ReentrantReadWriteLock, StampedLock, etc.
- Mutual exclusion is also supported by the *synchronized* keyword in Java built-in monitor objects
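
A minimal sketch of avoiding the "lost update" with the `synchronized` keyword, which uses the built-in monitor lock of the counter object. The class name `SynchronizedCounter` and the method `runWorkers()` are hypothetical.

```java
class SynchronizedCounter {
    private int count = 0;   // shared mutable state, guarded by this object's monitor lock

    // Only one thread at a time can run a synchronized method on the same object,
    // so the read-increment-write sequence cannot interleave with another one.
    synchronized void increment() {
        count++;
    }

    synchronized int get() {
        return count;
    }

    static int runWorkers(int threads, int incrementsPerThread) throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] workers = new Thread[threads];

        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.increment();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }
}
```

Removing `synchronized` from `increment()` would reintroduce the write/write conflict shown in the table, and the final count could then come out lower than expected.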

### Coordination
- Ensures computations run properly, e.g.
- - In the right order
- - At the right time
- - Under the right conditions
- Coordination is supported by the Java concurrent and locks packages
- - e.g., ConditionObject, Semaphore, etc.
- Coordination is also supported by Java built-in monitor objects

### Barrier synchronization
- Ensures that any thread(s) must stop at a certain point and cannot proceed until all thread(s) reach the barrier
- Barrier synchronization is supported by the Java concurrent package
- - e.g., CountDownLatch, CyclicBarrier, Phaser, etc.
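
A minimal sketch of barrier synchronization with `CountDownLatch`, assuming the caller wants to wait until every worker has finished its step. The class name `LatchDemo` and the method `awaitWorkers()` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {
    // Starts the given number of workers and returns how many had finished
    // by the time the latch opened.
    static int awaitWorkers(int workers) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(workers);
        AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();   // the worker's "operation"
                done.countDown();             // signal that this worker reached the barrier
            }).start();
        }

        done.await();                         // blocks until the count reaches zero
        return finished.get();
    }
}
```

Unlike `join()`, the latch does not require the waiting thread to hold references to the worker threads.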

## Java Synchronizer Classes
Key Synchronizers defined in the Java class library

|Java Class|Purpose|
|--|--|
|ReentrantLock|A reentrant mutual exclusion lock that extends the built-in monitor lock capabilities|
|ReentrantReadWriteLock|Improves performance when resources are read much more often than written|
|StampedLock|A readers-writer lock that's more efficient than ReentrantReadWriteLock|
|Semaphore|Maintains permits that control thread access to a limited # of shared resources|
|ConditionObject|Allows a thread to block until a condition becomes true|
|CountDownLatch|Allows one or more threads to wait until a set of operations being performed in other threads completes|
|CyclicBarrier|Allows a set of threads to all wait for each other to reach a common barrier point|
|Phaser|A more flexible reusable synchronization barrier|


### ReentrantLock
- A mutual exclusion lock that extends built-in monitor lock capabilities
- "Reentrant" means that the thread holding the lock can reacquire it without deadlock
- Must be "fully bracketed"
- - A thread that acquires a lock must be the one to release it
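
A minimal sketch of reentrancy and full bracketing, assuming a class with two methods that both need the same lock. The class name `ReentrantLockDemo` and its methods `outer()`, `inner()`, and `isFree()` are hypothetical; `lock()`, `unlock()`, `getHoldCount()`, and `isLocked()` are standard `ReentrantLock` methods.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentrantLockDemo {
    private final ReentrantLock lock = new ReentrantLock();

    // Acquires the lock, then calls a method that acquires it again.
    int outer() {
        lock.lock();
        try {
            return inner();      // the same thread reacquires the lock without deadlock
        } finally {
            lock.unlock();       // every lock() is paired with an unlock()
        }
    }

    private int inner() {
        lock.lock();
        try {
            // Number of holds the current thread has on the lock at this point.
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    boolean isFree() {
        return !lock.isLocked();
    }
}
```

Placing `unlock()` in a `finally` block keeps the lock fully bracketed even if the guarded code throws.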

### ReentrantReadWriteLock
- Improves performance when resources are read much more often than written

### StampedLock
- A readers-writer lock that's more efficient than a ReentrantReadWriteLock
- Supports "optimistic" reads
- Also supports "lock upgrading"

### Semaphore
- Maintains permits that control thread access to a limited # of shared resources
- Operations need not be fully bracketed
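
A minimal sketch of a semaphore limiting how many threads use a resource at once. The class name `SemaphoreDemo`, the method `peakUsage()`, and the 10 ms sleep that stands in for "using the resource" are all assumptions made for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreDemo {
    // Returns the largest number of threads that held a permit at the same time.
    static int peakUsage(int permitCount, int threads) throws InterruptedException {
        Semaphore available = new Semaphore(permitCount);
        AtomicInteger inUse = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                try {
                    available.acquire();                  // blocks while all permits are taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    int now = inUse.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                    Thread.sleep(10);                      // pretend to use the resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inUse.decrementAndGet();
                    available.release();                   // hand the permit back
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return peak.get();
    }
}
```

The peak never exceeds `permitCount`, no matter how many threads compete for the permits.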

### ConditionObject
- Allows a thread to wait until some condition becomes true
- Always used in conjunction with a ReentrantLock
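
A minimal sketch of waiting on a condition, assuming a single boolean flag as the condition. The class name `ConditionDemo` and the methods `awaitReady()` and `signalReady()` are hypothetical; `newCondition()`, `await()`, and `signalAll()` are standard API.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean isReady = false;    // guarded by 'lock'

    // Blocks the calling thread until signalReady() has been called.
    void awaitReady() throws InterruptedException {
        lock.lock();
        try {
            while (!isReady) {          // re-check in a loop to guard against spurious wakeups
                ready.await();          // releases the lock while blocked, reacquires on wakeup
            }
        } finally {
            lock.unlock();
        }
    }

    // Makes the condition true and wakes up every waiting thread.
    void signalReady() {
        lock.lock();
        try {
            isReady = true;
            ready.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

If `signalReady()` runs before any thread waits, `awaitReady()` returns immediately because the flag is checked before blocking.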

### CountDownLatch
- Allows one or more threads to wait on the completion of operations in other threads

### CyclicBarrier
- Allows a set of threads to all wait for each other to reach a common barrier point

### Phaser
- A more flexible, reusable and dynamic barrier synchronizer that subsumes CyclicBarrier and CountDownLatch

## Java ReentrantLocks
Overview of Mutual Exclusion Locks
- A mutual exclusion lock (mutex) defines a "critical section"
- - Ensures only one thread can run in a block of code at a time
- - Other threads are kept "at bay" so they don't corrupt shared resources via multiple concurrent operations
- - - Race conditions can occur if multiple threads could run within a critical section
- - - Race conditions can arise when a program depends on the sequence or timing of threads for it to operate properly
- - - After a thread leaves a critical section another thread can enter and start running

A mutex is typically implemented in hardware via atomic operations
- Atomic operations appear to occur instantaneously and either change the state of the system successfully or have no effect
- Implemented in Java via the `compareAndSwap*()` methods in the Unsafe class

A human known use of mutual exclusion locks is an airplane restroom protocol

### Overview of ReentrantLock
ReentrantLock provides mutual exclusion to concurrent Java programs

```
public class ReentrantLock implements Lock, java.io.Serializable {
...
```

|Class ReentrantLock|
|--|
|java.lang.Object java.util.concurrent.locks.ReentrantLock|
|All Implemented Interfaces: Serializable, Lock|
|public class ReentrantLock extends Object implements Lock, Serializable|
|A reentrant mutual exclusion Lock with the same basic behavior and semantics as the implicit monitor lock accessed using synchronized methods and statements, but with extended capabilities.|
|A ReentrantLock is owned by the thread last successfully locking, but not yet unlocking it. A thread invoking lock will return, successfully acquiring the lock, when the lock is not owned by another thread. The method will return immediately if the current thread already owns the lock. This can be checked using methods isHeldByCurrentThread() and getHoldCount().|

Implements the Lock Interface

|Interface Lock|
|--|
|All Known Implementing Classes: ReentrantLock, ReentrantReadWriteLock.ReadLock, ReentrantReadWriteLock.WriteLock|
|public interface Lock|
|Lock implementations provide more extensive locking operations than can be obtained using synchronized methods and statements. They allow more flexible structuring, may have quite different properties, and may support multiple associated Condition objects.|
|A lock is a tool for controlling access to a shared resource by multiple threads. Commonly, a lock provides exclusive access to a shared resource: only one thread at a time can acquire the lock and all access to the shared resource requires that the lock be acquired first. However, some locks may allow concurrent access to a shared resource, such as the read lock of a ReadWriteLock.|
|The use of synchronized methods or statements provides access to the implicit monitor lock associated with every object, but forces all lock acquisition and release to occur in a block-structured way: when multiple locks are acquired they must be released in the opposite order, and all locks must be released in the same lexical scope in which they are acquired.|

#### ReentrantLock applies the Bridge pattern
Decouples its interface from its implementation so fair and non-fair semantics can be supported uniformly
- Locking is handled by the Sync implementor hierarchy
- Inherits functionality from AbstractQueuedSynchronizer
- - Many Java synchronizers that rely on FIFO wait queues use this framework
- - But also provides extended capabilities

ReentrantLock is similar to the monitor lock provided by Java's built-in monitor object

|Return Type|Method|
|--|--|
|void|lock() - Acquires the lock|
|void|unlock() - Attempts to release this lock|
|void|lockInterruptibly() - Acquires the lock unless the current thread is interrupted|
|boolean|tryLock() - Acquires the lock if it is not held by another thread at the time of invocation|
|boolean|tryLock(long timeout, TimeUnit unit) - Acquires the lock if it is not held by another thread within the given waiting time and the current thread has not been interrupted|

In contrast, Java's synchronized methods/blocks are not interruptible
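
A small sketch of what interruptibility means in practice: a thread waiting in `lockInterruptibly()` can be interrupted and stop waiting, which a thread blocked on `synchronized` cannot. The class and method names here are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock and Thread come from the JDK.
class InterruptibleLockDemo {
    // Returns true if the waiting thread left lockInterruptibly()
    // with an InterruptedException instead of acquiring the lock.
    static boolean waiterWasInterrupted() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        boolean[] interrupted = { false };

        lock.lock(); // the caller holds the lock, so the waiter can never acquire it
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly();
                    try {
                        // not reached in this demo
                    } finally {
                        lock.unlock();
                    }
                } catch (InterruptedException e) {
                    interrupted[0] = true; // gave up waiting for the lock
                }
            });
            waiter.start();
            // Works whether the waiter is already blocked or has not yet called
            // lockInterruptibly(): the interrupt status is also checked on entry.
            waiter.interrupt();
            waiter.join(); // join() makes the waiter's write visible to this thread
        } finally {
            lock.unlock();
        }
        return interrupted[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(waiterWasInterrupted()); // true
    }
}
```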

ReentrantLock supports "recursive mutex" semantics
- The thread that holds the mutex can reacquire it without self-deadlock
- Recursive mutex semantics add a bit more overhead relative to non-recursive semantics due to additional software logic and synchronization
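
The recursive-mutex behavior can be observed through the hold count. In this sketch (class and method names are hypothetical), `outer()` calls `inner()` while already holding the lock, and the second `lock()` returns immediately with the hold count raised to 2.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock comes from the JDK.
class ReentrancyDemo {
    static final ReentrantLock lock = new ReentrantLock();

    static int outer() {
        lock.lock();               // hold count becomes 1
        try {
            return inner();        // re-enters the same lock on the same thread
        } finally {
            lock.unlock();         // hold count back to 0, lock released
        }
    }

    static int inner() {
        lock.lock();               // no self-deadlock: hold count becomes 2
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();         // hold count back to 1, lock still held
        }
    }

    public static void main(String[] args) {
        System.out.println(outer());              // 2
        System.out.println(lock.getHoldCount());  // 0
    }
}
```

Each `lock()` must be matched by one `unlock()`; the lock is only released when the count returns to zero.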

### Key ReentrantLock Methods

In [1]:
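import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;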

/**
 * A reentrant mutual exclusion {@link Lock} with the same basic behaviour and semantics as the implicit monitor lock accessed using
 * {@code synchronized} methods and statements, but with extended capabilities.
 *
 * A {@code ReentrantLock} is owned by the thread last successfully locking, but not yet unlocking it. A thread invoking
 * {@code lock} will return, successfully acquiring the lock, when the lock is not owned by another thread. The method will return
 * immediately if the current thread already owns the lock. This can be checked by using methods {@link #isHeldByCurrentThread}, and {@link #getHoldCount}.
 *
 * The constructor for this class accepts an optional fairness parameter. When set {@code true}, under
 * contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular
 * access order. Programs using fair locks accessed by many threads may display lower overall throughput (i.e., are slower; often much
 * slower) than those using the default setting, but have smaller variances in times to obtain locks and guarantee lack of
 * starvation. Note however, that fairness of locks does not guarantee fairness of thread scheduling. Thus, one of many threads using a fair lock may
 * obtain it multiple times in succession while other active threads are not progressing and not currently holding the lock.
 * Also note that the untimed {@link #tryLock() tryLock} method does not honor the fairness setting. It will succeed if the lock is available even if other threads are waiting.
 *
 * It is recommended practice to always immediately follow a call to {@code lock} with a {@code try} block, most typically in a before/after construction such as:
 *
 * class X {
 *   private final ReentrantLock lock = new ReentrantLock();
 *   // ...
 *   
 *   public void m() {
 *     lock.lock();   // block until condition holds
 *     try {
 *       // ... method body
 *     } finally {
 *       lock.unlock(); 
 *     }
 *   }
 * }
 *
 * In addition to implementing the {@link Lock} interface, this class defines methods {@code isLocked} and
 * {@code getQueueLength}, as well as some associated {@code protected} access methods that may be useful for instrumentation and monitoring.
 *
 * Serialization of this class behaves in the same way as built-in locks: a deserialized lock is in the unlocked state, regardless of its state when serialized.
 *
 * This lock supports a maximum of 2147483647 recursive locks by the same thread. Attempts to exceed this limit result in {@link Error} throws from locking methods.
 */

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    
    /**
     * Base of synchronization control for this lock. Subclassed into fair and nonfair versions below. 
     * Use AQS state to represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        
        /** Performs {@link Lock#lock}. The main reason for subclassing is to allow fast path for nonfair version. */
        abstract void lock();
        
        /** Performs non-fair tryLock. tryAcquire is implemented in subclasses, but both need nonfair try for trylock method. */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) {// overflow
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextc);
                return true;
            }
            return false;
        }
        
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
        
        protected final boolean isHeldExclusively() {
            /** While we must in general read state before owner, we don't need to do so to check if current thread is owner */
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        
        // Methods relayed from outer class
        
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        
        final boolean isLocked() {
            return getState() != 0;
        }
        
        /** 
         * Reconstruct this lock instance from a stream.
         * @param s the stream
         */
        private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
    
    /** Sync object for non-fair locks */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        
        /** Performs lock. Try immediate barge, backing up to normal acquire on failure. */
        final void lock() {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
            } else {
                acquire(1);
            }
        }
        
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    /** Sync object for fair locks */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
        
        final void lock() {
            acquire(1);
        }
        
        /** Fair version of tryAcquire. Don't grant access unless recursive call or no waiters or is first. */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) {
                    throw new Error("Maximum lock count exceeded");
                }
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    
    /** Creates an instance of {@code ReentrantLock}. This is equivalent to using {@code ReentrantLock(false)}. */
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    /** 
     * Creates an instance of {@code ReentrantLock} with the given fairness policy.
     * @param fair {@code true} if this lock should use a fair ordering policy
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    /**
     * Acquires the lock.
     * 
     * Acquires the lock if it is not held by another thread and returns immediately, setting the lock hold count to one.
     *
     * If the current thread already holds the lock then the hold count is incremented by one and the method returns immediately.
     *
     * If the lock is held by another thread then the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until the lock has been acquired, at which time the lock hold count is set to one.
     */
    public void lock() {
        sync.lock();
    }
    
    /**
     * Acquires the lock unless the current thread is {@linkplain Thread#interrupt interrupted}.
     *
     * Acquires the lock if it is not held by another thread and returns immediately, setting the lock hold count to one.
     *
     * If the current thread already holds this lock then the hold count is incremented by one and the method returns immediately.
     *
     * If the lock is held by another thread then the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of two things happens:
     *
     * 1. The lock is acquired by the current thread; or
     * 2. Some other thread {@linkplain Thread#interrupt interrupts} the current thread.
     *
     * If the lock is acquired by the current thread then the lock hold count is set to one.
     *
     * If the current thread:
     *
     * 1. has its interrupted status set on entry to this method; or
     * 2. is {@linkplain Thread#interrupt interrupted} while acquiring the lock,
     *
     * then {@link InterruptedException} is thrown and the current thread's interrupted status is cleared.
     *
     * In this implementation, as this method is an explicit interruption point, preference is given to responding to the interrupt over normal or reentrant acquisition of the lock.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    
    /**
     * Acquires the lock only if it is not held by another thread at the time of invocation.
     *
     * Acquires the lock if it is not held by another thread and returns immediately with the value {@code true}, setting the
     * lock hold count to one. Even when this lock has been set to use a fair ordering policy, a call to {@code tryLock()} will
     * immediately acquire the lock if it is available, whether or not other threads are currently waiting for the lock.
     * This "barging" behaviour can be useful in certain circumstances, even though it breaks fairness. If you want to honor the fairness setting for this lock, then use
     * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) } which is almost equivalent (it also detects interruption).
     *
     * If the current thread already holds this lock then the hold count is incremented by one and the method returns {@code true}.
     *
     * If the lock is held by another thread then this method will return immediately with the value {@code false}.
     *
     * @return {@code true} if the lock was free and was acquired by the
     *         current thread, or the lock was already held by the current thread; and is {@code false} otherwise
     */
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    
    /** 
     * Acquires the lock if it is not held by another thread within the given waiting time and the current thread has not been {@linkplain Thread#interrupt interrupted}.
     *
     * Acquires the lock if it is not held by another thread and returns immediately with the value {@code true}, setting the lock hold count
     * to one. If this lock has been set to use a fair ordering policy then an available lock will not be acquired if any other threads
     * are waiting for the lock. This is in contrast to the {@link #tryLock()} method. If you want a timed {@code tryLock} that does permit barging on
     * a fair lock then combine the timed and un-timed forms together:
     *
     * if (lock.tryLock() || lock.tryLock(timeout, unit)) { ... }
     *
     * If the current thread already holds this lock then the hold count is incremented by one and the method returns {@code true}.
     *
     * If the lock is held by another thread then the current thread becomes disabled for thread scheduling purposes
     * and lies dormant until one of three things happens:
     *
     * 1. The lock is acquired by the current thread
     * 2. Some other thread {@linkplain Thread#interrupt interrupts} the current thread
     * 3. The specified waiting time elapses
     *
     * If the lock is acquired then the value {@code true} is returned and the lock hold count is set to one
     * 
     * If the current thread:
     *
     * 1. has its interrupted status set on entry to this method
     * 2. is {@linkplain Thread#interrupt interrupted} while acquiring the lock
     *
     * then {@link InterruptedException} is thrown and the current thread's interrupted status is cleared.
     *
     * If the specified waiting time elapses then the value {@code false} is returned. If the time is less than or equal to zero, the method will not wait at all.
     *
     * In this implementation, as this method is an explicit interruption point, preference is given to responding to the interrupt
     * over normal or reentrant acquisition of the lock, and over reporting the elapse of the waiting time.
     *
     * @param timeout the time to wait for the lock
     * @param unit the time unit of the timeout argument
     * @return {@code true} if the lock was free and was acquired by the current thread, or the lock was already held by the current
     *                      thread; and {@code false} if the waiting time elapsed before the lock could be acquired
     * @throws InterruptedException if the current thread is interrupted
     * @throws NullPointerException if the time unit is null
     */
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    
    /**
     * Attempts to release this lock.
     *
     * If the current thread is the holder of this lock then the hold count is decremented. If the hold count is now zero then the lock
     * is released. If the current thread is not the holder of this lock then {@link IllegalMonitorStateException} is thrown.
     *
     * @throws IllegalMonitorStateException if the current thread does not hold this lock
     */
    public void unlock() {
        sync.release(1);
    }
    
    /**
     * Returns a {@link Condition} instance for use with this {@link Lock} instance.
     *
     * The returned {@link Condition} instance supports the same usages as do the {@link Object} monitor methods ({@link Object#wait() wait},
     * {@link Object#notify notify}, and {@link Object#notifyAll notifyAll}) when used with the built-in monitor lock.
     *
     * - If this lock is not held when any of the {@link Condition} {@linkplain Condition#await() waiting} or {@linkplain Condition#signal signalling}
     *   methods are called, then an {@link IllegalMonitorStateException} is thrown.
     *
     * - When the condition {@linkplain Condition#await() waiting} methods are called the lock is released and, before they
     *   return, the lock is reacquired and the lock hold count restored to what it was when the method was called.
     *
     * - If a thread is {@linkplain Thread#interrupt interrupted} while waiting then the wait will terminate, an {@link InterruptedException}
     *   will be thrown, and the thread's interrupted status will be cleared.
     *
     * - Waiting threads are signalled in FIFO order.
     *
     * - The ordering of lock reacquisition for threads returning from waiting methods is the same as for threads initially
     *   acquiring the lock, which is in the default case not specified, but for fair locks favors those threads that have been waiting the longest.
     *
     * @return the Condition object
     */
    public Condition newCondition() {
        return sync.newCondition();
    }
    
    /**
     * Queries the number of holds on this lock by the current thread.
     * 
     * A thread has a hold on a lock for each lock action that is not matched by an unlock action.
     * 
     * The hold count information is typically only used for testing and debugging purposes. For example, if a certain section of code should not be entered with the lock already held then we can assert that fact:
     *
     * <code>
     * class X {
     *   ReentrantLock lock = new ReentrantLock();
     *   // ...
     *   public void m() {
     *     assert lock.getHoldCount() == 0;
     *     lock.lock();
     *     try {
     *       // ... method body
     *     } finally {
     *       lock.unlock();
     *     }
     *   }
     * }
     * </code>
     * 
     * @return the number of holds on this lock by the current thread, or zero if this lock is not held by the current thread
     */
    public int getHoldCount() {
        return sync.getHoldCount();
    }
    
    /**
     * Queries if this lock is held by the current thread.
     *
     * Analogous to the {@link Thread#holdsLock} method for built-in monitor locks, this method is typically used for debugging and
     * testing. For example, a method that should only be called while a lock is held can assert that this is the case:
     * 
     * <code>
     * class X {
     *   ReentrantLock lock = new ReentrantLock();
     *   // ...
     *   public void m() {
     *     assert lock.isHeldByCurrentThread();
     *     // ... method body
     *   }
     * }
     * </code>
     *
     * It can also be used to ensure that a reentrant lock is used in a non-reentrant manner, for example:
     *
     * <code>
     * class X {
     *   ReentrantLock lock = new ReentrantLock();
     *   // ...
     * 
     *   public void m() {
     *     assert !lock.isHeldByCurrentThread();
     *     lock.lock();
     *     try {
     *       // ... method body
     *     } finally {
     *       lock.unlock();
     *     }
     *   }
     * }
     * </code>
     * 
     * @return {@code true} if current thread holds this lock and {@code false} otherwise
     */
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
    
    /**
     * Queries if this lock is held by any thread. This method is designed for use in monitoring of the system state, not for synchronization control.
     *
     * @return {@code true} if any thread holds this lock and {@code false} otherwise
     */
    public boolean isLocked() {
        return sync.isLocked();
    }
    
    /** 
     * Returns {@code true} if this lock has fairness set true.
     *
     * @return {@code true} if this lock has fairness set true.
     */
    public final boolean isFair() {
        return sync instanceof FairSync;
    }
    
    /**
     * Returns the thread that currently owns this lock, or {@code null} if not owned. When this thread method is called by a
     * thread that is not the owner, the return value reflects a best-effort approximation of current lock status. For example,
     * the owner may be momentarily {@code null} even if there are threads trying to acquire the lock but have not yet done so.
     * This method is designed to facilitate construction of subclasses that provide more extensive lock monitoring facilities.
     *
     * @return the owner, or {@code null} if not owned
     */
    protected Thread getOwner() {
        return sync.getOwner();
    }
    
    /**
     * Queries whether any threads are waiting to acquire this lock. Note that because cancellations may occur at any time, a {@code true}
     * return does not guarantee that any other thread will ever acquire this lock. This method is designed primarily for use in monitoring of the system state.
     * 
     * @return {@code true} if there may be other threads waiting to acquire the lock.
     */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    
    /**
     * Queries whether the given thread is waiting to acquire this lock. Note that because cancellations may occur at any time, a
     * {@code true} return does not guarantee that this thread will ever acquire this lock. This method is designed primarily for use in monitoring of the system state.
     *
     * @param thread the thread
     * @return {@code true} if the given thread is queued waiting for this lock
     * @throws NullPointerException if the thread is null
     */
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }
    
    /**
     * Returns an estimate of the number of threads waiting to acquire this lock. The value is only an estimate because the number of
     * threads may change dynamically while this method traverses internal data structures. This method is designed for use in
     * monitoring of the system state, not for synchronization control.
     *
     * @return the estimated number of threads waiting for this lock
     */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    
    /**
     * Returns a collection containing threads that may be waiting to acquire this lock. Because the actual set of threads may change
     * dynamically while constructing this result, the returned collection is only a best-effort estimate. The elements of the
     * returned collection are in no particular order. This method is designed to facilitate construction of subclasses that provide more extensive monitoring facilities.
     *
     * @return the collection of threads
     */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }
    
    /**
     * Queries whether any threads are waiting on the given condition associated with this lock. Note that because timeouts and
     * interrupts may occur at any time, a {@code true} return does not guarantee that a future {@code signal} will awaken any
     * threads. This method is designed primarily for use in monitoring of the system state.
     *
     * @param condition the condition
     * @return {@code true} if there are any waiting threads
     * @throws IllegalMonitorStateException if this lock is not held
     * @throws IllegalArgumentException if the given condition is not associated with this lock
     * @throws NullPointerException if the condition is null
     */
    public boolean hasWaiters(Condition condition) {
        if(condition == null) {  throw new NullPointerException(); }
        if(!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) { throw new IllegalArgumentException("not owner"); }
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
    
    /**
     * Returns an estimate of the number of threads waiting on the given condition associated with this lock. Note that because
     * timeouts and interrupts may occur at any time, the estimate serves only as an upper bound on the actual number of waiters.
     * This method is designed for use in monitoring of the system state, not for synchronization control.
     *
     * @param condition the condition
     * @return the estimated number of waiting threads
     * @throws IllegalMonitorStateException if this lock is not held
     * @throws IllegalArgumentException if the given condition is not associated with this lock
     * @throws NullPointerException if the condition is null
     */
    public int getWaitQueueLength(Condition condition) {
        if (condition == null) { throw new NullPointerException(); }
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) { throw new IllegalArgumentException("not owner"); }
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
    
    /**
     * Returns a collection containing those threads that may be waiting on the given condition associated with this lock.
     * Because the actual set of threads may change dynamically while constructing this result, the returned collection is only a
     * best-effort estimate. The elements of the returned collection are in no particular order. This method is designed to
     * facilitate construction of subclasses that provide more extensive condition monitoring facilities.
     *
     * @param condition the condition
     * @return the collection of threads
     * @throws IllegalMonitorStateException if this lock is not held
     * @throws IllegalArgumentException if the given condition is not associated with this lock
     * @throws NullPointerException if the condition is null
     */
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null) { throw new NullPointerException(); }
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) { throw new IllegalArgumentException("not owner"); }
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }
    
    /**
     * Returns a string identifying this lock, as well as its lock state. The state, in brackets, includes either the String {@code "Unlocked"}
     * or the String {@code "Locked by"} followed by the {@linkplain Thread#getName name} of the owning thread.
     *
     * @return a string identifying this lock, as well as its lock state
     */
    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]");
    }
}

The key methods acquire and release the lock
- `lock()` acquires the lock if it's available
- - If the lock isn't available, its behavior depends on the "fairness" policy
- - - Non-fair implementations first try to "barge" in with an atomic compare-and-set, which maps to a hardware instruction, and queue only if that fails
- - - Fair implementations "park" the calling thread on a FIFO wait queue whenever other threads are already waiting
- `lockInterruptibly()` acquires the lock unless the calling thread is interrupted
- - In contrast, `lock()` is not interruptible
- `tryLock()` acquires the lock only if it's not held by another thread at the time of invocation
- `unlock()` attempts to release the lock
- - With ReentrantLock, only the thread holding the lock can release it
- - `IllegalMonitorStateException` is thrown if the thread calling `unlock()` doesn't hold the lock
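
The behavior of `tryLock()` and `unlock()` described in the list can be checked with a short sketch. The class `TryLockDemo` and its helper methods are hypothetical names; only the `ReentrantLock` calls are JDK API.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock and Thread come from the JDK.
class TryLockDemo {
    // Reports what a second thread sees when it calls tryLock() on the given lock.
    static boolean otherThreadTryLock(ReentrantLock lock) throws InterruptedException {
        boolean[] acquired = { false };
        Thread other = new Thread(() -> {
            acquired[0] = lock.tryLock();   // never blocks
            if (acquired[0]) {
                lock.unlock();              // release again if we got it
            }
        });
        other.start();
        other.join();                       // join() makes the result visible here
        return acquired[0];
    }

    // Returns true if unlock() by a thread that does not hold the lock throws.
    static boolean unlockWithoutHoldingThrows(ReentrantLock lock) {
        try {
            lock.unlock();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        System.out.println(unlockWithoutHoldingThrows(lock)); // true

        lock.lock();
        try {
            System.out.println(otherThreadTryLock(lock));     // false: held by main
        } finally {
            lock.unlock();
        }
        System.out.println(otherThreadTryLock(lock));         // true: lock is free
    }
}
```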

Overview of ReentrantLock Methods

|Return Type|Method|
|--|--|
|boolean|`tryLock(long timeout, TimeUnit unit)` - Acquires the lock if it is not held by another thread within the given waiting time and the current thread has not been interrupted|
|boolean|`isFair()` - Returns true if this lock has fairness set true|
|boolean|`isLocked()` - Queries if this lock is held by any thread|
|Condition|`newCondition()` - Returns a Condition instance for use with this Lock instance|
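
As a rough illustration of `newCondition()`, the sketch below builds a one-shot "ready" flag that threads can wait on with a timeout. The class `ConditionDemo`, its fields, and its methods are hypothetical; the waiting loop follows the usual `awaitNanos` idiom of re-checking the guarded state after every wake-up.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only the java.util.concurrent types come from the JDK.
class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean isReady = false;   // guarded by lock

    void markReady() {
        lock.lock();
        try {
            isReady = true;
            ready.signalAll();         // the lock must be held when signalling
        } finally {
            lock.unlock();
        }
    }

    // Waits until markReady() has been called or the timeout elapses.
    boolean awaitReady(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!isReady) {         // loop guards against spurious wake-ups
                if (nanos <= 0L) {
                    return false;      // timed out
                }
                nanos = ready.awaitNanos(nanos); // releases the lock while waiting
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo gate = new ConditionDemo();
        System.out.println(gate.awaitReady(50, TimeUnit.MILLISECONDS)); // false
        new Thread(gate::markReady).start();
        System.out.println(gate.awaitReady(5, TimeUnit.SECONDS));       // true
    }
}
```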

Fields do not need to be declared `volatile` when every access to them is guarded by the same ReentrantLock, because the lock handles the atomicity and visibility issues.
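
A minimal sketch of that point, assuming a hypothetical `LockedCounter` class: the `count` field is a plain `long`, yet concurrent increments are not lost because every read and write happens while holding the same lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock and Thread come from the JDK.
class LockedCounter {
    private final ReentrantLock lock = new ReentrantLock();
    private long count = 0;   // not volatile: only accessed while holding lock

    void increment() {
        lock.lock();
        try {
            count++;          // read-modify-write is atomic under the lock
        } finally {
            lock.unlock();
        }
    }

    long get() {
        lock.lock();          // reads also take the lock, for visibility
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Starts the given number of threads, each incrementing perThread times.
    static long runDemo(int threads, int perThread) throws InterruptedException {
        LockedCounter counter = new LockedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.increment();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo(4, 10_000)); // 40000
    }
}
```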


ArrayBlockingQueue is a blocking bounded FIFO queue

`public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable`
    
- ArrayBlockingQueue: A bounded `blocking queue` backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue.

    This is a classic "bounded buffer", in which a fixed-size array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.
    
    This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.

- AbstractQueue: This class provides skeletal implementations of some `Queue` operations. The implementations in this class are appropriate when the base implementation does not allow `null` elements. Methods `add`, `remove`, and `element` are based on `offer`, `poll`, and `peek`, respectively but throw exceptions instead of indicating failure via false or `null` returns.
    
- BlockingQueue: A `Queue` that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
    
    `BlockingQueue` methods come in four forms, with different ways of handling operations that cannot be satisfied immediately, but may be satisfied at some point in the future: one throws an exception, the second returns a special value (either null or false, depending on the operation), the third blocks the current thread indefinitely until the operation can succeed, and the fourth blocks for only a given maximum time limit before giving up. These methods are summarized in the following table:
    
| |Throws exception|Special value|Blocks|Times out|
|--|--|--|--|--|
|Insert|add(e)|offer(e)|put(e)|offer(e, time, unit)|
|Remove|remove()|poll()|take()|poll(time, unit)|
|Examine|element()|peek()|not applicable|not applicable|
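
The four columns of the table can be exercised against a queue of capacity 1. This is only a sketch: the class `BlockingQueueFormsDemo` is a hypothetical name, and the 10 ms timeouts are arbitrary values chosen to keep the demo quick.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only the java.util types come from the JDK.
class BlockingQueueFormsDemo {
    static String run() throws InterruptedException {
        BlockingQueue<String> q = new ArrayBlockingQueue<>(1);
        StringBuilder out = new StringBuilder();

        q.put("a");   // blocking form: returns at once because there is space
        // The queue is now full.
        out.append(q.offer("b")).append(' ');                             // special value: false
        out.append(q.offer("b", 10, TimeUnit.MILLISECONDS)).append(' ');  // times out: false
        try {
            q.add("b");                                                   // throws when full
        } catch (IllegalStateException e) {
            out.append("add-threw ");
        }

        out.append(q.take()).append(' ');   // blocking form: returns "a" at once
        // The queue is now empty.
        out.append(q.poll()).append(' ');                                 // special value: null
        out.append(q.poll(10, TimeUnit.MILLISECONDS)).append(' ');        // times out: null
        out.append(q.peek()).append(' ');                                 // special value: null
        try {
            q.remove();                                                   // throws when empty
        } catch (NoSuchElementException e) {
            out.append("remove-threw ");
        }
        try {
            q.element();                                                  // throws when empty
        } catch (NoSuchElementException e) {
            out.append("element-threw");
        }
        return out.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        // false false add-threw a null null null remove-threw element-threw
        System.out.println(run());
    }
}
```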
    
    A `BlockingQueue` does not accept null elements. Implementations throw `NullPointerException` on attempts to add, put or offer a null. A null is used as a sentinel value to indicate failure of poll operations.
    
    A `BlockingQueue` may be capacity bounded. At any given time it may have a `remainingCapacity` beyond which no additional elements can be put without blocking. A `BlockingQueue` without any intrinsic capacity constraints always reports a remaining capacity of `Integer.MAX_VALUE`.
    
    `BlockingQueue` implementations are designed to be used primarily for producer-consumer queues, but additionally support the `Collection` interface. So, for example, it is possible to remove an arbitrary element from a queue using `remove(x)`. However, such operations are in general not performed very efficiently, and are intended for only occasional use, such as when a queued message is cancelled.
    
    `BlockingQueue` implementations are thread-safe. All queuing methods achieve their effects atomically using internal locks or other forms of concurrency control. However, the bulk Collection operations `addAll`, `containsAll`, `retainAll` and `removeAll` are not necessarily performed atomically unless specified otherwise in an implementation. So it is possible, for example, for `addAll(c)` to fail (throwing an exception) after adding only some of the elements in c.
    
    A `BlockingQueue` does not intrinsically support any kind of "close" or "shutdown" operation to indicate that no more items will be added. The needs and usage of such features tend to be implementation-dependent. For example, a common tactic is for producers to insert special end-of-stream or poison objects, that are interpreted accordingly when taken by consumers.
    
    Usage example based on a typical producer-consumer scenario. Note that a `BlockingQueue` can safely be used with multiple producers and multiple consumers.

In [None]:
class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { ... handle ...}
    }
    Object produce() { ... }
}

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
            while (true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ...handle... }
    }
    void consume(Object x) { ... }
}

class Setup {
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

In [None]:
package java.util.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.lang.ref.WeakReference;
import java.util.Spliterators;
import java.util.Spliterator;

/**
 * A bounded BlockingQueue backed by an array. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the
 * queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue,
 * and the queue retrieval operations obtain elements at the head of the queue.
 *
 * This is a classic bounded buffer, in which a fixed-size array holds elements inserted by producers and extracted by consumers. Once created, the capacity cannot be changed. 
 * Attempts to put an element into a full queue will result in the operation blocking; attempts to take an element from an empty queue will similarly block.
 *
 * This class supports an optional fairness policy for ordering waiting producer and consumer threads. By default, this ordering is not guaranteed. However, a queue constructed with
 * fairness set to true grants threads access in FIFO order. Fairness generally decreases throughput but reduces variability and avoids starvation.
 * 
 * This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces. This class is a member of the Java Collections Framework.
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    /**
     * Serialization ID. This class relies on default serialization even for the items array, which is default-serialized, even if it is empty. Otherwise it could not be declared final, which is necessary here.
     */
    private final static long serialVersionUID = -817911632652898426L;
    
    /** The queued items */
    final Object[] items;
    
    /** items index for next take, poll, peek or remove */
    int takeIndex;
    
    /** items index for next put, offer, or add */
    int putIndex;
    
    /** Number of elements in the queue */
    int count;
    
    /** Concurrency control uses the classic two-condition algorithm found in any textbook. */
    
    /** Main lock guarding all access */
    final ReentrantLock lock;
    
    /** Condition for waiting takes */
    private final Condition notEmpty;
    
    /** Condition for waiting puts */
    private final Condition notFull;
    
    /** Shared state for currently active iterators, or null if there are known not to be any. Allows queue operations to update the iterator state. */
    transient Itrs itrs = null;
    
    // Internal helper methods
    
    /** Circularly decrement i. */
    final int dec(int i) {
        return ((i == 0) ? items.length : i) - 1;
    }
    
    /** Returns item at index i. */
    @SuppressWarnings("unchecked")
    final E itemAt(int i) {
        return (E) items[i];
    }
    
    /** 
     * Throws NullPointerException if argument is null. 
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
        if(v == null) { throw new NullPointerException(); }
    }
    
    /**
     * Inserts element at current put position, advances, and signals. Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length) {
            putIndex = 0;
        }
        count++;
        notEmpty.signal();
    }
    
    /**
     * Extracts element at current take position, advances, and signals. Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length) {
            takeIndex = 0;
        }
        count--;
        if (itrs != null) {
            itrs.elementDequeued();
        }
        notFull.signal();
        return x;
    }
    
    /**
     * Deletes item at array index removeIndex. Utility for remove(Object) and iterator.remove. Call only when holding lock.
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            if (itrs != null) {
                itrs.elementDequeued();
            }
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length) {
                    next = 0;
                }
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null) {
                itrs.removedAt(removeIndex);
            }
        }
        // a slot was freed in either case, so wake one waiting producer
        notFull.signal();
    }
    
    /**
     * Creates an ArrayBlockingQueue with the given (fixed) capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if capacity < 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
    
    /**
     * Creates an ArrayBlockingQueue with the given (fixed) capacity and the specified access policy.
     * 
     * @param capacity the capacity of this queue
     * @param fair if true then queue accesses for threads blocked on insertion or removal, are processed in FIFO order; if false the access order is unspecified.
     * @throws IllegalArgumentException if capacity < 1.
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if(capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    
    /**
     * Creates an ArrayBlockingQueue with the given (fixed) capacity, the specified access policy and initially containing the elements of the given collection, added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if true then queue accesses for threads blocked on insertion or removal, are processed in FIFO order; if false the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if capacity is less than c.size(), or less than 1.
     * @throws NullPointerException if the specified collection or any of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
        this(capacity, fair);
        
        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i; // next free slot, wrapping to 0 when the array is full
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and throwing an IllegalStateException if this queue is full.
     *
     * @param e the element to add
     * @return true (as specified by Collection#add)
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null.
     */
    public boolean add(E e) {
        return super.add(e);
    }
    
    /**
     * Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and false if this queue is full. This method is generally preferable to method #add, which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null.
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length) {
                return false;
            } else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Inserts the specified element at the tail of this queue, waiting for free space to become available if the queue is full.
     *
     * @throws InterruptedException
     * @throws NullPointerException
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await();
            }
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Inserts the specified element at the tail of this queue, waiting up to the specified wait time for space to become available if the queue is full.
     *
     * @throws InterruptedException
     * @throws NullPointerException
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return itemAt(takeIndex); // null when queue is empty
        } finally {
            lock.unlock();
        }
    }
    
    // this doc comment is overridden to remove the reference to collections greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    
    // this doc comment is a modified copy of the inherited doc comment, without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking. This is always equal to the initial capacity of this queue less the current size of this queue.
     *
     * Note that you cannot always tell if an attempt to insert an element will succeed by inspecting remainingCapacity because it may be the case that another thread is about to insert or remove an element.
     */
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Removes a single instance of the specified element from this queue, if it is present. More formally, removes an element e such that o.equals(e), if this queue contains one or more such elements.
     * Returns true if this queue contained the specified element (or equivalently, if this queue changed as a result of the call).
     *
     * Removal of interior elements in circular array based queues is an intrinsically slow and disruptive operation, so should be undertaken only in exceptional circumstances, ideally only when the queue is known not to be accessible by other threads.
     *
     * @param o element to be removed from this queue, if present
     * @return true if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length) {
                        i = 0;
                    }
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Returns true if this queue contains the specified element. More formally, returns true if and only if this queue contains at least one element e such that o.equals(e).
     *
     * @param o object to be checked for containment in this queue.
     * @return true if this queue contains the specified element.
     */
    public boolean contains(Object o) {
        if (o == null) {return false;}
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    if (o.equals(items[i])) { return true; }
                    if (++i == items.length) { i = 0; }
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Returns an array containing all of the elements in this queue, in proper sequence.
     *
     * The returned array will be "safe" in that no references to it are maintained by this queue. (In other words, this method must allocate a new array). The caller is thus free to modify the returned array.
     *
     * This method acts as bridge between array-based and collection-based APIs.
     *
     * @return an array containing all of the elements in this queue.
     */
    public Object[] toArray() {
        Object[] a;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int count = this.count;
            a = new Object[count];
            int n = items.length - takeIndex;
            if (count <= n) {
                System.arraycopy(items, takeIndex, a, 0, count);
            } else {
                System.arraycopy(items, takeIndex, a, 0, n);
                System.arraycopy(items, 0, a, n, count - n);
            }
        } finally {
            lock.unlock();
        }
        return a;
    }
    
    /**
     * Returns an array containing all of the elements in this queue, in proper sequence; the runtime type of the returned array is that of the specified array. If the queue fits in the specified array, it is returned therein. Otherwise a new array is allocated with the runtime type of the specified array and the size of this queue.
     * 
     * If this queue fits in the specified array with room to spare (i.e., the array has more elements than this queue), the element in the array immediately following the end of the queue is set to null.
     *
     * Like the #toArray() method, this method acts as a bridge between array-based and collection-based APIs. Further, this method allows precise control over the runtime type of the output array, and may, under certain circumstances, be used to save allocation costs.
     *
     * Suppose x is a queue known to contain only strings. The following code can be used to dump the queue into a newly allocated array of String:
     *
     *   String[] y = x.toArray(new String[0]);
     *
     * Note that toArray(new Object[0]) is identical in function to toArray().
     * 
     * @param a the array into which the elements of the queue are to be stored, if it is big enough; otherwise, a new array of the same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array is not a supertype of the runtime type of every element in this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int count = this.count;
            final int len = a.length;
            if (len < count) {
                a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
            }
            int n = items.length - takeIndex;
            if (count <= n) {
                System.arraycopy(items, takeIndex, a, 0, count);
            } else {
                System.arraycopy(items, takeIndex, a, 0, n);
                System.arraycopy(items, 0, a, n, count - n);
            }
            if (len > count) {
                a[count] = null;
            }
        } finally {
            lock.unlock();
        }
        return a;
    }
    
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k == 0) { return "[]"; }
            
            final Object[] items = this.items;
            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (int i = takeIndex; ; ) {
                Object e = items[i];
                sb.append(e == this ? "(this Collection)" : e);
                if (--k == 0) { 
                    return sb.append(']').toString();
                }
                sb.append(',').append(' ');
                if (++i == items.length) {
                    i = 0;
                }
            } 
        } finally {
            lock.unlock();
        }
    }
    
    /** Atomically removes all of the elements from this queue. The queue will be empty after this call returns. */
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                do {
                    items[i] = null;
                    if (++i == items.length) {
                        i = 0;
                    }
                } while (i != putIndex);
                takeIndex = putIndex;
                count = 0;
                if (itrs != null) {
                    itrs.queueIsEmpty();
                }
                for (; k > 0 && lock.hasWaiters(notFull); k--) {
                    notFull.signal();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * @throws UnsupportedOperationException
     * @throws ClassCastException
     * @throws NullPointerException
     * @throws IllegalArgumentException
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }
    
    /**
     * @throws UnsupportedOperationException
     * @throws ClassCastException
     * @throws NullPointerException
     * @throws IllegalArgumentException
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = Math.min(maxElements, count);
            int take = takeIndex;
            int i = 0;
            try {
                while (i < n) {
                    @SuppressWarnings("unchecked")
                    E x = (E) items[take];
                    c.add(x);
                    items[take] = null;
                    if (++take == items.length) {
                        take = 0;
                    }
                    i++;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    count -= i;
                    takeIndex = take;
                    if (itrs != null) {
                        if (count == 0) {
                            itrs.queueIsEmpty();
                        } else if (i > take) {
                            itrs.takeIndexWrapped();
                        }
                    }
                    for (; i > 0 && lock.hasWaiters(notFull); i--) {
                        notFull.signal();
                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Returns an iterator over the elements in this queue in proper sequence. The elements will be returned in order from first (head) to the last (tail).
     * 
     * The returned iterator is weakly consistent.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }
    
    /**
     * Shared data between iterators and their queue, allowing queue modifications to update iterators when elements are removed.
     *
     * This adds a lot of complexity for the sake of correctly handling some uncommon operations, but the combination of circular arrays and supporting interior removes (i.e., those not at head) would cause iterators to sometimes lose their places and/or (re)report elements they shouldn't. To avoid this, when a queue has one or more iterators, it keeps iterator state consistent by:
     *
     * (1) keeping track of the number of "cycles", that is, the number of times takeIndex has wrapped around to 0.
     * (2) notifying all iterators via the callback removedAt whenever an interior element is removed (and thus other elements may be shifted).
     *
     * These suffice to eliminate iterator inconsistencies, but unfortunately add the secondary responsibility of maintaining the list of iterators. We track all active iterators in a simple linked list (accessed only when the queue's lock is held) of weak references to Itr. The list is cleaned up using 3 different mechanisms:
     *
     * (1) Whenever a new iterator is created, do some O(1) checking for stale list elements.
     * (2) Whenever takeIndex wraps around to 0, check for iterators that have been unused for more than one wrap-around cycle.
     * (3) Whenever the queue becomes empty, all iterators are notified and this entire data structure is discarded.
     *
     * So in addition to the removedAt callback that is necessary for correctness, iterators have the shutdown and takeIndexWrapped callbacks that help remove stale iterators from the list.
     *
     * Whenever a list element is examined, it is expunged if either the GC has determined that the iterator is discarded, or if the iterator reports that it is "detached" (does not need any further state updates).
     * Overhead is maximal when takeIndex never advances, iterators are discarded before they are exhausted, and all removals are interior removes, in which case all stale iterators are discovered by the GC. But even in this case we don't increase the amortized complexity.
     *
     * Care must be taken to keep list sweeping methods from reentrantly invoking another such method, causing subtle corruption bugs.
     */
    class Itrs {
        /** Node in a linked list of weak iterator references. */
        private class Node extends WeakReference<Itr> {
            Node next;
            
            Node(Itr iterator, Node next) {
                super(iterator);
                this.next = next;
            }
        }
        
        /** Incremented whenever takeIndex wraps around to 0 */
        int cycles = 0;
        
        /** Linked list of weak iterator references */
        private Node head;
        
        /** Used to expunge stale iterators */
        private Node sweeper = null;
        
        private static final int SHORT_SWEEP_PROBES = 4;
        private static final int LONG_SWEEP_PROBES = 16;
        
        Itrs(Itr initial) {
            register(initial);
        }
        
        /**
         * Sweeps itrs, looking for and expunging stale iterators. If at least one was found, tries harder to find more. Called only from iterating thread.
         *
         * @param tryHarder whether to start in try-harder mode, because there is known to be at least one iterator to collect.
         */
        void doSomeSweeping(boolean tryHarder) {
            // assert lock.getHoldCount() == 1;
            // assert head != null;
            int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
            Node o, p;
            final Node sweeper = this.sweeper;
            boolean passedGo; // to limit search to one full sweep
            
            if (sweeper == null) {
                o = null;
                p = head;
                passedGo = true;
            } else {
                o = sweeper;
                p = o.next;
                passedGo = false;
            }
            
            for (; probes > 0; probes--) {
                if (p == null) {
                    if(passedGo) {
                        break;
                    }
                    o = null;
                    p = head;
                    passedGo = true;
                }
                final Itr it = p.get();
                final Node next = p.next;
                if (it == null || it.isDetached()) {
                    // found a discarded/exhausted iterator
                    probes = LONG_SWEEP_PROBES; // "try harder"
                    // unlink p
                    p.clear();
                    p.next = null;
                    if (o == null) {
                        head = next;
                        if (next == null) {
                            // We've run out of iterators to track; retire
                            itrs = null;
                            return;
                        }
                    } else {
                        o.next = next;
                    }
                } else {
                    o = p;
                }
                p = next;
            }
            this.sweeper = (p == null) ? null : o;
        }
        
        /** Adds a new iterator to the linked list of tracked iterators. */
        void register(Itr itr) {
            // assert lock.getHoldCount() == 1;
            head = new Node(itr, head);
        }
        
        /**
         * Called whenever takeIndex wraps around to 0.
         *
         * Notifies all iterators, and expunges any that are now stale.
         */
        void takeIndexWrapped() {
            // assert lock.getHoldCount() == 1;
            cycles++;
            for (Node o = null, p = head; p !=null; ) {
                final Itr it = p.get();
                final Node next = p.next;
                if(it == null || it.takeIndexWrapped()) {
                    // unlink p
                    // assert it == null || it.isDetached();
                    p.clear();
                    p.next = null;
                    if (o == null) {
                        head = next;
                    } else {
                        o.next = next;
                    }
                } else {
                    o = p;
                }
                p = next;
            }
            if (head == null) {  // no more iterators to track
                itrs = null;
            }
        }
    }
}
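
The two-condition scheme used by `put` and `take` in the listing can be reduced to a short sketch. `TinyBoundedBuffer` and its `demo` method are illustrative names, not part of the JDK; iterator tracking, fairness, timed operations, and null checks are deliberately left out.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TinyBoundedBuffer<E> {
    private final Object[] items;
    private int takeIndex, putIndex, count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition(); // consumers wait here
    private final Condition notFull = lock.newCondition();  // producers wait here

    TinyBoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                notFull.await(); // releases the lock while waiting
            }
            items[putIndex] = e;
            if (++putIndex == items.length) {
                putIndex = 0;    // wrap around the circular array
            }
            count++;
            notEmpty.signal();   // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) {
                takeIndex = 0;
            }
            count--;
            notFull.signal();    // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }

    /** Pushes 1..5 through a buffer of capacity 2 and returns the sum seen by the consumer. */
    static int demo() {
        TinyBoundedBuffer<Integer> buffer = new TinyBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            int sum = 0;
            for (int i = 0; i < 5; i++) {
                sum += buffer.take();
            }
            producer.join();
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during demo", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sum of consumed values: " + demo());
    }
}
```

The `while` loops around `await()` matter: a woken thread re-checks the condition after reacquiring the lock, so a spurious wakeup or a competing thread that got there first cannot break the invariant.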