# MapReduce  大型集群的简化数据处理

## 摘要
MapReduce是一种编程模型，是处理和生成大型数据集的相关实现。用户指定map函数，用来处理一组键/值数据,
生成一组中间键/值对, 然后reduce函数用来合并这组中间值. 如本文所示，在该模型中可以表达许多现实世界的任务。


以这种功能风格编写的程序可以自动并行化在集群中执行。运行时的系统关心分区输入的数据, 调度该程序运行在一系列
的机器上，处理机器故障以及管理所需的机器间通信。这使得没有任何并行和分布式系统经验的程序员可以轻松利用大型分布式系统的资源。


我们的MapReduce实现在大型商用机器集群上运行，并且具有高度可扩展性：典型的MapReduce计算可处理数千台计算机上的大量数据。程序员发现该系统易于使用：已经实施了数百个MapReduce程序，每天在Google的集群上执行一次以上的MapReduce作业。

### 1 介绍

在过去的五年里， 作者和谷歌的其他工作人员已经实现数百个用于特殊目的的计算程序，这些计算处理大量原始数据, 例如爬行文档，Web请求日志等，以计算各种派生数据，例如作为反向索引，Web文档图形结构的各种表示，总共爬了多少页, 在给定的一天中最常见的查询集等. 大多数此类计算在概念上都很简单。但是，输入数据通常很大，并且计算必须分布在数百或数千台机器上，以便在合理的时间内完成。如何平衡计算，分发数据和处理故障的问题共同构成了用大量复杂代码来处理这些问题的原始简单计算。


作为对这种复杂性的反应，我们设计了一个新的抽象，允许我们表达我们试图执行的简单计算，但隐藏了库中并行化，容错，数据分布和负载平衡的混乱细节。我们的抽象是由Lisp和许多其他函数语言中的map和reduce原语所引发的。我们意识到我们的大多数计算都涉及将映射操作应用于输入中的每个逻辑“记录”，以便计算一组中间键/值对，然后对所有共享相同值的值应用reduce操作, 以便适当地组合派生数据。我们使用具有用户指定的映射和减少操作的功能模型使我们能够轻松地并行化大型计算并将重新执行作为容错的主要机制。

这项工作的主要贡献是一个简单而强大的界面，可实现大规模计算的自动并行化和分发，并结合此接口的实现，在大型商用PC集群上实现高性能。

第2节描述了基本的编程模型，并给出了几个例子。第3节描述了为基于集群的计算环境量身定制的MapReduce接口的实现。第4节描述了我们发现有用的编程模型的几个改进。第5节对各种任务的实施进行了性能测量。第6节探讨了MapReduce在Google中的使用，包括我们使用它作为基础的经验
用于重写我们的生产索引系统。第7节讨论相关和未来的工作。


### 2 编程模型

计算采用一组输入键/值对，并产生一组输出键/值对。 MapReduce库的用户将计算表达为两个函数：Map和Reduce。

由用户编写的map,采用输入对并生成一组中间键/值对。MapReduce库组合所有与中间值键一致组合在一起将它们传递到reduce函数


reduce函数同样是通过用户编写， 接受一个中间值的键和与那个键对应的一组值.它合并这行值，形成一组可能较小的值。通常reduce函数只有一个或者
0个输出。每个中间值通过迭代应用到reduce函数上。这允许我们处理一组对于内存太大的值。

####  2.1 例子
考虑计算大量文档中每个单词的出现次数的问题。用户将编写类似于以下伪代码的代码:

```
   map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
      EmitIntermediate(w, "1");
      
      
  reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
      result += ParseInt(v);
    Emit(AsString(result));      
```

map函数接受每个单词，输出每个单词和计数1，reduce函数将相同的单词分成一组，出现的次数进行累加.

此外，用户编写代码以使用输入和输出文件的名称以及可选的调整参数填充mapreduce规范对象。然后，用户调用MapReduce函数，并将规范对象传递给它。



#### 2.2 类型

尽管先前的伪代码是根据字符串输入和输出编写的，但从概念上讲，用户提供的map和reduce函数具有相关的类型：
```
map (k1, v1)   -> list(k2, v2)
reduce(k2, list(v2) -> list(v2)
```
输入键和值是从与输出键和值不同的域中提取的。此外，中间键和值来自与输出键和值相同的域。

### 3 实现

MapReduce接口的许多不同实现都是可能的。正确的选择取决于环境。例如，一种实现方式可能适用于小型共享存储器机器，另一种实现方式适用于大型NUMA多处理器，而另一种实现方式适用于甚至更大的联网机器集合。
本节介绍针对Google广泛使用的计算环境的实施：
使用交换式以太网连接在一起的大型商用PC集群。在我们的环境中：
1. 机器通常是运行Linux的双处理器x86处理器，每台机器有2-4 GB的内存。
2. 商用网络硬件 - 通常在机器级别上为100兆比特/秒或1千兆比特/秒，但在整体二分频带宽上平均要少得多。
3. 集群由数百或数千台机器组成，因此机器故障很常见。
4. 存储由廉价的IDE磁盘提供，这些磁盘直接连接到各个机器。内部开发的分布式文件系统[8]用于管理存储在这些磁盘上的数据。文件系统使用复制在不可靠的硬件之上提供可用性和可靠性。
5. 用户将作业提交给调度系统。每个作业由一组任务组成，并由调度程序映射到集群中的一组可用计算机。

####  3.1 执行概述

通过自动将输入数据分区为一组M个分割，Map调用分布在多台机器上。输入分割可以由不同的机器并行处理。通过使用分区函数（例如，散列（密钥）mod R）将中间密钥空间划分为R个片段来分配减少的调用。分区数（R）和分区功能由用户指定。


图1显示了我们实现中MapReduce操作的总体流程。当用户程序调用MapReduce函数时，会发生以下操作序列（图1中的编号标签与下面列表中的数字相对应:
![figure1](images/f1.png)

1. 用户程序中的MapReduce库首先将输入文件拆分为M个，每个通常为16MB到64MB（用户可通过可选参数进行控制）。然后，它会在一组计算机上启动该程序的许多副本。
2. 该程序的master副本是特殊的。其余的是由master分配工作的worker。有M个map任务和R reduce任务要分配。主人挑选闲置的工作人员并为每个人分配一个map任务或reduce任务。
3. 分配了map任务的worker将读取相应输入拆分的内容。它从输入数据中解析键/值对，并将每对传递给用户定义的Map函数。 Map函数生成的中间键/值对在内存中缓冲。
4. 周期性地，缓冲对被写入本地磁盘，通过分区功能划分为R个区域。这些缓冲对在本地磁盘上的位置将传递回master，master负责将这些位置转发给reduce worker.
5. 当master通知worker去进行reduce程序时候， 它使用rpc去读取已经执行完map的worker存储的数据.当执行reduce的worker读取完全部的数据，它会通过中间键对其进行排序，以便将所有出现的相同键组合在一起。需要排序的原因是因为通常许多不同的键映射到相同的reduce任务。如果中间数据量太大而无法放入内存，则使用外部排序。
6. reduce程序遍历排序后的数据，并且对于每个唯一的键，它将键和相应的值集传递给用户的Reduce函数。 Reduce函数的输出附加到此reduce分区的最终输出文件。
7. 完成所有map任务和reduce任务后，master会唤醒用户程序。此时，用户程序中的MapReduce调用将返回用户代码。

成功完成后，map输出执行的输出在R输出文件中可用（每个reduce任务一个，文件名由用户指定）。通常，用户不需要将这些R输出文件合并到一个文件中 - 他们经常将这些文件作为输入传递给另一个MapReduce调用，或者从另一个能够处理分区为多个文件的输入的分布式应用程序中使用它们。

#### 3.2 master 数据结构

master 保存几个数据结构。 

- 对于每个map和reduce任务， 存储状态(idle, in-progress, completed), 和正在工作机器的标示
- master相当于管道， 中间文件的区域位置从map任务传递到reduce任务。因此，对于每个完成的map任务，master存储由map任务产生的R个中间文件区域的位置和大小。完成map任务时，将接收对此位置和大小信息的更新。信息将逐渐推送给正在进行reduce任务的工作人员。



#### 3.3 错误容忍

由于MapReduce库旨在帮助使用数百或数千台计算机处理大量数据，因此库必须能够轻松地容忍计算机故障。

**Worker Failure** 

master定期对worker进行ping, 如果在一定时间内没有收到worker的相应，则标记为失败. worker完成的任何map任务都会重置回初始状态，因此有资格参加worker的调度。同样，失败的工作程序上正在进行的任何map任务或reduce任务也会重置为空闲状态，并有资格重新安排。

完成的map任务在故障时重新执行，因为它们的输出存储在故障机器的本地磁盘上，因此无法访问。完成的reduce任务不需要重新执行，因为它们的输出存储在全局文件系统中。


当一个map任务首先由worker A执行，然后由worker B执行（因为A失败）时，执行reduce任务的所有工作人员都会收到重新执行的通知。任何尚未从wroker A读取数据的reduce任务将从worker B读取数据。


MapReduce可以抵御大规模的worker故障。例如，在一次MapReduce操作期间，正在运行的集群上的网络维护导致一组80台计算机一次无法访问几分钟。 MapReduce master只是简单地重新执行了无法访问的工作机器完成的工作，并继续前进，最终完成MapReduce操作。
***
**Master Failure**

很容易使master写入上述master的数据结构的周期性检查点。如果master终止，则可以从上次检查点状态开始新副本。但是，鉴于只有一个主人，其失败的可能性不大;因此，如果master失败，我们的当前实现将中止MapReduce计算。客户端可以检查此情况并根据需要重试MapReduce操作。
***
**存在失败的语义学**
当用户提供的map和reduce运算符是其输入值的确定函数时，我们的分布式实现产生的输出与整个程序的非错误顺序执行产生的输出相同。

我们依靠map的原子提交和reduce任务输出来实现这个属性。每个正在进行的任务将其输出写入私有临时文件。 reduce任务生成一个这样的文件，map任务生成R个这样的文件（每个reduce任务一个）。当map任务完成时，worker向master发送消息，并在消息中包含R个临时文件的名称。如果master收到已完成的map任务的完成消息，则忽略该消息。否则，它会在主数据结构中记录R个文件的名称。

reduce任务完成后，reduce worker会将其临时输出文件原子重命名为最终输出文件。如果在多台机器上执行相同的reduce任务，则将对同一个最终输出文件执行多次重命名调用。我们依赖底层文件系统提供的原子重命名操作来保证最终文件系统状态只包含一次执行reduce任务所产生的数据。

绝大多数map和reduce运算符都是确定性的，并且在这种情况下我们的语义等同于顺序执行这一事实使得程序员很容易推理出他们程序的行为。当map/reduce运算符是不确定的时，我们提供较弱但仍然合理的语义。在存在非确定性运算符的情况下，特定reduce任务R1的输出等效于由非确定性程序的顺序执行产生的R1的输出。然而，不同reduce任务R2的输出可以对应于由非确定性程序的不同顺序执行产生的R2的输出。

考虑map任务M和reduce任务R1和R2。设e(Ri)是提交的Ri的执行(只有一个这样的执行）。较弱的语义出现是因为e（R1）可能已经读取了一次执行M所产生的输出，而e(R2)可能已经读取了由M的不同执行产生的输出。

#### 3.4 局部性
在我们的计算环境中，网络带宽是一种相对稀缺的资源。我们通过利用输入数据存储在构成我们集群的机器的本地磁盘上的事实来节省网络带宽。 GFS将每个文件划分为64 MB块，并将每个块的多个副本（通常为3个副本）存储在不同的计算机上。 MapReduce主服务器会考虑输入文件的位置信息，并尝试在包含相应输入数据副本的计算机上计划映射任务。如果不这样做，它会尝试在该任务的输入数据的副本附近安排地图任务（例如，在与包含数据的机器位于同一网络交换机上的工作机上）。在群集中的大部分工作程序上运行大型MapReduce操作时，大多数输入数据在本地读取并且不消耗网络带宽。

#### 3.5 任务粒度
如上所述，我们将map阶段细分为M个片段，将reduce阶段细分为R个片段。理想情况下，M和R应远大于工作机器的数量。让每个worker执行许多不同的任务可以改善动态负载平衡，并且还可以在worker出现故障时加快恢复速度：已完成的许多map任务可以分布在所有其他工作计算机上。

有实用范围多大M和R可以在我们的实现中, 由于主必须O(M + R)调度决策和O(M∗R)状态保存在存储器中如上所述。(然而内存使用量很小的常数因素:O(M∗R)的状态由大约一个字节的数据map /reduce 任务对)

此外，R经常受到用户的限制，因为每个还原任务的输出都以单独的输出文件结束。在实践中，我们倾向于选择M，以便每个单独的任务大约是16MB到64MB的输入数据（因此上面描述的局部优化是最有效的），我们使我们期望使用的工作机器的数量是R的一个小倍数。。我们通常使用MapReduce计算，m＝200000，r＝5000, 使用2000个worker。


#### 3.6 备份任务

延长MapReduce操作总时间的常见原因之一是“落后者”：一台机器需要花费非常长的时间来完成最后几个map中的一个或reduce任务。 Stragglers可能出于各种原因而出现。例如，具有坏磁盘的计算机可能会遇到频繁的可纠正错误，从而将读取性能从30 MB/s降低到1MB/s。群集调度系统可能已在计算机上安排了其他任务，导致它因CPU，内存，本地磁盘或网络带宽的竞争而更慢地执行MapReduce代码。我们遇到的最近一个问题是机器初始化代码中的一个错误导致处理器缓存被禁用：受影响机器的计算速度减慢了一百多倍。

我们有一个通用机制来缓解落后者的问题。当MapReduce操作接近完成时，主服务器会计划剩余正在进行的任务的备份执行。无论主要执行还是备份执行完成，任务都会标记为已完成。我们已经调整了这种机制，因此它通常会将操作使用的计算资源增加不超过百分之几。我们发现这大大减少了完成大型MapReduce操作的时间。例如，当禁用备份任务机制时，第5.3节中描述的排序程序需要44％的时间才能完成。



### 4. 改良点
虽然通过简单编写Map和Reduce函数提供的基本功能足以满足大多数需求，但我们发现一些扩展很有用。本节将介绍这些内容。


#### 4.1分区功能

MapReduce的用户指定他们想要的reduce任务/输出文件的数量(R)。提供了一个使用哈希(例如“hash(key) mod R”)的默认分区函数。这往往会导致相当均衡的分区。但是，在某些情况下，通过键的其他函数对数据进行分区是有用的。例如，有时输出键是url，我们希望单个主机的所有条目最终在同一个输出文件中结束。为了支持这种情况，MapReduce库的用户可以提供一个特殊的分区函数。例如，使用“hash(Hostname(urlkey)) mod R”作为par- tiating函数会导致来自同一主机的所有url最终出现在相同的输出文件中。


#### 4.2 顺序保证
我们保证在给定的分区中，中间键/值对以递增的键顺序处理。这种排序保证使每个分区生成排序的输出文件变得容易，当输出文件格式需要根据键支持高效的随机访问查找时，或者输出用户发现数据需要排序时，这是非常有用的。

#### 4.3 组合函数

在某些情况下，每个map任务产生的中介键之间存在明显的重复，用户指定的Reduce函数是可交换的和关联的。一个很好的例子是2.1节中的单词计数考试。由于词频倾向于遵循Zipf分布，因此每个map任务将生成数百或数千条表单<the, 1>的记录。所有这些计数将通过网络发送到一个单一的重新生成任务，然后通过Reduce函数加在一起生成一个数字。我们允许用户指定一个可选的组合函数，在数据通过网络发送之前对其进行部分合并。
在执行map任务的每台机器上执行组合函数。通常，相同的代码用于实现组合器和reduce函数。reduce函数和combiner函数的唯一区别是MapReduce库如何控制函数的输出。reduce函数的输出被写入最终的输出文件。组合器函数的输出被写入一个中间文件，该文件将被发送到reduce任务。
部分组合大大加快了某些MapReduce操作的速度。附录A包含一个使用组合器的示例。

#### 4.4 输入输出类型

MapReduce库支持以几种不同的格式读取数据。例如，“文本”模式输入将每行作为键/值对:键是文件中的偏移量，值是行内容。另一种受支持的通用格式存储按键排序的键/值对序列。每个输入类型实现都知道如何将自己分割成平均范围，以便作为单独的map任务进行处理(例如，文本模式的范围分割确保范围只在行边界处分割)。用户可以通过提供一个简单的reader接口实现来添加对新输入类型的支持，尽管大多数用户只使用少量预定义的输入类型之一。


#### 4. 5 副作用

在某些情况下，MapReduce的用户发现有必要从他们的map和/或reduce操作符中生成辅助文件作为附加输出。我们依靠应用程序编写器来实现这样的原子和幂等的副作用。通常，应用程序写入一个临时文件，并在此文件完全生成后自动重命名。
我们不支持单个任务生成的多个输出文件的原子两阶段提交。因此，生成具有跨文件一致性要求的多个输出文件的任务应该具有确定性。这种限制在实践中从来都不是问题。

#### 4.6 去除坏记录

有时用户代码中存在bug，导致map或reduce函数在某些记录上确定性崩溃。这样的错误阻止了MapReduce操作的完成。通常的做法是修复错误，但有时这是不可行的;也许这个错误存在于第三方库中，对于第三方库，源代码是无用的。此外，有时忽略一些记录是可以接受的，例如在对大数据集进行统计分析时。

每个工作进程都安装一个信号处理程序，用于捕获分段违例和总线错误。在调用用户Map或Reduce操作之前，MapRecece库将参数的序列号存储在全局变量中。如果用户代码生成信号，则信号处理程序将包含序列号的“last gasp”UDP数据包发送到MapReduce主机。当主服务器在特定记录上看到多个故障时，它表示在下一次重新执行相应的Map或Reduce任务时应该跳过该记录。

#### 4.7 本地执行

在Map或Reduce函数中调试问题可能会很棘手，因为实际的计算发生在一个分散的系统中，通常在几千台机器上，工作分配决策是由主动态做出的。为了方便调试、分析和小规模测试，我们开发了MapReduce库的另一种实现，它顺序地执行本地机器上MapReduce操作的所有工作。向用户提供控件，以便将计算限制在特定的映射任务中。用户用一个特殊的标志调用他们的程序，然后可以轻松地使用任何他们认为有用的调试或测试工具(例如gdb)。

#### 4.8 状态信息

master服务器运行一个内部HTTP服务器，并导出一组状态页供人使用。状态页面显示的进度计算,如已经完成多少任务,多少在运行,有多少字节的输入,中间数据的字节,字节的输出,处理速率等。页面包含的链接标准误差和标准输出文件生成的每个任务。用户可以使用这些数据预先决定计算需要多长时间，以及是否应该向计算添加更多的资源。这些页面还可以用来计算什么时候通信比预期慢得多。
此外，顶级状态页面显示哪些worker失败了，以及哪些映射和减少了他们失败时正在处理的任务。这种信息在试图诊断用户代码中的错误时非常有用。

#### 4.9 计数

MapReduce库提供了一个计数器工具来计算各种事件的发生情况。例如，用户代码可能希望计算已处理的单词总数或已索引的德语文档数等。
要使用此功能，用户代码创建一个命名的计数器对象，然后在Map和/或Reduce函数中适当地增加计数器。例如:

```
Counter* uppercase;
  uppercase = GetCounter("uppercase");
  map(String name, String contents):
    for each word w in contents:
      if (IsCapitalized(w)):
        uppercase->Increment();
      EmitIntermediate(w, "1");
```

来自单个worker机器的计数器值定期传播到主机器(在ping响应上承载)。主节点从成功映射中聚合计数器值并进行reduce任务，并在MapReduce操作完成时将它们返回给用户代码。当前的计数器值也会显示在主状态页面上，以便用户可以查看实时计算的进展。当触发计数器值时，主程序消除了相同map的重复执行的影响，或者reduce任务以避免重复计数。(由于使用备份任务和由于失败而重新执行任务，可能会出现重复执行。)
MapReduce库会自动维护一些计数器值，例如处理的输入键/值对的数量和生成的输出键/值对的数量。
用户发现计数器工具对于明智地检查MapReduce操作的行为很有用。例如，在某些MapReduce操作中，用户代码可能希望确保生成的输出对的数量恰好等于处理过的输入对的数量，或者确保处理过的德语文档的数量在处理过的文档总数的一定范围内。

### 5 性能

在本节中，我们将通过在大型计算机集群上运行的两个计算来度量MapReduce的性能。一个计算搜索大约1tb的数据，寻找特定的模式。另一种计算是对大约1tb的数据进行排序。
这两个程序是MapReduce用户编写的真实程序的一个大子集的代表——一类程序将数据从一个表示转移到另一类，另一类程序从一个大数据集中提取少量有趣的数据。

#### 5.1 集群配置

所有的程序都在一个由大约1800台机器组成的集群上执行。每台机器都有两个2GHz的英特尔Xeon处理器，支持超线程，4GB内存，两个160GB的IDE磁盘和一个千兆以太网连接。这些机器被安排在一个两层的树状交换网络中，在根部大约有100-200 Gbps的总带宽可用。所有的机器都在同一个主机上，因此任何一对机器之间的往返时间都不到一毫秒。
在4GB内存中，集群上运行的其他任务大约保留了1-1.5GB。这些程序是在一个周末的下午执行的，那时cpu、磁盘和网络大部分处于空闲状态。

#### 5.2 Grep

![figure2](images/f2.png)

grep程序扫描1010个100字节的记录，搜索一个相对少见的三字符模式(该模式出现在92,337条记录中)。输入被分割成大约64MB (M = 15000)，而所有的输出被放在一个文件中(R = 1)。
图2显示了随时间推移的计算进度。y轴表示扫描输入数据的速率。随着分配给这个MapReduce计算的机器越来越多，这个速率逐渐加快，当分配1764个worker时，峰值超过30 GB/s。当map任务完成时，速率开始下降，在计算中大约80秒内达到0。整个计算从开始到结束大约需要150秒。这包括大约一分钟的启动过度。开销是由于程序在所有工作机器上的传播，以及延迟与GFS交互以打开1000个输入文件集并获取本地化优化所需的信息。

#### 5.3 排序

排序程序对1010个100字节的记录进行排序(大约为1tb)。这个程序是按照TeraSort基准进行建模的。

排序程序由不到50行用户代码组成。一个三行map函数从文本行中提取一个10字节的排序键，并将该键和原始文本行作为中间键/值对发出。我们使用内置的Identity函数作为Reduce运算符。这个函数将中间键/值对作为输出键/值对传递。最后排序的输出被写入一组双向复制的GFS文件(即，作为程序的输出写入2tb)。

与前面一样，输入数据被分割成64MB (M = 15000)块。我们将排序后的输出划分为4000个文件(R = 4000)。分区函数使用键的ini字节将其分隔成R段之一。

这个基准测试的分区函数内建了键分布的知识。在一般的排序程序中，我们将添加一个预传递MapReduce操作，该操作将收集一个键的样本，并使用抽样键的分布来计算最终排序通过的分割点。

![f3](images/f3.png)

图3 (a)显示了排序程序的正常执行过程。左上方的图表显示了读取输入的速率。速率峰值约为13 GB/s，并且很快就会消失，因为所有的map任务都在200秒之前完成了。注意，输入率小于grep。这是因为排序映射任务花费了大约一半的时间，并且I/O带宽将间接输出写入本地磁盘。grep对应的中间输出的大小可以忽略不计。

左中图显示了数据通过网络从映射任务发送到重新生成任务的速度。当第一个map任务完成时，就会开始这种变换。图中的第一个驼峰是第一批大约1700个reduce任务(整个MapReduce分配了大约1700台机器，每台机器一次最多执行一个reduce任务)。在大约300秒的计算之后，第一批reduce任务中的一些已经完成，我们开始对剩下的reduce任务进行数据重组。所有的洗牌都是在大约600秒内完成的。

左下方的图表显示了reduce任务将排序数据写入最终输出文件的速度。第一次洗牌结束和写作开始之间有一个延迟，因为机器正忙于整理中间数据。写入以大约2-4 GB/s的速率持续了一段时间。在计算过程中，所有的写操作都在850秒内完成。包括启动开销，整个计算花费891秒。这与目前TeraSort基准报告的1057秒的最佳结果相似。

需要注意的是:由于局部性优化，输入速率高于洗牌速率和输出速率——大多数数据从本地磁盘读取，并绕过相对带宽受限的网络。由于输出阶段写入已排序数据的两个副本(出于可靠性和可用性的原因，我们对输出做了两个副本)，所以shuffle率高于输出率。我们编写两个副本，因为这是基础文件系统提供的可靠性和可用性机制。

#### 5.4 备份任务的影响

在图3 (b)中，我们展示了禁用备份任务的排序程序的执行。执行流类似于图3 (a)所示，只是有一个非常长的尾巴，几乎不发生任何写活动。960秒后，除5个reduce任务外的所有任务都完成了。然而，这最后的几个落差直到300秒后才结束。整个计算过程耗时1283秒，比原来增加了44%。

#### 5.5 机器错误

在图3 (c)中，我们展示了排序程序的执行，在这个过程中，我们故意在计算的几分钟内杀死了1746个工作进程中的200个进程。底层的集群调度器立即重新启动这些机器上的新工作进程(因为只有进程被杀死，机器仍然正常工作)。


worker死亡显示为负输入率，因为一些先前完成的map工作消失了(因为相应的地图工作人员被杀死了)，需要重新做。此map工作的重新执行相对较快。整个计算在933秒内完成，包括启动开销(仅比正常执行时间增加5%)。

### 6 经历

我们写的第一个版本MapReduce库于2003年2月, 2003年8月, 取得了显著增强, 包括位置优化、动态负载平衡任务执行的工作机器,等。从那时起,我们一直惊喜广泛适用的MapReduce第二版一直在解决我们工作的各种问题。它已被广泛应用于谷歌领域，包括:
- 大规模机器学习问题，
- 谷歌新闻和Froogle产品
- 提取用于生成流行查询报告的数据(如Google Zeitgeist)
- 为新的实验和产品提取web页面的属性(例如，为本地化搜索从大量web页面中提取地理学位置
- 大规模图形计算

![f4](images/f4.png)

图4显示了随着时间的推移，检入我们主要源代码管理系统的独立MapReduce程序的数量显著增加，从2003年初的0个到2004年9月下旬的近900个独立实例。MapReduce非常成功，因为它可以在半小时的时间内编写一个简单的程序并在上千台机器上高效地运行，大大加快了开发和原型开发周期。此外，它允许没有分布式和/或并行系统经验的程序员轻松地利用大量资源。
在每个作业结束时，MapReduce库记录有关作业使用的计算资源的统计信息。在表1中，我们展示了2004年8月在谷歌运行的MapReduce任务子集的一些统计数据。



####  6.1 大规模索引

MapReduce迄今最重要的用途之一是对生产索引系统的完全重写，该系统生成用于谷歌web搜索服务的数据结构。索引系统以我们的爬行系统检索到的大量文档作为输入，存储为一组GFS文件。这些文档的原始内容超过20兆字节。索引过程以5到10个MapReduce操作序列运行。使用MapReduce(而不是索引系统先前版本中的临时分布式传递)提供了一些好处
- 索引代码更简单、更小，也更容易理解，因为处理容错、分布和并行化的代码隐藏在MapReduce库中。例如，使用MapReduce时，计算的一个阶段的大小从大约3800行c++代码下降到大约700行
- MapReduce库的性能足够好，我们可以将概念上不相关的计算分开，而不是将它们混合在一起，以避免额外的数据传递。这使得更改索引过程变得很容易。例如，在我们的旧索引系统中进行一个花了几个月时间的更改，在新系统中实现只需几天时间。
- 索引过程变得更容易操作，因为大多数由机器故障、机器运行缓慢和网络故障引起的问题都是由MapReduce库自动处理的，而不需要操作员干预。此外，通过向索引集群中添加新机器，很容易提高索引过程的性能。




### 7 相关的工作

许多系统都提供了受限的编程模型，并使用这些限制自动并行化计算。例如，使用并行前缀计算，可以在N个处理器上对N个元素数组的所有前缀在log N时间内进行关联函数的计算。MapReduce可以被认为是基于我们对大型现实计算的经验，对其中一些模型的简化和精化。更重要的是，我们提供了可扩展到数千个处理器的容错实现。相比之下，大多数并行处理系统只在较小的范围内实现，处理机器故障的细节留给程序员处理。

批量同步编程和一些MPI基元提供了更高级别的抽象，使程序员更容易编写并行程序。这些系统和MapReduce的一个关键区别是MapReduce利用一个受限的预语法模型自动并行化用户程序，并提供透明的容错能力。

局部优化的灵感来源于活跃的磁盘等技术，该技术将计算机技术引入到与本地磁盘接近的处理元素中，以减少通过I/O子系统或网络发送的数据量。我们在普通处理器上运行，少数磁盘直接连接到这些处理器上，而不是直接运行在磁盘控制器处理器上，但是一般的方法是类似的。

我们的备份任务机制类似于Charlotte系统中使用的即时调度机制。简单的快速调度的缺点之一是，如果给定的任务导致重复的失败，整个计算就无法完成。我们用跳过错误记录的机制修复了这个问题的某些情况。

MapReduce实现依赖于一个内部集群管理系统，该系统负责在大量共享机器上分配和运行用户任务。虽然不是本文的重点，但是集群管理系统在本质上与Condor[16]等其他系统相似。

MapReduce库中的排序工具在操作上与NOW-Sort [1].相似。源机器(map worker)对要排序的数据进行分区，并将其发送给R reduce worker之一。每个reduce worker都在本地(如果可能的话，在内存中)对其数据进行排序。当然，现在排序没有用户定义的map和reduce功能，这些函数使我们的库广泛适用。

River[2]提供了一个编程模型，在这个模型中，通过分布式队列发送数据，进程之间可以进行通信。与MapReduce类似，即使存在由异构硬件或系统扰动引入的非均质性，River system也试图提供良好的平均情况性能。River通过精心安排磁盘和网络传输来实现这一点，以达到平衡的完成时间。MapReduce有不同的方法。通过限制预语法模型，MapReduce框架能够将问题划分为大量的细粒度任务。这些任务在可用的worker上动态调度，以便更快的worker处理更多的任务。受限制的编程模型还允许我们将重复执行的任务安排在工作接近尾声的时候，这大大减少了在出现不一致的情况下(如缓慢或卡住的工人)完成任务的时间。

BAD-FS[5]的编程模型与MapReduce非常不同，与MapReduce不同的是，它的目标是跨广域网络执行作业。无论如何，有两个基本的相似之处。(1)两个系统都使用冗余执行从故障造成的数据丢失中恢复。(2)两者都使用位置感知调度，以减少跨连接互连发送的数据量。
TACC[7]是一个旨在简化高可用网络服务结构的系统。与MapReduce类似，它依赖于重新执行作为实现容错的机制。

### 8 总结

MapReduce编程模型已经成功地在谷歌中得到了广泛的应用。我们把这种成功归因于几个原因。首先，该模型易于使用，甚至对于没有并行和分布式系统经验的程序员也是如此，因为它隐藏了并行化、容错、局部优化和负载平衡等细节。其次，各种各样的问题都很容易用MapReduce 计算来表达。例如，MapReduce用于谷歌的生产web搜索引擎的数据生成，vice，用于排序，数据挖掘，机器学习和许多其他系统。第三，我们已经开发了MapReduce的实现，它可以扩展到由数千台机器组成的大型计算机群。该实现有效地利用了这些机器资源，因此适用于谷歌中遇到的许多大型计算问题。

我们从这项工作中学到了一些东西。首先，对编程模型的限制使计算变得容易进行平均分配和分配，并使计算具有容错能力。其次，网络带宽是一种稀缺资源。因此，我们系统中的许多优化都旨在减少通过网络发送的数据量:局部优化使我们能够从本地磁盘读取数据，并将中间数据的一个副本写入本地磁盘可以节省网络带宽。第三，可以使用冗余执行来减少慢速机器的影响，并处理机器故障和数据丢失。

