Skip to content

Commit

Permalink
Introduce Data Virtualization procedures (#2233)
Browse files Browse the repository at this point in the history
The APOC library now supports the definition of a catalog of virtual resources.

A virtual resource is an external data source that neo4j can use to query and retrieve data on demand presenting it as virtual nodes enriching the data stored in the graph.

Virtual resources are currently limited to CSV and JDBC sources.

Co-authored-by: Andrea Santurbano <santand@gmail.com>
  • Loading branch information
github-actions[bot] and conker84 committed Dec 7, 2021
1 parent 8f8f9bd commit c7f71d4
Show file tree
Hide file tree
Showing 43 changed files with 1,473 additions and 19 deletions.
4 changes: 3 additions & 1 deletion core/src/main/java/apoc/SystemLabels.java
Expand Up @@ -8,5 +8,7 @@ public enum SystemLabels implements Label {
Procedure,
Function,
ApocUuid,
ApocTrigger
ApocTriggerMeta,
ApocTrigger,
DataVirtualizationCatalog
}
27 changes: 26 additions & 1 deletion core/src/main/java/apoc/result/VirtualPath.java
Expand Up @@ -20,11 +20,17 @@
public class VirtualPath implements Path {

private final Node start;
private final List<Relationship> relationships = new ArrayList<>();
private final List<Relationship> relationships;

public VirtualPath(Node start) {
this(start, new ArrayList<>());
}

private VirtualPath(Node start, List<Relationship> relationships) {
Objects.requireNonNull(start);
Objects.requireNonNull(relationships);
this.start = start;
this.relationships = relationships;
}

public void addRel(Relationship relationship) {
Expand Down Expand Up @@ -118,4 +124,23 @@ public void remove() {
public String toString() {
return Paths.defaultPathToString(this);
}

public static final class Builder {
private final Node start;
private final List<Relationship> relationships = new ArrayList<>();

public Builder(Node start) {
this.start = start;
}

public Builder push(Relationship relationship) {
this.relationships.add(relationship);
return this;
}

public VirtualPath build() {
return new VirtualPath(start, relationships);
}

}
}
35 changes: 18 additions & 17 deletions core/src/main/java/apoc/util/Util.java
Expand Up @@ -6,7 +6,7 @@
import apoc.result.VirtualNode;
import apoc.result.VirtualRelationship;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.collections.api.iterator.LongIterator;
import org.neo4j.graphdb.Direction;
import org.neo4j.graphdb.Entity;
Expand All @@ -15,6 +15,7 @@
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.NotInTransactionException;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.RelationshipType;
import org.neo4j.graphdb.ResourceIterator;
Expand Down Expand Up @@ -88,6 +89,7 @@
import static apoc.ApocConfig.apocConfig;
import static apoc.util.DateFormatUtil.getOrCreate;
import static java.lang.String.format;
import static org.eclipse.jetty.util.URIUtil.encodePath;

/**
* @author mh
Expand Down Expand Up @@ -659,15 +661,8 @@ public static void logErrors(String message, Map<String, Long> errors, Log log)
}
}

public static void checkAdmin( SecurityContext securityContext, ProcedureCallContext callContext, String procedureName )
{
switch ( securityContext.allowExecuteAdminProcedure( callContext.id() ) )
{
case EXPLICIT_GRANT:
return;
default:
throw new RuntimeException( "This procedure " + procedureName + " is only available to admin users" );
}
public static void checkAdmin(SecurityContext securityContext, ProcedureCallContext callContext, String procedureName) {
if (!securityContext.allowExecuteAdminProcedure(callContext.id())) throw new RuntimeException("This procedure "+ procedureName +" is only available to admin users");
}

public static void sleep(int millis) {
Expand Down Expand Up @@ -747,7 +742,7 @@ public static void removeFinished(List<Future> futures) {
}
}

public static void close(AutoCloseable closeable, Consumer<Exception> onErrror) {
public static void close(Closeable closeable, Consumer<Exception> onErrror) {
try {
if (closeable!=null) closeable.close();
} catch (Exception e) {
Expand All @@ -758,7 +753,7 @@ public static void close(AutoCloseable closeable, Consumer<Exception> onErrror)
}
}

public static void close(AutoCloseable closeable) {
public static void close(Closeable closeable) {
close(closeable, null);
}

Expand Down Expand Up @@ -882,9 +877,9 @@ public static <T extends Entity> List<T> rebind(List<T> entities, Transaction tx
}

public static Node mergeNode(Transaction tx, Label primaryLabel, Label addtionalLabel,
Pair<String, Object>... pairs ) {
Pair<String, Object>... pairs) {
Node node = Iterators.singleOrNull(tx.findNodes(primaryLabel, pairs[0].first(), pairs[0].other()).stream()
.filter(n -> addtionalLabel!=null && n.hasLabel(addtionalLabel))
.filter(n -> addtionalLabel == null || n.hasLabel(addtionalLabel))
.filter( n -> {
for (int i=1; i<pairs.length; i++) {
if (!Objects.deepEquals(pairs[i].other(), n.getProperty(pairs[i].first(), null))) {
Expand Down Expand Up @@ -915,8 +910,14 @@ public static <T> Set<T> intersection(Collection<T> a, Collection<T> b) {
return intersection;
}

public static void validateQuery(GraphDatabaseService db, String statement) {
db.executeTransactionally("EXPLAIN " + statement);
public static void validateQuery(GraphDatabaseService db, String statement, QueryExecutionType.QueryType... supportedQueryTypes) {
final boolean isValid = db.executeTransactionally("EXPLAIN " + statement, Collections.emptyMap(), result ->
supportedQueryTypes == null || supportedQueryTypes.length == 0 || Stream.of(supportedQueryTypes)
.anyMatch(sqt -> sqt.equals(result.getQueryExecutionType().queryType())));

if (!isValid) {
throw new RuntimeException("Supported query types for the operation are " + Arrays.toString(supportedQueryTypes));
}
}

/**
Expand All @@ -936,7 +937,7 @@ public static String encodeUserColonPassToBase64(String userPass) {

public static Map<String, Object> extractCredentialsIfNeeded(String url, boolean failOnError) {
try {
URI uri = new URI(url);
URI uri = new URI(encodePath(url));
String authInfo = uri.getUserInfo();
if (null != authInfo) {
String[] parts = authInfo.split(":");
Expand Down
41 changes: 41 additions & 0 deletions core/src/test/java/apoc/util/UtilTest.java
Expand Up @@ -5,17 +5,22 @@
import org.junit.Test;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.QueryExecutionException;
import org.neo4j.graphdb.ResultTransformer;
import org.neo4j.graphdb.Transaction;
import org.neo4j.internal.helpers.collection.Pair;
import org.neo4j.test.rule.DbmsRule;
import org.neo4j.test.rule.ImpermanentDbmsRule;

import java.util.List;
import java.util.Map;

import static apoc.util.MapUtil.map;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
* @author mh
Expand Down Expand Up @@ -73,4 +78,40 @@ public void cleanPassword() throws Exception {
public void mergeNullMaps() {
assertNotNull(Util.merge(null, null));
}

@Test(expected = QueryExecutionException.class)
public void testValidateQuery() {
try {
Util.validateQuery(db, "Match (n) return m");
} catch (QueryExecutionException e) {
assertTrue(e.getMessage().contains("Variable `m` not defined"));
throw e;
}
}

@Test
public void testMerge() {
try {
final ResultTransformer<Object> resultTransformer = res -> res.next().get("count");
try (final Transaction transaction = db.beginTx()) {
Util.mergeNode(transaction, Label.label("Test"), null, Pair.of("foo", "bar"));
transaction.commit();
final long count = (long) db.executeTransactionally("MATCH (n:Test{foo: 'bar'}) RETURN count(n) AS count", Map.of(),
resultTransformer);
assertEquals(1, count);
}
try (final Transaction transaction = db.beginTx()) {
Util.mergeNode(transaction, Label.label("Test"), Label.label("Bar"), Pair.of("foo", "bar1"));
transaction.commit();
final long count = (long) db.executeTransactionally("MATCH (n:Test:Bar{foo: 'bar1'}) RETURN count(n) AS count", Map.of(),
resultTransformer);
assertEquals(1, count);
}
final long count = (long) db.executeTransactionally("MATCH (n:Test) RETURN count(n) AS count", Map.of(),
resultTransformer);
assertEquals(2, count);
} finally {
db.executeTransactionally("MATCH (n:Test) DETACH DELETE n");
}
}
}
@@ -0,0 +1,3 @@
¦signature
¦apoc.dv.query(name :: STRING?, params = {} :: ANY?, config = {} :: MAP?) :: (node :: NODE?)
¦apoc.dv.queryAndLink(node :: NODE?, relName :: STRING?, name :: STRING?, params = {} :: ANY?, config = {} :: MAP?) :: (path :: PATH?)
@@ -0,0 +1,4 @@
¦signature
¦apoc.dv.catalog.add(name :: STRING?, config = {} :: MAP?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
¦apoc.dv.catalog.list() :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
¦apoc.dv.catalog.remove(name :: STRING?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
@@ -0,0 +1,2 @@
¦signature
¦apoc.dv.catalog.add(name :: STRING?, config = {} :: MAP?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
@@ -0,0 +1,5 @@
¦xref::overview/apoc.dv/apoc.dv.catalog.add.adoc[apoc.dv.catalog.add icon:book[]] +


¦label:procedure[]
¦label:apoc-core[]
@@ -0,0 +1,16 @@
¦Qualified Name¦Type¦Release
|xref::overview/apoc.dv.catalog/apoc.dv.adoc[apoc.dv.catalog.add icon:book[]]

Add a virtualized resource configuration
|label:procedure[]
|label:apoc-core[]
|xref::overview/apoc.dv.catalog/apoc.dv.adoc[apoc.dv.catalog.list icon:book[]]

List all virtualized resource configuration
|label:procedure[]
|label:apoc-core[]
|xref::overview/apoc.dv.catalog/apoc.dv.adoc[apoc.dv.catalog.remove icon:book[]]

Remove a virtualized resource config by name
|label:procedure[]
|label:apoc-core[]
@@ -0,0 +1,2 @@
¦signature
¦apoc.dv.catalog.list() :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
@@ -0,0 +1,5 @@
¦xref::overview/apoc.dv/apoc.dv.catalog.list.adoc[apoc.dv.catalog.list icon:book[]] +


¦label:procedure[]
¦label:apoc-core[]
@@ -0,0 +1,2 @@
¦signature
¦apoc.dv.catalog.remove(name :: STRING?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
@@ -0,0 +1,5 @@
¦xref::overview/apoc.dv/apoc.dv.catalog.remove.adoc[apoc.dv.catalog.remove icon:book[]] +


¦label:procedure[]
¦label:apoc-core[]
@@ -0,0 +1,11 @@
¦Qualified Name¦Type¦Release
|xref::overview/apoc.dv/apoc.dv.adoc[apoc.dv.query icon:book[]]

Query a virtualized resource by name and return virtual nodes
|label:procedure[]
|label:apoc-core[]
|xref::overview/apoc.dv/apoc.dv.adoc[apoc.dv.queryAndLink icon:book[]]

Query a virtualized resource by name and return virtual nodes linked using virtual rels to the node passed as first param
|label:procedure[]
|label:apoc-core[]
@@ -0,0 +1,2 @@
¦signature
¦apoc.dv.query(name :: STRING?, params = {} :: ANY?, config = {} :: MAP?) :: (node :: NODE?)
@@ -0,0 +1,5 @@
¦xref::overview/apoc.dv/apoc.dv.query.adoc[apoc.dv.query icon:book[]] +


¦label:procedure[]
¦label:apoc-core[]
@@ -0,0 +1,2 @@
¦signature
¦apoc.dv.queryAndLink(node :: NODE?, relName :: STRING?, name :: STRING?, params = {} :: ANY?, config = {} :: MAP?) :: (path :: PATH?)
@@ -0,0 +1,5 @@
¦xref::overview/apoc.dv/apoc.dv.queryAndLink.adoc[apoc.dv.queryAndLink icon:book[]] +


¦label:procedure[]
¦label:apoc-core[]
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions docs/asciidoc/modules/ROOT/nav.adoc
Expand Up @@ -116,6 +116,8 @@ include::partial$generated-documentation/nav.adoc[]
** xref::virtual/virtual-graph.adoc[]
** xref::virtual/graph-grouping.adoc[]
* xref:virtual-resource/index.adoc[]
* xref:nlp/index.adoc[]
** xref:nlp/gcp.adoc[]
** xref:nlp/aws.adoc[]
Expand Down
@@ -0,0 +1,40 @@
////
This file is generated by DocsTest, so don't change it!
////

= apoc.dv.catalog.add
:description: This section contains reference documentation for the apoc.dv.catalog.add procedure.

label:procedure[] label:apoc-core[]

[.emphasis]
Add a virtualized resource configuration

== Signature

[source]
----
apoc.dv.catalog.add(name :: STRING?, config = {} :: MAP?) :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
----

== Input parameters
[.procedures, opts=header]
|===
| Name | Type | Default
|name|STRING?|null
|config|MAP?|{}
|===

== Output parameters
[.procedures, opts=header]
|===
| Name | Type
|name|STRING?
|type|STRING?
|url|STRING?
|desc|STRING?
|labels|LIST? OF STRING?
|query|STRING?
|params|LIST? OF STRING?
|===

@@ -0,0 +1,32 @@
////
This file is generated by DocsTest, so don't change it!
////

= apoc.dv.catalog.list
:description: This section contains reference documentation for the apoc.dv.catalog.list procedure.

label:procedure[] label:apoc-core[]

[.emphasis]
List all virtualized resource configuration

== Signature

[source]
----
apoc.dv.catalog.list() :: (name :: STRING?, type :: STRING?, url :: STRING?, desc :: STRING?, labels :: LIST? OF STRING?, query :: STRING?, params :: LIST? OF STRING?)
----

== Output parameters
[.procedures, opts=header]
|===
| Name | Type
|name|STRING?
|type|STRING?
|url|STRING?
|desc|STRING?
|labels|LIST? OF STRING?
|query|STRING?
|params|LIST? OF STRING?
|===

0 comments on commit c7f71d4

Please sign in to comment.