# [Distributed Nature](https://www.elastic.co/guide/en/elasticsearch/guide/current/_distributed_nature.html)

* <span style="color:red">Elasticsearch is distributed by nature, and it is designed to hide the complexity that comes with being distributed.</span>
* <span style="color:red">Nothing in the tutorial required you to know about distributed systems, sharding, cluster discovery, or dozens of other distributed concepts.</span>

ElasticSearch对用户屏蔽了以下细节：

1. Partitioning your documents into different containers or shards, which can be stored on a single node or on multiple nodes
2. Balancing these shards across the nodes in your cluster to spread the indexing and search load
3. Duplicating each shard to provide redundant copies of your data, to prevent data loss in case of hardware failure
4. Routing requests from any node in the cluster to the nodes that hold the data you’re interested in
5. Seamlessly integrating new nodes as your cluster grows or redistributing shards to recover from node loss

## 1. [Life Inside a Cluster](https://www.elastic.co/guide/en/elasticsearch/guide/current/distributed-cluster.html) - failover

### master node

* 集群中会有一个master节点，负责 create/delete 以及集群管理；
* 在ES中，master节点一般不负责doc级的变更或查询请求，从而保证单master也不会成为瓶颈；

![master node](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0201.png)

* 用户可以连接任意节点（包括master），每个节点都有全部doc的路由信息

### index/shards

* index只是逻辑上的命名空间，指向多个shards；
* shard是实际的存储单元，对应一个Lucene的实例；
* ES分布和复制的是shard；
* 当集群机器数发生变化，ES会自动重新分布shards来保证负载均衡；
* replica shard是primary shard的副本，用于确保数据不会丢失；
* replica shard也可以支撑读请求，从而降低primary shard的压力：

![replica shards](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0203.png)

### 平行扩容

当继续增加节点时，ES会重新调整分布：

![Scale Horizontally](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0204.png)

此时原来节点负责的shard变为2/3（意味着CPU/磁盘io等资源也相应得到释放）

**为了确保故障超过50%也不会造成数据丢失**，可以调整replicas=2，此时数据分布如下：

~~~json
PUT media/_settings
{
    "index.number_of_replicas": 1
}
~~~

![replicas=2](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0205.png)

* 当其中节点故障时，ES会从primary shard复制shard到现有机器，完成数据的重新分布

关于修改replicas后的节点状态：
* <span style="color:green;background:#CCC">green</span>表示一切正常
* <span style="color:yellow;background:#CCC">yellow</span>表示有部分shard未分布到其他机器（比如只有2台机器，确设置副本数量是2），此时集群提供查询功能但数据有丢失风险；
* <span style="color:red;background:#CCC">red</span>表示有部分shard已经缺失，此时应尽快考虑恢复primary shard的磁盘备份；


## 2. [Distributed Document Store](https://www.elastic.co/guide/en/elasticsearch/guide/current/distributed-docs.html) - handles document storage

当新增一个doc，ES需要根据`_id`路由到对应primary shard上，计算过程如下：

~~~py
shard = hash(routing) % number_of_primary_shards
~~~

因为是取模路由，这也造成ES无法在mapping建立后调整shard数

### 对单个doc的操作

1. 用户将单个doc的写请求请求发到node1；
2. node1使用`_id`计算出primary shard在node3上，因此转发给node3；
3. node3处理写请求成功后，同步修改给replica shard的node1/node2；
4. node3收到node1/node2的成功响应后，node3回包给node1，node1告知客户端处理成功；

![single doc](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0402.png)

consistency等级默认是quorum，即要求写入成功节点超过半数primary shard才算成功，这是为了避免网络中断时产生的分区(或称裂脑)问题。

### 文档检索

1. node1收到用户请求；
2. 根据`_id`计算shard在node1/node2/node3上均有副本，随机路由到node2(循环发给每个节点，以保证负载均衡)；
3. node2返回doc数据给node1，node1回包给客户端；

![retrieving doc](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0403.png)

### 文档的部分更新

使用`_update`时，实际是对数据的先读后写：

1. node1收到doc的update请求；
2. 转发请求到primary shard所在的node3；
3. node3尝试从shard中读取doc的内容，修改`_source`内容并尝试reindex数据；
4. 如果数据已经被其它过程操作，重试3直到达到`retry_on_conflict`次数；
5. 如果处理成，node3告知node1/node2进行doc的reindex；
6. 当都成功后告知node1处理成功，node1回包给客户端；

![update doc](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0404.png)

ps: 可以通过将node的`node.master`和`node.data`都设为false，让节点成为只负责路由的Client Node，从而减轻接入数据节点的转发压力。


## 3. [Distributed Search Execution](https://www.elastic.co/guide/en/elasticsearch/guide/current/distributed-search.html) - executes distributed search

上一节说到单个doc的操作，这里再说说更为复杂的`_search`

一次查询除了要找到所有匹配的doc在那些机器上，还需要将各shard返回的数据合并成一个list返回，并且ES还支持`page`方式的翻页。所以实际查询操作被分成了两个过程：`query`和`fetch`

### query phase

因为不知道doc分布在哪些shard上，ES会将查询请求分发到各shard的副本（primary shard或者replica shard），每个shard单独构建一个`priority queue`(优先级队列)结构，用来保存前`from+size`条结果。

ps: 因为是每次都合并from+size条数据，而不是直接取size条结果，ES限制`from+size`不能大于10000

1. 客户端发送查询请求到node3；
2. node3分发请求到各shard，各shard在本地执行查询并把match的doc IDs和排序所用信息写入本地priority queue；
3. node1/node2返回本地的priority queue返回给node3，node3合并得到全局的排序结果列表；

![query](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_0901.png)

### fetch phase

fetch过程比较简单，其实就是MGET需要的size条数据。

需要注意的是，因为from过大会造成query过程非常消耗资源，如果一定要做很长的翻页，ES推荐使用[scroll查询](https://www.elastic.co/guide/en/elasticsearch/guide/current/scroll.html)

### 查询的微调

为了柔性处理，当部分shard超时后，我们希望不等待而是返回已有结果，可以在query时设置timeout。在返回时会告知是否超时：

~~~json
  "timed_out": true,
~~~


## 4. [Inside a Shard](https://www.elastic.co/guide/en/elasticsearch/guide/current/inside-a-shard.html) - shard? how it works

### 倒排索引

倒排索引是为了快速找到与query相匹配的doc列表，例如建立倒排索引如下：


Term    | Doc_1 | Doc_2
--------|-------|--------
Quick   |       |  X
The     |   X   |
brown   |   X   |  X
dog     |   X   |
dogs    |       |  X
fox     |   X   |
foxes   |       |  X
in      |       |  X
jumped  |   X   |
lazy    |   X   |  X
leap    |       |  X
over    |   X   |  X
quick   |   X   |
summer  |       |  X
the     |   X   |

假设用户搜索`quick brown`，此时可以快速计算出匹配的doc和命中次数：

Term    | Doc_1 | Doc_2
--------|-------|--------
brown   |   X   |  X
quick   |   X   |
**Total**   |   2   |  1

这样再根据命中term个数和term的TF/IDF(Term frequency/Inverse document frequency)，就可以计算出doc的返回顺序

在Lucene中，倒排索引是**不可修改(immutable)**的。这样设计是为了便于无锁和Cache优化。如果需要删除doc，ES实际是在新的segment里产生一个删除记录，最终查询时合并这个删除操作。因此当index完或者操作一段时间后，应该定期用`_forcemerge`压缩segments来提升性能

### 倒排索引的更新

1. 新增doc会先写入一个`in memory`的Cache；
2. 每隔一段时间，这些修改会被commited：
  * 新增一个segment到磁盘（一个补充的倒排索引）；
  * 新增一个包含新segment名称的commit point到磁盘；
  * fsync强制刷新到磁盘，确保写入成功；
3. 使新segment可以被检索；
4. 清除`in memory cache`以接收新请求。

![in memory](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_1102.png)

![segments](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_1103.png)

### refresh

doc的被检索时间依赖于磁盘(fsync)，一个新增doc的生效时间可能长达几分钟。

于是ES提供一种将segment临时写到文件系统Cache的方案：`refresh API`。默认每个shard 1秒refresh一次，会生成一个临时的segment：

![refresh](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_1105.png)

某些情况不需要这么频繁的刷新时，可以设置`refresh_interval: -1`来关闭，然后调用`/_refresh`手动刷新索引。

### flush (Translog)

因为未fsync前，无法确保新增、修改落地到磁盘，为了防止机器掉电时Cache数据的丢失，于是ES引入Translog

当执行refresh时只清空cache，translog保留：

![translog](https://www.elastic.co/guide/en/elasticsearch/guide/current/images/elas_1107.png)

默认translog是异步每5秒写入磁盘的，当机器掉电后，ES可以通过重做(redo) Translog中的写请求，来恢复`in memroy cache`的数据。只有当translog满了或一定时间后，才会生成一个commited segment，此时在commit point之前的translog才可以丢弃。

ES也提供手动刷新Translog的API：`POST /<index>/_flush`。

### 分段合并

当index过程结束后，为了增加查询性能一般需要进行segment merge。可以对指定：

* 2.x调用 `_optimize?max_num_segments=1`
* 5.x改为 `_forcemerge?max_num_segments=1`

注意务必指定最大分段数为1，合并后可以用`_stats/segments`查看分段数
