Skip to content

Commit

Permalink
[grid]: Avoid endlessly updating the same node to a local distributor
Browse files Browse the repository at this point in the history
  • Loading branch information
shs96c committed Mar 19, 2019
1 parent 036d5ac commit f5fecf2
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.openqa.selenium.grid.distributor.local.Slot.Status.ACTIVE;
import static org.openqa.selenium.grid.distributor.local.Slot.Status.AVAILABLE;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import org.openqa.selenium.Capabilities;
Expand Down Expand Up @@ -58,49 +59,22 @@ class Host {
private final Node node;
private final UUID nodeId;
private final URI uri;
private final int maxSessionCount;

private Status status;
private final Runnable performHealthCheck;

// Used any time we need to read or modify `available`
// Used any time we need to read or modify the mutable state of this host
private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);
private final List<Slot> slots;
private Status status;
private List<Slot> slots;
private int maxSessionCount;

public Host(EventBus bus, Node node) {
this.node = Objects.requireNonNull(node);

this.nodeId = node.getId();
this.uri = node.getUri();

NodeStatus status = node.getStatus();

// This is grossly inefficient. But we're on a modern processor and we're expecting 10s to 100s
// of nodes, so this is probably ok.
Set<NodeStatus.Active> sessions = status.getCurrentSessions();
Map<Capabilities, Integer> actives = sessions.parallelStream().collect(
groupingBy(NodeStatus.Active::getStereotype, summingInt(active -> 1)));

ImmutableList.Builder<Slot> slots = ImmutableList.builder();
status.getStereotypes().forEach((caps, count) -> {
if (actives.containsKey(caps)) {
Integer activeCount = actives.get(caps);
for (int i = 0; i < activeCount; i++) {
slots.add(new Slot(node, caps, ACTIVE));
}
count -= activeCount;
}

for (int i = 0; i < count; i++) {
slots.add(new Slot(node, caps, AVAILABLE));
}
});
this.slots = slots.build();

// By definition, we can never have more sessions than we have slots available
this.maxSessionCount = Math.min(this.slots.size(), status.getMaxSessionCount());

this.status = Status.DOWN;
this.slots = ImmutableList.of();

HealthCheck healthCheck = node.getHealthCheck();

Expand Down Expand Up @@ -128,6 +102,43 @@ public Host(EventBus bus, Node node) {
SessionId id = event.getData(SessionId.class);
this.slots.forEach(slot -> slot.onEnd(id));
});

update(node.getStatus());
}

/**
 * Rebuilds this host's slots and session limit from a status report.
 *
 * <p>The whole slot list is replaced: for every stereotype in the report, one {@code ACTIVE}
 * slot is created per session currently running against that stereotype, and the remainder of
 * the advertised count is filled with {@code AVAILABLE} slots. The session limit is then capped
 * at the number of slots that were created. All of this happens while holding the write lock
 * that guards the host's mutable state.
 *
 * @param status the status to apply; must not be {@code null}
 * @throws NullPointerException if {@code status} is {@code null}
 */
void update(NodeStatus status) {
  Objects.requireNonNull(status);

  Lock writeLock = lock.writeLock();
  writeLock.lock();
  try {
    // Tally the running sessions per stereotype. The set is small, so a sequential stream is
    // used: there is nothing to gain from handing this to the common fork/join pool while the
    // write lock is held.
    Map<Capabilities, Integer> activeByStereotype = status.getCurrentSessions().stream()
        .collect(groupingBy(NodeStatus.Active::getStereotype, summingInt(active -> 1)));

    ImmutableList.Builder<Slot> rebuilt = ImmutableList.builder();
    status.getStereotypes().forEach((caps, count) -> {
      // Stereotypes without any running session simply have zero busy slots.
      int busy = activeByStereotype.getOrDefault(caps, 0);
      for (int i = 0; i < busy; i++) {
        rebuilt.add(new Slot(node, caps, ACTIVE));
      }

      // Whatever is left of the advertised count is free. If more sessions are reported than
      // the stereotype advertises, this is negative and no free slots are added.
      int idle = count - busy;
      for (int i = 0; i < idle; i++) {
        rebuilt.add(new Slot(node, caps, AVAILABLE));
      }
    });
    this.slots = rebuilt.build();

    // By definition, we can never have more sessions than we have slots available
    this.maxSessionCount = Math.min(this.slots.size(), status.getMaxSessionCount());
  } finally {
    writeLock.unlock();
  }
}

public UUID getId() {
Expand Down Expand Up @@ -228,7 +239,8 @@ public Supplier<CreateSessionResponse> reserve(CreateSessionRequest sessionReque
}
}

void refresh() {
/**
 * Runs this host's {@code performHealthCheck} runnable once, on the calling thread.
 */
@VisibleForTesting
void runHealthCheck() {
performHealthCheck.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class LocalDistributor extends Distributor {
private final Set<Host> hosts = new HashSet<>();
private final DistributedTracer tracer;
private final EventBus bus;
private final HttpClient.Factory clientFactory;
private final SessionMap sessions;
private final Regularly hostChecker = new Regularly("distributor host checker");
private final Map<UUID, Collection<Runnable>> allChecks = new ConcurrentHashMap<>();
Expand All @@ -87,14 +88,10 @@ public LocalDistributor(
super(tracer, clientFactory);
this.tracer = Objects.requireNonNull(tracer);
this.bus = Objects.requireNonNull(bus);
this.clientFactory = Objects.requireNonNull(clientFactory);
this.sessions = Objects.requireNonNull(sessions);

bus.addListener(NODE_STATUS, event -> {
NodeStatus status = event.getData(NodeStatus.class);

Node node = new RemoteNode(tracer, clientFactory, status.getNodeId(), status.getUri(), status.getStereotypes().keySet());
add(node);
});
bus.addListener(NODE_STATUS, event -> refresh(event.getData(NodeStatus.class)));
}

@Override
Expand Down Expand Up @@ -152,8 +149,42 @@ public CreateSessionResponse newSession(HttpRequest request)
}
}

/**
 * Applies a published node status to the distributor: the host whose id matches the status is
 * updated in place, and if no such host is registered a new remote node is created and added.
 *
 * @param status the status taken from the event; must not be {@code null}
 */
private void refresh(NodeStatus status) {
  Objects.requireNonNull(status);

  Lock exclusive = lock.writeLock();
  exclusive.lock();
  try {
    // Look for a host that is already registered under the reported node id.
    for (Host candidate : hosts) {
      if (candidate.getId().equals(status.getNodeId())) {
        // Known node: just bring its state up to date.
        candidate.update(status);
        return;
      }
    }

    // Unknown node: wrap it in a remote node and register it with the reported status.
    Node discovered = new RemoteNode(
        tracer,
        clientFactory,
        status.getNodeId(),
        status.getUri(),
        status.getStereotypes().keySet());
    add(discovered, status);
  } finally {
    exclusive.unlock();
  }
}

/**
 * Registers a node, using the status the node itself reports at the time of the call.
 *
 * @param node the node to register
 * @return the result of the status-aware {@code add(Node, NodeStatus)} overload
 */
@Override
public LocalDistributor add(Node node) {
  NodeStatus reported = node.getStatus();
  return add(node, reported);
}

private LocalDistributor add(Node node, NodeStatus status) {
StringBuilder sb = new StringBuilder();

Lock writeLock = this.lock.writeLock();
Expand All @@ -163,13 +194,13 @@ public LocalDistributor add(Node node) {
out.setPrettyPrint(false).write(node);
span.addTag("node", sb.toString());

// TODO: We should check to see what happens for duplicate nodes.
Host host = new Host(bus, node);
host.update(status);
hosts.add(host);
LOG.info(String.format("Added node %s.", node.getId()));
host.refresh();
host.runHealthCheck();

Runnable runnable = host::refresh;
Runnable runnable = host::runHealthCheck;
Collection<Runnable> nodeRunnables = allChecks.getOrDefault(node.getId(), new ArrayList<>());
nodeRunnables.add(runnable);
allChecks.put(node.getId(), nodeRunnables);
Expand Down Expand Up @@ -215,7 +246,7 @@ public void refresh() {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
hosts.forEach(Host::refresh);
hosts.forEach(Host::runHealthCheck);
} finally {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertEquals;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -155,6 +156,38 @@ public void shouldBeAbleToRegisterNodesByListeningForEvents() throws URISyntaxEx
assertEquals(1, summary.getStereotypes().get(CAPS).intValue());
}

@Test
public void distributorShouldUpdateStateOfExistingNodeWhenNodePublishesStateChange()
    throws URISyntaxException {
  // A local node offering CAPS, whose session factory hands out sessions at a fixed URI.
  URI fakeSessionUri = new URI("http://example:1234");
  Node localNode = LocalNode.builder(tracer, bus, clientFactory, externalUrl.toURI())
      .add(CAPS, caps -> new Session(new SessionId(UUID.randomUUID()), fakeSessionUri, caps))
      .build();
  handler.addHandler(localNode);

  // Announce the node on the bus and wait until the distributor reports free capacity.
  bus.fire(new NodeStatusEvent(localNode.getStatus()));
  wait.until(ignored -> distributor.getStatus().hasCapacity());

  DistributorStatus.NodeSummary registered = getOnlyElement(distributor.getStatus().getNodes());
  assertEquals(1, registered.getStereotypes().get(CAPS).intValue());

  // Publish a hand-built status for the same node that claims a session is running for CAPS.
  NodeStatus genuine = localNode.getStatus();
  NodeStatus.Active phantomSession =
      new NodeStatus.Active(CAPS, new SessionId(UUID.randomUUID()), CAPS);
  NodeStatus busy = new NodeStatus(
      genuine.getNodeId(),
      genuine.getUri(),
      genuine.getMaxSessionCount(),
      genuine.getStereotypes(),
      ImmutableSet.of(phantomSession));
  bus.fire(new NodeStatusEvent(busy));

  // The only slot is now reported as taken, so the distributor must run out of capacity.
  wait.until(ignored -> !distributor.getStatus().hasCapacity());
}

static class CustomNode extends Node {

private final EventBus bus;
Expand Down

0 comments on commit f5fecf2

Please sign in to comment.