Skip to content

Commit

Permalink
After POLLIN event from zmq_poll, read ALL packets that could be avai…
Browse files Browse the repository at this point in the history
…lable on the wire.
  • Loading branch information
smira committed Dec 3, 2013
1 parent 8552830 commit 631d860
Showing 1 changed file with 71 additions and 65 deletions.
136 changes: 71 additions & 65 deletions raft/zeromq.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (transporter *ZmqTransporter) Start(server raft.Server) {
transporter.running = true
go func() {
reactor := zmq.NewReactor()
reactor.AddSocket(transporter.incoming, zmq.POLLIN, func(zmq.State) error { return transporter.parseIncomingResponse() })
reactor.AddSocket(transporter.incoming, zmq.POLLIN, func(zmq.State) error { glog.Info("Socket triggered"); transporter.parseIncomingPackets(); return nil })

var chId uint64
chId = reactor.AddChannel(transporter.stop, 1, func(interface{}) error {
Expand All @@ -85,6 +85,7 @@ func (transporter *ZmqTransporter) Start(server raft.Server) {
})

for transporter.running {
glog.Info("Entering reactor")
err := reactor.Run(100 * time.Millisecond)
if err != nil && err.Error() != "No sockets to poll, no channels to read" {
glog.Infof("Got error from ZMQ reactor: %v", err)
Expand Down Expand Up @@ -123,84 +124,89 @@ func (transporter *ZmqTransporter) processResponse(name string, rb *bytes.Buffer
return nil
}

// parseIncomingResponse parses packets on the wire
func (transporter *ZmqTransporter) parseIncomingResponse() error {
request, err := transporter.incoming.RecvBytes(0)
if err != nil {
glog.Infof("Error while receiving incoming request: %v\n", err)
return nil
}

rb := bytes.NewBuffer(request)
// parseIncomingPackets parses packets on the wire
func (transporter *ZmqTransporter) parseIncomingPackets() {
for {
request, err := transporter.incoming.RecvBytes(zmq.DONTWAIT)
if err != nil {
if err.Error() == "resource temporarily unavailable" {
// async receive, no data
break
}
glog.Infof("Error while receiving incoming packet: %v\n", err)
return
}

kind, err := rb.ReadByte()
if err != nil {
glog.Infof("Unable to read first byte of request: %v", err)
return nil
}
rb := bytes.NewBuffer(request)

switch kind {
case requestVote:
req := &raft.RequestVoteRequest{}
transporter.processResponse("request vote", rb, req,
func() raftReqResp { return transporter.server.RequestVote(req) })
case requestAppendEntries:
req := &raft.AppendEntriesRequest{}
transporter.processResponse("append entries request", rb, req,
func() raftReqResp { return transporter.server.AppendEntries(req) })
case requestSnapshot:
req := &raft.SnapshotRequest{}
transporter.processResponse("snapshot request", rb, req,
func() raftReqResp { return transporter.server.RequestSnapshot(req) })
case requestShapshotRecovery:
req := &raft.SnapshotRecoveryRequest{}
transporter.processResponse("snapshot recovery request", rb, req,
func() raftReqResp { return transporter.server.SnapshotRecoveryRequest(req) })
case requestCommand:
decoder := json.NewDecoder(rb)

var commandName string
var response error

err = decoder.Decode(&commandName)
kind, err := rb.ReadByte()
if err != nil {
response = fmt.Errorf("unable to decode command name: %v", err)
} else {
instanciator, ok := transporter.commands[commandName]
glog.Infof("Unable to read first byte of packet: %v", err)
return
}

if !ok {
response = fmt.Errorf("unknown command name %s", commandName)
switch kind {
case requestVote:
req := &raft.RequestVoteRequest{}
transporter.processResponse("request vote", rb, req,
func() raftReqResp { return transporter.server.RequestVote(req) })
case requestAppendEntries:
req := &raft.AppendEntriesRequest{}
transporter.processResponse("append entries request", rb, req,
func() raftReqResp { return transporter.server.AppendEntries(req) })
case requestSnapshot:
req := &raft.SnapshotRequest{}
transporter.processResponse("snapshot request", rb, req,
func() raftReqResp { return transporter.server.RequestSnapshot(req) })
case requestShapshotRecovery:
req := &raft.SnapshotRecoveryRequest{}
transporter.processResponse("snapshot recovery request", rb, req,
func() raftReqResp { return transporter.server.SnapshotRecoveryRequest(req) })
case requestCommand:
decoder := json.NewDecoder(rb)

var commandName string
var response error

err = decoder.Decode(&commandName)
if err != nil {
response = fmt.Errorf("unable to decode command name: %v", err)
} else {
command := instanciator()
err = decoder.Decode(command)
if err != nil {
response = fmt.Errorf("unable to decode command %s: %v", commandName, err)
instanciator, ok := transporter.commands[commandName]

if !ok {
response = fmt.Errorf("unknown command name %s", commandName)
} else {
_, err = transporter.server.Do(command)
command := instanciator()
err = decoder.Decode(command)
if err != nil {
response = fmt.Errorf("error processing command %s: %v", commandName, err)
response = fmt.Errorf("unable to decode command %s: %v", commandName, err)
} else {
_, err = transporter.server.Do(command)
if err != nil {
response = fmt.Errorf("error processing command %s: %v", commandName, err)
}
}
}
}
}

wb := new(bytes.Buffer)
encoder := json.NewEncoder(wb)
if response == nil {
encoder.Encode(nil)
} else {
encoder.Encode(response.Error())
}
wb := new(bytes.Buffer)
encoder := json.NewEncoder(wb)
if response == nil {
encoder.Encode(nil)
} else {
encoder.Encode(response.Error())
}

_, err = transporter.incoming.SendBytes(wb.Bytes(), 0)
if err != nil {
glog.Infof("Unable to send response back to command %s: %v", commandName, err)
_, err = transporter.incoming.SendBytes(wb.Bytes(), 0)
if err != nil {
glog.Infof("Unable to send response back to command %s: %v", commandName, err)
}
default:
glog.Infof("Unknown request kind: %v", kind)
}
default:
glog.Infof("Unknown request kind: %v", kind)
}

return nil
}
}

// getSocketFor returns outgoing ZMQ socket for peer, creating when necessary
Expand Down

0 comments on commit 631d860

Please sign in to comment.