Skip to content

Commit

Permalink
Use wan only automatically when hosts are specified (#79)
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Oct 18, 2019
1 parent 420d670 commit 71cb6f5
Showing 1 changed file with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
Expand All @@ -44,6 +45,7 @@
*/
public class ElasticsearchDependenciesJob {
private static final Logger log = LoggerFactory.getLogger(ElasticsearchDependenciesJob.class);
private static final Pattern PORT_PATTERN = Pattern.compile(":\\d+");

public static Builder builder() {
return new Builder();
Expand All @@ -55,6 +57,7 @@ public static final class Builder {
String username = Utils.getEnv("ES_USERNAME", null);
String password = Utils.getEnv("ES_PASSWORD", null);
Boolean clientNodeOnly = Boolean.parseBoolean(Utils.getEnv("ES_CLIENT_NODE_ONLY", "false"));
Boolean nodesWanOnly = Boolean.parseBoolean(Utils.getEnv("ES_NODES_WAN_ONLY", "false"));
String indexPrefix = Utils.getEnv("ES_INDEX_PREFIX", null);

final Map<String, String> sparkProperties = new LinkedHashMap<>();
Expand All @@ -63,7 +66,6 @@ public static final class Builder {
sparkProperties.put("spark.ui.enabled", "false");
// don't die if there are no spans
sparkProperties.put("es.index.read.missing.as.empty", "true");
sparkProperties.put("es.nodes.wan.only", Utils.getEnv("ES_NODES_WAN_ONLY", "false"));
sparkProperties.put("es.net.ssl.keystore.location",
getSystemPropertyAsFileResource("javax.net.ssl.keyStore"));
sparkProperties.put("es.net.ssl.keystore.pass",
Expand All @@ -72,6 +74,7 @@ public static final class Builder {
getSystemPropertyAsFileResource("javax.net.ssl.trustStore"));
sparkProperties.put("es.net.ssl.truststore.pass",
System.getProperty("javax.net.ssl.trustStorePassword", ""));

}

// local[*] master lets us run & test the job locally without setting a Spark cluster
Expand All @@ -92,7 +95,7 @@ public Builder jars(String... jars) {
public Builder nodes(String hosts) {
Utils.checkNoTNull(hosts, "nodes");
this.hosts = hosts;
sparkProperties.put("es.nodes.wan.only", "true");
this.nodesWanOnly = true;
return this;
}

Expand Down Expand Up @@ -120,7 +123,29 @@ public Builder day(LocalDate day) {
return this;
}

/** Whether the connector is used against an Elasticsearch instance in a cloud/restricted
* environment over the WAN, such as Amazon Web Services. In this mode, the
* connector disables discovery and only connects through the declared es.nodes during all operations,
* including reads and writes. Note that in this mode, performance is highly affected. */
public Builder nodesWanOnly(boolean wanOnly) {
this.nodesWanOnly = wanOnly;
return this;
}

private static void logIfNoPort(String hosts) {
if (!PORT_PATTERN.matcher(hosts).find()) {
log.warn("Port is not specified, default port 9200 will be used");
}
}

public ElasticsearchDependenciesJob build() {
String hosts = System.getenv("ES_NODES");
String wanOnly = System.getenv("ES_NODES_WAN_ONLY");
// Optimize user configuration - nodes specified but wan only not
if (hosts != null && wanOnly == null) {
this.nodesWanOnly = true;
}
logIfNoPort(this.hosts);
return new ElasticsearchDependenciesJob(this);
}
}
Expand Down Expand Up @@ -150,6 +175,9 @@ private static String getSystemPropertyAsFileResource(String key) {
if (builder.hosts.indexOf("https") != -1) {
conf.set("es.net.ssl", "true");
}
if (builder.nodesWanOnly) {
conf.set("es.nodes.wan.only", "true");
}
if (builder.clientNodeOnly) {
conf.set("es.nodes.discovery", "0");
conf.set("es.nodes.client.only", "1");
Expand Down

0 comments on commit 71cb6f5

Please sign in to comment.