Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
DL-39: Use a distributedlog uri to configure write proxy routing address
Browse files Browse the repository at this point in the history
Author: Jon Derrick <jonathan.derrickk@gmail.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #18 from jderrickk/jd/uri_resolver
  • Loading branch information
jderrickk authored and Sijie Guo committed Sep 7, 2016
1 parent b9d2156 commit 05a8daa
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 15 deletions.
2 changes: 0 additions & 2 deletions distributedlog-benchmark/bin/dbench
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ elif [ $COMMAND == "write" ]; then
--max-rate ${MAX_RATE} \\
--change-rate ${CHANGE_RATE} \\
--change-interval ${CHANGE_RATE_INTERVAL} \\
--finagle-name ${DL_WP_FINAGLE_NAME}
"""
BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_WRITE_ARGS} \\ --mode write \\"
exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
Expand All @@ -191,7 +190,6 @@ elif [ $COMMAND == "read" ]; then
--readers-per-stream ${NUM_READERS_PER_STREAM} \\
--max-stream-id ${MAX_STREAM_ID} \\
--truncation-interval ${TRUNCATION_INTERVAL} \\
--finagle-name ${DL_WP_FINAGLE_NAME}
"""
BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_READ_ARGS} \\ --mode read \\"
exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
Expand Down
3 changes: 0 additions & 3 deletions distributedlog-benchmark/conf/dlogenv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,6 @@ CHANGE_RATE=100
# Rate change interval, in seconds
CHANGE_RATE_INTERVAL=300

# DL Write Proxy Finagle Name
DL_WP_FINAGLE_NAME='zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy'

##########
# Reader
##########
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,8 @@ void run() throws Exception {
}

Worker runWriter() {
Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(),
"either serverset paths or finagle-names required");
Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
"either serverset paths, finagle-names or uri required");
Preconditions.checkArgument(msgSize > 0, "messagesize must be greater than 0");
Preconditions.checkArgument(rate > 0, "rate must be greater than 0");
Preconditions.checkArgument(maxRate >= rate, "max rate must be greater than rate");
Expand All @@ -278,6 +278,7 @@ Worker runWriter() {
new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
return createWriteWorker(
streamPrefix,
dlUri,
null == startStreamId ? shardId * numStreams : startStreamId,
null == endStreamId ? (shardId + 1) * numStreams : endStreamId,
rateLimiter,
Expand All @@ -299,6 +300,7 @@ Worker runWriter() {

protected WriterWorker createWriteWorker(
String streamPrefix,
URI uri,
int startStreamId,
int endStreamId,
ShiftableRateLimiter rateLimiter,
Expand All @@ -318,6 +320,7 @@ protected WriterWorker createWriteWorker(
boolean enableBatching) {
return new WriterWorker(
streamPrefix,
uri,
startStreamId,
endStreamId,
rateLimiter,
Expand Down Expand Up @@ -360,8 +363,8 @@ Worker runDLWriter() throws IOException {
}

Worker runReader() throws IOException {
Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(),
"either serverset paths or finagle-names required");
Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
"either serverset paths, finagle-names or dlUri required");
Preconditions.checkArgument(concurrency > 0, "concurrency must be greater than 0");
Preconditions.checkArgument(truncationInterval > 0, "truncation interval should be greater than 0");
return runReaderInternal(serversetPaths, finagleNames, truncationInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public ReaderWorker(DistributedLogConfiguration conf,

builder = builder.finagleNameStrs(local, remotes);
LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames);
} else {
} else if (serverSets.length != 0){
ServerSet local = this.serverSets[0].getServerSet();
ServerSet[] remotes = new ServerSet[this.serverSets.length - 1];
for (int i = 1; i < serverSets.length; i++) {
Expand All @@ -263,6 +263,9 @@ public ReaderWorker(DistributedLogConfiguration conf,

builder = builder.serverSets(local, remotes);
LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths);
} else {
builder = builder.uri(uri);
LOG.info("Initialized distributedlog client for namespace {}", uri);
}
dlc = builder.build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class WriterWorker implements Worker {
final int hostConnectionLimit;
final ExecutorService executorService;
final ShiftableRateLimiter rateLimiter;
final URI dlUri;
final DLZkServerSet[] serverSets;
final List<String> finagleNames;
final Random random;
Expand All @@ -86,6 +87,7 @@ public class WriterWorker implements Worker {
final StatsLogger dlErrorCodeLogger;

public WriterWorker(String streamPrefix,
URI uri,
int startStreamId,
int endStreamId,
ShiftableRateLimiter rateLimiter,
Expand All @@ -106,6 +108,7 @@ public WriterWorker(String streamPrefix,
Preconditions.checkArgument(startStreamId <= endStreamId);
Preconditions.checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
this.streamPrefix = streamPrefix;
this.dlUri = uri;
this.startStreamId = startStreamId;
this.endStreamId = endStreamId;
this.rateLimiter = rateLimiter;
Expand Down Expand Up @@ -184,19 +187,21 @@ private DistributedLogClient buildDlogClient() {
.handshakeTracing(true)
.name("writer");

if (serverSets.length == 0) {
if (!finagleNames.isEmpty()) {
String local = finagleNames.get(0);
String[] remotes = new String[finagleNames.size() - 1];
finagleNames.subList(1, finagleNames.size()).toArray(remotes);

builder = builder.finagleNameStrs(local, remotes);
} else {
} else if (serverSets.length != 0){
ServerSet local = serverSets[0].getServerSet();
ServerSet[] remotes = new ServerSet[serverSets.length - 1];
for (int i = 1; i < serverSets.length; i++) {
remotes[i-1] = serverSets[i].getServerSet();
}
builder = builder.serverSets(local, remotes);
} else {
builder = builder.uri(dlUri);
}

return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,16 @@
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId;
import org.apache.commons.lang.StringUtils;

import java.net.SocketAddress;
import java.net.URI;
import java.util.Random;

public final class DistributedLogClientBuilder {

private static final Random random = new Random(System.currentTimeMillis());

private String _name = null;
private ClientId _clientId = null;
private RoutingService.Builder _routingServiceBuilder = null;
Expand Down Expand Up @@ -170,6 +175,29 @@ public DistributedLogClientBuilder finagleNameStrs(String local, String...remote
return newBuilder;
}

/**
* URI to access proxy services. Assuming the write proxies are announced under `.write_proxy` of
* the provided namespace uri.
* <p>
* The builder will convert the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to
* zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`)
*
* @param uri namespace uri to access the serverset of write proxies
* @return distributedlog builder
*/
public DistributedLogClientBuilder uri(URI uri) {
DistributedLogClientBuilder newBuilder = newBuilder(this);
String zkServers = uri.getAuthority().replace(";", ",");
String[] zkServerList = StringUtils.split(zkServers, ',');
String finagleNameStr = String.format(
"zk!%s!%s/.write_proxy",
zkServerList[random.nextInt(zkServerList.length)], // zk server
uri.getPath());
newBuilder._routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
newBuilder._enableRegionStats = false;
return newBuilder;
}

/**
* Address of write proxy to connect.
*
Expand Down
4 changes: 1 addition & 3 deletions docs/operations/deployment.rst
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,7 @@ Write Proxy Naming
++++++++++++++++++

The `dlog-daemon.sh` script starts the write proxy by announcing it to the `.write_proxy` path under
the dl namespace. So you could use `zk!<zkservers>!/<namespace_path>/.write_proxy` as the finagle name
to access the write proxy cluster. It is `zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy`
in the above example.
the dl namespace. So you could use uri in the distributedlog client builder to access the write proxy cluster.

Verify the setup
++++++++++++++++
Expand Down

0 comments on commit 05a8daa

Please sign in to comment.