上次说了很多Linux下进程相关知识，这边不再复述

如果遇到听不懂的可以看上一次的文章：<a href="https://www.cnblogs.com/dotnetcrazy/p/9363810.html" target="_blank">https://www.cnblogs.com/dotnetcrazy/p/9363810.html</a>

下面来说说Python的并发编程，如有错误欢迎提出～

## 1.进程篇

官方文档：<a href="https://docs.python.org/3/library/multiprocessing.html" target="_blank">https://docs.python.org/3/library/multiprocessing.html</a>

### 1.1.进程（Process）

Python的进程创建非常方便，看个案例：(这种方法通用，fork只适用于Linux系)
```py
import os
# 注意一下，导入的是Process不是process（Class是大写开头）
from multiprocessing import Process

def test(name):
    print("[子进程-%s]PID：%d，PPID：%d" % (name, os.getpid(), os.getppid()))

def main():
    print("[父进程]PID：%d，PPID：%d" % (os.getpid(), os.getppid()))
    p = Process(target=test, args=("萌萌哒", )) # 单个元素的元组表达别忘了(x,)
    p.start()
    p.join()  # 父进程回收子进程资源（内部调用了wait系列方法）

if __name__ == '__main__':
    main()
```
运行结果：
```
[父进程]PID：25729，PPID：23434
[子进程-萌萌哒]PID：25730，PPID：25729
```

创建子进程时，传入一个执行函数和参数，用start()方法来启动进程即可

`join()`方法是父进程回收子进程的封装（主要是回收<a href="https://www.cnblogs.com/dotnetcrazy/p/9363810.html#2.2.僵尸进程和孤儿进程" target="_blank">僵尸子进程(点我)</a>）

其他参数可以参考源码 or 文档，贴一下源码的`init`方法：

`def __init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)`

扩展：`name：为当前进程实例的别名`

1. `p.is_alive()` 判断进程实例p是否还在执行
2. `p.terminate()` 终止进程（发`SIGTERM`信号）

上面的案例如果用OOP来实现就是这样：(如果不指定方法，默认调Run方法)
```py
import os
from multiprocessing import Process

class My_Process(Process):
    # 重写了Proce类的Init方法
    def __init__(self, name):
        self.__name = name
        Process.__init__(self)  # 调用父类方法

    # 重写了Process类的run()方法
    def run(self):
        print("[子进程-%s]PID：%d，PPID：%d" % (self.__name, os.getpid(),
                                          os.getppid()))

def main():
    print("[父进程]PID：%d，PPID：%d" % (os.getpid(), os.getppid()))
    p = My_Process("萌萌哒") # 如果不指定方法，默认调Run方法
    p.start()
    p.join()  # 父进程回收子进程资源（内部调用了wait系列方法）


if __name__ == '__main__':
    main()
```

---

### 1.1.源码拓展

现在说说里面的一些门道（只像用的可以忽略）

新版本的封装可能多层，这时候可以看看Python3.3.X系列（这个算是Python3早期版本了，很多代码都暴露出来，比较明了直观）

multiprocessing.process.py
```py
# 3.4.x开始，Process有了一个BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
    '''一直等到子进程over'''
    self._check_closed()
    # 断言（False就触发异常，提示就是后面的内容
    # 开发中用的比较多，部署的时候可以python3 -O xxx 去除所以断言
    assert self._parent_pid == os.getpid(), "只能 join 一个子进程"
    assert self._popen is not None, "只能加入一个已启动的进程"
    res = self._popen.wait(timeout) # 本质就是用了我们之前讲的wait系列
    if res is not None:
        _children.discard(self) # 销毁子进程
```
multiprocessing.popen_fork.py
```py
# 3.4.x开始，在popen_fork文件中（以前是multiprocessing.forking.py）
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
    if self.returncode is None:
        # 设置超时的一系列处理
        if timeout is not None:
            from multiprocessing.connection import wait
            if not wait([self.sentinel], timeout):
                return None
        # 核心操作
        return self.poll(os.WNOHANG if timeout == 0.0 else 0)
    return self.returncode

# 回顾一下上次说的：os.WNOHANG - 如果没有子进程退出，则不阻塞waitpid()调用
def poll(self, flag=os.WNOHANG):
    if self.returncode is None:
        try:
            # 他的内部调用了waitpid
            pid, sts = os.waitpid(self.pid, flag)
        except OSError as e:
            # 子进程尚未创建
            # e.errno == errno.ECHILD == 10
            return None
        if pid == self.pid:
            if os.WIFSIGNALED(sts):
                self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
                self.returncode = os.WEXITSTATUS(sts)
    return self.returncode
```

关于断言的简单说明：（别泛滥）

如果条件为真，它什么都不做，反之它触发一个带可选错误信息的AssertionError

```py
def test(a, b):
    assert b != 0, "哥哥，分母不能为0啊"
    return a / b

def main():
    test(1, 0)

if __name__ == '__main__':
    main()
```
结果：
```
Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 2, in test
    assert b != 0, "哥哥，分母不能为0啊"
AssertionError: 哥哥，分母不能为0啊
```
运行的时候可以指定`-O参数`来忽略所以`assert`，eg：

`python3 -O 0.assert.py `
```
Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 3, in test
    return a / b
ZeroDivisionError: division by zero
```

---

扩展：

<a href="https://docs.python.org/3/library/unittest.html" target="_blank">https://docs.python.org/3/library/unittest.html</a>

<a href="https://www.cnblogs.com/shangren/p/8038935.html" target="_blank">https://www.cnblogs.com/shangren/p/8038935.html</a>

### 1.2.进程池

多个进程就不需要自己手动去管理了，有Pool来帮你完成，先看个案例：
```py
import os
import time
from multiprocessing import Pool  # 首字母大写

def test(name):
    print("[子进程-%s]PID=%d，PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)

def main():
    print("[父进程]PID=%d，PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool(5) # 设置最多5个进程（不设置就默认为CPU核数）
    for i in range(10):
        # 异步执行
        p.apply_async(test, args=(i, )) # 同步用apply（如非必要不建议用）
    p.close() # 关闭池，不再加入新任务
    p.join() # 等待所有子进程执行完毕回收资源（join可以指定超时时间，eg：`p.join(1)`）
    print("over")

if __name__ == '__main__':
    main()
```
图示：（join可以指定超时时间，eg：`p.join(1)`）
![1.进程池](https://images2018.cnblogs.com/blog/1127869/201808/1127869-20180805164841812-1549223582.gif)

**调用`join()`之前必须先调用`close()`，调用`close()`之后就不能继续添加新的`Process`了**

---

### 1.3.源码拓展

验证一下**Pool的默认大小是CPU的核数**，看源码：

multiprocessing.pool.py
```py
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
    def __init__(self, processes=指定的进程数,...):
        if processes is None:
            processes = os.cpu_count() or 1 # os.cpu_count() ~ CPU的核数
```
源码里面`apply_async`方法，是有回调函数（callback）的
```py
def apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result
```
来看个例子：(和JQ很像)
```py
import os
import time
from multiprocessing import Pool  # 首字母大写

def test(name):
    print("[子进程%s]PID=%d，PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)
    return name

def error_test(name):
    print("[子进程%s]PID=%d，PPID=%d" % (name, os.getpid(), os.getppid()))
    raise Exception("[子进程%s]啊，我挂了～" % name)

def callback(result):
    """成功之后的回调函数"""
    print("[子进程%s]执行完毕" % result)  # 没有返回值就为None

def error_callback(msg):
    """错误之后的回调函数"""
    print(msg)

def main():
    print("[父进程]PID=%d，PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool()  # CPU默认核数
    for i in range(5):
        # 搞2个出错的看看
        if i > 2:
            p.apply_async(
                error_test,
                args=(i, ),
                callback=callback,
                error_callback=error_callback)  # 异步执行
        else:
            # 异步执行，成功后执行callback函数（有点像jq）
            p.apply_async(test, args=(i, ), callback=callback)
    p.close()  # 关闭池，不再加入新任务
    p.join()  # 等待所有子进程执行完毕回收资源
    print("over")

if __name__ == '__main__':
    main()
```

输出：
```
[父进程]PID=12348，PPID=10999
[子进程0]PID=12349，PPID=12348
[子进程2]PID=12351，PPID=12348
[子进程1]PID=12350，PPID=12348
[子进程3]PID=12352，PPID=12348
[子进程4]PID=12352，PPID=12348
[子进程3]啊，我挂了～
[子进程4]啊，我挂了～
[子进程0]执行完毕
[子进程2]执行完毕
[子进程1]执行完毕
over
```


## 2.线程篇

## NetCore并发编程

示例代码：<a href="https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency" target="_blank">https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency</a>

先简单说下概念（其实之前也有说，所以简说下）：
1. 并发：同时做多件事情
2. 多线程：并发的一种形式
3. 并行处理：多线程的一种（线程池产生的一种并发类型，eg：**异步编程**）
4. 响应式编程：一种编程模式，对事件进行响应（有点类似于JQ的事件）

Net里面很少用进程，在以前基本上都是`线程+池+异步+并行+协程`

我这边简单引入一下，毕竟主要是写Python的教程，Net只是帮你们回顾一下，如果你发现还没听过这些概念，或者你的项目中还充斥着各种`Thread`和`ThreadPool`的话，真的得系统的学习一下了，现在官网的文档已经很完善了，记得早几年啥都没有，也只能挖那些外国开源项目：

<a href="https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency" target="_blank">https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency</a>

### 1.异步编程（Task）

Task的目的其实就是为了简化`Thread`和`ThreadPool`的代码，下面一起看看吧：

异步用起来比较简单，一般IO，DB，Net用的比较多，很多时候都会采用重试机制，举个简单的例子：
```csharp
/// <summary>
/// 模拟一个网络操作（别忘了重试机制）
/// </summary>
/// <param name="url">url</param>
/// <returns></returns>
private async static Task<string> DownloadStringAsync(string url)
{
    using (var client = new HttpClient())
    {
        // 设置第一次重试时间
        var nextDelay = TimeSpan.FromSeconds(1);
        for (int i = 0; i < 3; i++)
        {
            try
            {
                return await client.GetStringAsync(url);
            }
            catch { }
            await Task.Delay(nextDelay); // 用异步阻塞的方式防止服务器被太多重试给阻塞了
            nextDelay *= 2; // 3次重试机会，第一次1s，第二次2s，第三次4s
        }
        // 最后一次尝试，错误就抛出
        return await client.GetStringAsync(url);
    }
}
```
然后补充说下Task异常的问题，当你await的时候如果有异常会抛出，在第一个await处捕获处理即可

如果`async`和`await`就是理解不了的可以这样想：`async`就是为了让`await`生效（为了向后兼容）

对了，如果返回的是void，你设置成Task就行了，触发是类似于事件之类的方法才使用void，不然没有返回值都是使用Task

项目里经常有这么一个场景：**等待一组任务完成后再执行某个操作**,看个引入案例：
```csharp
/// <summary>
/// 1.批量任务
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private async static Task<string[]> DownloadStringAsync(IEnumerable<string> list)
{
    using (var client = new HttpClient())
    {
        var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
        return await Task.WhenAll(tasks);
    }
}
```
再举一个场景：**同时调用多个同效果的API，有一个返回就好了，其他的忽略**
```csharp
/// <summary>
/// 2.返回首先完成的Task
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private static async Task<string> GetIPAsync(IEnumerable<string> list)
{
    using (var client = new HttpClient())
    {
        var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
        var task = await Task.WhenAny(tasks); // 返回第一个完成的Task
        return await task;
    }
}
```
一个async方法被await调用后，当它恢复运行时就会回到原来的上下文中运行。

如果你的Task不再需要上下文了可以使用：`task.ConfigureAwait(false)`，eg：写个日记还要啥上下文？

逆天的建议是：**在核心代码里面一种使用`ConfigureAwait`，用户页面相关代码，不需要上下文的加上**

其实如果有太多await在上下文里恢复那也是比较卡的，使用`ConfigureAwait`之后，被暂停后会在线程池里面继续运行

再看一个场景：比如一个耗时操作，我需要指定它的超时时间：
```csharp
 /// <summary>
/// 3.超时取消
/// </summary>
/// <returns></returns>
private static async Task<string> CancellMethod()
{
    //实例化取消任务
    var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(3)); // 设置失效时间为3s
    try
    {
        return await DoSomethingAsync(cts.Token);
    }
    // 任务已经取消会引发TaskCanceledException
    catch (TaskCanceledException ex)
    {

        return "false";
    }
}
/// <summary>
/// 模仿一个耗时操作
/// </summary>
/// <returns></returns>
private static async Task<string> DoSomethingAsync(CancellationToken token)
{
    await Task.Delay(TimeSpan.FromSeconds(5), token);
    return "ok";
}
```
异步这块简单回顾就不说了，留两个扩展，你们自行探讨：
1. 进度方面的可以使用`IProgress<T>`，就当留个作业自己摸索下吧～
2. 使用了异步之后尽量避免使用`task.Wait` or `task.Result`，这样可以避免死锁

Task其他新特征去官网看看吧，引入到此为止了。

---

### 2.并行编程（Parallel）

这个其实出来很久了，现在基本上都是用`PLinq`比较多点，主要就是：
1. **数据并行**：重点在处理数据（eg：聚合）
2. **任务并行**：重点在执行任务（每个任务块尽可能独立，越独立效率越高）

#### 数据并行

以前都是`Parallel.ForEach`这么用，现在和Linq结合之后非常方便`.AsParallel()`就OK了

说很抽象看个简单案例：

```csharp
static void Main(string[] args)
{
    IEnumerable<int> list = new List<int>() { 1, 2, 3, 4, 5, 7, 8, 9 };
    foreach (var item in ParallelMethod(list))
    {
        Console.WriteLine(item);
    }
}
/// <summary>
/// 举个例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
    return list.AsParallel().Select(x => x * x);
}
```
正常执行的结果应该是：
```
1
4
9
25
64
16
49
81
```
并行之后就是这样了（不管顺序了）：
```
25
64
1
9
49
81
4
16
```

当然了，如果你就是对顺序有要求可以使用：**`.AsOrdered()`**
```csharp
/// <summary>
/// 举个例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
    return list.AsParallel().AsOrdered().Select(x => x * x);
}
```

其实实际项目中，使用并行的时候：**任务时间适中，太长不适合，太短也不适合**

记得大家在项目里经常会用到如`Sum`，`Count`等聚合函数，其实这时候使用并行就很合适

```csharp
var list = new List<long>();
for (long i = 0; i < 1000000; i++)
{
    list.Add(i);
}
Console.WriteLine(GetSumParallel(list));
```
```csharp
private static long GetSumParallel(IEnumerable<long> list)
{
    return list.AsParallel().Sum();
}
```
time dotnet PLINQ.dll
```
499999500000

real	0m0.096s
user	0m0.081s
sys	0m0.025s
```
不使用并行：（稍微多了点，CPU越密集差距越大）
```
499999500000

real	0m0.103s
user	0m0.092s
sys	0m0.021s
```
其实聚合有一个通用方法，可以支持复杂的聚合：(以上面sum为例)
```
.Aggregate(
            seed:0,
            func:(sum,item)=>sum+item
          );
```

稍微扩展一下，PLinq也是支持取消的，**`.WithCancellation(CancellationToken)`**

Token的用法和上面一样，就不复述了，如果需要和异步结合，一个`Task.Run`就可以把并行任务交给线程池了

也可以使用Task的异步方法，设置超时时间，这样PLinq超时了也就终止了

PLinq这么方便，其实也是有一些小弊端的，比如它会直接最大程度的占用系统资源，可能会影响其他的任务，而传统的Parallel则会动态调整

---

#### 任务并行（并行调用）

这个PLinq好像没有对应的方法，有新语法你可以说下，来举个例子：
```csharp
await Task.Run(() =>
    Parallel.Invoke(
        () => Task.Delay(TimeSpan.FromSeconds(3)),
        () => Task.Delay(TimeSpan.FromSeconds(2))
    ));
```
取消也支持：
```csharp
Parallel.Invoke(new ParallelOptions() { CancellationToken = token }, actions);
```

### 扩充说明

其实还有一些比如**数据流**和**响应编程**没说，这个之前都是用第三方库，刚才看官网文档，好像已经支持了，所以就不卖弄了，感兴趣的可以去看看，其实项目里面有流数据相关的框架，eg：`Spark`，都是比较成熟的解决方案了基本上也不太使用这些了。

然后还有一些没说，比如NetCore里面**不可变类型**（列表、字典、集合、队列、栈、线程安全字典等等）以及**限流**、**任务调度**等，这些关键词我提一下，也方便你去搜索自己学习拓展

先到这吧，其他的自己探索一下吧，最后贴一些Nuget库，你可以针对性的使用：

**数据流**：`Microsoft.Tpl.Dataflow`
**响应编程**(Linq的Rx操作)：`Rx-Main`
**不可变类型**：`Microsoft.Bcl.Immutable`