From 3d18a4bdee6badf2b9b97c6bb275ce2245c65cc9 Mon Sep 17 00:00:00 2001 From: Thomas Koch Date: Sat, 29 Oct 2011 11:24:38 +0200 Subject: [PATCH] ZOOKEEPER-1259_central_mapping_from_type_to_txn_record_class Change-Id: I4689e233ad143b3a5df5b6b29259842ba5564cef --- .../org/apache/zookeeper/server/DataTree.java | 44 +++------ .../zookeeper/server/util/SerializeUtils.java | 96 +++++++++---------- .../zookeeper/test/MultiTransactionTest.java | 21 ++++ 3 files changed, 80 insertions(+), 81 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java b/src/java/main/org/apache/zookeeper/server/DataTree.java index 757a5727bdd..1ae85dcd74b 100644 --- a/src/java/main/org/apache/zookeeper/server/DataTree.java +++ b/src/java/main/org/apache/zookeeper/server/DataTree.java @@ -54,6 +54,7 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.data.StatPersisted; +import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.txn.CreateTxn; import org.apache.zookeeper.txn.DeleteTxn; import org.apache.zookeeper.txn.ErrorTxn; @@ -800,31 +801,16 @@ public ProcessTxnResult processTxn(TxnHeader header, Record txn) boolean post_failed = false; for (Txn subtxn : txns) { - ByteBuffer bb = ByteBuffer.wrap(subtxn.getData()); - Record record = null; - switch (subtxn.getType()) { - case OpCode.create: - record = new CreateTxn(); - break; - case OpCode.delete: - record = new DeleteTxn(); - break; - case OpCode.setData: - record = new SetDataTxn(); - break; - case OpCode.error: - record = new ErrorTxn(); - post_failed = true; - break; - case OpCode.check: - record = new CheckVersionTxn(); - break; - default: - throw new IOException("Invalid type of op: " + subtxn.getType()); + int type = subtxn.getType(); + if(type == OpCode.error) { + post_failed = true; + } else if(type != OpCode.create && type != OpCode.delete + && type != OpCode.setData && type != OpCode.check) { + throw new IOException("Invalid type of op: " + type); } - assert(record != null); - - ByteBufferInputStream.byteBuffer2Record(bb, record); + Record record = SerializeUtils.getRecordForType(type); + ByteBufferInputStream.byteBuffer2Record( + ByteBuffer.wrap(subtxn.getData()), record); if (failed && subtxn.getType() != OpCode.error){ int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() @@ -834,13 +820,11 @@ record = new CheckVersionTxn(); record = new ErrorTxn(ec); } - if (failed) { - assert(subtxn.getType() == OpCode.error) ; - } + TxnHeader subHdr = new TxnHeader( + header.getClientId(), header.getCxid(), + header.getZxid(), header.getTime(), + subtxn.getType()); - TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(), - header.getZxid(), header.getTime(), - subtxn.getType()); ProcessTxnResult subRc = processTxn(subHdr, record); rc.multiResult.add(subRc); if (subRc.err != 0 && rc.err == 0) { diff --git a/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java b/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java index f9647c45871..f57fdb24826 100644 --- a/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java +++ b/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java @@ -35,6 +35,7 @@ import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.DataTree; import org.apache.zookeeper.server.ZooTrace; +import org.apache.zookeeper.txn.CheckVersionTxn; import org.apache.zookeeper.txn.CreateSessionTxn; import org.apache.zookeeper.txn.CreateTxn; import org.apache.zookeeper.txn.CreateTxnV0; @@ -47,7 +48,29 @@ public class SerializeUtils { private static final Logger LOG = LoggerFactory.getLogger(SerializeUtils.class); - + + private static final Map> type2txnRecordClass + = new HashMap>(); + static { + type2txnRecordClass.put(OpCode.createSession, CreateSessionTxn.class); + type2txnRecordClass.put(OpCode.create, CreateTxn.class); + type2txnRecordClass.put(OpCode.delete, DeleteTxn.class); + type2txnRecordClass.put(OpCode.setData, SetDataTxn.class); + type2txnRecordClass.put(OpCode.setACL, SetACLTxn.class); + type2txnRecordClass.put(OpCode.error, ErrorTxn.class); + type2txnRecordClass.put(OpCode.multi, MultiTxn.class); + type2txnRecordClass.put(OpCode.check, CheckVersionTxn.class); + } + + public static Record getRecordForType(int type) throws IOException { + Class clazz = type2txnRecordClass.get(type); + if(clazz == null) throw new IOException("Unsupported Txn with type="+type); + + try { return clazz.newInstance(); } + catch (InstantiationException e) { throw new RuntimeException(e); } + catch (IllegalAccessException e) { throw new RuntimeException(e); } + } + public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr) throws IOException { final ByteArrayInputStream bais = new ByteArrayInputStream(txnBytes); @@ -55,56 +78,27 @@ public static Record deserializeTxn(byte txnBytes[], TxnHeader hdr) hdr.deserialize(ia, "hdr"); bais.mark(bais.available()); - Record txn = null; - switch (hdr.getType()) { - case OpCode.createSession: - // This isn't really an error txn; it just has the same - // format. The error represents the timeout - txn = new CreateSessionTxn(); - break; - case OpCode.closeSession: - return null; - case OpCode.create: - txn = new CreateTxn(); - break; - case OpCode.delete: - txn = new DeleteTxn(); - break; - case OpCode.setData: - txn = new SetDataTxn(); - break; - case OpCode.setACL: - txn = new SetACLTxn(); - break; - case OpCode.error: - txn = new ErrorTxn(); - break; - case OpCode.multi: - txn = new MultiTxn(); - break; - default: - throw new IOException("Unsupported Txn with type=%d" + hdr.getType()); - } - if (txn != null) { - try { - txn.deserialize(ia, "txn"); - } catch(EOFException e) { - // perhaps this is a V0 Create - if (hdr.getType() == OpCode.create) { - CreateTxn create = (CreateTxn)txn; - bais.reset(); - CreateTxnV0 createv0 = new CreateTxnV0(); - createv0.deserialize(ia, "txn"); - // cool now make it V1. a -1 parentCVersion will - // trigger fixup processing in processTxn - create.setPath(createv0.getPath()); - create.setData(createv0.getData()); - create.setAcl(createv0.getAcl()); - create.setEphemeral(createv0.getEphemeral()); - create.setParentCVersion(-1); - } else { - throw e; - } + if(hdr.getType() == OpCode.closeSession) return null; + final Record txn = getRecordForType(hdr.getType()); + + try { + txn.deserialize(ia, "txn"); + } catch(EOFException e) { + // perhaps this is a V0 Create + if (hdr.getType() == OpCode.create) { + CreateTxn create = (CreateTxn)txn; + bais.reset(); + CreateTxnV0 createv0 = new CreateTxnV0(); + createv0.deserialize(ia, "txn"); + // cool now make it V1. a -1 parentCVersion will + // trigger fixup processing in processTxn + create.setPath(createv0.getPath()); + create.setData(createv0.getData()); + create.setAcl(createv0.getAcl()); + create.setEphemeral(createv0.getEphemeral()); + create.setParentCVersion(-1); + } else { + throw e; } } return txn; diff --git a/src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java b/src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java index e3e020015c2..b55f847d640 100644 --- a/src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java +++ b/src/java/test/org/apache/zookeeper/test/MultiTransactionTest.java @@ -20,6 +20,8 @@ import org.apache.log4j.Logger; import org.apache.zookeeper.*; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZooKeeperServer; @@ -35,6 +37,7 @@ import java.util.ArrayList; import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -223,6 +226,24 @@ public void testGetResults() throws Exception { } } + // This tests resembles the CPP test Zookeeper_multi::testCheck + @Test + public void testAllResultsErrorCodeOK() + throws KeeperException, InterruptedException { + String parent = "/multi0"; + String child = parent + "/a"; + zk.create(parent, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Transaction transaction = zk.transaction(); + transaction.check(parent, 0); + transaction.create(child, new byte[0], Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + List results = transaction.commit(); + assertTrue(zk.exists(child, false) instanceof Stat); + assertEquals(OpCode.check, results.get(0).getType()); + assertEquals(OpCode.create, results.get(1).getType()); + assertEquals(child, ((OpResult.CreateResult)results.get(1)).getPath()); + } + @Test public void testWatchesTriggered() throws KeeperException, InterruptedException { HasTriggeredWatcher watcher = new HasTriggeredWatcher();