Skip to content

Commit

Permalink
Adds support for splitting incoming requests into multiple outgoing m…
Browse files Browse the repository at this point in the history
…essages with a regex defined in config
  • Loading branch information
Rasmus Jokinen committed Apr 30, 2024
1 parent caa323e commit 167dfec
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 19 deletions.
2 changes: 2 additions & 0 deletions etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ credentials.file=etc/credentials.json

lookups.hostname.file=etc/hostname.json
lookups.appname.file=etc/appname.json

payload.splitRegex=\n
6 changes: 5 additions & 1 deletion src/main/java/com/teragrep/lsh_01/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ public static void main(String[] args) {
BasicAuthentication basicAuthentication = new BasicAuthenticationFactory().create();
InternalEndpointUrlConfig internalEndpointUrlConfig = new InternalEndpointUrlConfig();
LookupConfig lookupConfig = new LookupConfig();
PayloadConfig payloadConfig = new PayloadConfig();
try {
nettyConfig.validate();
relpConfig.validate();
securityConfig.validate();
internalEndpointUrlConfig.validate();
lookupConfig.validate();
payloadConfig.validate();
}
catch (IllegalArgumentException e) {
LOGGER.error("Can't parse config properly: {}", e.getMessage());
Expand All @@ -51,12 +53,14 @@ public static void main(String[] args) {
LOGGER.info("Got relp config: <[{}]>", relpConfig);
LOGGER.info("Got internal endpoint config: <[{}]>", internalEndpointUrlConfig);
LOGGER.info("Got lookup table config: <[{}]>", lookupConfig);
LOGGER.info("Got payload config: <[{}]>", payloadConfig);
LOGGER.info("Authentication required: <[{}]>", securityConfig.authRequired);
RelpConversion relpConversion = new RelpConversion(
relpConfig,
securityConfig,
basicAuthentication,
lookupConfig
lookupConfig,
payloadConfig
);
try (
NettyHttpServer server = new NettyHttpServer(
Expand Down
73 changes: 73 additions & 0 deletions src/main/java/com/teragrep/lsh_01/Payload.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
logstash-http-input to syslog bridge
Copyright 2024 Suomen Kanuuna Oy
Derivative Work of Elasticsearch
Copyright 2012-2015 Elasticsearch
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.teragrep.lsh_01;

import com.teragrep.lsh_01.config.PayloadConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.PatternSyntaxException;

/**
* A message from a log source
*/
final public class Payload {

private final static Logger LOGGER = LogManager.getLogger(Payload.class);
private final PayloadConfig payloadConfig;
private final String body;

public Payload(PayloadConfig payloadConfig, String body) {
this.payloadConfig = payloadConfig;
this.body = body;
}

/**
* Splits the payload into multiple payloads if there is a defined split regex in the body.
* @return list of Payloads
*/
public List<Payload> split() {
ArrayList<Payload> payloads = new ArrayList<>();

try {
String[] messages = body.split(payloadConfig.splitRegex);

for (String message: messages) {
payloads.add(new Payload(payloadConfig, message));
}
} catch (PatternSyntaxException e) {
LOGGER.error("Invalid splitRegex in configuration: <{}>", payloadConfig.splitRegex);
payloads.add(this);
}

return payloads;
}

/**
* Takes the message from the payload.
* @return message body
*/
public String take() {
return body;
}
}
18 changes: 13 additions & 5 deletions src/main/java/com/teragrep/lsh_01/RelpConversion.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.teragrep.lsh_01.authentication.BasicAuthentication;
import com.teragrep.lsh_01.authentication.Subject;
import com.teragrep.lsh_01.config.LookupConfig;
import com.teragrep.lsh_01.config.PayloadConfig;
import com.teragrep.lsh_01.config.RelpConfig;
import com.teragrep.lsh_01.config.SecurityConfig;
import com.teragrep.lsh_01.lookup.LookupTableFactory;
Expand All @@ -48,14 +49,16 @@ public class RelpConversion implements IMessageHandler {
private final SecurityConfig securityConfig;
private final BasicAuthentication basicAuthentication;
private final LookupConfig lookupConfig;
private final PayloadConfig payloadConfig;
private final StringLookupTable hostnameLookup;
private final StringLookupTable appnameLookup;

public RelpConversion(
RelpConfig relpConfig,
SecurityConfig securityConfig,
BasicAuthentication basicAuthentication,
LookupConfig lookupConfig
LookupConfig lookupConfig,
PayloadConfig payloadConfig
) {
this.relpConfig = relpConfig;
this.securityConfig = securityConfig;
Expand All @@ -64,13 +67,18 @@ public RelpConversion(
this.lookupConfig = lookupConfig;
this.hostnameLookup = new LookupTableFactory().create(lookupConfig.hostnamePath);
this.appnameLookup = new LookupTableFactory().create(lookupConfig.appNamePath);
this.payloadConfig = payloadConfig;
}

public boolean onNewMessage(String remoteAddress, Subject subject, Map<String, String> headers, String body) {
try {
sendMessage(
body, headers, subject.subject(), hostnameLookup.lookup(subject.subject()), appnameLookup.lookup(subject.subject())
);
Payload originalPayload = new Payload(payloadConfig, body);

for (Payload payload : originalPayload.split()) {
sendMessage(
payload.take(), headers, subject.subject(), hostnameLookup.lookup(subject.subject()), appnameLookup.lookup(subject.subject())
);
}
}
catch (Exception e) {
LOGGER.error("Unexpected error when sending a message: <{}>", e.getMessage(), e);
Expand All @@ -89,7 +97,7 @@ public boolean requiresToken() {

public RelpConversion copy() {
LOGGER.debug("RelpConversion.copy called");
return new RelpConversion(relpConfig, securityConfig, basicAuthentication, lookupConfig);
return new RelpConversion(relpConfig, securityConfig, basicAuthentication, lookupConfig, payloadConfig);
}

public Map<String, String> responseHeaders() {
Expand Down
43 changes: 43 additions & 0 deletions src/main/java/com/teragrep/lsh_01/config/PayloadConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
logstash-http-input to syslog bridge
Copyright 2024 Suomen Kanuuna Oy
Derivative Work of Elasticsearch
Copyright 2012-2015 Elasticsearch
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.teragrep.lsh_01.config;

public class PayloadConfig implements Validateable {

public final String splitRegex;

public PayloadConfig() {
PropertiesReaderUtilityClass propertiesReader = new PropertiesReaderUtilityClass(
System.getProperty("properties.file", "etc/config.properties")
);
splitRegex = propertiesReader.getStringProperty("payload.splitRegex");
}

@Override
public void validate() {

}

@Override
public String toString() {
return "PayloadConfig{" + "splitRegex='" + splitRegex + '}';
}
}
31 changes: 21 additions & 10 deletions src/test/java/CredentialsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.teragrep.lsh_01.RelpConversion;
import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory;
import com.teragrep.lsh_01.config.LookupConfig;
import com.teragrep.lsh_01.config.PayloadConfig;
import com.teragrep.lsh_01.config.RelpConfig;
import com.teragrep.lsh_01.config.SecurityConfig;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -56,7 +57,8 @@ public void testNoAuthRequired() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertFalse(relpConversion.requiresToken());
}
Expand All @@ -81,7 +83,8 @@ public void testAuthRequired() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
// FirstUser:VeryFirstPassword
Expand All @@ -101,7 +104,8 @@ public void testValidBase64ButNoColon() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
// Test
Expand All @@ -121,7 +125,8 @@ public void testMultipleColons() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
// UserWithColons:My:Password:Yay
Expand All @@ -139,7 +144,8 @@ public void testInvalidBase64Auth() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
IllegalArgumentException e = Assertions
Expand All @@ -158,7 +164,8 @@ public void testNonBasicAuth() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
IllegalArgumentException e = Assertions
Expand All @@ -180,7 +187,8 @@ public void testWrongCredentials() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
// SecondUser:WrongPassword -> Right user
Expand All @@ -202,7 +210,8 @@ public void testEmptyUsername() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
// :VeryFirstPassword -> Valid password, null username
Expand All @@ -224,7 +233,8 @@ public void testEmptyPassword() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
// FirstUser: -> Valid username, null password
Expand All @@ -244,7 +254,8 @@ public void testNullToken() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);
Assertions.assertTrue(relpConversion.requiresToken());
IllegalArgumentException e = Assertions
Expand Down
10 changes: 7 additions & 3 deletions src/test/java/LookupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.teragrep.lsh_01.authentication.BasicAuthenticationFactory;
import com.teragrep.lsh_01.authentication.Subject;
import com.teragrep.lsh_01.config.LookupConfig;
import com.teragrep.lsh_01.config.PayloadConfig;
import com.teragrep.lsh_01.config.RelpConfig;
import com.teragrep.lsh_01.config.SecurityConfig;
import com.teragrep.lsh_01.lookup.LookupTableFactory;
Expand Down Expand Up @@ -63,7 +64,8 @@ public void testAppnameLookup() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);

// FirstUser:VeryFirstPassword!
Expand All @@ -85,7 +87,8 @@ public void testHostnameLookup() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);

// FirstUser:VeryFirstPassword!
Expand All @@ -109,7 +112,8 @@ public void testMissingLookups() {
relpConfig,
securityConfig,
basicAuthentication,
new LookupConfig()
new LookupConfig(),
new PayloadConfig()
);

// MissingHostname:MyHostnameIsMissing
Expand Down
Loading

0 comments on commit 167dfec

Please sign in to comment.