From 955a90907bb8a2c67257b19fcf6f3f0ec0e9a43d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Tue, 17 May 2022 14:43:57 +0200 Subject: [PATCH 1/8] Add support for exporting node properties via arrow - introduce flag to toggle between formats --- graphdatascience/graph/graph_proc_runner.py | 14 +++++- .../query_runner/arrow_query_runner.py | 9 ++++ .../tests/integration/test_graph_ops.py | 50 +++++++++++++++---- requirements/dev.txt | 1 + 4 files changed, 64 insertions(+), 10 deletions(-) diff --git a/graphdatascience/graph/graph_proc_runner.py b/graphdatascience/graph/graph_proc_runner.py index 2e829fd06..7d2176859 100644 --- a/graphdatascience/graph/graph_proc_runner.py +++ b/graphdatascience/graph/graph_proc_runner.py @@ -104,11 +104,23 @@ def streamNodeProperties( G: Graph, node_properties: List[str], node_labels: Strings = ["*"], + old_format: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamNodeProperties" - return self._handle_properties(G, node_properties, node_labels, config) + result = self._handle_properties(G, node_properties, node_labels, config) + + # new format was requested, but the query was run via Cypher + if not old_format and "propertyValue" in result.keys(): + return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty") + # old format was requested but the query was run via Arrow + elif old_format and "propertyValue" not in result.keys(): + return result.melt(id_vars=["nodeId"]).rename( + columns={"variable": "nodeProperty", "value": "propertyValue"} + ) + + return result def streamNodeProperty( self, diff --git a/graphdatascience/query_runner/arrow_query_runner.py b/graphdatascience/query_runner/arrow_query_runner.py index 6a2d0fd0a..9333dceaf 100644 --- a/graphdatascience/query_runner/arrow_query_runner.py +++ b/graphdatascience/query_runner/arrow_query_runner.py @@ -45,6 +45,15 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame: return self._run_arrow_property_get( graph_name, "gds.graph.streamNodeProperty", {"node_property": property_name, "node_labels": node_labels} ) + elif "gds.graph.streamNodeProperties" in query: + graph_name = params["graph_name"] + property_names = params["properties"] + node_labels = params["entities"] + return self._run_arrow_property_get( + graph_name, + "gds.graph.streamNodeProperties", + {"node_properties": property_names, "node_labels": node_labels}, + ) elif "gds.graph.streamRelationshipProperty" in query: graph_name = params["graph_name"] property_name = params["properties"] diff --git a/graphdatascience/tests/integration/test_graph_ops.py b/graphdatascience/tests/integration/test_graph_ops.py index 8269907b1..4488da182 100644 --- a/graphdatascience/tests/integration/test_graph_ops.py +++ b/graphdatascience/tests/integration/test_graph_ops.py @@ -18,12 +18,12 @@ def run_around_tests(runner: Neo4jQueryRunner) -> Generator[None, None, None]: runner.run_query( """ CREATE - (a: Node {x: 1}), - (b: Node {x: 2}), - (c: Node {x: 3}), - (a)-[:REL {relX: 4}]->(b), - (a)-[:REL {relX: 5}]->(c), - (b)-[:REL {relX: 6}]->(c), + (a: Node {x: 1, y: 2}), + (b: Node {x: 2, y: 3}), + (c: Node {x: 3, y: 4}), + (a)-[:REL {relX: 4, relY: 5}]->(b), + (a)-[:REL {relX: 5, relY: 6}]->(c), + (b)-[:REL {relX: 6, relY: 7}]->(c), (b)-[:REL2]->(c) """ ) @@ -164,10 +164,42 @@ def test_graph_streamNodeProperty_without_arrow(gds_without_arrow: GraphDataScie def test_graph_streamNodeProperties(gds: GraphDataScience) -> None: - G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": "x"}}, "*") + G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") - result = gds.graph.streamNodeProperties(G, ["x"], concurrency=2) - assert {e for e in result["propertyValue"]} == {1, 2, 3} + result = gds.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) + assert {e for e in result["x"]} == {1, 2, 3} + assert {e for e in result["y"]} == {2, 3, 4} + + +def test_graph_streamNodeProperties_old_format(gds: GraphDataScience) -> None: + G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") + + result = gds.graph.streamNodeProperties(G, ["x", "y"], old_format=True, concurrency=2) + + x_values = result[result.nodeProperty == "x"] + assert {e for e in x_values["propertyValue"]} == {1, 2, 3} + + y_values = result[result.nodeProperty == "y"] + assert {e for e in y_values["propertyValue"]} == {2, 3, 4} + + +def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") + + result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) + assert {e for e in result["x"]} == {1, 2, 3} + assert {e for e in result["y"]} == {2, 3, 4} + + +def test_graph_streamNodeProperties_without_arrow_old_format(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") + + result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], old_format=True, concurrency=2) + x_values = result[result.nodeProperty == "x"] + assert {e for e in x_values["propertyValue"]} == {1, 2, 3} + + y_values = result[result.nodeProperty == "y"] + assert {e for e in y_values["propertyValue"]} == {2, 3, 4} def test_graph_streamRelationshipProperty(gds: GraphDataScience) -> None: diff --git a/requirements/dev.txt b/requirements/dev.txt index 0a088fc07..9bce84f0f 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -8,3 +8,4 @@ pytest pytest-annotate tox types-setuptools +tabulate From b3b68757dac9ca132b631a6fe4d88c4d2c27e8b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Tue, 17 May 2022 14:44:11 +0200 Subject: [PATCH 2/8] Add support for exporting relationship properties via arrow - introduce flag to toggle between formats --- graphdatascience/graph/graph_proc_runner.py | 16 +++++++- .../query_runner/arrow_query_runner.py | 9 +++++ .../tests/integration/test_graph_ops.py | 39 +++++++++++++++++-- 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/graphdatascience/graph/graph_proc_runner.py b/graphdatascience/graph/graph_proc_runner.py index 7d2176859..837154223 100644 --- a/graphdatascience/graph/graph_proc_runner.py +++ b/graphdatascience/graph/graph_proc_runner.py @@ -138,11 +138,25 @@ def streamRelationshipProperties( G: Graph, relationship_properties: List[str], relationship_types: Strings = ["*"], + old_format: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamRelationshipProperties" - return self._handle_properties(G, relationship_properties, relationship_types, config) + result = self._handle_properties(G, relationship_properties, relationship_types, config) + + # new format was requested, but the query was run via Cypher + if not old_format and "propertyValue" in result.keys(): + return result.pivot_table( + "propertyValue", ["sourceNodeId", "targetNodeId", "relationshipType"], columns="relationshipProperty" + ) + # old format was requested but the query was run via Arrow + elif old_format and "propertyValue" not in result.keys(): + return result.melt(id_vars=["sourceNodeId", "targetNodeId", "relationshipType"]).rename( + columns={"variable": "relationshipProperty", "value": "propertyValue"} + ) + + return result def streamRelationshipProperty( self, diff --git a/graphdatascience/query_runner/arrow_query_runner.py b/graphdatascience/query_runner/arrow_query_runner.py index 9333dceaf..2abc57cab 100644 --- a/graphdatascience/query_runner/arrow_query_runner.py +++ b/graphdatascience/query_runner/arrow_query_runner.py @@ -63,6 +63,15 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame: "gds.graph.streamRelationshipProperty", {"relationship_property": property_name, "relationship_types": relationship_types}, ) + elif "gds.graph.streamRelationshipProperties" in query: + graph_name = params["graph_name"] + property_names = params["properties"] + relationship_types = params["entities"] + return self._run_arrow_property_get( + graph_name, + "gds.graph.streamRelationshipProperties", + {"relationship_properties": property_names, "relationship_types": relationship_types}, + ) return self._fallback_query_runner.run_query(query, params) diff --git a/graphdatascience/tests/integration/test_graph_ops.py b/graphdatascience/tests/integration/test_graph_ops.py index 4488da182..03e5ca247 100644 --- a/graphdatascience/tests/integration/test_graph_ops.py +++ b/graphdatascience/tests/integration/test_graph_ops.py @@ -217,10 +217,43 @@ def test_graph_streamRelationshipProperty_without_arrow(gds_without_arrow: Graph def test_graph_streamRelationshipProperties(gds: GraphDataScience) -> None: - G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": "relX"}}) + G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - result = gds.graph.streamRelationshipProperties(G, ["relX"], concurrency=2) - assert {e for e in result["propertyValue"]} == {4, 5, 6} + result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) + assert {e for e in result["relationshipType"]} == {"REL", "REL", "REL"} + assert {e for e in result["relX"]} == {4, 5, 6} + assert {e for e in result["relY"]} == {5, 6, 7} + + +def test_graph_streamRelationshipProperties_old_format(gds: GraphDataScience) -> None: + G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) + + result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], old_format=True, concurrency=2) + + x_values = result[result.relationshipProperty == "relX"] + assert {e for e in x_values["propertyValue"]} == {4, 5, 6} + y_values = result[result.relationshipProperty == "relY"] + assert {e for e in y_values["propertyValue"]} == {5, 6, 7} + + +def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) + + result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) + print(result.to_markdown()) + assert {e for e in result["relX"]} == {4, 5, 6} + assert {e for e in result["relY"]} == {5, 6, 7} + + +def test_graph_streamRelationshipProperties_without_arrow_old_format(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) + + result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], old_format=True, concurrency=2) + + x_values = result[result.relationshipProperty == "relX"] + assert {e for e in x_values["propertyValue"]} == {4, 5, 6} + y_values = result[result.relationshipProperty == "relY"] + assert {e for e in y_values["propertyValue"]} == {5, 6, 7} def test_graph_writeNodeProperties(gds: GraphDataScience) -> None: From 6bb070e0fdad0068567cf4464a4bdaee62ee9ef7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Tue, 17 May 2022 14:54:06 +0200 Subject: [PATCH 3/8] Update changelog --- changelog/1.1.0.md | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/changelog/1.1.0.md b/changelog/1.1.0.md index 869e19f6e..39dd877f4 100644 --- a/changelog/1.1.0.md +++ b/changelog/1.1.0.md @@ -3,6 +3,10 @@ ## Breaking changes +* The calls to `gds.graph.streamNodeProperties` and `gds.graph.streamRelationshipProperties` by default now return a different format + * The default format will be `nodeId, property1, property2, ...` and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...`. + * The old format can be returned by setting `old_format` to `True`. + ## New features @@ -21,7 +25,7 @@ ## Improvements -* The functions `gds.graph.streamNodeProperty` and `gds.graph.streamRelationshipProperty` can leverage the Arrow Flight server of GDS to improve throughput. +* The functions `gds.graph.streamNodeProperty`, `gds.graph.streamNodeProperties`, `gds.graph.streamRelationshipProperty` and `gds.graph.streamRelationshipProperties` can leverage the Arrow Flight server of GDS to improve throughput. * Improved error message of `gds.graph.get` to include currently targeted database if graph not found. * Added inline progress bar for project and algorithm procedures with adequate server logging support. From 928c0673ac7bfe5970644403f3fe887adbfd7424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Tue, 17 May 2022 16:41:36 +0200 Subject: [PATCH 4/8] Make merged property columns for stream property procs the none default - rename flag from `old_format` to `merge_property_columns` --- changelog/1.1.0.md | 6 +- graphdatascience/graph/graph_proc_runner.py | 12 ++-- .../tests/integration/test_graph_ops.py | 55 +++++++++---------- 3 files changed, 35 insertions(+), 38 deletions(-) diff --git a/changelog/1.1.0.md b/changelog/1.1.0.md index 39dd877f4..1cbd41f12 100644 --- a/changelog/1.1.0.md +++ b/changelog/1.1.0.md @@ -2,10 +2,6 @@ ## Breaking changes - -* The calls to `gds.graph.streamNodeProperties` and `gds.graph.streamRelationshipProperties` by default now return a different format - * The default format will be `nodeId, property1, property2, ...` and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...`. - * The old format can be returned by setting `old_format` to `True`. ## New features @@ -26,6 +22,8 @@ ## Improvements * The functions `gds.graph.streamNodeProperty`, `gds.graph.streamNodeProperties`, `gds.graph.streamRelationshipProperty` and `gds.graph.streamRelationshipProperties` can leverage the Arrow Flight server of GDS to improve throughput. +* The calls to `gds.graph.streamNodeProperties` and `gds.graph.streamRelationshipProperties` can now return data in an improved format. + * By setting `merge_property_columns` to `True` the return format will be `nodeId, property1, property2, ...` and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...`. * Improved error message of `gds.graph.get` to include currently targeted database if graph not found. * Added inline progress bar for project and algorithm procedures with adequate server logging support. diff --git a/graphdatascience/graph/graph_proc_runner.py b/graphdatascience/graph/graph_proc_runner.py index 837154223..36be79594 100644 --- a/graphdatascience/graph/graph_proc_runner.py +++ b/graphdatascience/graph/graph_proc_runner.py @@ -104,7 +104,7 @@ def streamNodeProperties( G: Graph, node_properties: List[str], node_labels: Strings = ["*"], - old_format: bool = False, + merge_property_columns: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamNodeProperties" @@ -112,10 +112,10 @@ def streamNodeProperties( result = self._handle_properties(G, node_properties, node_labels, config) # new format was requested, but the query was run via Cypher - if not old_format and "propertyValue" in result.keys(): + if merge_property_columns and "propertyValue" in result.keys(): return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty") # old format was requested but the query was run via Arrow - elif old_format and "propertyValue" not in result.keys(): + elif not merge_property_columns and "propertyValue" not in result.keys(): return result.melt(id_vars=["nodeId"]).rename( columns={"variable": "nodeProperty", "value": "propertyValue"} ) @@ -138,7 +138,7 @@ def streamRelationshipProperties( G: Graph, relationship_properties: List[str], relationship_types: Strings = ["*"], - old_format: bool = False, + merge_property_columns: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamRelationshipProperties" @@ -146,12 +146,12 @@ def streamRelationshipProperties( result = self._handle_properties(G, relationship_properties, relationship_types, config) # new format was requested, but the query was run via Cypher - if not old_format and "propertyValue" in result.keys(): + if merge_property_columns and "propertyValue" in result.keys(): return result.pivot_table( "propertyValue", ["sourceNodeId", "targetNodeId", "relationshipType"], columns="relationshipProperty" ) # old format was requested but the query was run via Arrow - elif old_format and "propertyValue" not in result.keys(): + elif not merge_property_columns and "propertyValue" not in result.keys(): return result.melt(id_vars=["sourceNodeId", "targetNodeId", "relationshipType"]).rename( columns={"variable": "relationshipProperty", "value": "propertyValue"} ) diff --git a/graphdatascience/tests/integration/test_graph_ops.py b/graphdatascience/tests/integration/test_graph_ops.py index 03e5ca247..cf415e4f4 100644 --- a/graphdatascience/tests/integration/test_graph_ops.py +++ b/graphdatascience/tests/integration/test_graph_ops.py @@ -167,14 +167,6 @@ def test_graph_streamNodeProperties(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") result = gds.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) - assert {e for e in result["x"]} == {1, 2, 3} - assert {e for e in result["y"]} == {2, 3, 4} - - -def test_graph_streamNodeProperties_old_format(gds: GraphDataScience) -> None: - G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") - - result = gds.graph.streamNodeProperties(G, ["x", "y"], old_format=True, concurrency=2) x_values = result[result.nodeProperty == "x"] assert {e for e in x_values["propertyValue"]} == {1, 2, 3} @@ -183,18 +175,18 @@ def test_graph_streamNodeProperties_old_format(gds: GraphDataScience) -> None: assert {e for e in y_values["propertyValue"]} == {2, 3, 4} -def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: - G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") +def test_graph_streamNodeProperties_merge_property_columns(gds: GraphDataScience) -> None: + G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") - result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) + result = gds.graph.streamNodeProperties(G, ["x", "y"], merge_property_columns=True, concurrency=2) assert {e for e in result["x"]} == {1, 2, 3} assert {e for e in result["y"]} == {2, 3, 4} -def test_graph_streamNodeProperties_without_arrow_old_format(gds_without_arrow: GraphDataScience) -> None: +def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") - result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], old_format=True, concurrency=2) + result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) x_values = result[result.nodeProperty == "x"] assert {e for e in x_values["propertyValue"]} == {1, 2, 3} @@ -202,6 +194,14 @@ def test_graph_streamNodeProperties_without_arrow_old_format(gds_without_arrow: assert {e for e in y_values["propertyValue"]} == {2, 3, 4} +def test_graph_streamNodeProperties_without_arrow_merge_property_columns(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") + + result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], merge_property_columns=True, concurrency=2) + assert {e for e in result["x"]} == {1, 2, 3} + assert {e for e in result["y"]} == {2, 3, 4} + + def test_graph_streamRelationshipProperty(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": "relX"}}) @@ -220,15 +220,6 @@ def test_graph_streamRelationshipProperties(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) - assert {e for e in result["relationshipType"]} == {"REL", "REL", "REL"} - assert {e for e in result["relX"]} == {4, 5, 6} - assert {e for e in result["relY"]} == {5, 6, 7} - - -def test_graph_streamRelationshipProperties_old_format(gds: GraphDataScience) -> None: - G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - - result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], old_format=True, concurrency=2) x_values = result[result.relationshipProperty == "relX"] assert {e for e in x_values["propertyValue"]} == {4, 5, 6} @@ -236,19 +227,19 @@ def test_graph_streamRelationshipProperties_old_format(gds: GraphDataScience) -> assert {e for e in y_values["propertyValue"]} == {5, 6, 7} -def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: - G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) +def test_graph_streamRelationshipProperties_merge_property_columns(gds: GraphDataScience) -> None: + G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) - print(result.to_markdown()) + result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], merge_property_columns=True, concurrency=2) + assert {e for e in result["relationshipType"]} == {"REL", "REL", "REL"} assert {e for e in result["relX"]} == {4, 5, 6} assert {e for e in result["relY"]} == {5, 6, 7} -def test_graph_streamRelationshipProperties_without_arrow_old_format(gds_without_arrow: GraphDataScience) -> None: +def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], old_format=True, concurrency=2) + result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) x_values = result[result.relationshipProperty == "relX"] assert {e for e in x_values["propertyValue"]} == {4, 5, 6} @@ -256,6 +247,14 @@ def test_graph_streamRelationshipProperties_without_arrow_old_format(gds_without assert {e for e in y_values["propertyValue"]} == {5, 6, 7} +def test_graph_streamRelationshipProperties_without_arrow_merge_property_columns(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) + + result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], merge_property_columns=True, concurrency=2) + assert {e for e in result["relX"]} == {4, 5, 6} + assert {e for e in result["relY"]} == {5, 6, 7} + + def test_graph_writeNodeProperties(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, "*", "*") From c76fd66f4e719b0a6c7488b6540a203a761203cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Tue, 17 May 2022 16:42:48 +0200 Subject: [PATCH 5/8] Remove tabulate dev dependency --- requirements/dev.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements/dev.txt b/requirements/dev.txt index 9bce84f0f..969580bd7 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -7,5 +7,4 @@ pandas-stubs pytest pytest-annotate tox -types-setuptools -tabulate +types-setuptools \ No newline at end of file From 650d755464b1c58b7571792c7aae0dfa0b0e0d44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Wed, 18 May 2022 17:25:46 +0200 Subject: [PATCH 6/8] Rename `merge_property_columns` to `separate_property_columns` Co-Authored-By: Adam Schill Collberg --- changelog/1.1.0.md | 2 +- graphdatascience/graph/graph_proc_runner.py | 12 ++++++------ .../tests/integration/test_graph_ops.py | 16 ++++++++++------ 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/changelog/1.1.0.md b/changelog/1.1.0.md index 1cbd41f12..7b57f536d 100644 --- a/changelog/1.1.0.md +++ b/changelog/1.1.0.md @@ -23,7 +23,7 @@ * The functions `gds.graph.streamNodeProperty`, `gds.graph.streamNodeProperties`, `gds.graph.streamRelationshipProperty` and `gds.graph.streamRelationshipProperties` can leverage the Arrow Flight server of GDS to improve throughput. * The calls to `gds.graph.streamNodeProperties` and `gds.graph.streamRelationshipProperties` can now return data in an improved format. - * By setting `merge_property_columns` to `True` the return format will be `nodeId, property1, property2, ...` and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...`. + * By setting `separate_property_columns` to `True` the return format will be `nodeId, property1, property2, ...` and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...`. * Improved error message of `gds.graph.get` to include currently targeted database if graph not found. * Added inline progress bar for project and algorithm procedures with adequate server logging support. diff --git a/graphdatascience/graph/graph_proc_runner.py b/graphdatascience/graph/graph_proc_runner.py index 36be79594..820c462ce 100644 --- a/graphdatascience/graph/graph_proc_runner.py +++ b/graphdatascience/graph/graph_proc_runner.py @@ -104,7 +104,7 @@ def streamNodeProperties( G: Graph, node_properties: List[str], node_labels: Strings = ["*"], - merge_property_columns: bool = False, + separate_property_columns: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamNodeProperties" @@ -112,10 +112,10 @@ def streamNodeProperties( result = self._handle_properties(G, node_properties, node_labels, config) # new format was requested, but the query was run via Cypher - if merge_property_columns and "propertyValue" in result.keys(): + if separate_property_columns and "propertyValue" in result.keys(): return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty") # old format was requested but the query was run via Arrow - elif not merge_property_columns and "propertyValue" not in result.keys(): + elif not separate_property_columns and "propertyValue" not in result.keys(): return result.melt(id_vars=["nodeId"]).rename( columns={"variable": "nodeProperty", "value": "propertyValue"} ) @@ -138,7 +138,7 @@ def streamRelationshipProperties( G: Graph, relationship_properties: List[str], relationship_types: Strings = ["*"], - merge_property_columns: bool = False, + separate_property_columns: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamRelationshipProperties" @@ -146,12 +146,12 @@ def streamRelationshipProperties( result = self._handle_properties(G, relationship_properties, relationship_types, config) # new format was requested, but the query was run via Cypher - if merge_property_columns and "propertyValue" in result.keys(): + if separate_property_columns and "propertyValue" in result.keys(): return result.pivot_table( "propertyValue", ["sourceNodeId", "targetNodeId", "relationshipType"], columns="relationshipProperty" ) # old format was requested but the query was run via Arrow - elif not merge_property_columns and "propertyValue" not in result.keys(): + elif not separate_property_columns and "propertyValue" not in result.keys(): return result.melt(id_vars=["sourceNodeId", "targetNodeId", "relationshipType"]).rename( columns={"variable": "relationshipProperty", "value": "propertyValue"} ) diff --git a/graphdatascience/tests/integration/test_graph_ops.py b/graphdatascience/tests/integration/test_graph_ops.py index cf415e4f4..2cb89e11c 100644 --- a/graphdatascience/tests/integration/test_graph_ops.py +++ b/graphdatascience/tests/integration/test_graph_ops.py @@ -178,7 +178,7 @@ def test_graph_streamNodeProperties(gds: GraphDataScience) -> None: def test_graph_streamNodeProperties_merge_property_columns(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") - result = gds.graph.streamNodeProperties(G, ["x", "y"], merge_property_columns=True, concurrency=2) + result = gds.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2) assert {e for e in result["x"]} == {1, 2, 3} assert {e for e in result["y"]} == {2, 3, 4} @@ -197,7 +197,7 @@ def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataSc def test_graph_streamNodeProperties_without_arrow_merge_property_columns(gds_without_arrow: GraphDataScience) -> None: G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") - result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], merge_property_columns=True, concurrency=2) + result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2) assert {e for e in result["x"]} == {1, 2, 3} assert {e for e in result["y"]} == {2, 3, 4} @@ -230,7 +230,7 @@ def test_graph_streamRelationshipProperties(gds: GraphDataScience) -> None: def test_graph_streamRelationshipProperties_merge_property_columns(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], merge_property_columns=True, concurrency=2) + result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], separate_property_columns=True, concurrency=2) assert {e for e in result["relationshipType"]} == {"REL", "REL", "REL"} assert {e for e in result["relX"]} == {4, 5, 6} assert {e for e in result["relY"]} == {5, 6, 7} @@ -239,7 +239,7 @@ def test_graph_streamRelationshipProperties_merge_property_columns(gds: GraphDat def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) + result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) x_values = result[result.relationshipProperty == "relX"] assert {e for e in x_values["propertyValue"]} == {4, 5, 6} @@ -247,10 +247,14 @@ def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: Gra assert {e for e in y_values["propertyValue"]} == {5, 6, 7} -def test_graph_streamRelationshipProperties_without_arrow_merge_property_columns(gds_without_arrow: GraphDataScience) -> None: +def test_graph_streamRelationshipProperties_without_arrow_merge_property_columns( + gds_without_arrow: GraphDataScience, +) -> None: G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], merge_property_columns=True, concurrency=2) + result = gds_without_arrow.graph.streamRelationshipProperties( + G, ["relX", "relY"], separate_property_columns=True, concurrency=2 + ) assert {e for e in result["relX"]} == {4, 5, 6} assert {e for e in result["relY"]} == {5, 6, 7} From 030b27119a383a10a1ad9b6481e69f01e92bca38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Thu, 19 May 2022 10:19:41 +0200 Subject: [PATCH 7/8] Add result mock to CollectingQueryRunner --- graphdatascience/tests/integration/conftest.py | 2 +- graphdatascience/tests/unit/conftest.py | 12 ++++++++++-- graphdatascience/tests/unit/test_graph_ops.py | 17 +++++++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/graphdatascience/tests/integration/conftest.py b/graphdatascience/tests/integration/conftest.py index abd91fb04..999667f3c 100644 --- a/graphdatascience/tests/integration/conftest.py +++ b/graphdatascience/tests/integration/conftest.py @@ -9,7 +9,7 @@ URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687") -AUTH = None +AUTH = ("neo4j", "password") if os.environ.get("NEO4J_USER") is not None: AUTH = ( os.environ.get("NEO4J_USER", "DUMMY"), diff --git a/graphdatascience/tests/unit/conftest.py b/graphdatascience/tests/unit/conftest.py index 4f4e0e89a..4eb391216 100644 --- a/graphdatascience/tests/unit/conftest.py +++ b/graphdatascience/tests/unit/conftest.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union import pandas import pytest @@ -15,6 +15,7 @@ class CollectingQueryRunner(QueryRunner): def __init__(self, server_version: Union[str, ServerVersion]) -> None: + self._mock_result: Optional[DataFrame] = None self.queries: List[str] = [] self.params: List[Dict[str, Any]] = [] self.server_version = server_version @@ -24,7 +25,11 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame: self.params.append(params) # This "mock" lets us initialize the GDS object without issues. - return pandas.DataFrame([{"version": str(self.server_version)}]) + return ( + self._mock_result + if self._mock_result is not None + else pandas.DataFrame([{"version": str(self.server_version)}]) + ) def last_query(self) -> str: return self.queries[-1] @@ -41,6 +46,9 @@ def database(self) -> str: def create_graph_constructor(self, _: str, __: int) -> GraphConstructor: raise NotImplementedError + def set__mock_result(self, result: DataFrame) -> None: + self._mock_result = result + @pytest.fixture def runner(server_version: ServerVersion) -> CollectingQueryRunner: diff --git a/graphdatascience/tests/unit/test_graph_ops.py b/graphdatascience/tests/unit/test_graph_ops.py index cd1f7d92e..6a01785fa 100644 --- a/graphdatascience/tests/unit/test_graph_ops.py +++ b/graphdatascience/tests/unit/test_graph_ops.py @@ -1,3 +1,4 @@ +import pandas import pytest from graphdatascience.graph_data_science import GraphDataScience @@ -140,6 +141,8 @@ def test_graph_streamNodeProperty(runner: CollectingQueryRunner, gds: GraphDataS def test_graph_streamNodeProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None: G, _ = gds.graph.project("g", "*", "*") + runner.set__mock_result(pandas.DataFrame([{"nodeId": 0, "dummyProp": 2}])) + gds.graph.streamNodeProperties(G, ["dummyProp"], concurrency=2) assert runner.last_query() == "CALL gds.graph.streamNodeProperties($graph_name, $properties, $entities, $config)" assert runner.last_params() == { @@ -188,6 +191,20 @@ def test_graph_streamRelationshipProperty(runner: CollectingQueryRunner, gds: Gr def test_graph_streamRelationshipProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None: G, _ = gds.graph.project("g", "*", "*") + result_df = pandas.DataFrame( + [ + { + "sourceNodeId": 0, + "targetNodeId": 1, + "relationshipType": "REL", + "relationshipProperty": "dummyProp", + "propertyValue": 2, + } + ] + ) + + runner.set__mock_result(result_df) + gds.graph.streamRelationshipProperties(G, ["dummyProp"], concurrency=2) assert ( runner.last_query() From 0dd74fd847bd8901e1d33991ba8f2e3875dfc878 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Thu, 19 May 2022 11:32:00 +0200 Subject: [PATCH 8/8] Make sure to return the correct columns for stream[Node/Relationship]Properties --- graphdatascience/graph/graph_proc_runner.py | 4 +- .../tests/integration/test_graph_ops.py | 39 ++++++++++++++++--- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/graphdatascience/graph/graph_proc_runner.py b/graphdatascience/graph/graph_proc_runner.py index 820c462ce..ec956c3a5 100644 --- a/graphdatascience/graph/graph_proc_runner.py +++ b/graphdatascience/graph/graph_proc_runner.py @@ -113,7 +113,7 @@ def streamNodeProperties( # new format was requested, but the query was run via Cypher if separate_property_columns and "propertyValue" in result.keys(): - return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty") + return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty").reset_index() # old format was requested but the query was run via Arrow elif not separate_property_columns and "propertyValue" not in result.keys(): return result.melt(id_vars=["nodeId"]).rename( @@ -149,7 +149,7 @@ def streamRelationshipProperties( if separate_property_columns and "propertyValue" in result.keys(): return result.pivot_table( "propertyValue", ["sourceNodeId", "targetNodeId", "relationshipType"], columns="relationshipProperty" - ) + ).reset_index() # old format was requested but the query was run via Arrow elif not separate_property_columns and "propertyValue" not in result.keys(): return result.melt(id_vars=["sourceNodeId", "targetNodeId", "relationshipType"]).rename( diff --git a/graphdatascience/tests/integration/test_graph_ops.py b/graphdatascience/tests/integration/test_graph_ops.py index 2cb89e11c..84caeca3f 100644 --- a/graphdatascience/tests/integration/test_graph_ops.py +++ b/graphdatascience/tests/integration/test_graph_ops.py @@ -168,6 +168,8 @@ def test_graph_streamNodeProperties(gds: GraphDataScience) -> None: result = gds.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) + assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"] + x_values = result[result.nodeProperty == "x"] assert {e for e in x_values["propertyValue"]} == {1, 2, 3} @@ -175,10 +177,11 @@ def test_graph_streamNodeProperties(gds: GraphDataScience) -> None: assert {e for e in y_values["propertyValue"]} == {2, 3, 4} -def test_graph_streamNodeProperties_merge_property_columns(gds: GraphDataScience) -> None: +def test_graph_streamNodeProperties_separate_property_columns(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") result = gds.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2) + assert list(result.keys()) == ["nodeId", "x", "y"] assert {e for e in result["x"]} == {1, 2, 3} assert {e for e in result["y"]} == {2, 3, 4} @@ -187,6 +190,9 @@ def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataSc G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) + + assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"] + x_values = result[result.nodeProperty == "x"] assert {e for e in x_values["propertyValue"]} == {1, 2, 3} @@ -194,10 +200,14 @@ def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataSc assert {e for e in y_values["propertyValue"]} == {2, 3, 4} -def test_graph_streamNodeProperties_without_arrow_merge_property_columns(gds_without_arrow: GraphDataScience) -> None: +def test_graph_streamNodeProperties_without_arrow_separate_property_columns( + gds_without_arrow: GraphDataScience, +) -> None: G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2) + + assert list(result.keys()) == ["nodeId", "x", "y"] assert {e for e in result["x"]} == {1, 2, 3} assert {e for e in result["y"]} == {2, 3, 4} @@ -221,17 +231,26 @@ def test_graph_streamRelationshipProperties(gds: GraphDataScience) -> None: result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) + assert list(result.keys()) == [ + "sourceNodeId", + "targetNodeId", + "relationshipType", + "relationshipProperty", + "propertyValue", + ] + x_values = result[result.relationshipProperty == "relX"] assert {e for e in x_values["propertyValue"]} == {4, 5, 6} y_values = result[result.relationshipProperty == "relY"] assert {e for e in y_values["propertyValue"]} == {5, 6, 7} -def test_graph_streamRelationshipProperties_merge_property_columns(gds: GraphDataScience) -> None: +def test_graph_streamRelationshipProperties_separate_property_columns(gds: GraphDataScience) -> None: G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], separate_property_columns=True, concurrency=2) - assert {e for e in result["relationshipType"]} == {"REL", "REL", "REL"} + + assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"] assert {e for e in result["relX"]} == {4, 5, 6} assert {e for e in result["relY"]} == {5, 6, 7} @@ -241,13 +260,21 @@ def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: Gra result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) + assert list(result.keys()) == [ + "sourceNodeId", + "targetNodeId", + "relationshipType", + "relationshipProperty", + "propertyValue", + ] + x_values = result[result.relationshipProperty == "relX"] assert {e for e in x_values["propertyValue"]} == {4, 5, 6} y_values = result[result.relationshipProperty == "relY"] assert {e for e in y_values["propertyValue"]} == {5, 6, 7} -def test_graph_streamRelationshipProperties_without_arrow_merge_property_columns( +def test_graph_streamRelationshipProperties_without_arrow_separate_property_columns( gds_without_arrow: GraphDataScience, ) -> None: G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) @@ -255,6 +282,8 @@ def test_graph_streamRelationshipProperties_without_arrow_merge_property_columns result = gds_without_arrow.graph.streamRelationshipProperties( G, ["relX", "relY"], separate_property_columns=True, concurrency=2 ) + + assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"] assert {e for e in result["relX"]} == {4, 5, 6} assert {e for e in result["relY"]} == {5, 6, 7}