Skip to content

Commit

Permalink
[support_api_v3] Added class using OAuth2
Browse files Browse the repository at this point in the history
  • Loading branch information
thangnc committed Apr 14, 2017
1 parent 84b044f commit b66720c
Show file tree
Hide file tree
Showing 4 changed files with 415 additions and 0 deletions.
146 changes: 146 additions & 0 deletions src/main/java/org/embulk/output/mailchimp/MailchimpHttpClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package org.embulk.output.mailchimp;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.MissingNode;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.embulk.config.ConfigException;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper;
import org.embulk.util.retryhelper.jetty92.Jetty92SingleRequester;
import org.embulk.util.retryhelper.jetty92.StringJetty92ResponseEntityReader;
import org.slf4j.Logger;

import java.io.IOException;

/**
* Created by thangnc on 4/14/17.
*/
public class MailchimpHttpClient
{
private static final Logger LOG = Exec.getLogger(MailchimpHttpClient.class);
private final ObjectMapper jsonMapper = new ObjectMapper()
.configure(com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS, false)
.configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private String accessToken;

private JsonNode sendRequest(final String endpoint, final HttpMethod method,
final MailchimpOutputPluginDelegate.PluginTask task,
final Jetty92RetryHelper retryHelper)
{
return sendRequest(endpoint, method, "", task, retryHelper);
}

private JsonNode sendRequest(final String endpoint, final HttpMethod method, final String content,
final MailchimpOutputPluginDelegate.PluginTask task,
final Jetty92RetryHelper retryHelper)
{
final String authorizationHeader = getAuthorizationHeader(retryHelper, task);

try {
String responseBody = retryHelper.requestWithRetry(
new StringJetty92ResponseEntityReader(task.getTimeoutMills()),
new Jetty92SingleRequester()
{
@Override
public void requestOnce(HttpClient client, Response.Listener responseListener)
{
org.eclipse.jetty.client.api.Request request = client
.newRequest(endpoint)
.accept("application/json")
.method(method);
if (method == HttpMethod.POST || method == HttpMethod.PUT) {
request.content(new StringContentProvider(content), "application/json");
}

if (!authorizationHeader.isEmpty()) {
request.header("Authorization", authorizationHeader);
}
request.send(responseListener);
}

@Override
public boolean isResponseStatusToRetry(org.eclipse.jetty.client.api.Response response)
{
int status = response.getStatus();
return status == 429 || status / 100 != 4;
}
});

return responseBody != null && !responseBody.isEmpty() ? parseJson(responseBody) : MissingNode.getInstance();
} catch (HttpResponseException ex) {
LOG.error("Exception occurred while sending request: {}", ex.getMessage());

throw ex;
}
}

private JsonNode parseJson(final String json)
throws DataException
{
try {
return this.jsonMapper.readTree(json);
} catch (IOException ex) {
throw new DataException(ex);
}
}

private String getAuthorizationHeader(final Jetty92RetryHelper retryHelper,
final MailchimpOutputPluginDelegate.PluginTask task)
{
try {
String accessToken = retrieveAccessToken(retryHelper, task);
return "Bearer " + accessToken;
}
catch (Exception e) {
throw new ConfigException("Failed to refresh the access token: " + e);
}
}

private String retrieveAccessToken(final Jetty92RetryHelper retryHelper,
final MailchimpOutputPluginDelegate.PluginTask task)
{
if (this.accessToken != null) {
return this.accessToken;
}

String responseBody = retryHelper.requestWithRetry(
new StringJetty92ResponseEntityReader(task.getTimeoutMills()),
new Jetty92SingleRequester()
{
@Override
public void requestOnce(HttpClient client, Response.Listener responseListener)
{
final StringBuilder stringBuilder = new StringBuilder()
.append("code").append("=").append(task.getRefreshToken()).append("&")
.append("client_id").append("=").append(task.getClientId()).append("&")
.append("client_secret").append("=").append(task.getClientSecret()).append("&")
.append("redirect_uri").append("=").append("https://login.mailchimp.com/").append("&")
.append("grant_type").append("=").append("authorization_code");

Request request = client.newRequest("https://login.mailchimp.com/oauth2/token")
.method(HttpMethod.POST)
.content(new StringContentProvider(stringBuilder.toString()))
.header(HttpHeader.CONTENT_TYPE, "application/x-www-form-urlencoded");
request.send(responseListener);
}

@Override
protected boolean isResponseStatusToRetry(Response response)
{
return response.getStatus() / 100 != 4;
}
}
);

this.accessToken = parseJson(responseBody).get("access_token").asText();
return this.accessToken;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.embulk.output.mailchimp;

import org.embulk.base.restclient.RestClientOutputPluginBase;

/**
* Created by thangnc on 4/14/17.
*/
public class MailchimpOutputPlugin
extends RestClientOutputPluginBase<MailchimpOutputPluginDelegate.PluginTask>
{
public MailchimpOutputPlugin()
{
super(MailchimpOutputPluginDelegate.PluginTask.class, new MailchimpOutputPluginDelegate());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package org.embulk.output.mailchimp;

import com.google.common.base.Throwables;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.embulk.base.restclient.RestClientOutputPluginDelegate;
import org.embulk.base.restclient.RestClientOutputTaskBase;
import org.embulk.base.restclient.jackson.JacksonServiceRequestMapper;
import org.embulk.base.restclient.jackson.JacksonTopLevelValueLocator;
import org.embulk.base.restclient.jackson.scope.JacksonAllInObjectScope;
import org.embulk.base.restclient.record.RecordBuffer;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.TaskReport;
import org.embulk.spi.Exec;
import org.embulk.spi.Schema;
import org.embulk.util.retryhelper.jetty92.Jetty92ClientCreator;
import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper;
import org.slf4j.Logger;

import java.util.List;

/**
* Created by thangnc on 4/14/17.
*/
public class MailchimpOutputPluginDelegate
implements RestClientOutputPluginDelegate<MailchimpOutputPluginDelegate.PluginTask>
{
private static final Logger LOG = Exec.getLogger(MailchimpOutputPluginDelegate.class);

public MailchimpOutputPluginDelegate()
{
}

public interface PluginTask
extends RestClientOutputTaskBase
{
@Config("maximum_retries")
@ConfigDefault("6")
int getMaximumRetries();

@Config("initial_retry_interval_millis")
@ConfigDefault("1000")
int getInitialRetryIntervalMillis();

@Config("maximum_retry_interval_millis")
@ConfigDefault("32000")
int getMaximumRetryIntervalMillis();

@Config("timeout_millis")
@ConfigDefault("60000")
int getTimeoutMills();

@Config("client_id")
String getClientId();

@Config("client_secret")
String getClientSecret();

@Config("refresh_token")
String getRefreshToken();

@Config("list_id")
String getListId();

@Config("double_optin")
@ConfigDefault("false")
boolean isDoubleOptIn();

@Config("update_existing")
@ConfigDefault("false")
boolean isUpdateExisting();

@Config("replace_interests")
@ConfigDefault("false")
boolean isReplaceInterests();

// @Config("email_column")
// @ConfigDefault("null")
// Optional<String> getEmailColumn();
//
// @Config("fname_column")
// @ConfigDefault("null")
// Optional<String> getFirstNameColumn();
//
// @Config("lname_column")
// @ConfigDefault("null")
// Optional<String> getLastNameColumn();
//
// @Config("grouping_columns")
// @ConfigDefault("null")
// List<String> getGroupingColumns();
}

@Override
public void validateOutputTask(final PluginTask task, final Schema schema, final int taskCount)
{

}

@Override
public RecordBuffer buildRecordBuffer(final PluginTask task)
{
Jetty92RetryHelper retryHelper = createRetryHelper(task);
return new MailchimpRecordBuffer("records", task, retryHelper);
}

@Override
public JacksonServiceRequestMapper buildServiceRequestMapper(final PluginTask task)
{
return JacksonServiceRequestMapper.builder()
.add(new JacksonAllInObjectScope(), new JacksonTopLevelValueLocator("record"))
.build();
}

@Override
public ConfigDiff egestEmbulkData(final PluginTask task, final Schema schema, final int taskCount,
final List<TaskReport> taskReports)
{
long totalInserted = 0;
for (TaskReport taskReport : taskReports) {
if (taskReport.has("inserted")) {
totalInserted += taskReport.get(Long.class, "inserted");
}
}

LOG.info("Insert completed. {} records", totalInserted);

return Exec.newConfigDiff();
}

protected Jetty92RetryHelper createRetryHelper(PluginTask task)
{
return new Jetty92RetryHelper(
task.getMaximumRetries(),
task.getInitialRetryIntervalMillis(),
task.getMaximumRetryIntervalMillis(),
new Jetty92ClientCreator()
{
@Override
public HttpClient createAndStart()
{
HttpClient client = new HttpClient(new SslContextFactory());
try {
client.start();
return client;
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
});
}
}
Loading

0 comments on commit b66720c

Please sign in to comment.