6 changes: 6 additions & 0 deletions docs/index.adoc
@@ -174,6 +174,8 @@ $(document).ready(function() {
</script>
++++

include::custom.adoc[leveloffset=1]

== Virtual

include::create-virtual-nodes-rels.adoc[leveloffset=1]
@@ -202,6 +204,10 @@ include::atomic.adoc[leveloffset=1]

include::bolt.adoc[leveloffset=1]

== Cypher init script

include::cypher_init.adoc[leveloffset=1]

== Appendix: Complete Overview

include::overview.adoc[tags=overview,leveloffset=1]
8 changes: 4 additions & 4 deletions docs/loadcsv.adoc
@@ -137,7 +137,7 @@ See the following usage-examples for the procedures.

== Import CSV

CSV files with headers that confirm the https://neo4j.com/docs/operations-manual/current/tools/import/file-header-format/[Neo4j import tool's header format] can be imported using the `apoc.import.csv` procedure. This procedure is intended for small- to medium-sized data sets. For bulk importing larger data sets, it is recommended to use the offline import tool.
CSV files that comply with the https://neo4j.com/docs/operations-manual/current/tools/import/file-header-format/[Neo4j import tool's header format] can be imported using the `apoc.import.csv` procedure. This procedure is intended to load small- to medium-sized data sets into an online database. For importing larger data sets, it is recommended to perform a batch import using the https://neo4j.com/docs/operations-manual/current/tools/import/[import tool], which loads data in bulk into an offline (initially empty) database.

=== Usage

@@ -188,7 +188,7 @@ Jane

[source,cypher]
----
CALL apoc.import.csv([{filename: 'file:/persons.csv', labels: ['Person']}], [], {})
CALL apoc.import.csv([{fileName: 'file:/persons.csv', labels: ['Person']}], [], {})
----

=== Loading nodes and relationships
@@ -211,8 +211,8 @@ Given the following CSV files and procedure call, the database loads two `Person
[source,cypher]
----
CALL apoc.import.csv(
[{filename: 'file:/persons.csv', labels: ['Person']}],
[{filename: 'file:/knows.csv', type: 'KNOWS'}],
[{fileName: 'file:/persons.csv', labels: ['Person']}],
[{fileName: 'file:/knows.csv', type: 'KNOWS'}],
{delimiter: '|', arrayDelimiter: ',', stringIds: false}
)
----
44 changes: 38 additions & 6 deletions docs/loadjdbc.adoc
@@ -86,14 +86,14 @@ describe products;
.Load the JDBC driver
[source,cypher]
----
cypher CALL apoc.load.driver("com.mysql.jdbc.Driver");
CALL apoc.load.driver("com.mysql.jdbc.Driver");
----

.Count rows in products table
[source,cypher]
----
with "jdbc:mysql://localhost:3306/northwind?user=root" as url
cypher CALL apoc.load.jdbc(url,"products") YIELD row
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN count(*);
----

@@ -111,7 +111,7 @@ RETURN count(*);
[source,cypher]
----
with "jdbc:mysql://localhost:3306/northwind?user=root" as url
cypher CALL apoc.load.jdbc(url,"products") YIELD row
CALL apoc.load.jdbc(url,"products") YIELD row
RETURN row limit 1;
----

@@ -133,7 +133,7 @@ image::{img}/apoc-load-jdbc.jpg[width=800]

----
with "select firstname, lastname from employees where firstname like ? and lastname like ?" as sql
cypher call apoc.load.jdbcParams("northwind", sql, ['F%', '%w']) yield row
call apoc.load.jdbcParams("northwind", sql, ['F%', '%w']) yield row
return row
----

@@ -428,7 +428,7 @@ There are a number of blog posts / examples that details usage of apoc.load.jdbc

== LOAD JDBC - UPDATE

The jdbcUpdate is use for update relational database, from a SQL kernelTransaction with optional parameters
The `jdbcUpdate` procedure is used to update a relational database from a SQL statement with optional parameters

[source,cypher]
----
@@ -457,4 +457,36 @@ You can call the procedure without param:
[source,cypher]
----
CALL apoc.load.jdbcUpdate('jdbc:mysql:....','INSERT INTO RECOMMENDATIONS values(user.id, reco.id, score)') YIELD row;
----
----
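
You can also supply a parameter list as the third argument; a minimal sketch (the `PRODUCTS` table, its columns, and the values are hypothetical):

[source,cypher]
----
CALL apoc.load.jdbcUpdate('jdbc:mysql:....','UPDATE PRODUCTS SET unitPrice = ? WHERE productId = ?',[19.99, 42]) YIELD row;
----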

=== Load JDBC - date format

Starting from Neo4j 3.4 there is support for https://neo4j.com/docs/developer-manual/current/cypher/syntax/temporal/[Temporal Values].

If the JDBC type returned by the load operation is TIMESTAMP or TIMESTAMP_WITH_TIMEZONE, you can provide the configuration parameter **timezone** with a value of type https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html[java.time.ZoneId].

[source, cypher]
----
CALL apoc.load.jdbc('key or url','table or statement', config) YIELD row
----

=== Config

The config param is optional; the default value is an empty map.

[cols="3m,2"]
|===
|timezone| default value: null
|===

Example:

.with timezone
[source, cypher]
----
CALL apoc.load.jdbc('jdbc:derby:derbyDB','SELECT * FROM PERSON WHERE NAME = ?',['John'], {timezone: "Asia/Tokyo"})
----

----
2018-10-31T01:32:25.012+09:00[Asia/Tokyo]
----
16 changes: 16 additions & 0 deletions docs/loadjson.adoc
@@ -43,6 +43,22 @@ If used, this path is applied to the json and can be used to extract sub-documen

There is also a direct `apoc.json.path(json,path)` function.

To simplify the JSON URL syntax, you can configure aliases in `conf/neo4j.conf`:

----
apoc.json.myJson.url=https://api.stackexchange.com/2.2/questions?pagesize=100&order=desc&sort=creation&tagged=neo4j&site=stackoverflow&filter=!5-i6Zw8Y)4W7vpy91PMYsKM-k9yzEsSC1_Uxlf
----

----
CALL apoc.load.json('https://api.stackexchange.com/2.2/questions?pagesize=100&order=desc&sort=creation&tagged=neo4j&site=stackoverflow&filter=!5-i6Zw8Y)4W7vpy91PMYsKM-k9yzEsSC1_Uxlf')
becomes
CALL apoc.load.json('myJson')
----

The `<alias>` part of the `apoc.json.<alias>.url=` config key defines the alias to be used in `apoc.load.json('<alias>', ...)`.

== Load JSON StackOverflow Example

There have been articles before about http://neo4j.com/blog/cypher-load-json-from-url/[loading JSON from Web-APIs like StackOverflow].
70 changes: 70 additions & 0 deletions docs/neighbors.adoc
@@ -0,0 +1,70 @@
== Node Neighbors

You can quickly find the distinct nodes connected "n" levels (or up to "n" levels) away with the following procedures.

You can use '>' or '<' for all outgoing or incoming relationships, or specify the types you are interested in.

[cols="1m,5"]
|===
| apoc.neighbors.tohop(node, rel-direction-pattern, distance) | returns distinct nodes of the given relationships in the pattern up to a certain distance
| apoc.neighbors.tohop.count(node, rel-direction-pattern, distance) | returns the count of distinct nodes of the given relationships in the pattern up to a certain distance
| apoc.neighbors.byhop(node, rel-direction-pattern, distance) | returns distinct nodes of the given relationships in the pattern grouped by distance
| apoc.neighbors.byhop.count(node, rel-direction-pattern, distance) | returns the count of distinct nodes of the given relationships in the pattern grouped by distance
| apoc.neighbors.athop(node, rel-direction-pattern, distance) | returns distinct nodes of the given relationships in the pattern at a certain distance
| apoc.neighbors.athop.count(node, rel-direction-pattern, distance) | returns the count of distinct nodes of the given relationships in the pattern at a certain distance
|===

=== Example

.Graph Setup
[source,cypher]
----
CREATE (a:First), (b:Neighbor), (c:Neighbor), (d:Neighbor),
(a)-[:KNOWS]->(b), (b)-[:KNOWS]->(a),
(b)-[:KNOWS]->(c), (c)-[:KNOWS]->(d)
----

[source,cypher]
----
MATCH (n:First) WITH n
CALL apoc.neighbors.tohop(n,'KNOWS>', 3) YIELD node AS neighbor
RETURN neighbor
----

[source,cypher]
----
MATCH (n:First) WITH n
CALL apoc.neighbors.tohop.count(n,'KNOWS>', 3) YIELD value AS number
RETURN number
----

[source,cypher]
----
MATCH (n:First) WITH n
CALL apoc.neighbors.byhop(n,'KNOWS>', 3) YIELD nodes AS neighbors
RETURN neighbors
----

[source,cypher]
----
MATCH (n:First) WITH n
CALL apoc.neighbors.byhop.count(n,'KNOWS>', 3) YIELD value AS numbers
RETURN numbers
----

[source,cypher]
----
MATCH (n:First) WITH n
CALL apoc.neighbors.athop(n,'KNOWS>', 3) YIELD nodes AS neighbors
RETURN neighbors
----

[source,cypher]
----
MATCH (n:First) WITH n
CALL apoc.neighbors.athop.count(n,'KNOWS>', 3) YIELD value AS numbers
RETURN numbers
----
23 changes: 20 additions & 3 deletions docs/overview.adoc
@@ -79,7 +79,7 @@ NOTE: Please note that there are (case-sensitive) http://neo4j.com/docs/develope
| apoc.index.addNode(node,['prop1',...]) | add node to an index for each label it has
| apoc.index.addNodeByLabel('Label',node,['prop1',...]) | add node to an index for the given label
| apoc.index.addNodeByName('name',node,['prop1',...]) | add node to an index for the given name
| apoc.index.addNodeMap(node,{key:value}) | add node to an index for each label it has with the given attributes which can also be computed
| apoc.index.addNodeMap(node,{key:value}) | add node to an index for each label it has with the given attributes which can also be computed
| apoc.index.addNodeMapByName(index, node,{key:value}) | add node to an index for each label it has with the given attributes which can also be computed
| apoc.index.addRelationship(rel,['prop1',...]) | add relationship to an index for its type
| apoc.index.addRelationshipByName('name',rel,['prop1',...]) | add relationship to an index for the given name
@@ -238,6 +238,9 @@ CALL apoc.export.csv.all(null, {stream:true,batchSize:100}) YIELD data RETURN da

// end::export.csv[]

=== Export to JSON File
include::exportJson.adoc[leveloffset=1]

=== Export to Cypher Script

include::exportCypher.adoc[leveloffset=1]
@@ -660,6 +663,7 @@ Enable `apoc.trigger.enabled=true` in `$NEO4J_HOME/config/neo4j.conf` first.
|===
| CALL apoc.trigger.add(name, statement, selector) yield name, statement, installed | add a trigger statement under a name; in the statement you can use {createdNodes}, {deletedNodes} etc.; the selector is {phase:'before/after/rollback'}; returns previous and new trigger information
| CALL apoc.trigger.remove(name) yield name, statement, installed | remove a previously added trigger, returns trigger information
| CALL apoc.trigger.removeAll() yield name, statement, installed | removes all previously added triggers, returns trigger information
| CALL apoc.trigger.list() yield name, statement, installed | update and list all installed triggers
| CALL apoc.trigger.pause(name) | it pauses the trigger
| CALL apoc.trigger.resume(name) | it resumes the paused trigger
@@ -854,6 +858,7 @@ Sometimes type information gets lost, these functions help you to coerce an "Any
| apoc.coll.duplicatesWithCount(coll) | returns a list of duplicate items in the collection and their count, keyed by `item` and `count` (e.g., `[{item: xyz, count:2}, {item:zyx, count:5}]`)
| apoc.coll.occurrences(coll, item) | returns the count of the given item in the collection
| apoc.coll.frequencies(coll) | returns a list of frequencies of the items in the collection, keyed by `item` and `count` (e.g., `[{item: xyz, count:2}, {item:zyx, count:5}, {item:abc, count:1}]`)
| apoc.coll.frequenciesAsMap(coll) | returns a map of frequencies of the items in the collection, with each item as key and its count as value (e.g., `{1: 2, 3: 2}`)
| apoc.coll.sortMulti | sort list of maps by several sort fields (ascending with ^ prefix) and optionally applies limit and skip
| apoc.coll.flatten | flattens a nested list
| apoc.coll.combinations(coll, minSelect, maxSelect:minSelect) | Returns collection of all combinations of list elements of selection size between minSelect and maxSelect (default:minSelect), inclusive
@@ -900,6 +905,18 @@ Sometimes type information gets lost, these functions help you to coerce an "Any

Example: `'FRIEND|MENTORS>|<REPORTS_TO'` will match to :FRIEND relationships in either direction, outgoing :MENTORS relationships, and incoming :REPORTS_TO relationships.

=== Neighbor Functions

[cols="1m,5"]
|===
| apoc.neighbors.tohop(node, rel-direction-pattern, distance) | returns distinct nodes of the given relationships in the pattern up to a certain distance
| apoc.neighbors.tohop.count(node, rel-direction-pattern, distance) | returns the count of distinct nodes of the given relationships in the pattern up to a certain distance
| apoc.neighbors.byhop(node, rel-direction-pattern, distance) | returns distinct nodes of the given relationships in the pattern grouped by distance
| apoc.neighbors.byhop.count(node, rel-direction-pattern, distance) | returns the count of distinct nodes of the given relationships in the pattern grouped by distance
| apoc.neighbors.athop(node, rel-direction-pattern, distance) | returns distinct nodes of the given relationships in the pattern at a certain distance
| apoc.neighbors.athop.count(node, rel-direction-pattern, distance) | returns the count of distinct nodes of the given relationships in the pattern at a certain distance
|===
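
For example, with a small `KNOWS` graph (see the Node Neighbors chapter for the full setup):

[source,cypher]
----
MATCH (n:First)
CALL apoc.neighbors.tohop(n, 'KNOWS>', 2) YIELD node
RETURN node
----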


=== Math Functions

@@ -927,7 +944,7 @@ Example: `'FRIEND|MENTORS>|<REPORTS_TO'` will match to :FRIEND relationships in
| apoc.text.replace(text, regex, replacement)| replace each substring of the given string that matches the given regular expression with the given replacement.
| apoc.text.regexGroups(text, regex) | returns an array containing a nested array for each match. The inner array contains all match groups.
| apoc.text.join(['text1','text2',...], delimiter) | join the given strings with the given delimiter.
| apoc.text.format(text,[params]) | sprintf format the string with the params given
| apoc.text.format(text,[params],language) | sprintf format the string with the given params and an optional language param (default value is 'en').
| apoc.text.lpad(text,count,delim) | left pad the string to the given width
| apoc.text.rpad(text,count,delim) | right pad the string to the given width
| apoc.text.random(length, [valid]) | returns a random string to the specified length
@@ -1356,4 +1373,4 @@ CALL apoc.algo.dijkstra(from, to, 'ROAD', 'd') yield path as path, weight as wei
RETURN path, weight
----

// end::overview[]
// end::overview[]
18 changes: 18 additions & 0 deletions docs/text.adoc
@@ -8,6 +8,24 @@ The clean functionality can be useful for cleaning up slightly dirty text data w

Cleaning will strip the string of all non-alphanumeric characters (including spaces) and convert it to lower case.

== Formatting Text

Formats the string with the given params and an optional language param.

.without language param ('en' default)

[source,cypher]
----
RETURN apoc.text.format('ab%s %d %.1f %s%n',['cd',42,3.14,true]) AS value // abcd 42 3.1 true
----

.with language param

[source,cypher]
----
RETURN apoc.text.format('ab%s %d %.1f %s%n',['cd',42,3.14,true],'it') AS value // abcd 42 3,1 true
----

== String Search

The `indexOf` function provides the first occurrence of the given `lookup` string within the `text`, or -1 if not found.
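
For example, with the usual zero-based index:

[source,cypher]
----
RETURN apoc.text.indexOf('Hello World', 'World') AS value // 6
----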
1 change: 1 addition & 0 deletions docs/trigger.adoc
@@ -7,6 +7,7 @@ Enable `apoc.trigger.enabled=true` in `$NEO4J_HOME/config/neo4j.conf` first.
|===
| CALL apoc.trigger.add(name, statement, selector) yield name, statement, installed | add a trigger statement under a name; in the statement you can use {createdNodes}, {deletedNodes} etc.; the selector is {phase:'before/after/rollback'}; returns previous and new trigger information
| CALL apoc.trigger.remove(name) yield name, statement, installed | remove a previously added trigger, returns trigger information
| CALL apoc.trigger.removeAll() yield name, statement, installed | removes all previously added triggers, returns trigger information
| CALL apoc.trigger.list() yield name, statement, installed | update and list all installed triggers
| CALL apoc.trigger.pause(name) | it pauses the trigger
| CALL apoc.trigger.resume(name) | it resumes the paused trigger
9 changes: 9 additions & 0 deletions docs/ttl.adoc
@@ -1,5 +1,14 @@
== TimeToLive (TTL) - Expire Nodes

Some nodes are not meant to live forever.
That's why with APOC you can specify a time at which they are removed from the database, utilizing a schema index and an additional label.
A few convenience procedures help with that.

++++
<iframe width="560" height="315" src="https://www.youtube.com/embed/e9aoQ9xOmoU" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>
++++

Enable cleanup of expired nodes in `neo4j.conf` with `apoc.ttl.enabled=true`

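Expiration works via an extra `TTL` label and a `ttl` property holding the expiration time in epoch millis, so you can also mark a node yourself; a minimal sketch (the `Session` node is hypothetical):

[source,cypher]
----
MATCH (n:Session {id: 42})
SET n:TTL, n.ttl = timestamp() + 3600 * 1000 // expire in one hour
----
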
30s after startup an index is created:
7 changes: 4 additions & 3 deletions readme.adoc
@@ -1,8 +1,8 @@
:readme:
:branch: 3.4
:docs: https://neo4j-contrib.github.io/neo4j-apoc-procedures/index33.html
:apoc-release: 3.4.0.3
:neo4j-version: 3.4.7
:apoc-release: 3.4.0.4
:neo4j-version: 3.4.10
:img: https://raw.githubusercontent.com/neo4j-contrib/neo4j-apoc-procedures/{branch}/docs/img

= Awesome Procedures for Neo4j {branch}.x
@@ -177,7 +177,8 @@ The trailing `<apoc>` part of the version number will be incremented with every
[options=headers]
|===
|apoc version | neo4j version
| http://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/3.4.0.2[3.4.0.2^] | 3.4.5 (3.4.x)
| http://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/3.4.0.3[3.4.0.3^] | 3.4.7 (3.4.x)
| http://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/3.4.0.2[3.4.0.2^] | 3.4.5
| http://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/3.4.0.1[3.4.0.1^] | 3.4.0
| http://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/3.3.0.4[3.3.0.4^] | 3.3.6 (3.3.x)
| http://github.com/neo4j-contrib/neo4j-apoc-procedures/releases/3.3.0.3[3.3.0.3^] | 3.3.5
11 changes: 9 additions & 2 deletions src/main/java/apoc/ApocConfiguration.java
@@ -7,6 +7,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

/**
@@ -27,10 +28,16 @@ public class ApocConfiguration {

public static void initialize(GraphDatabaseAPI db) {
Static.clear();
Map<String, String> params = db.getDependencyResolver().resolveDependency(Config.class).getRaw();
Config neo4jConfig = db.getDependencyResolver().resolveDependency(Config.class);
Map<String, String> params = neo4jConfig.getRaw();
apocConfig.clear();
apocConfig.putAll(Util.subMap(params, PREFIX));
PARAM_WHITELIST.forEach((k, v) -> apocConfig.put(v, params.get(k)) );
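// look up whitelisted settings via Config.getValue() so the parsed (rather than raw string) values are used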
PARAM_WHITELIST.forEach((k, v) -> {
Optional<Object> configValue = neo4jConfig.getValue(k);
if (configValue.isPresent()) {
apocConfig.put(v, configValue.get().toString());
}
});
config.clear();
params.forEach((k, v) -> { if (!SKIP.matcher(k).find()) {config.put(k, v);} });
}
12 changes: 10 additions & 2 deletions src/main/java/apoc/ApocKernelExtensionFactory.java
@@ -1,6 +1,7 @@
package apoc;

import apoc.custom.CypherProcedures;
import apoc.cypher.CypherInitializer;
import apoc.index.IndexUpdateTransactionEventHandler;
import apoc.trigger.Trigger;
import apoc.ttl.TTLLifeCycle;
@@ -25,7 +26,12 @@
public class ApocKernelExtensionFactory extends KernelExtensionFactory<ApocKernelExtensionFactory.Dependencies>{

static {
URL.setURLStreamHandlerFactory(new ApocUrlStreamHandlerFactory());
try {
URL.setURLStreamHandlerFactory(new ApocUrlStreamHandlerFactory());
} catch (Error e) {
System.err.println("APOC couldn't set a URLStreamHandlerFactory since some other tool already did this (e.g. tomcat). This means you cannot use s3:// or hdfs:// style URLs in APOC. This is a known issue tracked at https://github.com/neo4j-contrib/neo4j-apoc-procedures/issues/778. Full stacktrace below: ");
e.printStackTrace();
}
}
public ApocKernelExtensionFactory() {
super("APOC");
@@ -83,7 +89,9 @@ public void start() throws Throwable {
indexUpdateLifeCycle.start();

customProcedureStorage = new CypherProcedures.CustomProcedureStorage(db, log.getUserLog(CypherProcedures.class));
dependencies.availabilityGuard().addListener(customProcedureStorage);
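// also register the Cypher initializer (see CypherInitializer) so configured init statements run once the database is available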
AvailabilityGuard availabilityGuard = dependencies.availabilityGuard();
availabilityGuard.addListener(customProcedureStorage);
availabilityGuard.addListener(new CypherInitializer(db, log.getUserLog(CypherInitializer.class)));
}

public void registerCustomProcedures() {
7 changes: 7 additions & 0 deletions src/main/java/apoc/coll/Coll.java
@@ -673,6 +673,13 @@ public List<Map<String, Object>> frequencies(@Name("coll") List<Object> coll) {
return resultList;
}

@UserFunction
@Description("apoc.coll.frequenciesAsMap(coll) - return a map of frequencies of the items in the collection, key `item`, value `count` (e.g., `{1:2, 2:1}`)")
public Map<String, Object> frequenciesAsMap(@Name("coll") List<Object> coll) {

return frequencies(coll).stream().collect(Collectors.toMap(t -> t.get("item").toString(), v-> v.get("count")));
}

@UserFunction
@Description("apoc.coll.occurrences(coll, item) - returns the count of the given item in the collection")
public long occurrences(@Name("coll") List<Object> coll, @Name("item") Object item) {
2 changes: 1 addition & 1 deletion src/main/java/apoc/couchbase/CouchbaseConnection.java
@@ -100,7 +100,7 @@ protected CouchbaseConnection(List<String> nodes, PasswordAuthenticator authenti
* @return
*/
protected int getMajorVersion() {
return this.cluster.authenticate(this.passwordAuthenticator).clusterManager().info().getMinVersion().major();
return this.cluster.authenticate(this.passwordAuthenticator).clusterManager().info(5,TimeUnit.SECONDS).getMinVersion().major();
}

/**
4 changes: 3 additions & 1 deletion src/main/java/apoc/couchbase/CouchbaseManager.java
@@ -1,6 +1,8 @@
package apoc.couchbase;

import apoc.ApocConfiguration;
import com.couchbase.client.core.retry.FailFastRetryStrategy;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseAsyncCluster;
@@ -27,7 +29,7 @@
*/
public class CouchbaseManager {

public static final DefaultCouchbaseEnvironment DEFAULT_COUCHBASE_ENVIRONMENT = DefaultCouchbaseEnvironment.create();
public static final DefaultCouchbaseEnvironment DEFAULT_COUCHBASE_ENVIRONMENT = DefaultCouchbaseEnvironment.builder().retryStrategy(FailFastRetryStrategy.INSTANCE).build();

protected static final String COUCHBASE_CONFIG_KEY = "couchbase.";

88 changes: 57 additions & 31 deletions src/main/java/apoc/custom/CypherProcedures.java
@@ -9,11 +9,14 @@
import org.neo4j.internal.kernel.api.exceptions.ProcedureException;
import org.neo4j.internal.kernel.api.procs.*;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.api.KernelTransaction;
import org.neo4j.kernel.api.ResourceTracker;
import org.neo4j.kernel.api.proc.CallableProcedure;
import org.neo4j.kernel.api.proc.CallableUserFunction;
import org.neo4j.kernel.api.proc.Key;
import org.neo4j.kernel.impl.core.EmbeddedProxySPI;
import org.neo4j.kernel.impl.core.GraphProperties;
import org.neo4j.kernel.impl.core.GraphPropertiesProxy;
import org.neo4j.kernel.impl.factory.GraphDatabaseFacade;
import org.neo4j.kernel.impl.proc.Procedures;
import org.neo4j.kernel.impl.util.DefaultValueMapper;
@@ -46,6 +49,8 @@ public class CypherProcedures {
@Context
public GraphDatabaseAPI api;
@Context
public KernelTransaction ktx;
@Context
public Log log;

/*
@@ -60,11 +65,23 @@ public void asProcedure(@Name("name") String name, @Name("statement") String sta
@Name(value= "outputs", defaultValue = "null") List<List<String>> outputs,
@Name(value= "inputs", defaultValue = "null") List<List<String>> inputs
) throws ProcedureException {
debug(name,"before", ktx);

CustomStatementRegistry registry = new CustomStatementRegistry(api, log);
if (!registry.registerProcedure(name, statement, mode, outputs, inputs)) {
throw new IllegalStateException("Error registering procedure "+name+", see log.");
}
CustomProcedureStorage.storeProcedure(name, statement, mode, outputs, inputs);
CustomProcedureStorage.storeProcedure(api, name, statement, mode, outputs, inputs);
debug(name, "after", ktx);
}

public static void debug(@Name("name") String name, String msg, KernelTransaction ktx) {
try {
org.neo4j.internal.kernel.api.Procedures procedures = ktx.procedures();
// ProcedureHandle procedureHandle = procedures.procedureGet(CustomStatementRegistry.qualifiedName(name));
// if (procedureHandle != null) System.out.printf("%s name: %s id %d%n", msg, procedureHandle.signature().name().toString(), procedureHandle.id());
} catch (Exception e) {
}
}

@Procedure(value = "apoc.custom.asFunction",mode = Mode.WRITE)
@@ -76,7 +93,7 @@ public void asFunction(@Name("name") String name, @Name("statement") String stat
if (!registry.registerFunction(name, statement, output, inputs, forceSingle)) {
throw new IllegalStateException("Error registering function "+name+", see log.");
}
CustomProcedureStorage.storeFunction(name, statement, output, inputs, forceSingle);
CustomProcedureStorage.storeFunction(api, name, statement, output, inputs, forceSingle);
}

static class CustomStatementRegistry {
@@ -93,13 +110,15 @@ public CustomStatementRegistry(GraphDatabaseAPI api, Log log) {
public boolean registerProcedure(@Name("name") String name, @Name("statement") String statement, @Name(value = "mode", defaultValue = "read") String mode, @Name(value = "outputs", defaultValue = "null") List<List<String>> outputs, @Name(value = "inputs", defaultValue = "null") List<List<String>> inputs) {
try {
Procedures procedures = api.getDependencyResolver().resolveDependency(Procedures.class);
boolean admin = false; // TODO
ProcedureSignature signature = new ProcedureSignature(qualifiedName(name), inputSignatures(inputs), outputSignatures(outputs),
Mode.valueOf(mode.toUpperCase()), null, new String[0], null, null, false, true
);

procedures.register(new CallableProcedure.BasicProcedure(signature) {
@Override
public RawIterator<Object[], ProcedureException> apply(org.neo4j.kernel.api.proc.Context ctx, Object[] input, ResourceTracker resourceTracker) throws ProcedureException {
KernelTransaction ktx = ctx.get(Key.key("KernelTransaction", KernelTransaction.class));
debug(name, "inside", ktx);
Map<String, Object> params = params(input, inputs);
Result result = api.execute(statement, params);
resourceTracker.registerCloseableResource(result);
@@ -166,7 +185,7 @@ public AnyValue apply(org.neo4j.kernel.api.proc.Context ctx, AnyValue[] input) t
}


public QualifiedName qualifiedName(@Name("name") String name) {
public static QualifiedName qualifiedName(@Name("name") String name) {
String[] names = name.split("\\.");
List<String> namespace = new ArrayList<>(names.length);
namespace.add(PREFIX);
@@ -290,7 +309,7 @@ public Map<String, Object> functionParams(Object[] input, @Name(value = "inputs"

public static class CustomProcedureStorage implements AvailabilityGuard.AvailabilityListener {
public static final String APOC_CUSTOM = "apoc.custom";
private static GraphProperties properties;
private GraphProperties properties;
private final GraphDatabaseAPI api;
private final Log log;

@@ -301,47 +320,50 @@ public CustomProcedureStorage(GraphDatabaseAPI api, Log log) {

@Override
public void available() {
properties = api.getDependencyResolver().resolveDependency(EmbeddedProxySPI.class).newGraphPropertiesProxy();
properties = getProperties(api);
restoreProcedures();
}

public static GraphPropertiesProxy getProperties(GraphDatabaseAPI api) {
return api.getDependencyResolver().resolveDependency(EmbeddedProxySPI.class).newGraphPropertiesProxy();
}

private void restoreProcedures() {
try (Transaction tx = properties.getGraphDatabase().beginTx()) {
CustomStatementRegistry registry = new CustomStatementRegistry(api, log);
Map<String, Map<String,Map<String, Object>>> stored = readData();
stored.get(FUNCTIONS).forEach((name, data) -> {
registry.registerFunction(name, (String) data.get("statement"), (String) data.get("output"),
(List<List<String>>) data.get("inputs"), (Boolean)data.get("forceSingle"));
});
stored.get(PROCEDURES).forEach((name, data) -> {
registry.registerProcedure(name, (String) data.get("statement"), (String) data.get("mode"),
(List<List<String>>) data.get("outputs"), (List<List<String>>) data.get("inputs"));
});
}
CustomStatementRegistry registry = new CustomStatementRegistry(api, log);
Map<String, Map<String,Map<String, Object>>> stored = readData(properties);
stored.get(FUNCTIONS).forEach((name, data) -> {
registry.registerFunction(name, (String) data.get("statement"), (String) data.get("output"),
(List<List<String>>) data.get("inputs"), (Boolean)data.get("forceSingle"));
});
stored.get(PROCEDURES).forEach((name, data) -> {
registry.registerProcedure(name, (String) data.get("statement"), (String) data.get("mode"),
(List<List<String>>) data.get("outputs"), (List<List<String>>) data.get("inputs"));
});
}

@Override
public void unavailable() {
properties = null;
}

public static Map<String, Object> storeProcedure(String name, String statement, String mode, List<List<String>> outputs, List<List<String>> inputs) {
public static Map<String, Object> storeProcedure(GraphDatabaseAPI api, String name, String statement, String mode, List<List<String>> outputs, List<List<String>> inputs) {

Map<String, Object> data = map("statement", statement, "mode", mode, "inputs", inputs, "outputs", outputs);
return updateCustomData(name, PROCEDURES, data);
return updateCustomData(getProperties(api), name, PROCEDURES, data);
}
public static Map<String, Object> storeFunction(String name, String statement, String output, List<List<String>> inputs, boolean forceSingle) {
public static Map<String, Object> storeFunction(GraphDatabaseAPI api, String name, String statement, String output, List<List<String>> inputs, boolean forceSingle) {
Map<String, Object> data = map("statement", statement, "forceSingle", forceSingle, "inputs", inputs, "output", output);
return updateCustomData(name, FUNCTIONS, data);
return updateCustomData(getProperties(api), name, FUNCTIONS, data);
}

public synchronized static Map<String, Object> remove(String name, String type) {
return updateCustomData(name, type,null);
public synchronized static Map<String, Object> remove(GraphDatabaseAPI api, String name, String type) {
return updateCustomData(getProperties(api),name, type,null);
}

private synchronized static Map<String, Object> updateCustomData(String name, String type, Map<String, Object> value) {
private synchronized static Map<String, Object> updateCustomData(GraphProperties properties, String name, String type, Map<String, Object> value) {
if (name == null || type==null) return null;
try (Transaction tx = properties.getGraphDatabase().beginTx()) {
Map<String, Map<String, Map<String, Object>>> data = readData();
Map<String, Map<String, Map<String, Object>>> data = readData(properties);
Map<String, Map<String, Object>> procData = data.get(type);
Map<String, Object> previous = (value == null) ? procData.remove(name) : procData.put(name, value);
if (value != null || previous != null) {
@@ -352,14 +374,18 @@ private synchronized static Map<String, Object> updateCustomData(String name, St
}
}

private static Map<String, Map<String,Map<String, Object>>> readData() {
String procedurePropertyData = (String) properties.getProperty(APOC_CUSTOM, "{\"functions\":{},\"procedures\":{}}");
return Util.fromJson(procedurePropertyData, Map.class);
private static Map<String, Map<String,Map<String, Object>>> readData(GraphProperties properties) {
try (Transaction tx = properties.getGraphDatabase().beginTx()) {
String procedurePropertyData = (String) properties.getProperty(APOC_CUSTOM, "{\"functions\":{},\"procedures\":{}}");
Map result = Util.fromJson(procedurePropertyData, Map.class);
tx.success();
return result;
}
}

public static Map<String, Map<String, Map<String, Object>>> list() {
public Map<String, Map<String, Map<String, Object>>> list() {
try (Transaction tx = properties.getGraphDatabase().beginTx()) {
return readData();
return readData(properties);
}
}
}
36 changes: 18 additions & 18 deletions src/main/java/apoc/cypher/Cypher.java
@@ -64,13 +64,13 @@ public Stream<MapResult> run(@Name("cypher") String statement, @Name("params") M
}

@Procedure(mode = WRITE)
@Description("apoc.cypher.runFile(file or url,[{statistics:true,timeout:10,parameters:{}}]) - runs each kernelTransaction in the file, all semicolon separated - currently no schema operations")
@Description("apoc.cypher.runFile(file or url,[{statistics:true,timeout:10,parameters:{}}]) - runs each statement in the file, all semicolon separated - currently no schema operations")
public Stream<RowResult> runFile(@Name("file") String fileName, @Name(value = "config",defaultValue = "{}") Map<String,Object> config) {
return runFiles(singletonList(fileName),config);
}

@Procedure(mode = WRITE)
@Description("apoc.cypher.runFiles([files or urls],[{statistics:true,timeout:10,parameters:{}}])) - runs each kernelTransaction in the files, all semicolon separated")
@Description("apoc.cypher.runFiles([files or urls],[{statistics:true,timeout:10,parameters:{}}])) - runs each statement in the files, all semicolon separated")
public Stream<RowResult> runFiles(@Name("file") List<String> fileNames, @Name(value = "config",defaultValue = "{}") Map<String,Object> config) {
boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics",true));
int timeout = Util.toInteger(config.getOrDefault("timeout",10));
@@ -85,13 +85,13 @@ public Stream<RowResult> runFiles(@Name("file") List<String> fileNames, @Name(va
}

@Procedure(mode=Mode.SCHEMA)
@Description("apoc.cypher.runSchemaFile(file or url,[{statistics:true,timeout:10}]) - allows only schema operations, runs each schema kernelTransaction in the file, all semicolon separated")
@Description("apoc.cypher.runSchemaFile(file or url,[{statistics:true,timeout:10}]) - allows only schema operations, runs each schema statement in the file, all semicolon separated")
public Stream<RowResult> runSchemaFile(@Name("file") String fileName, @Name(value = "config",defaultValue = "{}") Map<String,Object> config) {
return runSchemaFiles(singletonList(fileName),config);
}

@Procedure(mode=Mode.SCHEMA)
@Description("apoc.cypher.runSchemaFiles([files or urls],{statistics:true,timeout:10}) - allows only schema operations, runs each schema kernelTransaction in the files, all semicolon separated")
@Description("apoc.cypher.runSchemaFiles([files or urls],{statistics:true,timeout:10}) - allows only schema operations, runs each schema statement in the files, all semicolon separated")
public Stream<RowResult> runSchemaFiles(@Name("file") List<String> fileNames, @Name(value = "config",defaultValue = "{}") Map<String,Object> config) {
boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics",true));
int timeout = Util.toInteger(config.getOrDefault("timeout",10));
@@ -107,44 +107,44 @@ private Stream<RowResult> runManyStatements(Reader reader, Map<String, Object> p
BlockingQueue<RowResult> queue = new ArrayBlockingQueue<>(100);
Util.inThread(() -> {
if (schemaOperation) {
runSchemaStatementsInTx(reader, queue, params, addStatistics);
runSchemaStatementsInTx(reader, queue, params, addStatistics,timeout);
} else {
runDataStatementsInTx(reader, queue, params, addStatistics);
runDataStatementsInTx(reader, queue, params, addStatistics,timeout);
}
queue.put(RowResult.TOMBSTONE);
return null;
});
return StreamSupport.stream(new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard, timeout), false);
}

private void runDataStatementsInTx(Reader reader, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics) {
private void runDataStatementsInTx(Reader reader, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics, long timeout) {
Scanner scanner = new Scanner(reader);
scanner.useDelimiter(";\r?\n");
while (scanner.hasNext()) {
String stmt = removeShellControlCommands(scanner.next());
if (stmt.trim().isEmpty()) continue;
if (!isSchemaOperation(stmt)) {
if (isPeriodicOperation(stmt))
Util.inThread(() -> executeStatement(queue, stmt, params, addStatistics));
else Util.inTx(db, () -> executeStatement(queue, stmt, params, addStatistics));
Util.inThread(() -> executeStatement(queue, stmt, params, addStatistics,timeout));
else Util.inTx(db, () -> executeStatement(queue, stmt, params, addStatistics,timeout));
}
}
}

private void runSchemaStatementsInTx(Reader reader, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics) {
private void runSchemaStatementsInTx(Reader reader, BlockingQueue<RowResult> queue, Map<String, Object> params, boolean addStatistics, long timeout) {
Scanner scanner = new Scanner(reader);
scanner.useDelimiter(";\r?\n");
while (scanner.hasNext()) {
String stmt = removeShellControlCommands(scanner.next());
if (stmt.trim().isEmpty()) continue;
if (isSchemaOperation(stmt)) {
Util.inTx(db, () -> executeStatement(queue, stmt, params, addStatistics));
Util.inTx(db, () -> executeStatement(queue, stmt, params, addStatistics, timeout));
}
}
}

@Procedure(mode = WRITE)
@Description("apoc.cypher.runMany('cypher;\\nstatements;',{params},[{statistics:true,timeout:10}]) - runs each semicolon separated kernelTransaction and returns summary - currently no schema operations")
@Description("apoc.cypher.runMany('cypher;\\nstatements;',{params},[{statistics:true,timeout:10}]) - runs each semicolon separated statement and returns summary - currently no schema operations")
public Stream<RowResult> runMany(@Name("cypher") String cypher, @Name("params") Map<String,Object> params, @Name(value = "config",defaultValue = "{}") Map<String,Object> config) {
boolean addStatistics = Util.toBoolean(config.getOrDefault("statistics",true));
int timeout = Util.toInteger(config.getOrDefault("timeout",1));
@@ -154,7 +154,7 @@

private final static Pattern shellControl = Pattern.compile("^:?\\b(begin|commit|rollback)\\b", Pattern.CASE_INSENSITIVE);

private Object executeStatement(BlockingQueue<RowResult> queue, String stmt, Map<String, Object> params, boolean addStatistics) throws InterruptedException {
private Object executeStatement(BlockingQueue<RowResult> queue, String stmt, Map<String, Object> params, boolean addStatistics, long timeout) throws InterruptedException {
try (Result result = db.execute(stmt,params)) {
long time = System.currentTimeMillis();
int row = 0;
Expand All @@ -163,7 +163,7 @@ private Object executeStatement(BlockingQueue<RowResult> queue, String stmt, Map
queue.put(new RowResult(row++, result.next()));
}
if (addStatistics) {
queue.offer(new RowResult(-1, toMap(result.getQueryStatistics(), System.currentTimeMillis() - time, row)), 100, TimeUnit.MILLISECONDS);
queue.offer(new RowResult(-1, toMap(result.getQueryStatistics(), System.currentTimeMillis() - time, row)), timeout,TimeUnit.SECONDS);
}
return row;
}
@@ -257,7 +257,7 @@ public Stream<MapResult> parallel(@Name("fragment") String fragment, @Name("para
Map map = new HashMap<>(params);
map.put(e.getKey(),as)
}));
return db.execute(kernelTransaction,params).stream().map(MapResult::new);
return db.execute(statement,params).stream().map(MapResult::new);
*/
}

@@ -273,7 +273,7 @@ public Stream<MapResult> mapParallel(@Name("fragment") String fragment, @Name("p
}
@Procedure
@Description("apoc.cypher.mapParallel2(fragment, params, list-to-parallelize) yield value - executes fragment in parallel batches with the list segments being assigned to _")
public Stream<MapResult> mapParallel2(@Name("fragment") String fragment, @Name("params") Map<String, Object> params, @Name("list") List<Object> data, @Name("partitions") long partitions) {
public Stream<MapResult> mapParallel2(@Name("fragment") String fragment, @Name("params") Map<String, Object> params, @Name("list") List<Object> data, @Name("partitions") long partitions,@Name(value = "timeout",defaultValue = "10") long timeout) {
final String statement = withParamsAndIterator(fragment, params.keySet(), "_");
db.execute("EXPLAIN " + statement).close();
BlockingQueue<RowResult> queue = new ArrayBlockingQueue<>(100000);
Expand All @@ -282,13 +282,13 @@ public Stream<MapResult> mapParallel2(@Name("fragment") String fragment, @Name("
long total = parallelPartitions
.map((List<Object> partition) -> {
try {
return executeStatement(queue, statement, parallelParams(params, "_", partition),false);
return executeStatement(queue, statement, parallelParams(params, "_", partition),false,timeout);
} catch (Exception e) {throw new RuntimeException(e);}}
).count();
queue.put(RowResult.TOMBSTONE);
return total;
});
return StreamSupport.stream(new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard),true).map((rowResult) -> new MapResult(rowResult.result));
return StreamSupport.stream(new QueueBasedSpliterator<>(queue, RowResult.TOMBSTONE, terminationGuard, timeout),true).map((rowResult) -> new MapResult(rowResult.result));
}

// todo proper Collector
4 changes: 2 additions & 2 deletions src/main/java/apoc/cypher/CypherFunctions.java
@@ -40,12 +40,12 @@ public Object runFirstColumn(@Name("cypher") String statement, @Name("params") M
}

@UserFunction
@Description("apoc.cypher.runFirstColumnMany(statement, params, expectMultipleValues) - executes statement with given parameters, returns first column only collected into a list, params are available as identifiers")
@Description("apoc.cypher.runFirstColumnMany(statement, params) - executes statement with given parameters, returns first column only collected into a list, params are available as identifiers")
public List<Object> runFirstColumnMany(@Name("cypher") String statement, @Name("params") Map<String, Object> params) {
return (List)runFirstColumn(statement, params, true);
}
@UserFunction
@Description("apoc.cypher.runFirstColumnSingle(statement, params, expectMultipleValues) - executes statement with given parameters, returns first element of the first column only, params are available as identifiers")
@Description("apoc.cypher.runFirstColumnSingle(statement, params) - executes statement with given parameters, returns first element of the first column only, params are available as identifiers")
public Object runFirstColumnSingle(@Name("cypher") String statement, @Name("params") Map<String, Object> params) {
return runFirstColumn(statement, params, false);
}
40 changes: 40 additions & 0 deletions src/main/java/apoc/cypher/CypherInitializer.java
@@ -0,0 +1,40 @@
package apoc.cypher;

import apoc.ApocConfiguration;
import org.neo4j.helpers.collection.MapUtil;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.logging.Log;

import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

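// Executes the Cypher statements configured under the apoc.initializer.cypher key(s) in neo4j.conf once the
// database becomes available, in sorted key order (the exact key suffix scheme, e.g. apoc.initializer.cypher.1,
// is an assumption derived from the sorted map below); a failing statement is logged as a warning and does not
// abort startup.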
public class CypherInitializer implements AvailabilityGuard.AvailabilityListener {
private final GraphDatabaseAPI db;
private final Log userLog;

public CypherInitializer(GraphDatabaseAPI db, Log userLog) {
this.db = db;
this.userLog = userLog;
}

@Override
public void available() {
SortedMap<String, Object> initializers = new TreeMap<>(ApocConfiguration.get("initializer.cypher"));
for (Object initializer: initializers.values()) {
String query = initializer.toString();
try {
db.execute(query);
userLog.info("successfully initialized: " + query);
} catch (Exception e) {
userLog.warn("error upon initialization, running: "+query, e);
}
}
}

@Override
public void unavailable() {
// intentionally empty
}
}
50 changes: 30 additions & 20 deletions src/main/java/apoc/date/Date.java
@@ -97,18 +97,18 @@ public Map<String,Object> fields(final @Name("date") String date, final @Name(va
@Description("apoc.date.field(12345,('ms|s|m|h|d|month|year'),('TZ')")
public Long field(final @Name("time") Long time, @Name(value = "unit", defaultValue = "d") String unit, @Name(value = "timezone",defaultValue = "UTC") String timezone) {
return (time == null)
? null
: (long) ZonedDateTime
.ofInstant( Instant.ofEpochMilli( time ), ZoneId.of( timezone ) )
.get( chronoField( unit ) );
? null
: (long) ZonedDateTime
.ofInstant( Instant.ofEpochMilli( time ), ZoneId.of( timezone ) )
.get( chronoField( unit ) );
}

@UserFunction
@Description( "apoc.date.currentTimestamp() - returns System.currentTimeMillis()" )
public long currentTimestamp()
{
return System.currentTimeMillis();
}
@Description( "apoc.date.currentTimestamp() - returns System.currentTimeMillis()" )
public long currentTimestamp()
{
return System.currentTimeMillis();
}

public static class FieldResult {
public final Map<String,Object> value = new LinkedHashMap<>();
@@ -132,21 +132,26 @@ private TimeUnit unit(String unit) {
// case "month":case "months": return TimeUnit.MONTHS;
// case "years":case "year": return TimeUnit.YEARS;
}
return TimeUnit.MILLISECONDS;

throw new IllegalArgumentException("The unit: "+ unit + " is not correct");

//return TimeUnit.MILLISECONDS;
}

private ChronoField chronoField(String unit) {
switch (unit.toLowerCase()) {
case "ms": case "milli": case "millis": case "milliseconds": return ChronoField.MILLI_OF_SECOND;
case "s": case "second": case "seconds": return ChronoField.SECOND_OF_MINUTE;
case "m": case "minute": case "minutes": return ChronoField.MINUTE_OF_HOUR;
case "h": case "hour": case "hours": return ChronoField.HOUR_OF_DAY;
case "d": case "day": case "days": return ChronoField.DAY_OF_MONTH;
case "w": case "weekday": case "weekdays": return ChronoField.DAY_OF_WEEK;
case "month":case "months": return ChronoField.MONTH_OF_YEAR;
case "year":case "years": return ChronoField.YEAR;
default: return ChronoField.YEAR;
case "ms": case "milli": case "millis": case "milliseconds": return ChronoField.MILLI_OF_SECOND;
case "s": case "second": case "seconds": return ChronoField.SECOND_OF_MINUTE;
case "m": case "minute": case "minutes": return ChronoField.MINUTE_OF_HOUR;
case "h": case "hour": case "hours": return ChronoField.HOUR_OF_DAY;
case "d": case "day": case "days": return ChronoField.DAY_OF_MONTH;
case "w": case "weekday": case "weekdays": return ChronoField.DAY_OF_WEEK;
case "month":case "months": return ChronoField.MONTH_OF_YEAR;
case "year":case "years": return ChronoField.YEAR;
// default: return ChronoField.YEAR;
}

throw new IllegalArgumentException("The unit: "+ unit + " is not correct");
}

@UserFunction
@@ -199,7 +204,12 @@ public String parse(final @Name("millis") long millis, final @Name(value = "patt

private static DateFormat getFormat(final String pattern, final String timezone) {
String actualPattern = getPattern(pattern);
SimpleDateFormat format = new SimpleDateFormat(actualPattern);
SimpleDateFormat format = null;
try {
format = new SimpleDateFormat(actualPattern);
} catch(Exception e){
throw new IllegalArgumentException("The pattern: "+pattern+" is not correct");
}
if (timezone != null && !"".equals(timezone)) {
format.setTimeZone(TimeZone.getTimeZone(timezone));
} else if (!(containsTimeZonePattern(actualPattern))) {
2 changes: 1 addition & 1 deletion src/main/java/apoc/export/csv/CsvEntityLoader.java
@@ -79,7 +79,7 @@ public void loadNodes(final String fileName, final List<String> labels, final Gr
)
);

final CSVReader csv = new CSVReader(reader, clc.getDelimiter());
final CSVReader csv = new CSVReader(reader, clc.getDelimiter(), clc.getQuotationCharacter());

final String[] loadCsvCompatibleHeader = fields.stream().map(f -> f.getName()).toArray(String[]::new);
int lineNo = 0;
21 changes: 18 additions & 3 deletions src/main/java/apoc/export/csv/CsvLoaderConfig.java
@@ -71,12 +71,27 @@ public static Builder builder() {
return new Builder();
}

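/** Reads a config entry that may arrive either as a single-character String or as a Character; returns null for other types. */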
public static Character getCharacterOrString(Map<String, Object> config, String name) {
Object o = config.get(name);
if (o instanceof String) {
String s = (String) o;
if (s.length() != 1) {
throw new IllegalStateException(name + " must have a length of one.");
}
return s.charAt(0);
}
if (o instanceof Character) {
return (Character) o;
}
return null;
}

public static CsvLoaderConfig from(Map<String, Object> config) {
Builder builder = builder();

if (config.get(DELIMITER) != null) builder.delimiter((char) config.get(DELIMITER));
if (config.get(ARRAY_DELIMITER) != null) builder.arrayDelimiter((char) config.get(ARRAY_DELIMITER));
if (config.get(QUOTATION_CHARACTER) != null) builder.quotationCharacter((char) config.get(QUOTATION_CHARACTER));
if (config.get(DELIMITER) != null) builder.delimiter(getCharacterOrString(config, DELIMITER));
if (config.get(ARRAY_DELIMITER) != null) builder.arrayDelimiter(getCharacterOrString(config, ARRAY_DELIMITER));
if (config.get(QUOTATION_CHARACTER) != null) builder.quotationCharacter(getCharacterOrString(config, QUOTATION_CHARACTER));
if (config.get(STRING_IDS) != null) builder.stringIds((boolean) config.get(STRING_IDS));
if (config.get(SKIP_LINES) != null) builder.skipLines((int) config.get(SKIP_LINES));
if (config.get(BATCH_SIZE) != null) builder.batchSize((int) config.get(BATCH_SIZE));
25 changes: 10 additions & 15 deletions src/main/java/apoc/export/csv/ExportCSV.java
@@ -2,7 +2,6 @@

import apoc.Description;
import apoc.Pools;
import apoc.export.cypher.ExportCypher;
import apoc.export.util.ExportConfig;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
@@ -79,12 +78,12 @@ public Stream<ProgressInfo> graph(@Name("graph") Map<String,Object> graph, @Name
}

@Procedure
@Description("apoc.export.csv.query(query,file,{config,...,params:{params}}) - exports results from the cypher kernelTransaction as csv to the provided file")
@Description("apoc.export.csv.query(query,file,{config,...,params:{params}}) - exports results from the cypher statement as csv to the provided file")
public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") String fileName, @Name("config") Map<String, Object> config) throws Exception {
Map<String,Object> params = config == null ? Collections.emptyMap() : (Map<String,Object>)config.getOrDefault("params", Collections.emptyMap());
Result result = db.execute(query,params);

String source = String.format("kernelTransaction: cols(%d)", result.columns().size());
String source = String.format("statement: cols(%d)", result.columns().size());
return exportCsv(fileName, source,result,config);
}

@@ -97,18 +96,14 @@ private Stream<ProgressInfo> exportCsv(@Name("file") String fileName, String sou
PrintWriter printWriter = getPrintWriter(fileName, null);
CsvFormat exporter = new CsvFormat(db);
if (c.streamStatements()) {
Future<Boolean> future = null;
try {
StringWriter writer = new StringWriter(10_000);
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> queue.offer(pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(writer)));
future = Util.inTxFuture(Pools.DEFAULT, db, () -> { dump(data, c, reporterWithConsumer, writer, exporter); return true; });
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard);
return StreamSupport.stream(spliterator, false);
} finally {
Util.waitForFutures(Collections.singletonList(future));
}
long timeout = c.getTimeoutSeconds();
StringWriter writer = new StringWriter(10_000);
final ArrayBlockingQueue<ProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> Util.put(queue, pi == ProgressInfo.EMPTY ? ProgressInfo.EMPTY : new ProgressInfo(pi).drain(writer),timeout));
Util.inTxFuture(Pools.DEFAULT, db, () -> { dump(data, c, reporterWithConsumer, writer, exporter); return true; });
QueueBasedSpliterator<ProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, ProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
} else {
dump(data, c, reporter, printWriter, exporter);
return reporter.stream();
24 changes: 11 additions & 13 deletions src/main/java/apoc/export/cypher/ExportCypher.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@@ -72,13 +74,13 @@ public Stream<DataProgressInfo> graph(@Name("graph") Map<String, Object> graph,
}

@Procedure
@Description("apoc.export.cypher.query(query,file,config) - exports nodes and relationships from the cypher kernelTransaction incl. indexes as cypher statements to the provided file")
@Description("apoc.export.cypher.query(query,file,config) - exports nodes and relationships from the cypher statement incl. indexes as cypher statements to the provided file")
public Stream<DataProgressInfo> query(@Name("query") String query, @Name(value = "file",defaultValue = "") String fileName, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) throws IOException {
if (Util.isNullOrEmpty(fileName)) fileName=null;
ExportConfig c = new ExportConfig(config);
Result result = db.execute(query);
SubGraph graph = CypherResultSubGraph.from(result, db, c.getRelsInBetween());
String source = String.format("kernelTransaction: nodes(%d), rels(%d)",
String source = String.format("statement: nodes(%d), rels(%d)",
Iterables.count(graph.getNodes()), Iterables.count(graph.getRelationships()));
return exportCypher(fileName, source, graph, c, false);
}
@@ -101,17 +103,13 @@ private Stream<DataProgressInfo> exportCypher(@Name("file") String fileName, Str
FileManagerFactory.ExportCypherFileManager cypherFileManager = FileManagerFactory.createFileManager(fileName, separatedFiles, c.streamStatements());

if (c.streamStatements()) {
Future<Boolean> future = null;
try {
final ArrayBlockingQueue<DataProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> queue.offer(pi == ProgressInfo.EMPTY ? DataProgressInfo.EMPTY : new DataProgressInfo(pi).enrich(cypherFileManager)));
future = Util.inTxFuture(Pools.DEFAULT, db, () -> { doExport(graph, c, onlySchema, reporterWithConsumer, cypherFileManager); return true; });
QueueBasedSpliterator<DataProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, DataProgressInfo.EMPTY, terminationGuard);
return StreamSupport.stream(spliterator, false);
} finally {
Util.waitForFutures(Collections.singletonList(future));
}
long timeout = c.getTimeoutSeconds();
final BlockingQueue<DataProgressInfo> queue = new ArrayBlockingQueue<>(1000);
ProgressReporter reporterWithConsumer = reporter.withConsumer(
(pi) -> Util.put(queue,pi == ProgressInfo.EMPTY ? DataProgressInfo.EMPTY : new DataProgressInfo(pi).enrich(cypherFileManager),timeout));
Util.inTxFuture(Pools.DEFAULT, db, () -> { doExport(graph, c, onlySchema, reporterWithConsumer, cypherFileManager); return true; });
QueueBasedSpliterator<DataProgressInfo> spliterator = new QueueBasedSpliterator<>(queue, DataProgressInfo.EMPTY, terminationGuard, timeout);
return StreamSupport.stream(spliterator, false);
} else {
doExport(graph, c, onlySchema, reporter, cypherFileManager);
return reporter.stream().map(DataProgressInfo::new).map((dpi) -> dpi.enrich(cypherFileManager));
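
The streaming branch above no longer blocks on a `Future` inside a `finally`: statements are pushed onto a bounded `ArrayBlockingQueue` from a background transaction and drained through `QueueBasedSpliterator`, with both `Util.put` and the spliterator honouring the configured timeout. A minimal usage sketch, assuming the `stream`/`timeoutSeconds` config keys read by `ExportConfig`, an `all` variant alongside the `graph` and `query` procedures shown here, and a `cypherStatements` column on `DataProgressInfo`:

[source,cypher]
----
// a null file plus stream:true returns the generated statements
// incrementally instead of writing them to disk
CALL apoc.export.cypher.all(null, {stream: true, timeoutSeconds: 100})
YIELD cypherStatements
RETURN cypherStatements
----
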
src/main/java/apoc/export/cypher/formatter/CypherFormatterUtils.java
@@ -4,9 +4,15 @@
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.spatial.Point;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.values.storable.DateValue;
import org.neo4j.values.storable.DurationValue;
import org.neo4j.values.storable.Value;
import org.neo4j.values.storable.Values;

import java.lang.reflect.Array;
import java.time.temporal.Temporal;
import java.util.*;

/**
@@ -16,9 +22,11 @@
*/
public class CypherFormatterUtils {

public final static String UNIQUE_ID_LABEL = "UNIQUE IMPORT LABEL";
public final static String UNIQUE_ID_PROP = "UNIQUE IMPORT ID";
public final static String Q_UNIQUE_ID_LABEL = quote(UNIQUE_ID_LABEL);
public final static String UNIQUE_ID_LABEL = "UNIQUE IMPORT LABEL";
public final static String UNIQUE_ID_PROP = "UNIQUE IMPORT ID";
public final static String Q_UNIQUE_ID_LABEL = quote(UNIQUE_ID_LABEL);

public final static String FUNCTION_TEMPLATE = "%s('%s')";

// ---- node id ----

@@ -167,7 +175,7 @@ private static String formatPropertyName(String id, String prop, Object value, b
return (id != null && !"".equals(id) ? id + "." : "") + "`" + prop + "`" + (jsonStyle ? ":" : "=" ) + toString(value);
}

// ---- to string ----
// ---- to string ----

public static String quote(Iterable<String> ids) {
StringBuilder builder = new StringBuilder();
@@ -190,23 +198,34 @@ public static String label(String id) {
}

public static String toString(Object value) {
if (value == null) return "null";
if (value instanceof String) return FormatUtils.formatString(value);
if (value instanceof Number) {
return FormatUtils.formatNumber((Number) value);
}
if (value instanceof Boolean) return value.toString();
if (value instanceof Iterator) {
return toString(((Iterator) value));
}
if (value instanceof Iterable) {
return toString(((Iterable) value).iterator());
}
if (value.getClass().isArray()) {
return arrayToString(value);
}
return value.toString();
}
if (value == null) return "null";
if (value instanceof String) return FormatUtils.formatString(value);
if (value instanceof Number) {
return FormatUtils.formatNumber((Number) value);
}
if (value instanceof Boolean) return value.toString();
if (value instanceof Iterator) {
return toString(((Iterator) value));
}
if (value instanceof Iterable) {
return toString(((Iterable) value).iterator());
}
if (value.getClass().isArray()) {
return arrayToString(value);
}
if (value instanceof Temporal){
Value val = Values.of(value);
return toStringFunction(val);
}
if (value instanceof DurationValue) {
return toStringFunction((DurationValue) value);
}
return value.toString();
}

private static String toStringFunction(Value value) {
return String.format(FUNCTION_TEMPLATE, value.getTypeName().toLowerCase(), value.toString());
}

public static String toString(Iterator<?> iterator) {
StringBuilder result = new StringBuilder();
@@ -219,12 +238,12 @@ public static String toString(Iterator<?> iterator) {
}

public static String arrayToString(Object value) {
int length = Array.getLength(value);
StringBuilder result = new StringBuilder(10 * length);
for (int i = 0; i < length; i++) {
if (i > 0) result.append(", ");
result.append(toString(Array.get(value, i)));
}
return "[" + result.toString() + "]";
}
}
int length = Array.getLength(value);
StringBuilder result = new StringBuilder(10 * length);
for (int i = 0; i < length; i++) {
if (i > 0) result.append(", ");
result.append(toString(Array.get(value, i)));
}
return "[" + result.toString() + "]";
}
}
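
With the new `Temporal` and `DurationValue` branches, `CypherFormatterUtils.toString` renders temporal properties through `FUNCTION_TEMPLATE` (`%s('%s')`): the lower-cased storable type name wrapped around the value's ISO representation. The generated import script therefore reconstructs real temporal values, roughly like this (property values are illustrative):

[source,cypher]
----
// date/localdatetime/duration come from Values.of(value).getTypeName().toLowerCase()
CREATE (:Person {born: date('1980-05-01'),
                 lastLogin: localdatetime('2018-10-30T12:50:35'),
                 shift: duration('P5M1DT12H')});
----
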
88 changes: 88 additions & 0 deletions src/main/java/apoc/export/json/ExportJson.java
@@ -0,0 +1,88 @@
package apoc.export.json;

import apoc.Description;
import apoc.export.util.ExportConfig;
import apoc.export.util.NodesAndRelsSubGraph;
import apoc.export.util.ProgressReporter;
import apoc.result.ProgressInfo;
import apoc.util.Util;
import org.neo4j.cypher.export.DatabaseSubGraph;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;
import org.neo4j.graphdb.Result;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static apoc.util.FileUtils.checkWriteAllowed;
import static apoc.util.FileUtils.getPrintWriter;

public class ExportJson {
@Context
public GraphDatabaseService db;

public ExportJson(GraphDatabaseService db) {
this.db = db;
}

public ExportJson() {
}

@Procedure
@Description("apoc.exportJson.json.all(file,config) - exports whole database as json to the provided file")
public Stream<ProgressInfo> all(@Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {

String source = String.format("database: nodes(%d), rels(%d)", Util.nodeCount(db), Util.relCount(db));
return exportJson(fileName, source, new DatabaseSubGraph(db), config);
}

@Procedure
@Description("apoc.exportJson.json.data(nodes,rels,file,config) - exports given nodes and relationships as json to the provided file")
public Stream<ProgressInfo> data(@Name("nodes") List<Node> nodes, @Name("rels") List<Relationship> rels, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {

String source = String.format("data: nodes(%d), rels(%d)", nodes.size(), rels.size());
return exportJson(fileName, source, new NodesAndRelsSubGraph(db, nodes, rels), config);
}
@Procedure
@Description("apoc.exportJson.json.graph(graph,file,config) - exports given graph object as json to the provided file")
public Stream<ProgressInfo> graph(@Name("graph") Map<String,Object> graph, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {

Collection<Node> nodes = (Collection<Node>) graph.get("nodes");
Collection<Relationship> rels = (Collection<Relationship>) graph.get("relationships");
String source = String.format("graph: nodes(%d), rels(%d)", nodes.size(), rels.size());
return exportJson(fileName, source, new NodesAndRelsSubGraph(db, nodes, rels), config);
}

@Procedure
@Description("apoc.exportJson.json.query(query,file,{config,...,params:{params}}) - exports results from the cypher kernelTransaction as json to the provided file")
public Stream<ProgressInfo> query(@Name("query") String query, @Name("file") String fileName, @Name(value = "config", defaultValue = "{}") Map<String, Object> config) throws Exception {
Map<String,Object> params = config == null ? Collections.emptyMap() : (Map<String,Object>)config.getOrDefault("params", Collections.emptyMap());
Result result = db.execute(query,params);
String source = String.format("kernelTransaction: cols(%d)", result.columns().size());
return exportJson(fileName, source,result,config);
}

private Stream<ProgressInfo> exportJson(@Name("file") String fileName, String source, Object data, Map<String,Object> config) throws Exception {
checkWriteAllowed();
ExportConfig c = new ExportConfig(config);
ProgressReporter reporter = new ProgressReporter(null, null, new ProgressInfo(fileName, source, "json"));
JsonFormat exporter = new JsonFormat(db);
try (PrintWriter printWriter = getPrintWriter(fileName, null);) {
if (data instanceof SubGraph)
exporter.dump(((SubGraph)data),printWriter,reporter,c);
if (data instanceof Result)
exporter.dump(((Result)data),printWriter,reporter,c);
}
return reporter.stream();
}
}
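
`ExportJson` mirrors the existing export classes: four procedures funnelling into one `exportJson` helper that picks the matching `dump` overload. A usage sketch (procedure names follow from the `apoc.export.json` package and the method names; file names are placeholders):

[source,cypher]
----
// whole database
CALL apoc.export.json.all('all.json', {});

// a query result; Cypher parameters travel inside the config map
CALL apoc.export.json.query(
  'MATCH (p:Person) WHERE p.age > $age RETURN p',
  'people.json',
  {params: {age: 30}});
----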

196 changes: 196 additions & 0 deletions src/main/java/apoc/export/json/JsonFormat.java
@@ -0,0 +1,196 @@
package apoc.export.json;

import apoc.export.util.ExportConfig;
import apoc.export.util.Format;
import apoc.export.util.Reporter;
import apoc.meta.Meta;
import apoc.result.ProgressInfo;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.neo4j.cypher.export.SubGraph;
import org.neo4j.graphdb.*;
import org.neo4j.graphdb.spatial.Point;

import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.time.temporal.TemporalAccessor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class JsonFormat implements Format {
private final GraphDatabaseService db;

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);

static {
SimpleModule module = new SimpleModule("Neo4jApocSerializer", new Version(1, 0, 0, ""));
module.addSerializer(Point.class, new PointSerializer());
module.addSerializer(TemporalAccessor.class, new TemporalSerializer());
OBJECT_MAPPER.registerModule(module);
}

public JsonFormat(GraphDatabaseService db) {
this.db = db;
}

@Override
public ProgressInfo load(Reader reader, Reporter reporter, ExportConfig config) throws Exception {
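// import (load) is not implemented for the JSON format; this class is export-only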
return null;
}

private ProgressInfo dump(Writer writer, Reporter reporter, Consumer<JsonGenerator> consumer) throws Exception {
try (Transaction tx = db.beginTx(); JsonGenerator jsonGenerator = getJsonGenerator(writer);) {

consumer.accept(jsonGenerator);

tx.success();
return reporter.getTotal();
}
}

@Override
public ProgressInfo dump(SubGraph graph, Writer writer, Reporter reporter, ExportConfig config) throws Exception {
Consumer<JsonGenerator> consumer = (jsonGenerator) -> {
try {
writeNodes(graph.getNodes(), reporter, jsonGenerator, config);
writeRels(graph.getRelationships(), reporter, jsonGenerator, config);
} catch (IOException e) {
throw new RuntimeException(e);
}
};
return dump(writer, reporter, consumer);
}

public ProgressInfo dump(Result result, Writer writer, Reporter reporter, ExportConfig config) throws Exception {
Consumer<JsonGenerator> consumer = (jsonGenerator) -> {
try {
String[] header = result.columns().toArray(new String[result.columns().size()]);
result.accept((row) -> {
writeJsonResult(reporter, header, jsonGenerator, row, config);
reporter.nextRow();
return true;
});
} catch (IOException e) {
throw new RuntimeException(e);
}
};
return dump(writer, reporter, consumer);
}

private JsonGenerator getJsonGenerator(Writer writer) throws IOException {
JsonFactory jsonF = new JsonFactory();
JsonGenerator jsonGenerator = jsonF.createGenerator(writer);
jsonGenerator.setCodec(OBJECT_MAPPER);
jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
return jsonGenerator;
}

private void writeNodes(Iterable<Node> nodes, Reporter reporter, JsonGenerator jsonGenerator,ExportConfig config) throws IOException {
for (Node node : nodes) {
writeNode(reporter, jsonGenerator, node, config);
}
}

private void writeNode(Reporter reporter, JsonGenerator jsonGenerator, Node node, ExportConfig config) throws IOException {
Map<String, Object> allProperties = node.getAllProperties();
JsonFormatSerializer.DEFAULT.writeNode(jsonGenerator, node, config);
reporter.update(1, 0, allProperties.size());
}

private void writeRels(Iterable<Relationship> rels, Reporter reporter, JsonGenerator jsonGenerator, ExportConfig config) throws IOException {
for (Relationship rel : rels) {
writeRel(reporter, jsonGenerator, rel, config);
}
}

private void writeRel(Reporter reporter, JsonGenerator jsonGenerator, Relationship rel, ExportConfig config) throws IOException {
Map<String, Object> allProperties = rel.getAllProperties();
JsonFormatSerializer.DEFAULT.writeRelationship(jsonGenerator, rel, config);
reporter.update(0, 1, allProperties.size());
}

private void writeJsonResult(Reporter reporter, String[] header, JsonGenerator jsonGenerator, Result.ResultRow row, ExportConfig config) throws IOException {
jsonGenerator.writeStartObject();
for (int col = 0; col < header.length; col++) {
String keyName = header[col];
Object value = row.get(keyName);
write(reporter, jsonGenerator, config, keyName, value, true);
}
jsonGenerator.writeEndObject();
}

private void write(Reporter reporter, JsonGenerator jsonGenerator, ExportConfig config, String keyName, Object value, boolean writeKey) throws IOException {
Meta.Types type = Meta.Types.of(value);
switch (type) {
case NODE:
writeFieldName(jsonGenerator, keyName, writeKey);
writeNode(reporter, jsonGenerator, (Node) value, config);
break;
case RELATIONSHIP:
writeFieldName(jsonGenerator, keyName, writeKey);
writeRel(reporter, jsonGenerator, (Relationship) value, config);
break;
case PATH:
writeFieldName(jsonGenerator, keyName, writeKey);
writePath(reporter, jsonGenerator, config, (Path) value);
break;
case MAP:
if (writeKey) {
jsonGenerator.writeObjectFieldStart(keyName);
} else {
jsonGenerator.writeStartObject();
writeKey = true;
}
Map<String, Object> map = (HashMap<String, Object>) value;
for (Map.Entry<String, Object> entry : map.entrySet()) {
write(reporter, jsonGenerator, config, entry.getKey(), entry.getValue(), writeKey);
}
jsonGenerator.writeEndObject();
break;
case LIST:
if (writeKey) {
jsonGenerator.writeArrayFieldStart(keyName);
} else {
jsonGenerator.writeStartArray();
}
List<Object> list = (List<Object>) value;
for (Object elem : list) {
write(reporter, jsonGenerator, config, keyName, elem, false);
}
jsonGenerator.writeEndArray();
break;
default:
JsonFormatSerializer.DEFAULT.serializeProperty(jsonGenerator, keyName, value, writeKey);
reporter.update(0, 0, 1);
break;

}
}

private void writeFieldName(JsonGenerator jsonGenerator, String keyName, boolean writeKey) throws IOException {
if (writeKey) {
jsonGenerator.writeFieldName(keyName);
}
}

private void writePath(Reporter reporter, JsonGenerator jsonGenerator, ExportConfig config, Path path) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField("length", path.length());
jsonGenerator.writeArrayFieldStart("rels");
writeRels(path.relationships(), reporter, jsonGenerator, config);
jsonGenerator.writeEndArray();
jsonGenerator.writeArrayFieldStart("nodes");
writeNodes(path.nodes(), reporter, jsonGenerator, config);
jsonGenerator.writeEndArray();
jsonGenerator.writeEndObject();
}

}
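
Because the generator is configured with `MinimalPrettyPrinter("\n")`, the output is one JSON document per line, and `write(...)` recurses into maps, lists, paths, nodes and relationships. Assuming the field layout produced by `JsonFormatSerializer` below, a query row would come out roughly as:

[source,cypher]
----
CALL apoc.export.json.query('MATCH (u:User) RETURN u, u.age AS age', 'rows.json', {});
// each result row becomes one line of the file, roughly:
// {"u":{"type":"node","id":"0","labels":["User"],"properties":{"age":42}},"age":42}
----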
99 changes: 99 additions & 0 deletions src/main/java/apoc/export/json/JsonFormatSerializer.java
@@ -0,0 +1,99 @@
package apoc.export.json;

import apoc.export.util.ExportConfig;
import com.fasterxml.jackson.core.JsonGenerator;
import org.neo4j.graphdb.Label;
import org.neo4j.graphdb.Node;
import org.neo4j.graphdb.Relationship;

import java.io.IOException;
import java.util.Map;

public enum JsonFormatSerializer {

DEFAULT() {

@Override
public void writeNode(JsonGenerator jsonGenerator, Node node, ExportConfig config) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("type", "node");
writeNodeDetails(jsonGenerator, node, true);
jsonGenerator.writeEndObject();
}

@Override
public void writeRelationship(JsonGenerator jsonGenerator, Relationship rel, ExportConfig config) throws IOException {
Node startNode = rel.getStartNode();
Node endNode = rel.getEndNode();
jsonGenerator.writeStartObject();
jsonGenerator.writeStringField("id", String.valueOf(rel.getId()));
jsonGenerator.writeStringField("type", "relationship");
jsonGenerator.writeStringField("label", rel.getType().toString());
serializeProperties(jsonGenerator, rel.getAllProperties());
writeRelationshipNode(jsonGenerator, "start", startNode, config);
writeRelationshipNode(jsonGenerator, "end", endNode, config);
jsonGenerator.writeEndObject();
}

@Override
public void serializeProperties(JsonGenerator jsonGenerator, Map<String, Object> properties) throws IOException {
if(properties != null && !properties.isEmpty()) {
jsonGenerator.writeObjectFieldStart("properties");
for (Map.Entry<String, Object> entry : properties.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
serializeProperty(jsonGenerator, key, value, true);
}
jsonGenerator.writeEndObject();
}
}

@Override
public void serializeProperty(JsonGenerator jsonGenerator, String key, Object value, boolean writeKey) throws IOException {
if (value == null) {
if (writeKey) {
jsonGenerator.writeNullField(key);
} else {
jsonGenerator.writeNull();
}
} else {
if (writeKey) {
jsonGenerator.writeObjectField(key, value);
} else {
jsonGenerator.writeObject(value);
}
}
}

private void writeNodeDetails(JsonGenerator jsonGenerator, Node node, boolean withNodeProperties) throws IOException {
jsonGenerator.writeStringField("id", String.valueOf(node.getId()));
Iterable<Label> labels = node.getLabels();
if (labels.iterator().hasNext()) {
jsonGenerator.writeArrayFieldStart("labels");
for (Label label : labels) {
jsonGenerator.writeString(label.toString());
}
jsonGenerator.writeEndArray();
}
if (withNodeProperties) {
serializeProperties(jsonGenerator, node.getAllProperties());
}
}

private void writeRelationshipNode(JsonGenerator jsonGenerator, String type, Node node, ExportConfig config) throws IOException {
jsonGenerator.writeObjectFieldStart(type);

writeNodeDetails(jsonGenerator, node, config.writeNodeProperties());
jsonGenerator.writeEndObject();
}
};

public abstract void writeNode(JsonGenerator jsonGenerator, Node node, ExportConfig config) throws IOException;

public abstract void writeRelationship(JsonGenerator jsonGenerator, Relationship relationship, ExportConfig config) throws IOException;

public abstract void serializeProperties(JsonGenerator jsonGenerator, Map<String,Object> properties) throws IOException;

public abstract void serializeProperty(JsonGenerator jsonGenerator, String key, Object value, boolean writeKey) throws IOException;

}
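
Relationships are emitted with both endpoints inlined, and the `start`/`end` nodes carry their `properties` only when `writeNodeProperties` is enabled (see the `ExportConfig` change below). A sketch, assuming `file`, `nodes` and `relationships` are among the `ProgressInfo` columns:

[source,cypher]
----
MATCH (a:User)-[r:KNOWS]->(b:User)
WITH collect(DISTINCT a) + collect(DISTINCT b) AS ns, collect(r) AS rs
CALL apoc.export.json.data(ns, rs, 'knows.json', {writeNodeProperties: true})
YIELD file, nodes, relationships
RETURN file, nodes, relationships
// a relationship line, roughly:
// {"id":"0","type":"relationship","label":"KNOWS","properties":{"since":1993},
//  "start":{"id":"1","labels":["User"],"properties":{...}},"end":{...}}
----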
140 changes: 140 additions & 0 deletions src/main/java/apoc/export/json/PointSerializer.java
@@ -0,0 +1,140 @@
package apoc.export.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.neo4j.graphdb.spatial.Point;
import org.neo4j.values.storable.CoordinateReferenceSystem;

import java.io.IOException;
import java.util.List;

public class PointSerializer extends JsonSerializer<Point> {
@Override
public void serialize(Point value, JsonGenerator jsonGenerator, SerializerProvider serializers) throws IOException {

String crsType = value.getCRS().getType();
List<Double> coordinate = value.getCoordinate().getCoordinate();

if (crsType.startsWith(CoordinateReferenceSystem.Cartesian.toString())) {
if (coordinate.size() == 3) {
jsonGenerator.writeObject(new PointCartesian(crsType, coordinate.get(0), coordinate.get(1), coordinate.get(2)));
} else {
jsonGenerator.writeObject(new PointCartesian(crsType, coordinate.get(0), coordinate.get(1)));
}
} else {
if (coordinate.size() == 3) {
// Neo4j orders geographic coordinates as [longitude, latitude(, height)],
// so latitude is the second component, not the first
jsonGenerator.writeObject(new PointWgs(crsType, coordinate.get(1), coordinate.get(0), coordinate.get(2)));
} else {
jsonGenerator.writeObject(new PointWgs(crsType, coordinate.get(1), coordinate.get(0)));
}
}
}


class PointCartesian {
private String crs;
private Double x;
private Double y;
private Double z;

public PointCartesian(String crs, Double x, Double y, Double z) {
this.crs = crs;
this.x = x;
this.y = y;
this.z = z;
}

public PointCartesian(String crs, Double x, Double y) {
this.crs = crs;
this.x = x;
this.y = y;
}

public String getCrs() {
return crs;
}

public void setCrs(String crs) {
this.crs = crs;
}

public Double getX() {
return x;
}

public void setX(Double x) {
this.x = x;
}

public Double getY() {
return y;
}

public void setY(Double y) {
this.y = y;
}

public Double getZ() {
return z;
}

public void setZ(Double z) {
this.z = z;
}
}


class PointWgs {
private String crs;
private Double latitude;
private Double longitude;
private Double height;

public PointWgs(String crs, Double latitude, Double longitude, Double height) {
this.crs = crs;
this.latitude = latitude;
this.longitude = longitude;
this.height = height;
}

public PointWgs(String crs, Double latitude, Double longitude) {
this.crs = crs;
this.latitude = latitude;
this.longitude = longitude;
}

public String getCrs() {
return crs;
}

public void setCrs(String crs) {
this.crs = crs;
}

public Double getLatitude() {
return latitude;
}

public void setLatitude(Double latitude) {
this.latitude = latitude;
}

public Double getLongitude() {
return longitude;
}

public void setLongitude(Double longitude) {
this.longitude = longitude;
}

public Double getHeight() {
return height;
}

public void setHeight(Double height) {
this.height = height;
}
}

}
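
`PointSerializer` branches on the CRS type: cartesian points go through the `PointCartesian` bean, geographic ones through `PointWgs`, and Jackson serializes whatever the bean getters expose. A hedged round trip, assuming the latitude/longitude ordering fixed above:

[source,cypher]
----
CALL apoc.export.json.query('RETURN point({x: 1.0, y: 2.0}) AS p', 'p1.json', {});
// {"p":{"crs":"cartesian","x":1.0,"y":2.0}}

CALL apoc.export.json.query(
  'RETURN point({latitude: 12.78, longitude: 56.7}) AS p', 'p2.json', {});
// {"p":{"crs":"wgs-84","latitude":12.78,"longitude":56.7}}
----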
19 changes: 19 additions & 0 deletions src/main/java/apoc/export/json/TemporalSerializer.java
@@ -0,0 +1,19 @@
package apoc.export.json;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;

import java.io.IOException;
import java.time.temporal.TemporalAccessor;

public class TemporalSerializer extends JsonSerializer<TemporalAccessor> {

@Override
public void serialize(TemporalAccessor value, JsonGenerator jsonGenerator, SerializerProvider serializers) throws IOException {
if (value == null) {
jsonGenerator.writeNull();
return; // without this, value.toString() below would throw a NullPointerException
}
jsonGenerator.writeString(value.toString());
}
}
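
`TemporalSerializer` (with the early `return` added above) simply delegates to `toString()`, which for the `java.time` types behind Neo4j temporals produces ISO-8601 text:

[source,cypher]
----
CALL apoc.export.json.query('RETURN date("2018-10-30") AS d', 'd.json', {});
// {"d":"2018-10-30"}
----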
11 changes: 11 additions & 0 deletions src/main/java/apoc/export/util/ExportConfig.java
@@ -1,6 +1,7 @@
package apoc.export.util;

import apoc.export.cypher.formatter.CypherFormat;
import apoc.util.Util;

import java.util.Collections;
import java.util.Map;
@@ -22,6 +23,7 @@ public class ExportConfig {
private String delim = DEFAULT_DELIM;
private boolean quotes;
private boolean useTypes = false;
private boolean writeNodeProperties = false;
private boolean nodesOfRelationships;
private ExportFormat format;
private CypherFormat cypherFormat;
@@ -69,6 +71,7 @@ public ExportConfig(Map<String,Object> config) {
this.cypherFormat = CypherFormat.fromString((String) config.getOrDefault("cypherFormat", "create"));
this.config = config;
this.streamStatements = toBoolean(config.get("streamStatements")) || toBoolean(config.get("stream"));
this.writeNodeProperties = toBoolean(config.get("writeNodeProperties"));
}

public boolean getRelsInBetween() {
@@ -110,4 +113,12 @@ private ExportFormat format(Object format) {
public boolean streamStatements() {
return streamStatements;
}

public boolean writeNodeProperties() {
return writeNodeProperties;
}

public long getTimeoutSeconds() {
return Util.toLong(config.getOrDefault("timeoutSeconds",100));
}
}
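
Both additions surface as plain config keys on the export procedures: `writeNodeProperties` defaults to `false`, and `timeoutSeconds` defaults to `100` and bounds how long the streaming queue blocks. For instance:

[source,cypher]
----
// carry start/end node properties on exported relationships
CALL apoc.export.json.all('all.json', {writeNodeProperties: true})
----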
117 changes: 74 additions & 43 deletions src/main/java/apoc/load/Jdbc.java
@@ -1,9 +1,10 @@
package apoc.load;

import apoc.result.RowResult;
import apoc.ApocConfiguration;
import apoc.load.util.LoadJdbcConfig;
import apoc.result.RowResult;
import apoc.util.MapUtil;
import org.apache.commons.compress.utils.IOUtils;
import apoc.util.Util;
import org.neo4j.logging.Log;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
@@ -12,17 +13,10 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.login.LoginContext;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.sql.*;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.util.*;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@@ -33,6 +27,9 @@
*/
public class Jdbc {

private static final String LOAD_TYPE = "jdbc";
private static final String KEY_NOT_FOUND_MESSAGE = "No apoc.jdbc.%s.url url specified";

static {
ApocConfiguration.get("jdbc").forEach((k, v) -> {
if (k.endsWith("driver")) loadDriver(v.toString());
@@ -68,21 +65,22 @@ private static void loadDriver(@Name("driverClass") String driverClass) {
}

@Procedure
@Description("apoc.load.jdbc('key or url','table or kernelTransaction') YIELD row - load from relational database, from a full table or a sql kernelTransaction")
@Description("apoc.load.jdbc('key or url','table or statement', config) YIELD row - load from relational database, from a full table or a sql statement")
public Stream<RowResult> jdbc(@Name("jdbc") String urlOrKey, @Name("tableOrSql") String tableOrSelect, @Name
(value = "params", defaultValue = "[]") List<Object> params) {
return executeQuery(urlOrKey, tableOrSelect, params.toArray(new Object[params.size()]));
(value = "params", defaultValue = "[]") List<Object> params, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
return executeQuery(urlOrKey, tableOrSelect, config, params.toArray(new Object[params.size()]));
}

@Procedure
@Deprecated
@Description("deprecated - please use: apoc.load.jdbc('key or url','kernelTransaction',[params]) YIELD row - load from relational database, from a sql kernelTransaction with parameters")
public Stream<RowResult> jdbcParams(@Name("jdbc") String urlOrKey, @Name("sql") String select, @Name("params") List<Object> params) {
return executeQuery(urlOrKey, select,params.toArray(new Object[params.size()]));
@Description("deprecated - please use: apoc.load.jdbc('key or url','',[params]) YIELD row - load from relational database, from a sql statement with parameters")
public Stream<RowResult> jdbcParams(@Name("jdbc") String urlOrKey, @Name("sql") String select, @Name("params") List<Object> params, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
return executeQuery(urlOrKey, select, config, params.toArray(new Object[params.size()]));
}

private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Object...params) {
String url = urlOrKey.contains(":") ? urlOrKey : getJdbcUrl(urlOrKey);
private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Map<String, Object> config, Object... params) {
LoadJdbcConfig loadJdbcConfig = new LoadJdbcConfig(config);
String url = getUrlOrKey(urlOrKey);
String query = tableOrSelect.indexOf(' ') == -1 ? "SELECT * FROM " + tableOrSelect : tableOrSelect;
try {
Connection connection = getConnection(url);
@@ -92,7 +90,7 @@ private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Ob
for (int i = 0; i < params.length; i++) stmt.setObject(i + 1, params[i]);
ResultSet rs = stmt.executeQuery();
rs.setFetchSize(5000);
Iterator<Map<String, Object>> supplier = new ResultSetIterator(log, rs, true);
Iterator<Map<String, Object>> supplier = new ResultSetIterator(log, rs, true, loadJdbcConfig);
Spliterator<Map<String, Object>> spliterator = Spliterators.spliteratorUnknownSize(supplier, Spliterator.ORDERED);
return StreamSupport.stream(spliterator, false)
.map(RowResult::new)
@@ -106,26 +104,26 @@ private Stream<RowResult> executeQuery(String urlOrKey, String tableOrSelect, Ob
throw sqle;
}
} catch (Exception e) {
log.error(String.format("Cannot execute SQL kernelTransaction `%s`.%nError:%n%s", query, e.getMessage()),e);
String errorMessage = "Cannot execute SQL kernelTransaction `%s`.%nError:%n%s";
if(e.getMessage().contains("No suitable driver")) errorMessage="Cannot execute SQL kernelTransaction `%s`.%nError:%n%s%n%s";
log.error(String.format("Cannot execute SQL statement `%s`.%nError:%n%s", query, e.getMessage()),e);
String errorMessage = "Cannot execute SQL statement `%s`.%nError:%n%s";
if(e.getMessage().contains("No suitable driver")) errorMessage="Cannot execute SQL statement `%s`.%nError:%n%s%n%s";
throw new RuntimeException(String.format(errorMessage, query, e.getMessage(), "Please download and copy the JDBC driver into $NEO4J_HOME/plugins,more details at https://neo4j-contrib.github.io/neo4j-apoc-procedures/#_load_jdbc_resources"), e);
}
}

@Procedure
@Description("apoc.load.jdbcUpdate('key or url','kernelTransaction',[params]) YIELD row - update relational database, from a SQL kernelTransaction with optional parameters")
@Description("apoc.load.jdbcUpdate('key or url','statement',[params]) YIELD row - update relational database, from a SQL statement with optional parameters")
public Stream<RowResult> jdbcUpdate(@Name("jdbc") String urlOrKey, @Name("query") String query, @Name(value = "params", defaultValue = "[]") List<Object> params) {
log.info( String.format( "Executing SQL update: %s", query ) );
return executeUpdate(urlOrKey, query, params.toArray(new Object[params.size()]));
}

private Stream<RowResult> executeUpdate(String urlOrKey, String query, Object...params) {
String url = urlOrKey.contains(":") ? urlOrKey : getJdbcUrl(urlOrKey);
String url = getUrlOrKey(urlOrKey);
try {
Connection connection = getConnection(url);
try {
PreparedStatement stmt = connection.prepareStatement(query);
PreparedStatement stmt = connection.prepareStatement(query);
try {
for (int i = 0; i < params.length; i++) stmt.setObject(i + 1, params[i]);
int updateCount = stmt.executeUpdate();
@@ -142,9 +140,9 @@
throw sqle;
}
} catch (Exception e) {
log.error(String.format("Cannot execute SQL kernelTransaction `%s`.%nError:%n%s", query, e.getMessage()),e);
String errorMessage = "Cannot execute SQL kernelTransaction `%s`.%nError:%n%s";
if(e.getMessage().contains("No suitable driver")) errorMessage="Cannot execute SQL kernelTransaction `%s`.%nError:%n%s%n%s";
log.error(String.format("Cannot execute SQL statement `%s`.%nError:%n%s", query, e.getMessage()),e);
String errorMessage = "Cannot execute SQL statement `%s`.%nError:%n%s";
if(e.getMessage().contains("No suitable driver")) errorMessage="Cannot execute SQL statement `%s`.%nError:%n%s%n%s";
throw new RuntimeException(String.format(errorMessage, query, e.getMessage(), "Please download and copy the JDBC driver into $NEO4J_HOME/plugins,more details at https://neo4j-contrib.github.io/neo4j-apoc-procedures/#_load_jdbc_resources"), e);
}
}
@@ -162,20 +160,17 @@ static void closeIt(Log log, AutoCloseable...closeables) {
}
}

private static String getJdbcUrl(String key) {
Object value = ApocConfiguration.get("jdbc").get(key + ".url");
if (value == null) throw new RuntimeException("No apoc.jdbc."+key+".url jdbc url specified");
return value.toString();
}

private static class ResultSetIterator implements Iterator<Map<String, Object>> {
private final Log log;
private final ResultSet rs;
private final String[] columns;
private final boolean closeConnection;
private Map<String, Object> map;
private LoadJdbcConfig config;


public ResultSetIterator(Log log, ResultSet rs, boolean closeConnection) throws SQLException {
public ResultSetIterator(Log log, ResultSet rs, boolean closeConnection, LoadJdbcConfig config) throws SQLException {
this.config = config;
this.log = log;
this.rs = rs;
this.columns = getMetaData(rs);
@@ -210,7 +205,7 @@ public Map<String, Object> get() {
if (handleEndOfResults()) return null;
Map<String, Object> row = new LinkedHashMap<>(columns.length);
for (int col = 1; col < columns.length; col++) {
row.put(columns[col], convert(rs.getObject(col)));
row.put(columns[col], convert(rs.getObject(col), rs.getMetaData().getColumnType(col)));
}
return row;
} catch (Exception e) {
@@ -220,18 +215,42 @@ public Map<String, Object> get() {
}
}

private Object convert(Object value) {
private Object convert(Object value, int sqlType) {
if (value instanceof UUID || value instanceof BigInteger || value instanceof BigDecimal) {
return value.toString();
}
if (value instanceof java.util.Date) {
return ((java.util.Date) value).getTime();
if (Types.TIME == sqlType) {
return ((java.sql.Time)value).toLocalTime();
}
if (Types.TIME_WITH_TIMEZONE == sqlType) {
return OffsetTime.parse(value.toString());
}
if (Types.TIMESTAMP == sqlType) {
if (config.getZoneId() != null) {
return ((java.sql.Timestamp)value).toInstant()
.atZone(config.getZoneId())
.toOffsetDateTime();
} else {
return ((java.sql.Timestamp)value).toLocalDateTime();
}
}
if (Types.TIMESTAMP_WITH_TIMEZONE == sqlType) {
if (config.getZoneId() != null) {
return ((java.sql.Timestamp)value).toInstant()
.atZone(config.getZoneId())
.toOffsetDateTime();
} else {
return OffsetDateTime.parse(value.toString());
}
}
if (Types.DATE == sqlType) {
return ((java.sql.Date)value).toLocalDate();
}
return value;
}

private boolean handleEndOfResults() throws SQLException {
Boolean closed = ignore(rs::isClosed);
Boolean closed = isRsClosed();
if (closed!=null && closed) {
return true;
}
@@ -243,12 +262,20 @@ private boolean handleEndOfResults() throws SQLException {
}

private void closeRs() {
Boolean closed = ignore(rs::isClosed);
Boolean closed = isRsClosed();
if (closed==null || !closed) {
closeIt(log, ignore(rs::getStatement), closeConnection ? ignore(()->rs.getStatement().getConnection()) : null);
}
}

private Boolean isRsClosed() {
try {
return ignore(rs::isClosed);
} catch(AbstractMethodError ame) {
return null;
}
}

}
interface FailingSupplier<T> {
T get() throws Exception;
@@ -261,4 +288,8 @@ public static <T> T ignore(FailingSupplier<T> fun) {
}
return null;
}
}

private String getUrlOrKey(String urlOrKey) {
return urlOrKey.contains(":") ? urlOrKey : Util.getLoadUrlByConfigFile(LOAD_TYPE, urlOrKey, "url").orElseThrow(() -> new RuntimeException(String.format(KEY_NOT_FOUND_MESSAGE, urlOrKey)));
}
}
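
On the load side, `convert` now maps SQL temporal types onto `java.time` values instead of epoch millis, and `TIMESTAMP` columns can be anchored to a zone via `LoadJdbcConfig.getZoneId()`. A hedged sketch against the northwind example database (the `timezone` config key is an assumption about `LoadJdbcConfig`, whose source is not part of this diff; the column name is illustrative):

[source,cypher]
----
WITH 'jdbc:mysql://localhost:3306/northwind?user=root' AS url
CALL apoc.load.jdbc(url, 'orders', [], {timezone: 'Europe/Rome'})
YIELD row
// a SQL TIMESTAMP now arrives as a datetime value rather than a long
RETURN row.OrderDate AS orderDate
LIMIT 5;
----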
8 changes: 5 additions & 3 deletions src/main/java/apoc/load/LoadJson.java
@@ -1,12 +1,13 @@
package apoc.load;

import org.neo4j.procedure.Description;
import apoc.result.MapResult;
import apoc.result.ObjectResult;
import apoc.util.JsonUtil;
import apoc.util.MapUtil;
import apoc.util.Util;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.procedure.Context;
import org.neo4j.procedure.Description;
import org.neo4j.procedure.Name;
import org.neo4j.procedure.Procedure;

@@ -17,6 +18,7 @@
public class LoadJson {

private static final String AUTH_HEADER_KEY = "Authorization";
private static final String LOAD_TYPE = "json";

@Context
public GraphDatabaseService db;
@@ -46,10 +48,10 @@ public Stream<MapResult> json(@Name("url") String url, @Name(value = "path",defa
@SuppressWarnings("unchecked")
@Procedure
@Description("apoc.load.jsonParams('url',{header:value},payload, config) YIELD value - load from JSON URL (e.g. web-api) while sending headers / payload to import JSON as stream of values if the JSON was an array or a single value if it was a map")
public Stream<MapResult> jsonParams(@Name("url") String url, @Name("headers") Map<String,Object> headers, @Name("payload") String payload, @Name(value = "path",defaultValue = "") String path, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
public Stream<MapResult> jsonParams(@Name("urlOrKey") String urlOrKey, @Name("headers") Map<String,Object> headers, @Name("payload") String payload, @Name(value = "path",defaultValue = "") String path, @Name(value = "config",defaultValue = "{}") Map<String, Object> config) {
if (config == null) config = Collections.emptyMap();
boolean failOnError = (boolean) config.getOrDefault("failOnError", true);
return loadJsonStream(url, headers, payload, path, failOnError);
return loadJsonStream(urlOrKey, headers, payload, path, failOnError);
}

public static Stream<MapResult> loadJsonStream(@Name("url") String url, @Name("headers") Map<String, Object> headers, @Name("payload") String payload) {
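
With `url` generalized to `urlOrKey` and the new `LOAD_TYPE` constant, `apoc.load.jsonParams` appears to resolve configuration keys the same way the other loaders do, presumably via an `apoc.json.<key>.url` setting (the resolving code sits outside this hunk), while the `failOnError` flag shown above can soften failing requests:

[source,cypher]
----
// assumes apoc.json.myapi.url is configured; $token is a query parameter
CALL apoc.load.jsonParams('myapi',
  {Authorization: 'Bearer ' + $token},  // headers
  null,                                 // payload
  '',                                   // json path
  {failOnError: false})
YIELD value
RETURN value
----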