Skip to content

Commit

Permalink
Fix procedure apoc.cypher.mapParallel2 "Unable to complete transactio…
Browse files Browse the repository at this point in the history
…n" errors

Fixes #2034
  • Loading branch information
vga91 committed Aug 4, 2021
1 parent c2c344a commit ba67261
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
11 changes: 6 additions & 5 deletions full/src/main/java/apoc/cypher/CypherExtended.java
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,12 @@ public Stream<MapResult> mapParallel2(@Name("fragment") String fragment, @Name("
Util.inFuture(pools, () -> {
long total = parallelPartitions
.map((List<Object> partition) -> {
try {
try (Result result = tx.execute(statement, parallelParams(params, "_", partition))) {
return consumeResult(result, queue, false, timeout);
}
} catch (Exception e) {throw new RuntimeException(e);}}
try (Transaction transaction = db.beginTx();
Result result = transaction.execute(statement, parallelParams(params, "_", partition))) {
return consumeResult(result, queue, false, timeout);
} catch (Exception e) {
throw new RuntimeException(e);
}}
).count();
queue.put(RowResult.TOMBSTONE);
return total;
Expand Down
13 changes: 13 additions & 0 deletions full/src/test/java/apoc/cypher/CypherExtendedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ public void testRunFileWithResults() throws Exception {
assertEquals(false, r.hasNext());
});
}

@Test
public void shouldNotFailWithTransactionErrorWithMapParallel2() {
// Sometimes it was green even without `db.beginTx()` modification
db.executeTransactionally("UNWIND range(1, 100) as i create (p:Page {title: i})-[:Link]->(p1:Page1)<-[:Link]-(p2:Page2 {title: 'myTitle'})");
testResult(db, "MATCH (p:Page) WITH collect(p) as pages\n" +
"CALL apoc.cypher.mapParallel2(\"MATCH (_)-[:Link]->(p1)<-[:Link]-(p2)\n" +
"RETURN p2.title as title\", {}, pages, 1) yield value\n" +
"RETURN value.title limit 5",
r -> assertEquals(5, Iterators.count(r)));
}


@Test
public void testRunFileWithParameters() throws Exception {
testResult(db, "CALL apoc.cypher.runFile('parameterized.cypher', {statistics:false,parameters:{foo:123,bar:'baz'}})",
Expand Down

0 comments on commit ba67261

Please sign in to comment.