diff --git a/changelog/1.1.0.md b/changelog/1.1.0.md index 869e19f6e..7b57f536d 100644 --- a/changelog/1.1.0.md +++ b/changelog/1.1.0.md @@ -2,7 +2,7 @@ ## Breaking changes - + ## New features @@ -21,7 +21,9 @@ ## Improvements -* The functions `gds.graph.streamNodeProperty` and `gds.graph.streamRelationshipProperty` can leverage the Arrow Flight server of GDS to improve throughput. +* The functions `gds.graph.streamNodeProperty`, `gds.graph.streamNodeProperties`, `gds.graph.streamRelationshipProperty` and `gds.graph.streamRelationshipProperties` can leverage the Arrow Flight server of GDS to improve throughput. +* The calls to `gds.graph.streamNodeProperties` and `gds.graph.streamRelationshipProperties` can now return data in an improved format. + * By setting `separate_property_columns` to `True` the return format will be `nodeId, property1, property2, ...` and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...`. * Improved error message of `gds.graph.get` to include currently targeted database if graph not found. * Added inline progress bar for project and algorithm procedures with adequate server logging support. diff --git a/graphdatascience/graph/graph_proc_runner.py b/graphdatascience/graph/graph_proc_runner.py index 2e829fd06..ec956c3a5 100644 --- a/graphdatascience/graph/graph_proc_runner.py +++ b/graphdatascience/graph/graph_proc_runner.py @@ -104,11 +104,23 @@ def streamNodeProperties( G: Graph, node_properties: List[str], node_labels: Strings = ["*"], + separate_property_columns: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamNodeProperties" - return self._handle_properties(G, node_properties, node_labels, config) + result = self._handle_properties(G, node_properties, node_labels, config) + + # new format was requested, but the query was run via Cypher + if separate_property_columns and "propertyValue" in result.keys(): + return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty").reset_index() + # old format was requested but the query was run via Arrow + elif not separate_property_columns and "propertyValue" not in result.keys(): + return result.melt(id_vars=["nodeId"]).rename( + columns={"variable": "nodeProperty", "value": "propertyValue"} + ) + + return result def streamNodeProperty( self, @@ -126,11 +138,25 @@ def streamRelationshipProperties( G: Graph, relationship_properties: List[str], relationship_types: Strings = ["*"], + separate_property_columns: bool = False, **config: Any, ) -> DataFrame: self._namespace += ".streamRelationshipProperties" - return self._handle_properties(G, relationship_properties, relationship_types, config) + result = self._handle_properties(G, relationship_properties, relationship_types, config) + + # new format was requested, but the query was run via Cypher + if separate_property_columns and "propertyValue" in result.keys(): + return result.pivot_table( + "propertyValue", ["sourceNodeId", "targetNodeId", "relationshipType"], columns="relationshipProperty" + ).reset_index() + # old format was requested but the query was run via Arrow + elif not separate_property_columns and "propertyValue" not in result.keys(): + return result.melt(id_vars=["sourceNodeId", "targetNodeId", "relationshipType"]).rename( + columns={"variable": "relationshipProperty", "value": "propertyValue"} + ) + + return result def streamRelationshipProperty( self, diff --git a/graphdatascience/query_runner/arrow_query_runner.py b/graphdatascience/query_runner/arrow_query_runner.py index 6a2d0fd0a..2abc57cab 100644 --- a/graphdatascience/query_runner/arrow_query_runner.py +++ b/graphdatascience/query_runner/arrow_query_runner.py @@ -45,6 +45,15 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame: return self._run_arrow_property_get( graph_name, "gds.graph.streamNodeProperty", {"node_property": property_name, "node_labels": node_labels} ) + elif "gds.graph.streamNodeProperties" in query: + graph_name = params["graph_name"] + property_names = params["properties"] + node_labels = params["entities"] + return self._run_arrow_property_get( + graph_name, + "gds.graph.streamNodeProperties", + {"node_properties": property_names, "node_labels": node_labels}, + ) elif "gds.graph.streamRelationshipProperty" in query: graph_name = params["graph_name"] property_name = params["properties"] @@ -54,6 +63,15 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame: "gds.graph.streamRelationshipProperty", {"relationship_property": property_name, "relationship_types": relationship_types}, ) + elif "gds.graph.streamRelationshipProperties" in query: + graph_name = params["graph_name"] + property_names = params["properties"] + relationship_types = params["entities"] + return self._run_arrow_property_get( + graph_name, + "gds.graph.streamRelationshipProperties", + {"relationship_properties": property_names, "relationship_types": relationship_types}, + ) return self._fallback_query_runner.run_query(query, params) diff --git a/graphdatascience/tests/integration/conftest.py b/graphdatascience/tests/integration/conftest.py index abd91fb04..999667f3c 100644 --- a/graphdatascience/tests/integration/conftest.py +++ b/graphdatascience/tests/integration/conftest.py @@ -9,7 +9,7 @@ URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687") -AUTH = None +AUTH = ("neo4j", "password") if os.environ.get("NEO4J_USER") is not None: AUTH = ( os.environ.get("NEO4J_USER", "DUMMY"), diff --git a/graphdatascience/tests/integration/test_graph_ops.py b/graphdatascience/tests/integration/test_graph_ops.py index 8269907b1..84caeca3f 100644 --- a/graphdatascience/tests/integration/test_graph_ops.py +++ b/graphdatascience/tests/integration/test_graph_ops.py @@ -18,12 +18,12 @@ def run_around_tests(runner: Neo4jQueryRunner) -> Generator[None, None, None]: runner.run_query( """ CREATE - (a: Node {x: 1}), - (b: Node {x: 2}), - (c: Node {x: 3}), - (a)-[:REL {relX: 4}]->(b), - (a)-[:REL {relX: 5}]->(c), - (b)-[:REL {relX: 6}]->(c), + (a: Node {x: 1, y: 2}), + (b: Node {x: 2, y: 3}), + (c: Node {x: 3, y: 4}), + (a)-[:REL {relX: 4, relY: 5}]->(b), + (a)-[:REL {relX: 5, relY: 6}]->(c), + (b)-[:REL {relX: 6, relY: 7}]->(c), (b)-[:REL2]->(c) """ ) @@ -164,10 +164,52 @@ def test_graph_streamNodeProperty_without_arrow(gds_without_arrow: GraphDataScie def test_graph_streamNodeProperties(gds: GraphDataScience) -> None: - G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": "x"}}, "*") + G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") - result = gds.graph.streamNodeProperties(G, ["x"], concurrency=2) - assert {e for e in result["propertyValue"]} == {1, 2, 3} + result = gds.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) + + assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"] + + x_values = result[result.nodeProperty == "x"] + assert {e for e in x_values["propertyValue"]} == {1, 2, 3} + + y_values = result[result.nodeProperty == "y"] + assert {e for e in y_values["propertyValue"]} == {2, 3, 4} + + +def test_graph_streamNodeProperties_separate_property_columns(gds: GraphDataScience) -> None: + G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") + + result = gds.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2) + assert list(result.keys()) == ["nodeId", "x", "y"] + assert {e for e in result["x"]} == {1, 2, 3} + assert {e for e in result["y"]} == {2, 3, 4} + + +def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") + + result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], concurrency=2) + + assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"] + + x_values = result[result.nodeProperty == "x"] + assert {e for e in x_values["propertyValue"]} == {1, 2, 3} + + y_values = result[result.nodeProperty == "y"] + assert {e for e in y_values["propertyValue"]} == {2, 3, 4} + + +def test_graph_streamNodeProperties_without_arrow_separate_property_columns( + gds_without_arrow: GraphDataScience, +) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*") + + result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2) + + assert list(result.keys()) == ["nodeId", "x", "y"] + assert {e for e in result["x"]} == {1, 2, 3} + assert {e for e in result["y"]} == {2, 3, 4} def test_graph_streamRelationshipProperty(gds: GraphDataScience) -> None: @@ -185,10 +227,65 @@ def test_graph_streamRelationshipProperty_without_arrow(gds_without_arrow: Graph def test_graph_streamRelationshipProperties(gds: GraphDataScience) -> None: - G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": "relX"}}) + G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) - result = gds.graph.streamRelationshipProperties(G, ["relX"], concurrency=2) - assert {e for e in result["propertyValue"]} == {4, 5, 6} + result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) + + assert list(result.keys()) == [ + "sourceNodeId", + "targetNodeId", + "relationshipType", + "relationshipProperty", + "propertyValue", + ] + + x_values = result[result.relationshipProperty == "relX"] + assert {e for e in x_values["propertyValue"]} == {4, 5, 6} + y_values = result[result.relationshipProperty == "relY"] + assert {e for e in y_values["propertyValue"]} == {5, 6, 7} + + +def test_graph_streamRelationshipProperties_separate_property_columns(gds: GraphDataScience) -> None: + G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) + + result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], separate_property_columns=True, concurrency=2) + + assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"] + assert {e for e in result["relX"]} == {4, 5, 6} + assert {e for e in result["relY"]} == {5, 6, 7} + + +def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) + + result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2) + + assert list(result.keys()) == [ + "sourceNodeId", + "targetNodeId", + "relationshipType", + "relationshipProperty", + "propertyValue", + ] + + x_values = result[result.relationshipProperty == "relX"] + assert {e for e in x_values["propertyValue"]} == {4, 5, 6} + y_values = result[result.relationshipProperty == "relY"] + assert {e for e in y_values["propertyValue"]} == {5, 6, 7} + + +def test_graph_streamRelationshipProperties_without_arrow_separate_property_columns( + gds_without_arrow: GraphDataScience, +) -> None: + G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}}) + + result = gds_without_arrow.graph.streamRelationshipProperties( + G, ["relX", "relY"], separate_property_columns=True, concurrency=2 + ) + + assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"] + assert {e for e in result["relX"]} == {4, 5, 6} + assert {e for e in result["relY"]} == {5, 6, 7} def test_graph_writeNodeProperties(gds: GraphDataScience) -> None: diff --git a/graphdatascience/tests/unit/conftest.py b/graphdatascience/tests/unit/conftest.py index 4f4e0e89a..4eb391216 100644 --- a/graphdatascience/tests/unit/conftest.py +++ b/graphdatascience/tests/unit/conftest.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union import pandas import pytest @@ -15,6 +15,7 @@ class CollectingQueryRunner(QueryRunner): def __init__(self, server_version: Union[str, ServerVersion]) -> None: + self._mock_result: Optional[DataFrame] = None self.queries: List[str] = [] self.params: List[Dict[str, Any]] = [] self.server_version = server_version @@ -24,7 +25,11 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame: self.params.append(params) # This "mock" lets us initialize the GDS object without issues. - return pandas.DataFrame([{"version": str(self.server_version)}]) + return ( + self._mock_result + if self._mock_result is not None + else pandas.DataFrame([{"version": str(self.server_version)}]) + ) def last_query(self) -> str: return self.queries[-1] @@ -41,6 +46,9 @@ def database(self) -> str: def create_graph_constructor(self, _: str, __: int) -> GraphConstructor: raise NotImplementedError + def set__mock_result(self, result: DataFrame) -> None: + self._mock_result = result + @pytest.fixture def runner(server_version: ServerVersion) -> CollectingQueryRunner: diff --git a/graphdatascience/tests/unit/test_graph_ops.py b/graphdatascience/tests/unit/test_graph_ops.py index cd1f7d92e..6a01785fa 100644 --- a/graphdatascience/tests/unit/test_graph_ops.py +++ b/graphdatascience/tests/unit/test_graph_ops.py @@ -1,3 +1,4 @@ +import pandas import pytest from graphdatascience.graph_data_science import GraphDataScience @@ -140,6 +141,8 @@ def test_graph_streamNodeProperty(runner: CollectingQueryRunner, gds: GraphDataS def test_graph_streamNodeProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None: G, _ = gds.graph.project("g", "*", "*") + runner.set__mock_result(pandas.DataFrame([{"nodeId": 0, "dummyProp": 2}])) + gds.graph.streamNodeProperties(G, ["dummyProp"], concurrency=2) assert runner.last_query() == "CALL gds.graph.streamNodeProperties($graph_name, $properties, $entities, $config)" assert runner.last_params() == { @@ -188,6 +191,20 @@ def test_graph_streamRelationshipProperty(runner: CollectingQueryRunner, gds: Gr def test_graph_streamRelationshipProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None: G, _ = gds.graph.project("g", "*", "*") + result_df = pandas.DataFrame( + [ + { + "sourceNodeId": 0, + "targetNodeId": 1, + "relationshipType": "REL", + "relationshipProperty": "dummyProp", + "propertyValue": 2, + } + ] + ) + + runner.set__mock_result(result_df) + gds.graph.streamRelationshipProperties(G, ["dummyProp"], concurrency=2) assert ( runner.last_query() diff --git a/requirements/dev.txt b/requirements/dev.txt index 0a088fc07..969580bd7 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -7,4 +7,4 @@ pandas-stubs pytest pytest-annotate tox -types-setuptools +types-setuptools \ No newline at end of file