Skip to content

Commit

Permalink
Fix error with simultaneous docker operations by synchronizing those operations
Browse files Browse the repository at this point in the history

see also:
- eclipse-ee4j/jersey#3772
- zalando#808
  • Loading branch information
tstern committed Apr 5, 2019
1 parent 5e04c90 commit 69ae6ab
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 119 deletions.
Expand Up @@ -2,9 +2,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.spotify.docker.client.AnsiProgressHandler;
import com.spotify.docker.client.DefaultDockerClient;
import com.spotify.docker.client.DockerClient;
import com.spotify.docker.client.exceptions.DockerException;
import com.spotify.docker.client.messages.*;
import com.spotify.docker.client.messages.mount.Mount;
Expand All @@ -30,9 +27,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static com.spotify.docker.client.DockerClient.ListContainersParam.withStatusCreated;
import static com.spotify.docker.client.DockerClient.ListContainersParam.withStatusRunning;

@SuppressWarnings("ConstantConditions")
public class SwarmContainerClient implements ContainerClient {

Expand All @@ -46,7 +40,6 @@ public class SwarmContainerClient implements ContainerClient {
private static Environment env = defaultEnvironment;
private static String seleniumContainerCpuLimit;
private static String seleniumContainerMemoryLimit;
private static String dockerHost;
private static AtomicBoolean environmentInitialised = new AtomicBoolean(false);

static {
Expand All @@ -55,7 +48,6 @@ public class SwarmContainerClient implements ContainerClient {

private final Logger logger = LoggerFactory.getLogger(SwarmContainerClient.class.getName());
private final GoogleAnalyticsApi ga = new GoogleAnalyticsApi();
private DockerClient dockerClient = new DefaultDockerClient(dockerHost);
private String nodeId;
private Map<String, String> seleniumContainerLabels = new HashMap<>();
private AtomicBoolean seleniumContainerLabelsChecked = new AtomicBoolean(false);
Expand All @@ -67,21 +59,13 @@ private static void readConfigurationFromEnvVariables() {

String memoryLimit = env.getEnvVariable(ZALENIUM_SELENIUM_CONTAINER_MEMORY_LIMIT);
setSeleniumContainerMemoryLimit(memoryLimit);

String dockerHost = env.getStringEnvVariable("DOCKER_HOST", "unix:///var/run/docker.sock");
setDockerHost(dockerHost);
}

/**
 * Replaces the static {@code Environment} reference held by this class.
 *
 * <p>This only swaps the reference; nothing in this method re-reads any
 * configuration that was previously derived from the old environment.
 *
 * @param env the environment to store in the static {@code env} field
 */
@VisibleForTesting
protected static void setEnv(final Environment env) {
SwarmContainerClient.env = env;
}

/**
 * Stores the Docker endpoint in the static {@code dockerHost} field, rewriting
 * a leading {@code tcp://} scheme to {@code http://}.
 *
 * <p>Only the scheme prefix is rewritten. Other occurrences of the letters
 * "tcp" (for example in a host name or in a unix socket path) are left
 * untouched, so values such as {@code unix:///var/run/tcp-docker.sock} are
 * stored unchanged.
 *
 * @param dockerHost the endpoint URI, e.g. {@code tcp://host:2375} or
 *                   {@code unix:///var/run/docker.sock}; must not be null
 */
private static void setDockerHost(String dockerHost) {
    // https://github.com/spotify/docker-client/issues/946
    // NOTE(review): presumably the client does not accept tcp:// URIs - see the linked issue.
    final String tcpScheme = "tcp://";
    SwarmContainerClient.dockerHost = dockerHost.startsWith(tcpScheme)
            ? "http://" + dockerHost.substring(tcpScheme.length())
            : dockerHost;
}

/**
 * Returns the current value of the static {@code seleniumContainerCpuLimit} field.
 *
 * @return the stored CPU limit string; NOTE(review): where this field is
 *         assigned is not visible here, so it may be null - confirm
 */
private static String getSeleniumContainerCpuLimit() {
return seleniumContainerCpuLimit;
}
Expand All @@ -104,15 +88,9 @@ public void setNodeId(String nodeId) {

private String getContainerId(URL remoteUrl) {
try {
List<Task> tasks = dockerClient.listTasks();
for (Task task : tasks) {
for (NetworkAttachment networkAttachment : CollectionUtils.emptyIfNull(task.networkAttachments())) {
for (String address : networkAttachment.addresses()) {
if (address.startsWith(remoteUrl.getHost())) {
return task.status().containerStatus().containerId();
}
}
}
ContainerStatus containerStatus = SwarmUtilities.getContainerByRemoteUrl(remoteUrl);
if (containerStatus != null) {
return containerStatus.containerId();
}
} catch (DockerException | InterruptedException e) {
e.printStackTrace();
Expand All @@ -127,7 +105,7 @@ private String getContainerId(String containerName) {

List<Container> containerList = null;
try {
containerList = dockerClient.listContainers(withStatusRunning(), withStatusCreated());
containerList = SwarmUtilities.getRunningAndCreatedContainers();
} catch (DockerException | InterruptedException e) {
logger.debug(nodeId + " Error while getting containerId", e);
ga.trackException(e);
Expand All @@ -152,20 +130,7 @@ public InputStream copyFiles(String containerId, String folderName) {

public void stopContainer(String containerId) {
try {
List<Task> tasks = dockerClient.listTasks();
for (Task task : tasks) {
ContainerStatus containerStatus = task.status().containerStatus();
if (containerStatus != null && containerStatus.containerId().equals(containerId)) {
String serviceId = task.serviceId();
Service.Criteria criteria = Service.Criteria.builder()
.serviceId(serviceId)
.build();
List<Service> services = dockerClient.listServices(criteria);
if (!CollectionUtils.isEmpty(services)) {
dockerClient.removeService(serviceId);
}
}
}
SwarmUtilities.stopServiceByContainerId(containerId);
} catch (DockerException | InterruptedException e) {
logger.warn(nodeId + " Error while stopping the container", e);
ga.trackException(e);
Expand All @@ -174,32 +139,22 @@ public void stopContainer(String containerId) {

/**
 * Runs a command for the swarm task that backs the given container.
 *
 * <p>The task whose container status matches {@code containerId} is looked up,
 * {@code pullSwarmExecImage()} is called, and
 * {@code startSwarmExecContainer(task, command, containerId)} is invoked for
 * that task. When no task matches, a warning is logged instead.
 *
 * <p>{@code DockerException} and {@code InterruptedException} are not
 * propagated: they are logged at warn level and passed to
 * {@code ga.trackException}.
 *
 * <p>NOTE(review): this span appears to come from a diff rendered without +/-
 * markers. The lines based on {@code dockerClient.listTasks()} and the lines
 * based on {@code SwarmUtilities.getTaskByContainerId} look like the old and
 * the new implementation shown together ({@code task} is declared twice and
 * the braces do not balance). Confirm against the repository before relying
 * on the exact statement order.
 *
 * @param containerId      id of the container whose task should run the command
 * @param command          the command passed to {@code startSwarmExecContainer}
 * @param waitForExecution not read anywhere in the visible body
 */
public void executeCommand(String containerId, String[] command, boolean waitForExecution) {
try {
List<Task> tasks = dockerClient.listTasks();

pullSwarmExecImage();

for (Task task : CollectionUtils.emptyIfNull(tasks)) {
ContainerStatus containerStatus = task.status().containerStatus();

if (containerStatus != null && containerStatus.containerId().equals(containerId)) {
startSwarmExecContainer(task, command, containerId);
return;
}
Task task = SwarmUtilities.getTaskByContainerId(containerId);
if (task != null) {
pullSwarmExecImage();
startSwarmExecContainer(task, command, containerId);
} else {
logger.warn("Couldn't execute command on container {}", containerId);
}
} catch (DockerException | InterruptedException e) {
// NOTE(review): the exception itself is not passed to the logger here, only to ga;
// "comman" in the message is likely a typo for "command" but is runtime text, so left as is.
logger.warn("Error while executing comman on container {}", containerId);
ga.trackException(e);
}

logger.warn("Couldn't execute command on container {}", containerId);
}

private void pullSwarmExecImage() {
try {
List<Image> images = dockerClient.listImages(DockerClient.ListImagesParam.byName(SWARM_EXEC_IMAGE));
if (CollectionUtils.isEmpty(images)) {
dockerClient.pull(SWARM_EXEC_IMAGE, new AnsiProgressHandler());
}
SwarmUtilities.pullImageIfNotPresent(SWARM_EXEC_IMAGE);
} catch (DockerException | InterruptedException e) {
logger.warn(nodeId + " Error while checking (and pulling) if the image is present", e);
ga.trackException(e);
Expand Down Expand Up @@ -231,9 +186,7 @@ private void startSwarmExecContainer(Task task, String[] command, String contain
.cmd(command)
.build();

ContainerCreation containerCreation = dockerClient.createContainer(containerConfig);

dockerClient.startContainer(containerCreation.id());
SwarmUtilities.startContainer(containerConfig);
}

public String getLatestDownloadedImage(String imageName) {
Expand All @@ -256,7 +209,7 @@ public ContainerCreationStatus createContainer(String zaleniumContainerName, Str
final ServiceSpec serviceSpec = buildServiceSpec(taskSpec, nodePort, noVncPort);

try {
ServiceCreateResponse service = dockerClient.createService(serviceSpec);
ServiceCreateResponse service = SwarmUtilities.createService(serviceSpec);

TaskStatus taskStatus = waitForTaskStatus(service.id());

Expand Down Expand Up @@ -361,14 +314,7 @@ private TaskStatus waitForTaskStatus(String serviceId) throws DockerException, I
private TaskStatus waitForTaskStatus(String serviceId, int attempts) throws DockerException, InterruptedException {
int attemptsLimit = 100;
Thread.sleep(100);
String serviceName = dockerClient.inspectService(serviceId).spec().name();
Task.Criteria criteria = Task.Criteria.builder().serviceName(serviceName).build();
List<Task> tasks = dockerClient.listTasks(criteria);
Task task = null;

if (!CollectionUtils.isEmpty(tasks)) {
task = tasks.get(0);
}
Task task = SwarmUtilities.getTaskByServiceId(serviceId);

if (task == null && attempts < attemptsLimit) {
return waitForTaskStatus(serviceId, attempts + 1);
Expand All @@ -385,19 +331,14 @@ private TaskStatus waitForTaskStatus(String serviceId, int attempts) throws Dock
}

private ContainerCreationStatus getContainerCreationStatus(String serviceId, String nodePort) throws DockerException, InterruptedException {
Service service = dockerClient.inspectService(serviceId);
Task.Criteria criteria = Task.Criteria.builder()
.serviceName(service.spec().name())
.build();
List<Task> tasks = dockerClient.listTasks(criteria);
for (Task task : tasks) {
if (task.serviceId().equals(serviceId)) {
ContainerStatus containerStatus = task.status().containerStatus();
if (containerStatus != null) {
String containerId = containerStatus.containerId();
String containerName = containerStatus.containerId();
return new ContainerCreationStatus(true, containerName, containerId, nodePort);
}
Task task = SwarmUtilities.getTaskByServiceId(serviceId);

if (task != null) {
ContainerStatus containerStatus = task.status().containerStatus();
if (containerStatus != null) {
String containerId = containerStatus.containerId();
String containerName = containerStatus.containerId();
return new ContainerCreationStatus(true, containerName, containerId, nodePort);
}
}

Expand Down Expand Up @@ -469,18 +410,13 @@ public String getContainerIp(String containerName) {
}

try {
List<Task> tasks = dockerClient.listTasks();
String swarmOverlayNetwork = ZaleniumConfiguration.getSwarmOverlayNetwork();
for (Task task : tasks) {
ContainerStatus containerStatus = task.status().containerStatus();
if (containerStatus != null) {
if (containerStatus.containerId().equals(containerId)) {
for (NetworkAttachment networkAttachment : CollectionUtils.emptyIfNull(task.networkAttachments())) {
if (networkAttachment.network().spec().name().equals(swarmOverlayNetwork)) {
String cidrSuffix = "/\\d+$";
return networkAttachment.addresses().get(0).replaceAll(cidrSuffix, "");
}
}
Task task = SwarmUtilities.getTaskByContainerId(containerId);
if (task != null) {
for (NetworkAttachment networkAttachment : CollectionUtils.emptyIfNull(task.networkAttachments())) {
if (networkAttachment.network().spec().name().equals(swarmOverlayNetwork)) {
String cidrSuffix = "/\\d+$";
return networkAttachment.addresses().get(0).replaceAll(cidrSuffix, "");
}
}
}
Expand Down Expand Up @@ -516,30 +452,21 @@ public boolean isTerminated(ContainerCreationStatus container) {
try {
List<String> termStates = Arrays.asList("complete", "failed", "shutdown", "rejected", "orphaned", "removed");
String containerId = container.getContainerId();
List<Task> tasks = dockerClient.listTasks();
boolean containerExists = tasks.stream().anyMatch(task -> {
ContainerStatus containerStatus = task.status().containerStatus();
return containerStatus != null && containerStatus.containerId().equals(containerId);
});
Task task = SwarmUtilities.getTaskByContainerId(containerId);

if (!containerExists) {
if (task == null) {
logger.info("Container {} has no corresponding task - flagging it as terminated", container);
return true;
} else {
return tasks.stream().anyMatch(task -> {
ContainerStatus containerStatus = task.status().containerStatus();
boolean hasTerminalState = termStates.contains(task.status().state());
boolean isContainer = containerStatus != null && containerStatus.containerId().equals(containerId);
boolean isTerminated = isContainer && hasTerminalState;

if (isTerminated) {
logger.info("State of Container {} is {} - flagging it as terminated",
container.getContainerId(),
task.status().state());
}
boolean isTerminated = termStates.contains(task.status().state());

if (isTerminated) {
logger.info("State of Container {} is {} - flagging it as terminated",
container.getContainerId(),
task.status().state());
}

return isTerminated;
});
return isTerminated;
}
} catch (DockerException | InterruptedException e) {
logger.warn("Failed to fetch container status [" + container.getContainerId() + "].", e);
Expand Down

0 comments on commit 69ae6ab

Please sign in to comment.