Skip to content

Commit

Permalink
Renumber PG to be contiguous per type (#2697)
Browse files Browse the repository at this point in the history
Closes #2627 and CC @VibhuJawa 

Currently only SG as we hammer out the API and behavior.

This returns a dataframe with the start and stop IDs for each type.  Should stop be inclusive or exclusive?

How should we handle vertex ids that only exist in edge data?  Should we raise (for now) if this condition exists?  I think we can handle this without _too_ much difficulty, but it will take more work.

Since we number edge data, I think edge IDs will often be added in a way that is already contiguous per type.  We could keep track of this to avoid unnecessary computation.

Also, I want to confirm that we _cannot_ have multiple rows for a single vertex ID, right?  I think we settled on this.  Multiple rows with the same ID would cause a problem with the current implementation, which gives each row its own unique ID.

Authors:
  - Erik Welch (https://github.com/eriknw)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Rick Ratzel (https://github.com/rlratzel)

URL: #2697
  • Loading branch information
eriknw committed Sep 21, 2022
1 parent 1232037 commit ae69a29
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 0 deletions.
88 changes: 88 additions & 0 deletions python/cugraph/cugraph/dask/structure/mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,94 @@ def edge_props_to_graph(self,

return G

def renumber_vertices_by_type(self):
    """Renumber vertex IDs to be contiguous by type.

    Vertex rows are sorted by type and numbered 0, 1, 2, ... in that
    order.  When edge data exists, its source and destination IDs are
    rewritten to the new numbering as well.

    Returns
    -------
    DataFrame or None
        A DataFrame with the "start" and "stop" IDs for each vertex type
        (one row per type).  Stop is *inclusive*.  None is returned when
        there is no vertex data.

    Raises
    ------
    NotImplementedError
        If edge data exists and the number of vertices counted with edge
        data differs from the number counted without it, i.e. some
        vertex IDs only exist in edge data.
    """
    # Check if some vertex IDs exist only in edge data
    default = self._default_type_name
    if (
        self.__edge_prop_dataframe is not None
        and self.get_num_vertices(default, include_edge_data=True)
        != self.get_num_vertices(default, include_edge_data=False)
    ):
        raise NotImplementedError(
            "Currently unable to renumber vertices when some vertex "
            "IDs only exist in edge data"
        )
    if self.__vertex_prop_dataframe is None:
        return None
    # We'll need to update this when index is vertex ID
    df = (
        self.__vertex_prop_dataframe
        .sort_values(by=self.type_col_name)
    )
    # Number the sorted rows with a running sum of ones instead of using
    # the index.
    # NOTE(review): presumably because the distributed dataframe has no
    # global positional index -- confirm.
    new_name = f"new_{self.vertex_col_name}"
    df[new_name] = 1
    df[new_name] = df[new_name].cumsum() - 1

    if self.__edge_prop_dataframe is not None:
        mapper = df[[self.vertex_col_name, new_name]]
        edge_df = self.__edge_prop_dataframe
        for id_col in (self.src_col_name, self.dst_col_name):
            # Rename the mapper's key to the edge column so the merge has
            # one shared key column.  Merging with different left_on /
            # right_on names would keep the mapper's vertex column in the
            # edge data (and suffix it with _x/_y on the second merge).
            edge_df = (
                edge_df
                .merge(
                    mapper.rename(columns={self.vertex_col_name: id_col}),
                    on=id_col,
                )
                .drop(columns=[id_col])
                .rename(columns={new_name: id_col})
            )
        self.__edge_prop_dataframe = edge_df

    df[self.vertex_col_name] = df[new_name]
    del df[new_name]

    self.__vertex_prop_dataframe = df
    rv = (
        self._vertex_type_value_counts
        .sort_index()
        .cumsum()
        .to_frame("stop")
    )
    # The cumulative count is an exclusive stop; the previous type's
    # exclusive stop is this type's start.
    rv["start"] = rv["stop"].shift(1, fill_value=0)
    rv["stop"] -= 1  # Make inclusive
    return rv[["start", "stop"]]

def renumber_edges_by_type(self):
    """Renumber edge IDs to be contiguous by type.

    The existing edge ID column is discarded, the edge rows are sorted by
    type, and fresh IDs 0, 1, 2, ... are assigned in that order.

    Returns a DataFrame with the start and stop IDs for each edge type
    (stop is *inclusive*), or None when there is no edge data.
    """
    # TODO: keep track if edges are already numbered correctly.
    edge_df = self.__edge_prop_dataframe
    if edge_df is None:
        return None

    id_col = self.edge_id_col_name
    # We'll need to update this when index is edge ID
    edge_df = edge_df.drop(columns=[id_col])
    edge_df = edge_df.sort_values(by=self.type_col_name, ignore_index=True)
    self.__edge_prop_dataframe = edge_df

    # New IDs are a running sum of ones over the sorted rows, shifted to
    # start at zero.
    self.__edge_prop_dataframe[id_col] = 1
    running_count = self.__edge_prop_dataframe[id_col].cumsum()
    self.__edge_prop_dataframe[id_col] = running_count - 1

    # Cumulative per-type counts are exclusive stops.
    exclusive_stops = self._edge_type_value_counts.sort_index().cumsum()
    id_ranges = exclusive_stops.to_frame("stop")
    id_ranges["start"] = id_ranges["stop"].shift(1, fill_value=0)
    id_ranges["stop"] -= 1  # Make inclusive
    return id_ranges[["start", "stop"]]

@classmethod
def has_duplicate_edges(cls, df):
"""
Expand Down
75 changes: 75 additions & 0 deletions python/cugraph/cugraph/structure/property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,81 @@ def edge_props_to_graph(self,

return G

def renumber_vertices_by_type(self):
    """Renumber vertex IDs to be contiguous by type.

    Vertex rows are sorted by type and each row's new ID is its position
    in the sorted order.  When edge data exists, its source and
    destination IDs are rewritten to the new numbering as well.

    Returns
    -------
    DataFrame or None
        A DataFrame with the "start" and "stop" IDs for each vertex type
        (one row per type).  Stop is *inclusive*.  None is returned when
        there is no vertex data.

    Raises
    ------
    NotImplementedError
        If edge data exists and the number of vertices counted with edge
        data differs from the number counted without it, i.e. some
        vertex IDs only exist in edge data.
    """
    # Check if some vertex IDs exist only in edge data
    default = self._default_type_name
    if (
        self.__edge_prop_dataframe is not None
        and self.get_num_vertices(default, include_edge_data=True)
        != self.get_num_vertices(default, include_edge_data=False)
    ):
        raise NotImplementedError(
            "Currently unable to renumber vertices when some vertex "
            "IDs only exist in edge data"
        )
    if self.__vertex_prop_dataframe is None:
        return None
    # We'll need to update this when index is vertex ID
    #
    # ignore_index=True is essential: the index of the sorted frame is
    # used below as the new vertex ID, so it must be the position in the
    # type-sorted order rather than the pre-sort row label.  Otherwise the
    # new IDs are only contiguous per type when the data already happened
    # to be sorted by type.
    df = (
        self.__vertex_prop_dataframe
        .sort_values(by=self.type_col_name, ignore_index=True)
    )
    if self.__edge_prop_dataframe is not None:
        # Maps old vertex ID -> new vertex ID (the sorted row position)
        mapper = self.__series_type(
            df.index, index=df[self.vertex_col_name]
        )
        self.__edge_prop_dataframe[self.src_col_name] = (
            self.__edge_prop_dataframe[self.src_col_name].map(mapper)
        )
        self.__edge_prop_dataframe[self.dst_col_name] = (
            self.__edge_prop_dataframe[self.dst_col_name].map(mapper)
        )
    # Swap the old vertex ID column for the index, which holds the new IDs
    df.drop(columns=[self.vertex_col_name], inplace=True)
    df.index.name = self.vertex_col_name
    df.reset_index(inplace=True)
    self.__vertex_prop_dataframe = df
    rv = (
        self._vertex_type_value_counts
        .sort_index()
        .cumsum()
        .to_frame("stop")
    )
    # The cumulative count is an exclusive stop; the previous type's
    # exclusive stop is this type's start.
    rv["start"] = rv["stop"].shift(1, fill_value=0)
    rv["stop"] -= 1  # Make inclusive
    return rv[["start", "stop"]]

def renumber_edges_by_type(self):
    """Renumber edge IDs to be contiguous by type.

    The existing edge ID column is discarded, the edge rows are sorted by
    type with a fresh 0..n-1 index, and that index becomes the new edge ID
    column.

    Returns a DataFrame with the start and stop IDs for each edge type
    (stop is *inclusive*), or None when there is no edge data.
    """
    # TODO: keep track if edges are already numbered correctly.
    edge_df = self.__edge_prop_dataframe
    if edge_df is None:
        return None

    id_col = self.edge_id_col_name
    # We'll need to update this when index is edge ID
    edge_df = edge_df.drop(columns=[id_col])
    edge_df = edge_df.sort_values(by=self.type_col_name, ignore_index=True)
    # The fresh index is the row position in type-sorted order; turn it
    # into the edge ID column.
    edge_df.index.name = id_col
    edge_df.reset_index(inplace=True)
    self.__edge_prop_dataframe = edge_df

    # Cumulative per-type counts are exclusive stops.
    exclusive_stops = self._edge_type_value_counts.sort_index().cumsum()
    id_ranges = exclusive_stops.to_frame("stop")
    id_ranges["start"] = id_ranges["stop"].shift(1, fill_value=0)
    id_ranges["stop"] -= 1  # Make inclusive
    return id_ranges[["start", "stop"]]

@classmethod
def has_duplicate_edges(cls, df):
"""
Expand Down
53 changes: 53 additions & 0 deletions python/cugraph/cugraph/tests/mg/test_mg_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,56 @@ def test_get_data_empty_graphs(dask_client):
assert pG.get_vertex_data([0, 1, 2]) is None
assert pG.get_edge_data() is None
assert pG.get_edge_data([0, 1, 2]) is None


def test_renumber_vertices_by_type(dataset1_MGPropertyGraph):
    """Check vertex renumbering on the dataset1 multi-GPU property graph.

    Covers the returned per-type ID ranges, the renumbered vertex data,
    the renumbered src/dst IDs in the edge data, the empty-graph case and
    the unsupported case where vertex IDs only exist in edge data.
    """
    from cugraph.experimental import MGPropertyGraph

    pG, _ = dataset1_MGPropertyGraph
    id_ranges = pG.renumber_vertices_by_type()

    # (vertex type, first ID, last ID) -- the last ID is inclusive
    expected_ranges = [
        ("merchants", 0, 4),
        ("users", 5, 8),
    ]
    for vtype, first, last in expected_ranges:
        assert id_ranges.loc[vtype, "start"] == first
        assert id_ranges.loc[vtype, "stop"] == last
        vertex_df = pG.get_vertex_data(types=[vtype]).compute()
        assert len(vertex_df) == last - first + 1
        expected_ids = list(range(first, last + 1))
        assert (vertex_df["_VERTEX_"] == expected_ids).all()

    # Make sure we renumber vertex IDs in edge data too
    edge_df = pG.get_edge_data().compute()
    for id_col in (pG.src_col_name, pG.dst_col_name):
        lowest = edge_df[id_col].min()
        highest = edge_df[id_col].max()
        assert 0 <= lowest < highest < 9

    empty_pG = MGPropertyGraph()
    assert empty_pG.renumber_vertices_by_type() is None

    # Test when vertex IDs only exist in edge data
    edges_only = dask_cudf.from_cudf(
        cudf.DataFrame({"src": [99998], "dst": [99999]}),
        npartitions=1,
    )
    empty_pG.add_edge_data(edges_only, ["src", "dst"])
    with pytest.raises(NotImplementedError, match="only exist in edge"):
        empty_pG.renumber_vertices_by_type()


def test_renumber_edges_by_type(dataset1_MGPropertyGraph):
    """Check edge renumbering on the dataset1 multi-GPU property graph.

    Covers the returned per-type ID ranges, the renumbered edge IDs for
    each type, and the empty-graph case.
    """
    from cugraph.experimental import MGPropertyGraph

    pG, _ = dataset1_MGPropertyGraph
    id_ranges = pG.renumber_edges_by_type()

    # (edge type, first ID, last ID) -- the last ID is inclusive
    expected_ranges = [
        ("referrals", 0, 5),
        ("relationships", 6, 9),
        ("transactions", 10, 13),
    ]
    for etype, first, last in expected_ranges:
        assert id_ranges.loc[etype, "start"] == first
        assert id_ranges.loc[etype, "stop"] == last
        edge_df = pG.get_edge_data(types=[etype]).compute()
        assert len(edge_df) == last - first + 1
        expected_ids = list(range(first, last + 1))
        assert (edge_df[pG.edge_id_col_name] == expected_ids).all()

    empty_pG = MGPropertyGraph()
    assert empty_pG.renumber_edges_by_type() is None
52 changes: 52 additions & 0 deletions python/cugraph/cugraph/tests/test_property_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,58 @@ def test_get_data_empty_graphs():
assert pG.get_edge_data([0, 1, 2]) is None


def test_renumber_vertices_by_type(dataset1_PropertyGraph):
    """Check vertex renumbering on the dataset1 property graph.

    Covers the returned per-type ID ranges, the renumbered vertex data,
    the renumbered src/dst IDs in the edge data, the empty-graph case and
    the unsupported case where vertex IDs only exist in edge data.
    """
    from cugraph.experimental import PropertyGraph

    pG, _ = dataset1_PropertyGraph
    id_ranges = pG.renumber_vertices_by_type()

    # (vertex type, first ID, last ID) -- the last ID is inclusive
    expected_ranges = [
        ("merchants", 0, 4),
        ("users", 5, 8),
    ]
    for vtype, first, last in expected_ranges:
        assert id_ranges.loc[vtype, "start"] == first
        assert id_ranges.loc[vtype, "stop"] == last
        vertex_df = pG.get_vertex_data(types=[vtype])
        assert len(vertex_df) == last - first + 1
        expected_ids = list(range(first, last + 1))
        assert (vertex_df["_VERTEX_"] == expected_ids).all()

    # Make sure we renumber vertex IDs in edge data too
    edge_df = pG.get_edge_data()
    for id_col in (pG.src_col_name, pG.dst_col_name):
        lowest = edge_df[id_col].min()
        highest = edge_df[id_col].max()
        assert 0 <= lowest < highest < 9

    empty_pG = PropertyGraph()
    assert empty_pG.renumber_vertices_by_type() is None

    # Test when vertex IDs only exist in edge data.  Build the frame with
    # the same dataframe class the graph handed back for its edge data.
    frame_cls = type(edge_df)
    edges_only = frame_cls({"src": [99998], "dst": [99999]})
    empty_pG.add_edge_data(edges_only, ["src", "dst"])
    with pytest.raises(NotImplementedError, match="only exist in edge"):
        empty_pG.renumber_vertices_by_type()


def test_renumber_edges_by_type(dataset1_PropertyGraph):
    """Check edge renumbering on the dataset1 property graph.

    Covers the returned per-type ID ranges, the renumbered edge IDs for
    each type, and the empty-graph case.
    """
    from cugraph.experimental import PropertyGraph

    pG, _ = dataset1_PropertyGraph
    id_ranges = pG.renumber_edges_by_type()

    # (edge type, first ID, last ID) -- the last ID is inclusive
    expected_ranges = [
        ("referrals", 0, 5),
        ("relationships", 6, 9),
        ("transactions", 10, 13),
    ]
    for etype, first, last in expected_ranges:
        assert id_ranges.loc[etype, "start"] == first
        assert id_ranges.loc[etype, "stop"] == last
        edge_df = pG.get_edge_data(types=[etype])
        assert len(edge_df) == last - first + 1
        expected_ids = list(range(first, last + 1))
        assert (edge_df[pG.edge_id_col_name] == expected_ids).all()

    empty_pG = PropertyGraph()
    assert empty_pG.renumber_edges_by_type() is None


# =============================================================================
# Benchmarks
# =============================================================================
Expand Down

0 comments on commit ae69a29

Please sign in to comment.