Skip to content

Commit

Permalink
Merge cbfcdea into 14e7692
Browse files Browse the repository at this point in the history
  • Loading branch information
chavdar committed Oct 29, 2016
2 parents 14e7692 + cbfcdea commit 2126dcf
Show file tree
Hide file tree
Showing 10 changed files with 483 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,23 +401,24 @@ public class ConfigurationKeys {
/**
* Configuration properties for source connection.
*/
public static final String SOURCE_CONN_USE_AUTHENTICATION = "source.conn.use.authentication";
public static final String SOURCE_CONN_PRIVATE_KEY = "source.conn.private.key";
public static final String SOURCE_CONN_KNOWN_HOSTS = "source.conn.known.hosts";
public static final String SOURCE_CONN_CLIENT_SECRET = "source.conn.client.secret";
public static final String SOURCE_CONN_CLIENT_ID = "source.conn.client.id";
public static final String SOURCE_CONN_DOMAIN = "source.conn.domain";
public static final String SOURCE_CONN_USERNAME = "source.conn.username";
public static final String SOURCE_CONN_PASSWORD = "source.conn.password";
public static final String SOURCE_CONN_SECURITY_TOKEN = "source.conn.security.token";
public static final String SOURCE_CONN_HOST_NAME = "source.conn.host";
public static final String SOURCE_CONN_VERSION = "source.conn.version";
public static final String SOURCE_CONN_TIMEOUT = "source.conn.timeout";
public static final String SOURCE_CONN_REST_URL = "source.conn.rest.url";
public static final String SOURCE_CONN_USE_PROXY_URL = "source.conn.use.proxy.url";
public static final String SOURCE_CONN_USE_PROXY_PORT = "source.conn.use.proxy.port";
public static final String SOURCE_CONN_DRIVER = "source.conn.driver";
public static final String SOURCE_CONN_PORT = "source.conn.port";
public static final String SOURCE_CONN_PREFIX = "source.conn.";
public static final String SOURCE_CONN_USE_AUTHENTICATION = SOURCE_CONN_PREFIX + "use.authentication";
public static final String SOURCE_CONN_PRIVATE_KEY = SOURCE_CONN_PREFIX + "private.key";
public static final String SOURCE_CONN_KNOWN_HOSTS = SOURCE_CONN_PREFIX + "known.hosts";
public static final String SOURCE_CONN_CLIENT_SECRET = SOURCE_CONN_PREFIX + "client.secret";
public static final String SOURCE_CONN_CLIENT_ID = SOURCE_CONN_PREFIX + "client.id";
public static final String SOURCE_CONN_DOMAIN = SOURCE_CONN_PREFIX + "domain";
public static final String SOURCE_CONN_USERNAME = SOURCE_CONN_PREFIX + "username";
public static final String SOURCE_CONN_PASSWORD = SOURCE_CONN_PREFIX + "password";
public static final String SOURCE_CONN_SECURITY_TOKEN = SOURCE_CONN_PREFIX + "security.token";
public static final String SOURCE_CONN_HOST_NAME = SOURCE_CONN_PREFIX + "host";
public static final String SOURCE_CONN_VERSION = SOURCE_CONN_PREFIX + "version";
public static final String SOURCE_CONN_TIMEOUT = SOURCE_CONN_PREFIX + "timeout";
public static final String SOURCE_CONN_REST_URL = SOURCE_CONN_PREFIX + "rest.url";
public static final String SOURCE_CONN_USE_PROXY_URL = SOURCE_CONN_PREFIX + "use.proxy.url";
public static final String SOURCE_CONN_USE_PROXY_PORT = SOURCE_CONN_PREFIX + "use.proxy.port";
public static final String SOURCE_CONN_DRIVER = SOURCE_CONN_PREFIX + "driver";
public static final String SOURCE_CONN_PORT = SOURCE_CONN_PREFIX + "port";
public static final int SOURCE_CONN_DEFAULT_PORT = 22;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright (C) 2014-2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
*/
package gobblin.http;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.http.HttpHost;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.HttpClientBuilder;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

import gobblin.annotation.Alias;
import gobblin.configuration.State;

/**
* Default implementation that uses the following properties to configure an {@link HttpClient}.
*
* <ul>
* <li>{@link #PROXY_HOSTPORT_KEY}
* <li>{@link #PROXY_URL_KEY}
* <li>{@link #PROXY_PORT_KEY}
* </ul>
*/
@Alias(value="default")
public class DefaultHttpClientConfigurator implements HttpClientConfigurator {
// IMPORTANT: don't change the values for PROXY_URL_KEY and PROXY_PORT_KEY as they are meant to
// be backwards compatible with SOURCE_CONN_USE_PROXY_URL and SOURCE_CONN_USE_PROXY_PORT when
// the statePropertiesPrefix is "source.conn."
/** The hostname of the HTTP proxy to use */
public static final String PROXY_URL_KEY = "use.proxy.url";
/** The port of the HTTP proxy to use */
public static final String PROXY_PORT_KEY = "use.proxy.port";
/** Similar to {@link #PROXY_URL_KEY} and {@link #PROXY_PORT_KEY} but allows you to set it on
* one property as <host>:<port> */
public static final String PROXY_HOSTPORT_KEY = "proxyHostport";
/** Port to use if the HTTP Proxy is enabled but no port is specified */
public static final int DEFAULT_HTTP_PROXY_PORT = 8080;

private static final Pattern HOSTPORT_PATTERN = Pattern.compile("([^:]+)(:([0-9]+))?");

protected final HttpClientBuilder _builder = HttpClientBuilder.create();
protected String _statePropertiesPrefix = null;

/** {@inheritDoc} */
@Override
public DefaultHttpClientConfigurator configure(Config httpClientConfig) {
Optional<HttpHost> proxy = getProxyAddr(httpClientConfig);
if (proxy.isPresent()) {
getBuilder().setProxy(proxy.get());
}
return this;
}

/** {@inheritDoc} */
@Override
public DefaultHttpClientConfigurator configure(State state) {
Config cfg = stateToConfig(state);
return configure(cfg);
}

protected Config stateToConfig(State state) {
String proxyUrlKey = getPrefixedPropertyName(PROXY_URL_KEY);
String proxyPortKey = getPrefixedPropertyName(PROXY_PORT_KEY);
String proxyHostportKey = getPrefixedPropertyName(PROXY_HOSTPORT_KEY);

Config cfg = ConfigFactory.empty();
if (state.contains(proxyUrlKey)) {
cfg = cfg.withValue(PROXY_URL_KEY, ConfigValueFactory.fromAnyRef(state.getProp(proxyUrlKey)));
}
if (state.contains(proxyPortKey)) {
cfg = cfg.withValue(PROXY_PORT_KEY, ConfigValueFactory.fromAnyRef(state.getPropAsInt(proxyPortKey)));
}
if (state.contains(proxyHostportKey)) {
cfg = cfg.withValue(PROXY_HOSTPORT_KEY, ConfigValueFactory.fromAnyRef(state.getProp(proxyHostportKey)));
}
return cfg;
}

/** {@inheritDoc} */
@Override
public HttpClient createClient() {
return _builder.build();
}

@VisibleForTesting
public static Optional<HttpHost> getProxyAddr(Config httpClientConfig) {
String proxyHost = null;
int proxyPort = DEFAULT_HTTP_PROXY_PORT;
if (httpClientConfig.hasPath(PROXY_URL_KEY) &&
!httpClientConfig.getString(PROXY_URL_KEY).isEmpty()) {
proxyHost = httpClientConfig.getString(PROXY_URL_KEY);
}
if (httpClientConfig.hasPath(PROXY_PORT_KEY)) {
proxyPort = httpClientConfig.getInt(PROXY_PORT_KEY);
}
if (httpClientConfig.hasPath(PROXY_HOSTPORT_KEY)) {
String hostport = httpClientConfig.getString(PROXY_HOSTPORT_KEY);
Matcher hostportMatcher = HOSTPORT_PATTERN.matcher(hostport);
if (!hostportMatcher.matches()) {
throw new IllegalArgumentException("Invalid HTTP proxy hostport: " + hostport);
}
proxyHost = hostportMatcher.group(1);
if (!Strings.isNullOrEmpty(hostportMatcher.group(3))) {
proxyPort = Integer.parseInt(hostportMatcher.group(3));
}
}
return null != proxyHost ? Optional.of(new HttpHost(proxyHost, proxyPort))
: Optional.<HttpHost>absent();
}

@Override
public DefaultHttpClientConfigurator setStatePropertiesPrefix(String propertiesPrefix) {
_statePropertiesPrefix = propertiesPrefix;
return this;
}

String getPrefixedPropertyName(String propertyName) {
return null != _statePropertiesPrefix ? _statePropertiesPrefix + propertyName : propertyName;
}

@Override
public HttpClientBuilder getBuilder() {
return _builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2014-2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
*/
package gobblin.http;

import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.HttpClientBuilder;

import com.typesafe.config.Config;

import gobblin.configuration.State;

/**
* An adapter from Gobblin configuration to {@link HttpClientBuilder}. It can also be used to
* create {@link HttpClient} instances.
*/
public interface HttpClientConfigurator {

/** Sets a prefix to use when extracting the configuration from {@link State}. The default is
* empty. */
HttpClientConfigurator setStatePropertiesPrefix(String propertiesPrefix);

/**
* Extracts the HttpClient configuration from a typesafe config. Supported configuration options
* may vary from implementation to implementation.
* */
HttpClientConfigurator configure(Config httpClientConfig);

/** Same as {@link #configure(Config)} but for legacy cases using State. */
HttpClientConfigurator configure(State httpClientConfig);

/** The underlying client builder */
HttpClientBuilder getBuilder();

/**
* Typically this will use {@link HttpClientBuilder#build()} based on the configuration but
* implementations may also return decorated instances. */
HttpClient createClient();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2014-2016 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied.
*/
package gobblin.http;

import org.apache.http.client.HttpClient;

import com.google.common.base.Optional;
import com.typesafe.config.Config;

import gobblin.configuration.State;
import gobblin.util.ClassAliasResolver;

/**
* Creates an instance of HttpClientConfigurator using dependency injection from configuration.
*/
public class HttpClientConfiguratorLoader {

/** Classname or alias for an {@link HttpClientConfigurator} instance to use for configuring and
* instantiating of {@link HttpClient} instances. */
public static final String HTTP_CLIENT_CONFIGURATOR_TYPE_KEY = "httpClientConfigurator.type";
public static final String HTTP_CLIENT_CONFIGURATOR_TYPE_FULL_KEY =
"gobblin." + HTTP_CLIENT_CONFIGURATOR_TYPE_KEY;
public static final Class<? extends HttpClientConfigurator> DEFAULT_CONFIGURATOR_CLASS =
DefaultHttpClientConfigurator.class;

private static final ClassAliasResolver<HttpClientConfigurator> TYPE_RESOLVER =
new ClassAliasResolver<>(HttpClientConfigurator.class);
private final HttpClientConfigurator _configurator;

/**
* Loads a HttpClientConfigurator using the value of the {@link #HTTP_CLIENT_CONFIGURATOR_TYPE_FULL_KEY}
* property in the state.
*/
public HttpClientConfiguratorLoader(State state) {
this(Optional.<String>fromNullable(state.getProp(HTTP_CLIENT_CONFIGURATOR_TYPE_FULL_KEY)));
}

/** Loads a HttpClientConfigurator using the value of {@link #HTTP_CLIENT_CONFIGURATOR_TYPE_KEY}
* in the local typesafe config. */
public HttpClientConfiguratorLoader(Config config) {
this(Optional.<String>fromNullable(config.hasPath(HTTP_CLIENT_CONFIGURATOR_TYPE_KEY) ?
config.getString(HTTP_CLIENT_CONFIGURATOR_TYPE_KEY) : null));
}

/** Loads a HttpClientConfigurator with the specified class or alias. If not specified,
* {@link #DEFAULT_CONFIGURATOR_CLASS} is used. */
public HttpClientConfiguratorLoader(Optional<String> configuratorType) {
try {
_configurator = getConfiguratorClass(configuratorType).newInstance();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new RuntimeException("Unable to find HttpClientConfigurator:" + e, e);
}
}

private static Class<? extends HttpClientConfigurator>
getConfiguratorClass(Optional<String> configuratorType) throws ClassNotFoundException {
return configuratorType.isPresent() ? TYPE_RESOLVER.resolveClass(configuratorType.get()) :
DEFAULT_CONFIGURATOR_CLASS;
}

public HttpClientConfigurator getConfigurator() {
return _configurator;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@

import org.apache.commons.io.IOUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.StatusLine;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;

import com.google.common.base.Charsets;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.http.HttpClientConfiguratorLoader;
import gobblin.source.extractor.exception.RestApiConnectionException;
import gobblin.source.extractor.exception.RestApiProcessingException;
import gobblin.source.extractor.extract.Command;
import gobblin.source.extractor.extract.CommandOutput;
import gobblin.source.extractor.extract.restapi.RestApiCommand.RestApiCommandType;

import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -96,7 +96,7 @@ public boolean connect() throws RestApiConnectionException {
log.error("entity class: " + httpEntity.getClass().getName());
log.error("entity string size: " + EntityUtils.toString(httpEntity).length());
log.error("content length: " + httpEntity.getContentLength());
log.error("content: " + IOUtils.toString(httpEntity.getContent()));
log.error("content: " + IOUtils.toString(httpEntity.getContent(), Charsets.UTF_8));
throw new RestApiConnectionException(
"JSON is NULL ! Failed on authentication with the following HTTP response received:\n"
+ EntityUtils.toString(httpEntity));
Expand Down Expand Up @@ -130,16 +130,11 @@ public boolean connect() throws RestApiConnectionException {

protected HttpClient getHttpClient() {
if (this.httpClient == null) {
this.httpClient = new DefaultHttpClient();

if (this.state.contains(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL)
&& !this.state.getProp(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL).isEmpty()) {
log.info("Connecting via proxy: " + this.state.getProp(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL));

HttpHost proxy = new HttpHost(this.state.getProp(ConfigurationKeys.SOURCE_CONN_USE_PROXY_URL),
this.state.getPropAsInt(ConfigurationKeys.SOURCE_CONN_USE_PROXY_PORT));
this.httpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
}
HttpClientConfiguratorLoader configuratorLoader = new HttpClientConfiguratorLoader(this.state);
this.httpClient = configuratorLoader.getConfigurator()
.setStatePropertiesPrefix(ConfigurationKeys.SOURCE_CONN_PREFIX)
.configure(this.state)
.createClient();
}
return this.httpClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -72,6 +73,7 @@ public ConnectionRequest requestConnection(HttpRoute route, Object state) {

}

@SuppressWarnings("rawtypes")
public AbstractHttpWriter(AbstractHttpWriterBuilder builder) {
super(builder.getState());
this.log = builder.getLogger().isPresent() ? (Logger)builder.getLogger() : LoggerFactory.getLogger(this.getClass());
Expand Down

0 comments on commit 2126dcf

Please sign in to comment.