Skip to content

Commit

Permalink
Add a health check to the node
Browse files Browse the repository at this point in the history
This allows them to be tested to see if they're alive
and well. If they're down, then no sessions will be
scheduled on them.
  • Loading branch information
shs96c committed Dec 10, 2018
1 parent dcff832 commit ff80caf
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ java_library(
name = "component",
srcs = glob(["*.java"]),
visibility = [
"//java/server/src/org/openqa/selenium/grid/server:__pkg__",
"//java/server/src/org/openqa/selenium/grid:__subpackages__",
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.openqa.selenium.grid.component;

import java.util.Objects;

@FunctionalInterface
public interface HealthCheck {

Result check();

class Result {
private final boolean isAlive;
private final String message;

public Result(boolean isAlive, String message) {
this.isAlive = isAlive;
this.message = Objects.requireNonNull(message, "Message must be set");
}

public boolean isAlive() {
return isAlive;
}

public String getMessage() {
return message;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ java_library(
],
deps = [
"//java/client/src/org/openqa/selenium/remote/tracing",
"//java/server/src/org/openqa/selenium/concurrent",
"//java/server/src/org/openqa/selenium/grid/distributor",
"//third_party/java/guava",
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,37 @@

package org.openqa.selenium.grid.distributor.local;

import static org.openqa.selenium.grid.distributor.local.Host.Status.DOWN;
import static org.openqa.selenium.grid.distributor.local.Host.Status.DRAINING;
import static org.openqa.selenium.grid.distributor.local.Host.Status.UP;
import static org.openqa.selenium.grid.distributor.local.Slot.Status.ACTIVE;
import static org.openqa.selenium.grid.distributor.local.Slot.Status.AVAILABLE;

import com.google.common.collect.ImmutableList;

import org.openqa.selenium.Capabilities;
import org.openqa.selenium.SessionNotCreatedException;
import org.openqa.selenium.grid.component.HealthCheck;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.node.Node;
import org.openqa.selenium.grid.node.NodeStatus;

import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.logging.Logger;

class Host {

private static final Logger LOG = Logger.getLogger("Selenium Distributor");
private final Node node;
private final int maxSessionCount;
private Status status;
private final Runnable performHealthCheck;

// Used any time we need to read or modify `available`
private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);
Expand All @@ -67,6 +73,30 @@ public Host(Node node) {

// By definition, we can never have more sessions than we have slots available
this.maxSessionCount = Math.min(this.slots.size(), status.getMaxSessionCount());

this.status = Status.DOWN;

HealthCheck healthCheck = node.getHealthCheck();

this.performHealthCheck = () -> {
HealthCheck.Result result = healthCheck.check();
Host.Status current = result.isAlive() ? UP : DOWN;
Host.Status previous = setHostStatus(current);
if (previous == DRAINING) {
// We want to continue to allow the node to drain.
setHostStatus(DRAINING);
return;
}

if (current != previous) {
LOG.info(String.format(
"Changing status of node %s from %s to %s. Reason: %s",
node.getId(),
previous,
current,
result.getMessage()));
}
};
}

public UUID getId() {
Expand All @@ -77,6 +107,19 @@ public NodeStatus getStatus() {
return node.getStatus();
}

public Status getHostStatus() {
return status;
}

/**
* @return The previous status of the node.
*/
private Status setHostStatus(Status status) {
Status toReturn = this.status;
this.status = Objects.requireNonNull(status, "Status must be set.");
return toReturn;
}

public boolean hasCapacity(Capabilities caps) {
Lock read = lock.readLock();
read.lock();
Expand Down Expand Up @@ -135,6 +178,10 @@ public Supplier<Session> reserve(Capabilities caps) {
}
}

void refresh() {
performHealthCheck.run();
}

@Override
public int hashCode() {
return node.hashCode();
Expand All @@ -149,4 +196,10 @@ public boolean equals(Object obj) {
Host that = (Host) obj;
return this.node.equals(that.node);
}

public enum Status {
UP,
DRAINING,
DOWN,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@
package org.openqa.selenium.grid.distributor.local;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static org.openqa.selenium.grid.distributor.local.Host.Status.DOWN;
import static org.openqa.selenium.grid.distributor.local.Host.Status.DRAINING;
import static org.openqa.selenium.grid.distributor.local.Host.Status.UP;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import org.openqa.selenium.Beta;
import org.openqa.selenium.Capabilities;
import org.openqa.selenium.SessionNotCreatedException;
import org.openqa.selenium.concurrent.Regularly;
import org.openqa.selenium.grid.component.HealthCheck;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.distributor.Distributor;
import org.openqa.selenium.grid.distributor.DistributorStatus;
Expand All @@ -35,20 +42,33 @@
import org.openqa.selenium.remote.tracing.DistributedTracer;
import org.openqa.selenium.remote.tracing.Span;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.logging.Logger;

public class LocalDistributor extends Distributor {

public static final Json JSON = new Json();
private static final Json JSON = new Json();
private static final Logger LOG = Logger.getLogger("Selenium Distributor");
private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);
private final Set<Host> hosts = new HashSet<>();
private final DistributedTracer tracer;
private final Regularly hostChecker = new Regularly("distributor host checker");
private final Map<UUID, Collection<Runnable>> allChecks = new ConcurrentHashMap<>();

public LocalDistributor(DistributedTracer tracer, HttpClient.Factory httpClientFactory) {
super(tracer, httpClientFactory);
Expand All @@ -64,8 +84,11 @@ public Session newSession(NewSessionPayload payload) throws SessionNotCreatedExc

Capabilities caps = allCaps.next();
Optional<Supplier<Session>> selected;
synchronized (hosts) {
Lock writeLock = this.lock.writeLock();
writeLock.lock();
try {
selected = this.hosts.stream()
.filter(host -> host.getHostStatus() == UP)
// Find a host that supports this kind of thing
.filter(host -> host.hasCapacity(caps))
.min(
Expand All @@ -77,6 +100,8 @@ public Session newSession(NewSessionPayload payload) throws SessionNotCreatedExc
.thenComparing(Host::getId))
// And reserve some space
.map(host -> host.reserve(caps));
} finally {
writeLock.unlock();
}

return selected
Expand All @@ -89,31 +114,67 @@ public Session newSession(NewSessionPayload payload) throws SessionNotCreatedExc
public LocalDistributor add(Node node) {
StringBuilder sb = new StringBuilder();

Lock writeLock = this.lock.writeLock();
writeLock.lock();
try (Span span = tracer.createSpan("distributor.add", tracer.getActiveSpan());
JsonOutput out = JSON.newOutput(sb)) {
out.setPrettyPrint(false).write(node);
span.addTag("node", sb.toString());
hosts.add(new Host(node));

Host host = new Host(node);
hosts.add(host);
LOG.info(String.format("Added node %s.", node.getId()));
host.refresh();

Runnable runnable = host::refresh;
Collection<Runnable> nodeRunnables = allChecks.getOrDefault(node.getId(), new ArrayList<>());
nodeRunnables.add(runnable);
allChecks.put(node.getId(), nodeRunnables);
hostChecker.submit(runnable, Duration.ofMinutes(5), Duration.ofSeconds(30));
} finally {
writeLock.unlock();
}

return this;
}

@Override
public void remove(UUID nodeId) {
Lock writeLock = lock.writeLock();
writeLock.lock();
try (Span span = tracer.createSpan("distributor.remove", tracer.getActiveSpan())) {
span.addTag("node.id", nodeId);
hosts.removeIf(host -> nodeId.equals(host.getId()));
allChecks.getOrDefault(nodeId, new ArrayList<>()).forEach(hostChecker::remove);
} finally {
writeLock.unlock();
}
}

@Override
public DistributorStatus getStatus() {
ImmutableList<NodeStatus> nodesStatuses = this.hosts.stream()
.map(Host::getStatus)
.collect(toImmutableList());

return new DistributorStatus(nodesStatuses);
Lock readLock = this.lock.readLock();
readLock.lock();
try {
ImmutableList<NodeStatus> nodesStatuses = this.hosts.stream()
.map(Host::getStatus)
.collect(toImmutableList());

return new DistributorStatus(nodesStatuses);
} finally {
readLock.unlock();
}
}

@VisibleForTesting
@Beta
public void refresh() {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
hosts.forEach(Host::refresh);
} finally {
writeLock.unlock();
}
}
}
1 change: 1 addition & 0 deletions java/server/src/org/openqa/selenium/grid/node/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ java_library(
exported_deps = [
# Because session id is exposed as part of our APIs
"//java/client/src/org/openqa/selenium/remote:remote",
"//java/server/src/org/openqa/selenium/grid/component:component",
"//java/server/src/org/openqa/selenium/grid/data:data",
],
deps = [
Expand Down
4 changes: 4 additions & 0 deletions java/server/src/org/openqa/selenium/grid/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ java_library(
"//java/server/test/org/openqa/selenium/grid:__subpackages__",
],
exports = [
# Nodes export their health checks
"//java/server/src/org/openqa/selenium/grid/component",

# Exports Session from Node.getSession
"//java/server/src/org/openqa/selenium/grid/data",

Expand All @@ -14,6 +17,7 @@ java_library(
],
deps = [
"//java/client/src/org/openqa/selenium/remote/tracing",
"//java/server/src/org/openqa/selenium/grid/component",
"//java/server/src/org/openqa/selenium/grid/data",
"//java/server/src/org/openqa/selenium/grid/web",
"//java/server/src/org/openqa/selenium/injector",
Expand Down
3 changes: 3 additions & 0 deletions java/server/src/org/openqa/selenium/grid/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.openqa.selenium.Capabilities;
import org.openqa.selenium.NoSuchSessionException;
import org.openqa.selenium.grid.component.HealthCheck;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.web.CommandHandler;
import org.openqa.selenium.grid.web.HandlerNotFoundException;
Expand Down Expand Up @@ -148,6 +149,8 @@ public UUID getId() {

public abstract NodeStatus getStatus();

public abstract HealthCheck getHealthCheck();

@Override
public boolean test(HttpRequest req) {
return routes.match(injector, req).isPresent();
Expand Down

0 comments on commit ff80caf

Please sign in to comment.