Skip to content

Commit

Permalink
fix(cf): CF forkjoin threading improvements (#5071) (#5161)
Browse files Browse the repository at this point in the history
* feat(kubernetes): Send SIGKILL to kubectl

* fix(cf): CF forkjoin threading improvements

(cherry picked from commit 1f7d7b7)

Co-authored-by: German Muzquiz <35276119+german-muzquiz@users.noreply.github.com>
  • Loading branch information
mergify[bot] and german-muzquiz committed Dec 11, 2020
1 parent 0d6869b commit 759b905
Show file tree
Hide file tree
Showing 16 changed files with 157 additions and 33 deletions.
Expand Up @@ -75,15 +75,15 @@ public Applications(
ApplicationService api,
Spaces spaces,
Integer resultsPerPage,
int maxConnections) {
ForkJoinPool forkJoinPool) {
this.account = account;
this.appsManagerUri = appsManagerUri;
this.metricsUri = metricsUri;
this.api = api;
this.spaces = spaces;
this.resultsPerPage = resultsPerPage;

this.forkJoinPool = new ForkJoinPool(maxConnections);
this.forkJoinPool = forkJoinPool;
this.serverGroupCache =
CacheBuilder.newBuilder()
.build(
Expand Down
Expand Up @@ -51,6 +51,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -181,7 +182,7 @@ public HttpCloudFoundryClient(
String password,
boolean skipSslValidation,
Integer resultsPerPage,
int maxCapiConnectionsForCache) {
ForkJoinPool forkJoinPool) {
this.apiHost = apiHost;
this.user = user;
this.password = password;
Expand Down Expand Up @@ -217,7 +218,7 @@ public HttpCloudFoundryClient(
createService(ApplicationService.class),
spaces,
resultsPerPage,
maxCapiConnectionsForCache);
forkJoinPool);
this.domains = new Domains(createService(DomainService.class), organizations);
this.serviceInstances =
new ServiceInstances(
Expand All @@ -233,7 +234,7 @@ public HttpCloudFoundryClient(
domains,
spaces,
resultsPerPage,
maxCapiConnectionsForCache);
forkJoinPool);
this.serviceKeys = new ServiceKeys(createService(ServiceKeyService.class), spaces);
this.tasks = new Tasks(createService(TaskService.class));

Expand Down
Expand Up @@ -67,15 +67,15 @@ public Routes(
Domains domains,
Spaces spaces,
Integer resultsPerPage,
int maxConnections) {
ForkJoinPool forkJoinPool) {
this.account = account;
this.api = api;
this.applications = applications;
this.domains = domains;
this.spaces = spaces;
this.resultsPerPage = resultsPerPage;

this.forkJoinPool = new ForkJoinPool(maxConnections);
this.forkJoinPool = forkJoinPool;
this.routeMappings =
CacheBuilder.newBuilder()
.expireAfterWrite(3, TimeUnit.MINUTES)
Expand Down
Expand Up @@ -45,6 +45,8 @@ public class CloudFoundryConfigurationProperties implements DisposableBean {

private List<ManagedAccount> accounts = new ArrayList<>();

private int apiRequestParallelism = 100;

@Override
public void destroy() {
this.accounts = new ArrayList<>();
Expand All @@ -63,7 +65,11 @@ public static class ManagedAccount implements CredentialsDefinition {
private String environment;
private boolean skipSslValidation;
private Integer resultsPerPage;
private Integer maxCapiConnectionsForCache;

@Deprecated
private Integer
maxCapiConnectionsForCache; // Deprecated in favor of cloudfoundry.apiRequestParallelism

private Permissions.Builder permissions = new Permissions.Builder();
}
}
Expand Up @@ -28,6 +28,7 @@
import com.netflix.spinnaker.credentials.definition.BasicCredentialsLoader;
import com.netflix.spinnaker.credentials.definition.CredentialsDefinitionSource;
import com.netflix.spinnaker.credentials.poller.Poller;
import java.util.concurrent.ForkJoinPool;
import javax.annotation.Nullable;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
Expand All @@ -41,6 +42,12 @@ public CloudFoundryProvider cloudFoundryProvider() {
return new CloudFoundryProvider();
}

@Bean
public ForkJoinPool cloudFoundryThreadPool(
CloudFoundryConfigurationProperties cloudFoundryConfigurationProperties) {
return new ForkJoinPool(cloudFoundryConfigurationProperties.getApiRequestParallelism());
}

@Bean
@ConditionalOnMissingBean(
value = CloudFoundryCredentials.class,
Expand All @@ -51,7 +58,8 @@ public AbstractCredentialsLoader<CloudFoundryCredentials> cloudFoundryCredential
cloudFoundryCredentialSource,
CloudFoundryConfigurationProperties configurationProperties,
CacheRepository cacheRepository,
CredentialsRepository<CloudFoundryCredentials> cloudFoundryCredentialsRepository) {
CredentialsRepository<CloudFoundryCredentials> cloudFoundryCredentialsRepository,
ForkJoinPool cloudFoundryThreadPool) {

if (cloudFoundryCredentialSource == null) {
cloudFoundryCredentialSource = configurationProperties::getAccounts;
Expand All @@ -69,9 +77,9 @@ public AbstractCredentialsLoader<CloudFoundryCredentials> cloudFoundryCredential
a.getEnvironment(),
a.isSkipSslValidation(),
a.getResultsPerPage(),
a.getMaxCapiConnectionsForCache(),
cacheRepository,
a.getPermissions().build()),
a.getPermissions().build(),
cloudFoundryThreadPool),
cloudFoundryCredentialsRepository);
}

Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.netflix.spinnaker.clouddriver.security.AbstractAccountCredentials;
import com.netflix.spinnaker.fiat.model.resources.Permissions;
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -68,8 +69,6 @@ public class CloudFoundryCredentials extends AbstractAccountCredentials<CloudFou

@Nullable private final Integer resultsPerPage;

private final int maxCapiConnectionsForCache;

private final Supplier<List<CloudFoundrySpace>> spaceSupplier =
Memoizer.memoizeWithExpiration(this::spaceSupplier, SPACE_EXPIRY_SECONDS, TimeUnit.SECONDS);

Expand All @@ -79,6 +78,8 @@ public class CloudFoundryCredentials extends AbstractAccountCredentials<CloudFou

private Permissions permissions;

private final ForkJoinPool forkJoinPool;

public CloudFoundryCredentials(
String name,
String appsManagerUri,
Expand All @@ -89,9 +90,9 @@ public CloudFoundryCredentials(
String environment,
boolean skipSslValidation,
Integer resultsPerPage,
Integer maxCapiConnectionsForCache,
CacheRepository cacheRepository,
Permissions permissions) {
Permissions permissions,
ForkJoinPool forkJoinPool) {
this.name = name;
this.appsManagerUri = appsManagerUri;
this.metricsUri = metricsUri;
Expand All @@ -101,9 +102,9 @@ public CloudFoundryCredentials(
this.environment = Optional.ofNullable(environment).orElse("dev");
this.skipSslValidation = skipSslValidation;
this.resultsPerPage = Optional.ofNullable(resultsPerPage).orElse(100);
this.maxCapiConnectionsForCache = Optional.ofNullable(maxCapiConnectionsForCache).orElse(16);
this.cacheRepository = cacheRepository;
this.permissions = permissions == null ? Permissions.EMPTY : permissions;
this.forkJoinPool = forkJoinPool;
}

public CloudFoundryClient getCredentials() {
Expand All @@ -118,7 +119,7 @@ public CloudFoundryClient getCredentials() {
password,
skipSslValidation,
resultsPerPage,
maxCapiConnectionsForCache);
forkJoinPool);
}
return credentials;
}
Expand Down
Expand Up @@ -41,6 +41,7 @@
import com.netflix.spinnaker.clouddriver.cloudfoundry.security.CloudFoundryCredentials;
import com.netflix.spinnaker.clouddriver.model.HealthState;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -121,9 +122,9 @@ private CloudFoundryCredentials createCredentials(String name) {
null,
false,
null,
16,
repo,
null);
null,
ForkJoinPool.commonPool());
}

@Test
Expand Down
Expand Up @@ -36,6 +36,7 @@
import io.vavr.collection.HashMap;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -47,7 +48,13 @@ class ApplicationsTest {
private Spaces spaces = mock(Spaces.class);
private Applications apps =
new Applications(
"pws", "some-apps-man-uri", "some-metrics-uri", applicationService, spaces, 500, 16);
"pws",
"some-apps-man-uri",
"some-metrics-uri",
applicationService,
spaces,
500,
ForkJoinPool.commonPool());
private String spaceId = "space-guid";
private CloudFoundrySpace cloudFoundrySpace =
CloudFoundrySpace.builder()
Expand All @@ -68,7 +75,7 @@ void errorHandling() {
"badpassword",
false,
500,
16);
ForkJoinPool.commonPool());

assertThatThrownBy(() -> client.getApplications().all())
.isInstanceOf(CloudFoundryApiException.class);
Expand Down
Expand Up @@ -39,6 +39,7 @@
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import org.cloudfoundry.dropsonde.events.EventFactory.Envelope;
import org.junit.jupiter.api.Test;
import retrofit.converter.ConversionException;
Expand Down Expand Up @@ -68,7 +69,15 @@ void createRetryInterceptorShouldRetryOnInternalServerErrorsThenTimeOut() {

HttpCloudFoundryClient cloudFoundryClient =
new HttpCloudFoundryClient(
"account", "appsManUri", "metricsUri", "host", "user", "password", false, 500, 16);
"account",
"appsManUri",
"metricsUri",
"host",
"user",
"password",
false,
500,
ForkJoinPool.commonPool());
Response response = cloudFoundryClient.createRetryInterceptor(chain);

try {
Expand Down Expand Up @@ -97,7 +106,15 @@ void createRetryInterceptorShouldNotRefreshTokenOnBadCredentials() {

HttpCloudFoundryClient cloudFoundryClient =
new HttpCloudFoundryClient(
"account", "appsManUri", "metricsUri", "host", "user", "password", false, 500, 16);
"account",
"appsManUri",
"metricsUri",
"host",
"user",
"password",
false,
500,
ForkJoinPool.commonPool());
Response response = cloudFoundryClient.createRetryInterceptor(chain);

try {
Expand Down Expand Up @@ -126,7 +143,15 @@ void createRetryInterceptorShouldReturnOnEverythingElse() {

HttpCloudFoundryClient cloudFoundryClient =
new HttpCloudFoundryClient(
"account", "appsManUri", "metricsUri", "host", "user", "password", false, 500, 16);
"account",
"appsManUri",
"metricsUri",
"host",
"user",
"password",
false,
500,
ForkJoinPool.commonPool());
Response response = cloudFoundryClient.createRetryInterceptor(chain);

try {
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.netflix.spinnaker.clouddriver.cloudfoundry.model.CloudFoundrySpace;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import org.junit.jupiter.api.Test;

class RoutesTest {
Expand All @@ -57,7 +58,8 @@ void toRouteId() {
when(routeService.all(any(), any(), any())).thenReturn(Page.singleton(route, "abc123"));
when(routeService.routeMappings(any(), any())).thenReturn(new Page<>());

Routes routes = new Routes("pws", routeService, null, domains, spaces, 500, 16);
Routes routes =
new Routes("pws", routeService, null, domains, spaces, 500, ForkJoinPool.commonPool());
RouteId routeId = routes.toRouteId("demo1-prod.apps.calabasas.cf-app.com/path");
assertThat(routeId).isNotNull();
assertThat(routeId.getHost()).isEqualTo("demo1-prod");
Expand All @@ -67,7 +69,7 @@ void toRouteId() {

@Test
void toRouteIdReturnsNullForInvalidRoute() {
Routes routes = new Routes(null, null, null, null, null, 500, 16);
Routes routes = new Routes(null, null, null, null, null, 500, ForkJoinPool.commonPool());
assertNull(routes.toRouteId("demo1-pro cf-app.com/path"));
}

Expand Down Expand Up @@ -127,7 +129,8 @@ void findShouldFilterCorrectlyOnMultipleResults() {
when(routeService.all(any(), any(), any())).thenReturn(routePage);
when(routeService.routeMappings(any(), any())).thenReturn(routeMappingPage);

Routes routes = new Routes("pws", routeService, null, domains, spaces, 500, 16);
Routes routes =
new Routes("pws", routeService, null, domains, spaces, 500, ForkJoinPool.commonPool());

CloudFoundryLoadBalancer loadBalancer =
routes.find(new RouteId().setHost("somehost").setDomainGuid("domain-guid"), "space-guid");
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -87,7 +88,18 @@ class AbstractLoadBalancersAtomicOperationConverterTest {

private final CloudFoundryCredentials cloudFoundryCredentials =
new CloudFoundryCredentials(
"test", "", "", "", "", "", "", false, 500, 16, cacheRepository, null) {
"test",
"",
"",
"",
"",
"",
"",
false,
500,
cacheRepository,
null,
ForkJoinPool.commonPool()) {
public CloudFoundryClient getClient() {
return cloudFoundryClient;
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import io.vavr.collection.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import org.junit.jupiter.api.Test;

class CreateCloudFoundryServiceKeyAtomicOperationConverterTest {
Expand All @@ -54,7 +55,18 @@ class CreateCloudFoundryServiceKeyAtomicOperationConverterTest {

private final CloudFoundryCredentials cloudFoundryCredentials =
new CloudFoundryCredentials(
"my-account", "", "", "", "", "", "", false, 500, 16, cacheRepository, null) {
"my-account",
"",
"",
"",
"",
"",
"",
false,
500,
cacheRepository,
null,
ForkJoinPool.commonPool()) {
public CloudFoundryClient getClient() {
return cloudFoundryClient;
}
Expand Down

0 comments on commit 759b905

Please sign in to comment.