Skip to content

Commit

Permalink
Re #7: Added configuration option 'maxRequestsInFlight' that is used …
Browse files Browse the repository at this point in the history
…to size monitoring array. Added Netty pipeline stage for tracking number of requests in flight. Added monitoring of number of bytes of logs sent. Added thread that dumps stats to stdout.
  • Loading branch information
tkowalcz committed Jan 16, 2021
1 parent 64626a3 commit a6e63cb
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class LokiAppenderBuilder<B extends LokiAppenderBuilder<B>> extends Abstr
@PluginBuilderAttribute
private String logLevelLabel;

@PluginBuilderAttribute
private int maxRequestsInFlight;

@PluginElement("Headers")
private Header[] headers;

Expand All @@ -71,6 +74,7 @@ public LokiAppender build() {
.withPort(port)
.withMaxRetries(maxRetries)
.withRequestTimeoutMillis(readTimeoutMillis)
.withMaxRequestsInFlight(maxRequestsInFlight)
.build();

String[] additionalHeaders = stream(headers)
Expand Down Expand Up @@ -189,6 +193,14 @@ public void setLogLevelLabel(String logLevelLabel) {
this.logLevelLabel = logLevelLabel;
}

public void setMaxRequestsInFlight(int maxRequestsInFlight) {
this.maxRequestsInFlight = maxRequestsInFlight;
}

public int getMaxRequestsInFlight() {
return maxRequestsInFlight;
}

public void setHeaders(Header[] headers) {
this.headers = headers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public static ChannelFuture initConnection(
new HttpClientInitializer(
monitoringModule,
clientConfiguration.getRequestTimeoutMillis(),
clientConfiguration.getMaxInFlightRequests()
clientConfiguration.getMaxRequestsInFlight()
)
)
.remoteAddress(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class ClientConfiguration {

private final int connectionTimeoutMillis;
private final int requestTimeoutMillis;
private final int maxInFlightRequests;
private final int maxRequestsInFlight;

private final int maxRetries;

Expand All @@ -18,14 +18,14 @@ public ClientConfiguration(
int port,
int connectionTimeoutMillis,
int requestTimeoutMillis,
int maxInFlightRequests,
int maxRequestsInFlight,
int maxRetries) {
this.logEndpoint = logEndpoint;
this.host = host;
this.port = port;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.requestTimeoutMillis = requestTimeoutMillis;
this.maxInFlightRequests = maxInFlightRequests;
this.maxRequestsInFlight = maxRequestsInFlight;

this.maxRetries = maxRetries;
}
Expand All @@ -50,8 +50,8 @@ public int getRequestTimeoutMillis() {
return requestTimeoutMillis;
}

public int getMaxInFlightRequests() {
return maxInFlightRequests;
public int getMaxRequestsInFlight() {
return maxRequestsInFlight;
}

public int getMaxRetries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class ClientConfigurationBuilder {
public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 5_000;
public static final int DEFAULT_REQUEST_TIMEOUT_MILLIS = 60_000;

public static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 10;
public static final int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 10;

public static final int DEFAULT_MAX_RETRIES = 0;

Expand All @@ -17,7 +17,7 @@ public class ClientConfigurationBuilder {

private int connectionTimeoutMillis = DEFAULT_CONNECT_TIMEOUT_MILLIS;
private int requestTimeoutMillis = DEFAULT_REQUEST_TIMEOUT_MILLIS;
private int maxInFlightRequests = DEFAULT_MAX_IN_FLIGHT_REQUESTS;
private int maxRequestsInFlight = DEFAULT_MAX_REQUESTS_IN_FLIGHT;

private int maxRetries = DEFAULT_MAX_RETRIES;

Expand Down Expand Up @@ -46,8 +46,8 @@ public ClientConfigurationBuilder withRequestTimeoutMillis(int requestTimeoutMil
return this;
}

public ClientConfigurationBuilder withMaxInFlightRequests(int maxInFlightRequests) {
this.maxInFlightRequests = maxInFlightRequests;
public ClientConfigurationBuilder withMaxRequestsInFlight(int maxRequestsInFlight) {
this.maxRequestsInFlight = maxRequestsInFlight;
return this;
}

Expand All @@ -63,7 +63,7 @@ public ClientConfiguration build() {
port,
connectionTimeoutMillis,
requestTimeoutMillis,
maxInFlightRequests,
maxRequestsInFlight,
maxRetries
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.timeout.IdleStateHandler;
import pl.tkowalcz.tjahzi.stats.MonitoringModule;
import pl.tkowalcz.tjahzi.stats.PipelinedHttpRequestTimer;

class HttpClientInitializer extends ChannelInitializer<SocketChannel> {

Expand All @@ -19,18 +18,18 @@ class HttpClientInitializer extends ChannelInitializer<SocketChannel> {
private final ResponseHandler responseHandler;

private final int requestTimeoutMillis;
private final int maxNumberOfRequests;
private final int maxRequestsInFlight;

HttpClientInitializer(
MonitoringModule monitoringModule,
int requestTimeoutMillis,
int maxNumberOfRequests
int maxRequestsInFlight
) {
this.monitoringModule = monitoringModule;
this.responseHandler = new ResponseHandler(monitoringModule);

this.requestTimeoutMillis = requestTimeoutMillis;
this.maxNumberOfRequests = maxNumberOfRequests;
this.maxRequestsInFlight = maxRequestsInFlight;
}

@Override
Expand All @@ -49,8 +48,9 @@ protected void initChannel(SocketChannel ch) {
p.addLast(
new PipelinedHttpRequestTimer(
monitoringModule,
maxNumberOfRequests
maxRequestsInFlight
)
);
p.addLast(new InFlightRequestsTracker());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private void execute(FullHttpRequest request, Retry retry) {
stableReference.awaitUninterruptibly();
if (stableReference.isSuccess() && stableReference.channel().isActive()) {
stableReference.channel().writeAndFlush(request);
monitoringModule.incrementSentHttpRequests();
monitoringModule.incrementSentHttpRequests(request.content().readableBytes());
} else {
retry.retry();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package pl.tkowalcz.tjahzi.http;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.concurrent.atomic.AtomicInteger;

public class InFlightRequestsTracker extends ChannelDuplexHandler {

private final AtomicInteger requestsInFlight = new AtomicInteger();

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
requestsInFlight.incrementAndGet();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
requestsInFlight.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package pl.tkowalcz.tjahzi.stats;
package pl.tkowalcz.tjahzi.http;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import pl.tkowalcz.tjahzi.stats.MonitoringModule;
import pl.tkowalcz.tjahzi.stats.TimingRingBuffer;

public class PipelinedHttpRequestTimer extends ChannelDuplexHandler {

Expand All @@ -11,10 +13,10 @@ public class PipelinedHttpRequestTimer extends ChannelDuplexHandler {

public PipelinedHttpRequestTimer(
MonitoringModule monitoringModule,
int maxNumberOfRequests
int maxRequestsInFlight
) {
this.monitoringModule = monitoringModule;
this.timer = new TimingRingBuffer(monitoringModule.getClock(), maxNumberOfRequests);
this.timer = new TimingRingBuffer(monitoringModule.getClock(), maxRequestsInFlight);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpStatusClass;
import pl.tkowalcz.tjahzi.stats.MonitoringModule;

@ChannelHandler.Sharable
Expand All @@ -20,7 +20,7 @@ class ResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
monitoringModule.incrementHttpResponses();
if (msg.status() != HttpResponseStatus.OK) {
if (msg.status().codeClass() != HttpStatusClass.SUCCESS) {
monitoringModule.incrementHttpErrors(msg.status(), msg.content());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public void incrementDroppedPuts(Throwable throwable) {
}

@Override
public void incrementSentHttpRequests() {
public void incrementSentHttpRequests(int sizeBytes) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,38 +13,22 @@ public interface MonitoringModule {

void incrementDroppedPuts(Throwable throwable);

// long getDroppedPuts();

void incrementSentHttpRequests();

// long getSentHttpRequests();
void incrementSentHttpRequests(int sizeBytes);

void incrementFailedHttpRequests();

// long getFailedHttpRequests();

void incrementRetriedHttpRequests();

// long getRetriedHttpRequests();

void addAgentError(Throwable throwable);

// long getAgentErrors();

void incrementHttpConnectAttempts();

// long getHttpConnectAttempts();

void addPipelineError(Throwable cause);

void incrementChannelInactive();

// long getChannelInactive();

void incrementHttpResponses();

// long getHttpResponses();

void incrementHttpErrors(HttpResponseStatus status, ByteBuf content);

void recordResponseTime(long time);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package pl.tkowalcz.tjahzi.stats;

import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.agrona.concurrent.SystemEpochClock;
Expand All @@ -8,25 +9,35 @@

import java.nio.charset.Charset;
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class StandardMonitoringModule implements MonitoringModule {
public class StandardMonitoringModule implements MonitoringModule, Runnable {

private AtomicLong droppedPuts = new AtomicLong();
private AtomicLong httpConnectAttempts = new AtomicLong();
private static final int ERROR_LOG_CAPACITY = 1024;

private AtomicLong sentHttpRequests = new AtomicLong();
private AtomicLong failedHttpRequests = new AtomicLong();
private AtomicLong retriedHttpRequests = new AtomicLong();
private AtomicLong httpResponses = new AtomicLong();
private AtomicLong channelInactive = new AtomicLong();
private AtomicLong agentErrors = new AtomicLong();
private final AtomicLong droppedPuts = new AtomicLong();
private final AtomicLong httpConnectAttempts = new AtomicLong();

private final AtomicLong sentHttpRequests = new AtomicLong();
private final AtomicLong sentBytes = new AtomicLong();

private final AtomicLong failedHttpRequests = new AtomicLong();
private final AtomicLong retriedHttpRequests = new AtomicLong();
private final AtomicLong httpResponses = new AtomicLong();
private final AtomicLong channelInactive = new AtomicLong();
private final AtomicLong agentErrors = new AtomicLong();

private final DistinctErrorLog distinctErrorLog;

public StandardMonitoringModule() {
StatsDumpingThread thread = new StatsDumpingThread(this);
if (thread.isEnabled()) {
thread.start();
}

distinctErrorLog = new DistinctErrorLog(
new UnsafeBuffer(new byte[1024]),
new UnsafeBuffer(new byte[ERROR_LOG_CAPACITY]),
new SystemEpochClock()
);

Expand Down Expand Up @@ -54,8 +65,9 @@ public void incrementDroppedPuts(Throwable throwable) {
}

@Override
public void incrementSentHttpRequests() {
public void incrementSentHttpRequests(int sizeBytes) {
sentHttpRequests.incrementAndGet();
sentBytes.addAndGet(sizeBytes);
}

public long getSentHttpRequests() {
Expand Down Expand Up @@ -133,6 +145,28 @@ public void incrementHttpErrors(HttpResponseStatus status, ByteBuf content) {

@Override
public void recordResponseTime(long time) {
}

@Override
public void run() {
while (true) {
System.out.println(toString());
Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
}
}

@Override
public String toString() {
return "StandardMonitoringModule{" +
"droppedPuts=" + droppedPuts +
", httpConnectAttempts=" + httpConnectAttempts +
", sentHttpRequests=" + sentHttpRequests +
", sentKilobytes=" + (sentBytes.longValue() / 1024) +
", failedHttpRequests=" + failedHttpRequests +
", retriedHttpRequests=" + retriedHttpRequests +
", httpResponses=" + httpResponses +
", channelInactive=" + channelInactive +
", agentErrors=" + agentErrors +
'}';
}
}
Loading

0 comments on commit a6e63cb

Please sign in to comment.