Skip to content

Commit

Permalink
Adds a bunch of proper logging, replaces all print/println/printStack… (
Browse files Browse the repository at this point in the history
#34)

* Adds a bunch of proper logging, replaces all print/println/printStackTrace calls with logging statements

* Some more logging, lower stuff to debug
  • Loading branch information
StrongestNumber9 committed Mar 25, 2024
1 parent 5376677 commit 1919b5f
Show file tree
Hide file tree
Showing 18 changed files with 165 additions and 52 deletions.
10 changes: 9 additions & 1 deletion src/main/java/com/teragrep/cfe_16/AckManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.teragrep.cfe_16.config.Configuration;
import com.teragrep.cfe_16.exceptionhandling.InternalServerErrorException;
import com.teragrep.cfe_16.exceptionhandling.ServerIsBusyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand All @@ -70,7 +72,7 @@
*/
@Component
public class AckManager implements Runnable, LifeCycle {

private static final Logger LOGGER = LoggerFactory.getLogger(AckManager.class);
/**
* A class that encapsulates state of individual channels regarding to ACKs.
*
Expand Down Expand Up @@ -163,12 +165,14 @@ public void stop() {
* @return
*/
private State getOrCreateState(String authToken, String channel) {
LOGGER.debug("Getting or creating state for channel <{}>", channel);
String key = authToken + channel;
State state = this.ackStates.get(key);
if (state == null) {
state = new State();
this.ackStates.put(key, state);
}
LOGGER.debug("Created state <{}> for channel <{}>", state, channel);
return state;
}

Expand All @@ -180,8 +184,10 @@ private State getOrCreateState(String authToken, String channel) {
* @param channel
*/
public void initializeContext(String authToken, String channel) {
LOGGER.debug("Initializing context for channel <{}>", channel);
String key = authToken + channel;
if (!this.ackStates.containsKey(key)) {
LOGGER.debug("Adding new state to channel <{}>", channel);
State state = new State();
this.ackStates.put(key, state);
}
Expand Down Expand Up @@ -238,6 +244,7 @@ public boolean incrementAckValue(String authToken, String channel) {
public boolean acknowledge(String authToken, String channel, int ackId) {
String key = authToken + channel;
State state = this.ackStates.get(key);
LOGGER.debug("Acknowledging ackId <{}> on channel <{}>", ackId, channel);
if (state == null) {
throw new IllegalStateException("An Ack cannot be acknowledge before it is added to the Ack list.");
}
Expand Down Expand Up @@ -380,6 +387,7 @@ public void run() {

while (true) {
try {
LOGGER.debug("Sleeping for <{}> while waiting for polls", this.configuration.getPollTime());
Thread.sleep(this.configuration.getPollTime());
} catch (InterruptedException e) {
break;
Expand Down
5 changes: 4 additions & 1 deletion src/main/java/com/teragrep/cfe_16/Cfe16Application.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@

package com.teragrep.cfe_16;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan({ "com.teragrep.cfe_16" })
public class Cfe16Application {

private static final Logger LOGGER = LoggerFactory.getLogger(Cfe16Application.class);
public static void main(String[] args) {
LOGGER.info("Starting Cfe16Application...");
SpringApplication.run(Cfe16Application.class, args);
//Runtime.getRuntime().addShutdownHook(new ShutdownHook());
}
Expand Down
17 changes: 15 additions & 2 deletions src/main/java/com/teragrep/cfe_16/Converter.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import com.cloudbees.syslog.SyslogMessage;
import com.teragrep.cfe_16.bo.HeaderInfo;
import com.teragrep.cfe_16.bo.HttpEventData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/*
Expand All @@ -62,7 +64,7 @@
*/
@Component
public class Converter {

private static final Logger LOGGER = LoggerFactory.getLogger(Converter.class);
private Severity severity;
private Facility facility;

Expand All @@ -83,16 +85,17 @@ public SyslogMessage httpToSyslog(HttpEventData httpEventData, HeaderInfo header
/*
* Creates a Syslogmessage with a time stamp
*/
LOGGER.debug("Creating new syslog message with timestamp");
syslogMessage = new SyslogMessage().withTimestamp(httpEventData.getTimeAsLong()).withSeverity(severity)
.withAppName("capsulated").withHostname("cfe_16").withFacility(facility).withSDElement(metadataSDE)
.withSDElement(headerSDE).withMsg(httpEventData.getEvent());

} else {

/*
* Creates a Syslogmessage without timestamp, because the time is already given
* in the request.
*/
LOGGER.debug("Creating new syslog message without timestamp");
syslogMessage = new SyslogMessage().withSeverity(severity).withAppName("capsulated").withHostname("cfe_16")
.withFacility(facility).withSDElement(metadataSDE).withSDElement(headerSDE)
.withMsg(httpEventData.getEvent());
Expand Down Expand Up @@ -120,25 +123,31 @@ private void setEventFacility() {
* Parameters.
*/
private void setStructuredDataParams(HttpEventData eventData) {
LOGGER.debug("Setting Structured Data params");
metadataSDE = new SDElement("cfe_16-metadata@48577");

if (eventData.getAuthenticationToken() != null) {
LOGGER.debug("Setting authentication token");
metadataSDE.addSDParam("authentication_token", eventData.getAuthenticationToken());
}

if (eventData.getChannel() != null) {
LOGGER.debug("Setting channel");
metadataSDE.addSDParam("channel", eventData.getChannel());
}

if (eventData.getAckID() != null) {
LOGGER.debug("Setting ack id");
metadataSDE.addSDParam("ack_id", String.valueOf(eventData.getAckID()));
}

if (eventData.getTimeSource() != null) {
LOGGER.debug("Setting time source");
metadataSDE.addSDParam("time_source", eventData.getTimeSource());
}

if (eventData.isTimeParsed()) {
LOGGER.debug("Setting time_parsed and time");
metadataSDE.addSDParam("time_parsed", "true");
metadataSDE.addSDParam("time", eventData.getTime());
}
Expand All @@ -155,15 +164,19 @@ public SyslogMessage getHeaderInfoSyslogMessage(HeaderInfo headerInfo) {
}

private void setHeaderSDE(HeaderInfo headerInfo) {
LOGGER.debug("Setting Structured Data headers");
headerSDE = new SDElement("cfe_16-origin@48577");

if (headerInfo.getxForwardedFor() != null) {
LOGGER.debug("Adding X-Forwarded-For header to headerSDE");
headerSDE.addSDParam("X-Forwarded-For", headerInfo.getxForwardedFor());
}
if (headerInfo.getxForwardedHost() != null) {
LOGGER.debug("Adding X-Forwarder-Host to headerSDE");
headerSDE.addSDParam("X-Forwarded-Host", headerInfo.getxForwardedHost());
}
if (headerInfo.getxForwardedProto() != null) {
LOGGER.debug("Adding X-Forwarded-Proto to headerSDE");
headerSDE.addSDParam("X-Forwarded-Proto", headerInfo.getxForwardedProto());
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/com/teragrep/cfe_16/EventManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import com.teragrep.cfe_16.exceptionhandling.InternalServerErrorException;
import com.teragrep.cfe_16.sender.AbstractSender;
import com.teragrep.cfe_16.sender.SenderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand All @@ -76,7 +78,7 @@
*/
@Component
public class EventManager {

private static final Logger LOGGER = LoggerFactory.getLogger(EventManager.class);
private final ObjectMapper objectMapper;

@Autowired
Expand All @@ -90,12 +92,13 @@ public EventManager() {

@PostConstruct
public void setupSender() {
LOGGER.debug("Setting up sender");
try {
this.sender = SenderFactory.createSender(this.configuration.getSysLogProtocol(),
this.configuration.getSyslogHost(),
this.configuration.getSyslogPort());
} catch (IOException e) {
e.printStackTrace();
LOGGER.error("Error creating sender", e);
throw new InternalServerErrorException();
}
}
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/com/teragrep/cfe_16/RequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,30 +48,39 @@

import com.teragrep.cfe_16.bo.HeaderInfo;
import jakarta.servlet.http.HttpServletRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
*
*/
@Component
public class RequestHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(RequestHandler.class);
private String xForwardedFor;
private String xForwardedHost;
private String xForwardedProto;

public HeaderInfo createHeaderInfoObject(HttpServletRequest request) {
LOGGER.debug("Creating new Header Info");
HeaderInfo headerInfo = new HeaderInfo();
xForwardedFor = request.getHeader("X-Forwarded-For");
xForwardedHost = request.getHeader("X-Forwarded-Host");
xForwardedProto = request.getHeader("X-Forwarded-Proto");
if (xForwardedFor != null) {
LOGGER.debug("Setting X-Forwarded-For");
LOGGER.trace("Setting X-Forwarded-For to value <[{}]>", xForwardedFor);
headerInfo.setxForwardedFor(xForwardedFor);
}
if (xForwardedHost != null) {
LOGGER.debug("Setting X-Forwarded-Host");
LOGGER.trace("Setting X-Forwarded-Host to value <[{}]>", xForwardedHost);
headerInfo.setxForwardedHost(xForwardedHost);
}
if (xForwardedProto != null) {
LOGGER.debug("Setting X-Forwarded-Proto");
LOGGER.trace("Setting X-Forwarded-Proto to value <[{}]>", xForwardedProto);
headerInfo.setxForwardedProto(xForwardedProto);
}

Expand Down
11 changes: 10 additions & 1 deletion src/main/java/com/teragrep/cfe_16/SessionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

import com.teragrep.cfe_16.bo.Session;
import com.teragrep.cfe_16.config.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand All @@ -63,7 +65,7 @@
*/
@Component
public class SessionManager implements Runnable, LifeCycle {

private static final Logger LOGGER = LoggerFactory.getLogger(SessionManager.class);
/**
* Maps auth token string => session object.
*/
Expand Down Expand Up @@ -100,6 +102,7 @@ public void stop() {
public void run() {
while (true) {
try {
LOGGER.debug("Sleeping for <{}> while waiting for poll", this.configuration.getPollTime());
Thread.sleep(this.configuration.getPollTime());
} catch (InterruptedException e) {
break;
Expand Down Expand Up @@ -139,6 +142,8 @@ public Session getSession(String authenticationToken) {
* @return
*/
public Session getOrCreateSession(String authenticationToken) {
LOGGER.debug("Getting or creating session");
LOGGER.trace("Getting or creating session for authenticationToken: {}", authenticationToken);
synchronized (this) {
Session session = this.sessions.get(authenticationToken);
if (session == null) {
Expand All @@ -150,6 +155,8 @@ public Session getOrCreateSession(String authenticationToken) {
}

public void removeSession(String authenticationToken) {
LOGGER.debug("Removing session");
LOGGER.trace("Removing session for authenticationToken: {}", authenticationToken);
synchronized (this) {
this.sessions.remove(authenticationToken);
}
Expand All @@ -159,6 +166,8 @@ public void removeSession(String authenticationToken) {
* Creates a new session object
*/
public Session createSession(String authenticationToken) {
LOGGER.debug("Creating new session");
LOGGER.trace("Creating new session for authenticationToken: {}", authenticationToken);
synchronized (this) {
Session session = new Session(authenticationToken);
this.sessions.put(authenticationToken, session);
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/com/teragrep/cfe_16/TestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@

package com.teragrep.cfe_16;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.Socket;
Expand All @@ -56,7 +59,7 @@
*
*/
public class TestClient implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(TestClient.class);
/**
* How many loops a single thread does.
*/
Expand All @@ -76,6 +79,7 @@ public TestClient(int n, String host, int port) throws IOException {
this.n = n;
this.host = host;
this.port = port;
LOGGER.info("Initialized TestClient, sending <[{}]> messages to <[{}]>:<[{}]>", n, host, port);
}

public void run() {
Expand Down Expand Up @@ -357,7 +361,7 @@ public static void main(String[] args) throws IOException, InterruptedException
if (args.length >= 4) {
String host = args[0];
int port = Integer.valueOf(args[1]);
System.out.println("Connecting to " + host + ":" + port);
LOGGER.info("Connecting to <[{}]>:<[{}]>", host, port);
int numberOfThreads = Integer.valueOf(args[2]);
int numberOfLoops = Integer.valueOf(args[3]);
TestClient[] testClients = createTestClients(host, port, numberOfThreads, numberOfLoops);
Expand All @@ -370,42 +374,50 @@ public static void main(String[] args) throws IOException, InterruptedException
int numberOfRequests = numberOfThreads * numberOfLoops;
double millisecsPerRequest = (double)dt / (double)numberOfRequests;
double throughput = 1000.0 / millisecsPerRequest;
System.out.println("\nDid " + numberOfRequests + " requests in " + dt + " milliseconds, that is " + millisecsPerRequest + " ms/req., which is " + throughput + " transactions/sec.");
LOGGER.info("Did <[{}]> requests in <{}> milliseconds, that is <{}> ms/req., which is <{}> transactions/sec.", numberOfRequests, dt, millisecsPerRequest, throughput);
FileOutputStream fileOutputStream = new FileOutputStream("stats.csv", true);
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
String line = numberOfThreads + "," + millisecsPerRequest + "," + throughput + "\n";
bufferedWriter.write(line);
bufferedWriter.close();
fileOutputStream.close();
} else {
System.err.println("Usage: <host> <port> <n threads> <n loops>");
LOGGER.error("Usage: Usage: <host> <port> <n threads> <n loops>");
}
}

private static void waitThreadsToFinish(int numberOfThreads, Thread[] threads) throws InterruptedException {
for (int i = 0; i < numberOfThreads; i++) {
LOGGER.debug("Waiting for thread <{}> out of <{}> threads to finish", i, numberOfThreads);
threads[i].join();
LOGGER.debug("Thread <[{}]> of <[{}]> threads finished", i, numberOfThreads);
}
}

private static void startThreads(int numberOfThreads, Thread[] threads) {
LOGGER.debug("Starting <[{}]> threads", numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
LOGGER.debug("Starting thread <{}> of <{}>", i, numberOfThreads);
threads[i].start();
}
}

private static Thread[] createThreads(int numberOfThreads, TestClient[] testClients) {
LOGGER.debug("Creating <[{}]> threads", numberOfThreads);
Thread[] threads = new Thread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
LOGGER.debug("Creating thread <{}> of <[{}]>", i, numberOfThreads);
threads[i] = new Thread(testClients[i]);
}
return threads;
}

private static TestClient[] createTestClients(String host, int port, int numberOfThreads, int numberOfLoops)
throws IOException {
LOGGER.debug("Creating <[{}]> test clients", numberOfThreads);
TestClient[] testClients = new TestClient[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++) {
LOGGER.debug("Creating testClient <{}> of <[{}]>", i, numberOfThreads);
testClients[i] = new TestClient(numberOfLoops, host, port);
}
return testClients;
Expand Down
Loading

0 comments on commit 1919b5f

Please sign in to comment.