Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
68 lines (62 sloc) 1.58 KB
package mapreduce
import (
"fmt"
"log"
"net"
"net/rpc"
"os"
)
// Shutdown is the RPC handler that stops the Master's RPC server. Both
// arguments are unused placeholders. It closes the mr.shutdown channel, then
// closes the listener mr.l, and always returns nil.
func (mr *Master) Shutdown(_, _ *struct{}) error {
	debug("Shutdown: registration server\n")

	// Signal the accept loop first, then close the listener so that a
	// pending Accept call fails and the loop gets a chance to exit.
	close(mr.shutdown)
	_ = mr.l.Close() // the close error is intentionally discarded

	return nil
}
// startRPCServer starts the Master's RPC server. It registers mr with a fresh
// rpc.Server, listens on the unix-domain socket at mr.address, stores the
// listener in mr.l, and then accepts connections on a background goroutine
// until mr.shutdown is closed or Accept returns an error. Each accepted
// connection is served on its own goroutine and closed afterwards.
//
// The process is terminated via log.Fatal if mr cannot be registered or the
// socket cannot be opened.
func (mr *Master) startRPCServer() {
	rpcs := rpc.NewServer()
	// Without a successful registration no RPC (including Shutdown) could
	// ever be served, so treat a failure like a failed Listen.
	if err := rpcs.Register(mr); err != nil {
		log.Fatal("RegistrationServer ", mr.address, " register error: ", err)
	}

	// Best-effort removal of a socket file left at this path; only needed
	// for "unix". The error is ignored on purpose.
	_ = os.Remove(mr.address)
	l, err := net.Listen("unix", mr.address)
	if err != nil {
		log.Fatal("RegistrationServer ", mr.address, " error: ", err)
	}
	mr.l = l

	// Now that we are listening on the master address, hand the accept loop
	// off to its own goroutine.
	go func() {
	loop:
		for {
			// Non-blocking check: this case fires once close(mr.shutdown)
			// has been called.
			select {
			case <-mr.shutdown:
				break loop
			default:
			}

			conn, err := mr.l.Accept()
			if err != nil {
				// NOTE(review): if debug is printf-style, this call has no
				// verb for err — confirm against debug's definition.
				debug("RegistrationServer: accept error", err)
				break loop
			}

			// Serve each connection concurrently so the loop can keep
			// accepting; close the connection once serving returns.
			go func() {
				rpcs.ServeConn(conn)
				conn.Close()
			}()
		}
		debug("RegistrationServer: done\n")
	}()
}
// stopRPCServer shuts the master's RPC server down by invoking the
// Master.Shutdown RPC on mr.address. Going through an RPC, rather than
// touching the server state directly, avoids race conditions between the RPC
// server thread and the calling thread. A failed call is reported on standard
// output and otherwise ignored.
func (mr *Master) stopRPCServer() {
	reply := new(ShutdownReply)
	if !call(mr.address, "Master.Shutdown", new(struct{}), reply) {
		fmt.Printf("Cleanup: RPC %s error\n", mr.address)
	}
	debug("cleanupRegistration: done\n")
}