Skip to content

Commit

Permalink
[NOID] Changes review
Browse files Browse the repository at this point in the history
  • Loading branch information
vga91 committed Jun 21, 2023
1 parent 644daf3 commit cc1e933
Show file tree
Hide file tree
Showing 20 changed files with 112 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import apoc.export.util.ExportConfig;
import apoc.export.util.ExportFormat;
import apoc.export.util.ProgressReporter;
import apoc.export.util.Reporter;
import apoc.util.Util;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -254,8 +253,7 @@ public void buildStatementForNodes(String nodeClause, String setClause,
});
addCommitToEnd(exportConfig, out, batchCount);

// because of unwind batched result, we update the statusDetail for each line
((ProgressReporter) reporter).update(nodeCount.get(), 0, propertiesCount.longValue(), true);
reporter.update(nodeCount.get(), 0, propertiesCount.longValue());
}

private void closeUnwindNodes(String nodeClause, String setClause, Map<String, Set<String>> uniqueConstraints, ExportConfig exportConfig, PrintWriter out, Map.Entry<Set<String>, Set<String>> key, Node last) {
Expand Down Expand Up @@ -392,8 +390,7 @@ public void buildStatementForRelationships(String relationshipClause,
});
addCommitToEnd(exportConfig, out, batchCount);

// because of unwind batched result, we update the statusDetail for each line
((ProgressReporter) reporter).update(0, relCount.get(), propertiesCount.longValue(), true);
reporter.update(0, relCount.get(), propertiesCount.longValue());
}

private void closeUnwindRelationships(String relationshipClause, String setClause, Map<String, Set<String>> uniqueConstraints, ExportConfig exportConfig, PrintWriter out, String start, String end, Map<String, Object> path, Relationship last, boolean withMultipleRels) {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/apoc/export/util/FormatUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@
*/
public class FormatUtils {

public static <T> String asListed(Map<String, T> map) {
public static <T> String asHyphenSeparatedList(Map<String, T> map) {
return map.entrySet()
.stream()
.map(e -> "- " + e.getKey() + ": " + e.getValue())
.collect(Collectors.joining("\n"));
.collect(Collectors.joining(",\n"));
}

public static String formatNumber(Number value) {
Expand Down
23 changes: 10 additions & 13 deletions core/src/main/java/apoc/export/util/ProgressReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import apoc.result.ProgressInfo;
import apoc.util.JsonUtil;
import apoc.util.Util;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.graphdb.Transaction;

Expand All @@ -28,8 +29,6 @@
import java.util.function.Consumer;
import java.util.stream.Stream;

import static apoc.util.Util.setKernelStatusMap;

/**
* @author mh
* @since 22.05.16
Expand Down Expand Up @@ -78,15 +77,11 @@ private long percent() {
}

public void update(long nodes, long relationships, long properties) {
update(nodes, relationships, properties, false);
}

public void update(long nodes, long relationships, long properties, boolean updateCurrent) {
time = System.currentTimeMillis();
progressInfo.update(nodes, relationships, properties);
totalEntities += nodes + relationships;
acceptBatch();
updateStatus(updateCurrent);
updateStatus();
}

public void acceptBatch() {
Expand Down Expand Up @@ -139,15 +134,17 @@ public Stream<ProgressInfo> stream() {
public void nextRow() {
this.progressInfo.nextRow();
this.totalEntities++;
updateStatus(false);
updateStatusPeriodically();
acceptBatch();
}

public void updateStatus(boolean updateCurrent) {
private void updateStatusPeriodically() {
final Map<String, Object> statusMap = JsonUtil.convertToMap(this.progressInfo);
if (updateCurrent) {
setKernelStatusMap(tx, true, statusMap);
}
setKernelStatusMap(tx, progressInfo.nodes + progressInfo.relationships + progressInfo.properties, statusMap);
Util.setKernelStatusPeriodically(tx, progressInfo.nodes + progressInfo.relationships + progressInfo.properties, statusMap);
}

private void updateStatus() {
final Map<String, Object> statusMap = JsonUtil.convertToMap(this.progressInfo);
Util.setKernelStatus(tx, true, statusMap);
}
}
7 changes: 3 additions & 4 deletions core/src/main/java/apoc/load/LoadJson.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.stream.Stream;

import static apoc.util.CompressionConfig.COMPRESSION;
import static apoc.util.Util.setKernelStatusMap;

public class LoadJson {

Expand All @@ -60,7 +59,7 @@ public Stream<ObjectResult> jsonArray(@Name("url") String url, @Name(value = "pa
.flatMap((value) -> {
if (value instanceof List) {
final long counter = rows.incrementAndGet();
setKernelStatusMap(tx, counter, Map.of("rows", counter));
Util.setKernelStatusPeriodically(tx, counter, Map.of("records", counter));
List list = (List) value;
if (list.isEmpty()) return Stream.empty();
if (list.get(0) instanceof Map) return list.stream().map(ObjectResult::new);
Expand Down Expand Up @@ -100,12 +99,12 @@ public static Stream<MapResult> loadJsonStream(@Name("urlOrKeyOrBinary") Object
return stream.flatMap((value) -> {
if (value instanceof Map) {
final long counter = rows.incrementAndGet();
setKernelStatusMap(tx, counter, Map.of("rows", counter));
Util.setKernelStatusPeriodically(tx, counter, Map.of("records", counter));
return Stream.of(new MapResult((Map) value));
}
if (value instanceof List) {
final long counter = rows.incrementAndGet();
setKernelStatusMap(tx, counter, Map.of("rows", counter));
Util.setKernelStatusPeriodically(tx, counter, Map.of("records", counter));
if (((List)value).isEmpty()) return Stream.empty();
if (((List) value).get(0) instanceof Map)
return ((List) value).stream().map((v) -> new MapResult((Map) v));
Expand Down
9 changes: 4 additions & 5 deletions core/src/main/java/apoc/load/Xml.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import static apoc.util.FileUtils.getInputStreamFromBinary;
import static apoc.util.Util.ERROR_BYTES_OR_STRING;
import static apoc.util.Util.map;
import static apoc.util.Util.setKernelStatusMap;

public class Xml {

Expand Down Expand Up @@ -266,9 +265,9 @@ private void handleNode(Deque<Map<String, Object>> stack, Node node, boolean sim

if (!elementMap.isEmpty()) {
final int counter = stack.size();
final Map<String, Object> statusMap = map("curr. element", counter);
final Map<String, Object> statusMap = map("record", counter);
statusMap.putAll(elementMap);
setKernelStatusMap(tx, counter, statusMap);
Util.setKernelStatusPeriodically(tx, counter, statusMap);
stack.addLast(elementMap);
}
}
Expand Down Expand Up @@ -528,14 +527,14 @@ public void updateLast(org.neo4j.graphdb.Node thisNode) {
}
statusDetail.merge("nodes", 1L, Long::sum);
statusDetail.merge("relationships", 1L, Long::sum);
setKernelStatusMap(tx, ++counter, statusDetail);
Util.setKernelStatusPeriodically(tx, ++counter, statusDetail);
parentAndChildPair.setPreviousChild(thisNode);
last = thisNode;
}

public void updateNumTags() {
statusDetail.merge("elements", 1L, Long::sum);
setKernelStatusMap(tx, counter, statusDetail);
Util.setKernelStatusPeriodically(tx, counter, statusDetail);
}

public void addCurrentCharacterIndex(int length) {
Expand Down
9 changes: 3 additions & 6 deletions core/src/main/java/apoc/periodic/Periodic.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,9 @@
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static apoc.periodic.PeriodicUtils.*;
import static org.neo4j.graphdb.QueryExecutionType.QueryType;
import static apoc.periodic.PeriodicUtils.submitJob;
import static apoc.periodic.PeriodicUtils.submitProc;
import static apoc.periodic.PeriodicUtils.wrapTask;
import static apoc.util.Util.merge;
import static apoc.util.Util.setKernelStatusMap;

public class Periodic {

Expand Down Expand Up @@ -122,8 +119,8 @@ public Stream<RundownResult> commit(@Name("statement") String statement, @Name(v
return 0L;
}
}), commitErrors, failedCommits, 0L);
setKernelStatusMap(tx, true,
Map.of("successes", batches.get() - failedBatches.get(), "errors", failedBatches.get()));
Util.setKernelStatus(tx, true,
Map.of(COMMITTED, batches.get() - failedBatches.get(), FAILED, failedBatches.get()));
total += updates;
if (updates > 0) executions++;
if (log.isDebugEnabled()) {
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/java/apoc/periodic/PeriodicUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import static apoc.periodic.Periodic.JobInfo;

public class PeriodicUtils {
public static final String COMMITTED = "committed";
public static final String FAILED = "failed";

private PeriodicUtils() {

Expand Down Expand Up @@ -110,9 +112,9 @@ public static Stream<BatchAndTotalResult> iterateAndExecuteBatchedInSeparateThre
retryCount -> collector.incrementRetried(),
onComplete -> {
// in this case we update statusDetails for each batch, instead of counting rows/lines
Util.setKernelStatusMap(tx, true,
Map.of("successes", collector.getBatches() - collector.getFailedBatches().get(),
"errors", collector.getFailedBatches().get()));
Util.setKernelStatus(tx, true,
Map.of(COMMITTED, collector.getBatches() - collector.getFailedBatches().get(),
FAILED, collector.getFailedBatches().get()));
collector.incrementBatches();
executeBatch.release();
activeFutures.decrementAndGet();
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/apoc/util/Util.java
Original file line number Diff line number Diff line change
Expand Up @@ -1190,17 +1190,17 @@ public static Object getStringOrCompressedData(StringWriter writer, ExportConfig
}
}

public static <T> void setKernelStatusMap(Transaction tx, boolean updateResult, Map<String, T> map) {
public static <T> void setKernelStatus(Transaction tx, boolean updateResult, Map<String, T> status) {
// we don't write anything if transaction is not an InternalTransaction
if (updateResult && tx instanceof InternalTransaction) {
final KernelTransaction ktx = ((InternalTransaction) tx).kernelTransaction();
ktx.setStatusDetails(FormatUtils.asListed(map));
ktx.setStatusDetails(FormatUtils.asHyphenSeparatedList(status));
}
}

public static <T> void setKernelStatusMap(Transaction tx, long counter, Map<String, T> map) {
public static <T> void setKernelStatusPeriodically(Transaction tx, long counter, Map<String, T> map) {
boolean updateResult = counter != 0 && counter % 1000 == 0;
setKernelStatusMap(tx, updateResult, map);
setKernelStatus(tx, updateResult, map);
}

public static String toCypherMap(Map<String, Object> map) {
Expand Down
8 changes: 0 additions & 8 deletions core/src/test/java/apoc/export/cypher/ExportCypherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static apoc.util.BinaryTestUtil.getDecompressedData;
import static apoc.util.TestUtil.assertError;
import static apoc.util.Util.INVALID_QUERY_MODE_ERROR;
import static apoc.kernel.KernelTestUtils.checkStatusDetails;
import static apoc.util.Util.map;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.*;
Expand Down Expand Up @@ -315,13 +314,6 @@ public void testExportGraphCypher() throws Exception {
assertEquals(EXPECTED_NEO4J_SHELL, readFile(fileName));
}

@Test
public void testExportWithStatusDetails() {
db.executeTransactionally("UNWIND range(0,22222) AS x CREATE (:Status)-[:REL]->(:StatusBis)");
checkStatusDetails(db, "CALL apoc.export.cypher.all('status.cypher', {separateFiles:true})", Map.of());
db.executeTransactionally("MATCH (n) DETACH DELETE n");
}

// -- Separate files tests -- //
@Test
public void testExportAllCypherNodes() throws Exception {
Expand Down
8 changes: 0 additions & 8 deletions core/src/test/java/apoc/load/XmlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import apoc.ApocSettings;
import apoc.util.CompressionAlgo;
import apoc.util.MapUtil;
import apoc.util.TestUtil;
import apoc.xml.XmlTestUtils;
import inet.ipaddr.IPAddressString;
Expand Down Expand Up @@ -48,7 +47,6 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static apoc.kernel.KernelTestUtils.checkStatusDetails;
import static apoc.util.BinaryTestUtil.fileToBinary;
import static apoc.util.CompressionConfig.COMPRESSION;
import static apoc.util.TestUtil.*;
Expand Down Expand Up @@ -108,12 +106,6 @@ public void testMixedContent() {
testCall(db, "CALL apoc.load.xml('" + TestUtil.getUrlFileName("xml/mixedcontent.xml") + "')", // YIELD value RETURN value
this::commonAssertionsMixedContent);
}

@Test
public void testXmlStatusDetails() {
final String file = ClassLoader.getSystemResource("largeFile.xml").toString();
checkStatusDetails(db, "CALL apoc.import.xml($file)", MapUtil.map("file", file));
}

@Test
public void testMixedContentWithBinary() {
Expand Down
8 changes: 6 additions & 2 deletions core/src/test/java/apoc/periodic/PeriodicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import java.util.stream.Stream;

import static apoc.periodic.Periodic.applyPlanner;
import static apoc.periodic.PeriodicUtils.COMMITTED;
import static apoc.periodic.PeriodicUtils.FAILED;
import static apoc.util.TestUtil.testCall;
import static apoc.util.TestUtil.testResult;
import static apoc.util.Util.map;
Expand Down Expand Up @@ -399,16 +401,18 @@ public void testStatusDetailsPeriodicIterate() {

// test periodic iterate
KernelTestUtils.checkStatusDetails(db,
"CALL apoc.periodic.iterate('match (p:StatusIterate) return p', 'SET p.lastname =p.name REMOVE p.name', {batchSize:10,parallel:true})",
"CALL apoc.periodic.iterate('match (p:StatusIterate) return p', 'SET p.lastname =p.name REMOVE p.name', {batchSize:10,parallel:true})",
(row) -> row.contains(COMMITTED) && row.contains(FAILED),
Collections.emptyMap(),
"cypher runtime=slotted match (p:StatusIterate)");

// test periodic commit
String query = "MATCH (p:StatusIterate) WHERE NOT p:Processed WITH p LIMIT 200 SET p:Processed RETURN count(*)";
KernelTestUtils.checkStatusDetails(db,
"CALL apoc.periodic.commit($query, {})",
(row) -> row.contains(COMMITTED) && row.contains(FAILED),
map("query", query));

db.executeTransactionally("MATCH (s:StatusIterate) DELETE s");
}

Expand Down
3 changes: 1 addition & 2 deletions full/src/main/java/apoc/cypher/CypherExtended.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import static apoc.util.MapUtil.map;
import static apoc.util.Util.param;
import static apoc.util.Util.quote;
import static apoc.util.Util.setKernelStatusMap;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -234,7 +233,7 @@ private Object consumeResult(Result result, BlockingQueue<RowResult> queue, bool
Map.Entry::getValue,
(v1, v2) -> (long) v1 + (long) v2));
// in this case we update statusDetails for each query result instead of count rows/lines
setKernelStatusMap(tx, true, update);
Util.setKernelStatus(tx, true, update);
queue.put(new RowResult(-1, resultMap));
}
return row;
Expand Down
4 changes: 1 addition & 3 deletions full/src/main/java/apoc/load/HtmlResultInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
package apoc.load;

import org.jsoup.nodes.Document;
import org.neo4j.graphdb.Transaction;
import org.neo4j.logging.Log;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public interface HtmlResultInterface {

Expand All @@ -41,5 +39,5 @@ public HtmlResultInterface get() {
}
}

Object getResult(Document document, String selector, LoadHtmlConfig config, List<String> errorList, Log log, AtomicInteger rows, Transaction tx);
Object getResult(Document document, String selector, LoadHtmlConfig config, List<String> errorList, Log log);
}
Loading

0 comments on commit cc1e933

Please sign in to comment.