Skip to content

Commit

Permalink
Selenium 4.0 grid browser priority, first cut (#7442)
Browse files Browse the repository at this point in the history
Implements a prioritisation where we attempt to schedule jobs on nodes
so that we keep nodes with scarce resources (such as IE nodes) unused
as long as possible.
  • Loading branch information
mmerrell authored and shs96c committed Aug 5, 2019
1 parent cab7ff4 commit 0223766
Show file tree
Hide file tree
Showing 7 changed files with 464 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@

class Host {

private static final Logger LOG = Logger.getLogger("Selenium Distributor");
private static final Logger LOG = Logger.getLogger("Selenium Host");
private final Node node;
private final UUID nodeId;
private final URI uri;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -67,11 +69,12 @@
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LocalDistributor extends Distributor {

private static final Json JSON = new Json();
private static final Logger LOG = Logger.getLogger("Selenium Distributor");
private static final Logger LOG = Logger.getLogger("Selenium Distributor (Local)");
private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);
private final Set<Host> hosts = new HashSet<>();
private final DistributedTracer tracer;
Expand Down Expand Up @@ -117,18 +120,24 @@ public CreateSessionResponse newSession(HttpRequest request)
Lock writeLock = this.lock.writeLock();
writeLock.lock();
try {
selected = this.hosts.stream()
Stream<Host> firstRound = this.hosts.stream()
.filter(host -> host.getHostStatus() == UP)
// Find a host that supports this kind of thing
.filter(host -> host.hasCapacity(firstRequest.getCapabilities()))
.filter(host -> host.hasCapacity(firstRequest.getCapabilities()));

//of the hosts that survived the first round, separate into buckets and prioritize by browser "rarity"
Stream<Host> prioritizedHosts = getPrioritizedHostStream(firstRound, firstRequest.getCapabilities());

//Take the further-filtered Stream and prioritize by load, then by session age
selected = prioritizedHosts
.min(
// Now sort by node which has the lowest load (natural ordering)
Comparator.comparingDouble(Host::getLoad)
// Then last session created (oldest first), so natural ordering again
.thenComparingLong(Host::getLastSessionCreated)
// And use the host id as a tie-breaker.
.thenComparing(Host::getId))
// And reserve some space
// And reserve some space for this session
.map(host -> host.reserve(firstRequest));
} finally {
writeLock.unlock();
Expand All @@ -150,6 +159,100 @@ public CreateSessionResponse newSession(HttpRequest request)
}
}

/**
* Takes a Stream of Hosts, along with the Capabilities of the current request, and prioritizes the
* request by removing Hosts that offer Capabilities that are more rare. e.g. if there are only a
* couple Edge nodes, but a lot of Chrome nodes, the Edge nodes should be removed from
* consideration when Chrome is requested. This does not currently take the amount of load on the
* server into consideration--it only checks for availability, not how much availability
* @param hostStream Stream of hosts attached to the Distributor (assume it's filtered for only those that offer these Capabilities)
* @param capabilities Passing in the whole Capabilities object will allow us to prioritize more than just browser
* @return Stream of distinct Hosts with the more rare Capabilities removed
*/
@VisibleForTesting
Stream<Host> getPrioritizedHostStream(Stream<Host> hostStream, Capabilities capabilities) {
//TODO for the moment, we're not going to operate on the Stream that was passed in--we need to
// alter and futz with the contents, so the stream isn't the right place to operate. This
// will likely be optimized back into the algo, but not yet
Set<Host> filteredHostSet = hostStream.collect(Collectors.toSet());

//A "bucket" is a list of hosts that can use a particular browser. The "edge" bucket is the
// complete list of Hosts that support "edge". By separating Hosts into buckets, we will
// know which browsers have fewer nodes available for the browsers we're not interested in, and
// can prioritize based on the browsers that have more availability
Map<String, Set<Host>> hostBuckets = sortHostsToBucketsByBrowser(filteredHostSet);

//First, check to see if all buckets are the same size. If they are, just send back the full list of hosts
// (i.e. the hosts are all "balanced" with regard to browser priority)
if (allBucketsSameSize(hostBuckets)) {
return hostBuckets.values().stream().distinct().flatMap(Set::stream);
}

//Then, starting with the smallest bucket that isn't the current browser being prioritized,
// remove all hosts in that bucket from consideration, then rebuild the buckets. Then do the
// "same size" check again, and keep doing this until either a) there is only one bucket, or b)
// all buckets are the same size

//Note: there should never be a case where a bucket will have *more* nodes available for the
// given browser than the one being requested. The first filter in this check looks for
// "equal", not "equal-or-greater" as a result of this assumption

//There might be unforeseen cases that challenge this assumption. TODO Create unit tests to prove it

//TODO a List of Map.Entry is silly. whatever this structure needs to be needs to be returned by
// the sortHostsToBucketsByBrowser method in a way that we don't have to sort it separately like this
final List<Map.Entry<String, Set<Host>>> sorted = hostBuckets.entrySet().stream().sorted(
Comparator.comparingInt(v -> v.getValue().size())
).collect(Collectors.toList());

// Until the buckets are the same size, keep removing hosts that have more "rare" browser capabilities
Map<String, Set<Host>> newHostBuckets;
for (Map.Entry<String, Set<Host>> entry: sorted) {
//Don't examine the bucket containing the browser in question--we're prioritizing the other browsers
//TODO This shouldn't be necessary, because if the list is sorted by size, this won't be possible until
// they're all the same size. Create a unit test to prove it
if (entry.getKey().equals(capabilities.getBrowserName())) {
continue;
}

//Remove all hosts from this bucket from the full set of eligible hosts
final Set<Host> filteredHosts = filteredHostSet.stream().filter(host -> !entry.getValue().contains(host)).collect(Collectors.toSet());

//Rebuild the buckets by browser
newHostBuckets = sortHostsToBucketsByBrowser(filteredHosts);

//Check the bucket sizes--if they're the same, then we're done
if (allBucketsSameSize(newHostBuckets)) {
LOG.fine("Hosts have been balanced according to browser priority");
return newHostBuckets.values().stream().distinct().flatMap(Set::stream);
}
}

return hostBuckets.values().stream().distinct().flatMap(Set::stream);
}

@VisibleForTesting
Map<String, Set<Host>> sortHostsToBucketsByBrowser(Set<Host> hostSet) {
//Make a hash of browserType -> list of hosts that support it
Map<String, Set<Host>> hostBuckets = new HashMap<>();
hostSet.forEach(host -> host.asSummary().getStereotypes().forEach((k, v) -> {
if (!hostBuckets.containsKey(k.getBrowserName())) {
Set<Host> newSet = new HashSet<>();
newSet.add(host);
hostBuckets.put(k.getBrowserName(), newSet);
}
hostBuckets.get(k.getBrowserName()).add(host);
}));
return hostBuckets;
}

@VisibleForTesting
boolean allBucketsSameSize(Map<String, Set<Host>> hostBuckets) {
Set<Integer> intSet = new HashSet<>();
hostBuckets.values().forEach(bucket -> intSet.add(bucket.size()));
return intSet.size() == 1;
}

private void refresh(NodeStatus status) {
Objects.requireNonNull(status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.net.URL;
import java.util.Objects;
import java.util.UUID;
import java.util.logging.Logger;

import static org.openqa.selenium.remote.http.Contents.utf8String;
import static org.openqa.selenium.remote.http.HttpMethod.DELETE;
Expand All @@ -42,12 +43,15 @@
public class RemoteDistributor extends Distributor {

public static final Json JSON = new Json();
private static final Logger LOG = Logger.getLogger("Selenium Distributor (Remote)");
private final DistributedTracer tracer;
private final HttpHandler client;

public RemoteDistributor(DistributedTracer tracer, HttpClient.Factory factory, URL url) {
super(tracer, factory);

Objects.requireNonNull(factory);
this.tracer = Objects.requireNonNull(tracer);
Objects.requireNonNull(url);

this.client = factory.createClient(url);
Expand All @@ -74,6 +78,8 @@ public RemoteDistributor add(Node node) {

Values.get(response, Void.class);

LOG.info(String.format("Added node %s.", node.getId()));

return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void basic() {
* state
*/
@Test(timeout = 1000)
public void requestIsremovedFromTheQeueAfterItcrashes() {
public void requestIsRemovedFromTheQueueAfterItCrashes() {
// // should work
// try {
// SeleniumBasedRequest newSession = GridHelper.createNewSessionRequest(registry, ff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.openqa.selenium.grid.component.HealthCheck;
import org.openqa.selenium.grid.data.CreateSessionRequest;
import org.openqa.selenium.grid.data.DistributorStatus;
import org.openqa.selenium.grid.data.NodeStatus;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.distributor.local.LocalDistributor;
import org.openqa.selenium.grid.distributor.remote.RemoteDistributor;
Expand Down Expand Up @@ -61,14 +62,19 @@
import java.net.URISyntaxException;
import java.net.URL;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.openqa.selenium.remote.http.Contents.utf8String;
import static org.openqa.selenium.remote.http.HttpMethod.POST;

Expand All @@ -78,26 +84,27 @@ public class DistributorTest {
private EventBus bus;
private HttpClient.Factory clientFactory;
private Distributor local;
private Distributor distributor;
private ImmutableCapabilities caps;
private static final Logger LOG = Logger.getLogger("Distributor Test");

@Before
public void setUp() throws MalformedURLException {
public void setUp() {
tracer = DistributedTracer.builder().build();
bus = new GuavaEventBus();
clientFactory = HttpClient.Factory.createDefault();
LocalSessionMap sessions = new LocalSessionMap(tracer, bus);
local = new LocalDistributor(tracer, bus, HttpClient.Factory.createDefault(), sessions);
distributor = new RemoteDistributor(
tracer,
new PassthroughHttpClient.Factory(local),
new URL("http://does.not.exist/"));

caps = new ImmutableCapabilities("browserName", "cheese");
}

@Test
public void creatingANewSessionWithoutANodeEndsInFailure() {
public void creatingANewSessionWithoutANodeEndsInFailure() throws MalformedURLException {
Distributor distributor = new RemoteDistributor(
tracer,
new PassthroughHttpClient.Factory(local),
new URL("http://does.not.exist/"));

try (NewSessionPayload payload = NewSessionPayload.create(caps)) {
assertThatExceptionOfType(SessionNotCreatedException.class)
.isThrownBy(() -> distributor.newSession(createRequest(payload)));
Expand Down Expand Up @@ -174,7 +181,7 @@ public void shouldBeAbleToRemoveANode() throws URISyntaxException, MalformedURLE
bus,
new PassthroughHttpClient.Factory(node),
sessions);
distributor = new RemoteDistributor(
Distributor distributor = new RemoteDistributor(
tracer,
new PassthroughHttpClient.Factory(local),
new URL("http://does.not.exist"));
Expand Down Expand Up @@ -508,14 +515,87 @@ public void shouldReturnNodesThatWereDownToPoolOfNodesOnceTheyMarkTheirHealthChe
}
}

private Set<Node> createNodeSet(Distributor distributor, int count, Capabilities...capabilities) {
Set<Node> nodeSet = new HashSet<>();
for (int i=0; i<count; i++) {
URI uri = createUri();
LocalNode.Builder builder = LocalNode.builder(tracer, bus, clientFactory, uri);
for (Capabilities caps: capabilities) {
builder.add(caps, new TestSessionFactory((id, hostCaps) -> new HandledSession(uri, hostCaps)));
}
Node node = builder.build();
distributor.add(node);
nodeSet.add(node);
}
return nodeSet;
}

@Test
@Ignore
public void shouldPriotizeHostsWithTheMostSlotsAvailableForASessionType() {
// Consider the case where you have 1 Windows machine and 5 linux machines. All of these hosts
public void shouldPrioritizeHostsWithTheMostSlotsAvailableForASessionType() {
//SS: Consider the case where you have 1 Windows machine and 5 linux machines. All of these hosts
// can run Chrome and Firefox sessions, but only one can run Edge sessions. Ideally, the machine
// able to run Edge would be sorted last.

fail("Write me");
//Create the Distributor
CombinedHandler handler = new CombinedHandler();
SessionMap sessions = new LocalSessionMap(tracer, bus);
handler.addHandler(sessions);

LocalDistributor distributor = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(handler),
sessions);
handler.addHandler(distributor);

//Create all three Capability types
Capabilities edgeCapabilities = new ImmutableCapabilities("browserName", "edge");
Capabilities firefoxCapabilities = new ImmutableCapabilities("browserName", "firefox");
Capabilities chromeCapabilities = new ImmutableCapabilities("browserName", "chrome");

//TODO This should probably be a map of browser -> all nodes that support <browser>
//Store our "expected results" sets for the various browser-specific nodes
Set<Node> edgeNodes = createNodeSet(distributor, 3, edgeCapabilities, chromeCapabilities, firefoxCapabilities);

//chromeNodes is all these new nodes PLUS all the Edge nodes from before
Set<Node> chromeNodes = createNodeSet(distributor,5, chromeCapabilities, firefoxCapabilities);
chromeNodes.addAll(edgeNodes);

//all nodes support firefox, so add them to the firefoxNodes set
Set<Node> firefoxNodes = createNodeSet(distributor,3, firefoxCapabilities);
firefoxNodes.addAll(edgeNodes);
firefoxNodes.addAll(chromeNodes);

//Assign 5 Chrome and 5 Firefox sessions to the distributor, make sure they don't go to the Edge node
for (int i=0; i<5; i++) {
try (NewSessionPayload chromePayload = NewSessionPayload.create(chromeCapabilities);
NewSessionPayload firefoxPayload = NewSessionPayload.create(firefoxCapabilities)) {

Session chromeSession = distributor.newSession(createRequest(chromePayload)).getSession();

assertThat( //Ensure the Uri of the Session matches one of the Chrome Nodes, not the Edge Node
chromeSession.getUri()).isIn(
chromeNodes
.stream().map(Node::getStatus).collect(Collectors.toList()) //List of getStatus() from the Set
.stream().map(NodeStatus::getUri).collect(Collectors.toList()) //List of getUri() from the Set
);

Session firefoxSession = distributor.newSession(createRequest(firefoxPayload)).getSession();
LOG.info(String.format("Firefox Session %d assigned to %s", i, chromeSession.getUri()));

boolean inFirefoxNodes = firefoxNodes.stream().anyMatch(node -> node.getUri().equals(firefoxSession.getUri()));
boolean inChromeNodes = chromeNodes.stream().anyMatch(node -> node.getUri().equals(chromeSession.getUri()));
//This could be either, or, or both
assertTrue(inFirefoxNodes || inChromeNodes);
}
}

//The Chrome Nodes should be full at this point, but Firefox isn't... so send an Edge session and make sure it routes to an Edge node
try (NewSessionPayload edgePayload = NewSessionPayload.create(edgeCapabilities)) {
Session edgeSession = distributor.newSession(createRequest(edgePayload)).getSession();

assertTrue(edgeNodes.stream().anyMatch(node -> node.getUri().equals(edgeSession.getUri())));
}
}

private Node createNode(Capabilities stereotype, int count, int currentLoad) {
Expand All @@ -540,14 +620,14 @@ private Node createNode(Capabilities stereotype, int count, int currentLoad) {
@Test
@Ignore
public void shouldCorrectlySetSessionCountsWhenStartedAfterNodeWithSession() {
fail("write me");
fail("write me!");
}

@Test
public void statusShouldIndicateThatDistributorIsNotAvailableIfNodesAreDown()
throws URISyntaxException {
Capabilities capabilities = new ImmutableCapabilities("cheese", "peas");
URI uri = new URI("http://exmaple.com");
URI uri = new URI("http://example.com");

Node node = LocalNode.builder(tracer, bus, clientFactory, uri)
.add(capabilities, new TestSessionFactory((id, caps) -> new Session(id, uri, caps)))
Expand Down

0 comments on commit 0223766

Please sign in to comment.