Skip to content

Commit

Permalink
ZOOKEEPER-1259_central_mapping_from_type_to_txn_record_class
Browse files Browse the repository at this point in the history
Change-Id: I4689e233ad143b3a5df5b6b29259842ba5564cef
  • Loading branch information
thkoch2001 committed Nov 1, 2011
1 parent 5f83aaa commit 3d18a4b
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 81 deletions.
44 changes: 14 additions & 30 deletions src/java/main/org/apache/zookeeper/server/DataTree.java
Expand Up @@ -54,6 +54,7 @@
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.ErrorTxn;
Expand Down Expand Up @@ -800,31 +801,16 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn)

boolean post_failed = false;
for (Txn subtxn : txns) {
ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
Record record = null;
switch (subtxn.getType()) {
case OpCode.create:
record = new CreateTxn();
break;
case OpCode.delete:
record = new DeleteTxn();
break;
case OpCode.setData:
record = new SetDataTxn();
break;
case OpCode.error:
record = new ErrorTxn();
post_failed = true;
break;
case OpCode.check:
record = new CheckVersionTxn();
break;
default:
throw new IOException("Invalid type of op: " + subtxn.getType());
int type = subtxn.getType();
if(type == OpCode.error) {
post_failed = true;
} else if(type != OpCode.create && type != OpCode.delete
&& type != OpCode.setData && type != OpCode.check) {
throw new IOException("Invalid type of op: " + type);
}
assert(record != null);

ByteBufferInputStream.byteBuffer2Record(bb, record);
Record record = SerializeUtils.getRecordForType(type);
ByteBufferInputStream.byteBuffer2Record(
ByteBuffer.wrap(subtxn.getData()), record);

if (failed && subtxn.getType() != OpCode.error){
int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()
Expand All @@ -834,13 +820,11 @@ record = new CheckVersionTxn();
record = new ErrorTxn(ec);
}

if (failed) {
assert(subtxn.getType() == OpCode.error) ;
}
TxnHeader subHdr = new TxnHeader(
header.getClientId(), header.getCxid(),
header.getZxid(), header.getTime(),
subtxn.getType());

TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),
header.getZxid(), header.getTime(),
subtxn.getType());
ProcessTxnResult subRc = processTxn(subHdr, record);
rc.multiResult.add(subRc);
if (subRc.err != 0 && rc.err == 0) {
Expand Down
96 changes: 45 additions & 51 deletions src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
Expand Up @@ -35,6 +35,7 @@
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.DataTree;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.txn.CheckVersionTxn;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.CreateTxnV0;
Expand All @@ -47,64 +48,57 @@

public class SerializeUtils {
private static final Logger LOG = LoggerFactory.getLogger(SerializeUtils.class);


private static final Map<Integer, Class<? extends Record>> type2txnRecordClass
= new HashMap<Integer, Class<? extends Record>>();
static {
type2txnRecordClass.put(OpCode.createSession, CreateSessionTxn.class);
type2txnRecordClass.put(OpCode.create, CreateTxn.class);
type2txnRecordClass.put(OpCode.delete, DeleteTxn.class);
type2txnRecordClass.put(OpCode.setData, SetDataTxn.class);
type2txnRecordClass.put(OpCode.setACL, SetACLTxn.class);
type2txnRecordClass.put(OpCode.error, ErrorTxn.class);
type2txnRecordClass.put(OpCode.multi, MultiTxn.class);
type2txnRecordClass.put(OpCode.check, CheckVersionTxn.class);
}

public static Record getRecordForType(int type) throws IOException {
Class<? extends Record> clazz = type2txnRecordClass.get(type);
if(clazz == null) throw new IOException("Unsupported Txn with type="+type);

try { return clazz.newInstance(); }
catch (InstantiationException e) { throw new RuntimeException(e); }
catch (IllegalAccessException e) { throw new RuntimeException(e); }
}

public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr)
throws IOException {
final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes);
InputArchive ia = BinaryInputArchive.getArchive(bais);

hdr.deserialize(ia, "hdr");
bais.mark(bais.available());
Record txn = null;
switch (hdr.getType()) {
case OpCode.createSession:
// This isn't really an error txn; it just has the same
// format. The error represents the timeout
txn = new CreateSessionTxn();
break;
case OpCode.closeSession:
return null;
case OpCode.create:
txn = new CreateTxn();
break;
case OpCode.delete:
txn = new DeleteTxn();
break;
case OpCode.setData:
txn = new SetDataTxn();
break;
case OpCode.setACL:
txn = new SetACLTxn();
break;
case OpCode.error:
txn = new ErrorTxn();
break;
case OpCode.multi:
txn = new MultiTxn();
break;
default:
throw new IOException("Unsupported Txn with type=%d" + hdr.getType());
}
if (txn != null) {
try {
txn.deserialize(ia, "txn");
} catch(EOFException e) {
// perhaps this is a V0 Create
if (hdr.getType() == OpCode.create) {
CreateTxn create = (CreateTxn)txn;
bais.reset();
CreateTxnV0 createv0 = new CreateTxnV0();
createv0.deserialize(ia, "txn");
// cool now make it V1. a -1 parentCVersion will
// trigger fixup processing in processTxn
create.setPath(createv0.getPath());
create.setData(createv0.getData());
create.setAcl(createv0.getAcl());
create.setEphemeral(createv0.getEphemeral());
create.setParentCVersion(-1);
} else {
throw e;
}
if(hdr.getType() == OpCode.closeSession) return null;
final Record txn = getRecordForType(hdr.getType());

try {
txn.deserialize(ia, "txn");
} catch(EOFException e) {
// perhaps this is a V0 Create
if (hdr.getType() == OpCode.create) {
CreateTxn create = (CreateTxn)txn;
bais.reset();
CreateTxnV0 createv0 = new CreateTxnV0();
createv0.deserialize(ia, "txn");
// cool now make it V1. a -1 parentCVersion will
// trigger fixup processing in processTxn
create.setPath(createv0.getPath());
create.setData(createv0.getData());
create.setAcl(createv0.getAcl());
create.setEphemeral(createv0.getEphemeral());
create.setParentCVersion(-1);
} else {
throw e;
}
}
return txn;
Expand Down
21 changes: 21 additions & 0 deletions src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java
Expand Up @@ -20,6 +20,8 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
Expand All @@ -35,6 +37,7 @@
import java.util.ArrayList;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -223,6 +226,24 @@ public void testGetResults() throws Exception {
}
}

// This tests resembles the CPP test Zookeeper_multi::testCheck
@Test
public void testAllResultsErrorCodeOK()
throws KeeperException, InterruptedException {
String parent = "/multi0";
String child = parent + "/a";
zk.create(parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Transaction transaction = zk.transaction();
transaction.check(parent, 0);
transaction.create(child, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
List<OpResult> results = transaction.commit();
assertTrue(zk.exists(child, false) instanceof Stat);
assertEquals(OpCode.check, results.get(0).getType());
assertEquals(OpCode.create, results.get(1).getType());
assertEquals(child, ((OpResult.CreateResult)results.get(1)).getPath());
}

@Test
public void testWatchesTriggered() throws KeeperException, InterruptedException {
HasTriggeredWatcher watcher = new HasTriggeredWatcher();
Expand Down

0 comments on commit 3d18a4b

Please sign in to comment.