Skip to content

Commit

Permalink
Http join converter and broker fixes (#1963)
Browse files Browse the repository at this point in the history
* Fix minor bugs http join converter.

* Fix minor bugs http join converter and broker.

* Fix broker bug.
  • Loading branch information
ibuenros committed Jun 20, 2017
1 parent f101369 commit 5997f07
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
Expand Up @@ -104,10 +104,14 @@ private static String createLimiterKey(Config config) {
try {
String urlTemplate = config.getString(HttpConstants.URL_TEMPLATE);
URL url = new URL(urlTemplate);
LOG.info("Get limiter key [" + url.getProtocol() + url.getPath() + "]");
return url.getProtocol() + "/" + url.getHost() + "/" + url.getPort();
String key = url.getProtocol() + "/" + url.getHost();
if (url.getPort() > 0) {
key = key + "/" + url.getPort();
}
LOG.info("Get limiter key [" + key + "]");
return key;
} catch (MalformedURLException e) {
throw new IllegalStateException("Cannot get limiter key ");
throw new IllegalStateException("Cannot get limiter key.", e);
}
}
}
}
Expand Up @@ -134,7 +134,8 @@ public JobContext(Properties jobProps, Logger logger, SharedResourcesBroker<Gobb
this.jobSequence = Long.toString(Id.Job.parse(this.jobId).getSequence());
jobProps.setProperty(ConfigurationKeys.JOB_ID_KEY, this.jobId);

this.jobBroker = instanceBroker.newSubscopedBuilder(new JobScopeInstance(this.jobName, this.jobId)).build();
this.jobBroker = instanceBroker.newSubscopedBuilder(new JobScopeInstance(this.jobName, this.jobId))
.withOverridingConfig(ConfigUtils.propertiesToConfig(jobProps)).build();
this.jobCommitPolicy = JobCommitPolicy.getCommitPolicy(jobProps);

this.datasetStateStore = createStateStore(ConfigUtils.propertiesToConfig(jobProps));
Expand Down
Expand Up @@ -120,8 +120,9 @@ public <K extends SharedResourceKey> KeyedScopedConfigViewImpl<S, K> getConfigVi
if (scopedConfig.getScopeType().equals(scopedConfig.getScopeType().rootScope())) {
config = ConfigUtils.getConfigOrEmpty(scopedConfig.getConfig(), factoryName).withFallback(config);
} else if (scope != null && SharedResourcesBrokerUtils.isScopeTypeAncestor(scope, scopedConfig.getScopeType())) {
config = ConfigUtils.getConfigOrEmpty(scopedConfig.getConfig(), factoryName).getConfig(scope.name())
.atKey(scope.name()).withFallback(config);
Config tmpConfig = ConfigUtils.getConfigOrEmpty(scopedConfig.getConfig(), factoryName);
tmpConfig = ConfigUtils.getConfigOrEmpty(tmpConfig, scope.name());
config = tmpConfig.atKey(scope.name()).withFallback(config);
}
}

Expand Down

0 comments on commit 5997f07

Please sign in to comment.