From 3cce9b76af193ec35009c8fe7f0a2223bf787a1c Mon Sep 17 00:00:00 2001 From: 51-code <146736881+51-code@users.noreply.github.com> Date: Fri, 24 May 2024 11:34:46 +0300 Subject: [PATCH] Fix #64: Rebind RELP connections after configurable amount of messages have been sent (#66) * Open a new RELP connection after a certain amount of messages have been sent with it. Two new configuration parameters for rebinding. * Refactor counting sent records to IRelpConnection * Add more rebind tests for RelpConnectionPool * Refactor rebinding to a decorator, use generic Pool with managedRelpConnection * Remove unnecessary function already defined in extending interface, apply spotless * Add test for rebind * Remove unnecessary functions from IManagedRelpConnection interface, create new rebind tests * Change ManagedRelpConnection to only follow has been a connection made atleast once and not the connection itself. Change counter in CountingFrameDelegate to AtomicInteger. * Fix hasConnected in ManagedRelpConnection * Add tests for invalid rebindRequestAmount system property --- README.adoc | 2 + etc/config.properties | 2 + src/main/java/com/teragrep/lsh_01/Main.java | 9 +- .../com/teragrep/lsh_01/RelpConversion.java | 13 +- .../teragrep/lsh_01/config/RelpConfig.java | 11 +- .../lsh_01/pool/IManagedRelpConnection.java | 25 +++ .../teragrep/lsh_01/pool/IRelpConnection.java | 2 - .../lsh_01/pool/ManagedRelpConnection.java | 64 ++++--- .../pool/ManagedRelpConnectionStub.java | 38 ++++ .../{RelpConnectionPool.java => Pool.java} | 59 +++--- .../com/teragrep/lsh_01/pool/Poolable.java | 26 +++ .../lsh_01/pool/RebindableRelpConnection.java | 70 +++++++ .../lsh_01/pool/RelpConnectionFactory.java | 15 +- .../lsh_01/pool/RelpConnectionStub.java | 121 ------------ .../lsh_01/pool/RelpConnectionWithConfig.java | 8 +- .../com/teragrep/lsh_01/pool/Stubable.java | 25 +++ src/test/java/CredentialsTest.java | 43 +++-- src/test/java/EndToEndTest.java | 6 +- src/test/java/LookupTest.java | 15 +- src/test/java/RebindTest.java | 177 ++++++++++++++++++ .../lsh_01/util/CountingFrameDelegate.java | 68 +++++++ .../com/teragrep/lsh_01/util/RelpServer.java | 45 ++++- 22 files changed, 592 insertions(+), 252 deletions(-) create mode 100644 src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java rename src/main/java/com/teragrep/lsh_01/pool/{RelpConnectionPool.java => Pool.java} (55%) create mode 100644 src/main/java/com/teragrep/lsh_01/pool/Poolable.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java delete mode 100644 src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java create mode 100644 src/main/java/com/teragrep/lsh_01/pool/Stubable.java create mode 100644 src/test/java/RebindTest.java create mode 100644 src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java diff --git a/README.adoc b/README.adoc index f31bdf2..6fda0d4 100644 --- a/README.adoc +++ b/README.adoc @@ -27,6 +27,8 @@ server.maxContentLength,262144,How big requests are allowed in bytes relp.target,127.0.0.1,RELP server address relp.port,601,RELP server port relp.reconnectInterval,10000,How long to wait before reconnecting in milliseconds +relp.rebindRequestAmount, 1000000, How many requests from a RELP connection until rebinding it +relp.rebindEnabled, false, Sets whether rebinding RELP connections is enabled healthcheck.enabled,true,Sets if an internal healthcheck endpoint is enabled. healthcheck.url,/healthcheck,An internal healthcheck endpoint that will always reply 200 ok regardless of security settings. Accessing this url won't generate any events. security.authRequired,true,Sets whether Basic HTTP Authorization headers are required. Username for lookups will be empty string '' if set to false. diff --git a/etc/config.properties b/etc/config.properties index 2ae68a9..722a5b7 100644 --- a/etc/config.properties +++ b/etc/config.properties @@ -10,6 +10,8 @@ healthcheck.url=/healthcheck relp.target=127.0.0.1 relp.port=601 relp.reconnectInterval=10000 +relp.rebindRequestAmount=1000000 +relp.rebindEnabled=false security.authRequired=true diff --git a/src/main/java/com/teragrep/lsh_01/Main.java b/src/main/java/com/teragrep/lsh_01/Main.java index 2b3b269..ba5c235 100644 --- a/src/main/java/com/teragrep/lsh_01/Main.java +++ b/src/main/java/com/teragrep/lsh_01/Main.java @@ -22,8 +22,7 @@ import com.teragrep.lsh_01.authentication.BasicAuthentication; import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory; import com.teragrep.lsh_01.config.*; -import com.teragrep.lsh_01.pool.RelpConnectionFactory; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,10 +58,10 @@ public static void main(String[] args) { LOGGER.info("Authentication required: <[{}]>", securityConfig.authRequired); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, lookupConfig, @@ -81,7 +80,7 @@ public static void main(String[] args) { server.run(); } finally { - relpConnectionPool.close(); + pool.close(); } } } diff --git a/src/main/java/com/teragrep/lsh_01/RelpConversion.java b/src/main/java/com/teragrep/lsh_01/RelpConversion.java index a48dd3a..ab48e2d 100644 --- a/src/main/java/com/teragrep/lsh_01/RelpConversion.java +++ b/src/main/java/com/teragrep/lsh_01/RelpConversion.java @@ -26,9 +26,7 @@ import com.teragrep.lsh_01.config.PayloadConfig; import com.teragrep.lsh_01.config.SecurityConfig; import com.teragrep.lsh_01.lookup.LookupTableFactory; -import com.teragrep.lsh_01.pool.IRelpConnection; -import com.teragrep.lsh_01.pool.ManagedRelpConnection; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import com.teragrep.rlo_14.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -42,7 +40,7 @@ public class RelpConversion implements IMessageHandler { private final static Logger LOGGER = LogManager.getLogger(RelpConversion.class); - private final RelpConnectionPool relpConnectionPool; + private final Pool relpConnectionPool; private final SecurityConfig securityConfig; private final BasicAuthentication basicAuthentication; private final LookupConfig lookupConfig; @@ -51,7 +49,7 @@ public class RelpConversion implements IMessageHandler { private final StringLookupTable appnameLookup; public RelpConversion( - RelpConnectionPool relpConnectionPool, + Pool relpConnectionPool, SecurityConfig securityConfig, BasicAuthentication basicAuthentication, LookupConfig lookupConfig, @@ -139,9 +137,8 @@ private void sendMessage( .withSDElement(headerSDElement) .withSDElement(sdElement); - IRelpConnection relpConnection = relpConnectionPool.take(); - ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnection); - managedRelpConnection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + IManagedRelpConnection relpConnection = relpConnectionPool.get(); + relpConnection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); relpConnectionPool.offer(relpConnection); } } diff --git a/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java b/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java index 099c0ed..977f2ad 100644 --- a/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java +++ b/src/main/java/com/teragrep/lsh_01/config/RelpConfig.java @@ -24,6 +24,8 @@ public class RelpConfig implements Validateable { public final String relpTarget; public final int relpPort; public final int relpReconnectInterval; + public final int rebindRequestAmount; + public final boolean rebindEnabled; public RelpConfig() { PropertiesReaderUtilityClass propertiesReader = new PropertiesReaderUtilityClass( @@ -32,16 +34,21 @@ public RelpConfig() { relpTarget = propertiesReader.getStringProperty("relp.target"); relpPort = propertiesReader.getIntProperty("relp.port"); relpReconnectInterval = propertiesReader.getIntProperty("relp.reconnectInterval"); + rebindRequestAmount = propertiesReader.getIntProperty("relp.rebindRequestAmount"); + rebindEnabled = propertiesReader.getBooleanProperty("relp.rebindEnabled"); } @Override public void validate() { - + if (rebindEnabled && rebindRequestAmount < 1) { + throw new IllegalArgumentException("relp.rebindRequestAmount has to be a positive number"); + } } @Override public String toString() { return "RelpConfig{" + "relpTarget='" + relpTarget + '\'' + ", relpPort=" + relpPort - + ", relpReconnectInterval=" + relpReconnectInterval + '}'; + + ", relpReconnectInterval=" + relpReconnectInterval + ", rebindRequestAmount=" + rebindRequestAmount + + ", rebindEnabled=" + rebindEnabled + '}'; } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java new file mode 100644 index 0000000..2e63fdb --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/IManagedRelpConnection.java @@ -0,0 +1,25 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +public interface IManagedRelpConnection extends Poolable { + + void ensureSent(byte[] bytes); +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java index 3e73883..47c1ebb 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/IRelpConnection.java @@ -57,7 +57,5 @@ public interface IRelpConnection { void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException; - boolean isStub(); - RelpConfig relpConfig(); } diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java index b0cbc0e..bacbf34 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnection.java @@ -26,20 +26,22 @@ import java.io.IOException; import java.util.concurrent.TimeoutException; -public class ManagedRelpConnection { +public class ManagedRelpConnection implements IManagedRelpConnection { private static final Logger LOGGER = LogManager.getLogger(ManagedRelpConnection.class); private final IRelpConnection relpConnection; + private boolean hasConnected; public ManagedRelpConnection(IRelpConnection relpConnection) { this.relpConnection = relpConnection; + this.hasConnected = false; } - void connect() { - boolean notConnected = true; - while (notConnected) { - boolean connected = false; + private void connect() { + boolean connected = false; + while (!connected) { try { + this.hasConnected = true; connected = relpConnection .connect(relpConnection.relpConfig().relpTarget, relpConnection.relpConfig().relpPort); } @@ -51,37 +53,27 @@ void connect() { e.getMessage() ); } - if (connected) { - notConnected = false; + + try { + Thread.sleep(relpConnection.relpConfig().relpReconnectInterval); } - else { - try { - Thread.sleep(relpConnection.relpConfig().relpReconnectInterval); - } - catch (InterruptedException e) { - LOGGER.error("Reconnect timer interrupted, reconnecting now"); - } + catch (InterruptedException e) { + LOGGER.error("Reconnect timer interrupted, reconnecting now"); } } } private void tearDown() { - relpConnection.tearDown(); - } - - private void disconnect() { - boolean disconnected = false; - try { - disconnected = relpConnection.disconnect(); - } - catch (IllegalStateException | IOException | TimeoutException e) { - LOGGER.error("Forcefully closing connection due to exception <{}>", e.getMessage()); - } - finally { - this.tearDown(); + /* + TODO remove: wouldn't need a check hasConnected but there is a bug in RLP-01 tearDown() + see https://github.com/teragrep/rlp_01/issues/63 for further info + */ + if (hasConnected) { + relpConnection.tearDown(); } } + @Override public void ensureSent(byte[] bytes) { final RelpBatch relpBatch = new RelpBatch(); relpBatch.insert(bytes); @@ -103,4 +95,22 @@ public void ensureSent(byte[] bytes) { } } } + + @Override + public boolean isStub() { + return false; + } + + @Override + public void close() { + try { + this.relpConnection.disconnect(); + } + catch (IllegalStateException | IOException | TimeoutException e) { + LOGGER.error("Forcefully closing connection due to exception <{}>", e.getMessage()); + } + finally { + tearDown(); + } + } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java new file mode 100644 index 0000000..d998ad8 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/ManagedRelpConnectionStub.java @@ -0,0 +1,38 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +public class ManagedRelpConnectionStub implements IManagedRelpConnection { + + @Override + public void ensureSent(byte[] bytes) { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } + + @Override + public boolean isStub() { + return true; + } + + @Override + public void close() { + throw new IllegalStateException("ManagedRelpConnectionStub does not support this"); + } +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java b/src/main/java/com/teragrep/lsh_01/pool/Pool.java similarity index 55% rename from src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java rename to src/main/java/com/teragrep/lsh_01/pool/Pool.java index 831feea..e3808e4 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionPool.java +++ b/src/main/java/com/teragrep/lsh_01/pool/Pool.java @@ -28,75 +28,69 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; -// TODO create test cases -public class RelpConnectionPool implements AutoCloseable { +public class Pool implements AutoCloseable, Supplier { - private static final Logger LOGGER = LogManager.getLogger(RelpConnectionPool.class); + private static final Logger LOGGER = LogManager.getLogger(com.teragrep.lsh_01.pool.Pool.class); - private final Supplier relpConnectionWrapSupplier; + private final Supplier supplier; - private final ConcurrentLinkedQueue queue; + private final ConcurrentLinkedQueue queue; - private final RelpConnectionStub relpConnectionStub; + private final T stub; private final Lock lock = new ReentrantLock(); private final AtomicBoolean close; - public RelpConnectionPool(final Supplier relpConnectionWrapSupplier) { - this.relpConnectionWrapSupplier = relpConnectionWrapSupplier; + public Pool(final Supplier supplier, T stub) { + this.supplier = supplier; this.queue = new ConcurrentLinkedQueue<>(); - this.relpConnectionStub = new RelpConnectionStub(); + this.stub = stub; this.close = new AtomicBoolean(); - - // TODO maximum number of available connections should be perhaps limited? } - public IRelpConnection take() { - IRelpConnection frameDelegate; + public T get() { + T object; if (close.get()) { - frameDelegate = relpConnectionStub; + object = stub; } else { // get or create - frameDelegate = queue.poll(); - if (frameDelegate == null) { - frameDelegate = relpConnectionWrapSupplier.get(); + object = queue.poll(); + if (object == null) { + object = supplier.get(); } } - return frameDelegate; + return object; } - public void offer(IRelpConnection iRelpConnection) { - if (!iRelpConnection.isStub()) { - queue.add(iRelpConnection); + public void offer(T object) { + if (!object.isStub()) { + queue.add(object); } if (close.get()) { while (queue.peek() != null) { if (lock.tryLock()) { while (true) { - IRelpConnection queuedConnection = queue.poll(); - if (queuedConnection == null) { + T pooled = queue.poll(); + if (pooled == null) { break; } else { try { - LOGGER.debug("Closing frameDelegate <{}>", queuedConnection); - queuedConnection.disconnect(); - LOGGER.debug("Closed frameDelegate <{}>", queuedConnection); + LOGGER.debug("Closing poolable <{}>", pooled); + pooled.close(); + LOGGER.debug("Closed poolable <{}>", pooled); } catch (Exception exception) { LOGGER .warn( - "Exception <{}> while closing frameDelegate <{}>", - exception.getMessage(), queuedConnection + "Exception <{}> while closing poolable <{}>", exception.getMessage(), + pooled ); } - finally { - queuedConnection.tearDown(); - } } } lock.unlock(); @@ -112,6 +106,7 @@ public void close() { close.set(true); // close all that are in the pool right now - offer(relpConnectionStub); + offer(stub); } + } diff --git a/src/main/java/com/teragrep/lsh_01/pool/Poolable.java b/src/main/java/com/teragrep/lsh_01/pool/Poolable.java new file mode 100644 index 0000000..510d482 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/Poolable.java @@ -0,0 +1,26 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +import java.io.Closeable; + +public interface Poolable extends Stubable, Closeable { + +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java b/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java new file mode 100644 index 0000000..59f7854 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/RebindableRelpConnection.java @@ -0,0 +1,70 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +public class RebindableRelpConnection implements IManagedRelpConnection { + + private static final Logger LOGGER = LogManager.getLogger(RebindableRelpConnection.class); + + private final IManagedRelpConnection managedRelpConnection; + private int recordsSent; + private final int rebindRequestAmount; + + public RebindableRelpConnection(IManagedRelpConnection managedRelpConnection, int rebindRequestAmount) { + this.managedRelpConnection = managedRelpConnection; + this.recordsSent = 0; + this.rebindRequestAmount = rebindRequestAmount; + } + + @Override + public void ensureSent(byte[] bytes) { + if (recordsSent >= rebindRequestAmount) { + LOGGER.debug("Rebinding ManagedRelpConnection <{}>", managedRelpConnection); + try { + close(); + } + catch (Exception exception) { + LOGGER + .warn( + "Exception <{}> while closing ManagedRelpConnection <{}>", exception.getMessage(), + managedRelpConnection + ); + } + recordsSent = 0; + } + managedRelpConnection.ensureSent(bytes); + recordsSent++; + } + + @Override + public boolean isStub() { + return false; + } + + @Override + public void close() throws IOException { + managedRelpConnection.close(); + } +} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java index 35d0042..c8ef654 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionFactory.java @@ -24,7 +24,7 @@ import java.util.function.Supplier; -public class RelpConnectionFactory implements Supplier { +public class RelpConnectionFactory implements Supplier { private final RelpConfig relpConfig; @@ -33,18 +33,17 @@ public RelpConnectionFactory(RelpConfig relpConfig) { } @Override - public IRelpConnection get() { + public IManagedRelpConnection get() { RelpConnection relpConnection = new RelpConnection(); RelpConnectionWithConfig relpConnectionWithConfig = new RelpConnectionWithConfig(relpConnection, relpConfig); + IManagedRelpConnection managedRelpConnection = new ManagedRelpConnection(relpConnectionWithConfig); - /* - TODO remove: shouldn't be here, but there is a bug in tearDown, so we initialize connection here - see https://github.com/teragrep/rlp_01/issues/63 for further info - */ - new ManagedRelpConnection(relpConnectionWithConfig).connect(); + if (relpConfig.rebindEnabled) { + managedRelpConnection = new RebindableRelpConnection(managedRelpConnection, relpConfig.rebindRequestAmount); + } - return relpConnectionWithConfig; + return managedRelpConnection; } } diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java deleted file mode 100644 index f94d9fe..0000000 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionStub.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - logstash-http-input to syslog bridge - Copyright 2024 Suomen Kanuuna Oy - - Derivative Work of Elasticsearch - Copyright 2012-2015 Elasticsearch - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ -package com.teragrep.lsh_01.pool; - -import com.teragrep.lsh_01.config.RelpConfig; -import com.teragrep.rlp_01.RelpBatch; - -import java.io.IOException; -import java.util.concurrent.TimeoutException; - -public class RelpConnectionStub implements IRelpConnection { - - @Override - public int getReadTimeout() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setReadTimeout(int readTimeout) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public int getWriteTimeout() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setWriteTimeout(int writeTimeout) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public int getConnectionTimeout() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setConnectionTimeout(int timeout) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public void setKeepAlive(boolean on) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public int getRxBufferSize() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setRxBufferSize(int size) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public int getTxBufferSize() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void setTxBufferSize(int size) { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public boolean connect(String hostname, int port) throws IOException, IllegalStateException, TimeoutException { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public void tearDown() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public boolean disconnect() throws IOException, IllegalStateException, TimeoutException { - throw new IllegalStateException("RelpConnectionStub does not support this"); - - } - - @Override - public void commit(RelpBatch relpBatch) throws IOException, IllegalStateException, TimeoutException { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - - @Override - public boolean isStub() { - return true; - } - - @Override - public RelpConfig relpConfig() { - throw new IllegalStateException("RelpConnectionStub does not support this"); - } - -} diff --git a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java index f4afa36..23e0956 100644 --- a/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java +++ b/src/main/java/com/teragrep/lsh_01/pool/RelpConnectionWithConfig.java @@ -22,12 +22,15 @@ import com.teragrep.lsh_01.config.RelpConfig; import com.teragrep.rlp_01.RelpBatch; import com.teragrep.rlp_01.RelpConnection; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RelpConnectionWithConfig implements IRelpConnection { + private final static Logger LOGGER = LogManager.getLogger(RelpConnectionWithConfig.class); private final RelpConnection relpConnection; private final RelpConfig relpConfig; @@ -111,11 +114,6 @@ public void commit(RelpBatch relpBatch) throws IOException, IllegalStateExceptio relpConnection.commit(relpBatch); } - @Override - public boolean isStub() { - return false; - } - @Override public RelpConfig relpConfig() { return relpConfig; diff --git a/src/main/java/com/teragrep/lsh_01/pool/Stubable.java b/src/main/java/com/teragrep/lsh_01/pool/Stubable.java new file mode 100644 index 0000000..7b8a1d7 --- /dev/null +++ b/src/main/java/com/teragrep/lsh_01/pool/Stubable.java @@ -0,0 +1,25 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.pool; + +public interface Stubable { + + boolean isStub(); +} diff --git a/src/test/java/CredentialsTest.java b/src/test/java/CredentialsTest.java index 88b6e6c..c2bdfb5 100644 --- a/src/test/java/CredentialsTest.java +++ b/src/test/java/CredentialsTest.java @@ -24,8 +24,7 @@ import com.teragrep.lsh_01.config.PayloadConfig; import com.teragrep.lsh_01.config.RelpConfig; import com.teragrep.lsh_01.config.SecurityConfig; -import com.teragrep.lsh_01.pool.RelpConnectionFactory; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -56,9 +55,9 @@ public void testNoAuthRequired() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -84,9 +83,9 @@ public void testAuthRequired() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -107,9 +106,9 @@ public void testValidBase64ButNoColon() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -130,9 +129,9 @@ public void testMultipleColons() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -151,9 +150,9 @@ public void testInvalidBase64Auth() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -173,9 +172,9 @@ public void testNonBasicAuth() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -198,9 +197,9 @@ public void testWrongCredentials() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -223,9 +222,9 @@ public void testEmptyUsername() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -248,9 +247,9 @@ public void testEmptyPassword() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -271,9 +270,9 @@ public void testNullToken() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), diff --git a/src/test/java/EndToEndTest.java b/src/test/java/EndToEndTest.java index 7c876d5..b3afd47 100644 --- a/src/test/java/EndToEndTest.java +++ b/src/test/java/EndToEndTest.java @@ -44,15 +44,16 @@ public class EndToEndTest { void setUp() throws InterruptedException { System.setProperty("payload.splitEnabled", "true"); System.setProperty("security.authRequired", "false"); + System.setProperty("relp.reconnectInterval", "1000"); // Start listening to HTTP-requests - Thread program = new Thread(() -> Main.main(new String[]{})); + Thread program = new Thread(() -> Main.main(new String[] {})); program.start(); Thread.sleep(3000); // wait for netty to start up this.relpServer = new RelpServer(); - this.relpServer.setUp(); + this.relpServer.setUpDefault(); this.nettyConfig = new NettyConfig(); } @@ -66,6 +67,7 @@ void reset() { void tearDown() { System.clearProperty("payload.splitEnabled"); System.clearProperty("security.authRequired"); + System.clearProperty("relp.reconnectInterval"); this.relpServer.tearDown(); } diff --git a/src/test/java/LookupTest.java b/src/test/java/LookupTest.java index fd8b3cd..2d2a9b0 100644 --- a/src/test/java/LookupTest.java +++ b/src/test/java/LookupTest.java @@ -27,8 +27,7 @@ import com.teragrep.lsh_01.config.RelpConfig; import com.teragrep.lsh_01.config.SecurityConfig; import com.teragrep.lsh_01.lookup.LookupTableFactory; -import com.teragrep.lsh_01.pool.RelpConnectionFactory; -import com.teragrep.lsh_01.pool.RelpConnectionPool; +import com.teragrep.lsh_01.pool.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -63,9 +62,9 @@ public void testAppnameLookup() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -88,9 +87,9 @@ public void testHostnameLookup() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), @@ -115,9 +114,9 @@ public void testMissingLookups() { SecurityConfig securityConfig = new SecurityConfig(); BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create(); RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); - RelpConnectionPool relpConnectionPool = new RelpConnectionPool(relpConnectionFactory); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); RelpConversion relpConversion = new RelpConversion( - relpConnectionPool, + pool, securityConfig, basicAuthentication, new LookupConfig(), diff --git a/src/test/java/RebindTest.java b/src/test/java/RebindTest.java new file mode 100644 index 0000000..7627405 --- /dev/null +++ b/src/test/java/RebindTest.java @@ -0,0 +1,177 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +import com.teragrep.lsh_01.config.RelpConfig; +import com.teragrep.lsh_01.pool.*; +import com.teragrep.lsh_01.util.CountingFrameDelegate; +import com.teragrep.lsh_01.util.RelpServer; +import com.teragrep.rlo_14.Facility; +import com.teragrep.rlo_14.Severity; +import com.teragrep.rlo_14.SyslogMessage; +import com.teragrep.rlp_01.RelpConnection; +import org.junit.jupiter.api.*; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class RebindTest { + + private RelpServer relpServer; + + @BeforeAll + void setUp() { + this.relpServer = new RelpServer(); + this.relpServer.setUpCounting(); + } + + @BeforeEach + void setProperties() { + System.setProperty("relp.rebindEnabled", "true"); + System.setProperty("relp.rebindRequestAmount", "5"); + System.setProperty("relp.reconnectInterval", "500"); + } + + @AfterEach + void reset() { + this.relpServer.clear(); + } + + @AfterAll + void tearDown() { + System.clearProperty("relp.rebindEnabled"); + System.clearProperty("relp.rebindRequestAmount"); + System.clearProperty("relp.reconnectInterval"); + this.relpServer.tearDown(); + } + + @Test + public void testRebind() { + RelpConfig relpConfig = new RelpConfig(); + RebindableRelpConnection connection = new RebindableRelpConnection( + new ManagedRelpConnection(new RelpConnectionWithConfig(new RelpConnection(), relpConfig)), + relpConfig.rebindRequestAmount + ); + + SyslogMessage syslogMessage = new SyslogMessage() + .withFacility(Facility.USER) + .withSeverity(Severity.INFORMATIONAL) + .withMsg("foobar"); + + for (int i = 0; i <= relpConfig.rebindRequestAmount * 3; i++) { // 3 rebinds + connection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + } + + List frameDelegates = relpServer.frameDelegates(); + Assertions.assertEquals(4, frameDelegates.size()); + + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(0).recordsReceived()); + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(1).recordsReceived()); + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(2).recordsReceived()); + Assertions.assertEquals(1, frameDelegates.get(3).recordsReceived()); // last one receives only 1 message + } + + @Test + public void testRebindMultipleConnections() { + RelpConfig relpConfig = new RelpConfig(); + + // Multiple connections together using the RelpConnectionFactory and the Pool + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + IManagedRelpConnection firstConnection = pool.get(); + IManagedRelpConnection secondConnection = pool.get(); + IManagedRelpConnection thirdConnection = pool.get(); + + SyslogMessage syslogMessage = new SyslogMessage() + .withFacility(Facility.USER) + .withSeverity(Severity.INFORMATIONAL) + .withMsg("foobar"); + + byte[] bytes = syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8); + + for (int i = 0; i <= relpConfig.rebindRequestAmount * 3; i++) { // 3 rebinds + firstConnection.ensureSent(bytes); + pool.offer(firstConnection); + secondConnection.ensureSent(bytes); + pool.offer(secondConnection); + thirdConnection.ensureSent(bytes); + pool.offer(thirdConnection); + } + + List frameDelegates = relpServer.frameDelegates(); + Assertions.assertEquals(12, frameDelegates.size()); + + for (int i = 0; i < 9; i++) { + Assertions.assertEquals(relpConfig.rebindRequestAmount, frameDelegates.get(i).recordsReceived()); + } + + for (int i = 9; i < 12; i++) { + Assertions.assertEquals(1, frameDelegates.get(i).recordsReceived()); // last one receives only 1 message + } + } + + @Test + public void testRebindDisabled() { + System.setProperty("relp.rebindEnabled", "false"); + + RelpConfig relpConfig = new RelpConfig(); + + RelpConnectionFactory relpConnectionFactory = new RelpConnectionFactory(relpConfig); + Pool pool = new Pool<>(relpConnectionFactory, new ManagedRelpConnectionStub()); + + IManagedRelpConnection connection = pool.get(); + + SyslogMessage syslogMessage = new SyslogMessage() + .withFacility(Facility.USER) + .withSeverity(Severity.INFORMATIONAL) + .withMsg("foobar"); + + for (int i = 0; i <= relpConfig.rebindRequestAmount; i++) { // rebind + 1 + connection.ensureSent(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); + } + + // Only one connection should have been used with rebindEnabled = false + Assertions.assertEquals(1, relpServer.frameDelegates().size()); + Assertions + .assertEquals(relpConfig.rebindRequestAmount + 1, relpServer.frameDelegates().get(0).recordsReceived()); + } + + @Test + public void testCloseWithoutConnecting() { + ManagedRelpConnection managedRelpConnection = new ManagedRelpConnection( + new RelpConnectionWithConfig(new RelpConnection(), new RelpConfig()) + ); + Assertions.assertDoesNotThrow(managedRelpConnection::close); + } + + @Test + public void testThrowsWithInvalidRebindRequestAmount_negative() { + System.setProperty("relp.rebindRequestAmount", "-1"); + + Assertions.assertThrows(IllegalArgumentException.class, () -> new RelpConfig().validate()); + } + + @Test + public void testThrowsWithInvalidRebindRequestAmount_zero() { + System.setProperty("relp.rebindRequestAmount", "0"); + + Assertions.assertThrows(IllegalArgumentException.class, () -> new RelpConfig().validate()); + } +} diff --git a/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java b/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java new file mode 100644 index 0000000..efc0575 --- /dev/null +++ b/src/test/java/com/teragrep/lsh_01/util/CountingFrameDelegate.java @@ -0,0 +1,68 @@ +/* + logstash-http-input to syslog bridge + Copyright 2024 Suomen Kanuuna Oy + + Derivative Work of Elasticsearch + Copyright 2012-2015 Elasticsearch + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ +package com.teragrep.lsh_01.util; + +import com.teragrep.rlp_03.frame.delegate.DefaultFrameDelegate; +import com.teragrep.rlp_03.frame.delegate.FrameContext; +import com.teragrep.rlp_03.frame.delegate.FrameDelegate; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +/** + * FrameDelegate that counts the number of incoming messages (for the connection). Useful for rebind testing. + */ +public class CountingFrameDelegate implements FrameDelegate { + + private final DefaultFrameDelegate frameDelegate; + private final AtomicInteger recordsReceived = new AtomicInteger(); + + public CountingFrameDelegate() { + Consumer countingConsumer = new Consumer<>() { + + // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections + @Override + public synchronized void accept(FrameContext frameContext) { + recordsReceived.getAndIncrement(); + } + }; + + this.frameDelegate = new DefaultFrameDelegate(countingConsumer); + } + + public int recordsReceived() { + return recordsReceived.get(); + } + + @Override + public boolean accept(FrameContext frameContext) { + return frameDelegate.accept(frameContext); + } + + @Override + public void close() throws Exception { + frameDelegate.close(); + } + + @Override + public boolean isStub() { + return false; + } +} diff --git a/src/test/java/com/teragrep/lsh_01/util/RelpServer.java b/src/test/java/com/teragrep/lsh_01/util/RelpServer.java index 7928524..e502ce3 100644 --- a/src/test/java/com/teragrep/lsh_01/util/RelpServer.java +++ b/src/test/java/com/teragrep/lsh_01/util/RelpServer.java @@ -43,6 +43,8 @@ public class RelpServer { private Thread eventLoopThread; private final int listenPort; private final ExecutorService executorService; + private final Consumer syslogConsumer; + private final List frameDelegates; private final List payloads; @@ -50,16 +52,12 @@ public RelpServer() { RelpConfig relpConfig = new RelpConfig(); this.listenPort = relpConfig.relpPort; this.payloads = new ArrayList<>(); + this.frameDelegates = new ArrayList<>(); int threads = 1; this.executorService = Executors.newFixedThreadPool(threads); - } - /** - * Set up the server before running end-to-end tests. - */ - public void setUp() { - Consumer syslogConsumer = new Consumer<>() { + this.syslogConsumer = new Consumer<>() { // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections @Override @@ -67,12 +65,28 @@ public synchronized void accept(FrameContext frameContext) { payloads.add(frameContext.relpFrame().payload().toString()); } }; + } - /* - * DefaultFrameDelegate accepts Consumer for processing syslog frames - */ - Supplier frameDelegateSupplier = () -> new DefaultFrameDelegate(syslogConsumer); + /** + * Set up the server before running end-to-end tests. Uses DefaultFrameDelegate. + */ + public void setUpDefault() { + setUp(() -> new DefaultFrameDelegate(this.syslogConsumer)); + } + /** + * Set up the server before running end-to-end tests. Uses CountingFrameDelegate. + */ + public void setUpCounting() { + setUp(this::addCountingFrameDelegate); + } + + /** + * Set up the server before running end-to-end tests. + * + * @param frameDelegateSupplier custom FrameDelegateSupplier + */ + private void setUp(Supplier frameDelegateSupplier) { /* * EventLoop is used to notice any events from the connections */ @@ -135,9 +149,20 @@ public void tearDown() { */ public void clear() { payloads.clear(); + frameDelegates.clear(); } public List payloads() { return payloads; } + + public List frameDelegates() { + return frameDelegates; + } + + private FrameDelegate addCountingFrameDelegate() { + CountingFrameDelegate frameDelegate = new CountingFrameDelegate(); + frameDelegates.add(frameDelegate); + return frameDelegate; + } }