Skip to content

Commit

Permalink
Fixes #3751: mapParallel2 with rebind error (#3810)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrea Santurbano <santand@gmail.com>
  • Loading branch information
vga91 and conker84 committed Oct 26, 2023
1 parent 002bc5b commit 20c4634
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 9 deletions.
12 changes: 6 additions & 6 deletions extended/src/main/java/apoc/cypher/CypherExtended.java
Expand Up @@ -174,7 +174,7 @@ private void runDataStatementsInTx(Scanner scanner, BlockingQueue<RowResult> que
if (isPeriodicOperation(stmt)) {
Util.inThread(pools , () -> {
try {
return db.executeTransactionally(stmt, params, result -> consumeResult(result, queue, addStatistics, timeout));
return db.executeTransactionally(stmt, params, result -> consumeResult(result, queue, addStatistics, tx));
} catch (Exception e) {
collectError(queue, reportError, e, fileName);
return null;
Expand All @@ -184,7 +184,7 @@ private void runDataStatementsInTx(Scanner scanner, BlockingQueue<RowResult> que
else {
Util.inTx(db, pools, threadTx -> {
try (Result result = threadTx.execute(stmt, params)) {
return consumeResult(result, queue, addStatistics, timeout);
return consumeResult(result, queue, addStatistics, tx);
} catch (Exception e) {
collectError(queue, reportError, e, fileName);
return null;
Expand Down Expand Up @@ -227,7 +227,7 @@ private void runSchemaStatementsInTx(Scanner scanner, BlockingQueue<RowResult> q
if (schemaOperation) {
Util.inTx(db, pools, txInThread -> {
try (Result result = txInThread.execute(stmt, params)) {
return consumeResult(result, queue, addStatistics, timeout);
return consumeResult(result, queue, addStatistics, tx);
} catch (Exception e) {
collectError(queue, reportError, e, fileName);
return null;
Expand All @@ -239,13 +239,13 @@ private void runSchemaStatementsInTx(Scanner scanner, BlockingQueue<RowResult> q

private final static Pattern shellControl = Pattern.compile("^:?\\b(begin|commit|rollback)\\b", Pattern.CASE_INSENSITIVE);

private Object consumeResult(Result result, BlockingQueue<RowResult> queue, boolean addStatistics, long timeout) {
private Object consumeResult(Result result, BlockingQueue<RowResult> queue, boolean addStatistics, Transaction transaction) {
try {
long time = System.currentTimeMillis();
int row = 0;
while (result.hasNext()) {
terminationGuard.check();
Map<String, Object> res = EntityUtil.anyRebind(tx, result.next());
Map<String, Object> res = EntityUtil.anyRebind(transaction, result.next());
queue.put(new RowResult(row++, res));
}
if (addStatistics) {
Expand Down Expand Up @@ -375,7 +375,7 @@ public Stream<MapResult> mapParallel2(@Name("fragment") String fragment, @Name("
.map((List<Object> partition) -> {
try (Transaction transaction = db.beginTx();
Result result = transaction.execute(statement, parallelParams(params, "_", partition))) {
return consumeResult(result, queue, false, timeout);
return consumeResult(result, queue, false, transaction);
} catch (Exception e) {
throw new RuntimeException(e);
}}
Expand Down
78 changes: 75 additions & 3 deletions extended/src/test/java/apoc/cypher/CypherExtendedTest.java
Expand Up @@ -5,6 +5,7 @@
import apoc.util.Util;
import apoc.util.Utils;
import apoc.util.collection.Iterators;
import org.apache.commons.io.FileUtils;
import org.junit.*;
import org.junit.rules.ExpectedException;
import org.neo4j.configuration.GraphDatabaseSettings;
Expand All @@ -19,6 +20,8 @@
import org.neo4j.test.rule.ImpermanentDbmsRule;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -46,11 +49,11 @@
* @since 08.05.16
*/
public class CypherExtendedTest {

public static final String IMPORT_DIR = "src/test/resources";
@ClassRule
public static DbmsRule db = new ImpermanentDbmsRule()
.withSetting(GraphDatabaseSettings.allow_file_urls, true)
.withSetting(GraphDatabaseSettings.load_csv_file_url_root, new File("src/test/resources").toPath().toAbsolutePath());
.withSetting(GraphDatabaseSettings.load_csv_file_url_root, new File(IMPORT_DIR).toPath().toAbsolutePath());

@Rule
public ExpectedException thrown= ExpectedException.none();
Expand Down Expand Up @@ -226,7 +229,76 @@ public void shouldNotFailWithTransactionErrorWithMapParallel2() {
"RETURN value.title limit 5",
r -> assertEquals(5, Iterators.count(r)));
}


@Test
public void testIssue3751MapParallel2() {
int expected = 500;
db.executeTransactionally("UNWIND range(1, $int) as id CREATE (:Polling {id: id})",
Map.of("int", expected));

// the error is flaky, so we need to run the query several times to replicate it
for (int i = 0; i < 30; i++) {
testCallCount(db, """
MATCH (n:Polling) WITH collect({childD: n}) as params \s
CALL apoc.cypher.mapParallel2(" WITH _.childD as childD RETURN childD", {}, params, 6, 10)\s
YIELD value RETURN value""",
Map.of(),
expected);
}
}

@Test
public void testIssue3751RunFiles() {
int numEntities = 500;
db.executeTransactionally("UNWIND range(1, $int) as id CREATE (:Polling {id: id})",
Map.of("int", numEntities));

for (int i = 0; i < 30; i++) {
int numFiles = 30;
List<String> files = Collections.nCopies(numFiles, "parallel.cypher");

int expected = numEntities * numFiles;
testCallCount(db, "CALL apoc.cypher.runFiles($files, {statistics: false})",
Map.of("files", files),
expected);
}
}

@Test
public void testIssue3751RunSchemaFiles() throws IOException {
for (int i = 0; i < 15; i++) {
int numFiles = 10;
List<File> files = new ArrayList<>();

for (int fileIdx = 0; fileIdx < numFiles; fileIdx++) {
String id = i + "" + fileIdx;
File file = new File(IMPORT_DIR, "schema" + id);

String content = """
CREATE INDEX index%1$s FOR (n:Person%1$s) ON (n.name%1$s);
CREATE INDEX secondIndex%1$s FOR (n:Foo%1$s) ON (n.bar%1$s);
CREATE INDEX thirdIndex%1$s FOR (n:Ajeje%1$s) ON (n.brazorf%1$s);
"""
.formatted(id);

FileUtils.writeStringToFile(file,
content,
StandardCharsets.UTF_8);

files.add(file);
}

// 4 is the number of indexes
int expected = numFiles * 3;
List<String> fileNames = files.stream().map(File::getName).toList();
testCallCount(db, "CALL apoc.cypher.runSchemaFiles($files, {statistics: true})",
Map.of("files", fileNames),
expected);

files.forEach(File::delete);
}

}

@Test
public void testRunFileWithParameters() throws Exception {
Expand Down
1 change: 1 addition & 0 deletions extended/src/test/resources/parallel.cypher
@@ -0,0 +1 @@
MATCH (n:Polling) SET n.test = date() RETURN n;

0 comments on commit 20c4634

Please sign in to comment.