Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
92 lines (81 sloc) 2.62 KB
package mapreduce
import (
	"encoding/json"
	"io"
	"log"
	"os"
	"sort"
)
// doReduce does the job of a reduce worker: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, groups the
// values by key, sorts the keys, calls the user-defined reduce function
// (reduceF) once per key, and writes the results to outFile.
//
// The intermediate file written by map task m for this reduce task is named
// reduceName(jobName, m, reduceTaskNumber); files for m in [0, nMap) are read.
// Each one is decoded as a stream of JSON-encoded KeyValue objects.
//
// The output is written as one JSON-encoded KeyValue per key, in ascending
// key order. JSON is used because that is the format the merger that combines
// the output of all reduce tasks expects.
// NOTE(review): the caller presumably passes mergeName(jobName,
// reduceTaskNumber) as outFile -- confirm against the scheduler.
//
// Any I/O, decoding, or encoding failure terminates the process via log.Fatal.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Collect every value emitted for each key across all map tasks.
	keyValues := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		loadIntermediateFile(reduceName(jobName, i, reduceTaskNumber), keyValues)
	}

	// Map iteration order is random; sort the keys so output is deterministic.
	keys := make([]string, 0, len(keyValues))
	for k := range keyValues {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	mergeFile, err := os.Create(outFile)
	if err != nil {
		log.Fatal("doReduce: create merge file ", outFile, " error: ", err)
	}

	enc := json.NewEncoder(mergeFile)
	for _, k := range keys {
		res := reduceF(k, keyValues[k])
		if err := enc.Encode(&KeyValue{k, res}); err != nil {
			log.Fatal("doReduce: encode error: ", err)
		}
	}

	// Close explicitly (not via defer) so a failed flush to disk is reported
	// instead of silently producing a truncated output file.
	if err := mergeFile.Close(); err != nil {
		log.Fatal("doReduce: close merge file ", outFile, " error: ", err)
	}
}

// loadIntermediateFile decodes every JSON-encoded KeyValue in fileName and
// appends each value to keyValues under its key. It lives in its own function
// so that the file is closed as soon as it has been read, rather than keeping
// all nMap files open until doReduce returns.
func loadIntermediateFile(fileName string, keyValues map[string][]string) {
	file, err := os.Open(fileName)
	if err != nil {
		log.Fatal("doReduce: open intermediate file ", fileName, " error: ", err)
	}
	defer file.Close()

	dec := json.NewDecoder(file)
	for {
		var kv KeyValue
		err := dec.Decode(&kv)
		if err == io.EOF {
			// Clean end of stream: every record has been consumed.
			return
		}
		if err != nil {
			// Anything other than EOF means the file is corrupt or truncated;
			// continuing would silently drop records.
			log.Fatal("doReduce: decode intermediate file ", fileName, " error: ", err)
		}
		// append on a missing key starts from a nil slice, so no pre-check is needed.
		keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
	}
}