Skip to content

Commit

Permalink
NIFI-4344: Improve bulletin messages with exception details.
Browse files Browse the repository at this point in the history
This closes apache#5093.

Signed-off-by: Tamas Palfy <tamas.bertalan.palfy@gmail.com>
  • Loading branch information
Lehel44 authored and timeabarna committed Jul 21, 2021
1 parent d49ea17 commit 98c886e
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 25 deletions.
Expand Up @@ -34,8 +34,6 @@

public class StandardLogRepository implements LogRepository {

public static final int DEFAULT_MAX_CAPACITY_PER_LEVEL = 10;

private final Map<LogLevel, Collection<LogObserver>> observers = new HashMap<>();
private final Map<String, LogObserver> observerLookup = new HashMap<>();

Expand Down Expand Up @@ -82,7 +80,7 @@ public void addLogMessage(final LogLevel level, final String format, final Objec
addLogMessage(level, formattedMessage, t);
}

private void replaceThrowablesWithMessage(Object[] params) {
private void replaceThrowablesWithMessage(final Object[] params) {
for (int i = 0; i < params.length; i++) {
if(params[i] instanceof Throwable) {
params[i] = ((Throwable) params[i]).getLocalizedMessage();
Expand Down
Expand Up @@ -24,8 +24,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.LinkedList;
import java.util.stream.Collectors;

public class SimpleProcessLogger implements ComponentLog {

public static final String NEW_LINE_ARROW = "\u21B3";
public static final String CAUSES = NEW_LINE_ARROW + " causes: ";

private final Logger logger;
private final LogRepository logRepository;
private final Object component;
Expand Down Expand Up @@ -58,7 +64,7 @@ public void warn(String msg, final Throwable t) {
}

msg = "{} " + msg;
final Object[] os = {component, t.toString(), t};
final Object[] os = {component, getCauses(t), t};
logger.warn(msg, os);
logRepository.addLogMessage(LogLevel.WARN, msg, os, t);
}
Expand Down Expand Up @@ -110,7 +116,7 @@ public void trace(String msg, Throwable t) {
}

msg = "{} " + msg;
final Object[] os = {component, t.toString(), t};
final Object[] os = {component, getCauses(t), t};
logger.trace(msg, os);
logRepository.addLogMessage(LogLevel.TRACE, msg, os, t);
}
Expand Down Expand Up @@ -184,7 +190,7 @@ public void info(String msg, Throwable t) {
}

msg = "{} " + msg;
final Object[] os = {component, t.toString()};
final Object[] os = {component, getCauses(t)};

logger.info(msg, os);
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -245,12 +251,12 @@ public void error(String msg, Throwable t) {

if (t == null) {
msg = "{} " + msg;
final Object[] os = new Object[] {component};
final Object[] os = new Object[]{component};
logger.error(msg, os);
logRepository.addLogMessage(LogLevel.ERROR, msg, os);
} else {
msg = "{} " + msg + ": {}";
final Object[] os = new Object[] {component, t.toString(), t};
final Object[] os = new Object[]{component, getCauses(t), t};
logger.error(msg, os);
logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
}
Expand Down Expand Up @@ -301,7 +307,7 @@ private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t,
modifiedArgs = new Object[os.length + 3];
modifiedArgs[0] = component.toString();
System.arraycopy(os, 0, modifiedArgs, 1, os.length);
modifiedArgs[modifiedArgs.length - 2] = t.toString();
modifiedArgs[modifiedArgs.length - 2] = getCauses(t);
modifiedArgs[modifiedArgs.length - 1] = t;
}

Expand Down Expand Up @@ -448,4 +454,12 @@ public void log(LogLevel level, String msg, Object[] os, Throwable t) {
}
}

private String getCauses(final Throwable throwable) {
final LinkedList<String> causes = new LinkedList<>();
for (Throwable t = throwable; t != null; t = t.getCause()) {
causes.push(t.toString());
}
return causes.stream().collect(Collectors.joining(System.lineSeparator() + CAUSES));
}

}
Expand Up @@ -16,6 +16,15 @@
*/
package org.apache.nifi.processor;

import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

import java.lang.reflect.Field;

import static org.apache.nifi.processor.SimpleProcessLogger.NEW_LINE_ARROW;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -24,16 +33,13 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
public class TestSimpleProcessLogger {

import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.reporting.ReportingTask;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
private static final String EXPECTED_CAUSES = "java.lang.RuntimeException: third" + System.lineSeparator() +
NEW_LINE_ARROW + " causes: java.lang.RuntimeException: second" + System.lineSeparator() +
NEW_LINE_ARROW + " causes: java.lang.RuntimeException: first";

public class TestSimpleProcessLogger {
private final Exception e = new RuntimeException("intentional");
private final Exception e = new RuntimeException("first", new RuntimeException("second", new RuntimeException("third")));

private ReportingTask task;

Expand Down Expand Up @@ -68,7 +74,7 @@ public void before() {
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnError() {
componentLog.error("Hello {}", e);
verify(logger, times(1)).error(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).error(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}

@Test
Expand All @@ -80,24 +86,24 @@ public void validateDelegateLoggerReceivesThrowableToStringOnInfo() {
@Test
public void validateDelegateLoggerReceivesThrowableToStringOnTrace() {
componentLog.trace("Hello {}", e);
verify(logger, times(1)).trace(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).trace(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}

@Test
public void validateDelegateLoggerReceivesThrowableToStringOnWarn() {
componentLog.warn("Hello {}", e);
verify(logger, times(1)).warn(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).warn(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}

@Test
public void validateDelegateLoggerReceivesThrowableToStringOnLogWithLevel() {
componentLog.log(LogLevel.WARN, "Hello {}", e);
verify(logger, times(1)).warn(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).warn(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
componentLog.log(LogLevel.ERROR, "Hello {}", e);
verify(logger, times(1)).error(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).error(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
componentLog.log(LogLevel.INFO, "Hello {}", e);
verify(logger, times(1)).info(anyString(), eq(e));
componentLog.log(LogLevel.TRACE, "Hello {}", e);
verify(logger, times(1)).trace(anyString(), eq(task), eq(e.toString()), eq(e));
verify(logger, times(1)).trace(anyString(), eq(task), eq(EXPECTED_CAUSES), eq(e));
}
}
Expand Up @@ -251,8 +251,8 @@ public InvocationResult invoke() {
} catch (final Throwable t) {
// Use ComponentLog to log the event so that a bulletin will be created for this processor
final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());
procLog.error("{} failed to process session due to {}; Processor Administratively Yielded for {}",
new Object[] {connectable.getRunnableComponent(), t, schedulingAgent.getAdministrativeYieldDuration()}, t);
procLog.error("Failed to process session due to {}; Processor Administratively Yielded for {}",
new Object[] {t, schedulingAgent.getAdministrativeYieldDuration()}, t);
logger.warn("Administratively Yielding {} due to uncaught Exception: {}", connectable.getRunnableComponent(), t.toString(), t);

connectable.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
Expand Down

0 comments on commit 98c886e

Please sign in to comment.