From 5f3bab396d19a1374f1aa89e842ce4c5c15d3c51 Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Thu, 16 May 2024 15:22:35 +0300 Subject: [PATCH 01/10] Open a new RELP connection after a certain amount of messages have been sent with it. Two new configuration parameters for rebinding. --- README.adoc | 2 + etc/config.properties | 2 + src/main/java/com/teragrep/lsh_01/Main.java | 6 +- .../teragrep/lsh_01/config/RelpConfig.java | 11 ++- .../lsh_01/pool/RelpConnectionPool.java | 58 ++++++++---- src/test/java/CredentialsTest.java | 60 ++++++++++--- src/test/java/EndToEndTest.java | 2 +- src/test/java/LookupTest.java | 18 +++- src/test/java/RelpConnectionPoolTest.java | 90 +++++++++++++++++++ 9 files changed, 216 insertions(+), 33 deletions(-) create mode 100644 src/test/java/RelpConnectionPoolTest.java diff --git a/README.adoc b/README.adoc index f31bdf2..6fda0d4 100644 --- a/README.adoc +++ b/README.adoc @@ -27,6 +27,8 @@ server.maxContentLength,262144,How big requests are allowed in bytes relp.target,127.0.0.1,RELP server address relp.port,601,RELP server port relp.reconnectInterval,10000,How long to wait before reconnecting in milliseconds +relp.rebindRequestAmount, 1000000, How many requests from a RELP connection until rebinding it +relp.rebindEnabled, false, Sets whether rebinding RELP connections is enabled healthcheck.enabled,true,Sets if an internal healthcheck endpoint is enabled. healthcheck.url,/healthcheck,An internal healthcheck endpoint that will always reply 200 ok regardless of security settings. Accessing this url won't generate any events. security.authRequired,true,Sets whether Basic HTTP Authorization headers are required. Username for lookups will be empty string '' if set to false. diff --git a/etc/config.properties b/etc/config.properties index 2ae68a9..722a5b7 100644 --- a/etc/config.properties +++ b/etc/config.properties @@ -10,6 +10,8 @@ healthcheck.url=/healthcheck relp.target=127.0.0.1 relp.port=601 relp.reconnectInterval=10000 +relp.rebindRequestAmount=1000000 +relp.rebindEnabled=false security.authRequired=true diff --git a/src/main/java/com/teragrep/lsh_01/Main.java b/src/main/java/com/teragrep/lsh_01/Main.java index 2b3b269..5fa8782 100644 --- a/src/main/java/com/teragrep/lsh_01/Main.java +++ b/src/main/java/com/teragrep/lsh_01/Main.java @@ -59,7 +59,11 @@ public static void main(String[] args) { LOGGER.info("Authentication required: <[{}]>", securityConfig.authRequired); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, diff --git a/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java b/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java index 099c0ed..977f2ad 100644 --- a/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java +++ b/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java @@ -24,6 +24,8 @@ public class RelpConfig implements Validateable { public final String relpTarget; public final int relpPort; public final int relpReconnectInterval; + public final int rebindRequestAmount; + public final boolean rebindEnabled; public RelpConfig() { PropertiesReaderUtilityClass propertiesReader = new PropertiesReaderUtilityClass( @@ -32,16 +34,21 @@ public RelpConfig() { relpTarget = propertiesReader.getStringProperty("relp.target"); relpPort = propertiesReader.getIntProperty("relp.port"); relpReconnectInterval = propertiesReader.getIntProperty("relp.reconnectInterval"); + rebindRequestAmount = propertiesReader.getIntProperty("relp.rebindRequestAmount"); + rebindEnabled = propertiesReader.getBooleanProperty("relp.rebindEnabled"); } @Override public void validate() { - + if (rebindEnabled && rebindRequestAmount < 1) { + throw new IllegalArgumentException("relp.rebindRequestAmount has to be a positive number"); + } } @Override public String toString() { return "RelpConfig{" + "relpTarget='" + relpTarget + '\'' + ", relpPort=" + relpPort - + ", relpReconnectInterval=" + relpReconnectInterval + '}'; + + ", relpReconnectInterval=" + relpReconnectInterval + ", rebindRequestAmount=" + rebindRequestAmount + + ", rebindEnabled=" + rebindEnabled + '}'; } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java index 831feea..7de0b5a 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; @@ -35,17 +36,31 @@ public class RelpConnectionPool implements AutoCloseable { private final Supplier relpConnectionWrapSupplier; + private final int rebindRequestAmount; + + private final boolean rebindEnabled; + private final ConcurrentLinkedQueue queue; + // connection, how many times it has been used + private final HashMap connectionsTaken; + private final RelpConnectionStub relpConnectionStub; private final Lock lock = new ReentrantLock(); private final AtomicBoolean close; - public RelpConnectionPool(final Supplier relpConnectionWrapSupplier) { + public RelpConnectionPool( + final Supplier relpConnectionWrapSupplier, + int rebindRequestAmount, + boolean rebindEnabled + ) { this.relpConnectionWrapSupplier = relpConnectionWrapSupplier; + this.rebindRequestAmount = rebindRequestAmount; + this.rebindEnabled = rebindEnabled; this.queue = new ConcurrentLinkedQueue<>(); + this.connectionsTaken = new HashMap<>(); this.relpConnectionStub = new RelpConnectionStub(); this.close = new AtomicBoolean(); @@ -62,6 +77,17 @@ public IRelpConnection take() { frameDelegate = queue.poll(); if (frameDelegate == null) { frameDelegate = relpConnectionWrapSupplier.get(); + connectionsTaken.put(frameDelegate, 1); + } + else if (connectionsTaken.get(frameDelegate) >= rebindRequestAmount && rebindEnabled) { + // Rebind + this.connectionTearDown(frameDelegate); + frameDelegate = relpConnectionWrapSupplier.get(); + connectionsTaken.put(frameDelegate, 1); + } + else { + // Reuse + connectionsTaken.put(frameDelegate, connectionsTaken.get(frameDelegate) + 1); } } @@ -82,21 +108,7 @@ public void offer(IRelpConnection iRelpConnection) { break; } else { - try { - LOGGER.debug("Closing frameDelegate <{}>", queuedConnection); - queuedConnection.disconnect(); - LOGGER.debug("Closed frameDelegate <{}>", queuedConnection); - } - catch (Exception exception) { - LOGGER - .warn( - "Exception <{}> while closing frameDelegate <{}>", - exception.getMessage(), queuedConnection - ); - } - finally { - queuedConnection.tearDown(); - } + this.connectionTearDown(queuedConnection); } } lock.unlock(); @@ -114,4 +126,18 @@ public void close() { // close all that are in the pool right now offer(relpConnectionStub); } + + private void connectionTearDown(IRelpConnection connection) { + try { + LOGGER.debug("Closing frameDelegate <{}>", connection); + connection.disconnect(); + LOGGER.debug("Closed frameDelegate <{}>", connection); + } + catch (Exception exception) { + LOGGER.warn("Exception <{}> while closing frameDelegate <{}>", exception.getMessage(), connection); + } + finally { + connection.tearDown(); + } + } } diff --git a/src/test/java/CredentialsTest.java b/src/test/java/CredentialsTest.java index 88b6e6c..0d0fc9a 100644 --- a/src/test/java/CredentialsTest.java +++ b/src/test/java/CredentialsTest.java @@ -56,7 +56,11 @@ public void testNoAuthRequired() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -84,7 +88,11 @@ public void testAuthRequired() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -107,7 +115,11 @@ public void testValidBase64ButNoColon() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -130,7 +142,11 @@ public void testMultipleColons() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -151,7 +167,11 @@ public void testInvalidBase64Auth() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -173,7 +193,11 @@ public void testNonBasicAuth() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -198,7 +222,11 @@ public void testWrongCredentials() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -223,7 +251,11 @@ public void testEmptyUsername() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -248,7 +280,11 @@ public void testEmptyPassword() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -271,7 +307,11 @@ public void testNullToken() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, diff --git a/src/test/java/EndToEndTest.java b/src/test/java/EndToEndTest.java index 7c876d5..006e320 100644 --- a/src/test/java/EndToEndTest.java +++ b/src/test/java/EndToEndTest.java @@ -46,7 +46,7 @@ void setUp() throws InterruptedException { System.setProperty("security.authRequired", "false"); // Start listening to HTTP-requests - Thread program = new Thread(() -> Main.main(new String[]{})); + Thread program = new Thread(() -> Main.main(new String[] {})); program.start(); Thread.sleep(3000); // wait for netty to start up diff --git a/src/test/java/LookupTest.java b/src/test/java/LookupTest.java index fd8b3cd..bb9414e 100644 --- a/src/test/java/LookupTest.java +++ b/src/test/java/LookupTest.java @@ -63,7 +63,11 @@ public void testAppnameLookup() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -88,7 +92,11 @@ public void testHostnameLookup() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, @@ -115,7 +123,11 @@ public void testMissingLookups() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); RelpConversion relpConversion = new RelpConversion( relpConnectionPool, securityConfig, diff --git a/src/test/java/RelpConnectionPoolTest.java b/src/test/java/RelpConnectionPoolTest.java new file mode 100644 index 0000000..bf57d97 --- /dev/null +++ b/src/test/java/RelpConnectionPoolTest.java @@ -0,0 +1,90 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +import com.teragrep.lsh_01.config.RelpConfig; +import com.teragrep.lsh_01.pool.IRelpConnection; +import com.teragrep.lsh_01.pool.RelpConnectionFactory; +import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.util.RelpServer; +import org.junit.jupiter.api.*; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class RelpConnectionPoolTest { + + private RelpServer relpServer; + + @BeforeAll + void setUp() { + this.relpServer = new RelpServer(); + this.relpServer.setUp(); + } + + @BeforeEach + public void addProperties() { + // defaults + System.setProperty("relp.rebindRequestAmount", "1000000"); + System.setProperty("relp.rebindEnabled", "false"); + } + + @AfterEach + void reset() { + System.clearProperty("relp.rebindRequestAmount"); + System.clearProperty("relp.rebindEnabled"); + this.relpServer.clear(); + } + + @AfterAll + void tearDown() { + this.relpServer.tearDown(); + } + + @Test + public void multipleRebindsTest() { + System.setProperty("relp.rebindEnabled", "true"); + System.setProperty("relp.rebindRequestAmount", "1000"); + + RelpConfig relpConfig = new RelpConfig(); + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); + + IRelpConnection initialRelpConnection = relpConnectionPool.take(); + relpConnectionPool.offer(initialRelpConnection); + IRelpConnection newRelpConnection; + + for (int i = 0; i < 3; i++) { // three rebinds + for (int j = 0; j < relpConfig.rebindRequestAmount - 1; j++) { + newRelpConnection = relpConnectionPool.take(); + Assertions.assertEquals(initialRelpConnection, newRelpConnection); // reuse the same connection + + relpConnectionPool.offer(newRelpConnection); + } + + newRelpConnection = relpConnectionPool.take(); + relpConnectionPool.offer(newRelpConnection); + + // after rebindRequestAmount of requests, there should be a new connection + Assertions.assertNotEquals(initialRelpConnection, newRelpConnection); + initialRelpConnection = newRelpConnection; + } + } +} From 24663a79092ae750d9af3a1ae224cd34dc7721c9 Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Fri, 17 May 2024 10:48:54 +0300 Subject: [PATCH 02/10] Refactor counting sent records to IRelpConnection --- .../teragrep/lsh_01/pool/IRelpConnection.java | 4 +++- .../lsh_01/pool/ManagedRelpConnection.java | 22 +++++++------------ .../lsh_01/pool/RelpConnectionPool.java | 13 +---------- .../lsh_01/pool/RelpConnectionStub.java | 7 +++++- .../lsh_01/pool/RelpConnectionWithConfig.java | 12 ++++++++-- src/test/java/EndToEndTest.java | 2 ++ src/test/java/RelpConnectionPoolTest.java | 13 ++++++++++- 7 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java index 3e73883..a9a1f72 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java @@ -49,7 +49,9 @@ public interface IRelpConnection { void setTxBufferSize(int size); - boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException; + int recordsSent(); + + boolean connect() throws IOException, IllegalStateException, TimeoutException; void tearDown(); diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java index b0cbc0e..0be79b0 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java @@ -36,12 +36,10 @@ public ManagedRelpConnection(IRelpConnection relpConnection) { } void connect() { - boolean notConnected = true; - while (notConnected) { - boolean connected = false; + boolean connected = false; + while (!connected) { try { - connected = relpConnection - .connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort); + connected = relpConnection.connect(); } catch (Exception e) { LOGGER @@ -51,16 +49,12 @@ void connect() { e.getMessage() ); } - if (connected) { - notConnected = false; + + try { + Thread.sleep(relpConnection.relpConfig().relpReconnectInterval); } - else { - try { - Thread.sleep(relpConnection.relpConfig().relpReconnectInterval); - } - catch (InterruptedException e) { - LOGGER.error("Reconnect timer interrupted, reconnecting now"); - } + catch (InterruptedException e) { + LOGGER.error("Reconnect timer interrupted, reconnecting now"); } } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java index 7de0b5a..229569c 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; @@ -42,9 +41,6 @@ public class RelpConnectionPool implements AutoCloseable { private final ConcurrentLinkedQueue queue; - // connection, how many times it has been used - private final HashMap connectionsTaken; - private final RelpConnectionStub relpConnectionStub; private final Lock lock = new ReentrantLock(); @@ -60,7 +56,6 @@ public RelpConnectionPool( this.rebindRequestAmount = rebindRequestAmount; this.rebindEnabled = rebindEnabled; this.queue = new ConcurrentLinkedQueue<>(); - this.connectionsTaken = new HashMap<>(); this.relpConnectionStub = new RelpConnectionStub(); this.close = new AtomicBoolean(); @@ -77,17 +72,11 @@ public IRelpConnection take() { frameDelegate = queue.poll(); if (frameDelegate == null) { frameDelegate = relpConnectionWrapSupplier.get(); - connectionsTaken.put(frameDelegate, 1); } - else if (connectionsTaken.get(frameDelegate) >= rebindRequestAmount && rebindEnabled) { + else if (frameDelegate.recordsSent() >= rebindRequestAmount && rebindEnabled) { // Rebind this.connectionTearDown(frameDelegate); frameDelegate = relpConnectionWrapSupplier.get(); - connectionsTaken.put(frameDelegate, 1); - } - else { - // Reuse - connectionsTaken.put(frameDelegate, connectionsTaken.get(frameDelegate) + 1); } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java index f94d9fe..3f6de99 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java @@ -87,7 +87,12 @@ public void setTxBufferSize(int size) { } @Override - public boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException { + public int recordsSent() { + return 0; + } + + @Override + public boolean connect() throws IOException, IllegalStateException, TimeoutException { throw new IllegalStateException("RelpConnectionStub does not support this"); } diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java index f4afa36..ead3170 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java @@ -30,10 +30,12 @@ public class RelpConnectionWithConfig implements IRelpConnection { private final RelpConnection relpConnection; private final RelpConfig relpConfig; + private int recordsSent; public RelpConnectionWithConfig(RelpConnection relpConnection, RelpConfig relpConfig) { this.relpConnection = relpConnection; this.relpConfig = relpConfig; + this.recordsSent = 0; } @Override @@ -92,8 +94,13 @@ public void setTxBufferSize(int size) { } @Override - public boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException { - return relpConnection.connect(hostname, port); + public int recordsSent() { + return recordsSent; + } + + @Override + public boolean connect() throws IOException, IllegalStateException, TimeoutException { + return relpConnection.connect(relpConfig.relpTarget, relpConfig().relpPort); } @Override @@ -109,6 +116,7 @@ public boolean disconnect() throws IOException, IllegalStateException, TimeoutEx @Override public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException { relpConnection.commit(relpBatch); + recordsSent++; } @Override diff --git a/src/test/java/EndToEndTest.java b/src/test/java/EndToEndTest.java index 006e320..c750c3f 100644 --- a/src/test/java/EndToEndTest.java +++ b/src/test/java/EndToEndTest.java @@ -44,6 +44,7 @@ public class EndToEndTest { void setUp() throws InterruptedException { System.setProperty("payload.splitEnabled", "true"); System.setProperty("security.authRequired", "false"); + System.setProperty("relp.reconnectInterval", "1000"); // Start listening to HTTP-requests Thread program = new Thread(() -> Main.main(new String[] {})); @@ -66,6 +67,7 @@ void reset() { void tearDown() { System.clearProperty("payload.splitEnabled"); System.clearProperty("security.authRequired"); + System.clearProperty("relp.reconnectInterval"); this.relpServer.tearDown(); } diff --git a/src/test/java/RelpConnectionPoolTest.java b/src/test/java/RelpConnectionPoolTest.java index bf57d97..090585d 100644 --- a/src/test/java/RelpConnectionPoolTest.java +++ b/src/test/java/RelpConnectionPoolTest.java @@ -22,8 +22,12 @@ import com.teragrep.lsh_01.pool.RelpConnectionFactory; import com.teragrep.lsh_01.pool.RelpConnectionPool; import com.teragrep.lsh_01.util.RelpServer; +import com.teragrep.rlp_01.RelpBatch; import org.junit.jupiter.api.*; +import java.io.IOException; +import java.util.concurrent.TimeoutException; + @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class RelpConnectionPoolTest { @@ -31,6 +35,8 @@ public class RelpConnectionPoolTest { @BeforeAll void setUp() { + System.setProperty("relp.reconnectInterval", "1000"); + this.relpServer = new RelpServer(); this.relpServer.setUp(); } @@ -51,11 +57,12 @@ void reset() { @AfterAll void tearDown() { + System.clearProperty("relp.reconnectInterval"); this.relpServer.tearDown(); } @Test - public void multipleRebindsTest() { + public void multipleRebindsTest() throws IOException, TimeoutException { System.setProperty("relp.rebindEnabled", "true"); System.setProperty("relp.rebindRequestAmount", "1000"); @@ -68,6 +75,7 @@ public void multipleRebindsTest() { ); IRelpConnection initialRelpConnection = relpConnectionPool.take(); + initialRelpConnection.commit(new RelpBatch()); relpConnectionPool.offer(initialRelpConnection); IRelpConnection newRelpConnection; @@ -76,10 +84,13 @@ public void multipleRebindsTest() { newRelpConnection = relpConnectionPool.take(); Assertions.assertEquals(initialRelpConnection, newRelpConnection); // reuse the same connection + newRelpConnection.commit(new RelpBatch()); // send empty batch to RELP server + relpConnectionPool.offer(newRelpConnection); } newRelpConnection = relpConnectionPool.take(); + newRelpConnection.commit(new RelpBatch()); relpConnectionPool.offer(newRelpConnection); // after rebindRequestAmount of requests, there should be a new connection From c7096f052741e8ea7cf30298f159e9198482c52c Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Fri, 17 May 2024 12:17:13 +0300 Subject: [PATCH 03/10] Add more rebind tests for RelpConnectionPool --- src/test/java/RelpConnectionPoolTest.java | 76 +++++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/src/test/java/RelpConnectionPoolTest.java b/src/test/java/RelpConnectionPoolTest.java index 090585d..1c4305c 100644 --- a/src/test/java/RelpConnectionPoolTest.java +++ b/src/test/java/RelpConnectionPoolTest.java @@ -26,6 +26,7 @@ import org.junit.jupiter.api.*; import java.io.IOException; +import java.util.ArrayList; import java.util.concurrent.TimeoutException; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -98,4 +99,79 @@ public void multipleRebindsTest() throws IOException, TimeoutException { initialRelpConnection = newRelpConnection; } } + + @Test + public void rebindDisabledTest() throws IOException, TimeoutException { + System.setProperty("relp.rebindRequestAmount", "1000"); + + RelpConfig relpConfig = new RelpConfig(); + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); + + IRelpConnection initialRelpConnection = relpConnectionPool.take(); + initialRelpConnection.commit(new RelpBatch()); + relpConnectionPool.offer(initialRelpConnection); + IRelpConnection newRelpConnection; + + for (int i = 0; i < relpConfig.rebindRequestAmount * 4; i++) { + newRelpConnection = relpConnectionPool.take(); + Assertions.assertEquals(initialRelpConnection, newRelpConnection); // always the same connection (rebind disabled) + + newRelpConnection.commit(new RelpBatch()); // send empty batch to RELP server + + relpConnectionPool.offer(newRelpConnection); + } + } + + @Test + public void multipleConnectionsRebindTest() throws IOException, TimeoutException { + System.setProperty("relp.rebindEnabled", "true"); + System.setProperty("relp.rebindRequestAmount", "1000"); + + RelpConfig relpConfig = new RelpConfig(); + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + RelpConnectionPool relpConnectionPool = new RelpConnectionPool( + relpConnectionFactory, + relpConfig.rebindRequestAmount, + relpConfig.rebindEnabled + ); + + // get three connections + ArrayList connections = new ArrayList<>(); + connections.add(relpConnectionPool.take()); + connections.add(relpConnectionPool.take()); + connections.add(relpConnectionPool.take()); + + for (IRelpConnection connection : connections) { + connection.commit(new RelpBatch()); + relpConnectionPool.offer(connection); + } + + for (int i = 0; i < 3; i++) { // three rebinds + for (int j = 0; j < relpConfig.rebindRequestAmount - 1; j++) { // run until rebind + for (IRelpConnection connection : connections) { // assert same connection for each + IRelpConnection takenConnection = relpConnectionPool.take(); + + Assertions.assertEquals(connection, takenConnection); // reuse the connection + + takenConnection.commit(new RelpBatch()); // send empty batch to RELP server + relpConnectionPool.offer(takenConnection); + } + } + + for (IRelpConnection connection : connections) { // assert new connection for each + IRelpConnection newRelpConnection = relpConnectionPool.take(); + newRelpConnection.commit(new RelpBatch()); + relpConnectionPool.offer(newRelpConnection); + + // after rebindRequestAmount of requests, there should be a new connection + Assertions.assertNotEquals(connection, newRelpConnection); + connections.set(connections.indexOf(connection), newRelpConnection); + } + } + } } From ccfe8ea5772affa14d5f1b438827f4c1201e308b Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Wed, 22 May 2024 10:53:15 +0300 Subject: [PATCH 04/10] Refactor rebinding to a decorator, use generic Pool with managedRelpConnection --- src/main/java/com/teragrep/lsh_01/Main.java | 13 +- .../com/teragrep/lsh_01/RelpConversion.java | 13 +- .../lsh_01/pool/IManagedRelpConnection.java | 33 ++++ .../teragrep/lsh_01/pool/IRelpConnection.java | 6 +- .../lsh_01/pool/ManagedRelpConnection.java | 33 +++- .../pool/ManagedRelpConnectionStub.java | 55 ++++++ .../java/com/teragrep/lsh_01/pool/Pool.java | 112 +++++++++++ .../com/teragrep/lsh_01/pool/Poolable.java | 26 +++ .../lsh_01/pool/RebindableRelpConnection.java | 83 ++++++++ .../lsh_01/pool/RelpConnectionFactory.java | 13 +- .../lsh_01/pool/RelpConnectionPool.java | 132 ------------- .../lsh_01/pool/RelpConnectionStub.java | 126 ------------- .../lsh_01/pool/RelpConnectionWithConfig.java | 20 +- .../com/teragrep/lsh_01/pool/Stubable.java | 25 +++ src/test/java/CredentialsTest.java | 83 +++----- src/test/java/LookupTest.java | 27 +-- src/test/java/RelpConnectionPoolTest.java | 177 ------------------ 17 files changed, 414 insertions(+), 563 deletions(-) create mode 100644 src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/Pool.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/Poolable.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java delete mode 100644 src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java delete mode 100644 src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/Stubable.java delete mode 100644 src/test/java/RelpConnectionPoolTest.java diff --git a/src/main/java/com/teragrep/lsh_01/Main.java b/src/main/java/com/teragrep/lsh_01/Main.java index 5fa8782..ba5c235 100644 --- a/src/main/java/com/teragrep/lsh_01/Main.java +++ b/src/main/java/com/teragrep/lsh_01/Main.java @@ -22,8 +22,7 @@ import com.teragrep.lsh_01.authentication.BasicAuthentication; import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory; import com.teragrep.lsh_01.config.*; -import com.teragrep.lsh_01.pool.RelpConnectionFactory; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,14 +58,10 @@ public static void main(String[] args) { LOGGER.info("Authentication required: <[{}]>", securityConfig.authRequired); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, lookupConfig, @@ -85,7 +80,7 @@ public static void main(String[] args) { server.run(); } finally { - relpConnectionPool.close(); + pool.close(); } } } diff --git a/src/main/java/com/teragrep/lsh_01/RelpConversion.java b/src/main/java/com/teragrep/lsh_01/RelpConversion.java index a48dd3a..ab48e2d 100644 --- a/src/main/java/com/teragrep/lsh_01/RelpConversion.java +++ b/src/main/java/com/teragrep/lsh_01/RelpConversion.java @@ -26,9 +26,7 @@ import com.teragrep.lsh_01.config.PayloadConfig; import com.teragrep.lsh_01.config.SecurityConfig; import com.teragrep.lsh_01.lookup.LookupTableFactory; -import com.teragrep.lsh_01.pool.IRelpConnection; -import com.teragrep.lsh_01.pool.ManagedRelpConnection; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import com.teragrep.rlo_14.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,7 +40,7 @@ public class RelpConversion implements IMessageHandler { private final static Logger LOGGER = LogManager.getLogger(RelpConversion.class); - private final RelpConnectionPool relpConnectionPool; + private final Pool relpConnectionPool; private final SecurityConfig securityConfig; private final BasicAuthentication basicAuthentication; private final LookupConfig lookupConfig; @@ -51,7 +49,7 @@ public class RelpConversion implements IMessageHandler { private final StringLookupTable appnameLookup; public RelpConversion( - RelpConnectionPool relpConnectionPool, + Pool relpConnectionPool, SecurityConfig securityConfig, BasicAuthentication basicAuthentication, LookupConfig lookupConfig, @@ -139,9 +137,8 @@ private void sendMessage( .withSDElement(headerSDElement) .withSDElement(sdElement); - IRelpConnection relpConnection = relpConnectionPool.take(); - ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection); - managedRelpConnection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + IManagedRelpConnection relpConnection = relpConnectionPool.get(); + relpConnection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); relpConnectionPool.offer(relpConnection); } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java new file mode 100644 index 0000000..68c0ff0 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java @@ -0,0 +1,33 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +public interface IManagedRelpConnection extends Poolable { + + void ensureSent(byte[] bytes); + + void connect(); + + void disconnect(); + + void tearDown(); + + boolean isStub(); +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java index a9a1f72..47c1ebb 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java @@ -49,9 +49,7 @@ public interface IRelpConnection { void setTxBufferSize(int size); - int recordsSent(); - - boolean connect() throws IOException, IllegalStateException, TimeoutException; + boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException; void tearDown(); @@ -59,7 +57,5 @@ public interface IRelpConnection { void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException; - boolean isStub(); - RelpConfig relpConfig(); } diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java index 0be79b0..12345ee 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java @@ -26,7 +26,7 @@ import java.io.IOException; import java.util.concurrent.TimeoutException; -public class ManagedRelpConnection { +public class ManagedRelpConnection implements IManagedRelpConnection { private static final Logger LOGGER = LogManager.getLogger(ManagedRelpConnection.class); private final IRelpConnection relpConnection; @@ -35,11 +35,13 @@ public ManagedRelpConnection(IRelpConnection relpConnection) { this.relpConnection = relpConnection; } - void connect() { + @Override + public void connect() { boolean connected = false; while (!connected) { try { - connected = relpConnection.connect(); + connected = relpConnection + .connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort); } catch (Exception e) { LOGGER @@ -59,11 +61,13 @@ void connect() { } } - private void tearDown() { + @Override + public void tearDown() { relpConnection.tearDown(); } - private void disconnect() { + @Override + public void disconnect() { boolean disconnected = false; try { disconnected = relpConnection.disconnect(); @@ -76,6 +80,7 @@ private void disconnect() { } } + @Override public void ensureSent(byte[] bytes) { final RelpBatch relpBatch = new RelpBatch(); relpBatch.insert(bytes); @@ -97,4 +102,22 @@ public void ensureSent(byte[] bytes) { } } } + + @Override + public boolean isStub() { + return false; + } + + @Override + public void close() { + try { + this.relpConnection.disconnect(); + } + catch (IllegalStateException | IOException | TimeoutException e) { + LOGGER.error("Forcefully closing connection due to exception <{}>", e.getMessage()); + } + finally { + this.relpConnection.tearDown(); + } + } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java new file mode 100644 index 0000000..9e59c30 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java @@ -0,0 +1,55 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +import java.io.IOException; + +public class ManagedRelpConnectionStub implements IManagedRelpConnection { + + @Override + public void ensureSent(byte[] bytes) { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public void connect() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public void disconnect() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public void tearDown() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public boolean isStub() { + return true; + } + + @Override + public void close() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/Pool.java b/src/main/java/com/teragrep/lsh_01/pool/Pool.java new file mode 100644 index 0000000..e3808e4 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/Pool.java @@ -0,0 +1,112 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +public class Pool implements AutoCloseable, Supplier { + + private static final Logger LOGGER = LogManager.getLogger(com.teragrep.lsh_01.pool.Pool.class); + + private final Supplier supplier; + + private final ConcurrentLinkedQueue queue; + + private final T stub; + + private final Lock lock = new ReentrantLock(); + + private final AtomicBoolean close; + + public Pool(final Supplier supplier, T stub) { + this.supplier = supplier; + this.queue = new ConcurrentLinkedQueue<>(); + this.stub = stub; + this.close = new AtomicBoolean(); + } + + public T get() { + T object; + if (close.get()) { + object = stub; + } + else { + // get or create + object = queue.poll(); + if (object == null) { + object = supplier.get(); + } + } + + return object; + } + + public void offer(T object) { + if (!object.isStub()) { + queue.add(object); + } + + if (close.get()) { + while (queue.peek() != null) { + if (lock.tryLock()) { + while (true) { + T pooled = queue.poll(); + if (pooled == null) { + break; + } + else { + try { + LOGGER.debug("Closing poolable <{}>", pooled); + pooled.close(); + LOGGER.debug("Closed poolable <{}>", pooled); + } + catch (Exception exception) { + LOGGER + .warn( + "Exception <{}> while closing poolable <{}>", exception.getMessage(), + pooled + ); + } + } + } + lock.unlock(); + } + else { + break; + } + } + } + } + + public void close() { + close.set(true); + + // close all that are in the pool right now + offer(stub); + } + +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/Poolable.java b/src/main/java/com/teragrep/lsh_01/pool/Poolable.java new file mode 100644 index 0000000..510d482 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/Poolable.java @@ -0,0 +1,26 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +import java.io.Closeable; + +public interface Poolable extends Stubable, Closeable { + +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java new file mode 100644 index 0000000..5273020 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java @@ -0,0 +1,83 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +public class RebindableRelpConnection implements IManagedRelpConnection { + + private static final Logger LOGGER = LogManager.getLogger(RebindableRelpConnection.class); + + private final IManagedRelpConnection managedRelpConnection; + private int recordsSent; + private final int rebindRequestAmount; + + public RebindableRelpConnection(IManagedRelpConnection managedRelpConnection, int rebindRequestAmount) { + this.managedRelpConnection = managedRelpConnection; + this.recordsSent = 0; + this.rebindRequestAmount = rebindRequestAmount; + } + + @Override + public void ensureSent(byte[] bytes) { + if (recordsSent >= rebindRequestAmount) { + rebind(); + recordsSent = 0; + } + managedRelpConnection.ensureSent(bytes); + recordsSent++; + } + + @Override + public void connect() { + managedRelpConnection.connect(); + } + + @Override + public void disconnect() { + managedRelpConnection.disconnect(); + } + + @Override + public void tearDown() { + managedRelpConnection.tearDown(); + } + + @Override + public boolean isStub() { + return false; + } + + @Override + public void close() throws IOException { + managedRelpConnection.close(); + } + + private void rebind() { + LOGGER.debug("Starting to rebind ManagedRelpConnection <{}>", managedRelpConnection); + managedRelpConnection.disconnect(); + managedRelpConnection.tearDown(); + managedRelpConnection.connect(); + LOGGER.debug("Done rebinding ManagedRelpConnection <{}>", managedRelpConnection); + } +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java index 35d0042..94e7263 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java @@ -24,7 +24,7 @@ import java.util.function.Supplier; -public class RelpConnectionFactory implements Supplier { +public class RelpConnectionFactory implements Supplier { private final RelpConfig relpConfig; @@ -33,18 +33,23 @@ public RelpConnectionFactory(RelpConfig relpConfig) { } @Override - public IRelpConnection get() { + public IManagedRelpConnection get() { RelpConnection relpConnection = new RelpConnection(); RelpConnectionWithConfig relpConnectionWithConfig = new RelpConnectionWithConfig(relpConnection, relpConfig); + IManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnectionWithConfig); + + if (relpConfig.rebindEnabled) { + managedRelpConnection = new RebindableRelpConnection(managedRelpConnection, relpConfig.rebindRequestAmount); + } /* TODO remove: shouldn't be here, but there is a bug in tearDown, so we initialize connection here see https://github.com/teragrep/rlp_01/issues/63 for further info */ - new ManagedRelpConnection(relpConnectionWithConfig).connect(); + managedRelpConnection.connect(); - return relpConnectionWithConfig; + return managedRelpConnection; } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java deleted file mode 100644 index 229569c..0000000 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - logstash-http-input to syslog bridge - Copyright 2024 Suomen Kanuuna Oy - - Derivative Work of Elasticsearch - Copyright 2012-2015 Elasticsearch - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ -package com.teragrep.lsh_01.pool; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Supplier; - -// TODO create test cases -public class RelpConnectionPool implements AutoCloseable { - - private static final Logger LOGGER = LogManager.getLogger(RelpConnectionPool.class); - - private final Supplier relpConnectionWrapSupplier; - - private final int rebindRequestAmount; - - private final boolean rebindEnabled; - - private final ConcurrentLinkedQueue queue; - - private final RelpConnectionStub relpConnectionStub; - - private final Lock lock = new ReentrantLock(); - - private final AtomicBoolean close; - - public RelpConnectionPool( - final Supplier relpConnectionWrapSupplier, - int rebindRequestAmount, - boolean rebindEnabled - ) { - this.relpConnectionWrapSupplier = relpConnectionWrapSupplier; - this.rebindRequestAmount = rebindRequestAmount; - this.rebindEnabled = rebindEnabled; - this.queue = new ConcurrentLinkedQueue<>(); - this.relpConnectionStub = new RelpConnectionStub(); - this.close = new AtomicBoolean(); - - // TODO maximum number of available connections should be perhaps limited? - } - - public IRelpConnection take() { - IRelpConnection frameDelegate; - if (close.get()) { - frameDelegate = relpConnectionStub; - } - else { - // get or create - frameDelegate = queue.poll(); - if (frameDelegate == null) { - frameDelegate = relpConnectionWrapSupplier.get(); - } - else if (frameDelegate.recordsSent() >= rebindRequestAmount && rebindEnabled) { - // Rebind - this.connectionTearDown(frameDelegate); - frameDelegate = relpConnectionWrapSupplier.get(); - } - } - - return frameDelegate; - } - - public void offer(IRelpConnection iRelpConnection) { - if (!iRelpConnection.isStub()) { - queue.add(iRelpConnection); - } - - if (close.get()) { - while (queue.peek() != null) { - if (lock.tryLock()) { - while (true) { - IRelpConnection queuedConnection = queue.poll(); - if (queuedConnection == null) { - break; - } - else { - this.connectionTearDown(queuedConnection); - } - } - lock.unlock(); - } - else { - break; - } - } - } - } - - public void close() { - close.set(true); - - // close all that are in the pool right now - offer(relpConnectionStub); - } - - private void connectionTearDown(IRelpConnection connection) { - try { - LOGGER.debug("Closing frameDelegate <{}>", connection); - connection.disconnect(); - LOGGER.debug("Closed frameDelegate <{}>", connection); - } - catch (Exception exception) { - LOGGER.warn("Exception <{}> while closing frameDelegate <{}>", exception.getMessage(), connection); - } - finally { - connection.tearDown(); - } - } -} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java deleted file mode 100644 index 3f6de99..0000000 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - logstash-http-input to syslog bridge - Copyright 2024 Suomen Kanuuna Oy - - Derivative Work of Elasticsearch - Copyright 2012-2015 Elasticsearch - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ -package com.teragrep.lsh_01.pool; - -import com.teragrep.lsh_01.config.RelpConfig; -import com.teragrep.rlp_01.RelpBatch; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -public class RelpConnectionStub implements IRelpConnection { - - @Override - public int getReadTimeout() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setReadTimeout(int readTimeout) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public int getWriteTimeout() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setWriteTimeout(int writeTimeout) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public int getConnectionTimeout() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setConnectionTimeout(int timeout) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public void setKeepAlive(boolean on) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public int getRxBufferSize() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setRxBufferSize(int size) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public int getTxBufferSize() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setTxBufferSize(int size) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public int recordsSent() { - return 0; - } - - @Override - public boolean connect() throws IOException, IllegalStateException, TimeoutException { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void tearDown() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public boolean disconnect() throws IOException, IllegalStateException, TimeoutException { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public boolean isStub() { - return true; - } - - @Override - public RelpConfig relpConfig() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - -} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java index ead3170..23e0956 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java @@ -22,20 +22,21 @@ import com.teragrep.lsh_01.config.RelpConfig; import com.teragrep.rlp_01.RelpBatch; import com.teragrep.rlp_01.RelpConnection; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RelpConnectionWithConfig implements IRelpConnection { + private final static Logger LOGGER = LogManager.getLogger(RelpConnectionWithConfig.class); private final RelpConnection relpConnection; private final RelpConfig relpConfig; - private int recordsSent; public RelpConnectionWithConfig(RelpConnection relpConnection, RelpConfig relpConfig) { this.relpConnection = relpConnection; this.relpConfig = relpConfig; - this.recordsSent = 0; } @Override @@ -94,13 +95,8 @@ public void setTxBufferSize(int size) { } @Override - public int recordsSent() { - return recordsSent; - } - - @Override - public boolean connect() throws IOException, IllegalStateException, TimeoutException { - return relpConnection.connect(relpConfig.relpTarget, relpConfig().relpPort); + public boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException { + return relpConnection.connect(hostname, port); } @Override @@ -116,12 +112,6 @@ public boolean disconnect() throws IOException, IllegalStateException, TimeoutEx @Override public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException { relpConnection.commit(relpBatch); - recordsSent++; - } - - @Override - public boolean isStub() { - return false; } @Override diff --git a/src/main/java/com/teragrep/lsh_01/pool/Stubable.java b/src/main/java/com/teragrep/lsh_01/pool/Stubable.java new file mode 100644 index 0000000..7b8a1d7 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/Stubable.java @@ -0,0 +1,25 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +public interface Stubable { + + boolean isStub(); +} diff --git a/src/test/java/CredentialsTest.java b/src/test/java/CredentialsTest.java index 0d0fc9a..c2bdfb5 100644 --- a/src/test/java/CredentialsTest.java +++ b/src/test/java/CredentialsTest.java @@ -24,8 +24,7 @@ import com.teragrep.lsh_01.config.PayloadConfig; import com.teragrep.lsh_01.config.RelpConfig; import com.teragrep.lsh_01.config.SecurityConfig; -import com.teragrep.lsh_01.pool.RelpConnectionFactory; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -56,13 +55,9 @@ public void testNoAuthRequired() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -88,13 +83,9 @@ public void testAuthRequired() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -115,13 +106,9 @@ public void testValidBase64ButNoColon() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -142,13 +129,9 @@ public void testMultipleColons() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -167,13 +150,9 @@ public void testInvalidBase64Auth() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -193,13 +172,9 @@ public void testNonBasicAuth() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -222,13 +197,9 @@ public void testWrongCredentials() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -251,13 +222,9 @@ public void testEmptyUsername() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -280,13 +247,9 @@ public void testEmptyPassword() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -307,13 +270,9 @@ public void testNullToken() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), diff --git a/src/test/java/LookupTest.java b/src/test/java/LookupTest.java index bb9414e..2d2a9b0 100644 --- a/src/test/java/LookupTest.java +++ b/src/test/java/LookupTest.java @@ -27,8 +27,7 @@ import com.teragrep.lsh_01.config.RelpConfig; import com.teragrep.lsh_01.config.SecurityConfig; import com.teragrep.lsh_01.lookup.LookupTableFactory; -import com.teragrep.lsh_01.pool.RelpConnectionFactory; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -63,13 +62,9 @@ public void testAppnameLookup() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -92,13 +87,9 @@ public void testHostnameLookup() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -123,13 +114,9 @@ public void testMissingLookups() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), diff --git a/src/test/java/RelpConnectionPoolTest.java b/src/test/java/RelpConnectionPoolTest.java deleted file mode 100644 index 1c4305c..0000000 --- a/src/test/java/RelpConnectionPoolTest.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - logstash-http-input to syslog bridge - Copyright 2024 Suomen Kanuuna Oy - - Derivative Work of Elasticsearch - Copyright 2012-2015 Elasticsearch - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ -import com.teragrep.lsh_01.config.RelpConfig; -import com.teragrep.lsh_01.pool.IRelpConnection; -import com.teragrep.lsh_01.pool.RelpConnectionFactory; -import com.teragrep.lsh_01.pool.RelpConnectionPool; -import com.teragrep.lsh_01.util.RelpServer; -import com.teragrep.rlp_01.RelpBatch; -import org.junit.jupiter.api.*; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.concurrent.TimeoutException; - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class RelpConnectionPoolTest { - - private RelpServer relpServer; - - @BeforeAll - void setUp() { - System.setProperty("relp.reconnectInterval", "1000"); - - this.relpServer = new RelpServer(); - this.relpServer.setUp(); - } - - @BeforeEach - public void addProperties() { - // defaults - System.setProperty("relp.rebindRequestAmount", "1000000"); - System.setProperty("relp.rebindEnabled", "false"); - } - - @AfterEach - void reset() { - System.clearProperty("relp.rebindRequestAmount"); - System.clearProperty("relp.rebindEnabled"); - this.relpServer.clear(); - } - - @AfterAll - void tearDown() { - System.clearProperty("relp.reconnectInterval"); - this.relpServer.tearDown(); - } - - @Test - public void multipleRebindsTest() throws IOException, TimeoutException { - System.setProperty("relp.rebindEnabled", "true"); - System.setProperty("relp.rebindRequestAmount", "1000"); - - RelpConfig relpConfig = new RelpConfig(); - RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); - - IRelpConnection initialRelpConnection = relpConnectionPool.take(); - initialRelpConnection.commit(new RelpBatch()); - relpConnectionPool.offer(initialRelpConnection); - IRelpConnection newRelpConnection; - - for (int i = 0; i < 3; i++) { // three rebinds - for (int j = 0; j < relpConfig.rebindRequestAmount - 1; j++) { - newRelpConnection = relpConnectionPool.take(); - Assertions.assertEquals(initialRelpConnection, newRelpConnection); // reuse the same connection - - newRelpConnection.commit(new RelpBatch()); // send empty batch to RELP server - - relpConnectionPool.offer(newRelpConnection); - } - - newRelpConnection = relpConnectionPool.take(); - newRelpConnection.commit(new RelpBatch()); - relpConnectionPool.offer(newRelpConnection); - - // after rebindRequestAmount of requests, there should be a new connection - Assertions.assertNotEquals(initialRelpConnection, newRelpConnection); - initialRelpConnection = newRelpConnection; - } - } - - @Test - public void rebindDisabledTest() throws IOException, TimeoutException { - System.setProperty("relp.rebindRequestAmount", "1000"); - - RelpConfig relpConfig = new RelpConfig(); - RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); - - IRelpConnection initialRelpConnection = relpConnectionPool.take(); - initialRelpConnection.commit(new RelpBatch()); - relpConnectionPool.offer(initialRelpConnection); - IRelpConnection newRelpConnection; - - for (int i = 0; i < relpConfig.rebindRequestAmount * 4; i++) { - newRelpConnection = relpConnectionPool.take(); - Assertions.assertEquals(initialRelpConnection, newRelpConnection); // always the same connection (rebind disabled) - - newRelpConnection.commit(new RelpBatch()); // send empty batch to RELP server - - relpConnectionPool.offer(newRelpConnection); - } - } - - @Test - public void multipleConnectionsRebindTest() throws IOException, TimeoutException { - System.setProperty("relp.rebindEnabled", "true"); - System.setProperty("relp.rebindRequestAmount", "1000"); - - RelpConfig relpConfig = new RelpConfig(); - RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool( - relpConnectionFactory, - relpConfig.rebindRequestAmount, - relpConfig.rebindEnabled - ); - - // get three connections - ArrayList connections = new ArrayList<>(); - connections.add(relpConnectionPool.take()); - connections.add(relpConnectionPool.take()); - connections.add(relpConnectionPool.take()); - - for (IRelpConnection connection : connections) { - connection.commit(new RelpBatch()); - relpConnectionPool.offer(connection); - } - - for (int i = 0; i < 3; i++) { // three rebinds - for (int j = 0; j < relpConfig.rebindRequestAmount - 1; j++) { // run until rebind - for (IRelpConnection connection : connections) { // assert same connection for each - IRelpConnection takenConnection = relpConnectionPool.take(); - - Assertions.assertEquals(connection, takenConnection); // reuse the connection - - takenConnection.commit(new RelpBatch()); // send empty batch to RELP server - relpConnectionPool.offer(takenConnection); - } - } - - for (IRelpConnection connection : connections) { // assert new connection for each - IRelpConnection newRelpConnection = relpConnectionPool.take(); - newRelpConnection.commit(new RelpBatch()); - relpConnectionPool.offer(newRelpConnection); - - // after rebindRequestAmount of requests, there should be a new connection - Assertions.assertNotEquals(connection, newRelpConnection); - connections.set(connections.indexOf(connection), newRelpConnection); - } - } - } -} From 2f5df053784888321015dcd90b39498319ba40b2 Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Wed, 22 May 2024 11:00:16 +0300 Subject: [PATCH 05/10] Remove unnecessary function already defined in extending interface, apply spotless --- .../java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java | 2 -- .../com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java index 68c0ff0..b45bfcc 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java @@ -28,6 +28,4 @@ public interface IManagedRelpConnection extends Poolable { void disconnect(); void tearDown(); - - boolean isStub(); } diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java index 9e59c30..554e5f4 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java @@ -19,8 +19,6 @@ */ package com.teragrep.lsh_01.pool; -import java.io.IOException; - public class ManagedRelpConnectionStub implements IManagedRelpConnection { @Override From 4754190728e537d72393a299377f5f6fbf67d56d Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Wed, 22 May 2024 16:30:20 +0300 Subject: [PATCH 06/10] Add test for rebind --- src/test/java/EndToEndTest.java | 2 +- src/test/java/RebindTest.java | 89 +++++++++++++++++++ .../lsh_01/util/CountingFrameDelegate.java | 67 ++++++++++++++ .../com/teragrep/lsh_01/util/RelpServer.java | 44 ++++++--- 4 files changed, 191 insertions(+), 11 deletions(-) create mode 100644 src/test/java/RebindTest.java create mode 100644 src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java diff --git a/src/test/java/EndToEndTest.java b/src/test/java/EndToEndTest.java index c750c3f..b3afd47 100644 --- a/src/test/java/EndToEndTest.java +++ b/src/test/java/EndToEndTest.java @@ -53,7 +53,7 @@ void setUp() throws InterruptedException { Thread.sleep(3000); // wait for netty to start up this.relpServer = new RelpServer(); - this.relpServer.setUp(); + this.relpServer.setUpDefault(); this.nettyConfig = new NettyConfig(); } diff --git a/src/test/java/RebindTest.java b/src/test/java/RebindTest.java new file mode 100644 index 0000000..ecdcfa0 --- /dev/null +++ b/src/test/java/RebindTest.java @@ -0,0 +1,89 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +import com.teragrep.lsh_01.config.RelpConfig; +import com.teragrep.lsh_01.pool.ManagedRelpConnection; +import com.teragrep.lsh_01.pool.RebindableRelpConnection; +import com.teragrep.lsh_01.pool.RelpConnectionWithConfig; +import com.teragrep.lsh_01.util.CountingFrameDelegate; +import com.teragrep.lsh_01.util.RelpServer; +import com.teragrep.rlo_14.Facility; +import com.teragrep.rlo_14.Severity; +import com.teragrep.rlo_14.SyslogMessage; +import com.teragrep.rlp_01.RelpConnection; +import org.junit.jupiter.api.*; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class RebindTest { + + private RelpServer relpServer; + + @BeforeAll + void setUp() { + System.setProperty("relp.rebindEnabled", "true"); + System.setProperty("relp.rebindRequestAmount", "5"); + System.setProperty("relp.reconnectInterval", "1000"); + + this.relpServer = new RelpServer(); + this.relpServer.setUpCounting(); + } + + @AfterEach + void reset() { + this.relpServer.clear(); + } + + @AfterAll + void tearDown() { + System.clearProperty("relp.rebindEnabled"); + System.clearProperty("relp.rebindRequestAmount"); + System.clearProperty("relp.reconnectInterval"); + this.relpServer.tearDown(); + } + + @Test + public void testRebind() { + RelpConfig relpConfig = new RelpConfig(); + RebindableRelpConnection connection = new RebindableRelpConnection( + new ManagedRelpConnection(new RelpConnectionWithConfig(new RelpConnection(), relpConfig)), + relpConfig.rebindRequestAmount + ); + connection.connect(); + + SyslogMessage syslogMessage = new SyslogMessage() + .withFacility(Facility.USER) + .withSeverity(Severity.INFORMATIONAL) + .withMsg("foobar"); + + for (int i = 0; i <= relpConfig.rebindRequestAmount * 3; i++) { // 3 rebinds + connection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + } + + List frameDelegates = relpServer.frameDelegates(); + Assertions.assertEquals(4, frameDelegates.size()); + + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(0).recordsReceived()); + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(1).recordsReceived()); + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(2).recordsReceived()); + Assertions.assertEquals(1, frameDelegates.get(3).recordsReceived()); // last one receives only 1 message + } +} diff --git a/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java b/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java new file mode 100644 index 0000000..dcf3514 --- /dev/null +++ b/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java @@ -0,0 +1,67 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.util; + +import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate; +import com.teragrep.rlp_03.frame.delegate.FrameContext; +import com.teragrep.rlp_03.frame.delegate.FrameDelegate; + +import java.util.function.Consumer; + +/** + * FrameDelegate that counts the number of incoming messages (for the connection). Useful for rebind testing. + */ +public class CountingFrameDelegate implements FrameDelegate { + + private final DefaultFrameDelegate frameDelegate; + private int recordsReceived = 0; + + public CountingFrameDelegate() { + Consumer countingConsumer = new Consumer<>() { + + // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections + @Override + public synchronized void accept(FrameContext frameContext) { + recordsReceived++; + } + }; + + this.frameDelegate = new DefaultFrameDelegate(countingConsumer); + } + + public int recordsReceived() { + return recordsReceived; + } + + @Override + public boolean accept(FrameContext frameContext) { + return frameDelegate.accept(frameContext); + } + + @Override + public void close() throws Exception { + frameDelegate.close(); + } + + @Override + public boolean isStub() { + return false; + } +} diff --git a/src/test/java/com/teragrep/lsh_01/util/RelpServer.java b/src/test/java/com/teragrep/lsh_01/util/RelpServer.java index 7928524..f8a6df9 100644 --- a/src/test/java/com/teragrep/lsh_01/util/RelpServer.java +++ b/src/test/java/com/teragrep/lsh_01/util/RelpServer.java @@ -43,6 +43,8 @@ public class RelpServer { private Thread eventLoopThread; private final int listenPort; private final ExecutorService executorService; + private final Consumer syslogConsumer; + private final List frameDelegates; private final List payloads; @@ -50,16 +52,12 @@ public RelpServer() { RelpConfig relpConfig = new RelpConfig(); this.listenPort = relpConfig.relpPort; this.payloads = new ArrayList<>(); + this.frameDelegates = new ArrayList<>(); int threads = 1; this.executorService = Executors.newFixedThreadPool(threads); - } - /** - * Set up the server before running end-to-end tests. - */ - public void setUp() { - Consumer syslogConsumer = new Consumer<>() { + this.syslogConsumer = new Consumer<>() { // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections @Override @@ -67,12 +65,28 @@ public synchronized void accept(FrameContext frameContext) { payloads.add(frameContext.relpFrame().payload().toString()); } }; + } - /* - * DefaultFrameDelegate accepts Consumer for processing syslog frames - */ - Supplier frameDelegateSupplier = () -> new DefaultFrameDelegate(syslogConsumer); + /** + * Set up the server before running end-to-end tests. Uses DefaultFrameDelegate. + */ + public void setUpDefault() { + setUp(() -> new DefaultFrameDelegate(this.syslogConsumer)); + } + /** + * Set up the server before running end-to-end tests. Uses CountingFrameDelegate. + */ + public void setUpCounting() { + setUp(this::addCountingFrameDelegate); + } + + /** + * Set up the server before running end-to-end tests. + * + * @param frameDelegateSupplier custom FrameDelegateSupplier + */ + private void setUp(Supplier frameDelegateSupplier) { /* * EventLoop is used to notice any events from the connections */ @@ -140,4 +154,14 @@ public void clear() { public List payloads() { return payloads; } + + public List frameDelegates() { + return frameDelegates; + } + + private FrameDelegate addCountingFrameDelegate() { + CountingFrameDelegate frameDelegate = new CountingFrameDelegate(); + frameDelegates.add(frameDelegate); + return frameDelegate; + } } From 85b56ec7ebbfdd1f4cf131d4d6219a9acc5a6655 Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Thu, 23 May 2024 12:35:14 +0300 Subject: [PATCH 07/10] Remove unnecessary functions from IManagedRelpConnection interface, create new rebind tests --- .../lsh_01/pool/IManagedRelpConnection.java | 6 -- .../lsh_01/pool/ManagedRelpConnection.java | 34 ++++---- .../pool/ManagedRelpConnectionStub.java | 15 ---- .../lsh_01/pool/RebindableRelpConnection.java | 35 +++----- .../lsh_01/pool/RelpConnectionFactory.java | 6 -- src/test/java/RebindTest.java | 82 +++++++++++++++++-- .../com/teragrep/lsh_01/util/RelpServer.java | 1 + 7 files changed, 101 insertions(+), 78 deletions(-) diff --git a/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java index b45bfcc..2e63fdb 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java @@ -22,10 +22,4 @@ public interface IManagedRelpConnection extends Poolable { void ensureSent(byte[] bytes); - - void connect(); - - void disconnect(); - - void tearDown(); } diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java index 12345ee..f03076e 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java @@ -30,14 +30,14 @@ public class ManagedRelpConnection implements IManagedRelpConnection { private static final Logger LOGGER = LogManager.getLogger(ManagedRelpConnection.class); private final IRelpConnection relpConnection; + private boolean connected; public ManagedRelpConnection(IRelpConnection relpConnection) { this.relpConnection = relpConnection; + this.connected = false; } - @Override - public void connect() { - boolean connected = false; + private void connect() { while (!connected) { try { connected = relpConnection @@ -61,27 +61,23 @@ public void connect() { } } - @Override - public void tearDown() { - relpConnection.tearDown(); - } - - @Override - public void disconnect() { - boolean disconnected = false; - try { - disconnected = relpConnection.disconnect(); - } - catch (IllegalStateException | IOException | TimeoutException e) { - LOGGER.error("Forcefully closing connection due to exception <{}>", e.getMessage()); - } - finally { - this.tearDown(); + private void tearDown() { + /* + TODO remove: wouldn't need a check but there is a bug in RLP-01 tearDown() + see https://github.com/teragrep/rlp_01/issues/63 for further info + */ + if (connected) { + relpConnection.tearDown(); } + connected = false; } @Override public void ensureSent(byte[] bytes) { + if (!connected) { + connect(); + } + final RelpBatch relpBatch = new RelpBatch(); relpBatch.insert(bytes); boolean notSent = true; diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java index 554e5f4..d998ad8 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java @@ -26,21 +26,6 @@ public void ensureSent(byte[] bytes) { throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); } - @Override - public void connect() { - throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); - } - - @Override - public void disconnect() { - throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); - } - - @Override - public void tearDown() { - throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); - } - @Override public boolean isStub() { return true; diff --git a/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java index 5273020..59f7854 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java @@ -41,28 +41,23 @@ public RebindableRelpConnection(IManagedRelpConnection managedRelpConnection, in @Override public void ensureSent(byte[] bytes) { if (recordsSent >= rebindRequestAmount) { - rebind(); + LOGGER.debug("Rebinding ManagedRelpConnection <{}>", managedRelpConnection); + try { + close(); + } + catch (Exception exception) { + LOGGER + .warn( + "Exception <{}> while closing ManagedRelpConnection <{}>", exception.getMessage(), + managedRelpConnection + ); + } recordsSent = 0; } managedRelpConnection.ensureSent(bytes); recordsSent++; } - @Override - public void connect() { - managedRelpConnection.connect(); - } - - @Override - public void disconnect() { - managedRelpConnection.disconnect(); - } - - @Override - public void tearDown() { - managedRelpConnection.tearDown(); - } - @Override public boolean isStub() { return false; @@ -72,12 +67,4 @@ public boolean isStub() { public void close() throws IOException { managedRelpConnection.close(); } - - private void rebind() { - LOGGER.debug("Starting to rebind ManagedRelpConnection <{}>", managedRelpConnection); - managedRelpConnection.disconnect(); - managedRelpConnection.tearDown(); - managedRelpConnection.connect(); - LOGGER.debug("Done rebinding ManagedRelpConnection <{}>", managedRelpConnection); - } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java index 94e7263..c8ef654 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java @@ -43,12 +43,6 @@ public IManagedRelpConnection get() { managedRelpConnection = new RebindableRelpConnection(managedRelpConnection, relpConfig.rebindRequestAmount); } - /* - TODO remove: shouldn't be here, but there is a bug in tearDown, so we initialize connection here - see https://github.com/teragrep/rlp_01/issues/63 for further info - */ - managedRelpConnection.connect(); - return managedRelpConnection; } diff --git a/src/test/java/RebindTest.java b/src/test/java/RebindTest.java index ecdcfa0..fb6e5d7 100644 --- a/src/test/java/RebindTest.java +++ b/src/test/java/RebindTest.java @@ -18,9 +18,7 @@ limitations under the License. */ import com.teragrep.lsh_01.config.RelpConfig; -import com.teragrep.lsh_01.pool.ManagedRelpConnection; -import com.teragrep.lsh_01.pool.RebindableRelpConnection; -import com.teragrep.lsh_01.pool.RelpConnectionWithConfig; +import com.teragrep.lsh_01.pool.*; import com.teragrep.lsh_01.util.CountingFrameDelegate; import com.teragrep.lsh_01.util.RelpServer; import com.teragrep.rlo_14.Facility; @@ -39,14 +37,17 @@ public class RebindTest { @BeforeAll void setUp() { - System.setProperty("relp.rebindEnabled", "true"); - System.setProperty("relp.rebindRequestAmount", "5"); - System.setProperty("relp.reconnectInterval", "1000"); - this.relpServer = new RelpServer(); this.relpServer.setUpCounting(); } + @BeforeEach + void setProperties() { + System.setProperty("relp.rebindEnabled", "true"); + System.setProperty("relp.rebindRequestAmount", "5"); + System.setProperty("relp.reconnectInterval", "500"); + } + @AfterEach void reset() { this.relpServer.clear(); @@ -67,7 +68,6 @@ public void testRebind() { new ManagedRelpConnection(new RelpConnectionWithConfig(new RelpConnection(), relpConfig)), relpConfig.rebindRequestAmount ); - connection.connect(); SyslogMessage syslogMessage = new SyslogMessage() .withFacility(Facility.USER) @@ -86,4 +86,70 @@ public void testRebind() { Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(2).recordsReceived()); Assertions.assertEquals(1, frameDelegates.get(3).recordsReceived()); // last one receives only 1 message } + + @Test + public void testRebindMultipleConnections() { + RelpConfig relpConfig = new RelpConfig(); + + // Multiple connections together using the RelpConnectionFactory and the Pool + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + IManagedRelpConnection firstConnection = pool.get(); + IManagedRelpConnection secondConnection = pool.get(); + IManagedRelpConnection thirdConnection = pool.get(); + + SyslogMessage syslogMessage = new SyslogMessage() + .withFacility(Facility.USER) + .withSeverity(Severity.INFORMATIONAL) + .withMsg("foobar"); + + byte[] bytes = syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8); + + for (int i = 0; i <= relpConfig.rebindRequestAmount * 3; i++) { // 3 rebinds + firstConnection.ensureSent(bytes); + pool.offer(firstConnection); + secondConnection.ensureSent(bytes); + pool.offer(secondConnection); + thirdConnection.ensureSent(bytes); + pool.offer(thirdConnection); + } + + List frameDelegates = relpServer.frameDelegates(); + Assertions.assertEquals(12, frameDelegates.size()); + + for (int i = 0; i < 9; i++) { + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(i).recordsReceived()); + } + + for (int i = 9; i < 12; i++) { + Assertions.assertEquals(1, frameDelegates.get(i).recordsReceived()); // last one receives only 1 message + } + } + + @Test + public void testRebindDisabled() { + System.setProperty("relp.rebindEnabled", "false"); + + RelpConfig relpConfig = new RelpConfig(); + + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + IManagedRelpConnection connection = pool.get(); + + SyslogMessage syslogMessage = new SyslogMessage() + .withFacility(Facility.USER) + .withSeverity(Severity.INFORMATIONAL) + .withMsg("foobar"); + + for (int i = 0; i <= relpConfig.rebindRequestAmount; i++) { // rebind + 1 + connection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + } + + // Only one connection should have been used with rebindEnabled = false + Assertions.assertEquals(1, relpServer.frameDelegates().size()); + Assertions + .assertEquals(relpConfig.rebindRequestAmount + 1, relpServer.frameDelegates().get(0).recordsReceived()); + } } diff --git a/src/test/java/com/teragrep/lsh_01/util/RelpServer.java b/src/test/java/com/teragrep/lsh_01/util/RelpServer.java index f8a6df9..e502ce3 100644 --- a/src/test/java/com/teragrep/lsh_01/util/RelpServer.java +++ b/src/test/java/com/teragrep/lsh_01/util/RelpServer.java @@ -149,6 +149,7 @@ public void tearDown() { */ public void clear() { payloads.clear(); + frameDelegates.clear(); } public List payloads() { From 576e5d28361ee28f92eae53fa1b8051d323bfab0 Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Thu, 23 May 2024 14:27:18 +0300 Subject: [PATCH 08/10] Change ManagedRelpConnection to only follow has been a connection made atleast once and not the connection itself. Change counter in CountingFrameDelegate to AtomicInteger. --- .../lsh_01/pool/ManagedRelpConnection.java | 17 +++++++---------- src/test/java/RebindTest.java | 8 ++++++++ .../lsh_01/util/CountingFrameDelegate.java | 7 ++++--- 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java index f03076e..254d9bc 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java @@ -30,14 +30,15 @@ public class ManagedRelpConnection implements IManagedRelpConnection { private static final Logger LOGGER = LogManager.getLogger(ManagedRelpConnection.class); private final IRelpConnection relpConnection; - private boolean connected; + private boolean hasConnected; public ManagedRelpConnection(IRelpConnection relpConnection) { this.relpConnection = relpConnection; - this.connected = false; + this.hasConnected = false; } private void connect() { + boolean connected = false; while (!connected) { try { connected = relpConnection @@ -59,25 +60,21 @@ private void connect() { LOGGER.error("Reconnect timer interrupted, reconnecting now"); } } + this.hasConnected = true; } private void tearDown() { /* - TODO remove: wouldn't need a check but there is a bug in RLP-01 tearDown() + TODO remove: wouldn't need a check hasConnected but there is a bug in RLP-01 tearDown() see https://github.com/teragrep/rlp_01/issues/63 for further info */ - if (connected) { + if (hasConnected) { relpConnection.tearDown(); } - connected = false; } @Override public void ensureSent(byte[] bytes) { - if (!connected) { - connect(); - } - final RelpBatch relpBatch = new RelpBatch(); relpBatch.insert(bytes); boolean notSent = true; @@ -113,7 +110,7 @@ public void close() { LOGGER.error("Forcefully closing connection due to exception <{}>", e.getMessage()); } finally { - this.relpConnection.tearDown(); + tearDown(); } } } diff --git a/src/test/java/RebindTest.java b/src/test/java/RebindTest.java index fb6e5d7..b8f5a95 100644 --- a/src/test/java/RebindTest.java +++ b/src/test/java/RebindTest.java @@ -152,4 +152,12 @@ public void testRebindDisabled() { Assertions .assertEquals(relpConfig.rebindRequestAmount + 1, relpServer.frameDelegates().get(0).recordsReceived()); } + + @Test + public void testCloseWithoutConnecting() { + ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection( + new RelpConnectionWithConfig(new RelpConnection(), new RelpConfig()) + ); + Assertions.assertDoesNotThrow(managedRelpConnection::close); + } } diff --git a/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java b/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java index dcf3514..efc0575 100644 --- a/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java +++ b/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java @@ -23,6 +23,7 @@ import com.teragrep.rlp_03.frame.delegate.FrameContext; import com.teragrep.rlp_03.frame.delegate.FrameDelegate; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; /** @@ -31,7 +32,7 @@ public class CountingFrameDelegate implements FrameDelegate { private final DefaultFrameDelegate frameDelegate; - private int recordsReceived = 0; + private final AtomicInteger recordsReceived = new AtomicInteger(); public CountingFrameDelegate() { Consumer countingConsumer = new Consumer<>() { @@ -39,7 +40,7 @@ public CountingFrameDelegate() { // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections @Override public synchronized void accept(FrameContext frameContext) { - recordsReceived++; + recordsReceived.getAndIncrement(); } }; @@ -47,7 +48,7 @@ public synchronized void accept(FrameContext frameContext) { } public int recordsReceived() { - return recordsReceived; + return recordsReceived.get(); } @Override From a2d179e04045d989364aba8c841b7ec51cf8c1f7 Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Thu, 23 May 2024 14:43:57 +0300 Subject: [PATCH 09/10] Fix hasConnected in ManagedRelpConnection --- .../java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java index 254d9bc..bacbf34 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java @@ -41,6 +41,7 @@ private void connect() { boolean connected = false; while (!connected) { try { + this.hasConnected = true; connected = relpConnection .connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort); } @@ -60,7 +61,6 @@ private void connect() { LOGGER.error("Reconnect timer interrupted, reconnecting now"); } } - this.hasConnected = true; } private void tearDown() { From 6baca0dcb39b72f2e645de5492d6123349229dfb Mon Sep 17 00:00:00 2001 From: Rasmus Jokinen <146736881+51-code@users.noreply.github.com> Date: Thu, 23 May 2024 15:54:50 +0300 Subject: [PATCH 10/10] Add tests for invalid rebindRequestAmount system property --- src/test/java/RebindTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/test/java/RebindTest.java b/src/test/java/RebindTest.java index b8f5a95..7627405 100644 --- a/src/test/java/RebindTest.java +++ b/src/test/java/RebindTest.java @@ -160,4 +160,18 @@ public void testCloseWithoutConnecting() { ); Assertions.assertDoesNotThrow(managedRelpConnection::close); } + + @Test + public void testThrowsWithInvalidRebindRequestAmount_negative() { + System.setProperty("relp.rebindRequestAmount", "-1"); + + Assertions.assertThrows(IllegalArgumentException.class, () -> new RelpConfig().validate()); + } + + @Test + public void testThrowsWithInvalidRebindRequestAmount_zero() { + System.setProperty("relp.rebindRequestAmount", "0"); + + Assertions.assertThrows(IllegalArgumentException.class, () -> new RelpConfig().validate()); + } }