Skip to content
Permalink
Browse files

Version 539. Improved management of TCP connections.

  • Loading branch information
n-y-z-o committed Jul 31, 2019
1 parent c405250 commit c358bfda1523c0a107bf4fcf42719c9f58bc7487
@@ -0,0 +1,22 @@
package co.nyzo.verifier;

import java.net.Socket;

public class Connection {

private long timestamp;
private Socket socket;

public Connection(Socket socket) {
this.timestamp = System.currentTimeMillis();
this.socket = socket;
}

public long getTimestamp() {
return timestamp;
}

public Socket getSocket() {
return socket;
}
}
@@ -0,0 +1,69 @@
package co.nyzo.verifier;

import co.nyzo.verifier.util.PreferencesUtil;
import co.nyzo.verifier.util.PrintUtil;
import co.nyzo.verifier.util.ThreadUtil;
import co.nyzo.verifier.util.UpdateUtil;

import java.net.Socket;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConnectionManager {

private static final long closeDelay = PreferencesUtil.getLong("connection_close_delay", 500L);
private static final ConcurrentLinkedQueue<Connection> connections = new ConcurrentLinkedQueue<>();

static {
start();
}

private static void start() {

new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (!UpdateUtil.shouldTerminate()) {
try {
// The queue is FIFO. If there is no connection in the queue, sleeping for the close delay is
// safe, because any connection added after that will need at least that delay.
Connection connection = connections.poll();
if (connection == null) {
ThreadUtil.sleep(closeDelay);
} else {
long sleepTime = closeDelay + connection.getTimestamp() - System.currentTimeMillis();
if (sleepTime > 0) {
ThreadUtil.sleep(sleepTime);
}

fastCloseSocket(connection.getSocket());
}
} catch (Exception e) {
System.out.println("exception in ConnectionManager loop: " + PrintUtil.printException(e));
}
}
}
}).start();
}

public static void slowCloseSocket(Socket socket) {

if (socket != null) {
// Attempt to add the connection to the queue. If unsuccessful, close the socket immediately.
Connection connection = new Connection(socket);
if (!connections.offer(connection)) {
fastCloseSocket(socket);
}
}
}

public static void fastCloseSocket(Socket socket) {

if (socket != null) {
try {
socket.setSoLinger(true, 0);
socket.close();
} catch (Exception ignored) { }
}
}
}
@@ -208,10 +208,8 @@ private static void processSocket(Socket clientSocket, AtomicInteger activeReadT

byte[] ipAddress = clientSocket.getInetAddress().getAddress();
if (BlacklistManager.inBlacklist(ipAddress)) {
try {
numberOfMessagesRejected.incrementAndGet();
clientSocket.close();
} catch (Exception ignored) { }
numberOfMessagesRejected.incrementAndGet();
ConnectionManager.fastCloseSocket(clientSocket);
} else {
ByteBuffer ipBuffer = ByteBuffer.wrap(ipAddress);
int connectionsForIp = connectionsPerIp.merge(ipBuffer, 1, mergeFunction);
@@ -224,9 +222,7 @@ private static void processSocket(Socket clientSocket, AtomicInteger activeReadT
// Decrement the counter, add the IP to the blacklist, and close the socket without responding.
connectionsPerIp.merge(ipBuffer, -1, mergeFunction);
BlacklistManager.addToBlacklist(ipAddress);
try {
clientSocket.close();
} catch (Exception ignored) { }
ConnectionManager.fastCloseSocket(clientSocket);

} else {

@@ -239,7 +235,7 @@ public void run() {

try {
clientSocket.setSoTimeout(300);
readMessageAndRespond(clientSocket);
readMessageAndRespond(clientSocket); // socket is closed in this method
} catch (Exception ignored) { }

// Decrement the counter for this IP.
@@ -276,15 +272,13 @@ private static void readMessageAndRespond(Socket clientSocket) {
Message response = response(message);
if (response != null) {
clientSocket.getOutputStream().write(response.getBytesForTransmission());
clientSocket.getOutputStream().flush();
}
}

} catch (Exception ignored) { }

try {
Thread.sleep(3L);
clientSocket.close();
} catch (Exception ignored) { }
ConnectionManager.slowCloseSocket(clientSocket);
}

private static void readMessage(DatagramPacket packet) {
@@ -199,12 +199,10 @@ public static void fetchTcp(String hostNameOrIp, int port, Message message, Mess
public void run() {
Socket socket = new Socket();
try {
socket.connect(new InetSocketAddress(hostNameOrIp, port), 3000);
socket.connect(new InetSocketAddress(hostNameOrIp, port), 2000);
} catch (Exception e) {
if (socket.isConnected()) {
try {
socket.close();
} catch (Exception ignored) { }
ConnectionManager.fastCloseSocket(socket);
}
socket = null;
}
@@ -219,18 +217,15 @@ public void run() {
OutputStream outputStream = socket.getOutputStream();
outputStream.write(message.getBytesForTransmission());

socket.setSoTimeout(1000);
response = readFromStream(socket.getInputStream(), socket.getInetAddress().getAddress(),
message.getType());
} catch (Exception reportOnly) {
System.err.println("Exception sending message " + message.getType() + " to " +
hostNameOrIp + ":" + port + ": " + PrintUtil.printException(reportOnly));
}

try {
socket.close();
} catch (Exception ignored) {
System.out.println("unable to close socket to " + hostNameOrIp + ":" + port);
}
ConnectionManager.fastCloseSocket(socket);
}

if (messageCallback != null) {
@@ -2,7 +2,7 @@

public class Version {

private static final int version = 538;
private static final int version = 539;

public static int getVersion() {

@@ -4,8 +4,10 @@

public static void sleep(long milliseconds) {

try {
Thread.sleep(Math.max(0L, milliseconds));
} catch (Exception ignored) { }
if (milliseconds > 0) {
try {
Thread.sleep(milliseconds);
} catch (Exception ignored) { }
}
}
}

0 comments on commit c358bfd

Please sign in to comment.
You can’t perform that action at this time.