DL-31: Provide flag to disable zk based distributed lock
DL doesn't enforce any leader election. However, it still provides a ZooKeeper ephemeral-znode-based lock for leader election. This lock is unnecessary if an application that uses the core library directly already has its own leader election mechanism.

This change provides a flag that allows disabling the ZooKeeper-based lock.
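
For example, an application that already runs its own leader election could opt out through the configuration setter added in this change. A minimal sketch (namespace and stream wiring are omitted; only the new flag is shown):

import com.twitter.distributedlog.DistributedLogConfiguration;

public class DisableWriteLockExample {
    public static void main(String[] args) {
        // Sketch: the application coordinates stream ownership itself, so the
        // ZooKeeper write lock is switched off via the flag introduced here.
        DistributedLogConfiguration conf = new DistributedLogConfiguration();
        conf.setWriteLockEnabled(false);   // defaults to true, preserving existing behaviour
        // `conf` would then be passed when building the namespace / log manager as usual.
        System.out.println("writeLockEnabled = " + conf.isWriteLockEnabled());
    }
}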

Author: Sijie Guo <sijieg@twitter.com>

Reviewers: Leigh Stewart <lstewart@apache.org>

Closes #9 from sijie/sijie/flag_to_disable_lock
Sijie Guo committed Aug 23, 2016
1 parent b23291a commit 89613fb
Showing 16 changed files with 754 additions and 620 deletions.
@@ -36,8 +36,10 @@
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.lock.NopDistributedLock;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKDistributedLock;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.metadata.BKDLConfig;
@@ -100,9 +102,9 @@
* scope `writer_future_pool`. See {@link MonitoredFuturePool} for detail stats.
* <li> `reader_future_pool/*`: metrics about the future pools that used by readers are exposed under
* scope `reader_future_pool`. See {@link MonitoredFuturePool} for detail stats.
* <li> `lock/*`: metrics about the locks used by writers. See {@link DistributedLock} for detail
* <li> `lock/*`: metrics about the locks used by writers. See {@link ZKDistributedLock} for detail
* stats.
* <li> `read_lock/*`: metrics about the locks used by readers. See {@link DistributedLock} for
* <li> `read_lock/*`: metrics about the locks used by readers. See {@link ZKDistributedLock} for
* detail stats.
* <li> `logsegments/*`: metrics about basic operations on log segments. See {@link BKLogHandler} for details.
* <li> `segments/*`: metrics about write operations on log segments. See {@link BKLogWriteHandler} for details.
@@ -604,12 +606,17 @@ private void createWriteHandler(ZKLogMetadataForWriter logMetadata,
final Promise<BKLogWriteHandler> createPromise) {
OrderedScheduler lockStateExecutor = getLockStateExecutor(true);
// Build the locks
DistributedLock lock = new DistributedLock(
lockStateExecutor,
getLockFactory(true),
logMetadata.getLockPath(),
conf.getLockTimeoutMilliSeconds(),
statsLogger);
DistributedLock lock;
if (conf.isWriteLockEnabled()) {
lock = new ZKDistributedLock(
lockStateExecutor,
getLockFactory(true),
logMetadata.getLockPath(),
conf.getLockTimeoutMilliSeconds(),
statsLogger);
} else {
lock = NopDistributedLock.INSTANCE;
}
// Build the ledger allocator
LedgerAllocator allocator;
try {
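NopDistributedLock itself is not part of the excerpt above. A minimal sketch of what such a null-object lock could look like, assuming the lock abstraction exposes an asynchronous acquire as used in this hunk (the single-method interface and names below are illustrative, not the committed code):

import com.twitter.util.Future;

// Hypothetical minimal shape of the lock abstraction; the real
// com.twitter.distributedlog.lock.DistributedLock interface may declare more methods.
interface Lock {
    Future<? extends Lock> asyncAcquire();
}

// Sketch of a null-object lock in the spirit of NopDistributedLock.INSTANCE: acquisition
// succeeds immediately and never touches ZooKeeper, so a writer whose ownership is
// coordinated externally pays no locking cost.
final class NopLockSketch implements Lock {
    static final Lock INSTANCE = new NopLockSketch();

    private NopLockSketch() {}

    @Override
    public Future<? extends Lock> asyncAcquire() {
        return Future.value(this);   // already "acquired"
    }
}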
@@ -30,7 +30,9 @@
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKDistributedLock;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
@@ -39,7 +41,6 @@
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.ExceptionalFunction;
import com.twitter.util.ExceptionalFunction0;
@@ -98,7 +99,7 @@
* becoming idle.
* </ul>
* <h4>Read Lock</h4>
* All read lock related stats are exposed under scope `read_lock`. See {@link DistributedLock}
* All read lock related stats are exposed under scope `read_lock`. See {@link ZKDistributedLock}
* for detail stats.
*/
class BKLogReadHandler extends BKLogHandler {
@@ -216,7 +217,7 @@ synchronized Future<Void> lockStream() {
public DistributedLock applyE() throws IOException {
// Unfortunately this has a blocking call which we should not execute on the
// ZK completion thread
BKLogReadHandler.this.readLock = new DistributedLock(
BKLogReadHandler.this.readLock = new ZKDistributedLock(
lockStateExecutor,
lockFactory,
readLockPath,
@@ -247,7 +248,7 @@ public Future<Void> applyE(DistributedLock lock) throws IOException {
* executor service thread.
*/
Future<Void> acquireLockOnExecutorThread(DistributedLock lock) throws LockingException {
final Future<DistributedLock> acquireFuture = lock.asyncAcquire();
final Future<? extends DistributedLock> acquireFuture = lock.asyncAcquire();

// The future we return must be satisfied on an executor service thread. If we simply
// return the future returned by asyncAcquire, user callbacks may end up running in
@@ -110,7 +110,7 @@ class BKLogWriteHandler extends BKLogHandler {
protected final int regionId;
protected volatile boolean closed = false;
protected final RollingPolicy rollingPolicy;
protected Future<DistributedLock> lockFuture = null;
protected Future<? extends DistributedLock> lockFuture = null;
protected final PermitLimiter writeLimiter;
protected final FeatureProvider featureProvider;
protected final DynamicDistributedLogConfiguration dynConf;
@@ -337,7 +337,7 @@ protected void abortOpResult(Throwable t, OpResult opResult) {
*
* @return future represents the lock result
*/
Future<DistributedLock> lockHandler() {
Future<? extends DistributedLock> lockHandler() {
if (null != lockFuture) {
return lockFuture;
}
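The lock future types in the handlers change from Future<DistributedLock> to Future<? extends DistributedLock>. A plausible reason, illustrated with placeholder types rather than the real signatures: once DistributedLock is an interface, a concrete implementation may declare its acquire future with its own element type, and Java generics are invariant, so only the wildcard form accepts every implementation, including a no-op one.

import com.twitter.util.Future;

interface LockApi {
    // Callers that only know the interface must accept a wildcard element type.
    Future<? extends LockApi> asyncAcquire();
}

class ZkLockSketch implements LockApi {
    @Override
    public Future<ZkLockSketch> asyncAcquire() {   // covariant override narrows the element type
        return Future.value(this);
    }
}

class HandlerSketch {
    // Future<ZkLockSketch> is not a Future<LockApi> (invariance), but it is a
    // Future<? extends LockApi>, so one field serves every lock implementation.
    Future<? extends LockApi> lockFuture;

    void lockHandler(LockApi lock) {
        lockFuture = lock.asyncAcquire();
    }
}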
@@ -267,6 +267,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
public static final int BKDL_LOGSEGMENT_ROLLING_CONCURRENCY_DEFAULT = 1;

// Lock Settings
public static final String BKDL_WRITE_LOCK_ENABLED = "writeLockEnabled";
public static final boolean BKDL_WRITE_LOCK_ENABLED_DEFAULT = true;
public static final String BKDL_LOCK_TIMEOUT = "lockTimeoutSeconds";
public static final long BKDL_LOCK_TIMEOUT_DEFAULT = 30;
public static final String BKDL_LOCK_REACQUIRE_TIMEOUT = "lockReacquireTimeoutSeconds";
@@ -2038,6 +2040,30 @@ public DistributedLogConfiguration setLogSegmentRollingConcurrency(int concurren
// Lock Settings
//

/**
* Is lock enabled when opening a writer to write a stream?
* <p> A lock is not generally required to guarantee correctness when writing a stream; the lock
* is mainly for tracking ownership. The built-in fencing mechanism is what guarantees correctness
* during stream owner failover. It is okay to disable the lock if your application knows which
* nodes are supposed to write which streams.
*
* @return true if lock is enabled, otherwise false.
*/
public boolean isWriteLockEnabled() {
return this.getBoolean(BKDL_WRITE_LOCK_ENABLED, BKDL_WRITE_LOCK_ENABLED_DEFAULT);
}

/**
* Enable or disable the lock used when opening a writer to write a stream.
*
* @param enabled flag to enable or disable lock for opening a writer to write a stream.
* @return distributedlog configuration.
*/
public DistributedLogConfiguration setWriteLockEnabled(boolean enabled) {
setProperty(BKDL_WRITE_LOCK_ENABLED, enabled);
return this;
}

/**
* Get lock timeout in milliseconds. The default value is 30.
*
@@ -34,10 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
@@ -118,40 +116,19 @@ public LocalDLMEmulator build() throws Exception {
conf = (ServerConfiguration) DEFAULT_SERVER_CONFIGURATION.clone();
conf.setZkTimeout(zkTimeoutSec * 1000);
}
ServerConfiguration newConf = new ServerConfiguration();
newConf.loadConf(conf);
newConf.setAllowLoopback(true);

return new LocalDLMEmulator(numBookies, shouldStartZK, zkHost, zkPort,
initialBookiePort, zkTimeoutSec, conf);
initialBookiePort, zkTimeoutSec, newConf);
}
}

public static Builder newBuilder() {
return new Builder();
}

public LocalDLMEmulator(final int numBookies) throws Exception {
this(numBookies, true, DEFAULT_ZK_HOST, DEFAULT_ZK_PORT, DEFAULT_BOOKIE_INITIAL_PORT);
}

public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort) throws Exception {
this(numBookies, false, zkHost, zkPort, DEFAULT_BOOKIE_INITIAL_PORT);
}

public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort, final ServerConfiguration serverConf) throws Exception {
this(numBookies, false, zkHost, zkPort, DEFAULT_BOOKIE_INITIAL_PORT, DEFAULT_ZK_TIMEOUT_SEC, serverConf);
}

public LocalDLMEmulator(final int numBookies, final int initialBookiePort) throws Exception {
this(numBookies, true, DEFAULT_ZK_HOST, DEFAULT_ZK_PORT, initialBookiePort);
}

public LocalDLMEmulator(final int numBookies, final String zkHost, final int zkPort, final int initialBookiePort) throws Exception {
this(numBookies, false, zkHost, zkPort, initialBookiePort);
}

private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort) throws Exception {
this(numBookies, shouldStartZK, zkHost, zkPort, initialBookiePort, DEFAULT_ZK_TIMEOUT_SEC, new ServerConfiguration());
}

private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, final String zkHost, final int zkPort, final int initialBookiePort, final int zkTimeoutSec, final ServerConfiguration serverConf) throws Exception {
this.numBookies = numBookies;
this.zkHost = zkHost;
@@ -162,7 +139,9 @@ private LocalDLMEmulator(final int numBookies, final boolean shouldStartZK, fina
this.bkStartupThread = new Thread() {
public void run() {
try {
LOG.info("Starting {} bookies : allowLoopback = {}", numBookies, serverConf.getAllowLoopback());
LocalBookKeeper.startLocalBookies(zkHost, zkPort, numBookies, shouldStartZK, initialBookiePort, serverConf);
LOG.info("{} bookies are started.");
} catch (InterruptedException e) {
// go away quietly
} catch (Exception e) {
@@ -205,6 +184,7 @@ public BookieServer newBookie() throws Exception {
ServerConfiguration bookieConf = new ServerConfiguration();
bookieConf.setZkTimeout(zkTimeoutSec * 1000);
bookieConf.setBookiePort(0);
bookieConf.setAllowLoopback(true);
File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_",
"test");
if (!tmpdir.delete()) {
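With the convenience constructors removed, tests would construct the emulator through its builder. A small sketch using only the newBuilder()/build() calls visible in this diff (fluent setters for the bookie count, ZooKeeper endpoint, or a custom ServerConfiguration are assumed to exist on Builder but are not shown here):

import com.twitter.distributedlog.LocalDLMEmulator;

public class EmulatorExample {
    public static void main(String[] args) throws Exception {
        // Sketch: defaults only; per-test tuning would go through the Builder's setters.
        LocalDLMEmulator emulator = LocalDLMEmulator.newBuilder().build();

        // The builder now copies the ServerConfiguration and forces allowLoopback=true,
        // so locally started bookies no longer refuse to bind to a loopback interface.
    }
}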
