Skip to content

Commit

Permalink
refactor(gce): Add support for batch requests to the compute API wrap…
Browse files Browse the repository at this point in the history
…per (#3792)

* refactor(gce): Add support for batch requests to the new compute API wrappers

* refactor(gce): introduce a wrapper around compute.images()

* refactor(gce): Allow retrieval of the ComputeRequest from GoogleComputeRequest

* style(google): Add copyright headers.

* style(google): revert accidentally changed copyright year
  • Loading branch information
plumpy authored and ezimanyi committed Jun 21, 2019
1 parent a8e26cc commit b29b07c
Show file tree
Hide file tree
Showing 24 changed files with 1,706 additions and 534 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,44 +46,48 @@ abstract class AbstractGoogleServerGroupManagers implements GoogleServerGroupMan
}

@Override
public GoogleComputeOperationRequest abandonInstances(List<String> instances) throws IOException {
public GoogleComputeOperationRequest<ComputeRequest<Operation>> abandonInstances(
List<String> instances) throws IOException {
return wrapOperationRequest(performAbandonInstances(instances), "abandonInstances");
}

abstract ComputeRequest<Operation> performAbandonInstances(List<String> instances)
throws IOException;

@Override
public GoogleComputeOperationRequest delete() throws IOException {
public GoogleComputeOperationRequest<ComputeRequest<Operation>> delete() throws IOException {
return wrapOperationRequest(performDelete(), "delete");
}

abstract ComputeRequest<Operation> performDelete() throws IOException;

@Override
public GoogleComputeRequest<InstanceGroupManager> get() throws IOException {
public GoogleComputeRequest<ComputeRequest<InstanceGroupManager>, InstanceGroupManager> get()
throws IOException {
return wrapRequest(performGet(), "get");
}

abstract ComputeRequest<InstanceGroupManager> performGet() throws IOException;

@Override
public GoogleComputeOperationRequest update(InstanceGroupManager content) throws IOException {
public GoogleComputeOperationRequest<ComputeRequest<Operation>> update(
InstanceGroupManager content) throws IOException {
return wrapOperationRequest(performUpdate(content), "update");
}

abstract ComputeRequest<Operation> performUpdate(InstanceGroupManager content) throws IOException;

private <T> GoogleComputeRequest<T> wrapRequest(ComputeRequest<T> request, String api) {
private <RequestT extends ComputeRequest<ResponseT>, ResponseT>
GoogleComputeRequest<RequestT, ResponseT> wrapRequest(RequestT request, String api) {
return new GoogleComputeRequestImpl<>(
request, registry, getMetricName(api), getRegionOrZoneTags());
}

private GoogleComputeOperationRequest wrapOperationRequest(
private GoogleComputeOperationRequest<ComputeRequest<Operation>> wrapOperationRequest(
ComputeRequest<Operation> request, String api) {

OperationWaiter waiter = getOperationWaiter(credentials, poller);
return new GoogleComputeOperationRequestImpl(
return new GoogleComputeOperationRequestImpl<>(
request, registry, getMetricName(api), getRegionOrZoneTags(), waiter);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2019 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.clouddriver.google.compute;

import static com.google.common.collect.Lists.partition;

import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.util.Throwables;
import com.google.api.services.compute.Compute;
import com.google.api.services.compute.ComputeRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.netflix.spectator.api.Registry;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.AllArgsConstructor;
import lombok.Value;
import org.apache.http.client.HttpResponseException;

public class ComputeBatchRequest<RequestT extends ComputeRequest<ResponseT>, ResponseT> {

// Platform-specified max to not overwhelm batch backends.
@VisibleForTesting static final int MAX_BATCH_SIZE = 100;
private static final Duration CONNECT_TIMEOUT = Duration.ofMinutes(2);
private static final Duration READ_TIMEOUT = Duration.ofMinutes(2);

private final Compute compute;
private final Registry registry;
private final String userAgent;
private final ListeningExecutorService executor;
private final List<QueuedRequest<RequestT, ResponseT>> queuedRequests;

ComputeBatchRequest(
Compute compute, Registry registry, String userAgent, ListeningExecutorService executor) {
this.compute = compute;
this.registry = registry;
this.userAgent = userAgent;
this.executor = executor;
this.queuedRequests = new ArrayList<>();
}

public void queue(
GoogleComputeRequest<RequestT, ResponseT> request, JsonBatchCallback<ResponseT> callback) {
queuedRequests.add(new QueuedRequest<>(request.getRequest(), callback));
}

public void execute(String batchContext) throws IOException {
if (queuedRequests.size() == 0) {
return;
}

List<List<QueuedRequest<RequestT, ResponseT>>> requestPartitions =
partition(queuedRequests, MAX_BATCH_SIZE);
List<BatchRequest> queuedBatches = createBatchRequests(requestPartitions);

String statusCode = "500";
String success = "false";
long start = registry.clock().monotonicTime();
try {
executeBatches(queuedBatches);
success = "true";
statusCode = "200";
} catch (HttpResponseException e) {
statusCode = Integer.toString(e.getStatusCode());
throw e;
} finally {
long nanos = registry.clock().monotonicTime() - start;
String status = statusCode.charAt(0) + "xx";
Map<String, String> tags =
ImmutableMap.of(
"context", batchContext,
"success", success,
"status", status,
"statusCode", statusCode);
registry
.timer(registry.createId("google.batchExecute", tags))
.record(Duration.ofNanos(nanos));
registry
.counter(registry.createId("google.batchSize", tags))
.increment(queuedRequests.size());
}
}

private void executeBatches(List<BatchRequest> queuedBatches) throws IOException {
if (queuedBatches.size() == 1) {
queuedBatches.get(0).execute();
return;
}

List<ListenableFuture<Void>> futures = new ArrayList<>();
for (BatchRequest batchRequest : queuedBatches) {
ListenableFuture<Void> submit =
executor.submit(
() -> {
batchRequest.execute();
return null;
});
futures.add(submit);
}

try {
new FailFastFuture(futures, executor).get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.propagateIfPossible(cause, IOException.class);
throw new RuntimeException(cause);
}
}

private List<BatchRequest> createBatchRequests(
List<List<QueuedRequest<RequestT, ResponseT>>> requestPartitions) throws IOException {

List<BatchRequest> queuedBatches = new ArrayList<>();

try {
requestPartitions.forEach(
partition -> {
BatchRequest batch = newBatch();
partition.forEach(
qr -> wrapIOException(() -> qr.getRequest().queue(batch, qr.getCallback())));
queuedBatches.add(batch);
});
return queuedBatches;
} catch (UncheckedIOException e) {
throw e.getCause();
}
}

private BatchRequest newBatch() {
return compute.batch(
request -> {
request.getHeaders().setUserAgent(userAgent);
request.setConnectTimeout((int) CONNECT_TIMEOUT.toMillis());
request.setReadTimeout((int) READ_TIMEOUT.toMillis());
});
}

@FunctionalInterface
private interface IoExceptionRunnable {
void run() throws IOException;
}

private static void wrapIOException(IoExceptionRunnable runnable) {
try {
runnable.run();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Value
@AllArgsConstructor
private static class QueuedRequest<RequestT extends ComputeRequest<ResponseT>, ResponseT> {
private RequestT request;
private JsonBatchCallback<ResponseT> callback;
}

private static class FailFastFuture extends AbstractFuture<Void> {

private final AtomicInteger remainingFutures;

FailFastFuture(List<ListenableFuture<Void>> futures, ExecutorService executor) {
remainingFutures = new AtomicInteger(futures.size());
for (ListenableFuture<Void> future : futures) {
Futures.addCallback(
future,
new FutureCallback<Object>() {
@Override
public void onSuccess(Object result) {
if (remainingFutures.decrementAndGet() == 0) {
set(null);
}
}

@Override
public void onFailure(Throwable t) {
setException(t);
}
},
executor);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2019 Google, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.clouddriver.google.compute;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ComputeConfiguration {

public static final String BATCH_REQUEST_EXECUTOR = "batchRequestExecutor";

@Bean
@Qualifier(BATCH_REQUEST_EXECUTOR)
public ListeningExecutorService batchRequestExecutor() {
return MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,35 @@

package com.netflix.spinnaker.clouddriver.google.compute;

import com.google.api.services.compute.ComputeRequest;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.clouddriver.google.deploy.GoogleOperationPoller;
import com.netflix.spinnaker.clouddriver.google.model.GoogleServerGroup;
import com.netflix.spinnaker.clouddriver.google.security.GoogleNamedAccountCredentials;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
public class GoogleComputeApiFactory {

private final GoogleOperationPoller operationPoller;
private final Registry registry;
private String clouddriverUserAgentApplicationName;
private ListeningExecutorService batchExecutor;

@Autowired
public GoogleComputeApiFactory(GoogleOperationPoller operationPoller, Registry registry) {
public GoogleComputeApiFactory(
GoogleOperationPoller operationPoller,
Registry registry,
String clouddriverUserAgentApplicationName,
@Qualifier(ComputeConfiguration.BATCH_REQUEST_EXECUTOR)
ListeningExecutorService batchExecutor) {
this.operationPoller = operationPoller;
this.registry = registry;
this.clouddriverUserAgentApplicationName = clouddriverUserAgentApplicationName;
this.batchExecutor = batchExecutor;
}

public GoogleServerGroupManagers createServerGroupManagers(
Expand All @@ -44,7 +56,18 @@ public GoogleServerGroupManagers createServerGroupManagers(
credentials, operationPoller, registry, serverGroup.getName(), serverGroup.getZone());
}

public Images createImages(GoogleNamedAccountCredentials credentials) {
return new Images(credentials, registry);
}

public InstanceTemplates createInstanceTemplates(GoogleNamedAccountCredentials credentials) {
return new InstanceTemplates(credentials, operationPoller, registry);
}

public <RequestT extends ComputeRequest<ResponseT>, ResponseT>
ComputeBatchRequest<RequestT, ResponseT> createBatchRequest(
GoogleNamedAccountCredentials credentials) {
return new ComputeBatchRequest<>(
credentials.getCompute(), registry, clouddriverUserAgentApplicationName, batchExecutor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package com.netflix.spinnaker.clouddriver.google.compute;

import com.google.api.services.compute.ComputeRequest;
import com.google.api.services.compute.model.Operation;
import com.netflix.spinnaker.clouddriver.data.task.Task;
import java.io.IOException;

public interface GoogleComputeOperationRequest extends GoogleComputeRequest<Operation> {
public interface GoogleComputeOperationRequest<RequestT extends ComputeRequest<Operation>>
extends GoogleComputeRequest<RequestT, Operation> {

Operation executeAndWait(Task task, String phase) throws IOException;
}
Loading

0 comments on commit b29b07c

Please sign in to comment.