Skip to content

Commit

Permalink
[grid] Land an event bus that works
Browse files Browse the repository at this point in the history
  • Loading branch information
shs96c committed Feb 1, 2019
1 parent 812995b commit 1a67d56
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 184 deletions.
112 changes: 112 additions & 0 deletions java/client/src/org/openqa/selenium/net/NetworkUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,29 @@

import static org.openqa.selenium.net.NetworkInterface.isIpv6;

import com.google.common.collect.Iterables;

import org.openqa.selenium.Platform;
import org.openqa.selenium.WebDriverException;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.MulticastSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class NetworkUtils {

private final NetworkInterfaceProvider networkInterfaceProvider;
private volatile String hostname;
private volatile String address;

NetworkUtils(NetworkInterfaceProvider networkInterfaceProvider) {
this.networkInterfaceProvider = networkInterfaceProvider;
Expand All @@ -38,6 +50,23 @@ public NetworkUtils() {
this(new DefaultNetworkInterfaceProvider());
}

/**
* Makes a best-effort attempt to figure out an externally addressable name for this host, falling
* back to a local connection only. This may be a hostname, an IPv4 address, an IPv6 address, or
* (as a last resort) localhost.
*/
public String getHostname() {
determineHostnameAndAddress();

return hostname;
}

public String getHostAddress() {
determineHostnameAndAddress();

return address;
}

public String getPrivateLocalAddress() {
List<InetAddress> addresses = getLocalInterfaceAddress();
if (addresses.isEmpty()) {
Expand Down Expand Up @@ -214,4 +243,87 @@ private static void dumpAddresses(StringBuilder result, Iterable<InetAddress> in
result.append("\n");
}
}

private synchronized void determineHostnameAndAddress() {
if (hostname != null) {
return;
}

// Ideally, we'd use InetAddress.getLocalHost, but this does a reverse DNS lookup. On Windows
// and Linux this is apparently pretty fast, so we don't get random hangs. On OS X it's
// amazingly slow. That's less than ideal. Figure things out and cache.

String host = System.getenv("HOSTNAME"); // Most OSs
if (host == null) {
host = System.getenv("COMPUTERNAME"); // Windows
}
if (host == null && Platform.getCurrent().is(Platform.MAC)) {
try {
Process process = Runtime.getRuntime().exec("hostname");

if (!process.waitFor(2, TimeUnit.SECONDS)) {
process.destroyForcibly();
// According to the docs for `destroyForcibly` this is a good idea.
process.waitFor(2, TimeUnit.SECONDS);
}
if (process.exitValue() == 0) {
try (InputStreamReader isr = new InputStreamReader(process.getInputStream());
BufferedReader reader = new BufferedReader(isr)) {
host = reader.readLine();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (Exception e) {
// fall through
}
}
if (host == null) {
// Give up.
try {
host = InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
host = "localhost"; // At least we tried.
}
}

this.hostname = host;

String address = null;
// Now for the IP address. We're going to do silly shenanigans on OS X only.
if (Platform.getCurrent().is(Platform.MAC)) {
try {
for (NetworkInterface iface : networkInterfaceProvider.getNetworkInterfaces()) {
if (iface.getName().startsWith("en")) {
for (InetAddress inetAddress : iface.getInetAddresses()) {
try {
if (inetAddress.isReachable(100)) {
address = inetAddress.getHostAddress();
break;
}
} catch (ConnectException e) {
// Well, this is fine.
}
}
}
if (address != null) {
break;
}
}
} catch (Exception e) {
// Fall through and go the slow way.
}
}
if (address == null) {
// Alright. I give up.
try {
address = InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
address = "127.0.0.1";
}
}

this.address = address;
}
}
27 changes: 27 additions & 0 deletions java/server/src/org/openqa/selenium/events/Event.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.openqa.selenium.events;

import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;

public class Event {
Expand Down Expand Up @@ -47,4 +48,30 @@ public Type getType() {
public Object getData() {
return data;
}

@Override
public String toString() {
return new StringJoiner(", ", Event.class.getSimpleName() + "[", "]")
.add("id=" + id)
.add("type=" + type)
.add("data=" + data)
.toString();
}

@Override
public boolean equals(Object o) {
if (!(o instanceof Event)) {
return false;
}

Event that = (Event) o;
return Objects.equals(this.getId(), that.getId()) &&
Objects.equals(this.getType(), that.getType()) &&
Objects.equals(this.getData(), that.getData());
}

@Override
public int hashCode() {
return Objects.hash(getId(), getType(), getData());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.events.zeromq;

import org.openqa.selenium.events.Event;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.Type;
import org.openqa.selenium.net.NetworkUtils;
import org.openqa.selenium.net.PortProber;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.logging.Logger;

class BoundZmqEventBus implements EventBus {

public static final Logger LOG = Logger.getLogger(EventBus.class.getName());
private final UnboundZmqEventBus delegate;
private final ZMQ.Socket xpub;
private final ZMQ.Socket xsub;
private final ExecutorService executor;

BoundZmqEventBus(ZContext context, String connection) {
LOG.info("Binding event bus to " + connection);
ZMQ.Socket rep = context.createSocket(ZMQ.REP);
rep.bind(connection);

xpub = context.createSocket(ZMQ.XPUB);
xsub = context.createSocket(ZMQ.XSUB);

String address = new NetworkUtils().getHostAddress();
Addresses xpubAddr = deriveAddresses(address, connection);
Addresses xsubAddr = deriveAddresses(address, connection);

LOG.info(String.format("XPUB binding to %s, XSUB binding to %s", xpubAddr, xsubAddr));

xpub.bind(xpubAddr.bindTo);
xsub.bind(xsubAddr.bindTo);

executor = Executors.newFixedThreadPool(2, r -> {
Thread thread = new Thread(r, "Message Bus Proxy");
thread.setDaemon(true);
return thread;
});
executor.submit(() -> ZMQ.proxy(xsub, xpub, null));

executor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
rep.recvStr();
rep.sendMore(xpubAddr.advertise);
rep.send(xsubAddr.advertise);
}
});

delegate = new UnboundZmqEventBus(context, connection);
LOG.info("Event bus ready");
}



@Override
public void addListener(Type type, Consumer<Event> onType) {
delegate.addListener(type, onType);
}

@Override
public void fire(Event event) {
delegate.fire(event);
}

@Override
public void close() {
delegate.close();
executor.shutdown();
xsub.close();
xpub.close();
}

private Addresses deriveAddresses(String host, String connection) {
if (connection.startsWith("inproc:")) {
String address = String.format(
"%s-%s",
connection,
Long.toHexString(UUID.randomUUID().getMostSignificantBits()).substring(0, 8));
return new Addresses(address, address);
}

if (!connection.startsWith("tcp://")) {
throw new IllegalArgumentException("Connection string must begin with inproc:// or tcp://");
}

int length = "tcp://".length();
int colon = connection.indexOf(":", length);
if (colon == -1) {
throw new IllegalArgumentException("Unable to determine hostname from " + connection);
}
String hostName = connection.substring(length, colon);

if (!"*".equals(hostName)) {
host = hostName;
}

int port = PortProber.findFreePort();
return new Addresses(
String.format("tcp://%s:%d", hostName, port),
String.format("tcp://%s:%d", host, port));
}

private static class Addresses {
Addresses(String bindTo, String advertise) {
this.bindTo = bindTo;
this.advertise = advertise;
}

String bindTo;
String advertise;

@Override
public String toString() {
return advertise;
}
}
}

0 comments on commit 1a67d56

Please sign in to comment.