Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Realtime WebSockets Event Server #261

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .construction_tools/NodeJS_ws_overload_test/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "ws_overload_test",
"version": "1.0.0",
"description": "",
"main": "ws_overload_test.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"license": "ISC",
"dependencies": {
"ws": "^1.1.5"
}
}
17 changes: 17 additions & 0 deletions .construction_tools/NodeJS_ws_overload_test/run.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#check if Node is installed
if ! [ -x "$(command -v node)" ]; then
echo -e '\033[0;31mError: Node.js is not installed.\033[0m' >&2
exit 1
fi

#check if ./node_modules/ws exists
if [ ! -d "./node_modules/ws" ]; then
npm install
fi


#amount of processes
#connections per process
#address

node ws_overload_test.js 20 1000 "ws://localhost:8081/realtime_events/v1"
36 changes: 36 additions & 0 deletions .construction_tools/NodeJS_ws_overload_test/ws_overload_test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
const cluster = require('cluster');
const WebSocket = require('ws')

const numProcesses = process.argv[2]; // Get number of processes from argument
const numConnectionsPerProcess = process.argv[3]; // Get number of connections per process from argument
const serverAddress = process.argv[4]; // Get server address from argument
const futureTime = Date.now() + 5000; // Set time 5 seconds in the future


if (cluster.isMaster) {
console.log(`Master process started (PID: ${process.pid})`);
console.log(`Starting ${numProcesses} processes, each creating ${numConnectionsPerProcess} connections to ${serverAddress}`);

// Spawn worker processes
for (let i = 0; i < numProcesses; i++) {
cluster.fork();
}

cluster.on('exit', (worker, code, signal) => {
console.log(`Worker process (PID: ${worker.process.pid}) exited with code: ${code}, signal: ${signal}`);
});
}

else {
console.log(`Worker process started (PID: ${process.pid})`);

// Wait until future time before creating connections
while (Date.now() < futureTime) {}

// Create WebSocket connections
for (let i = 0; i < numConnectionsPerProcess; i++) {
const ws = new WebSocket(serverAddress);
ws.on('open', () => console.log(`Worker ${process.pid} - Connection ${i+1} opened`));
ws.on('error', (error) => console.error(`Worker ${process.pid} - Connection ${i+1} error:`, error));
}
}
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ science/
.idea/
/screenshots/
latest/
node_modules/
14 changes: 14 additions & 0 deletions GlobalQuakeCore/src/main/java/globalquake/core/Settings.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,13 @@ public final class Settings {
public static String discordBotChannelID;
@SuppressWarnings("unused")
public static Boolean discordBotSendRevisions;

public static Boolean enableRTWSEventServer;
public static String RTWSEventIP;
public static Integer RTWSEventPort;
public static Integer RTWSEventMaxConnections;
public static Integer RTWSMaxConnectionsPerUniqueIP;

public static Boolean hideClustersWithQuake;
public static Boolean antialiasingQuakes;
public static Boolean antialiasingOldQuakes;
Expand Down Expand Up @@ -245,6 +252,13 @@ private static void load() {
loadProperty("FDSNWSEventIP", "localhost"); //As a default, localhost is used for security.
loadProperty("FDSNWSEventPort", "8080");
loadProperty("autoStartFDSNWSEventServer", "false");


loadProperty("enableRTWSEventServer", "false");
loadProperty("RTWSEventIP", "localhost"); //As a default, localhost is used for security.
loadProperty("RTWSEventPort", "8081");
loadProperty("RTWSEventMaxConnections", "10000");
loadProperty("RTWSMaxConnectionsPerUniqueIP", "10");
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


loadProperty("shakingLevelScale", "0",
o -> validateInt(0, IntensityScales.INTENSITY_SCALES.length - 1, (Integer) o));
Expand Down
21 changes: 20 additions & 1 deletion GlobalQuakeServer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,26 @@
<artifactId>JTransforms</artifactId>
<version>3.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->


<!-- https://mvnrepository.com/artifact/org.eclipse.jetty/jetty-server -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>11.0.20</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.eclipse.jetty.websocket/websocket-jetty-server -->
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-jetty-server</artifactId>
<version>11.0.20</version>
</dependency>






<dependency>
<groupId>org.tinylog</groupId>
Expand Down
12 changes: 10 additions & 2 deletions GlobalQuakeServer/src/main/java/gqserver/main/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import gqserver.bot.DiscordBot;
import gqserver.fdsnws_event.FdsnwsEventsHTTPServer;

import gqserver.websocketserver.WebSocketEventServer;
import globalquake.utils.Scale;
import gqserver.server.GlobalQuakeServer;
import gqserver.ui.server.DatabaseMonitorFrame;
Expand Down Expand Up @@ -137,7 +137,8 @@ public static void updateProgressBar(String status, int value) {
}
}

private static final double PHASES = 10.0;
private static final double PHASES = 11.0;

private static int phase = 0;

public static void initAll() throws Exception {
Expand Down Expand Up @@ -173,6 +174,13 @@ public static void initAll() throws Exception {
getErrorHandler().handleWarning(new RuntimeException("Unable to start FDSNWS EVENT server! Check logs for more info.", e));
}
}

//Start the WebSocket Server, if enabled
updateProgressBar("Starting WebSocket Server...", (int) ((phase++ / PHASES) * 100.0));
if(Settings.enableRTWSEventServer){
WebSocketEventServer.getInstance().init();
WebSocketEventServer.getInstance().start();
}

updateProgressBar("Starting Discord Bot...", (int) ((phase++ / PHASES) * 100.0));
if (Settings.discordBotEnabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package gqserver.websocketserver;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.Future;

import org.eclipse.jetty.websocket.api.Session;

import org.tinylog.Logger;

public class Client {
private Session session;
private String ip;
private String uniqueID;
private Future<?> pingFuture;
private Long lastMessageTime = 0L;

private static Duration pingInterval = Duration.ofSeconds(25);

/**
* Create a new client object from a Jetty WebSocket session
* @param session
*/
public Client(Session session) {
this.session = session;


SocketAddress remoteAddress = session.getRemoteAddress();
//If the remote address is null, close the connection. Might happen.. idk
if(remoteAddress == null) {
Logger.error("A critical error occurred while trying to get the remote address for a new client");
session.close(0, "No remote address");
return;
}

InetSocketAddress inetAddress = (InetSocketAddress) remoteAddress;

ip = inetAddress.getAddress().getHostAddress();
uniqueID = ip + ":" + inetAddress.getPort();

pingFuture = WebSocketEventServer.getInstance().getClientsHandler().getPingExecutor().scheduleAtFixedRate(this::pingThread, pingInterval.toMillis(), pingInterval.toMillis(), java.util.concurrent.TimeUnit.MILLISECONDS);
}


private void pingThread(){
if(!isConnected()) {
pingFuture.cancel(true);
return;
}

Long timeSinceLastMessage = System.currentTimeMillis() - lastMessageTime;

//If the time since the last message is less than a third of the ping interval, don't send a ping
if(timeSinceLastMessage<pingInterval.toMillis()/3){
DecryptingElectrons marked this conversation as resolved.
Show resolved Hide resolved
return;
}

try {
session.getRemote().sendPing(null);
} catch (Exception e) {
session.close();
}

lastMessageTime = System.currentTimeMillis();
}

public void sendString(String message) throws IOException {
session.getRemote().sendString(message);
updateLastMessageTime();
}

public boolean isConnected() {
return session.isOpen();
}

public void disconnectEvent() {
WebSocketEventServer.getInstance().getClientsHandler().clientDisconnected(this.getUniqueID());
}

public void updateLastMessageTime() {
lastMessageTime = System.currentTimeMillis();
}

public String getIP() {
return ip;
}

public Session getSession() {
return session;
}

public String getUniqueID() {
return uniqueID;
}
}
Loading
Loading