## 第二章 并行计算模型的形式化表示

统一的表示形式, 对于研究并行计算模型至关重要. 将各种并行模型或算法统一表达后, 不仅有助于研究, 推导和证明其各种并行算法模型的正确性, 收敛性和执行过程, 同时还能横向比较各种算法的性能和过程特征.

然而, 学界和工业界对分布式并行算法的形式化表示的研究和应用却十分缺乏. 文献[22-23]中提出了针对并行计算框架的建模, 其中提出的方法是我们通过搜索和比较得出的比较系统和完备的方法. 对比文献[22-23]中对数据框架(或框架中应用)的建模, 本章的建模是基于并行模型这一层面. 虽然, 建模的层次和对象不同, 本章中提出的形式化表示方法与文献[23]中的方法有不少的共同点, 其不同在于我们的表示方法可以进一步表示出并行模型的迭代过程和迭代式.

无论是Google's MapReduce, 或是Hadoop, 还是Spark, Dryad, GraphLab等, 所有的框架的运行过程都会基于一种或多种并行计算模型, 常见的并行计算模型, 如BSP, AiA, PiT等, 这些并行计算模型定义了框架中计算和通信的执行方式, 顺序以及其他约束. 我们将常见秉性框架所采用的模型作如下归类:

|序号| 框架名称    |    并行模型   | 备注 |
|--------|--------|-------------|------|
|1| Hadoop  | 任务1,2      |33.33%|
|2| Spark  | 任务1,2      |33.33%|
|3| Dryad  | 任务1,2      |33.33%|
|4| GrapLab  | 任务1,2      |33.33%|
|5| Dryad  | 任务1,2      |33.33%|
|6| Dryad  | 任务1,2      |33.33%|

我们的设计目标是提出一种具备如下特点的方法:
- 适用范围广: 可用于描述各种算法和框架
- 可解析性强: 可用于性能分析
- 可扩展: 可针对不同规模问题
- 容错: 
- 其他

本章将围绕这几个目标展开描述. 同时, 本章的在全文中的作用是为第三章引出DSP模型做一个准备. 虽然, 本章并不是基于文献[23]中的方法演变而来, 但本章的结构参照了文献[23]中文章的组织架构. 

### 2.1 符号定义及运算符定义

#### 2.1.1 并行算法中行为抽象: 看待算法中的输入数据和操作的视角
在分析了大量并行算法执行过程后, 我们发现输入数据$(x_1, x_2, x_3, \dots, x_n)$的成员变量$x_i$的更新实际上来源于其所依赖的分量对它产生的影响, 典型例子是大量的图并行算法, 如PageRank, 每个图顶点的更新来源于其前驱顶点, 当接收到所有前驱顶点的影响量之后再进行聚合, 聚合的结果将作为当前图顶点新一轮PageRank值(简称为pr值).

我们将并行算法看做是对输入数据进行一轮轮变换. 每轮变换得到的结果又进入到下一轮变换, 直到算法满足收敛条件停止运行, 其迭代过程可表示为$$X_{t+1} = F(X_t),$$ 其中, $X_t$和$X_{t+1}$分别表示$t+1$时刻的输入和输出数据, $f(X)$表示并行算法进行的转换. 用向量形式表示输入输出数据可表达为$$(x_t^{(1)}, x_t^{(2)}, x_t^{(3)}, \dots, x_t^{(n)}) \overset{F}{\rightarrow} (x_{t+1}^{(1)}, x_{t+1}^{(2)}, x_{t+1}^{(3)}, \dots, x_{t+1}^{(n)}).$$ 我们认为$t+1$时刻输出数据中的每一维分量$x_{t+1}^{(i)}$由与之相关的其他分量与其作用之后对其所产生的影响聚合的结果, 我们用如下图示来表达这个过程:
<img src="image/formalization.jpeg" width="600">
图中, $f_{ij}$表示转换过程中分量$x^{(i)}$与$x^{(j)}$之间运算, 也可理解为$x^{(i)}$对$x^{(j)}$在本轮运算中施加的改变. 将所有分量对$x^{(j)}$产生的改变进行聚合, 就可以得到本轮迭代$x^{(j)}$的输出. 

这样对算法所进行的操作分解为集合$F={f_{ij}}$的好处在于我们可以对操作进行划分. 加之数据划分, 我们可以更加准确的描述在每个计算节点发生的动作及对应的数据. 为更严密地描述我们的想法, 接下来部分, 我们定义了若干符号以及例子.

#### 2.1.2 符号定义

- $X_0$:迭代计算初始输入变量.向量表示为$(x_0,x_1,...,x_n)$.$x_i$可以表示各种类型的数据,如图计算中顶点信息,线性方程组中的未知数等.
-  $X_k$:迭代计算的第k轮输出.在实际并行计算中,变量$X$被切分为多段并分布到不同的计算节点上进行计算,$X_k^{(p,q)}$表示仅被更新了从$x_p$到$x_q$一段分量的输出变量$X_k$.
- F:关系矩阵.$F_{i,j}$实际上定义$x_i$和$x_j$之间的运算,表达为函数形式为$F_{i,j}(x_i,x_j)$,或简写为$F_{i,j}(x_i)$,运算结果返回给$x_j$供其进一步与其他所依赖的变量计算产生的结果进行聚合.(在具体实现算法时,$F_{i,j}$通常被表达为一个数学公式,函数,过程或方法.)

\begin{align*}
\begin{pmatrix}
          F_{0,0} & F_{0,1} & \dots & F_{0,m} \\
          F_{1,0} & F_{1,1} & \dots & F_{1, m} \\
          & & \dots & \\
          F_{n,0} & F_{n,1} & \dots & F_{n,m}
\end{pmatrix}
\end{align*}
$F^{(p,q)}$ 表示对输入变量进行一次部分转换.$F^{(p,q)}$仅仅计算和更新$x_p$和$x_q$之间的变量.其定义如下:

\begin{align*}
\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad\quad \\
        \begin{pmatrix}
          1 & 0 & \dots & 0 & F_{0,p} & \dots & F_{0,q} & 0 & \dots & 0 \\
          0 & 1 & \dots & 0 & F_{1,p} & \dots & F_{1,q} & 0 & \dots & 0 \\
          \vdots & \vdots & \ddots & \vdots & \vdots & \ddots & \vdots & \vdots & \ddots & \vdots\\
          0 & 0 & \dots & 1 & F_{p-1,p} & \dots & F_{p-1,q} & 0 & \dots & 0 \\
          0 & 0 & \dots & 0 & F_{p,p} & \dots & F_{p,q} & 0 & \dots & 0 \\
          \vdots & \vdots & \ddots & \vdots & \vdots & \ddots & \vdots & \vdots & \ddots & \vdots\\
          0 & 0 & \dots & 0 & F_{q,p} & \dots & F_{q,q} & 0 & \dots & 0 \\
          0 & 0 & \dots & 0 & F_{q+1,p} & \dots & F_{q+1,q} & 1 & \dots & 0 \\
          \vdots & \vdots & \ddots & \vdots & \vdots & \ddots & \vdots & \vdots & \ddots & \vdots\\
          0 & 0 & \dots & 0 & F_{n,p} & \dots & F_{n,q} & 0 & \dots & 1 \\
        \end{pmatrix} \\
\end{align*}

使用向量与矩阵相乘的规则,$X$的分量被投射到$F_{i,j}$上进行运算,最终只有$x_p$到$x_q$之间的变量才进行了运算和更新,其余分量都会保持不变.
- $\biguplus$:聚合操作符.此运算符将$\{F_{i,j}(x_i) ~|~ i=0,1,2,\dots,n.\}$中所有运算结果进行聚合,聚合得到的值将作为$x_j$本轮迭代计算的结果.公式为$$\biguplus_{i=0}^nF_{i,j}(x_i,x_j),~abbreviated~as \biguplus_{i=0}^nF_{i,j}(x_i).$$常见的聚合操作有min(), max(), average(), $\sum$, $\Pi$等.
- $\otimes$:转换运算符.该操作符将关系矩阵在输入变量上作用一次.

#### 2.1.3 分布式环境下的调整
分布式环境下, 每个进程负责不同的数据分块的值的计算和更新, 使用我们提供的"偏转换矩阵"可以在分布式的节点上很好的完成这个任务.

#### 2.1.4 应用举例
a.) PageRank算法:

PageRank算法的可描述为:

```{python}
from matplotlib import pyplot as plt

```

 1. 输入: $X=(x_1, x_2, x_3, \dots, x_n)$, 其中, $x_i$表示图顶点信息, 包含pr值以及顶点的邻接信息.
 2. 操作矩阵: $F=\{F_{ij}\}_{i,j=1}^n$, $F_{ij}$表示$\frac{x_i}{L(x_i)}$, 其中, $x_i$表示第i-th个顶点当前的pr值, $L(x_i)$表示第i_th顶点后继的个数.
 3. 聚合操作: $\displaystyle\sum_{x_i\in N(x_j)}\frac{x_i}{L(x_i)}$, 其中, $N(x_i)$表示$x_i$前驱节点组成的集合.
 
b.) 单源最短路(SSSP):

SSSP算法可描述为:

 1. 输入: $X=(x_1, x_2, \dots, x_n)$, 其中, $x_i$存储第i-th个图顶点到源点的最短路.
 2. 操作矩阵: $F=\{F_{ij}\}_{i,j=1}^n$, $F_{ij}$表示$x_i + w_{i,j}$, 其中, $w_{i,j}$表示图顶点i-th和j-th之间的变的权重.
 3. 聚合操作: $\displaystyle\min_{x_i\in N(x_j)}(x_i + w_{i,j})$, 其中, $N(x_j)$表示所有指向顶点j-th的顶点的集合.
 
c.) 雅各比方法求线性方程组:

雅各比方法求线性方程组可描述为:

 1. 输入: $X=(x_1, x_2, \dots, x_n)$, 其中, $x_i$存储第i-th个未知数当前的解.
 2. 操作矩阵: $F=\{F_{ij}\}_{i,j=1}^n$, $F_{ij}$表示$\frac{a_{ij}}{a_{ii}}x_i$, 其中, $a_{ii}, a_{ij}$表示系数矩阵$A$在位置(i, i)和(i, j)的系数.
 3. 聚合操作: $\displaystyle\frac{b_i}{a_{ii}}-\frac{1}{a_{ii}}\sum_{i\neq j}a_{ij}x_i$.

### 2.2 使用MOP表示常见的集中并行模型

#### 2.2.1 MOP表示BSP并行模型
BSP同步并行模型用于指导并行算法和程序的设计, 其迭代步由三部分组成: 局部计算, 通信和阻塞同步. 这种简单的并行模型可以有MOP很好的表达, 如下:

\begin{align*}
X_{0} &= (x_0, x_1, \dots, x_n) \\
% X_{1}^{(p,q)} &= X_{0}\otimes F^{(p,q)}  \\
X_{1} &= X_{0}\otimes F  \\
      &= (x_0, x_1, x_2, \dots, x_n)\otimes \begin{pmatrix}
          F_{0,0} & F_{0,1} & \dots & F_{0,m} \\
          F_{1,0} & F_{1,1} & \dots & F_{1, m} \\
          & & \dots & \\
          F_{n,0} & F_{n,1} & \dots & F_{n,m}
        \end{pmatrix} \\
      &= (\biguplus_{i=0}^{n}F_{i, 0}(x_i),~\biguplus_{i=0}^{n}F_{i, 1}(x_i),~\dots,~\biguplus_{i=0}^{n}F_{i, m}(x_i)) \\
X_{2} &= X_{1}\otimes F  \\
&= (\biguplus_{i=0}^nF_{i,0}(\biguplus_{i=0}^{n}F_{i, 0}(x_i)),~\biguplus_{i=0}^nF_{i,1}(\biguplus_{i=0}^{n}F_{i, 1}(x_i)),~\dots,~\biguplus_{i=0}^nF_{i,m}(\biguplus_{i=0}^{n}F_{i, m}(x_i))) \\
&= (\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^2F_{i, 0}(x_i),~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^2F_{i, 1}(x_i),~\dots,~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^2F_{i, m}(x_i)) \\
      & \vdots \\
X_{k} &= (\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^kF_{i, 0}(x_i),~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^kF_{i, 1}(x_i),~\dots,~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^kF_{i, m}(x_i))  \\
X_k&= (h^{k}(\alpha_0), h^{k}(\alpha_1),~\dots,h^{k}(x_n)) \tag{2.2.1} \label{eq:2.2.1}
\end{align*}

其中, $X_t$表示输入输出数据, $F$表示并行模型上运行算法对数据进行的操作, $\biguplus$表示$x_i, i=1, 2, 3, \dots, n$的更新方式(即如何聚合来自其他变量对当前变量的影响), $h(x)$表示BSP一次超级计算步进行的操作, 即局部计算, 通信和阻塞.

### 2.2.2 MOP表示AiA并行模型
BSP同步并行模型用于指导并行算法和程序的设计, 其迭代步由三部分组成: 局部计算, 通信和阻塞同步. 这种简单的并行模型可以有MOP很好的表达, 如下:

\begin{align*}
X_{0} &= (x_0, x_1, \dots, x_n) \\
% X_{1}^{(p,q)} &= X_{0}\otimes F^{(p,q)}  \\
X_{1} &= X_{0}\otimes F  \\
      &= (x_0, x_1, x_2, \dots, x_n)\otimes \begin{pmatrix}
          F_{0,0} & F_{0,1} & \dots & F_{0,m} \\
          F_{1,0} & F_{1,1} & \dots & F_{1, m} \\
          & & \dots & \\
          F_{n,0} & F_{n,1} & \dots & F_{n,m}
        \end{pmatrix} \\
      &= (\biguplus_{i=0}^{n}F_{i, 0}(x_i),~\biguplus_{i=0}^{n}F_{i, 1}(x_i),~\dots,~\biguplus_{i=0}^{n}F_{i, m}(x_i)) \\
X_{2} &= X_{1}\otimes F  \\
&= (\biguplus_{i=0}^nF_{i,0}(\biguplus_{i=0}^{n}F_{i, 0}(x_i)),~\biguplus_{i=0}^nF_{i,1}(\biguplus_{i=0}^{n}F_{i, 1}(x_i)),~\dots,~\biguplus_{i=0}^nF_{i,m}(\biguplus_{i=0}^{n}F_{i, m}(x_i))) \\
&= (\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^2F_{i, 0}(x_i),~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^2F_{i, 1}(x_i),~\dots,~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^2F_{i, m}(x_i)) \\
      & \vdots \\
X_{k} &= (\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^kF_{i, 0}(x_i),~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^kF_{i, 1}(x_i),~\dots,~\biguplus_{i=0}^{n}{\vphantom{\biguplus}}^kF_{i, m}(x_i))  \\
X_k&= (h^{k}(\alpha_0), h^{k}(\alpha_1),~\dots,h^{k}(x_n)) \tag{4.1} \label{eq:4.1}
\end{align*}

其中, $X_t$表示输入输出数据, $F$表示并行模型上运行算法对数据进行的操作, $\biguplus$表示$x_i, i=1, 2, 3, \dots, n$的更新方式(即如何聚合来自其他变量对当前变量的影响), $h(x)$表示BSP一次超级计算步进行的操作, 即局部计算, 通信和阻塞.

### 2.2 可扩展性与容错性
可扩展性(Scalability)与容错性(Fault-tolerance)对于海量数据处理的重要性是不言而喻的. 可扩展性用来衡量模型是否适用于不同规模小大的应用环境, 而容错性则关乎模型在使用过程中对错误的处理和恢复能力. 本节我们讨论MOP在表示并行模型或算法时的扩展性与容错性.

#### 2.2.1 扩展性
我们首先针对不同类型扩展性做如下限定性描述[**]:

- **任务的扩展性**: 随着任务量的增加, 所需要增加的计算资源与任务增加的速度相同.
- **模型的扩展性**: 随着任务量的增加, 模型的复杂度不变.
- **表示方法的扩展性**: 随着处理数据和计算任务的增加, 对并行模型处理过程的表示不变.

本小节分如下两种情景来说明MOP的扩展性:
- 对不同规模的数据集
- 对不同规模的计算节点


##### 2.2.1.1 针对不同规模数据集
MOP并不涉及具体应用问题的数据分割和分布. 在数据分割并分布到不同的计算节点之后, 我们可以得到不同计算节点上所负责的数据段, 如向量$$X^{(p, q)} = (x_1, x_2, x_3, ..., \textbf{x}_p, ..., \textbf{x}_q, ..., x_n) \tag{2.2.1.1} $$中加粗标识的部分.

之所以采用式(2.2.1.1)中的形式是因为MOP默认向量$X$的分量之间是相互关联或依赖的, 且在计算过程中, 分量$x_i, i = 1, 2, \dots, n$之间存在相互引用(本文认为依赖和引用是不同的关系).

随着数据规模的增加, 亦或是数据划分方式的改变, MOP对输入输出数据以及数据分块的表示并不会发生变化. 且MOP的假设是$X$的分量之间存在强依赖关系(任何两个分量之间都可能相互引用), 所以, MOP在每个节点上存储了输入向量$X$的所有分量或者假设在每个计算节点可以直接访问$X$的所有分量.

在不同节点上部署的偏关系矩阵$F^{(p, q)}$与数据分块对应. 即可以认为数据规模的变化并不会对关系矩阵的划分和执行产生影响.

聚合操作$\biguplus$复杂度仍然保持$O(n)$, 其中, $n$为分量$x_i$所依赖的分量个数. 在表示上与数据规模大小无关.

##### 2.2.1.2 针对不同规模的计算节点
在单计算节点的串行模式下, MOP可直接通过$$X_{t+1}=F(X_t)$$来进行迭代计算. 在分布式并行模式下, 根据计算节点规模确定好数据分块之后(数据分块的大小一般与节点数成反比例关系), 每个计算节点首先通过$$X_{t+1}^{(p, q)}=F^{(p, q)}(X_{t})$$计算得到数据分块$(x^{(p)}, x^{(q)})$上的最新值, 再通过全局数据同步把所有最新值同步到所有计算节点. 

从上面的分析得知, 对于相同的数据集和相同的运算, 计算节点越多的情况下划分得到的子集越多, 这通常会带来更多的通信开销. 然而不管计算节点的规模如何变化, 只要MOP描述算法的数据分块方式和运算过程不会变化, MOP的描述也不会变化.

综合以上两小节论述, 无论是输入数据$X$规模的变化或是计算节点规模的变化, 只要模型或算法的数据分割方式和处理流程不变, 那么都可以通过相同的MOP描述来表达, 并抽象为两步: 1.)首先通过偏转换得到不同数据分块的更新值; 2.)再通过全局同步将全部更新值同步到每个节点中.

至此, 我们可以得出结论: MOP无论针对不同规模的数据集或是不同规模的集群, 都可以在不改变表达复杂性的前提下描述并行模型所处理的各种应用问题.

#### 2.2.2 容错性
首先我们对不同层面的容错性做如下约束性描述[**]:

- **任务的容错性**: 
- **模型的容错性**: 
- **形式化表达的容错性**: 



### 2.3 用于指导算法优化

#### 2.3.1 公理

#### 2.3.2 识别数据之间的依赖性
MOP的优点之一就是可以表达出迭代计算的迭代过程, 这样一来, 我们可以就可以标识出数据成员之间的依赖关系.

#### 2.3.3 减少通信
在识别出数据之间的依赖关系后, 我们可以识别出真正需要在节点之间传输的数据, 而节点内的数据只需要通过局部更新就可以实现, 从而减少大量的网络开销.

#### 2.3.3 挖掘潜在的并行
在识别出数据之间的依赖关系后, 我们可以消除同一节点内数据之间的依赖关系, 从而提高节点内算法执行的并行性.

### 2.4 有效性和性能比较

### 2.5 其他相关工作

#### 2.5.1 大数据框架分析矩阵模型(DOT)
为建模大数据处理系统(如Google's MapReduce, Hadoop, Dryad, Spark等)的处理流程, 俄亥俄州立大学和康奈尔大学的研究人员Yin Huai, Rubao Lee, Simon Zhang等[23]提出了基于矩阵操作的数据分析, 优化和部署系统. DOT的三个字母分别表示三个矩阵: 数据集(data sets, 简称为D), 并发数据处理操作(concurrent data processing operations, 简称为O)以及数据转换(data transformations, 简称为T).

DOT将所有的数据处理流程抽象为三个矩阵(D, O, T)之间的相互组合, 如下形式:
\begin{align*}
\overset{\rightarrow}{D}OT &= [D_1 ... D_n] 
\begin{pmatrix}
o_1 \\ o_2 \\ \vdots \\ o_n
\end{pmatrix}
[t]=[\bigsqcup_{i=1}^{n}(o_i(D_i))][t] \\ 
&= [t(o_1(D_1)), \dots, t(o_n(D_n))]
\end{align*}

或图形话表示为如下形式:
<img src="image/elementaryDOT.jpeg" width="425"/> <img src="image/compositeDOT.jpeg" width="425"/>

作者声称DOT模型具有诸多良好的特性, 如可扩展性, 容错性以及指导框架进行算法的性能优化.

#### 2.5.2 大规模无序分布式计算

Jon Feldman等[22]认为大数据处理系统(如Google's MapReduce, Hadoop, Dryad, Spark等)上执行算法可分解为三个函数:
- 局部函数(local function): 输入一个独立的数据项, 输出一个消息;
- 聚合函数(aggregation function): 聚合消息对;
- 收尾处理步(final post-processing step): 在部分情况下, 需要做收尾数据处理.

系统假设可以独立且并行地应用到输入数据上, 而聚合函数可以按任意序在消息集合中执行(即满足交换性和组合型).

使用上面三种函数定义的算法, 文献[22]将其称为大规模无序分布式计算算法(massive, unordered, distributed (mud) algorithm). 将mud算法定义为一个三元组构成的表达式为: $$m = (\Phi, \bigoplus, \eta).$$ 其中,
- $\Phi: \Sigma\rightarrow Q $, 局部函数将输入数据映射到一个消息;
- $\bigoplus: Q \times Q\rightarrow Q $, 聚合操作符将两条消息聚合为一条新的消息;
- $\eta: Q\rightarrow\Sigma $, 收尾处理操作将最终的消息转换为用户所需要的结果.

在定义了如上的符号和运算之后, mud算法几乎可以表示大数据框架上运行的任何算法, 更进一步作者推出任何流式处理中对称的函数也可以使用mud算法处理, 且能获得与之相当的空间和通信优势.

### 2.6 总结
本章中提出了一种新的并行算法和模型的形式化表示方法, 较之前的表示方法[22-23], 新方法可以表示出算法的迭代过程, 进而为算法的收敛性证明提供了可能, 同时也为算法的迭代性能分析提供了依据.