Skip to content

Commit

Permalink
Run store/schema verification in parallel to reduce BnP job running
Browse files Browse the repository at this point in the history
time.
  • Loading branch information
gaojieliu committed Jul 15, 2016
1 parent 7615628 commit 3c864ee
Showing 1 changed file with 26 additions and 2 deletions.
Expand Up @@ -141,8 +141,10 @@ public class VoldemortBuildAndPushJob extends AbstractJob {
public final static String MIN_NUMBER_OF_RECORDS = "min.number.of.records";
public final static String REDUCER_OUTPUT_COMPRESS_CODEC = "reducer.output.compress.codec";
public final static String REDUCER_OUTPUT_COMPRESS = "reducer.output.compress";
public final static String STORE_VERIFICATION_MAX_THREAD_NUM = "store.verification.max.thread.num";
// default
private final static String RECOMMENDED_FETCHER_PROTOCOL = "webhdfs";
private final int DEFAULT_THREAD_NUM_FOR_STORE_VERIFICATION = 20;

// CONFIG VALUES (and other immutable state)
private final Props props;
Expand All @@ -168,6 +170,9 @@ public class VoldemortBuildAndPushJob extends AbstractJob {
private final List<Closeable> closeables = Lists.newArrayList();
private final ExecutorService executorService;
private final boolean enableStoreCreation;
// Executor to do store/schema verification in parallel
private final int maxThreadNumForStoreVerification;
private ExecutorService storeVerificationExecutorService;

// Mutable state
private StoreDefinition storeDef;
Expand All @@ -177,7 +182,7 @@ public class VoldemortBuildAndPushJob extends AbstractJob {
private Future heartBeatHookFuture = null;
private Map<String, VAdminProto.GetHighAvailabilitySettingsResponse> haSettingsPerCluster;
private boolean buildPrimaryReplicasOnly;

private AdminClient createAdminClient(String url, boolean fetchAllStoresXml) {
ClientConfig config = new ClientConfig().setBootstrapUrls(url)
.setConnectionTimeout(15,TimeUnit.SECONDS)
Expand Down Expand Up @@ -309,6 +314,16 @@ public VoldemortBuildAndPushJob(String name, azkaban.utils.Props azkabanProps) {
}
this.executorService = Executors.newFixedThreadPool(requiredNumberOfThreads);

this.maxThreadNumForStoreVerification = props.getInt(STORE_VERIFICATION_MAX_THREAD_NUM,
DEFAULT_THREAD_NUM_FOR_STORE_VERIFICATION);
// Specifying value <= 1 for prop: STORE_VERIFICATION_MAX_THREAD_NUM will enable sequential store verification.
if (this.maxThreadNumForStoreVerification > 1) {
this.storeVerificationExecutorService = Executors.newFixedThreadPool(this.maxThreadNumForStoreVerification);
log.info("Build and Push Job will run store verification in parallel, thread num: " + this.maxThreadNumForStoreVerification);
} else {
log.info("Build and Push Job will run store verification sequentially.");
}

log.info("Build and Push Job constructed for " + numberOfClusters + " cluster(s).");
}

Expand Down Expand Up @@ -602,6 +617,11 @@ public void run() throws Exception {
tasks.put(url, executorService.submit(new StorePushTask(props, url, buildOutputDir)));
}
}
// We can safely shutdown storeVerificationExecutorService here since all the verifications are done.
if (null != storeVerificationExecutorService) {
storeVerificationExecutorService.shutdownNow();
storeVerificationExecutorService = null;
}

for (Map.Entry<String, Future<Boolean>> task: tasks.entrySet()) {
String url = task.getKey();
Expand Down Expand Up @@ -674,6 +694,9 @@ private void cleanUp() {
}
}
this.executorService.shutdownNow();
if (null != this.storeVerificationExecutorService) {
this.storeVerificationExecutorService.shutdownNow();
}
}

public String runBuildStore(Props props, String url) throws Exception {
Expand Down Expand Up @@ -949,7 +972,8 @@ private void verifyOrAddStore(String clusterURL,
StoreDefinition newStoreDef = VoldemortUtils.getStoreDef(newStoreDefXml);

try {
adminClientPerCluster.get(clusterURL).storeMgmtOps.verifyOrAddStore(newStoreDef, "BnP config/data", enableStoreCreation);
adminClientPerCluster.get(clusterURL).storeMgmtOps.verifyOrAddStore(newStoreDef, "BnP config/data",
enableStoreCreation, this.storeVerificationExecutorService);
} catch (UnreachableStoreException e) {
log.info("verifyOrAddStore() failed on some nodes for clusterURL: " + clusterURL + " (this is harmless).", e);
// When we can't reach some node, we just skip it and won't create the store on it.
Expand Down

0 comments on commit 3c864ee

Please sign in to comment.