Skip to content

Commit

Permalink
DEV-32671 Grok processing should fail faster on timeout interruption (#…
Browse files Browse the repository at this point in the history
…304)

1. Add interruption checks before each grok match, to fail processing faster in case of interruption.
2. Bump org.jruby.joni library version to gain some internal changes that will allow faster interruption of grok matching
  • Loading branch information
DanMelman committed Jul 7, 2022
1 parent ac5d4c9 commit 1f2369f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
<assertj.version>3.5.2</assertj.version>
<logback.version>1.1.7</logback.version>
<commonsIO.version>2.5</commonsIO.version>
<jrubyJoni.version>2.1.16</jrubyJoni.version>
<jrubyJoni.version>2.1.43</jrubyJoni.version>
<wiremock.version>2.32.0</wiremock.version>
<awaitility.version>4.2.0</awaitility.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ private Map<String,Object> parse(String message) throws InterruptedException {
byte[] messageAsBytes = message.getBytes();
Matcher matcher = pattern.matcher(messageAsBytes);

int result = matcher.search(0, messageAsBytes.length, Option.MULTILINE);
int result = matcher.searchInterruptible(0, messageAsBytes.length, Option.MULTILINE);

while (result != -1 && matchesCounter < MAX_MATCHES) {
Region region = matcher.getEagerRegion();
Expand All @@ -199,7 +199,7 @@ private Map<String,Object> parse(String message) throws InterruptedException {
}
}
int endOfFullMatch = region.end[0];
result = matcher.search(endOfFullMatch, messageAsBytes.length, Option.MULTILINE);
result = matcher.searchInterruptible(endOfFullMatch, messageAsBytes.length, Option.MULTILINE);

matchesCounter++;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,14 @@ private void addPattern(String patternName, String definition) {
}

public List<Match> matches(String text) throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
}

List<Match> matches = new ArrayList<>();
byte[] textAsBytes = text.getBytes(StandardCharsets.UTF_8);
Matcher matcher = compiledExpression.matcher(textAsBytes);
int result = matcher.search(0, textAsBytes.length, Option.MULTILINE);
int result = matcher.searchInterruptible(0, textAsBytes.length, Option.MULTILINE);
boolean matchNotFound = result == -1;
if (matchNotFound) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
package io.logz.sawmill.processors;

import com.google.common.collect.ImmutableMap;
import io.logz.sawmill.Doc;
import io.logz.sawmill.ProcessResult;
import io.logz.sawmill.exceptions.ProcessorConfigurationException;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static io.logz.sawmill.utils.DocUtils.createDoc;
import static io.logz.sawmill.utils.FactoryUtils.createProcessor;
import static io.logz.sawmill.utils.FactoryUtils.createProcessorFactory;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -308,4 +315,27 @@ public void testInvalidExpression() throws InterruptedException {
public void testBadConfigs() {
assertThatThrownBy(() -> createProcessor(GrokProcessor.class, "patterns", Arrays.asList("pattern"))).isInstanceOf(NullPointerException.class);
}

@Test
public void testInterruptStopsProcessor() {
String field = "message";
List<String> patterns = IntStream.range(1, 10000).mapToObj(i -> "%{COMBINEDAPACHELOG}").collect(Collectors.toList());
Doc doc = createDoc(field, RandomStringUtils.randomAlphanumeric(100000));

Map<String,Object> config = ImmutableMap.of(
"field", field,
"patterns", patterns
);
GrokProcessor grokProcessor = factory.create(config);

interruptCurrentThreadIn(100);
assertThatThrownBy(() -> grokProcessor.process(doc))
.isInstanceOf(InterruptedException.class);
}

private void interruptCurrentThreadIn(long millis) {
Thread currentThread = Thread.currentThread();
ScheduledExecutorService interrupter = Executors.newScheduledThreadPool(1);
interrupter.schedule(currentThread::interrupt, millis, MILLISECONDS);
}
}

0 comments on commit 1f2369f

Please sign in to comment.