diff --git a/docs/book/Appendix-Low-Level-Concurrency.md b/docs/book/Appendix-Low-Level-Concurrency.md index f7ef3ecc..d85e110d 100644 --- a/docs/book/Appendix-Low-Level-Concurrency.md +++ b/docs/book/Appendix-Low-Level-Concurrency.md @@ -547,13 +547,13 @@ public class EvenProducer extends IntGenerator { 417 not even! */ ``` -* [1] 一个任务有可能在另外一个任务执行第一个对 **currentEvenValue** 的递增操作之后,但是没有执行第二个操作之前,调用 `next()` 方法。这将使这个值处于 “不恰当” 的状态。 +* [1] 一个任务有可能在另外一个任务执行第一个对 **currentEvenValue** 的自增操作之后,但是没有执行第二个操作之前,调用 `next()` 方法。这将使这个值处于 “不恰当” 的状态。 为了证明这是可能发生的, `EvenChecker.test()` 创建了一组 **EventChecker** 对象,以连续读取 **EvenProducer** 的输出并测试检查每个数值是否都是偶数。如果不是,就会报告错误,而程序也将关闭。 多线程程序的部分问题是,即使存在 bug ,如果失败的可能性很低,程序仍然可以正确显示。 -重要的是要注意到递增操作自身需要多个步骤,并且在递增过程中任务可能会被线程机制挂起 - 也就是说,在 Java 中,递增不是原子性的操作。因此,如果不保护任务,即使单纯的递增也不是线程安全的。 +重要的是要注意到自增操作自身需要多个步骤,并且在自增过程中任务可能会被线程机制挂起 - 也就是说,在 Java 中,自增不是原子性的操作。因此,如果不保护任务,即使单纯的自增也不是线程安全的。 该示例程序并不总是在第一次非偶数产生时终止。所有任务都不会立即关闭,这是并发程序的典型特征。 @@ -584,7 +584,7 @@ synchronized void g() { /* ... */ } 在使用并发时,将字段设为 **private** 特别重要;否则,**synchronized** 关键字不能阻止其他任务直接访问字段,从而产生资源冲突。 -一个线程可以获取对象的锁多次。如果一个方法调用在同一个对象上的第二个方法,而后者又在同一个对象上调用另一个方法,就会发生这种情况。 JVM 会跟踪对象被锁定的次数。如果对象已解锁,则其计数为 0 。当一个线程首次获得锁时,计数变为 1 。每次同一线程在同一对象上获取另一个锁时,计数就会递增。显然,只有首先获得锁的线程才允许多次获取多个锁。每当线程离开 **synchronized** 方法时,计数递减,直到计数变为 0 ,完全释放锁以给其他线程使用。每个类也有一个锁(作为该类的 **Class** 对象的一部分),因此 **synchronized** 静态方法可以在类范围的基础上彼此锁定,不让同时访问静态数据。 +一个线程可以获取对象的锁多次。如果一个方法调用在同一个对象上的第二个方法,而后者又在同一个对象上调用另一个方法,就会发生这种情况。 JVM 会跟踪对象被锁定的次数。如果对象已解锁,则其计数为 0 。当一个线程首次获得锁时,计数变为 1 。每次同一线程在同一对象上获取另一个锁时,计数就会自增。显然,只有首先获得锁的线程才允许多次获取多个锁。每当线程离开 **synchronized** 方法时,计数递减,直到计数变为 0 ,完全释放锁以给其他线程使用。每个类也有一个锁(作为该类的 **Class** 对象的一部分),因此 **synchronized** 静态方法可以在类范围的基础上彼此锁定,不让同时访问静态数据。 你应该什么时候使用同步呢?可以永远 *Brian* 的同步法则[^2]。 @@ -620,7 +620,7 @@ No odd numbers discovered */ ``` -在两个递增操作之间插入 `Nap()` 构造器方法,以提高在 **currentEvenValue** 是奇数的状态时上下文切换的可能性。因为互斥锁可以阻止多个任务同时进入临界区,所有这不会产生失败。第一个进入 `next()` 方法的任务将获得锁,任何试图获取锁的后续任务都将被阻塞,直到第一个任务释放锁。此时,调度机制选择另一个等待锁的任务。通过这种方式,任何时刻只能有一个任务通过互斥锁保护的代码。 +在两个自增操作之间插入 `Nap()` 构造器方法,以提高在 **currentEvenValue** 是奇数的状态时上下文切换的可能性。因为互斥锁可以阻止多个任务同时进入临界区,所有这不会产生失败。第一个进入 `next()` 方法的任务将获得锁,任何试图获取锁的后续任务都将被阻塞,直到第一个任务释放锁。此时,调度机制选择另一个等待锁的任务。通过这种方式,任何时刻只能有一个任务通过互斥锁保护的代码。 ## volatile 关键字 @@ -701,7 +701,7 @@ public class ReOrdering implements Runnable { 在 Java 线程的讨论中,经常反复提交但不正确的知识是:“原子操作不需要同步”。 一个 *原子操作* 是不能被线程调度机制中断的操作;一旦操作开始,那么它一定可以在可能发生的“上下文切换”之前(切换到其他线程执行)执行完毕。依赖于原子性是很棘手且很危险的,如果你是一个并发编程专家,或者你得到了来自这样的专家的帮助,你才应该使用原子性来代替同步,如果你认为自己足够聪明可以应付这种玩火似的情况,那么请接受下面的测试: -> Goetz 测试:如果你可以编写用于现代微处理器的高性能 JVM ,那么就有资格考虑是否可以避免同步[^4]。 +> Goetz 测试:如果你可以编写用于现代微处理器的高性能 JVM ,那么就有资格考虑是否可以避免同步[^4] 。 了解原子性是很有用的,并且知道它与其他高级技术一起用于实现一些更加巧妙的 **java.util.concurrent** 库组件。 但是要坚决抵制自己依赖它的冲动。 @@ -719,7 +719,7 @@ i++; // Might be atomic in C++ i += 2; // Might be atomic in C++ ``` -但是在 C++ 中,这取决于编译器和处理器。你无法编写出依赖于原子性的 C++ 跨平台代码,因为 C++ [^5] 没有像 Java 那样的一致 *内存模型* (memory model)。 +但是在 C++ 中,这取决于编译器和处理器。你无法编写出依赖于原子性的 C++ 跨平台代码,因为 C++ [^5]没有像 Java 那样的一致 *内存模型* (memory model)。 在 Java 中,上面的操作肯定不是原子性的,正如下面的方法产生的 JVM 指令中可以看到的那样: @@ -774,7 +774,7 @@ i:I 每条指令都会产生一个 “get” 和 “put”,它们之间还有一些其他指令。因此在获取指令和放置指令之间,另有一个任务可能会修改这个属性,所有,这些操作不是原子性的。 -让我们通过定义一个抽象类来测试原子性的概念,这个抽象类的方法是将一个整数类型进行偶数递增,并且 `run()` 不断地调用这个方法: +让我们通过定义一个抽象类来测试原子性的概念,这个抽象类的方法是将一个整数类型进行偶数自增,并且 `run()` 不断地调用这个方法: ```java // lowlevel/IntTestable.java @@ -876,7 +876,7 @@ public class SerialNumbers { } ``` -**SerialNumbers** 是你可以想象到最简单的类,如果你具备 C++ 或者其他底层的知识背景,你可能会认为递增是一个原子操作,因为 C++ 的递增操作通常被单个微处理器指令所实现(尽管不是以任何一致,可靠,跨平台的方式)。但是,正如前面所提到的,Java 递增操作不是原子性的,并且操作同时涉及读取和写入,因此即使在这样一个简单的操作中,也存在有线程问题的空间。 +**SerialNumbers** 是你可以想象到最简单的类,如果你具备 C++ 或者其他底层的知识背景,你可能会认为自增是一个原子操作,因为 C++ 的自增操作通常被单个微处理器指令所实现(尽管不是以任何一致,可靠,跨平台的方式)。但是,正如前面所提到的,Java 自增操作不是原子性的,并且操作同时涉及读取和写入,因此即使在这样一个简单的操作中,也存在有线程问题的空间。 我们在这里加入 volatile ,看看它是否有帮助。然而,真正的问题是 `nextSerialNumber()` 方法在不进行线程同步的情况下访问共享的可变变量值。 @@ -1363,11 +1363,303 @@ tryLock(2, TimeUnit.SECONDS): false ## 库组件 +**java.util.concurrent** 库提供大量旨在解决并发问题的类,可以帮助你生成更简单,更鲁棒的并发程序。但请注意,这些工具是比起并行流和 **CompletableFuture** 更底层的机制。 + +在本节中,我们将看一些使用不同组件的示例,然后讨论一下 *lock-free*(无锁) 库组件是如何工作的。 + ### DelayQueue +这是一个无界阻塞队列 ( **BlockingQueue** ),用于放置实现了 **Delayed** 接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,因此队首对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会有队首元素,并且 `poll()` 将返回 **null**(正因为这样,你不能将 **null** 放置到这种队列中)。 + +下面是一个示例,其中的 **Delayed** 对象自身就是任务,而 **DelayedTaskConsumer** 将最“紧急”的任务(到期时间最长的任务)从队列中取出,然后运行它。注意的是这样 **DelayQueue** 就成为了优先级队列的一种变体。 + +```java +// lowlevel/DelayQueueDemo.java +import java.util.*; +import java.util.stream.*; +import java.util.concurrent.*; +import static java.util.concurrent.TimeUnit.*; + +class DelayedTask implements Runnable, Delayed { + private static int counter = 0; + private final int id = counter++; + private final int delta; + private final long trigger; + protected static List sequence = + new ArrayList<>(); + DelayedTask(int delayInMilliseconds) { + delta = delayInMilliseconds; + trigger = System.nanoTime() + + NANOSECONDS.convert(delta, MILLISECONDS); + sequence.add(this); + } + @Override + public long getDelay(TimeUnit unit) { + return unit.convert( + trigger - System.nanoTime(), NANOSECONDS); + } + @Override + public int compareTo(Delayed arg) { + DelayedTask that = (DelayedTask)arg; + if(trigger < that.trigger) return -1; + if(trigger > that.trigger) return 1; + return 0; + } + @Override + public void run() { + System.out.print(this + " "); + } + @Override + public String toString() { + return + String.format("[%d] Task %d", delta, id); + } + public String summary() { + return String.format("(%d:%d)", id, delta); + } + public static class EndTask extends DelayedTask { + EndTask(int delay) { super(delay); } + @Override + public void run() { + sequence.forEach(dt -> + System.out.println(dt.summary())); + } + } +} + +public class DelayQueueDemo { + public static void + main(String[] args) throws Exception { + DelayQueue tasks = + Stream.concat( // Random delays: + new Random(47).ints(20, 0, 4000) + .mapToObj(DelayedTask::new), + // Add the summarizing task: + Stream.of(new DelayedTask.EndTask(4000))) + .collect(Collectors + .toCollection(DelayQueue::new)); + while(tasks.size() > 0) + tasks.take().run(); + } +} +/* Output: +[128] Task 12 [429] Task 6 [551] Task 13 [555] Task 2 +[693] Task 3 [809] Task 15 [961] Task 5 [1258] Task 1 +[1258] Task 20 [1520] Task 19 [1861] Task 4 [1998] Task +17 [2200] Task 8 [2207] Task 10 [2288] Task 11 [2522] +Task 9 [2589] Task 14 [2861] Task 18 [2868] Task 7 +[3278] Task 16 (0:4000) +(1:1258) +(2:555) +(3:693) +(4:1861) +(5:961) +(6:429) +(7:2868) +(8:2200) +(9:2522) +(10:2207) +(11:2288) +(12:128) +(13:551) +(14:2589) +(15:809) +(16:3278) +(17:1998) +(18:2861) +(19:1520) +(20:1258) +*/ +``` + +**DelayedTask** 包含一个称为 **sequence** 的 **List<DelayedTask>** ,它保存了任务被创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的。 + +**Delay** 接口有一个方法, `getDelay()` , 该方法用来告知延迟到期有多长时间,或者延迟在多长时间之前已经到期了。这个方法强制我们去使用 **TimeUnit** 类,因为这就是参数类型。这会产生一个非常方便的类,因为你可以很容易地转换单位而无需作任何声明。例如,**delta** 的值是以毫秒为单位存储的,但是 `System.nanoTime()` 产生的时间则是以纳秒为单位的。你可以转换 **delta** 的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样: + +```java +NANOSECONDS.convert(delta, MILLISECONDS); +``` + +在 `getDelay()` 中, 所希望的单位是作为 **unit** 参数传递进来的,你使用它将当前时间与触发时间之间的差转换为调用者要求的单位,而无需知道这些单位是什么(这是*策略*设计模式的一个简单示例,在这种模式中,算法的一部分是作为参数传递进来的)。 + +为了排序, **Delayed** 接口还继承了 **Comparable** 接口,因此必须实现 `compareTo()` , 使其可以产生合理的比较。 + +从输出中可以看到,任务创建的顺序对执行顺序没有任何影响 - 相反,任务是按照所期望的延迟顺序所执行的。 + ### PriorityBlockingQueue -### Lock-Free Collections +这是一个很基础的优先级队列,它具有可阻塞的读取操作。在下面的示例中, **Prioritized** 对象会被赋予优先级编号。几个 **Producer** 任务的实例会插入 **Prioritized** 对象到 **PriorityBlockingQueue** 中,但插入之间会有随机延时。然后,单个 **Consumer** 任务在执行 `take()` 时会显示多个选项,**PriorityBlockingQueue** 会将当前具有最高优先级的 **Prioritized** 对象提供给它。 + +在 **Prioritized** 中的静态变量 **counter** 是 **AtomicInteger** 类型。这是必要的,因为有多个 **Producer** 并行运行;如果不是 **AtomicInteger** 类型,你将会看到重复的 **id** 号。 这个问题在 [并发编程](./24-Concurrent-Programming.md) 的 [构造函数非线程安全](./24-Concurrent-Programming.md) 一节中讨论过。 + +```java +// lowlevel/PriorityBlockingQueueDemo.java +import java.util.*; +import java.util.stream.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import onjava.Nap; + +class Prioritized implements Comparable { + private static AtomicInteger counter = + new AtomicInteger(); + private final int id = counter.getAndIncrement(); + private final int priority; + private static List sequence = + new CopyOnWriteArrayList<>(); + Prioritized(int priority) { + this.priority = priority; + sequence.add(this); + } + @Override + public int compareTo(Prioritized arg) { + return priority < arg.priority ? 1 : + (priority > arg.priority ? -1 : 0); + } + @Override + public String toString() { + return String.format( + "[%d] Prioritized %d", priority, id); + } + public void displaySequence() { + int count = 0; + for(Prioritized pt : sequence) { + System.out.printf("(%d:%d)", pt.id, pt.priority); + if(++count % 5 == 0) + System.out.println(); + } + } + public static class EndSentinel extends Prioritized { + EndSentinel() { super(-1); } + } +} + +class Producer implements Runnable { + private static AtomicInteger seed = + new AtomicInteger(47); + private SplittableRandom rand = + new SplittableRandom(seed.getAndAdd(10)); + private Queue queue; + Producer(Queue q) { + queue = q; + } + @Override + public void run() { + rand.ints(10, 0, 20) + .mapToObj(Prioritized::new) + .peek(p -> new Nap(rand.nextDouble() / 10)) + .forEach(p -> queue.add(p)); + queue.add(new Prioritized.EndSentinel()); + } +} + +class Consumer implements Runnable { + private PriorityBlockingQueue q; + private SplittableRandom rand = + new SplittableRandom(47); + Consumer(PriorityBlockingQueue q) { + this.q = q; + } + @Override + public void run() { + while(true) { + try { + Prioritized pt = q.take(); + System.out.println(pt); + if(pt instanceof Prioritized.EndSentinel) { + pt.displaySequence(); + break; + } + new Nap(rand.nextDouble() / 10); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} + +public class PriorityBlockingQueueDemo { + public static void main(String[] args) { + PriorityBlockingQueue queue = + new PriorityBlockingQueue<>(); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Consumer(queue)) + .join(); + } +} +/* Output: +[15] Prioritized 2 +[17] Prioritized 1 +[17] Prioritized 5 +[16] Prioritized 6 +[14] Prioritized 9 +[12] Prioritized 0 +[11] Prioritized 4 +[11] Prioritized 12 +[13] Prioritized 13 +[12] Prioritized 16 +[14] Prioritized 18 +[15] Prioritized 23 +[18] Prioritized 26 +[16] Prioritized 29 +[12] Prioritized 17 +[11] Prioritized 30 +[11] Prioritized 24 +[10] Prioritized 15 +[10] Prioritized 22 +[8] Prioritized 25 +[8] Prioritized 11 +[8] Prioritized 10 +[6] Prioritized 31 +[3] Prioritized 7 +[2] Prioritized 20 +[1] Prioritized 3 +[0] Prioritized 19 +[0] Prioritized 8 +[0] Prioritized 14 +[0] Prioritized 21 +[-1] Prioritized 28 +(0:12)(2:15)(1:17)(3:1)(4:11) +(5:17)(6:16)(7:3)(8:0)(9:14) +(10:8)(11:8)(12:11)(13:13)(14:0) +(15:10)(16:12)(17:12)(18:14)(19:0) +(20:2)(21:0)(22:10)(23:15)(24:11) +(25:8)(26:18)(27:-1)(28:-1)(29:16) +(30:11)(31:6)(32:-1) +*/ +``` + +与前面的示例一样,**Prioritized** 对象的创建顺序在 **sequence** 的 **list** 对象上所记入,以便与实际执行顺序进行比较。 **EndSentinel** 是用于告知 **Consumer** 对象关闭的特殊类型。 + +**Producer** 使用 **AtomicInteger** 变量为 **SplittableRandom** 设置随机生成种子,以便不同的 **Producer** 生成不同的队列。 这是必需的,因为多个生产者并行创建,如果不是这样,创建过程并不会是线程安全的。 + +**Producer** 和 **Consumer** 通过 **PriorityBlockingQueue** 相互连接。因为阻塞队列的性质提供了所有必要的同步,因为阻塞队列的性质提供了所有必要的同步,请注意,显式同步是并不需要的 — 从队列中读取数据时,你不用考虑队列中是否有任何元素,因为队列在没有元素时将阻塞读取。 + +### 无锁集合 + +[集合](./12-Collections.md) 章节强调集合是基本的编程工具,这也要求包含并发性。因此,早期的集合比如 **Vector** 和 **Hashtable** 有许多使用 **synchronized** 机制的方法。当这些集合不是在多线程应用中使用时,这就导致了不可接收的开销。在 Java 1.2 版本中,新的集合库是非同步的,而给 **Collection** 类赋予了各种 **static** **synchronized** 修饰的方法来同步不同的集合类型。虽然这是一个改进,因为它让你可以选择是否对集合使用同步,但是开销仍然基于同步锁定。 Java 5 版本添加新的集合类型,专门用于增加线程安全性能,使用巧妙的技术来消除锁定。 + +无锁集合有一个有趣的特性:只要读取者仅能看到已完成修改的结果,对集合的修改就可以同时发生在读取发生时。这是通过一些策略实现的。为了让你了解它们是如何工作的,我们来看看其中的一些。 + +#### 复制策略 + +使用“复制”策略,修改是在数据结构一部分的单独副本(或有时是整个数据的副本)上进行的,并且在整个修改过程期间这个副本是不可见的。仅当修改完成时,修改后的结构才与“主”数据结构安全地交换,然后读取者才会看到修改。 + +在 **CopyOnWriteArrayList** ,写入操作会复制整个底层数组。保留原来的数组,以便在修改复制的数组时可以线程安全地进行读取。当修改完成后,原子操作会将其交换到新数组中,以便新的读取操作能够看到新数组内容。 **CopyOnWriteArrayList** 的其中一个好处是,当多个迭代器遍历和修改列表时,它不会抛出 **ConcurrentModificationException** 异常,因此你不用就像过去必须做的那样,编写特殊的代码来防止此类异常。 + +**CopyOnWriteArraySet** 使用 **CopyOnWriteArrayList** 来实现其无锁行为。 + +**ConcurrentHashMap** 和 **ConcurrentLinkedQueue** 使用类似的技术来允许并发读写,但是只复制和修改集合的一部分,而不是整个集合。然而,读取者仍然不会看到任何不完整的修改。**ConcurrentHashMap** **不会抛出concurrentmodificationexception** 异常。 + +#### 比较并交换 (CAS) + +在 比较并交换 (CAS) 中,你从内存中获取一个值,并在计算新值时保留原始值。然后使用 CAS 指令,它将原始值与当前内存中的值进行比较,如果这两个值是相等的,则将内存中的旧值替换为计算新值的结果,所有操作都在一个原子操作中完成。如果原始值比较失败,则不会进行交换,因为这意味着另一个线程同时修改了内存。在这种情况下,你的代码必须再次尝试,获取一个新的原始值并重复该操作。 + +如果内存仅轻量竞争,CAS操作几乎总是在没有重复尝试的情况下完成,因此它非常快。相反,**synchronized** 操作需要考虑每次获取和释放锁的成本,这要昂贵得多,而且没有额外的好处。随着内存竞争的增加,使用 CAS 的操作会变慢,因为它必须更频繁地重复自己的操作,但这是对更多资源竞争的动态响应。这确实是一种优雅的方法。 + +最重要的是,许多现代处理器的汇编语言中都有一条 CAS 指令,并且也被 JVM 中的 CAS 操作(例如 **Atomic** 类中的操作)所使用。CAS 指令在硬件层面中是原子性的,并且与你所期望的操作一样快。 ## 本章小结 @@ -1392,7 +1684,7 @@ tryLock(2, TimeUnit.SECONDS): false [^4]: 这个测试的推论是,“如果某人表示线程是容易并且简单的,请确保这个人没有对你的项目做出重要的决策。如果那个人已经做出,那么你就已经陷入麻烦之中了。” -[^5]: 这在即将产生的 C++ 的标准中得到了补救 +[^5]: 这在即将产生的 C++ 的标准中得到了补救。