Skip to content

Commit

Permalink
remove DataTree.processTxn()
Browse files Browse the repository at this point in the history
Change-Id: I8b8d21a6afb0b8b78e62cc0264ab836d50b22cda
  • Loading branch information
thkoch2001 committed Nov 1, 2011
1 parent 8ff6158 commit 3b90f17
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 133 deletions.
39 changes: 2 additions & 37 deletions src/java/main/org/apache/zookeeper/server/DataTree.java
Expand Up @@ -32,7 +32,6 @@
import org.apache.jute.Index;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
Expand All @@ -48,7 +47,6 @@
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.server.Transaction.PathTransaction;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.TxnHeader;
Expand All @@ -65,6 +63,8 @@
public class DataTree {
private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);

public volatile long lastProcessedZxid = 0;

/**
* This hashtable provides a fast lookup to the datanodes. The tree is the
* source of truth and is where all the locking occurs
Expand Down Expand Up @@ -486,41 +486,6 @@ public AccessControlList getACL(String path, Stat stat)
}
}

public volatile long lastProcessedZxid = 0;

public Transaction.ProcessTxnResult processTxn(TxnHeader header, Record txn)
{
Transaction.ProcessTxnResult rc = new Transaction.ProcessTxnResult(0);
Transaction transaction = Transaction.fromTxn(header, txn);
try {
rc = transaction.process(this);
} catch (KeeperException e) {
LOG.warn("Failed: ", e);
if(transaction != null && transaction instanceof PathTransaction) {
rc.path = ((PathTransaction)transaction).path;
}
rc.err = e.code().intValue();
}
/*
* A snapshot might be in progress while we are modifying the data
* tree. If we set lastProcessedZxid prior to making corresponding
* change to the tree, then the zxid associated with the snapshot
* file will be ahead of its contents. Thus, while restoring from
* the snapshot, the restore method will not apply the transaction
* for zxid associated with the snapshot file, since the restore
* method assumes that transaction to be present in the snapshot.
*
* To avoid this, we first apply the transaction and then modify
* lastProcessedZxid. During restore, we correctly handle the
* case where the snapshot contains data ahead of the zxid associated
* with the file.
*/
if (header.getZxid() > lastProcessedZxid) {
lastProcessedZxid = header.getZxid();
}
return rc;
}

void killSession(long session, long zxid) {
// the list is already removed from the ephemerals
// so we do not have to worry about synchronizing on
Expand Down
Expand Up @@ -26,14 +26,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MultiResponse;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.SessionMovedException;
import org.apache.zookeeper.common.AccessControlList;
import org.apache.zookeeper.common.AccessControlList.Permission;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.CreateResponse;
import org.apache.zookeeper.proto.ExistsRequest;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLRequest;
Expand All @@ -45,12 +43,11 @@
import org.apache.zookeeper.proto.GetDataRequest;
import org.apache.zookeeper.proto.GetDataResponse;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.Request.Meta;
import org.apache.zookeeper.server.Transaction.PathTransaction;
import org.apache.zookeeper.server.Transaction.ProcessTxnResult;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.txn.CreateSessionTxn;
Expand Down Expand Up @@ -79,7 +76,8 @@ public void processRequest(Request request) {
LOG.debug("Processing request:: " + request);
}

ProcessTxnResult rc = null;
Transaction transaction = null;
ProcessTxnResult rc = new ProcessTxnResult(0);
synchronized (zks.outstandingChanges) {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.getMeta().getZxid()) {
Expand All @@ -92,7 +90,34 @@ public void processRequest(Request request) {
}
}
if (request.getHdr() != null) {
rc = zks.getZKDatabase().processTxn(request.getHdr(), request.getTxn());
transaction = Transaction.fromTxn(request.getHdr(), request.getTxn());
DataTree tree = zks.getZKDatabase().getDataTree();
try {
rc = transaction.process(tree);
} catch (KeeperException e) {
LOG.warn("Failed: ", e);
if(transaction != null && transaction instanceof PathTransaction) {
rc.path = ((PathTransaction)transaction).getPath();
}
rc.err = e.code().intValue();
}
/*
* A snapshot might be in progress while we are modifying the data
* tree. If we set lastProcessedZxid prior to making corresponding
* change to the tree, then the zxid associated with the snapshot
* file will be ahead of its contents. Thus, while restoring from
* the snapshot, the restore method will not apply the transaction
* for zxid associated with the snapshot file, since the restore
* method assumes that transaction to be present in the snapshot.
*
* To avoid this, we first apply the transaction and then modify
* lastProcessedZxid. During restore, we correctly handle the
* case where the snapshot contains data ahead of the zxid associated
* with the file.
*/
if (request.getHdr().getZxid() > tree.lastProcessedZxid) {
tree.lastProcessedZxid = request.getHdr().getZxid();
}
if (request.getMeta().getType() == OpCode.createSession) {
if (request.getTxn() instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) request.getTxn();
Expand Down Expand Up @@ -155,30 +180,14 @@ public void processRequest(Request request) {
zks.finishSessionInit(request.getMeta().getCnxn(), true);
return;
}
case multi: {
rsp = new MultiResponse(rc.multiResult);
break;
}
case create: {
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
break;
}
case delete: {
err = Code.get(rc.err);
break;
}
case setData: {
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case setACL: {
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case closeSession: {
case multi:
case create:
case delete:
case setData:
case setACL:
case closeSession:
case check: {
rsp = transaction.getResponse(rc);
err = Code.get(rc.err);
break;
}
Expand All @@ -187,11 +196,6 @@ public void processRequest(Request request) {
rsp = new SyncResponse(syncRequest.getPath());
break;
}
case check: {
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case exists: {
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = (ExistsRequest)request.deserializeRequestRecord();
Expand Down
19 changes: 15 additions & 4 deletions src/java/main/org/apache/zookeeper/server/Transaction.java
Expand Up @@ -34,8 +34,12 @@
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.Txn;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class Transaction {
private static final Logger LOG = LoggerFactory.getLogger(Transaction.class);

protected final long clientId;
protected final int cxid;
protected final long zxid;
Expand All @@ -54,7 +58,7 @@ private Transaction(TxnHeader header) {
this.type = OpCode.fromInt(header.getType());
}

static Transaction fromTxn(TxnHeader header, Record txn) {
public static Transaction fromTxn(TxnHeader header, Record txn) {
switch (OpCode.fromInt(header.getType())) {
case create: return new Create(header, (CreateTxn)txn);
case delete: return new Delete(header, (DeleteTxn)txn);
Expand Down Expand Up @@ -85,6 +89,8 @@ protected PathTransaction(TxnHeader header, String path) {
super(header);
this.path = path;
}

public String getPath() { return path; }
}

public static final class Create extends PathTransaction {
Expand Down Expand Up @@ -351,7 +357,7 @@ public Record getResponse(ProcessTxnResult rc) {
}

@Override
public ProcessTxnResult process(DataTree tree) throws KeeperException {
public ProcessTxnResult process(DataTree tree) {
List<ProcessTxnResult> multiResult = new ArrayList<ProcessTxnResult>();

boolean postFailed = false;
Expand All @@ -366,12 +372,17 @@ public ProcessTxnResult process(DataTree tree) throws KeeperException {
subRc = new ProcessTxnResult(postFailed ? Code.RUNTIMEINCONSISTENCY.intValue()
: Code.OK.intValue());
} else {
subRc = subtxn.process(tree);
try {
subRc = subtxn.process(tree);
} catch (KeeperException e) {
LOG.debug("SubTxn of type {} failed: {}", type.longString, e);
subRc = null;
}
}

multiResult.add(subRc);
}
ProcessTxnResult result = new ProcessTxnResult(error.intValue());
ProcessTxnResult result = new ProcessTxnResult(0);
result.multiResult = multiResult;
result.type = OpCode.multi;
return result;
Expand Down
20 changes: 8 additions & 12 deletions src/java/main/org/apache/zookeeper/server/ZKDatabase.java
Expand Up @@ -43,7 +43,6 @@
import org.apache.zookeeper.common.AccessControlList;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.Request.Meta;
import org.apache.zookeeper.server.Transaction.ProcessTxnResult;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
import org.apache.zookeeper.server.quorum.Leader;
Expand Down Expand Up @@ -308,17 +307,6 @@ public void setlastProcessedZxid(long zxid) {
dataTree.lastProcessedZxid = zxid;
}

/**
* the process txn on the data
* @param hdr the txnheader for the txn
* @param txn the transaction that needs to be processed
* @return the result of processing the transaction on this
* datatree/zkdatabase
*/
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
return dataTree.processTxn(hdr, txn);
}

/**
* stat the path
* @param path the path for which stat is to be done
Expand Down Expand Up @@ -482,4 +470,12 @@ public void close() throws IOException {
this.snapLog.close();
}

public void processTxn(TxnHeader hdr, Record txn) {
try {
Transaction.fromTxn(hdr, txn).process(dataTree);
} catch (KeeperException e) {
LOG.warn("Failed: ", e);
}
}

}

0 comments on commit 3b90f17

Please sign in to comment.