Skip to content

Commit

Permalink
feat(google): add a StatefullyUpdateBootImage GCE operation (#3815)
Browse files Browse the repository at this point in the history
* refactor(google): add a FakeComputeBatchRequest

* feat(google): add a StatefullyUpdateBootImage GCE operation

* style(google): Add missing copyright headers.

* style(google/tests): remove unnecessary junit version
  • Loading branch information
plumpy authored and ezimanyi committed Jun 25, 2019
1 parent 28ffe09 commit 234a960
Show file tree
Hide file tree
Showing 21 changed files with 1,007 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class AtomicOperations {
public static final String START_SERVER_GROUP = "startServerGroup";
public static final String STOP_SERVER_GROUP = "stopServerGroup";
public static final String SET_STATEFUL_DISK = "setStatefulDisk";
public static final String STATEFULLY_UPDATE_BOOT_IMAGE = "statefullyUpdateBootImage";
public static final String UPSERT_DISRUPTION_BUDGET = "upsertDisruptionBudget";
public static final String UPDATE_JOB_PROCESSES = "updateJobProcesses";

Expand Down
3 changes: 3 additions & 0 deletions clouddriver-google/clouddriver-google.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ dependencies {

testImplementation "cglib:cglib-nodep"
testImplementation "org.assertj:assertj-core"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.junit.platform:junit-platform-runner"
testImplementation "org.mockito:mockito-core"
testImplementation "org.mockito:mockito-junit-jupiter"
testImplementation "org.objenesis:objenesis"
testImplementation "org.spockframework:spock-core"
testImplementation "org.spockframework:spock-spring"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ public GoogleComputeRequest<ComputeRequest<InstanceGroupManager>, InstanceGroupM

abstract ComputeRequest<InstanceGroupManager> performGet() throws IOException;

@Override
public GoogleComputeOperationRequest patch(InstanceGroupManager content) throws IOException {
return wrapOperationRequest(performPatch(content), "patch");
}

abstract ComputeRequest<Operation> performPatch(InstanceGroupManager content) throws IOException;

@Override
public GoogleComputeOperationRequest<ComputeRequest<Operation>> update(
InstanceGroupManager content) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,200 +16,14 @@

package com.netflix.spinnaker.clouddriver.google.compute;

import static com.google.common.collect.Lists.partition;

import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.util.Throwables;
import com.google.api.services.compute.Compute;
import com.google.api.services.compute.ComputeRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.netflix.spectator.api.Registry;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.AllArgsConstructor;
import lombok.Value;
import org.apache.http.client.HttpResponseException;

public class ComputeBatchRequest<RequestT extends ComputeRequest<ResponseT>, ResponseT> {

// Platform-specified max to not overwhelm batch backends.
@VisibleForTesting static final int MAX_BATCH_SIZE = 100;
private static final Duration CONNECT_TIMEOUT = Duration.ofMinutes(2);
private static final Duration READ_TIMEOUT = Duration.ofMinutes(2);

private final Compute compute;
private final Registry registry;
private final String userAgent;
private final ListeningExecutorService executor;
private final List<QueuedRequest<RequestT, ResponseT>> queuedRequests;

ComputeBatchRequest(
Compute compute, Registry registry, String userAgent, ListeningExecutorService executor) {
this.compute = compute;
this.registry = registry;
this.userAgent = userAgent;
this.executor = executor;
this.queuedRequests = new ArrayList<>();
}

public void queue(
GoogleComputeRequest<RequestT, ResponseT> request, JsonBatchCallback<ResponseT> callback) {
queuedRequests.add(new QueuedRequest<>(request.getRequest(), callback));
}

public void execute(String batchContext) throws IOException {
if (queuedRequests.size() == 0) {
return;
}

List<List<QueuedRequest<RequestT, ResponseT>>> requestPartitions =
partition(queuedRequests, MAX_BATCH_SIZE);
List<BatchRequest> queuedBatches = createBatchRequests(requestPartitions);

String statusCode = "500";
String success = "false";
long start = registry.clock().monotonicTime();
try {
executeBatches(queuedBatches);
success = "true";
statusCode = "200";
} catch (HttpResponseException e) {
statusCode = Integer.toString(e.getStatusCode());
throw e;
} finally {
long nanos = registry.clock().monotonicTime() - start;
String status = statusCode.charAt(0) + "xx";
Map<String, String> tags =
ImmutableMap.of(
"context", batchContext,
"success", success,
"status", status,
"statusCode", statusCode);
registry
.timer(registry.createId("google.batchExecute", tags))
.record(Duration.ofNanos(nanos));
registry
.counter(registry.createId("google.batchSize", tags))
.increment(queuedRequests.size());
}
}

private void executeBatches(List<BatchRequest> queuedBatches) throws IOException {
if (queuedBatches.size() == 1) {
queuedBatches.get(0).execute();
return;
}

List<ListenableFuture<Void>> futures = new ArrayList<>();
for (BatchRequest batchRequest : queuedBatches) {
ListenableFuture<Void> submit =
executor.submit(
() -> {
batchRequest.execute();
return null;
});
futures.add(submit);
}

try {
new FailFastFuture(futures, executor).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
throw new RuntimeException(cause);
}
}

private List<BatchRequest> createBatchRequests(
List<List<QueuedRequest<RequestT, ResponseT>>> requestPartitions) throws IOException {

List<BatchRequest> queuedBatches = new ArrayList<>();

try {
requestPartitions.forEach(
partition -> {
BatchRequest batch = newBatch();
partition.forEach(
qr -> wrapIOException(() -> qr.getRequest().queue(batch, qr.getCallback())));
queuedBatches.add(batch);
});
return queuedBatches;
} catch (UncheckedIOException e) {
throw e.getCause();
}
}

private BatchRequest newBatch() {
return compute.batch(
request -> {
request.getHeaders().setUserAgent(userAgent);
request.setConnectTimeout((int) CONNECT_TIMEOUT.toMillis());
request.setReadTimeout((int) READ_TIMEOUT.toMillis());
});
}

@FunctionalInterface
private interface IoExceptionRunnable {
void run() throws IOException;
}

private static void wrapIOException(IoExceptionRunnable runnable) {
try {
runnable.run();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Value
@AllArgsConstructor
private static class QueuedRequest<RequestT extends ComputeRequest<ResponseT>, ResponseT> {
private RequestT request;
private JsonBatchCallback<ResponseT> callback;
}

private static class FailFastFuture extends AbstractFuture<Void> {

private final AtomicInteger remainingFutures;
public interface ComputeBatchRequest<RequestT extends ComputeRequest<ResponseT>, ResponseT> {

FailFastFuture(List<ListenableFuture<Void>> futures, ExecutorService executor) {
remainingFutures = new AtomicInteger(futures.size());
for (ListenableFuture<Void> future : futures) {
Futures.addCallback(
future,
new FutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
if (remainingFutures.decrementAndGet() == 0) {
set(null);
}
}
void queue(
GoogleComputeRequest<RequestT, ResponseT> request, JsonBatchCallback<ResponseT> callback);

@Override
public void onFailure(Throwable t) {
setException(t);
}
},
executor);
}
}
}
void execute(String batchContext) throws IOException;
}
Loading

0 comments on commit 234a960

Please sign in to comment.