Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cugraph-Service Remote Graphs and Algorithm Dispatch #2832

Merged
merged 62 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
2a6c9cf
PropertyGraph set index to vertex and edge ids
eriknw Aug 9, 2022
4c93f77
Update graph_store
eriknw Aug 10, 2022
2631715
flake8
eriknw Aug 10, 2022
ff0f80c
Merge branch 'branch-22.10' into pg_set_index
eriknw Sep 7, 2022
99c2e0e
Set index to vertex or edge IDs in PG for MG
eriknw Sep 14, 2022
1a1e039
Merge branch 'pg_set_index' of https://github.com/eriknw/cugraph into…
alexbarghi-nv Sep 21, 2022
9bbf048
fixes
alexbarghi-nv Sep 21, 2022
4496cba
merge
alexbarghi-nv Oct 12, 2022
ccae80b
Fix concat with different index dtypes in SG PropertyGraph
eriknw Oct 13, 2022
824d083
initial
alexbarghi-nv Oct 17, 2022
5aaa90d
initial work on remote wrappers, very rough
alexbarghi-nv Oct 18, 2022
52fe830
merge resolution
alexbarghi-nv Oct 18, 2022
3221911
additional functionality, v/e counts
alexbarghi-nv Oct 18, 2022
f097043
copyright update
alexbarghi-nv Oct 18, 2022
7d33ed6
additional functions
alexbarghi-nv Oct 18, 2022
d14ae24
quick fix
alexbarghi-nv Oct 18, 2022
10bf725
Merge branch 'sgpg_fix_concat' of https://github.com/eriknw/cugraph i…
alexbarghi-nv Oct 18, 2022
1887ce7
add definition for remote graph, tests for pg
alexbarghi-nv Oct 18, 2022
f598dbe
remove dispatch (will be added in other pr)
alexbarghi-nv Oct 18, 2022
6b34f5b
Merge branch 'branch-22.12' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Oct 18, 2022
8495b70
revert inadvertently changed file
alexbarghi-nv Oct 18, 2022
7ffd777
Merge branch 'branch-22.12' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Oct 19, 2022
5089def
initial changes
alexbarghi-nv Oct 20, 2022
c157076
update version
alexbarghi-nv Oct 20, 2022
bc400ca
Merge branch 'branch-22.12' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Oct 20, 2022
ab3a28e
pull in dispatch from other branch
alexbarghi-nv Oct 20, 2022
438bfff
dispatch
alexbarghi-nv Oct 21, 2022
50fd0df
fix get_vertices(), add tests
alexbarghi-nv Oct 21, 2022
7fe9b0f
tests, fixes
alexbarghi-nv Oct 21, 2022
0a88a52
fix typo
alexbarghi-nv Oct 21, 2022
ae87b94
major changes to update output array/dataframe/tensor handling, unit/…
alexbarghi-nv Oct 25, 2022
ec44561
Merge branch 'cgs-remote-wrappers' of https://github.com/alexbarghi-n…
alexbarghi-nv Oct 25, 2022
c7d7112
fix merge conflict
alexbarghi-nv Oct 25, 2022
5703d41
fix version
alexbarghi-nv Oct 25, 2022
c8379ad
infer default backend
alexbarghi-nv Oct 25, 2022
3aed33d
fix default backend for remote pg
alexbarghi-nv Oct 25, 2022
ce12b47
reverse this commit
alexbarghi-nv Oct 25, 2022
092db5e
Revert "reverse this commit"
alexbarghi-nv Oct 25, 2022
e1a3c1f
Merge branch 'branch-22.12' of https://github.com/rapidsai/cugraph in…
alexbarghi-nv Oct 25, 2022
a66e437
remove useless code from pg, remove print statement
alexbarghi-nv Oct 26, 2022
a18b336
move backend call to methods, add graph() factory, update tests
alexbarghi-nv Oct 26, 2022
865ca44
fix version
alexbarghi-nv Oct 26, 2022
0975038
fix get vertex/edge data with types in cgs handler, minor raii fix, u…
alexbarghi-nv Oct 26, 2022
8f28820
fix version
alexbarghi-nv Oct 26, 2022
c8289f6
update branch
alexbarghi-nv Oct 26, 2022
03a1cf2
minor fix
alexbarghi-nv Oct 26, 2022
e1d4b84
Resolve merge conflict
alexbarghi-nv Nov 1, 2022
bf3df8a
cleanup, fixes for renumbering
alexbarghi-nv Nov 1, 2022
7931832
support for pg api
alexbarghi-nv Nov 1, 2022
668f95f
sampling, algo calls, implicit sg, fixes for multigraph
alexbarghi-nv Nov 2, 2022
3fc56be
fix version
alexbarghi-nv Nov 2, 2022
4955f90
remove print statements
alexbarghi-nv Nov 2, 2022
90f700f
resolve merge conflict
alexbarghi-nv Nov 7, 2022
8dc069e
fix version
alexbarghi-nv Nov 9, 2022
64b7d82
rename columns
alexbarghi-nv Nov 9, 2022
be41c53
switch to import_optional
alexbarghi-nv Nov 9, 2022
8462a24
minor cleanup
alexbarghi-nv Nov 9, 2022
53020e2
prevent copy in numpy to numpy conversion
alexbarghi-nv Nov 9, 2022
ddfb89d
is_mg -> is_multi_gpu
alexbarghi-nv Nov 9, 2022
2548efd
point to new issue
alexbarghi-nv Nov 9, 2022
09a5be4
point to new issue
alexbarghi-nv Nov 9, 2022
1070c7c
add fixme for large graph issue
alexbarghi-nv Nov 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 62 additions & 9 deletions python/cugraph/cugraph/gnn/pyg_extensions/loader/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,76 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from cugraph.structure.graph_implementation import (
simpleDistributedGraphImpl,
simpleGraphImpl,
)
# cuGraph or cuGraph-Service is required; each has its own version of
# import_optional and we need to select the correct one.
try:
alexbarghi-nv marked this conversation as resolved.
Show resolved Hide resolved
from cugraph_service.client.remote_graph_utils import import_optional
except ModuleNotFoundError:
try:
from cugraph.utilities.utils import import_optional
except ModuleNotFoundError:
raise ModuleNotFoundError(
"cuGraph extensions for PyG require cuGraph"
"or cuGraph-Service to be installed."
)

_transform_to_backend_dtype_1d = import_optional("_transform_to_backend_dtype_1d")
cudf = import_optional("cudf")
pandas = import_optional("pandas")


def call_cugraph_algorithm(name, graph, *args, backend="numpy", **kwargs):
"""
Calls a cugraph algorithm for a remote, sg, or mg graph.
Requires either cuGraph or cuGraph-Service to be installed.

name : string
The name of the cuGraph algorithm to run (i.e. uniform_neighbor_sample)
graph : Graph (cuGraph) or RemoteGraph (cuGraph-Service)
The graph to call the algorithm on.
backend : ('cudf', 'pandas', 'cupy', 'numpy', 'torch', 'torch:<device>')
[default = 'numpy']
The backend where the algorithm results will be stored. Only used
if the graph is a remote graph.
"""

if graph.is_remote():
# If the graph is remote, cuGraph-Service must be installed
# Therefore we do not explicitly check that it is available
if name != "uniform_neighbor_sample":
raise ValueError(
f"cuGraph algorithm {name} is not yet supported for RemoteGraph"
)
else:
# TODO eventually replace this with a "call_algorithm call"
sample_result = graph._client.uniform_neighbor_sample(
*args, **kwargs, graph_id=graph._graph_id
)

if backend == "cudf":
df = cudf.DataFrame()
elif backend == "pandas":
df = pandas.DataFrame()
else:
# handle cupy, numpy, torch as dict of arrays/tensors
df = {}

# _transform_to_backend_dtype_1d handles array/Series conversion
for k, v in sample_result.__dict__.items():
df[k] = _transform_to_backend_dtype_1d(
v, series_name=k, backend=backend
)

return df

def call_cugraph_algorithm(name, graph, *args, **kwargs):
# TODO check using graph property in a future PR
if isinstance(graph._Impl, simpleDistributedGraphImpl):
elif graph.is_multi_gpu():
import cugraph.dask

return getattr(cugraph.dask, name)(graph, *args, **kwargs)

# TODO check using graph property in a future PR
elif isinstance(graph._Impl, simpleGraphImpl):
else:
import cugraph

return getattr(cugraph, name)(graph, *args, **kwargs)

# TODO Properly dispatch for cugraph-service.
7 changes: 7 additions & 0 deletions python/cugraph/cugraph/structure/graph_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,13 @@ def is_remote(self):
"""
return False

def is_multi_gpu(self):
"""
Returns True if the graph is a multi-gpu graph; otherwise
returns False.
"""
return isinstance(self._Impl, simpleDistributedGraphImpl)

def to_directed(self):
"""
Return a directed representation of the graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ class simpleGraphImpl:
edgeWeightCol = "weights"
edgeIdCol = "edge_id"
edgeTypeCol = "edge_type"
srcCol = "src"
dstCol = "dst"

class EdgeList:
def __init__(self, source, destination, edge_attr=None):
self.edgelist_df = cudf.DataFrame()
self.edgelist_df["src"] = source
self.edgelist_df["dst"] = destination
self.edgelist_df[simpleGraphImpl.srcCol] = source
self.edgelist_df[simpleGraphImpl.dstCol] = destination
self.weights = False
if edge_attr is not None:
self.weights = True
Expand Down Expand Up @@ -245,7 +247,12 @@ def __from_edgelist(
value_col=value_col, store_transposed=store_transposed, renumber=renumber
)

def to_pandas_edgelist(self, source="src", destination="dst", weight="weights"):
def to_pandas_edgelist(
self,
source="src",
destination="dst",
weight="weights",
):
"""
Returns the graph edge list as a Pandas DataFrame.

Expand All @@ -266,11 +273,21 @@ def to_pandas_edgelist(self, source="src", destination="dst", weight="weights"):
gdf = self.view_edge_list()
if self.properties.weighted:
gdf.rename(
columns={"src": source, "dst": destination, "weight": weight},
columns={
simpleGraphImpl.srcCol: source,
simpleGraphImpl.dstCol: destination,
"weight": weight,
},
inplace=True,
)
else:
gdf.rename(columns={"src": source, "dst": destination}, inplace=True)
gdf.rename(
columns={
simpleGraphImpl.srcCol: source,
simpleGraphImpl.dstCol: destination,
},
inplace=True,
)
return gdf.to_pandas()

def to_pandas_adjacency(self):
Expand All @@ -296,9 +313,9 @@ def to_numpy_array(self):
df = self.edgelist.edgelist_df
np_array = np.full((nlen, nlen), 0.0)
for i in range(0, elen):
np_array[df["src"].iloc[i], df["dst"].iloc[i]] = df[
self.edgeWeightCol
].iloc[i]
np_array[
df[simpleGraphImpl.srcCol].iloc[i], df[simpleGraphImpl.dstCol].iloc[i]
] = df[self.edgeWeightCol].iloc[i]
return np_array

def to_numpy_matrix(self):
Expand Down Expand Up @@ -345,11 +362,18 @@ def view_edge_list(self):
edgelist_df = self.edgelist.edgelist_df

if self.properties.renumbered:
edgelist_df = self.renumber_map.unrenumber(edgelist_df, "src")
edgelist_df = self.renumber_map.unrenumber(edgelist_df, "dst")
edgelist_df = self.renumber_map.unrenumber(
edgelist_df, simpleGraphImpl.srcCol
)
edgelist_df = self.renumber_map.unrenumber(
edgelist_df, simpleGraphImpl.dstCol
)

if not self.properties.directed:
edgelist_df = edgelist_df[edgelist_df["src"] <= edgelist_df["dst"]]
edgelist_df = edgelist_df[
edgelist_df[simpleGraphImpl.srcCol]
<= edgelist_df[simpleGraphImpl.dstCol]
]
edgelist_df = edgelist_df.reset_index(drop=True)
self.properties.edge_count = len(edgelist_df)

Expand Down Expand Up @@ -576,7 +600,9 @@ def number_of_vertices(self):
elif self.transposedadjlist is not None:
self.properties.node_count = len(self.transposedadjlist.offsets) - 1
elif self.edgelist is not None:
df = self.edgelist.edgelist_df[["src", "dst"]]
df = self.edgelist.edgelist_df[
[simpleGraphImpl.srcCol, simpleGraphImpl.dstCol]
]
self.properties.node_count = df.max().max() + 1
else:
raise RuntimeError("Graph is Empty")
Expand All @@ -601,8 +627,8 @@ def number_of_edges(self, directed_edges=False):
if self.properties.directed is False:
self.properties.edge_count = len(
self.edgelist.edgelist_df[
self.edgelist.edgelist_df["src"]
>= self.edgelist.edgelist_df["dst"]
self.edgelist.edgelist_df[simpleGraphImpl.srcCol]
>= self.edgelist.edgelist_df[simpleGraphImpl.dstCol]
]
)
else:
Expand Down Expand Up @@ -852,8 +878,8 @@ def _make_plc_graph(self, value_col=None, store_transposed=False, renumber=True)
self._plc_graph = SGGraph(
resource_handle=ResourceHandle(),
graph_properties=graph_props,
src_array=self.edgelist.edgelist_df["src"],
dst_array=self.edgelist.edgelist_df["dst"],
src_array=self.edgelist.edgelist_df[simpleGraphImpl.srcCol],
dst_array=self.edgelist.edgelist_df[simpleGraphImpl.dstCol],
weight_array=weight_col,
edge_id_array=id_col,
edge_type_array=type_col,
Expand Down Expand Up @@ -901,10 +927,15 @@ def to_undirected(self, G, store_transposed=False):
df = self.edgelist.edgelist_df
if self.edgelist.weights:
source_col, dest_col, value_col = symmetrize(
df, "src", "dst", simpleGraphImpl.edgeWeightCol
df,
simpleGraphImpl.srcCol,
simpleGraphImpl.dstCol,
simpleGraphImpl.edgeWeightCol,
)
else:
source_col, dest_col = symmetrize(df, "src", "dst")
source_col, dest_col = symmetrize(
df, simpleGraphImpl.srcCol, simpleGraphImpl.dstCol
)
value_col = None
G.edgelist = simpleGraphImpl.EdgeList(source_col, dest_col, value_col)

Expand All @@ -923,25 +954,29 @@ def has_node(self, n):
tmp = self.renumber_map.to_internal_vertex_id(cudf.Series([n]))
return tmp[0] is not cudf.NA and tmp[0] >= 0
else:
df = self.edgelist.edgelist_df[["src", "dst"]]
df = self.edgelist.edgelist_df[
[simpleGraphImpl.srcCol, simpleGraphImpl.dstCol]
]
return (df == n).any().any()

def has_edge(self, u, v):
"""
Returns True if the graph contains the edge (u,v).
"""
if self.properties.renumbered:
tmp = cudf.DataFrame({"src": [u, v]})
tmp = tmp.astype({"src": "int"})
tmp = cudf.DataFrame({simpleGraphImpl.srcCol: [u, v]})
tmp = tmp.astype({simpleGraphImpl.srcCol: "int"})
tmp = self.renumber_map.add_internal_vertex_id(
tmp, "id", "src", preserve_order=True
tmp, "id", simpleGraphImpl.srcCol, preserve_order=True
)

u = tmp["id"][0]
v = tmp["id"][1]

df = self.edgelist.edgelist_df
return ((df["src"] == u) & (df["dst"] == v)).any()
return (
(df[simpleGraphImpl.srcCol] == u) & (df[simpleGraphImpl.dstCol] == v)
).any()

def has_self_loop(self):
"""
Expand All @@ -950,7 +985,7 @@ def has_self_loop(self):
# Detect self loop
if self.properties.self_loop is None:
elist = self.edgelist.edgelist_df
if (elist["src"] == elist["dst"]).any():
if (elist[simpleGraphImpl.srcCol] == elist[simpleGraphImpl.dstCol]).any():
self.properties.self_loop = True
else:
self.properties.self_loop = False
Expand All @@ -962,7 +997,7 @@ def edges(self):
sources and destinations. It does not return the edge weights.
For viewing edges with weights use view_edge_list()
"""
return self.view_edge_list()[["src", "dst"]]
return self.view_edge_list()[[simpleGraphImpl.srcCol, simpleGraphImpl.dstCol]]

def nodes(self):
"""
Expand All @@ -981,7 +1016,9 @@ def nodes(self):
else:
return df[df.columns[0]]
else:
return cudf.concat([df["src"], df["dst"]]).unique()
return cudf.concat(
[df[simpleGraphImpl.srcCol], df[simpleGraphImpl.dstCol]]
).unique()
if self.adjlist is not None:
return cudf.Series(np.arange(0, self.number_of_nodes()))

Expand All @@ -995,7 +1032,9 @@ def neighbors(self, n):
n = node[0]

df = self.edgelist.edgelist_df
neighbors = df[df["src"] == n]["dst"].reset_index(drop=True)
neighbors = df[df[simpleGraphImpl.srcCol] == n][
simpleGraphImpl.dstCol
].reset_index(drop=True)
if self.properties.renumbered:
# FIXME: Multi-column vertices
return self.renumber_map.from_internal_vertex_id(neighbors)["0"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@

from cugraph_service_client.client import CugraphServiceClient
from cugraph_service_client.remote_graph import RemoteGraph
from cugraph_service_client.remote_graph import RemotePropertyGraph
22 changes: 11 additions & 11 deletions python/cugraph_service/cugraph_service_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import cupy as cp

from cugraph_service_client import defaults
from cugraph_service_client.remote_graph import RemoteGraph
from cugraph_service_client import extension_return_dtype_map
from cugraph_service_client.remote_graph import RemotePropertyGraph
from cugraph_service_client.types import (
ValueWrapper,
GraphVertexEdgeID,
Expand Down Expand Up @@ -515,9 +515,9 @@ def delete_graph(self, graph_id):

def graph(self):
"""
Constructs an empty RemotePropertyGraph object.
Constructs a new RemoteGraph object wrapping a remote PropertyGraph.
"""
return RemotePropertyGraph(self, self.create_graph())
return RemoteGraph(self, self.create_graph())

@__server_connection
def get_graph_ids(self):
Expand Down Expand Up @@ -797,7 +797,7 @@ def extract_subgraph(
selection=None,
edge_weight_property="",
default_edge_weight=1.0,
allow_multi_edges=False,
check_multi_edges=True,
renumber_graph=True,
add_edge_data=True,
graph_id=defaults.graph_id,
Expand All @@ -811,7 +811,7 @@ def extract_subgraph(
create_using : string, default is None
String describing the type of Graph object to create from the
selected subgraph of vertices and edges. The default (None) results
in a cugraph.Graph object.
in a directed cugraph.MultiGraph object.

selection : int, default is None
A PropertySelection ID returned from one or more calls to
Expand All @@ -830,10 +830,10 @@ def extract_subgraph(
The value to use when an edge property is specified but not present
on an edge.

allow_multi_edges : bool
If True, multiple edges should be used to create the resulting
Graph, otherwise multiple edges will be detected and an exception
raised.
check_multi_edges : bool (default is True)
When True and create_using argument is given and not a MultiGraph,
this will perform an expensive check to verify that the edges in
the edge dataframe do not form a multigraph with duplicate edges.

graph_id : int, default is defaults.graph_id
The graph ID to extract the subgraph from. If the ID passed is not
Expand Down Expand Up @@ -861,7 +861,7 @@ def extract_subgraph(
selection,
edge_weight_property,
default_edge_weight,
allow_multi_edges,
check_multi_edges,
renumber_graph,
add_edge_data,
graph_id,
Expand Down Expand Up @@ -979,7 +979,7 @@ def get_graph_edge_data(
def is_vertex_property(self, property_key, graph_id=defaults.graph_id):
"""
Returns True if the given property key is for a valid vertex property
in the given graph, false otherwise.e
in the given graph, False otherwise.

Parameters
----------
Expand Down
Loading