Skip to content

Commit

Permalink
ZOOKEEPER-1234_basic_cleanup_in_LearnerHandler
Browse files Browse the repository at this point in the history
Change-Id: Ie7ec220fe698d588a4119d582a4d675dd98d9617
  • Loading branch information
thkoch2001 committed Nov 1, 2011
1 parent b2c2b8f commit 2f6714b
Showing 1 changed file with 56 additions and 136 deletions.
192 changes: 56 additions & 136 deletions src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
Expand All @@ -44,9 +43,7 @@
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnHeader;

/**
* There will be an instance of this class created by the Leader for each
Expand All @@ -56,48 +53,42 @@
public class LearnerHandler extends Thread {
private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);

protected final Socket sock;

public Socket getSocket() {
return sock;
}

final Leader leader;
/**
* If this packet is queued, the sender thread will exit
*/
private static final QuorumPacket proposalOfDeath = new QuorumPacket();

long tickOfLastAck;
private LearnerType learnerType = LearnerType.PARTICIPANT;
private final Leader leader;
private final Socket sock;
private long tickOfLastAck;
private int protocolVersion = 0x1;

/**
* ZooKeeper server identifier of this learner
*/
protected long sid = 0;

long getSid(){
return sid;
}

protected int version = 0x1;

int getVersion() {
return version;
}
private long sid = 0;

/**
* The packets to be sent to the learner
*/
final LinkedBlockingQueue<QuorumPacket> queuedPackets =
new LinkedBlockingQueue<QuorumPacket>();
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();

private BinaryInputArchive ia;
private final BinaryInputArchive ia;

private BinaryOutputArchive oa;
private final BinaryOutputArchive oa;

private BufferedOutputStream bufferedOutput;
private final BufferedOutputStream bufferedOutput;

LearnerHandler(Socket sock, Leader leader) throws IOException {
super("LearnerHandler-" + sock.getRemoteSocketAddress());
this.sock = sock;
this.leader = leader;
leader.addLearnerHandler(this);

ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
}

@Override
Expand All @@ -110,16 +101,21 @@ public String toString() {
return sb.toString();
}

/**
* If this packet is queued, the sender thread will exit
*/
final QuorumPacket proposalOfDeath = new QuorumPacket();

private LearnerType learnerType = LearnerType.PARTICIPANT;
public LearnerType getLearnerType() {
return learnerType;
}

public Socket getSocket() {
return sock;
}

long getSid(){
return sid;
}

int getProtocolVersion() {
return protocolVersion;
}
/**
* This method will use the thread to send packets added to the
* queuedPackets list
Expand Down Expand Up @@ -165,104 +161,35 @@ private void sendPackets() throws InterruptedException {
}
}

static public String packetToString(QuorumPacket p) {
if (true)
return null;
String type = null;
String mess = null;
Record txn = null;

switch (p.getType()) {
case Leader.ACK:
type = "ACK";
break;
case Leader.COMMIT:
type = "COMMIT";
break;
case Leader.FOLLOWERINFO:
type = "FOLLOWERINFO";
break;
case Leader.NEWLEADER:
type = "NEWLEADER";
break;
case Leader.PING:
type = "PING";
break;
case Leader.PROPOSAL:
type = "PROPOSAL";
TxnHeader hdr = new TxnHeader();
try {
txn = SerializeUtils.deserializeTxn(p.getData(), hdr);
// mess = "transaction: " + txn.toString();
} catch (IOException e) {
LOG.warn("Unexpected exception",e);
}
break;
case Leader.REQUEST:
type = "REQUEST";
break;
case Leader.REVALIDATE:
type = "REVALIDATE";
ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
DataInputStream dis = new DataInputStream(bis);
try {
long id = dis.readLong();
mess = " sessionid = " + id;
} catch (IOException e) {
LOG.warn("Unexpected exception", e);
}

break;
case Leader.UPTODATE:
type = "UPTODATE";
break;
default:
type = "UNKNOWN" + p.getType();
}
String entry = null;
if (type != null) {
entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
}
return entry;
}

/**
* This thread will receive packets from the peer and process them and
* also listen to new connections from new peers.
*/
@Override
public void run() {
try {
sock.setSoTimeout(leader.self.getTickTime()*leader.self.getInitLimit());
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);

QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
LOG.error("First packet {} is not FOLLOWERINFO or OBSERVERINFO!", qp);
return;
}
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
if (learnerInfoData.length == 8) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
this.sid = bbsid.getLong();
} else {
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.version = li.getProtocolVersion();
}
if (learnerInfoData.length == 8) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
this.sid = bbsid.getLong();
} else {
LearnerInfo li = new LearnerInfo();
ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.protocolVersion = li.getProtocolVersion();
}
} else {
this.sid = leader.followerCounter.getAndDecrement();
this.sid = leader.followerCounter.getAndDecrement();
}

LOG.info("Follower sid: " + this.sid + " : info : "
+ leader.self.quorumPeers.get(this.sid));
LOG.info("Follower sid: {} : info : {}", this.sid, leader.self.quorumPeers.get(this.sid));

if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
Expand All @@ -275,7 +202,7 @@ public void run() {
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);

if (this.getVersion() < 0x10000) {
if (this.getProtocolVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
long epoch = ZxidUtils.getEpochFromZxid(zxid);
ss = new StateSummary(epoch, zxid);
Expand All @@ -290,10 +217,9 @@ public void run() {
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString()
+ " is not ACKEPOCH");
LOG.error("{} is not ACKEPOCH", ackEpochPacket);
return;
}
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
Expand Down Expand Up @@ -389,7 +315,7 @@ public void run() {

QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
ZxidUtils.makeZxid(newEpoch, 0), null, null);
if (getVersion() < 0x10000) {
if (getProtocolVersion() < 0x10000) {
oa.writeRecord(newLeaderQP, "packet");
} else {
queuedPackets.add(newLeaderQP);
Expand Down Expand Up @@ -419,8 +345,7 @@ public void run() {
// Start sending packets
new Thread() {
public void run() {
Thread.currentThread().setName(
"Sender-" + sock.getRemoteSocketAddress());
Thread.currentThread().setName("Sender-" + sock.getRemoteSocketAddress());
try {
sendPackets();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -453,11 +378,9 @@ public void run() {
// Mutation packets will be queued during the serialize,
// so we need to mark when the peer can actually start
// using the data
//
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));


while (true) {
while (true) { // TODO: move the loop in a separate method, this one is long enough.
qp = new QuorumPacket();
ia.readRecord(qp, "packet");

Expand All @@ -470,7 +393,6 @@ public void run() {
}
tickOfLastAck = leader.self.tick;


ByteBuffer bb;
long sessionId;
int cxid;
Expand Down Expand Up @@ -545,15 +467,14 @@ public void run() {
}
} catch (IOException e) {
if (sock != null && !sock.isClosed()) {
LOG.error("Unexpected exception causing shutdown while sock "
+ "still open", e);
//close the socket to make sure the
//other side can see it being close
try {
sock.close();
} catch(IOException ie) {
// do nothing
}
LOG.error("Unexpected exception causing shutdown while sock still open", e);
// close the socket to make sure the
// other side can see it being close
try {
sock.close();
} catch (IOException ie) {
// do nothing
}
}
} catch (InterruptedException e) {
LOG.error("Unexpected exception causing shutdown", e);
Expand Down Expand Up @@ -595,8 +516,7 @@ public void ping() {
synchronized(leader) {
id = leader.lastProposed;
}
QuorumPacket ping = new QuorumPacket(Leader.PING, id,
null, null);
QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
queuePacket(ping);
}

Expand Down

0 comments on commit 2f6714b

Please sign in to comment.