From f06b850470c9b14ca0e7ebe6a671eaee12dd4ef7 Mon Sep 17 00:00:00 2001 From: AlfredAlan <854723360@mail.com> Date: Sun, 4 Aug 2019 17:53:56 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E9=99=84=E5=BD=95=EF=BC=9A=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E5=BA=95=E5=B1=82=E5=8E=9F=E7=90=86=20=E7=BF=BB?= =?UTF-8?q?=E8=AF=91=E8=87=B3=20DelayQueue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/book/Appendix-Low-Level-Concurrency.md | 141 ++++++++++++++++++-- 1 file changed, 133 insertions(+), 8 deletions(-) diff --git a/docs/book/Appendix-Low-Level-Concurrency.md b/docs/book/Appendix-Low-Level-Concurrency.md index f7ef3ecc..e7f701dc 100644 --- a/docs/book/Appendix-Low-Level-Concurrency.md +++ b/docs/book/Appendix-Low-Level-Concurrency.md @@ -547,13 +547,13 @@ public class EvenProducer extends IntGenerator { 417 not even! */ ``` -* [1] 一个任务有可能在另外一个任务执行第一个对 **currentEvenValue** 的递增操作之后,但是没有执行第二个操作之前,调用 `next()` 方法。这将使这个值处于 “不恰当” 的状态。 +* [1] 一个任务有可能在另外一个任务执行第一个对 **currentEvenValue** 的自增操作之后,但是没有执行第二个操作之前,调用 `next()` 方法。这将使这个值处于 “不恰当” 的状态。 为了证明这是可能发生的, `EvenChecker.test()` 创建了一组 **EventChecker** 对象,以连续读取 **EvenProducer** 的输出并测试检查每个数值是否都是偶数。如果不是,就会报告错误,而程序也将关闭。 多线程程序的部分问题是,即使存在 bug ,如果失败的可能性很低,程序仍然可以正确显示。 -重要的是要注意到递增操作自身需要多个步骤,并且在递增过程中任务可能会被线程机制挂起 - 也就是说,在 Java 中,递增不是原子性的操作。因此,如果不保护任务,即使单纯的递增也不是线程安全的。 +重要的是要注意到自增操作自身需要多个步骤,并且在自增过程中任务可能会被线程机制挂起 - 也就是说,在 Java 中,自增不是原子性的操作。因此,如果不保护任务,即使单纯的自增也不是线程安全的。 该示例程序并不总是在第一次非偶数产生时终止。所有任务都不会立即关闭,这是并发程序的典型特征。 @@ -584,7 +584,7 @@ synchronized void g() { /* ... */ } 在使用并发时,将字段设为 **private** 特别重要;否则,**synchronized** 关键字不能阻止其他任务直接访问字段,从而产生资源冲突。 -一个线程可以获取对象的锁多次。如果一个方法调用在同一个对象上的第二个方法,而后者又在同一个对象上调用另一个方法,就会发生这种情况。 JVM 会跟踪对象被锁定的次数。如果对象已解锁,则其计数为 0 。当一个线程首次获得锁时,计数变为 1 。每次同一线程在同一对象上获取另一个锁时,计数就会递增。显然,只有首先获得锁的线程才允许多次获取多个锁。每当线程离开 **synchronized** 方法时,计数递减,直到计数变为 0 ,完全释放锁以给其他线程使用。每个类也有一个锁(作为该类的 **Class** 对象的一部分),因此 **synchronized** 静态方法可以在类范围的基础上彼此锁定,不让同时访问静态数据。 +一个线程可以获取对象的锁多次。如果一个方法调用在同一个对象上的第二个方法,而后者又在同一个对象上调用另一个方法,就会发生这种情况。 JVM 会跟踪对象被锁定的次数。如果对象已解锁,则其计数为 0 。当一个线程首次获得锁时,计数变为 1 。每次同一线程在同一对象上获取另一个锁时,计数就会自增。显然,只有首先获得锁的线程才允许多次获取多个锁。每当线程离开 **synchronized** 方法时,计数递减,直到计数变为 0 ,完全释放锁以给其他线程使用。每个类也有一个锁(作为该类的 **Class** 对象的一部分),因此 **synchronized** 静态方法可以在类范围的基础上彼此锁定,不让同时访问静态数据。 你应该什么时候使用同步呢?可以永远 *Brian* 的同步法则[^2]。 @@ -620,7 +620,7 @@ No odd numbers discovered */ ``` -在两个递增操作之间插入 `Nap()` 构造器方法,以提高在 **currentEvenValue** 是奇数的状态时上下文切换的可能性。因为互斥锁可以阻止多个任务同时进入临界区,所有这不会产生失败。第一个进入 `next()` 方法的任务将获得锁,任何试图获取锁的后续任务都将被阻塞,直到第一个任务释放锁。此时,调度机制选择另一个等待锁的任务。通过这种方式,任何时刻只能有一个任务通过互斥锁保护的代码。 +在两个自增操作之间插入 `Nap()` 构造器方法,以提高在 **currentEvenValue** 是奇数的状态时上下文切换的可能性。因为互斥锁可以阻止多个任务同时进入临界区,所有这不会产生失败。第一个进入 `next()` 方法的任务将获得锁,任何试图获取锁的后续任务都将被阻塞,直到第一个任务释放锁。此时,调度机制选择另一个等待锁的任务。通过这种方式,任何时刻只能有一个任务通过互斥锁保护的代码。 ## volatile 关键字 @@ -631,7 +631,7 @@ No odd numbers discovered ### 字分裂 -当你的 Java 数据类型足够大(在 Java 中 **long** 和 **double** 类型都是 64 位),写入变量的过程分两步进行,就会发生 *Word tearing* (字分裂)情况。 JVM 被允许将64位数量的读写作为两个单独的32位操作执行[^3],这增加了在读写过程中发生上下文切换的可能性,因此其他任务会看到不正确的结果。这被称为 *Word tearing* (字分裂),因为你可能只看到其中一部分修改后的值。基本上,任务有时可以在第一步之后但在第二步之前读取变量,从而产生垃圾值(对于例如 **boolean** 或 **int** 类型的小变量是没有问题的;任何 **long** 或 **double** 类型则除外)。 +当你的 Java 数据类型足够大(在 Java 中 **long** 和 **double** 类型都是 64 位),写入变量的过程分两步进行,就会发生 *Word tearing* (字分裂)情况。 JVM 被允许将64位数量的读写作为两个单独的32位操作执行 [^3] ,这增加了在读写过程中发生上下文切换的可能性,因此其他任务会看到不正确的结果。这被称为 *Word tearing* (字分裂),因为你可能只看到其中一部分修改后的值。基本上,任务有时可以在第一步之后但在第二步之前读取变量,从而产生垃圾值(对于例如 **boolean** 或 **int** 类型的小变量是没有问题的;任何 **long** 或 **double** 类型则除外)。 在缺乏任何其他保护的情况下,用 **volatile** 修饰符定义一个 **long** 或 **double** 变量,可阻止字分裂情况。然而,如果使用 **synchronized** 或 **java.util.concurrent.atomic** 类之一保护这些变量,则 **volatile** 将被取代。此外,**volatile** 不会影响到增量操作并不是原子操作的事实。 @@ -701,7 +701,7 @@ public class ReOrdering implements Runnable { 在 Java 线程的讨论中,经常反复提交但不正确的知识是:“原子操作不需要同步”。 一个 *原子操作* 是不能被线程调度机制中断的操作;一旦操作开始,那么它一定可以在可能发生的“上下文切换”之前(切换到其他线程执行)执行完毕。依赖于原子性是很棘手且很危险的,如果你是一个并发编程专家,或者你得到了来自这样的专家的帮助,你才应该使用原子性来代替同步,如果你认为自己足够聪明可以应付这种玩火似的情况,那么请接受下面的测试: -> Goetz 测试:如果你可以编写用于现代微处理器的高性能 JVM ,那么就有资格考虑是否可以避免同步[^4]。 +> Goetz 测试:如果你可以编写用于现代微处理器的高性能 JVM ,那么就有资格考虑是否可以避免同步 [^4] 。 了解原子性是很有用的,并且知道它与其他高级技术一起用于实现一些更加巧妙的 **java.util.concurrent** 库组件。 但是要坚决抵制自己依赖它的冲动。 @@ -774,7 +774,7 @@ i:I 每条指令都会产生一个 “get” 和 “put”,它们之间还有一些其他指令。因此在获取指令和放置指令之间,另有一个任务可能会修改这个属性,所有,这些操作不是原子性的。 -让我们通过定义一个抽象类来测试原子性的概念,这个抽象类的方法是将一个整数类型进行偶数递增,并且 `run()` 不断地调用这个方法: +让我们通过定义一个抽象类来测试原子性的概念,这个抽象类的方法是将一个整数类型进行偶数自增,并且 `run()` 不断地调用这个方法: ```java // lowlevel/IntTestable.java @@ -876,7 +876,7 @@ public class SerialNumbers { } ``` -**SerialNumbers** 是你可以想象到最简单的类,如果你具备 C++ 或者其他底层的知识背景,你可能会认为递增是一个原子操作,因为 C++ 的递增操作通常被单个微处理器指令所实现(尽管不是以任何一致,可靠,跨平台的方式)。但是,正如前面所提到的,Java 递增操作不是原子性的,并且操作同时涉及读取和写入,因此即使在这样一个简单的操作中,也存在有线程问题的空间。 +**SerialNumbers** 是你可以想象到最简单的类,如果你具备 C++ 或者其他底层的知识背景,你可能会认为自增是一个原子操作,因为 C++ 的自增操作通常被单个微处理器指令所实现(尽管不是以任何一致,可靠,跨平台的方式)。但是,正如前面所提到的,Java 自增操作不是原子性的,并且操作同时涉及读取和写入,因此即使在这样一个简单的操作中,也存在有线程问题的空间。 我们在这里加入 volatile ,看看它是否有帮助。然而,真正的问题是 `nextSerialNumber()` 方法在不进行线程同步的情况下访问共享的可变变量值。 @@ -1363,12 +1363,137 @@ tryLock(2, TimeUnit.SECONDS): false ## 库组件 +**java.util.concurrent** 库提供大量旨在解决并发问题的类,可以帮助你生成更简单,更鲁棒的并发程序。但请注意,这些工具是比起并行流和 **CompletableFuture** 更底层的机制。 + +在本节中,我们将看一些使用不同组件的示例,然后讨论一下 *lock-free*(无锁) 库组件是如何工作的。 + ### DelayQueue +这是一个无界阻塞队列 ( **BlockingQueue** ),用于放置实现了 **Delayed** 接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,因此队首对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会有队首元素,并且 `poll()` 将返回 **null**(正因为这样,你不能将 **null** 放置到这种队列中)。 + +下面是一个示例,其中的 **Delayed** 对象自身就是任务,而 **DelayedTaskConsumer** 将最“紧急”的任务(到期时间最长的任务)从队列中取出,然后运行它。注意的是这样 **DelayQueue** 就成为了优先级队列的一种变体。 + +```java +// lowlevel/DelayQueueDemo.java +import java.util.*; +import java.util.stream.*; +import java.util.concurrent.*; +import static java.util.concurrent.TimeUnit.*; + +class DelayedTask implements Runnable, Delayed { + private static int counter = 0; + private final int id = counter++; + private final int delta; + private final long trigger; + protected static List sequence = + new ArrayList<>(); + DelayedTask(int delayInMilliseconds) { + delta = delayInMilliseconds; + trigger = System.nanoTime() + + NANOSECONDS.convert(delta, MILLISECONDS); + sequence.add(this); + } + @Override + public long getDelay(TimeUnit unit) { + return unit.convert( + trigger - System.nanoTime(), NANOSECONDS); + } + @Override + public int compareTo(Delayed arg) { + DelayedTask that = (DelayedTask)arg; + if(trigger < that.trigger) return -1; + if(trigger > that.trigger) return 1; + return 0; + } + @Override + public void run() { + System.out.print(this + " "); + } + @Override + public String toString() { + return + String.format("[%d] Task %d", delta, id); + } + public String summary() { + return String.format("(%d:%d)", id, delta); + } + public static class EndTask extends DelayedTask { + EndTask(int delay) { super(delay); } + @Override + public void run() { + sequence.forEach(dt -> + System.out.println(dt.summary())); + } + } +} + +public class DelayQueueDemo { + public static void + main(String[] args) throws Exception { + DelayQueue tasks = + Stream.concat( // Random delays: + new Random(47).ints(20, 0, 4000) + .mapToObj(DelayedTask::new), + // Add the summarizing task: + Stream.of(new DelayedTask.EndTask(4000))) + .collect(Collectors + .toCollection(DelayQueue::new)); + while(tasks.size() > 0) + tasks.take().run(); + } +} +/* Output: +[128] Task 12 [429] Task 6 [551] Task 13 [555] Task 2 +[693] Task 3 [809] Task 15 [961] Task 5 [1258] Task 1 +[1258] Task 20 [1520] Task 19 [1861] Task 4 [1998] Task +17 [2200] Task 8 [2207] Task 10 [2288] Task 11 [2522] +Task 9 [2589] Task 14 [2861] Task 18 [2868] Task 7 +[3278] Task 16 (0:4000) +(1:1258) +(2:555) +(3:693) +(4:1861) +(5:961) +(6:429) +(7:2868) +(8:2200) +(9:2522) +(10:2207) +(11:2288) +(12:128) +(13:551) +(14:2589) +(15:809) +(16:3278) +(17:1998) +(18:2861) +(19:1520) +(20:1258) +*/ +``` + +**DelayedTask** 包含一个称为 **sequence** 的 **List<DelayedTask>** ,它保存了任务被创建的顺序,因此我们可以看到排序是按照实际发生的顺序执行的。 + +**Delay** 接口有一个方法, `getDelay()` , 该方法用来告知延迟到期有多长时间,或者延迟在多长时间之前已经到期了。这个方法强制我们去使用 **TimeUnit** 类,因为这就是参数类型。这会产生一个非常方便的类,因为你可以很容易地转换单位而无需作任何声明。例如,**delta** 的值是以毫秒为单位存储的,但是 `System.nanoTime()` 产生的时间则是以纳秒为单位的。你可以转换 **delta** 的值,方法是声明它的单位以及你希望以什么单位来表示,就像下面这样: + +```java +NANOSECONDS.convert(delta, MILLISECONDS); +``` + +在 `getDelay()` 中, 所希望的单位是作为 **unit** 参数传递进来的,你使用它将当前时间与触发时间之间的差转换为调用者要求的单位,而无需知道这些单位是什么(这是*策略*设计模式的一个简单示例,在这种模式中,算法的一部分是作为参数传递进来的)。 + +为了排序, **Delayed** 接口还继承了 **Comparable** 接口,因此必须实现 `compareTo()` , 使其可以产生合理的比较。 + +从输出中可以看到,任务创建的顺序对执行顺序没有任何影响 - 相反,任务是按照所期望的延迟顺序所执行的。 + ### PriorityBlockingQueue ### Lock-Free Collections +#### The Copying Strategy + +#### Compare-And-Swap (CAS) + ## 本章小结 From 1863ae189f4d5db03e2e2d62cfe316f21d98459c Mon Sep 17 00:00:00 2001 From: AlfredAlan <854723360@mail.com> Date: Sun, 4 Aug 2019 19:06:09 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E9=99=84=E5=BD=95=20=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E5=BA=95=E5=B1=82=E5=8E=9F=E7=90=86=20=E7=BF=BB=E8=AF=91?= =?UTF-8?q?=E8=87=B3=20PriorityBlockingQueue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/book/Appendix-Low-Level-Concurrency.md | 157 +++++++++++++++++++- 1 file changed, 153 insertions(+), 4 deletions(-) diff --git a/docs/book/Appendix-Low-Level-Concurrency.md b/docs/book/Appendix-Low-Level-Concurrency.md index e7f701dc..21e23571 100644 --- a/docs/book/Appendix-Low-Level-Concurrency.md +++ b/docs/book/Appendix-Low-Level-Concurrency.md @@ -631,7 +631,7 @@ No odd numbers discovered ### 字分裂 -当你的 Java 数据类型足够大(在 Java 中 **long** 和 **double** 类型都是 64 位),写入变量的过程分两步进行,就会发生 *Word tearing* (字分裂)情况。 JVM 被允许将64位数量的读写作为两个单独的32位操作执行 [^3] ,这增加了在读写过程中发生上下文切换的可能性,因此其他任务会看到不正确的结果。这被称为 *Word tearing* (字分裂),因为你可能只看到其中一部分修改后的值。基本上,任务有时可以在第一步之后但在第二步之前读取变量,从而产生垃圾值(对于例如 **boolean** 或 **int** 类型的小变量是没有问题的;任何 **long** 或 **double** 类型则除外)。 +当你的 Java 数据类型足够大(在 Java 中 **long** 和 **double** 类型都是 64 位),写入变量的过程分两步进行,就会发生 *Word tearing* (字分裂)情况。 JVM 被允许将64位数量的读写作为两个单独的32位操作执行[^3],这增加了在读写过程中发生上下文切换的可能性,因此其他任务会看到不正确的结果。这被称为 *Word tearing* (字分裂),因为你可能只看到其中一部分修改后的值。基本上,任务有时可以在第一步之后但在第二步之前读取变量,从而产生垃圾值(对于例如 **boolean** 或 **int** 类型的小变量是没有问题的;任何 **long** 或 **double** 类型则除外)。 在缺乏任何其他保护的情况下,用 **volatile** 修饰符定义一个 **long** 或 **double** 变量,可阻止字分裂情况。然而,如果使用 **synchronized** 或 **java.util.concurrent.atomic** 类之一保护这些变量,则 **volatile** 将被取代。此外,**volatile** 不会影响到增量操作并不是原子操作的事实。 @@ -701,7 +701,7 @@ public class ReOrdering implements Runnable { 在 Java 线程的讨论中,经常反复提交但不正确的知识是:“原子操作不需要同步”。 一个 *原子操作* 是不能被线程调度机制中断的操作;一旦操作开始,那么它一定可以在可能发生的“上下文切换”之前(切换到其他线程执行)执行完毕。依赖于原子性是很棘手且很危险的,如果你是一个并发编程专家,或者你得到了来自这样的专家的帮助,你才应该使用原子性来代替同步,如果你认为自己足够聪明可以应付这种玩火似的情况,那么请接受下面的测试: -> Goetz 测试:如果你可以编写用于现代微处理器的高性能 JVM ,那么就有资格考虑是否可以避免同步 [^4] 。 +> Goetz 测试:如果你可以编写用于现代微处理器的高性能 JVM ,那么就有资格考虑是否可以避免同步[^4] 。 了解原子性是很有用的,并且知道它与其他高级技术一起用于实现一些更加巧妙的 **java.util.concurrent** 库组件。 但是要坚决抵制自己依赖它的冲动。 @@ -719,7 +719,7 @@ i++; // Might be atomic in C++ i += 2; // Might be atomic in C++ ``` -但是在 C++ 中,这取决于编译器和处理器。你无法编写出依赖于原子性的 C++ 跨平台代码,因为 C++ [^5] 没有像 Java 那样的一致 *内存模型* (memory model)。 +但是在 C++ 中,这取决于编译器和处理器。你无法编写出依赖于原子性的 C++ 跨平台代码,因为 C++ [^5]没有像 Java 那样的一致 *内存模型* (memory model)。 在 Java 中,上面的操作肯定不是原子性的,正如下面的方法产生的 JVM 指令中可以看到的那样: @@ -1488,6 +1488,155 @@ NANOSECONDS.convert(delta, MILLISECONDS); ### PriorityBlockingQueue +这是一个很基础的优先级队列,它具有可阻塞的读取操作。在下面的示例中, **Prioritized** 对象会被赋予优先级编号。几个 **Producer** 任务的实例会插入 **Prioritized** 对象到 **PriorityBlockingQueue** 中,但插入之间会有随机延时。然后,单个 **Consumer** 任务在执行 `take()` 时会显示多个选项,**PriorityBlockingQueue** 会将当前具有最高优先级的 **Prioritized** 对象提供给它。 + +在 **Prioritized** 中的静态变量 **counter** 是 **AtomicInteger** 类型。这是必要的,因为有多个 **Producer** 并行运行;如果不是 **AtomicInteger** 类型,你将会看到重复的 **id** 号。 这个问题在 [并发编程](./24-Concurrent-Programming.md) 的 [构造函数非线程安全](./24-Concurrent-Programming.md) 一节中讨论过。 + +```java +// lowlevel/PriorityBlockingQueueDemo.java +import java.util.*; +import java.util.stream.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import onjava.Nap; + +class Prioritized implements Comparable { + private static AtomicInteger counter = + new AtomicInteger(); + private final int id = counter.getAndIncrement(); + private final int priority; + private static List sequence = + new CopyOnWriteArrayList<>(); + Prioritized(int priority) { + this.priority = priority; + sequence.add(this); + } + @Override + public int compareTo(Prioritized arg) { + return priority < arg.priority ? 1 : + (priority > arg.priority ? -1 : 0); + } + @Override + public String toString() { + return String.format( + "[%d] Prioritized %d", priority, id); + } + public void displaySequence() { + int count = 0; + for(Prioritized pt : sequence) { + System.out.printf("(%d:%d)", pt.id, pt.priority); + if(++count % 5 == 0) + System.out.println(); + } + } + public static class EndSentinel extends Prioritized { + EndSentinel() { super(-1); } + } +} + +class Producer implements Runnable { + private static AtomicInteger seed = + new AtomicInteger(47); + private SplittableRandom rand = + new SplittableRandom(seed.getAndAdd(10)); + private Queue queue; + Producer(Queue q) { + queue = q; + } + @Override + public void run() { + rand.ints(10, 0, 20) + .mapToObj(Prioritized::new) + .peek(p -> new Nap(rand.nextDouble() / 10)) + .forEach(p -> queue.add(p)); + queue.add(new Prioritized.EndSentinel()); + } +} + +class Consumer implements Runnable { + private PriorityBlockingQueue q; + private SplittableRandom rand = + new SplittableRandom(47); + Consumer(PriorityBlockingQueue q) { + this.q = q; + } + @Override + public void run() { + while(true) { + try { + Prioritized pt = q.take(); + System.out.println(pt); + if(pt instanceof Prioritized.EndSentinel) { + pt.displaySequence(); + break; + } + new Nap(rand.nextDouble() / 10); + } catch(InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} + +public class PriorityBlockingQueueDemo { + public static void main(String[] args) { + PriorityBlockingQueue queue = + new PriorityBlockingQueue<>(); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Producer(queue)); + CompletableFuture.runAsync(new Consumer(queue)) + .join(); + } +} +/* Output: +[15] Prioritized 2 +[17] Prioritized 1 +[17] Prioritized 5 +[16] Prioritized 6 +[14] Prioritized 9 +[12] Prioritized 0 +[11] Prioritized 4 +[11] Prioritized 12 +[13] Prioritized 13 +[12] Prioritized 16 +[14] Prioritized 18 +[15] Prioritized 23 +[18] Prioritized 26 +[16] Prioritized 29 +[12] Prioritized 17 +[11] Prioritized 30 +[11] Prioritized 24 +[10] Prioritized 15 +[10] Prioritized 22 +[8] Prioritized 25 +[8] Prioritized 11 +[8] Prioritized 10 +[6] Prioritized 31 +[3] Prioritized 7 +[2] Prioritized 20 +[1] Prioritized 3 +[0] Prioritized 19 +[0] Prioritized 8 +[0] Prioritized 14 +[0] Prioritized 21 +[-1] Prioritized 28 +(0:12)(2:15)(1:17)(3:1)(4:11) +(5:17)(6:16)(7:3)(8:0)(9:14) +(10:8)(11:8)(12:11)(13:13)(14:0) +(15:10)(16:12)(17:12)(18:14)(19:0) +(20:2)(21:0)(22:10)(23:15)(24:11) +(25:8)(26:18)(27:-1)(28:-1)(29:16) +(30:11)(31:6)(32:-1) +*/ +``` + +与前面的示例一样,**Prioritized** 对象的创建顺序在 **sequence** 的 **list** 对象上所记入,以便与实际执行顺序进行比较。 **EndSentinel** 是用于告知 **Consumer** 对象关闭的特殊类型。 + +**Producer** 使用 **AtomicInteger** 变量为 **SplittableRandom** 设置随机生成种子,以便不同的 **Producer** 生成不同的队列。 这是必需的,因为多个生产者并行创建,如果不是这样,创建过程并不会是线程安全的。 + +**Producer** 和 **Consumer** 通过 **PriorityBlockingQueue** 相互连接。因为阻塞队列的性质提供了所有必要的同步,因为阻塞队列的性质提供了所有必要的同步,请注意,显式同步是并不需要的 — 从队列中读取数据时,你不用考虑队列中是否有任何元素,因为队列在没有元素时将阻塞读取。 + ### Lock-Free Collections #### The Copying Strategy @@ -1517,7 +1666,7 @@ NANOSECONDS.convert(delta, MILLISECONDS); [^4]: 这个测试的推论是,“如果某人表示线程是容易并且简单的,请确保这个人没有对你的项目做出重要的决策。如果那个人已经做出,那么你就已经陷入麻烦之中了。” -[^5]: 这在即将产生的 C++ 的标准中得到了补救 +[^5]: 这在即将产生的 C++ 的标准中得到了补救。
From 293ebe34ee4f6e31b9f8fa862fdaaeea305a7623 Mon Sep 17 00:00:00 2001 From: AlfredAlan <854723360@mail.com> Date: Mon, 5 Aug 2019 00:26:56 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E9=99=84=E5=BD=95=20=E5=B9=B6=E5=8F=91?= =?UTF-8?q?=E5=BA=95=E5=B1=82=E5=8E=9F=E7=90=86=20=E7=BF=BB=E8=AF=91?= =?UTF-8?q?=E8=87=B3=20=E6=97=A0=E9=94=81=E9=9B=86=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docs/book/Appendix-Low-Level-Concurrency.md | 24 ++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/docs/book/Appendix-Low-Level-Concurrency.md b/docs/book/Appendix-Low-Level-Concurrency.md index 21e23571..d85e110d 100644 --- a/docs/book/Appendix-Low-Level-Concurrency.md +++ b/docs/book/Appendix-Low-Level-Concurrency.md @@ -1637,11 +1637,29 @@ public class PriorityBlockingQueueDemo { **Producer** 和 **Consumer** 通过 **PriorityBlockingQueue** 相互连接。因为阻塞队列的性质提供了所有必要的同步,因为阻塞队列的性质提供了所有必要的同步,请注意,显式同步是并不需要的 — 从队列中读取数据时,你不用考虑队列中是否有任何元素,因为队列在没有元素时将阻塞读取。 -### Lock-Free Collections +### 无锁集合 -#### The Copying Strategy +[集合](./12-Collections.md) 章节强调集合是基本的编程工具,这也要求包含并发性。因此,早期的集合比如 **Vector** 和 **Hashtable** 有许多使用 **synchronized** 机制的方法。当这些集合不是在多线程应用中使用时,这就导致了不可接收的开销。在 Java 1.2 版本中,新的集合库是非同步的,而给 **Collection** 类赋予了各种 **static** **synchronized** 修饰的方法来同步不同的集合类型。虽然这是一个改进,因为它让你可以选择是否对集合使用同步,但是开销仍然基于同步锁定。 Java 5 版本添加新的集合类型,专门用于增加线程安全性能,使用巧妙的技术来消除锁定。 -#### Compare-And-Swap (CAS) +无锁集合有一个有趣的特性:只要读取者仅能看到已完成修改的结果,对集合的修改就可以同时发生在读取发生时。这是通过一些策略实现的。为了让你了解它们是如何工作的,我们来看看其中的一些。 + +#### 复制策略 + +使用“复制”策略,修改是在数据结构一部分的单独副本(或有时是整个数据的副本)上进行的,并且在整个修改过程期间这个副本是不可见的。仅当修改完成时,修改后的结构才与“主”数据结构安全地交换,然后读取者才会看到修改。 + +在 **CopyOnWriteArrayList** ,写入操作会复制整个底层数组。保留原来的数组,以便在修改复制的数组时可以线程安全地进行读取。当修改完成后,原子操作会将其交换到新数组中,以便新的读取操作能够看到新数组内容。 **CopyOnWriteArrayList** 的其中一个好处是,当多个迭代器遍历和修改列表时,它不会抛出 **ConcurrentModificationException** 异常,因此你不用就像过去必须做的那样,编写特殊的代码来防止此类异常。 + +**CopyOnWriteArraySet** 使用 **CopyOnWriteArrayList** 来实现其无锁行为。 + +**ConcurrentHashMap** 和 **ConcurrentLinkedQueue** 使用类似的技术来允许并发读写,但是只复制和修改集合的一部分,而不是整个集合。然而,读取者仍然不会看到任何不完整的修改。**ConcurrentHashMap** **不会抛出concurrentmodificationexception** 异常。 + +#### 比较并交换 (CAS) + +在 比较并交换 (CAS) 中,你从内存中获取一个值,并在计算新值时保留原始值。然后使用 CAS 指令,它将原始值与当前内存中的值进行比较,如果这两个值是相等的,则将内存中的旧值替换为计算新值的结果,所有操作都在一个原子操作中完成。如果原始值比较失败,则不会进行交换,因为这意味着另一个线程同时修改了内存。在这种情况下,你的代码必须再次尝试,获取一个新的原始值并重复该操作。 + +如果内存仅轻量竞争,CAS操作几乎总是在没有重复尝试的情况下完成,因此它非常快。相反,**synchronized** 操作需要考虑每次获取和释放锁的成本,这要昂贵得多,而且没有额外的好处。随着内存竞争的增加,使用 CAS 的操作会变慢,因为它必须更频繁地重复自己的操作,但这是对更多资源竞争的动态响应。这确实是一种优雅的方法。 + +最重要的是,许多现代处理器的汇编语言中都有一条 CAS 指令,并且也被 JVM 中的 CAS 操作(例如 **Atomic** 类中的操作)所使用。CAS 指令在硬件层面中是原子性的,并且与你所期望的操作一样快。 ## 本章小结