Skip to content

Commit

Permalink
Fix concurrency bug in AbstractStringScriptFieldAutomatonQuery
Browse files Browse the repository at this point in the history
Back when we introduced queries against runtime fields, Elasticsearch did not support
inter-segment concurrency yet. At the time, it was fine to assume that segments will be
searched sequentially. AbstractStringScriptFieldAutomatonQuery used to have a BytesRefBuilder
instance shared across the segments, which gets re-initialized when each segment starts its work.
This is no longer possible with inter-segment concurrency.

Closes elastic#105911
  • Loading branch information
javanna committed Mar 25, 2024
1 parent a85599f commit 05abfe4
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ abstract class AbstractBooleanScriptFieldQuery extends AbstractScriptFieldQuery<
}

@Override
protected boolean matches(BooleanFieldScript scriptContext, int docId) {
protected final boolean matches(BooleanFieldScript scriptContext, int docId) {
scriptContext.runForDoc(docId);
return matches(scriptContext.trues(), scriptContext.falses());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ abstract class AbstractDoubleScriptFieldQuery extends AbstractScriptFieldQuery<D
}

@Override
protected boolean matches(DoubleFieldScript scriptContext, int docId) {
protected final boolean matches(DoubleFieldScript scriptContext, int docId) {
scriptContext.runForDoc(docId);
return matches(scriptContext.values(), scriptContext.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ abstract class AbstractGeoPointScriptFieldQuery extends AbstractScriptFieldQuery
}

@Override
protected boolean matches(GeoPointFieldScript scriptContext, int docId) {
protected final boolean matches(GeoPointFieldScript scriptContext, int docId) {
scriptContext.runForDoc(docId);
return matches(scriptContext.lats(), scriptContext.lons(), scriptContext.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ abstract class AbstractIpScriptFieldQuery extends AbstractScriptFieldQuery<IpFie
}

@Override
protected boolean matches(IpFieldScript scriptContext, int docId) {
protected final boolean matches(IpFieldScript scriptContext, int docId) {
scriptContext.runForDoc(docId);
return matches(scriptContext.values(), scriptContext.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ abstract class AbstractLongScriptFieldQuery extends AbstractScriptFieldQuery<Abs
}

@Override
protected boolean matches(AbstractLongFieldScript scriptContext, int docId) {
protected final boolean matches(AbstractLongFieldScript scriptContext, int docId) {
scriptContext.runForDoc(docId);
return AbstractLongScriptFieldQuery.this.matches(scriptContext.values(), scriptContext.count());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.List;

public abstract class AbstractStringScriptFieldAutomatonQuery extends AbstractStringScriptFieldQuery {
private final BytesRefBuilder scratch = new BytesRefBuilder();
private final ThreadLocal<BytesRefBuilder> bytesRefBuilderThreadLocal = ThreadLocal.withInitial(BytesRefBuilder::new);
private final ByteRunAutomaton automaton;

public AbstractStringScriptFieldAutomatonQuery(
Expand All @@ -33,11 +33,15 @@ public AbstractStringScriptFieldAutomatonQuery(
@Override
protected final boolean matches(List<String> values) {
for (String value : values) {
BytesRefBuilder scratch = bytesRefBuilderThreadLocal.get();
scratch.copyChars(value);
System.out.println("copyChars - " + value + " - " + Thread.currentThread().getName());
if (automaton.run(scratch.bytes(), 0, scratch.length())) {
System.out.println("match " + Thread.currentThread().getName());
return true;
}
}
System.out.println("no match " + Thread.currentThread().getName());
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@
package org.elasticsearch.search.runtime;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.automaton.ByteRunAutomaton;
import org.elasticsearch.script.Script;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -82,6 +88,29 @@ public void testMatches() {
assertFalse(query.matches(List.of("faaa")));
}

public void testConcurrentMatches() {
StringScriptFieldFuzzyQuery query = StringScriptFieldFuzzyQuery.build(randomScript(), leafFactory, "test", "foo", 1, 0, false);
List<Future<?>> futures = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
futures.add(executorService.submit(() -> assertTrue(query.matches(List.of("foo")))));
futures.add(executorService.submit(() -> assertTrue(query.matches(List.of("foa")))));
futures.add(executorService.submit(() -> assertTrue(query.matches(List.of("foo", "bar")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("bar")))));
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
fail(e);
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
} finally {
terminate(executorService);
}
}

@Override
protected void assertToString(StringScriptFieldFuzzyQuery query) {
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@
package org.elasticsearch.search.runtime;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.automaton.ByteRunAutomaton;
import org.apache.lucene.util.automaton.Operations;
import org.apache.lucene.util.automaton.RegExp;
import org.elasticsearch.script.Script;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -106,6 +112,40 @@ public void testMatches() {

}

public void testConcurrentMatches() {
StringScriptFieldRegexpQuery query = new StringScriptFieldRegexpQuery(
randomScript(),
leafFactory,
"test",
"a.+b",
0,
0,
Operations.DEFAULT_DETERMINIZE_WORK_LIMIT
);
List<Future<?>> futures = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
futures.add(executorService.submit(() -> assertTrue(query.matches(List.of("astuffb")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("astuffB")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("fffff")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("ab")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("aasdf")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("dsfb")))));
futures.add(executorService.submit(() -> assertTrue(query.matches(List.of("astuffb", "fffff")))));
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
fail(e);
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
} finally {
terminate(executorService);
}
}

@Override
protected void assertToString(StringScriptFieldRegexpQuery query) {
assertThat(query.toString(query.fieldName()), equalTo("/" + query.pattern() + "/"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@
package org.elasticsearch.search.runtime;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.ThreadInterruptedException;
import org.apache.lucene.util.automaton.ByteRunAutomaton;
import org.elasticsearch.script.Script;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static org.hamcrest.Matchers.equalTo;

Expand Down Expand Up @@ -67,6 +73,30 @@ public void testMatches() {

}

public void testConcurrentMatches() {
StringScriptFieldWildcardQuery query = new StringScriptFieldWildcardQuery(randomScript(), leafFactory, "test", "a*b", false);
List<Future<?>> futures = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(3);
try {
futures.add(executorService.submit(() -> assertTrue(query.matches(List.of("astuffb")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("Astuffb")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("fffff")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("a")))));
futures.add(executorService.submit(() -> assertFalse(query.matches(List.of("b")))));
for (Future<?> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
fail(e);
} catch (InterruptedException e) {
throw new ThreadInterruptedException(e);
}
}
} finally {
terminate(executorService);
}
}

@Override
protected void assertToString(StringScriptFieldWildcardQuery query) {
assertThat(query.toString(query.fieldName()), equalTo(query.pattern()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ abstract class AbstractGeoShapeScriptFieldQuery extends AbstractScriptFieldQuery
}

@Override
protected boolean matches(GeometryFieldScript scriptContext, int docId) {
protected final boolean matches(GeometryFieldScript scriptContext, int docId) {
scriptContext.runForDoc(docId);
return matches(scriptContext.geometry());
}
Expand Down

0 comments on commit 05abfe4

Please sign in to comment.