## 2.5.2.5. Update with Aggregation Pipeline(聚合管道更新)

### 一、MongoDB中的聚合操作概述  
[聚合操作中文社区文档](https://docs.mongoing.com/aggregation)， [聚合视频教程](https://www.bilibili.com/video/BV1xV411z7FH/?spm_id_from=333.337.search-card.all.click&vd_source=db5f224185fdd2c28b4cc762ebce92fe)  

聚合操作处理数据记录和 return 计算结果。聚合操作将来自多个文档的值组合在一起，并且可以对分组数据执行各种操作以返回单个结果。 MongoDB 提供了三种执行聚合的方法：*聚合管道*，*map-reduce function*和*单一目的聚合方法*。

#### 聚合管道 [中文社区文档](https://docs.mongoing.com/aggregation/aggregation-reference/aggregation-pipeline-quick-reference)

MongoDB 的Aggregation framework是以数据处理管道的概念为蓝本的。**文档进入多阶段管道，将文档转换为聚合结果**。例如：
在这个例子中：

```bash
db.orders.aggregate([
   { $match: { status: "A" } },
   { $group: { _id: "$cust_id", total: { $sum: "$amount" } } }
])
```
代码解释：  
第一阶段：`$match`阶段按`status`字段过滤文档，并将`status`等于`"A"`的文档传递到下一阶段。  
此示例中只使用了简单的筛选文档。当然筛选过程中可以使用多种的运算符(算数表达式运算符，数组表达式运算符，布尔表达式运算符，比较表达式运算符，条件表达式运算符，日期，字符串表达式运算符等。)

第二阶段：`$group`阶段按`cust_id`字段将文档分组，以计算每个唯一值`cust_id`的金额总和。  
其中 `$sum`为(`group`)分组累加运算符，同时还存在其他运算符(详见 [文档](https://docs.mongoing.com/aggregation/aggregation-reference/aggregation-pipeline-quick-reference#lei-jia-qi-group))，包括:
- `$avg`: 分组中字段的平均值   
- `$max`: 分组中字段的最大值  
- `$min`: 分组中字段的最小值  
- `$first`: 分组中字段的第一个值  
- `$last`: 分组中字段的最后一个值  
- `$stdDevPop`: 返回输入值的总体标准偏差  
- `$stdDevSamp`: 返回输入值的sample标准偏差。
- `$push`: 返回每个 `group` 的表达式值的 `array`
- `$addToSet`: 返回每个 `group` 的唯一表达式值的 `array`

最基本的管道阶段提供_过滤器_，其操作类似于查询和修改输出文档格式的_文档转换_。  
其他管道操作提供了用于按特定字段对文档进行分组和排序的工具，以及用于汇总包括文档数组在内的数组内容的工具。另外，管道阶段可以将运算符用于诸如计算平均值或连接字符串之类的任务。  
管道使用MongoDB中的原生操作提供有效的数据聚合，并且是MongoDB中数据聚合的首选方法。

#### Map-Reduce: 较复杂，有待研究  

#### 单用途聚合操作

`MongoDB` 还提供 `db.collection.estimatedDocumentCount()`, `db.collection.count()`和`db.collection.distinct()`。

所有这些操作都聚合来自单个集合的文档。虽然这些操作提供了对常见聚合过程的简单访问，但它们缺乏聚合管道和 map-reduce 的灵活性和功能。

`db.collection.count(query，options)`: 返回将_查询集合或视图的find()查询的文档计数。 `db.collection.count()`方法不执行`find()`操作，而是计算并返回匹配查询的结果数。[文档](https://docs.mongoing.com/can-kao/mongo-shell-methods/collection-methods/db-collection-count)

`query`: 查询筛选文档(查询筛选条件)  
`options`: 可选的参数。修改计数的额外选项  

`db.collection. distinct(字段，查询，选项)`: 在单个集合或视图中查找指定字段的不同值，并在 array 中返回结果。[中文文档](https://docs.mongoing.com/can-kao/mongo-shell-methods/collection-methods/db-collection-distinct)

排除重复项，并返回结果。

`field(string)`：  要为其返回不同值的字段。  
`query(document)`: 一个查询，指定从中检索不同值的文档。(查询筛选条件)
`options(document)`: 见文档描述

### 二、管道聚合详解
聚合方法(db.collection.aggregate)
在`db.collection.aggregate`方法中，管道阶段出现在数组中。文档按顺序通过各个阶段。除`$out`, `$merge`和`$geoNear`阶段之外的所有阶段都可以在管道中多次出现。
```python
db.collection.aggregate( [ { <stage> }, ... ] )
```

#### 主要的管道阶段操作符

- `$match`:  过滤文档流以仅允许匹配的文档未经修改地传递到下一个管道阶段。 `$match`使用标准的 `MongoDB` 查询。对于每个输入文档，输出一个文档(匹配)或零文档(不匹配)。  
- `$group`: 按指定的标识符表达式对文档进行分组，并将累加器表达式(如果指定)应用于每个组。使用所有输入文档并*为每个不同的组输出一个文档*。输出文档只包含标识符字段和累积字段(如果指定的话)。  
- `$project`: 重新整形流中的每个文档，例如添加新字段或删除现有字段。对于每个输入文档，输出一个文档。 有关删除现有字段，请参见`$unset`。 - `skip`: 输出时跳过一定数量的文档
- `limit`: 限制输出文档的数量  

#### 示例1，`match`+`project`

```python
db.users.aggregate([
    {"$match": {"gender": "男"}},
    {"$skip": 100},
    {"$limit": 20},
    {"$project":{
        "名": "$first_name",
        "姓": "$last_name"
    }}
])
```
管道流程解释： 先筛选出 `{"gender": "男"}`的所有文档,跳过100条文档，限制输入条数为 20, 只输出 两个字段 `first_name`, `last_name`.


#### 示例2，`match`+`group`

```python
db.users.aggregate([
    {"$match": {"gender": "女"}},
    {"$group":{
        "_id": "$DEPARTMENT",
        "emp_qty": {"$sum": 1}
    }}
    {"$match": {"emp_qty": {"$lt": 10}}},

])
```
管道流程解释： 先筛选出 `{"gender": "女"}`的所有文档,对字段 `DEPARTMENT`进行分组统计；统计后，针对结果进行再次筛选，输出上阶段生成文档中`emp_qty`小于10的文档结果。等同于， 查询出部门员工女性小于10人的部门。注意每个阶段中 `$` 符号的使用。


示例3,`unwind`，展开数组

`mongodb`文档如下:
```python
>db.student.findOne()

{
    name: "张三",
    score: [
        {subject:"语文", socre: 74}，
        {subject:"数学", socre: 64}，
        {subject:"外语", socre: 94}，
    ]
}
```
使用`unwind`展开数组的结果:

```python
>db.student.aggregate([{"$unwind": "$score"}])

{name: "张三", score: {subject:"语文", socre: 74}}
{name: "张三", score: {subject:"数学", socre: 64}}
{name: "张三", score: {subject:"外语", socre: 94}}
```

### 三、MongoEngine聚合更新示例

It is possible to provide a raw PyMongo aggregation update parameter, which will be integrated directly into the update. This is done by using `__raw__` keyword argument to the update method and provide the pipeline as a list Update with Aggregation Pipeline

```python
# 'tags' field is set to 'coding is fun'
Page.objects(tags='coding').update(__raw__=[
    {"$set": {"tags": {"$concat": ["$tags", "is fun"]}}}
    ],
)
```

#### 四、 MongoEngine聚合函数 [文档](https://mongoengine-odm.readthedocs.io/apireference.html#module-mongoengine.queryset)

`QuerySet.aggregate(pipeline, *suppl_pipeline, **kwargs)`:
    Perform a aggregate function based in your queryset params

Parameters
    pipeline : list of aggregation commands, see: [document](http://docs.mongodb.org/manual/core/aggregation-pipeline/)  
    suppl_pipeline : unpacked list of pipeline (added to support deprecation of the old interface) parameter will be removed shortly  
    kwargs : (optional) kwargs dictionary to be passed to pymongo’s aggregate call See [document](https://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.aggregate)
