Skip to content

Commit

Permalink
Adds some structural support for internal urls, adds tokenless health… (
Browse files Browse the repository at this point in the history
#30)

* Adds some structural support for internal urls, adds tokenless healthcheck endpoint support

* Adds docs
  • Loading branch information
StrongestNumber9 committed Apr 11, 2024
1 parent b14d4ed commit 0ab825c
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 25 deletions.
2 changes: 2 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ relp.appName,lsh_01,Appname to use in RELP records
relp.hostname,localhost,Hostname to use in RELP records
security.tokenRequired,true,Sets whether "Authorization: SomeSecretToken" headers are required
security.token,SomeSecretToken,A token every request must contain if security.tokenRequired is enabled.
healthcheck.enabled,true,Sets if an internal healthcheck endpoint is enabled.
healthcheck.url,/healthcheck,An internal healthcheck endpoint that will always reply 200 ok regardless of security settings. Accessing this url won't generate any events.
|===

== Limitations
Expand Down
3 changes: 3 additions & 0 deletions etc/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ server.threads=1
server.maxPendingRequests=128
server.maxContentLength=262144

healthcheck.enabled=true
healthcheck.url=/healthcheck

relp.target=127.0.0.1
relp.port=601
relp.reconnectInterval=10000
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/com/teragrep/lsh_01/HttpInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.teragrep.lsh_01;

import com.teragrep.lsh_01.config.InternalEndpointUrlConfig;
import com.teragrep.lsh_01.util.SslHandlerProvider;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
Expand All @@ -41,17 +42,20 @@ public class HttpInitializer extends ChannelInitializer<SocketChannel> {
private final int maxContentLength;
private final HttpResponseStatus responseStatus;
private final ThreadPoolExecutor executorGroup;
private final InternalEndpointUrlConfig internalEndpointUrlConfig;

public HttpInitializer(
IMessageHandler messageHandler,
ThreadPoolExecutor executorGroup,
int maxContentLength,
HttpResponseStatus responseStatus
HttpResponseStatus responseStatus,
InternalEndpointUrlConfig internalEndpointUrlConfig
) {
this.messageHandler = messageHandler;
this.executorGroup = executorGroup;
this.maxContentLength = maxContentLength;
this.responseStatus = responseStatus;
this.internalEndpointUrlConfig = internalEndpointUrlConfig;
}

protected void initChannel(SocketChannel socketChannel) throws Exception {
Expand All @@ -64,7 +68,15 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpContentDecompressor());
pipeline.addLast(new HttpObjectAggregator(maxContentLength));
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup, responseStatus));
pipeline
.addLast(
new HttpServerHandler(
messageHandler.copy(),
executorGroup,
responseStatus,
internalEndpointUrlConfig
)
);
}

public void enableSSL(SslHandlerProvider sslHandlerProvider) {
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/teragrep/lsh_01/HttpServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.teragrep.lsh_01;

import com.teragrep.lsh_01.config.InternalEndpointUrlConfig;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
Expand All @@ -42,15 +43,18 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque
private final IMessageHandler messageHandler;
private final ThreadPoolExecutor executorGroup;
private final HttpResponseStatus responseStatus;
private final InternalEndpointUrlConfig internalEndpointUrlConfig;

public HttpServerHandler(
IMessageHandler messageHandler,
ThreadPoolExecutor executorGroup,
HttpResponseStatus responseStatus
HttpResponseStatus responseStatus,
InternalEndpointUrlConfig internalEndpointUrlConfig
) {
this.messageHandler = messageHandler;
this.executorGroup = executorGroup;
this.responseStatus = responseStatus;
this.internalEndpointUrlConfig = internalEndpointUrlConfig;
}

@Override
Expand All @@ -62,7 +66,8 @@ public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
msg,
remoteAddress,
messageHandler,
responseStatus
responseStatus,
internalEndpointUrlConfig
);
executorGroup.execute(messageProcessor);
}
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/com/teragrep/lsh_01/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.teragrep.lsh_01;

import com.teragrep.lsh_01.config.InternalEndpointUrlConfig;
import com.teragrep.lsh_01.config.NettyConfig;
import com.teragrep.lsh_01.config.RelpConfig;
import com.teragrep.lsh_01.config.SecurityConfig;
Expand All @@ -33,20 +34,31 @@ public static void main(String[] args) {
NettyConfig nettyConfig = new NettyConfig();
RelpConfig relpConfig = new RelpConfig();
SecurityConfig securityConfig = new SecurityConfig();
InternalEndpointUrlConfig internalEndpointUrlConfig = new InternalEndpointUrlConfig();
try {
nettyConfig.validate();
relpConfig.validate();
securityConfig.validate();
internalEndpointUrlConfig.validate();
}
catch (IllegalArgumentException e) {
LOGGER.error("Can't parse config properly: {}", e.getMessage());
System.exit(1);
}
LOGGER.info("Got server config: <[{}]>", nettyConfig);
LOGGER.info("Got relp config: <[{}]>", relpConfig);
LOGGER.info("Got internal endpoint config: <[{}]>", internalEndpointUrlConfig);
LOGGER.info("Requires token: <[{}]>", securityConfig.tokenRequired);
RelpConversion relpConversion = new RelpConversion(relpConfig, securityConfig);
try (NettyHttpServer server = new NettyHttpServer(nettyConfig, relpConversion, null, 200)) {
try (
NettyHttpServer server = new NettyHttpServer(
nettyConfig,
relpConversion,
null,
200,
internalEndpointUrlConfig
)
) {
server.run();
}
}
Expand Down
46 changes: 28 additions & 18 deletions src/main/java/com/teragrep/lsh_01/MessageProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.teragrep.lsh_01;

import com.teragrep.lsh_01.config.InternalEndpointUrlConfig;
import com.teragrep.rlo_14.*;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand Down Expand Up @@ -47,6 +48,7 @@ public class MessageProcessor implements RejectableRunnable {
private final String remoteAddress;
private final IMessageHandler messageHandler;
private final HttpResponseStatus responseStatus;
private final InternalEndpointUrlConfig internalEndpointUrlConfig;

private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
private final static Logger LOGGER = LogManager.getLogger(MessageProcessor.class);
Expand All @@ -56,13 +58,15 @@ public class MessageProcessor implements RejectableRunnable {
FullHttpRequest req,
String remoteAddress,
IMessageHandler messageHandler,
HttpResponseStatus responseStatus
HttpResponseStatus responseStatus,
InternalEndpointUrlConfig internalEndpointUrlConfig
) {
this.ctx = ctx;
this.req = req;
this.remoteAddress = remoteAddress;
this.messageHandler = messageHandler;
this.responseStatus = responseStatus;
this.internalEndpointUrlConfig = internalEndpointUrlConfig;
}

public void onRejection() {
Expand All @@ -79,34 +83,40 @@ public void onRejection() {
public void run() {
try {
final HttpResponse response;
if (!messageHandler.requiresToken()) {
if (isInternalEndpoint()) {
LOGGER.debug("Healthcheck endpoint called");
response = generateResponse(messageHandler.responseHeaders());
}
else if (isTokenOk()) {
LOGGER.debug("Processing message");
response = processMessage();
}
else if (!req.headers().contains(HttpHeaderNames.AUTHORIZATION)) {
LOGGER.debug("Required authorization not provided; requesting authentication.");
response = generateAuthenticationRequestResponse();
}
else {
if (!req.headers().contains(HttpHeaderNames.AUTHORIZATION)) {
LOGGER.debug("Required authorization not provided; requesting authentication.");
response = generateAuthenticationRequestResponse();
}
else {
final String token = req.headers().get(HttpHeaderNames.AUTHORIZATION);
req.headers().remove(HttpHeaderNames.AUTHORIZATION);
if (messageHandler.validatesToken(token)) {
LOGGER.debug("Valid authorization; processing request.");
response = processMessage();
}
else {
LOGGER.debug("Invalid authorization; rejecting request.");
response = generateFailedResponse(HttpResponseStatus.UNAUTHORIZED);
}
}
LOGGER.debug("Invalid authorization; rejecting request.");
response = generateFailedResponse(HttpResponseStatus.UNAUTHORIZED);
}

ctx.writeAndFlush(response);
}
finally {
req.release();
}
}

private boolean isInternalEndpoint() {
return internalEndpointUrlConfig.healthcheckEnabled
&& internalEndpointUrlConfig.healthcheckUrl.equals(req.uri());
}

private boolean isTokenOk() {
return !messageHandler.requiresToken()
|| messageHandler.validatesToken(req.headers().get(HttpHeaderNames.AUTHORIZATION));
}

private FullHttpResponse processMessage() {
final Map<String, String> formattedHeaders = formatHeaders(req.headers());
final String body = req.content().toString(UTF8_CHARSET);
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/com/teragrep/lsh_01/NettyHttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
package com.teragrep.lsh_01;

import com.teragrep.lsh_01.config.InternalEndpointUrlConfig;
import com.teragrep.lsh_01.config.NettyConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
Expand Down Expand Up @@ -50,15 +51,18 @@ public class NettyHttpServer implements Runnable, Closeable {
private final EventLoopGroup processorGroup;
private final ThreadPoolExecutor executorGroup;
private final HttpResponseStatus responseStatus;
private final InternalEndpointUrlConfig internalEndpointUrlConfig;

public NettyHttpServer(
NettyConfig nettyConfig,
IMessageHandler messageHandler,
SslHandlerProvider sslHandlerProvider,
int responseCode
int responseCode,
InternalEndpointUrlConfig internalEndpointUrlConfig
) {
this.host = nettyConfig.listenAddress;
this.port = nettyConfig.listenPort;
this.internalEndpointUrlConfig = internalEndpointUrlConfig;
this.responseStatus = HttpResponseStatus.valueOf(responseCode);
processorGroup = new NioEventLoopGroup(nettyConfig.threads, daemonThreadFactory("http-input-processor"));

Expand All @@ -76,7 +80,8 @@ public NettyHttpServer(
messageHandler,
executorGroup,
nettyConfig.maxContentLength,
responseStatus
responseStatus,
internalEndpointUrlConfig
);

if (sslHandlerProvider != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
logstash-http-input to syslog bridge
Copyright 2024 Suomen Kanuuna Oy
Derivative Work of Elasticsearch
Copyright 2012-2015 Elasticsearch
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.teragrep.lsh_01.config;

public class InternalEndpointUrlConfig implements Validateable {

public final boolean healthcheckEnabled;
public final String healthcheckUrl;

public InternalEndpointUrlConfig() {
PropertiesReaderUtilityClass propertiesReader = new PropertiesReaderUtilityClass(
System.getProperty("properties.file", "etc/config.properties")
);
healthcheckEnabled = propertiesReader.getBooleanProperty("healthcheck.enabled");
healthcheckUrl = propertiesReader.getStringProperty("healthcheck.url");
}

@Override
public void validate() {
}

@Override
public String toString() {
return "InternalEndpointUrlConfig{" + "healthcheckEnabled=" + healthcheckEnabled + ", healthcheckUrl='"
+ healthcheckUrl + '\'' + '}';
}
}

0 comments on commit 0ab825c

Please sign in to comment.