Skip to content

Commit

Permalink
[WIP] docker compose refactoring, to support scaling, better output l…
Browse files Browse the repository at this point in the history
…ogs, and eventually docker-compose v2 format. Refs #146, #147
  • Loading branch information
rnorth committed Jun 10, 2016
1 parent abe0862 commit 14d400a
Show file tree
Hide file tree
Showing 16 changed files with 410 additions and 109 deletions.
12 changes: 6 additions & 6 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@
</dependency>

<!-- Test dependencies -->
<!--<dependency>-->
<!--<groupId>org.redisson</groupId>-->
<!--<artifactId>redisson</artifactId>-->
<!--<version>1.3.0</version>-->
<!--<scope>test</scope>-->
<!--</dependency>-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>1.3.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,6 @@ ExecResult execInContainer(Charset outputCharset, String... command)

Map<String, LinkableContainer> getLinkedContainers();

Duration getMinimumRunningDuration();

DockerClient getDockerClient();

Info getDockerDaemonInfo();
Expand All @@ -331,8 +329,6 @@ ExecResult execInContainer(Charset outputCharset, String... command)

void setLinkedContainers(Map<String, LinkableContainer> linkedContainers);

void setMinimumRunningDuration(Duration minimumRunningDuration);

void setDockerClient(DockerClient dockerClient);

void setDockerDaemonInfo(Info dockerDaemonInfo);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package org.testcontainers.containers;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.exception.DockerException;
import com.github.dockerjava.api.model.Container;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.runner.Description;
import org.rnorth.ducttape.ratelimits.RateLimiter;
import org.rnorth.ducttape.ratelimits.RateLimiterBuilder;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.profiler.Profiler;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.traits.LinkableContainer;
import org.testcontainers.containers.startupcheck.OneShotStartupCheckStrategy;
import org.testcontainers.utility.Base58;
import org.testcontainers.utility.ContainerReaper;

Expand All @@ -25,62 +32,103 @@
/**
* Container which launches Docker Compose, for the purposes of launching a defined set of containers.
*/
public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> extends GenericContainer<SELF> implements LinkableContainer {
public class DockerComposeContainer<SELF extends DockerComposeContainer<SELF>> extends FailureDetectingExternalResource {

/**
* Random identifier which will become part of spawned containers names, so we can shut them down
*/
private final String identifier;
private final Map<String, AmbassadorContainer> ambassadorContainers = new HashMap<>();
private final File composeFile;
private Set<String> spawnedContainerIds;
private Map<String, Integer> scalingPreferences = new HashMap<>();
private DockerClient dockerClient;

private static final RateLimiter AMBASSADOR_CREATION_RATE_LIMITER = RateLimiterBuilder
.newBuilder()
.withRate(1, TimeUnit.SECONDS)
.withConstantThroughput()
.build();

public DockerComposeContainer(File composeFile) {
this(composeFile, "up -d");
}

@SuppressWarnings("WeakerAccess")
public DockerComposeContainer(File composeFile, String command) {
super("dduportal/docker-compose:1.6.0");

// Create a unique identifier and tell compose
identifier = Base58.randomString(6).toLowerCase();
addEnv("COMPOSE_PROJECT_NAME", identifier);
this(composeFile, command, Base58.randomString(6).toLowerCase());
}

// Map the docker compose file into the container
addEnv("COMPOSE_FILE", "/compose/" + composeFile.getAbsoluteFile().getName());
addFileSystemBind(composeFile.getAbsoluteFile().getParentFile().getAbsolutePath(), "/compose", READ_ONLY);
@SuppressWarnings("WeakerAccess")
public DockerComposeContainer(File composeFile, String command, String identifier) {
this.composeFile = composeFile;

// Ensure that compose can access docker. Since the container is assumed to be running on the same machine
// as the docker daemon, just mapping the docker control socket is OK.
// As there seems to be a problem with mapping to the /var/run directory in certain environments (e.g. CircleCI)
// we map the socket file outside of /var/run, as just /docker.sock
addFileSystemBind("/var/run/docker.sock", "/docker.sock", READ_WRITE);
addEnv("DOCKER_HOST", "unix:///docker.sock");
// Use a unique identifier so that containers created for this compose environment can be identified
this.identifier = identifier;

if (command != null) {
setCommand(command);
}
this.dockerClient = DockerClientFactory.instance().client();
}

@Override
public void start() {

protected void starting(Description description) {
final Profiler profiler = new Profiler("Docker compose container rule");
profiler.setLogger(logger());
profiler.start("Docker compose container startup");

applyScaling(); // scale before up, so that all scaled instances are available first for linking
createServices();
registerContainersForShutdown();
startAmbassadorContainers(profiler);

}

private GenericContainer createComposeInstance() {
return new GenericContainer("dduportal/docker-compose:1.6.0")
.withEnv("COMPOSE_PROJECT_NAME", identifier)
// Map the docker compose file into the container
.withEnv("COMPOSE_FILE", "/compose/" + composeFile.getAbsoluteFile().getName())
.withFileSystemBind(composeFile.getAbsoluteFile().getParentFile().getAbsolutePath(), "/compose", READ_ONLY)
// Ensure that compose can access docker. Since the container is assumed to be running on the same machine
// as the docker daemon, just mapping the docker control socket is OK.
// As there seems to be a problem with mapping to the /var/run directory in certain environments (e.g. CircleCI)
// we map the socket file outside of /var/run, as just /docker.sock
.withFileSystemBind("/var/run/docker.sock", "/docker.sock", READ_WRITE)
.withEnv("DOCKER_HOST", "unix:///docker.sock")
.withStartupCheckStrategy(new OneShotStartupCheckStrategy());
}

private void createServices() {
// Start the docker-compose container, which starts up the services
super.start();
followOutput(new Slf4jLogConsumer(logger()), OutputFrame.OutputType.STDERR);
GenericContainer composeInstance = createComposeInstance().withCommand("up -d");
runCompose(composeInstance);
}

private void runCompose(GenericContainer composeInstance) {
composeInstance.start();
composeInstance.followOutput(new Slf4jLogConsumer(logger()), OutputFrame.OutputType.STDERR);

// wait for the compose container to stop, which should only happen after it has spawned all the service containers
logger().info("Docker compose container is running - service creation will start now");
while (isRunning()) {
while (composeInstance.isRunning()) {
logger().trace("Compose container is still running");
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
logger().info("Docker compose has finished running");
}

private void applyScaling() {
// Apply scaling
if (!scalingPreferences.isEmpty()) {
StringBuffer sb = new StringBuffer("scale");
for (Map.Entry<String, Integer> scale : scalingPreferences.entrySet()) {
sb.append(" ").append(scale.getKey()).append("=").append(scale.getValue());
}

GenericContainer composeInstance = createComposeInstance().withCommand(sb.toString());
runCompose(composeInstance);
}
}

private void registerContainersForShutdown() {
// Ensure that all service containers that were launched by compose will be killed at shutdown
try {
List<Container> containers = dockerClient.listContainersCmd()
Expand All @@ -101,7 +149,9 @@ public void start() {
} catch (DockerException e) {
logger().debug("Failed to stop a service container with exception", e);
}
}

private void startAmbassadorContainers(Profiler profiler) {
for (final Map.Entry<String, AmbassadorContainer> address : ambassadorContainers.entrySet()) {

try {
Expand All @@ -110,11 +160,14 @@ public void start() {

final AmbassadorContainer ambassadorContainer = address.getValue();
Unreliables.retryUntilSuccess(120, TimeUnit.SECONDS, () -> {
Profiler localProfiler = profiler.startNested("Ambassador container: " + ambassadorContainer.getContainerName());

localProfiler.start("Start ambassador container");
AMBASSADOR_CREATION_RATE_LIMITER.doWhenReady(() -> {
Profiler localProfiler = profiler.startNested("Ambassador container: " + ambassadorContainer.getContainerName());

localProfiler.start("Start ambassador container");

ambassadorContainer.start();
ambassadorContainer.start();
});

return null;
});
Expand All @@ -126,10 +179,14 @@ public void start() {
}
}

private Logger logger() {
return LoggerFactory.getLogger(DockerComposeContainer.class);
}

@Override
public void stop() {
protected void finished(Description description) {
// this, the compose container, should not be running, but just in case something has gone wrong
super.stop();
createComposeInstance().stop();

// shut down all the ambassador containers
ambassadorContainers.forEach((String address, AmbassadorContainer container) -> container.stop());
Expand All @@ -139,12 +196,6 @@ public void stop() {
spawnedContainerIds.clear();
}

@Override
@Deprecated
public SELF withExposedPorts(Integer... ports) {
throw new UnsupportedOperationException("Use withExposedService instead");
}

public SELF withExposedService(String serviceName, int servicePort) {

/**
Expand All @@ -162,7 +213,7 @@ public SELF withExposedService(String serviceName, int servicePort) {
// Ambassador containers will all be started together after docker compose has started
ambassadorContainers.put(serviceName + ":" + servicePort, ambassadorContainer);

return self();
return (SELF) this;
}

/**
Expand Down Expand Up @@ -192,4 +243,9 @@ public String getServiceHost(String serviceName, Integer servicePort) {
public Integer getServicePort(String serviceName, Integer servicePort) {
return ambassadorContainers.get(serviceName + ":" + servicePort).getMappedPort(servicePort);
}

public DockerComposeContainer withScaledService(String serviceBaseName, int numInstances) {
scalingPreferences.put(serviceBaseName, numInstances);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,24 @@
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.ToStringConsumer;
import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy;
import org.testcontainers.containers.startupcheck.StartupCheckStrategy;
import org.testcontainers.containers.traits.LinkableContainer;
import org.testcontainers.containers.wait.Wait;
import org.testcontainers.containers.wait.WaitStrategy;
import org.testcontainers.images.RemoteDockerImage;
import org.testcontainers.utility.*;
import org.testcontainers.utility.ContainerReaper;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.DockerMachineClient;
import org.testcontainers.utility.PathOperations;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -90,8 +95,7 @@ public class GenericContainer<SELF extends GenericContainer<SELF>>
@NonNull
private Map<String, LinkableContainer> linkedContainers = new HashMap<>();

@NonNull
private Duration minimumRunningDuration = null;
private StartupCheckStrategy startupCheckStrategy = new IsRunningStartupCheckStrategy();

/*
* Unique instance of DockerClient for use by this container object.
Expand Down Expand Up @@ -189,31 +193,10 @@ private void tryStart(Profiler profiler) {
containerIsStarting(containerInfo);

// Wait until the container is running (may not be fully started)
profiler.start("Wait until container state=running, or there's evidence it failed to start.");
final Boolean[] startedOK = {null};
Unreliables.retryUntilTrue(CONTAINER_RUNNING_TIMEOUT_SEC, TimeUnit.SECONDS, () -> {
//noinspection CodeBlock2Expr
return DOCKER_CLIENT_RATE_LIMITER.getWhenReady(() -> {
// record "now" before fetching status; otherwise the time to fetch the status
// will contribute to how long the container has been running.
Instant now = Instant.now();
InspectContainerResponse inspectionResponse = dockerClient.inspectContainerCmd(containerId).exec();

if (DockerStatus.isContainerRunning(
inspectionResponse.getState(),
minimumRunningDuration,
now)) {
startedOK[0] = true;
return true;
} else if (DockerStatus.isContainerStopped(inspectionResponse.getState())) {
startedOK[0] = false;
return true;
}
return false;
});
});
profiler.start("Wait until container has started properly, or there's evidence it failed to start.");
boolean startedOK = this.startupCheckStrategy.waitUntilStartupSuccessful(dockerClient, containerId);

if (!startedOK[0]) {
if (!startedOK) {

logger().error("Container did not start correctly; container log output (if any) will be fetched and logged shortly");
FrameConsumerResultCallback resultCallback = new FrameConsumerResultCallback();
Expand All @@ -223,10 +206,10 @@ private void tryStart(Profiler profiler) {

// Bail out, don't wait for the port to start listening.
// (Exception thrown here will be caught below and wrapped)
throw new IllegalStateException("Container has already stopped.");
throw new IllegalStateException("Container did not start correctly.");
}

profiler.start("Wait until container started");
profiler.start("Wait until container started properly");
waitUntilContainerStarted();

logger().info("Container {} started", dockerImageName);
Expand Down Expand Up @@ -550,7 +533,12 @@ public String getContainerIpAddress() {
*/
@Override
public SELF withMinimumRunningDuration(Duration minimumRunningDuration) {
this.setMinimumRunningDuration(minimumRunningDuration);
this.startupCheckStrategy = new MinimumDurationRunningStartupCheckStrategy(minimumRunningDuration);
return self();
}

public SELF withStartupCheckStrategy(StartupCheckStrategy strategy) {
this.startupCheckStrategy = strategy;
return self();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.testcontainers.containers.startupcheck;

import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.utility.DockerStatus;

/**
* Simplest possible implementation of {@link StartupCheckStrategy} - just check that the container
* has reached the running state and has not exited.
*/
public class IsRunningStartupCheckStrategy extends StartupCheckStrategy {

@Override
public StartupStatus checkStartupState(DockerClient dockerClient, String containerId) {
InspectContainerResponse.ContainerState state = getCurrentState(dockerClient, containerId);
if (state.getRunning()) {
return StartupStatus.SUCCESSFUL;
} else if (!DockerStatus.isContainerExitCodeSuccess(state)) {
return StartupStatus.FAILED;
} else {
return StartupStatus.NOT_YET_KNOWN;
}
}
}
Loading

0 comments on commit 14d400a

Please sign in to comment.