Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions docs/book/24-Concurrent-Programming.md
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ Sum Iterated: 284ms

**main()** 的第一个版本使用直接生成 **Stream** 并调用 **sum()** 的方法。我们看到流的好处在于即使SZ为十亿(1_000_000_000)程序也可以很好地处理而没有溢出(为了让程序运行得快一点,我使用了较小的数字)。使用 **parallel()** 的基本范围操作明显更快。

如果使用**iterate()**来生成序列,则减速是相当明显的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,当**SZ**超过一百万时,结果不仅比非并行版本花费的时间更长,而且也会耗尽内存(在某些机器上)。当然,当你可以使用**range()**时,你不会使用**iterate()**,但如果你生成的东西不是简单的序列,你必须使用**iterate()**。应用**parallel()**是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察:
如果使用**iterate()** 来生成序列,则减速是相当明显的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,当**SZ**超过一百万时,结果不仅比非并行版本花费的时间更长,而且也会耗尽内存(在某些机器上)。当然,当你可以使用**range()**时,你不会使用**iterate()** ,但如果你生成的东西不是简单的序列,你必须使用**iterate()** 。应用**parallel()** 是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察:

- 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。
- 数组分割成本低,分割均匀且对分割的大小有着完美的掌控。
Expand Down Expand Up @@ -554,9 +554,9 @@ Long Parallel: 1008ms**

- parallel()/limit()交点

使用**parallel()**时会有更复杂的问题。从其他语言中吸取的流机制被设计为大约是一个无限的流模型。如果你拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果你使用无限流,则使用针对流优化的算法。
使用 **parallel()** 时会有更复杂的问题。从其他语言中吸取的流机制被设计为大约是一个无限的流模型。如果你拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果你使用无限流,则使用针对流优化的算法。

Java 8将两者合并起来。例如,**Collections**没有内置的**map()**操作。在**Collection**和**Map**中唯一类似流的批处理操作是**forEach()**。如果要执行**map()**和**reduce()**等操作,必须首先将**Collection**转换为存在这些操作的**Stream**:
Java 8将两者合并起来。例如,**Collections**没有内置的**map()** 操作。在**Collection**和**Map**中唯一类似流的批处理操作是**forEach()** 。如果要执行**map()** 和**reduce()** 等操作,必须首先将**Collection**转换为存在这些操作的**Stream**:

```java
// concurrent/CollectionIntoStream.java
Expand Down Expand Up @@ -595,9 +595,9 @@ bynxt
:PENCUXGVGINNLOZVEWPPCPOALJLNXT
```

**Collection**确实有一些批处理操作,如**removeAll()**,**removeIf()**和**retainAll()**,但这些都是破坏性的操作。**ConcurrentHashMap**对**forEach**和**reduce**操作有特别广泛的支持。
**Collection**确实有一些批处理操作,如**removeAll()**,**removeIf()** 和**retainAll()** ,但这些都是破坏性的操作。**ConcurrentHashMap**对**forEach**和**reduce**操作有特别广泛的支持。

在许多情况下,只在集合上调用**stream()**或者**parallelStream()**没有问题。但是,有时将**Stream**与**Collection**混合会产生意想不到的结果。这是一个有趣的难题:
在许多情况下,只在集合上调用**stream()** 或者**parallelStream()** 没有问题。但是,有时将**Stream**与**Collection**混合会产生意想不到的结果。这是一个有趣的难题:

```java
// concurrent/ParallelStreamPuzzle.java
Expand Down Expand Up @@ -672,11 +672,12 @@ public class ParallelStreamPuzzle2 {
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

current是使用线程安全的 **AtomicInteger** 类定义的,可以防止竞争条件;**parallel()**允许多个线程调用**get()**。
current是使用线程安全的 **AtomicInteger** 类定义的,可以防止竞争条件;**parallel()** 允许多个线程调用**get()**。

在查看 **PSP2.txt**.**IntGenerator.get()** 被调用1024次时,你可能会感到惊讶。
在查看 **PSP2.txt** .**IntGenerator.get()** 被调用1024次时,你可能会感到惊讶。

**0: main
```
0: main
1: ForkJoinPool.commonPool-worker-1
2: ForkJoinPool.commonPool-worker-2
3: ForkJoinPool.commonPool-worker-2
Expand All @@ -698,7 +699,8 @@ current是使用线程安全的 **AtomicInteger** 类定义的,可以防止竞
20: ForkJoinPool.commonPool-worker-110
21: ForkJoinPool.commonPool-worker-110
22: ForkJoinPool.commonPool-worker-110
23: ForkJoinPool.commonPool-worker-1**
23: ForkJoinPool.commonPool-worker-1
```

这个块大小似乎是内部实现的一部分(尝试使用`limit()` 的不同参数来查看不同的块大小)。将`parallel()`与`limit()`结合使用可以预取一串值,作为流输出。

Expand Down Expand Up @@ -741,17 +743,17 @@ public class ParallelStreamPuzzle3 {
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

为了表明**parallel()**确实有效,我添加了一个对**peek()**的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。
为了表明**parallel()**确实有效,我添加了一个对**peek()** 的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。

你还可以看到**boxed()**的添加,它接受**int**流并将其转换为**Integer**流。
你还可以看到**boxed()** 的添加,它接受**int**流并将其转换为**Integer**流。

现在我们得到多个线程产生不同的值,但它只产生10个请求的值,而不是1024个产生10个值。

它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。很难想象什么时候用并行生成一个简单的数字序列会有意义。如果你要生成的东西需要很高的成本,它可能有意义 - 但这都是猜测。只有通过测试我们才能知道用并行是否有效。记住这句格言:“首先使它工作,然后使它更快地工作 - 只有当你必须这样做时。”将**parallel()**和**limit()**结合使用仅供专家操作(把话说在前面,我不认为自己是这里的专家)。
它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。很难想象什么时候用并行生成一个简单的数字序列会有意义。如果你要生成的东西需要很高的成本,它可能有意义 - 但这都是猜测。只有通过测试我们才能知道用并行是否有效。记住这句格言:“首先使它工作,然后使它更快地工作 - 只有当你必须这样做时。”将**parallel()** 和**limit()** 结合使用仅供专家操作(把话说在前面,我不认为自己是这里的专家)。

- 并行流只看起来很容易

实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如你所见,仅仅将**parallel()**加到你的Stream操作上并不一定是安全的事情。在使用**parallel()**之前,你必须了解并行性如何帮助或损害你的操作。一个基本认知错误就是认为使用并行性总是一个好主意。事实上并不是。Stream意味着你不需要重写所有代码便可以并行运行它。但是流的出现并不意味着你可以不用理解并行的原理以及不用考虑并行是否真的有助于实现你的目标。
实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如你所见,仅仅将**parallel()** 加到你的Stream操作上并不一定是安全的事情。在使用**parallel()** 之前,你必须了解并行性如何帮助或损害你的操作。一个基本认知错误就是认为使用并行性总是一个好主意。事实上并不是。Stream意味着你不需要重写所有代码便可以并行运行它。但是流的出现并不意味着你可以不用理解并行的原理以及不用考虑并行是否真的有助于实现你的目标。

## 创建和运行任务

Expand Down Expand Up @@ -788,7 +790,7 @@ public class NapTask implements Runnable {
}
```

这只是一个**Runnable**:一个包含**run()**方法的类。它没有包含实际运行任务的机制。我们使用**Nap**类中的“sleep”:
这只是一个**Runnable**:一个包含**run()** 方法的类。它没有包含实际运行任务的机制。我们使用**Nap**类中的“sleep”:

```java
// onjava/Nap.java
Expand All @@ -810,9 +812,9 @@ public class Nap {
```
为了消除异常处理的视觉干扰,这被定义为实用程序。第二个构造函数在超时时显示一条消息

对**TimeUnit.MILLISECONDS.sleep()**的调用获取“当前线程”并在参数中将其置于休眠状态,这意味着该线程被挂起。这并不意味着底层处理器停止。操作系统将其切换到其他任务,例如在你的计算机上运行另一个窗口。OS任务管理器定期检查**sleep()**是否超时。当它执行时,线程被“唤醒”并给予更多处理时间。
对**TimeUnit.MILLISECONDS.sleep()** 的调用获取“当前线程”并在参数中将其置于休眠状态,这意味着该线程被挂起。这并不意味着底层处理器停止。操作系统将其切换到其他任务,例如在你的计算机上运行另一个窗口。OS任务管理器定期检查**sleep()** 是否超时。当它执行时,线程被“唤醒”并给予更多处理时间。

你可以看到**sleep()**抛出一个受检的**InterruptedException**;这是原始Java设计中的一个工件,它通过突然断开它们来终止任务。因为它往往会产生不稳定的状态,所以后来不鼓励终止。但是,我们必须在需要或仍然发生终止的情况下捕获异常。
你可以看到**sleep()** 抛出一个受检的**InterruptedException**;这是原始Java设计中的一个工件,它通过突然断开它们来终止任务。因为它往往会产生不稳定的状态,所以后来不鼓励终止。但是,我们必须在需要或仍然发生终止的情况下捕获异常。

要执行任务,我们将从最简单的方法--SingleThreadExecutor开始:

Expand Down Expand Up @@ -867,7 +869,7 @@ main awaiting termination
NapTask[9] pool-1-thread-1
```

首先请注意,没有**SingleThreadExecutor**类。**newSingleThreadExecutor()****Executors**中的工厂,它创建特定类型的[^4]
首先请注意,没有**SingleThreadExecutor**类。**newSingleThreadExecutor()****Executors** 中的一个工厂方法,它创建特定类型的**ExecutorService**[^4]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image
docsify 这么s神奇的么。那这里不会出现加粗失败么?


我创建了十个NapTasks并将它们提交给ExecutorService,这意味着它们开始自己运行。然而,在此期间,main()继续做事。当我运行callexec.shutdown()时,它告诉ExecutorService完成已经提交的任务,但不接受任何新任务。此时,这些任务仍然在运行,因此我们必须等到它们在退出main()之前完成。这是通过检查exec.isTerminated()来实现的,这在所有任务完成后变为true。

Expand Down Expand Up @@ -932,7 +934,7 @@ public class MoreTasksAfterShutdown {
java.util.concurrent.RejectedExecutionException: TaskNapTask[99] rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Shutting down, pool size = 1,active threads = 1, queued tasks = 0, completed tasks =0]NapTask[1] pool-1-thread-1
```

**exec.shutdown()**的替代方法是**exec.shutdownNow()**,它除了不接受新任务外,还会尝试通过中断任务来停止任何当前正在运行的任务。同样,中断是错误的,容易出错并且不鼓励。
**exec.shutdown()** 的替代方法是**exec.shutdownNow()** ,它除了不接受新任务外,还会尝试通过中断任务来停止任何当前正在运行的任务。同样,中断是错误的,容易出错并且不鼓励。

- 使用更多线程

Expand Down