Permalink
Browse files

Add setDeleteEndpoint (#1062)

  • Loading branch information...
pschorf authored and dposada committed Jan 2, 2019
1 parent a6ac312 commit f62352fd149995811cb24bb3539b263e987ef767
@@ -111,6 +111,8 @@

private String _groupEndpoint;

private String _deleteEndpoint;

private Integer _port;

public static final int DEFAULT_STATUS_UPDATE_INTERVAL_SECONDS = 10;
@@ -121,6 +123,8 @@

public static final int DEFAULT_SUBMIT_RETRY_INTERVAL_SECONDS = 10;

public static final String DEFAULT_DELETE_ENDPOINT = "/rawscheduler";

/**
* An interval in seconds which will be used to query job status update periodically.
*/
@@ -169,6 +173,9 @@ public JobClient build() throws URISyntaxException {
if (_requestTimeoutSeconds == null) {
_requestTimeoutSeconds = DEFAULT_REQUEST_TIMEOUT_SECONDS;
}
if (_deleteEndpoint == null) {
_deleteEndpoint = DEFAULT_DELETE_ENDPOINT;
}
RequestConfig requestConfig = RequestConfig.custom()
.setSocketTimeout(_requestTimeoutSeconds * 1000)
.setConnectTimeout(_requestTimeoutSeconds * 1000)
@@ -181,6 +188,7 @@ public JobClient build() throws URISyntaxException {
Preconditions.checkNotNull(_port, "port must be set"),
Preconditions.checkNotNull(_jobEndpoint, "jobEndpoint must be set"),
_groupEndpoint,
Preconditions.checkNotNull(_deleteEndpoint, "deleteEndpoint must be set"),
_statusUpdateIntervalSeconds,
_submitRetryIntervalSeconds,
_batchRequestSize,
@@ -297,6 +305,20 @@ public Builder setGroupEndpoint(String groupEndpoint) {
return this;
}

/**
* Set the Cook scheduler endpoint which accepts DELETE requests for jobs
* @param deleteEndpoint the path to use
* @return this builder
*/
public Builder setDeleteEndpoint(String deleteEndpoint) {
if (!deleteEndpoint.startsWith("/")) {
_deleteEndpoint = "/" + deleteEndpoint;
} else {
_deleteEndpoint = deleteEndpoint;
}
return this;
}

public String getEndpoint() {
return _jobEndpoint;
}
@@ -309,6 +331,10 @@ public String getGroupEndpoint() {
return _groupEndpoint;
}

public String getDeleteEndpoint() {
return _deleteEndpoint;
}

/**
* Set the status update interval in seconds for the job client expected to build.
* <p>
@@ -401,6 +427,11 @@ public Builder setInstanceDecorator(InstanceDecorator decorator) {
*/
private final URI _groupURI;

/**
* The URI for Cook scheduler job DELETE requests
*/
private final URI _deleteURI;

/**
* A kerberized HTTP client.
*/
@@ -461,9 +492,9 @@ public Builder setInstanceDecorator(InstanceDecorator decorator) {
*/
private InstanceDecorator _instanceDecorator;

private JobClient(String host, int port, String jobEndpoint, String groupEndpoint, int statusUpdateInterval,
int submitRetryInterval, int batchSubmissionLimit, InstanceDecorator instanceDecorator,
CloseableHttpClient httpClient) throws URISyntaxException {
private JobClient(String host, int port, String jobEndpoint, String groupEndpoint, String deleteEndpoint,
int statusUpdateInterval, int submitRetryInterval, int batchSubmissionLimit,
InstanceDecorator instanceDecorator, CloseableHttpClient httpClient) throws URISyntaxException {
_statusUpdateInterval = statusUpdateInterval;
_submitRetryInterval = submitRetryInterval;
_batchRequestSize = batchSubmissionLimit;
@@ -472,6 +503,7 @@ private JobClient(String host, int port, String jobEndpoint, String groupEndpoin
_activeUUIDToGroup = new ConcurrentHashMap<>();
_groupUUIDToListener = new ConcurrentHashMap<>();
_jobURI = new URIBuilder().setScheme("http").setHost(host).setPort(port).setPath(jobEndpoint).build();
_deleteURI = new URIBuilder().setScheme("http").setHost(host).setPort(port).setPath(deleteEndpoint).build();
if (groupEndpoint != null) {
_groupURI = new URIBuilder().setScheme("http").setHost(host).setPort(port).setPath(groupEndpoint).build();
} else {
@@ -1079,18 +1111,18 @@ private void abort(Collection<UUID> uuids, String impersonatedUser)
for (final List<NameValuePair> params : Lists.partition(allParams, _batchRequestSize)) {
HttpRequestBase httpRequest;
try {
URIBuilder uriBuilder = new URIBuilder(_jobURI);
URIBuilder uriBuilder = new URIBuilder(_deleteURI);
uriBuilder.addParameters(params);
httpRequest = new HttpDelete(uriBuilder.build());
addImpersonation(httpRequest, impersonatedUser);
} catch (URISyntaxException e) {
throw releaseAndCreateException(null, null, "Can not submit DELETE request " + params + " via uri " + _jobURI, e);
throw releaseAndCreateException(null, null, "Can not submit DELETE request " + params + " via uri " + _deleteURI, e);
}
HttpResponse httpResponse;
try {
httpResponse = executeWithRetries(httpRequest, 5, 10);
} catch (IOException e) {
throw releaseAndCreateException(httpRequest, null, "Can not submit DELETE request " + params + " via uri " + _jobURI, e);
throw releaseAndCreateException(httpRequest, null, "Can not submit DELETE request " + params + " via uri " + _deleteURI, e);
}
// Check status code.
final StatusLine statusLine = httpResponse.getStatusLine();
@@ -1113,7 +1145,7 @@ private void abort(Collection<UUID> uuids, String impersonatedUser)
}
} catch (ParseException | IOException e) {
throw new JobClientException("Can not parse the response for DELETE request " + params + " via uri "
+ _jobURI, e);
+ _deleteURI, e);
} finally {
httpRequest.releaseConnection();
}
@@ -25,12 +25,14 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.ImmutableList;
import mockit.Mock;
import mockit.MockUp;
import mockit.integration.junit4.JMockit;

import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.ProtocolVersion;
import org.apache.http.client.methods.HttpDelete;
@@ -128,6 +130,25 @@ public HttpResponse executeWithRetries(HttpRequestBase request, int ignore1, lon
Assert.assertEquals(1, postCounter.get());
}

@Test
public void testJobDelete() throws JobClientException {
final AtomicInteger deleteCounter = new AtomicInteger(0);
new MockUp<JobClient>() {
@Mock
public HttpResponse executeWithRetries(HttpRequestBase request, int ignore1, long ignore2) {
deleteCounter.incrementAndGet();
Assert.assertEquals("/rawscheduler", request.getURI().getPath());
Assert.assertTrue("Should be a delete request", request instanceof HttpDelete);
final ProtocolVersion version = new ProtocolVersion("HTTP", 1, 1);
final BasicStatusLine status = new BasicStatusLine(version, 204, "deleted");
return new BasicHttpResponse(status);
}
};

_client.abort(ImmutableList.of(_initializedJob.getUUID()));
Assert.assertEquals(1, deleteCounter.get());
}

private HttpResponse executeAndReturnTransactionTimedOutError(HttpRequestBase request, AtomicInteger postCounter) throws IOException {
final ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1);
final BasicHttpEntity httpEntity = new BasicHttpEntity();

0 comments on commit f62352f

Please sign in to comment.