From 277b348b086aca958c7f56520213451e82ecdcbb Mon Sep 17 00:00:00 2001 From: Thomas Koch Date: Fri, 4 Nov 2011 08:20:23 +0100 Subject: [PATCH] ZOOKEEPER-1276 ZKDatabase should not hold reference to FileTxnSnapLog Change-Id: I66013e357f935e983ec80b4d72e93a56d7931826 --- .../server/SyncRequestProcessor.java | 6 +-- .../apache/zookeeper/server/ZKDatabase.java | 44 +++---------------- .../zookeeper/server/ZooKeeperServer.java | 14 +++--- .../server/persistence/FileTxnSnapLog.java | 8 ++-- .../zookeeper/server/quorum/Learner.java | 6 +-- .../zookeeper/server/quorum/QuorumPeer.java | 14 +++--- .../server/quorum/QuorumPeerMain.java | 14 +++--- .../zookeeper/server/quorum/LearnerTest.java | 2 +- .../zookeeper/server/quorum/Zab1_0Test.java | 5 +-- .../org/apache/zookeeper/test/ClientBase.java | 13 ++---- .../apache/zookeeper/test/DataTreeTest.java | 2 +- .../apache/zookeeper/test/TruncateTest.java | 28 +++++------- 12 files changed, 52 insertions(+), 104 deletions(-) diff --git a/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java index be287c065c5..74cf9800f88 100644 --- a/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java +++ b/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java @@ -107,12 +107,12 @@ public void run() { } if (si != null) { // track the number of records written to the log - if (zks.getZKDatabase().append(si)) { + if (zks.getTxnLogFactory().append(si)) { logCount++; if (logCount > (snapCount / 2 + randRoll)) { randRoll = r.nextInt(snapCount/2); // roll the log - zks.getZKDatabase().rollLog(); + zks.getTxnLogFactory().rollLog(); // take a snapshot if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping"); @@ -159,7 +159,7 @@ private void flush(LinkedList toFlush) throws IOException { if (toFlush.isEmpty()) return; - zks.getZKDatabase().commit(); + zks.getTxnLogFactory().commit(); while (!toFlush.isEmpty()) { Request i = toFlush.remove(); nextProcessor.processRequest(i); diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java index eda42aae41d..b055ee2da61 100644 --- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java +++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java @@ -65,7 +65,6 @@ public class ZKDatabase { */ protected DataTree dataTree; protected ConcurrentHashMap sessionsWithTimeouts; - protected FileTxnSnapLog snapLog; protected long minCommittedLog, maxCommittedLog; public static final int commitLogCount = 500; protected static int commitLogBuffer = 700; @@ -83,10 +82,9 @@ public class ZKDatabase { * between a filetxnsnaplog and zkdatabase. * @param snapLog the FileTxnSnapLog mapping this zkdatabase */ - public ZKDatabase(FileTxnSnapLog snapLog) { + public ZKDatabase() { dataTree = new DataTree(); sessionsWithTimeouts = new ConcurrentHashMap(); - this.snapLog = snapLog; } /** @@ -203,7 +201,7 @@ public ConcurrentHashMap getSessionWithTimeOuts() { * @return the last valid zxid on disk * @throws IOException */ - public long loadDataBase() throws IOException { + public long loadDataBase(FileTxnSnapLog snapLog) throws IOException { PlayBackListener listener=new PlayBackListener(){ public void onTxnLoaded(TxnHeader hdr,Record txn){ Meta meta = new Meta(0, hdr.getCxid(), hdr.getZxid(), OpCode.fromInt(hdr.getType())); @@ -343,10 +341,10 @@ public int getAclSize() { * @return true if the truncate is succesful and false if not * @throws IOException */ - public boolean truncateLog(long zxid) throws IOException { + public boolean truncateLog(FileTxnSnapLog snapLog, long zxid) throws IOException { clear(); - boolean truncated = this.snapLog.truncateLog(zxid); - loadDataBase(); + boolean truncated = snapLog.truncateLog(zxid); + loadDataBase(snapLog); return truncated; } @@ -372,38 +370,6 @@ public void serializeSnapshot(OutputArchive oa) throws IOException, SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts()); } - /** - * append to the underlying transaction log - * @param si the request to append - * @return true if the append was succesfull and false if not - */ - public boolean append(Request si) throws IOException { - return this.snapLog.append(si); - } - - /** - * roll the underlying log - */ - public void rollLog() throws IOException { - this.snapLog.rollLog(); - } - - /** - * commit to the underlying transaction log - * @throws IOException - */ - public void commit() throws IOException { - this.snapLog.commit(); - } - - /** - * close this database. free the resources - * @throws IOException - */ - public void close() throws IOException { - this.snapLog.close(); - } - public void processTxn(TxnHeader hdr, Record txn) { try { Transaction transaction = Transaction.fromTxn(hdr, txn); diff --git a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java index b2cb8cdedf9..a2d42b51127 100644 --- a/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java @@ -160,7 +160,7 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, * @throws IOException */ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime) throws IOException { - this(txnLogFactory, tickTime, -1, -1, new ZKDatabase(txnLogFactory)); + this(txnLogFactory, tickTime, -1, -1, new ZKDatabase()); } public ServerStats serverStats() { @@ -171,9 +171,9 @@ public void dumpConf(PrintWriter pwriter) { pwriter.print("clientPort="); pwriter.println(getClientPort()); pwriter.print("dataDir="); - pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath()); + pwriter.println(txnLogFactory.getSnapDir().getAbsolutePath()); pwriter.print("dataLogDir="); - pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath()); + pwriter.println(txnLogFactory.getDataDir().getAbsolutePath()); pwriter.print("tickTime="); pwriter.println(getTickTime()); pwriter.print("maxClientCnxns="); @@ -206,7 +206,7 @@ public ZooKeeperServer(File snapDir, File logDir, int tickTime) public ZooKeeperServer(FileTxnSnapLog txnLogFactory) throws IOException { - this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, new ZKDatabase(txnLogFactory)); + this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, new ZKDatabase()); } /** @@ -229,7 +229,7 @@ public void setZKDatabase(ZKDatabase zkDb) { * Restore sessions and data */ public void loadData() throws IOException, InterruptedException { - setZxid(zkDb.loadDataBase()); + setZxid(zkDb.loadDataBase(txnLogFactory)); // Clean up dead sessions LinkedList deadSessions = new LinkedList(); for (Long session : zkDb.getSessions()) { @@ -331,7 +331,7 @@ public void startdata() throws IOException, InterruptedException { //check to see if zkDb is not null if (zkDb == null) { - zkDb = new ZKDatabase(this.txnLogFactory); + zkDb = new ZKDatabase(); } if (!zkDb.isInitialized()) { loadData(); @@ -660,7 +660,7 @@ public long getOutstandingRequests() { * @throws IOException */ public void truncateLog(long zxid) throws IOException { - this.zkDb.truncateLog(zxid); + this.zkDb.truncateLog(txnLogFactory, zxid); } public int getTickTime() { diff --git a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java index 92c2c3d0d97..a1ba53b82f1 100644 --- a/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java +++ b/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java @@ -48,12 +48,12 @@ public class FileTxnSnapLog { //the direcotry containing the //the transaction logs - File dataDir; + private final File dataDir; //the directory containing the //the snapshot directory - File snapDir; - FileTxnLog txnLog; - FileSnap snapLog; + private final File snapDir; + private final FileTxnLog txnLog; + private final FileSnap snapLog; public final static int VERSION = 2; public final static String version = "version-"; diff --git a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java index 77ed729b0c2..e8bce8fca7a 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/Learner.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/Learner.java @@ -38,8 +38,6 @@ import org.apache.jute.InputArchive; import org.apache.jute.OutputArchive; import org.apache.jute.Record; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.common.AccessControlList.Identifier; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxn; @@ -47,6 +45,8 @@ import org.apache.zookeeper.server.util.SerializeUtils; import org.apache.zookeeper.server.util.ZxidUtils; import org.apache.zookeeper.txn.TxnHeader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class is the superclass of two of the three main actors in a ZK @@ -322,7 +322,7 @@ else if (qp.getType() == Leader.SNAP) { //we need to truncate the log to the lastzxid of the leader LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid())); - boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid()); + boolean truncated=zk.getZKDatabase().truncateLog(zk.getTxnLogFactory(), qp.getZxid()); if (!truncated) { // not able to truncate the log LOG.error("Not able to truncate the log " diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java index 3bc9316df0c..9d27b65dccf 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -36,8 +36,6 @@ import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.jmx.ZKMBeanInfo; import org.apache.zookeeper.server.ServerCnxnFactory; @@ -47,6 +45,8 @@ import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.util.ZxidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * This class manages the quorum protocol. There are three states this server @@ -394,7 +394,7 @@ public QuorumPeer(Map quorumPeers, File dataDir, this.initLimit = initLimit; this.syncLimit = syncLimit; this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir); - this.zkDb = new ZKDatabase(this.logFactory); + this.zkDb = new ZKDatabase(); if(quorumConfig == null) this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers)); else this.quorumConfig = quorumConfig; @@ -414,7 +414,7 @@ public synchronized void start() { private void loadDataBase() { try { - zkDb.loadDataBase(); + zkDb.loadDataBase(this.logFactory); // load the epochs long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid; @@ -670,7 +670,7 @@ public void run() { // Create read-only server but don't start it immediately final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb); - + // Instead of starting roZk immediately, wait some grace // period before we decide we're partitioned. // @@ -710,7 +710,7 @@ public void run() { } catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING); - } + } } break; case OBSERVING: @@ -787,7 +787,7 @@ public void shutdown() { getElectionAlg().shutdown(); } try { - zkDb.close(); + logFactory.close(); } catch (IOException ie) { LOG.warn("Error closing logs ", ie); } diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java index 987b4b74e94..8412385ec53 100644 --- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java +++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java @@ -21,15 +21,15 @@ import javax.management.JMException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.jmx.ManagedUtil; +import org.apache.zookeeper.server.DatadirCleanupManager; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ZKDatabase; -import org.apache.zookeeper.server.DatadirCleanupManager; import org.apache.zookeeper.server.ZooKeeperServerMain; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @@ -122,13 +122,13 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException { } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } - + LOG.info("Starting quorum peer"); try { ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); - + quorumPeer = new QuorumPeer(); quorumPeer.setClientPortAddress(config.getClientPortAddress()); quorumPeer.setTxnFactory(new FileTxnSnapLog( @@ -144,9 +144,9 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException { quorumPeer.setSyncLimit(config.getSyncLimit()); quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); quorumPeer.setCnxnFactory(cnxnFactory); - quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); + quorumPeer.setZKDatabase(new ZKDatabase()); quorumPeer.setLearnerType(config.getPeerType()); - + quorumPeer.start(); quorumPeer.join(); } catch (InterruptedException e) { diff --git a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java index fbca134bce9..561d7c501f3 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java @@ -45,7 +45,7 @@ static class SimpleLearnerZooKeeperServer extends LearnerZooKeeperServer { Learner learner; public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl) throws IOException { - super(ftsl, 2000, 2000, 2000, new ZKDatabase(ftsl), null); + super(ftsl, 2000, 2000, 2000, new ZKDatabase(), null); } @Override diff --git a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java index b949189fc16..ff725763312 100644 --- a/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java +++ b/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java @@ -39,9 +39,6 @@ import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.apache.zookeeper.server.quorum.Leader; -import org.apache.zookeeper.server.quorum.LearnerInfo; -import org.apache.zookeeper.server.quorum.QuorumPacket; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.apache.zookeeper.server.quorum.flexible.QuorumMaj; import org.apache.zookeeper.server.util.ZxidUtils; @@ -281,7 +278,7 @@ private Leader createLeader(File tmpDir, QuorumPeer peer) Field addrField = peer.getClass().getDeclaredField("myQuorumAddr"); addrField.setAccessible(true); addrField.set(peer, new InetSocketAddress(33556)); - ZKDatabase zkDb = new ZKDatabase(logFactory); + ZKDatabase zkDb = new ZKDatabase(); LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb); return new Leader(peer, zk); } diff --git a/src/java/test/org/apache/zookeeper/test/ClientBase.java b/src/java/test/org/apache/zookeeper/test/ClientBase.java index f3dc1ba72f2..49131610285 100644 --- a/src/java/test/org/apache/zookeeper/test/ClientBase.java +++ b/src/java/test/org/apache/zookeeper/test/ClientBase.java @@ -38,8 +38,6 @@ import javax.management.MBeanServerConnection; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.TestableZooKeeper; @@ -50,12 +48,13 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.server.ServerCnxnFactory; import org.apache.zookeeper.server.ServerCnxnFactoryAccessor; -import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnLog; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.sun.management.UnixOperatingSystemMXBean; @@ -357,15 +356,9 @@ static void shutdownServerInstance(ServerCnxnFactory factory, String hostPort) { if (factory != null) { - ZKDatabase zkDb; - { - ZooKeeperServer zs = getServer(factory); - - zkDb = zs.getZKDatabase(); - } factory.shutdown(); try { - zkDb.close(); + getServer(factory).getTxnLogFactory().close(); } catch (IOException ie) { LOG.warn("Error closing logs ", ie); } diff --git a/src/java/test/org/apache/zookeeper/test/DataTreeTest.java b/src/java/test/org/apache/zookeeper/test/DataTreeTest.java index 073bb4027f2..32d5519808a 100644 --- a/src/java/test/org/apache/zookeeper/test/DataTreeTest.java +++ b/src/java/test/org/apache/zookeeper/test/DataTreeTest.java @@ -37,7 +37,7 @@ public class DataTreeTest extends ZKTestCase { @Before public void setUp() throws Exception { - zkdb = new ZKDatabase(null); + zkdb = new ZKDatabase(); } @Test diff --git a/src/java/test/org/apache/zookeeper/test/TruncateTest.java b/src/java/test/org/apache/zookeeper/test/TruncateTest.java index 4800ce54c4d..9a7022f3e46 100644 --- a/src/java/test/org/apache/zookeeper/test/TruncateTest.java +++ b/src/java/test/org/apache/zookeeper/test/TruncateTest.java @@ -23,8 +23,6 @@ import java.net.InetSocketAddress; import java.util.HashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; @@ -34,34 +32,34 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ServerCnxnFactory; -import org.apache.zookeeper.server.ZKDatabase; -import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TruncateTest extends ZKTestCase { private static final Logger LOG = LoggerFactory.getLogger(TruncateTest.class); File dataDir1, dataDir2, dataDir3; final int baseHostPort = 12233; - + @Before public void setUp() throws IOException { dataDir1 = ClientBase.createTmpDir(); dataDir2 = ClientBase.createTmpDir(); dataDir3 = ClientBase.createTmpDir(); } - + @After public void tearDown() { ClientBase.recursiveDelete(dataDir1); ClientBase.recursiveDelete(dataDir2); ClientBase.recursiveDelete(dataDir3); } - + volatile boolean connected; Watcher nullWatcher = new Watcher() { @Override @@ -69,7 +67,7 @@ public void process(WatchedEvent event) { connected = event.getState() == Watcher.Event.KeeperState.SyncConnected; } }; - + @Test public void testTruncate() throws IOException, InterruptedException, KeeperException { // Prime the server that is going to come in late with 50 txns @@ -79,16 +77,10 @@ public void testTruncate() throws IOException, InterruptedException, KeeperExcep zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zk.close(); - - ZKDatabase zkDb; - { - ZooKeeperServer zs = ClientBase.getServer(factory); - - zkDb = zs.getZKDatabase(); - } + factory.shutdown(); try { - zkDb.close(); + ClientBase.getServer(factory).getTxnLogFactory().close(); } catch (IOException ie) { LOG.warn("Error closing logs ", ie); } @@ -98,7 +90,7 @@ public void testTruncate() throws IOException, InterruptedException, KeeperExcep int port1 = baseHostPort+1; int port2 = baseHostPort+2; int port3 = baseHostPort+3; - + // Start up two of the quorum and add 10 txns HashMap peers = new HashMap(); peers.put(Long.valueOf(1), new QuorumServer(1, new InetSocketAddress("127.0.0.1", port1 + 1000))); @@ -118,7 +110,7 @@ public void testTruncate() throws IOException, InterruptedException, KeeperExcep zk.create("/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } zk.close(); - + final ZooKeeper zk2 = new ZooKeeper("127.0.0.1:" + port2, 15000, nullWatcher); zk2.getData("/9", false, new Stat()); try {