Skip to content

Commit

Permalink
ZOOKEEPER-1276 ZKDatabase should not hold reference to FileTxnSnapLog
Browse files Browse the repository at this point in the history
Change-Id: I66013e357f935e983ec80b4d72e93a56d7931826
  • Loading branch information
thkoch2001 committed Nov 4, 2011
1 parent e80fd33 commit 277b348
Show file tree
Hide file tree
Showing 12 changed files with 52 additions and 104 deletions.
Expand Up @@ -107,12 +107,12 @@ public void run() {
}
if (si != null) {
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
if (zks.getTxnLogFactory().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
zks.getZKDatabase().rollLog();
zks.getTxnLogFactory().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
Expand Down Expand Up @@ -159,7 +159,7 @@ private void flush(LinkedList<Request> toFlush) throws IOException {
if (toFlush.isEmpty())
return;

zks.getZKDatabase().commit();
zks.getTxnLogFactory().commit();
while (!toFlush.isEmpty()) {
Request i = toFlush.remove();
nextProcessor.processRequest(i);
Expand Down
44 changes: 5 additions & 39 deletions src/java/main/org/apache/zookeeper/server/ZKDatabase.java
Expand Up @@ -65,7 +65,6 @@ public class ZKDatabase {
*/
protected DataTree dataTree;
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected FileTxnSnapLog snapLog;
protected long minCommittedLog, maxCommittedLog;
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
Expand All @@ -83,10 +82,9 @@ public class ZKDatabase {
* between a filetxnsnaplog and zkdatabase.
* @param snapLog the FileTxnSnapLog mapping this zkdatabase
*/
public ZKDatabase(FileTxnSnapLog snapLog) {
public ZKDatabase() {
dataTree = new DataTree();
sessionsWithTimeouts = new ConcurrentHashMap<Long, Integer>();
this.snapLog = snapLog;
}

/**
Expand Down Expand Up @@ -203,7 +201,7 @@ public ConcurrentHashMap<Long, Integer> getSessionWithTimeOuts() {
* @return the last valid zxid on disk
* @throws IOException
*/
public long loadDataBase() throws IOException {
public long loadDataBase(FileTxnSnapLog snapLog) throws IOException {
PlayBackListener listener=new PlayBackListener(){
public void onTxnLoaded(TxnHeader hdr,Record txn){
Meta meta = new Meta(0, hdr.getCxid(), hdr.getZxid(), OpCode.fromInt(hdr.getType()));
Expand Down Expand Up @@ -343,10 +341,10 @@ public int getAclSize() {
* @return true if the truncate is succesful and false if not
* @throws IOException
*/
public boolean truncateLog(long zxid) throws IOException {
public boolean truncateLog(FileTxnSnapLog snapLog, long zxid) throws IOException {
clear();
boolean truncated = this.snapLog.truncateLog(zxid);
loadDataBase();
boolean truncated = snapLog.truncateLog(zxid);
loadDataBase(snapLog);
return truncated;
}

Expand All @@ -372,38 +370,6 @@ public void serializeSnapshot(OutputArchive oa) throws IOException,
SerializeUtils.serializeSnapshot(getDataTree(), oa, getSessionWithTimeOuts());
}

/**
* append to the underlying transaction log
* @param si the request to append
* @return true if the append was succesfull and false if not
*/
public boolean append(Request si) throws IOException {
return this.snapLog.append(si);
}

/**
* roll the underlying log
*/
public void rollLog() throws IOException {
this.snapLog.rollLog();
}

/**
* commit to the underlying transaction log
* @throws IOException
*/
public void commit() throws IOException {
this.snapLog.commit();
}

/**
* close this database. free the resources
* @throws IOException
*/
public void close() throws IOException {
this.snapLog.close();
}

public void processTxn(TxnHeader hdr, Record txn) {
try {
Transaction transaction = Transaction.fromTxn(hdr, txn);
Expand Down
14 changes: 7 additions & 7 deletions src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Expand Up @@ -160,7 +160,7 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
* @throws IOException
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime) throws IOException {
this(txnLogFactory, tickTime, -1, -1, new ZKDatabase(txnLogFactory));
this(txnLogFactory, tickTime, -1, -1, new ZKDatabase());
}

public ServerStats serverStats() {
Expand All @@ -171,9 +171,9 @@ public void dumpConf(PrintWriter pwriter) {
pwriter.print("clientPort=");
pwriter.println(getClientPort());
pwriter.print("dataDir=");
pwriter.println(zkDb.snapLog.getSnapDir().getAbsolutePath());
pwriter.println(txnLogFactory.getSnapDir().getAbsolutePath());
pwriter.print("dataLogDir=");
pwriter.println(zkDb.snapLog.getDataDir().getAbsolutePath());
pwriter.println(txnLogFactory.getDataDir().getAbsolutePath());
pwriter.print("tickTime=");
pwriter.println(getTickTime());
pwriter.print("maxClientCnxns=");
Expand Down Expand Up @@ -206,7 +206,7 @@ public ZooKeeperServer(File snapDir, File logDir, int tickTime)
public ZooKeeperServer(FileTxnSnapLog txnLogFactory)
throws IOException
{
this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, new ZKDatabase(txnLogFactory));
this(txnLogFactory, DEFAULT_TICK_TIME, -1, -1, new ZKDatabase());
}

/**
Expand All @@ -229,7 +229,7 @@ public void setZKDatabase(ZKDatabase zkDb) {
* Restore sessions and data
*/
public void loadData() throws IOException, InterruptedException {
setZxid(zkDb.loadDataBase());
setZxid(zkDb.loadDataBase(txnLogFactory));
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
for (Long session : zkDb.getSessions()) {
Expand Down Expand Up @@ -331,7 +331,7 @@ public void startdata()
throws IOException, InterruptedException {
//check to see if zkDb is not null
if (zkDb == null) {
zkDb = new ZKDatabase(this.txnLogFactory);
zkDb = new ZKDatabase();
}
if (!zkDb.isInitialized()) {
loadData();
Expand Down Expand Up @@ -660,7 +660,7 @@ public long getOutstandingRequests() {
* @throws IOException
*/
public void truncateLog(long zxid) throws IOException {
this.zkDb.truncateLog(zxid);
this.zkDb.truncateLog(txnLogFactory, zxid);
}

public int getTickTime() {
Expand Down
Expand Up @@ -48,12 +48,12 @@
public class FileTxnSnapLog {
//the direcotry containing the
//the transaction logs
File dataDir;
private final File dataDir;
//the directory containing the
//the snapshot directory
File snapDir;
FileTxnLog txnLog;
FileSnap snapLog;
private final File snapDir;
private final FileTxnLog txnLog;
private final FileSnap snapLog;
public final static int VERSION = 2;
public final static String version = "version-";

Expand Down
6 changes: 3 additions & 3 deletions src/java/main/org/apache/zookeeper/server/quorum/Learner.java
Expand Up @@ -38,15 +38,15 @@
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.common.AccessControlList.Identifier;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is the superclass of two of the three main actors in a ZK
Expand Down Expand Up @@ -322,7 +322,7 @@ else if (qp.getType() == Leader.SNAP) {
//we need to truncate the log to the lastzxid of the leader
LOG.warn("Truncating log to get in sync with the leader 0x"
+ Long.toHexString(qp.getZxid()));
boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
boolean truncated=zk.getZKDatabase().truncateLog(zk.getTxnLogFactory(), qp.getZxid());
if (!truncated) {
// not able to truncate the log
LOG.error("Not able to truncate the log "
Expand Down
14 changes: 7 additions & 7 deletions src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
Expand Up @@ -36,8 +36,6 @@
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.ServerCnxnFactory;
Expand All @@ -47,6 +45,8 @@
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class manages the quorum protocol. There are three states this server
Expand Down Expand Up @@ -394,7 +394,7 @@ public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
this.initLimit = initLimit;
this.syncLimit = syncLimit;
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
this.zkDb = new ZKDatabase(this.logFactory);
this.zkDb = new ZKDatabase();
if(quorumConfig == null)
this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
else this.quorumConfig = quorumConfig;
Expand All @@ -414,7 +414,7 @@ public synchronized void start() {

private void loadDataBase() {
try {
zkDb.loadDataBase();
zkDb.loadDataBase(this.logFactory);

// load the epochs
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
Expand Down Expand Up @@ -670,7 +670,7 @@ public void run() {
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk =
new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
Expand Down Expand Up @@ -710,7 +710,7 @@ public void run() {
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
}
break;
case OBSERVING:
Expand Down Expand Up @@ -787,7 +787,7 @@ public void shutdown() {
getElectionAlg().shutdown();
}
try {
zkDb.close();
logFactory.close();
} catch (IOException ie) {
LOG.warn("Error closing logs ", ie);
}
Expand Down
Expand Up @@ -21,15 +21,15 @@

import javax.management.JMException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.jmx.ManagedUtil;
import org.apache.zookeeper.server.DatadirCleanupManager;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.DatadirCleanupManager;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
Expand Down Expand Up @@ -122,13 +122,13 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException {
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}

LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());

quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
Expand All @@ -144,9 +144,9 @@ public void runFromConfig(QuorumPeerConfig config) throws IOException {
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setZKDatabase(new ZKDatabase());
quorumPeer.setLearnerType(config.getPeerType());

quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
Expand Down
Expand Up @@ -45,7 +45,7 @@ static class SimpleLearnerZooKeeperServer extends LearnerZooKeeperServer {
Learner learner;

public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
super(ftsl, 2000, 2000, 2000, new ZKDatabase(ftsl), null);
super(ftsl, 2000, 2000, 2000, new ZKDatabase(), null);
}

@Override
Expand Down
Expand Up @@ -39,9 +39,6 @@
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LearnerInfo;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.util.ZxidUtils;
Expand Down Expand Up @@ -281,7 +278,7 @@ private Leader createLeader(File tmpDir, QuorumPeer peer)
Field addrField = peer.getClass().getDeclaredField("myQuorumAddr");
addrField.setAccessible(true);
addrField.set(peer, new InetSocketAddress(33556));
ZKDatabase zkDb = new ZKDatabase(logFactory);
ZKDatabase zkDb = new ZKDatabase();
LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
return new Leader(peer, zk);
}
Expand Down
13 changes: 3 additions & 10 deletions src/java/test/org/apache/zookeeper/test/ClientBase.java
Expand Up @@ -38,8 +38,6 @@

import javax.management.MBeanServerConnection;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.TestableZooKeeper;
Expand All @@ -50,12 +48,13 @@
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactoryAccessor;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.management.UnixOperatingSystemMXBean;

Expand Down Expand Up @@ -357,15 +356,9 @@ static void shutdownServerInstance(ServerCnxnFactory factory,
String hostPort)
{
if (factory != null) {
ZKDatabase zkDb;
{
ZooKeeperServer zs = getServer(factory);

zkDb = zs.getZKDatabase();
}
factory.shutdown();
try {
zkDb.close();
getServer(factory).getTxnLogFactory().close();
} catch (IOException ie) {
LOG.warn("Error closing logs ", ie);
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/test/org/apache/zookeeper/test/DataTreeTest.java
Expand Up @@ -37,7 +37,7 @@ public class DataTreeTest extends ZKTestCase {

@Before
public void setUp() throws Exception {
zkdb = new ZKDatabase(null);
zkdb = new ZKDatabase();
}

@Test
Expand Down

0 comments on commit 277b348

Please sign in to comment.