Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions changelog/1.1.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


## Breaking changes


## New features

Expand All @@ -21,7 +21,9 @@

## Improvements

* The functions `gds.graph.streamNodeProperty` and `gds.graph.streamRelationshipProperty` can leverage the Arrow Flight server of GDS to improve throughput.
* The functions `gds.graph.streamNodeProperty`, `gds.graph.streamNodeProperties`, `gds.graph.streamRelationshipProperty` and `gds.graph.streamRelationshipProperties` can leverage the Arrow Flight server of GDS to improve throughput.
* The calls to `gds.graph.streamNodeProperties` and `gds.graph.streamRelationshipProperties` can now return data in an improved format.
* By setting `separate_property_columns` to `True` the return format will be `nodeId, property1, property2, ...` and `sourceNodeId, targetNodeId, relationshipType, property1, property2, ...`.
* Improved error message of `gds.graph.get` to include currently targeted database if graph not found.
* Added inline progress bar for project and algorithm procedures with adequate server logging support.

Expand Down
30 changes: 28 additions & 2 deletions graphdatascience/graph/graph_proc_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,23 @@ def streamNodeProperties(
G: Graph,
node_properties: List[str],
node_labels: Strings = ["*"],
separate_property_columns: bool = False,
**config: Any,
) -> DataFrame:
self._namespace += ".streamNodeProperties"

return self._handle_properties(G, node_properties, node_labels, config)
result = self._handle_properties(G, node_properties, node_labels, config)

# new format was requested, but the query was run via Cypher
if separate_property_columns and "propertyValue" in result.keys():
return result.pivot_table("propertyValue", "nodeId", columns="nodeProperty").reset_index()
# old format was requested but the query was run via Arrow
elif not separate_property_columns and "propertyValue" not in result.keys():
return result.melt(id_vars=["nodeId"]).rename(
columns={"variable": "nodeProperty", "value": "propertyValue"}
)

return result

def streamNodeProperty(
self,
Expand All @@ -126,11 +138,25 @@ def streamRelationshipProperties(
G: Graph,
relationship_properties: List[str],
relationship_types: Strings = ["*"],
separate_property_columns: bool = False,
**config: Any,
) -> DataFrame:
self._namespace += ".streamRelationshipProperties"

return self._handle_properties(G, relationship_properties, relationship_types, config)
result = self._handle_properties(G, relationship_properties, relationship_types, config)

# new format was requested, but the query was run via Cypher
if separate_property_columns and "propertyValue" in result.keys():
return result.pivot_table(
"propertyValue", ["sourceNodeId", "targetNodeId", "relationshipType"], columns="relationshipProperty"
).reset_index()
# old format was requested but the query was run via Arrow
elif not separate_property_columns and "propertyValue" not in result.keys():
return result.melt(id_vars=["sourceNodeId", "targetNodeId", "relationshipType"]).rename(
columns={"variable": "relationshipProperty", "value": "propertyValue"}
)

return result

def streamRelationshipProperty(
self,
Expand Down
18 changes: 18 additions & 0 deletions graphdatascience/query_runner/arrow_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame:
return self._run_arrow_property_get(
graph_name, "gds.graph.streamNodeProperty", {"node_property": property_name, "node_labels": node_labels}
)
elif "gds.graph.streamNodeProperties" in query:
graph_name = params["graph_name"]
property_names = params["properties"]
node_labels = params["entities"]
return self._run_arrow_property_get(
graph_name,
"gds.graph.streamNodeProperties",
{"node_properties": property_names, "node_labels": node_labels},
)
elif "gds.graph.streamRelationshipProperty" in query:
graph_name = params["graph_name"]
property_name = params["properties"]
Expand All @@ -54,6 +63,15 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame:
"gds.graph.streamRelationshipProperty",
{"relationship_property": property_name, "relationship_types": relationship_types},
)
elif "gds.graph.streamRelationshipProperties" in query:
graph_name = params["graph_name"]
property_names = params["properties"]
relationship_types = params["entities"]
return self._run_arrow_property_get(
graph_name,
"gds.graph.streamRelationshipProperties",
{"relationship_properties": property_names, "relationship_types": relationship_types},
)

return self._fallback_query_runner.run_query(query, params)

Expand Down
2 changes: 1 addition & 1 deletion graphdatascience/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

URI = os.environ.get("NEO4J_URI", "bolt://localhost:7687")

AUTH = None
AUTH = ("neo4j", "password")
if os.environ.get("NEO4J_USER") is not None:
AUTH = (
os.environ.get("NEO4J_USER", "DUMMY"),
Expand Down
121 changes: 109 additions & 12 deletions graphdatascience/tests/integration/test_graph_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ def run_around_tests(runner: Neo4jQueryRunner) -> Generator[None, None, None]:
runner.run_query(
"""
CREATE
(a: Node {x: 1}),
(b: Node {x: 2}),
(c: Node {x: 3}),
(a)-[:REL {relX: 4}]->(b),
(a)-[:REL {relX: 5}]->(c),
(b)-[:REL {relX: 6}]->(c),
(a: Node {x: 1, y: 2}),
(b: Node {x: 2, y: 3}),
(c: Node {x: 3, y: 4}),
(a)-[:REL {relX: 4, relY: 5}]->(b),
(a)-[:REL {relX: 5, relY: 6}]->(c),
(b)-[:REL {relX: 6, relY: 7}]->(c),
(b)-[:REL2]->(c)
"""
)
Expand Down Expand Up @@ -164,10 +164,52 @@ def test_graph_streamNodeProperty_without_arrow(gds_without_arrow: GraphDataScie


def test_graph_streamNodeProperties(gds: GraphDataScience) -> None:
G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": "x"}}, "*")
G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")

result = gds.graph.streamNodeProperties(G, ["x"], concurrency=2)
assert {e for e in result["propertyValue"]} == {1, 2, 3}
result = gds.graph.streamNodeProperties(G, ["x", "y"], concurrency=2)

assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"]

x_values = result[result.nodeProperty == "x"]
assert {e for e in x_values["propertyValue"]} == {1, 2, 3}

y_values = result[result.nodeProperty == "y"]
assert {e for e in y_values["propertyValue"]} == {2, 3, 4}


def test_graph_streamNodeProperties_separate_property_columns(gds: GraphDataScience) -> None:
G, _ = gds.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")

result = gds.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2)
assert list(result.keys()) == ["nodeId", "x", "y"]
assert {e for e in result["x"]} == {1, 2, 3}
assert {e for e in result["y"]} == {2, 3, 4}


def test_graph_streamNodeProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None:
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")

result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], concurrency=2)

assert list(result.keys()) == ["nodeId", "nodeProperty", "propertyValue"]

x_values = result[result.nodeProperty == "x"]
assert {e for e in x_values["propertyValue"]} == {1, 2, 3}

y_values = result[result.nodeProperty == "y"]
assert {e for e in y_values["propertyValue"]} == {2, 3, 4}


def test_graph_streamNodeProperties_without_arrow_separate_property_columns(
gds_without_arrow: GraphDataScience,
) -> None:
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, {"Node": {"properties": ["x", "y"]}}, "*")

result = gds_without_arrow.graph.streamNodeProperties(G, ["x", "y"], separate_property_columns=True, concurrency=2)

assert list(result.keys()) == ["nodeId", "x", "y"]
assert {e for e in result["x"]} == {1, 2, 3}
assert {e for e in result["y"]} == {2, 3, 4}


def test_graph_streamRelationshipProperty(gds: GraphDataScience) -> None:
Expand All @@ -185,10 +227,65 @@ def test_graph_streamRelationshipProperty_without_arrow(gds_without_arrow: Graph


def test_graph_streamRelationshipProperties(gds: GraphDataScience) -> None:
G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": "relX"}})
G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})

result = gds.graph.streamRelationshipProperties(G, ["relX"], concurrency=2)
assert {e for e in result["propertyValue"]} == {4, 5, 6}
result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2)

assert list(result.keys()) == [
"sourceNodeId",
"targetNodeId",
"relationshipType",
"relationshipProperty",
"propertyValue",
]

x_values = result[result.relationshipProperty == "relX"]
assert {e for e in x_values["propertyValue"]} == {4, 5, 6}
y_values = result[result.relationshipProperty == "relY"]
assert {e for e in y_values["propertyValue"]} == {5, 6, 7}


def test_graph_streamRelationshipProperties_separate_property_columns(gds: GraphDataScience) -> None:
G, _ = gds.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})

result = gds.graph.streamRelationshipProperties(G, ["relX", "relY"], separate_property_columns=True, concurrency=2)

assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"]
assert {e for e in result["relX"]} == {4, 5, 6}
assert {e for e in result["relY"]} == {5, 6, 7}


def test_graph_streamRelationshipProperties_without_arrow(gds_without_arrow: GraphDataScience) -> None:
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})

result = gds_without_arrow.graph.streamRelationshipProperties(G, ["relX", "relY"], concurrency=2)

assert list(result.keys()) == [
"sourceNodeId",
"targetNodeId",
"relationshipType",
"relationshipProperty",
"propertyValue",
]

x_values = result[result.relationshipProperty == "relX"]
assert {e for e in x_values["propertyValue"]} == {4, 5, 6}
y_values = result[result.relationshipProperty == "relY"]
assert {e for e in y_values["propertyValue"]} == {5, 6, 7}


def test_graph_streamRelationshipProperties_without_arrow_separate_property_columns(
gds_without_arrow: GraphDataScience,
) -> None:
G, _ = gds_without_arrow.graph.project(GRAPH_NAME, "*", {"REL": {"properties": ["relX", "relY"]}})

result = gds_without_arrow.graph.streamRelationshipProperties(
G, ["relX", "relY"], separate_property_columns=True, concurrency=2
)

assert list(result.keys()) == ["sourceNodeId", "targetNodeId", "relationshipType", "relX", "relY"]
assert {e for e in result["relX"]} == {4, 5, 6}
assert {e for e in result["relY"]} == {5, 6, 7}


def test_graph_writeNodeProperties(gds: GraphDataScience) -> None:
Expand Down
12 changes: 10 additions & 2 deletions graphdatascience/tests/unit/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, Union

import pandas
import pytest
Expand All @@ -15,6 +15,7 @@

class CollectingQueryRunner(QueryRunner):
def __init__(self, server_version: Union[str, ServerVersion]) -> None:
self._mock_result: Optional[DataFrame] = None
self.queries: List[str] = []
self.params: List[Dict[str, Any]] = []
self.server_version = server_version
Expand All @@ -24,7 +25,11 @@ def run_query(self, query: str, params: Dict[str, Any] = {}) -> DataFrame:
self.params.append(params)

# This "mock" lets us initialize the GDS object without issues.
return pandas.DataFrame([{"version": str(self.server_version)}])
return (
self._mock_result
if self._mock_result is not None
else pandas.DataFrame([{"version": str(self.server_version)}])
)

def last_query(self) -> str:
return self.queries[-1]
Expand All @@ -41,6 +46,9 @@ def database(self) -> str:
def create_graph_constructor(self, _: str, __: int) -> GraphConstructor:
raise NotImplementedError

def set__mock_result(self, result: DataFrame) -> None:
self._mock_result = result


@pytest.fixture
def runner(server_version: ServerVersion) -> CollectingQueryRunner:
Expand Down
17 changes: 17 additions & 0 deletions graphdatascience/tests/unit/test_graph_ops.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pandas
import pytest

from graphdatascience.graph_data_science import GraphDataScience
Expand Down Expand Up @@ -140,6 +141,8 @@ def test_graph_streamNodeProperty(runner: CollectingQueryRunner, gds: GraphDataS
def test_graph_streamNodeProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None:
G, _ = gds.graph.project("g", "*", "*")

runner.set__mock_result(pandas.DataFrame([{"nodeId": 0, "dummyProp": 2}]))

gds.graph.streamNodeProperties(G, ["dummyProp"], concurrency=2)
assert runner.last_query() == "CALL gds.graph.streamNodeProperties($graph_name, $properties, $entities, $config)"
assert runner.last_params() == {
Expand Down Expand Up @@ -188,6 +191,20 @@ def test_graph_streamRelationshipProperty(runner: CollectingQueryRunner, gds: Gr
def test_graph_streamRelationshipProperties(runner: CollectingQueryRunner, gds: GraphDataScience) -> None:
G, _ = gds.graph.project("g", "*", "*")

result_df = pandas.DataFrame(
[
{
"sourceNodeId": 0,
"targetNodeId": 1,
"relationshipType": "REL",
"relationshipProperty": "dummyProp",
"propertyValue": 2,
}
]
)

runner.set__mock_result(result_df)

gds.graph.streamRelationshipProperties(G, ["dummyProp"], concurrency=2)
assert (
runner.last_query()
Expand Down
2 changes: 1 addition & 1 deletion requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ pandas-stubs
pytest
pytest-annotate
tox
types-setuptools
types-setuptools