diff --git a/python/cugraph/cugraph/dask/structure/mg_property_graph.py b/python/cugraph/cugraph/dask/structure/mg_property_graph.py index 1752d966e62..911a1667410 100644 --- a/python/cugraph/cugraph/dask/structure/mg_property_graph.py +++ b/python/cugraph/cugraph/dask/structure/mg_property_graph.py @@ -12,7 +12,7 @@ # limitations under the License. import cudf - +import numpy as np import cugraph import dask_cudf import cugraph.dask as dcg @@ -488,7 +488,8 @@ def get_vertex_data(self, vertex_ids=None, types=None, columns=None): if isinstance(vertex_ids, int): vertex_ids = [vertex_ids] elif not isinstance(vertex_ids, - (list, slice, self.__series_type)): + (list, slice, np.ndarray, + self.__series_type)): vertex_ids = list(vertex_ids) df = df.loc[vertex_ids] @@ -724,7 +725,8 @@ def get_edge_data(self, edge_ids=None, types=None, columns=None): if isinstance(edge_ids, int): edge_ids = [edge_ids] elif not isinstance(edge_ids, - (list, slice, self.__series_type)): + (list, slice, np.ndarray, + self.__series_type)): edge_ids = list(edge_ids) df = df.loc[edge_ids] diff --git a/python/cugraph/cugraph/gnn/graph_store.py b/python/cugraph/cugraph/gnn/graph_store.py index 3921d19dd90..782c9d596e0 100644 --- a/python/cugraph/cugraph/gnn/graph_store.py +++ b/python/cugraph/cugraph/gnn/graph_store.py @@ -12,6 +12,7 @@ # limitations under the License. from collections import defaultdict + import cudf import dask_cudf import cugraph @@ -51,24 +52,131 @@ def __init__(self, graph, backend_lib="torch"): self.ndata_feat_col_d = defaultdict(list) self.backend_lib = backend_lib - def add_node_data(self, df, node_col_name, feat_name, ntype=None): + def add_node_data( + self, + df, + node_col_name, + feat_name=None, + ntype=None, + is_single_vector_feature=True, + ): + """ + Add a dataframe describing node properties to the PropertyGraph. + + Parameters + ---------- + dataframe : DataFrame-compatible instance + A DataFrame instance with a compatible Pandas-like DataFrame + interface. + node_col_name : string + The column name that contains the values to be used as vertex IDs. + feat_name : string + The feature name under which we should save the added properties + (ignored if is_single_vector_feature=False and the col names of + the dataframe are treated as corresponding feature names) + ntype : string + The node type to be added. + For example, if dataframe contains data about users, ntype + might be "users". + If not specified, the type of properties will be added as + an empty string. + is_single_vector_feature : True + Whether to treat all the columns of the dataframe being added as + a single 2d feature + Returns + ------- + None + """ self.gdata.add_vertex_data( df, vertex_col_name=node_col_name, type_name=ntype ) columns = [col for col in list(df.columns) if col != node_col_name] - self.ndata_feat_col_d[feat_name] = columns - def add_edge_data(self, df, vertex_col_names, feat_name, etype=None): + if is_single_vector_feature: + if feat_name is None: + raise ValueError( + "feature name must be provided when wrapping" + + " multiple columns under a single feature name" + ) + + elif feat_name: + raise ValueError( + "feat_name is only valid when wrapping" + + " multiple columns under a single feature name" + ) + + if is_single_vector_feature: + self.ndata_feat_col_d[feat_name] = columns + else: + for col in columns: + self.ndata_feat_col_d[col] = [col] + # Clear properties if set as data has changed + self.__clear_cached_properties() + + def add_edge_data( + self, + df, + node_col_names, + feat_name=None, + etype=None, + is_single_vector_feature=True, + ): + """ + Add a dataframe describing edge properties to the PropertyGraph. + + Parameters + ---------- + dataframe : DataFrame-compatible instance + A DataFrame instance with a compatible Pandas-like DataFrame + interface. + node_col_names : string + The column names that contain the values to be used as the source + and destination vertex IDs for the edges. + feat_name : string + The feature name under which we should save the added properties + (ignored if is_single_vector_feature=False and the col names of + the dataframe are treated as corresponding feature names) + etype : string + The edge type to be added. This should follow the string format + '(src_type),(edge_type),(dst_type)' + If not specified, the type of properties will be added as + an empty string. + is_single_vector_feature : True + Wether to treat all the columns of the dataframe being + added as a single 2d feature + Returns + ------- + None + """ self.gdata.add_edge_data( - df, vertex_col_names=vertex_col_names, type_name=etype + df, vertex_col_names=node_col_names, type_name=etype ) columns = [ - col for col in list(df.columns) if col not in vertex_col_names + col for col in list(df.columns) if col not in node_col_names ] - self.edata_feat_col_d[feat_name] = columns + if is_single_vector_feature: + if feat_name is None: + raise ValueError( + "feature name must be provided when wrapping" + + " multiple columns under a single feature name" + ) - def get_node_storage(self, feat_name, ntype=None): + elif feat_name: + raise ValueError( + "feat_name is only valid when wrapping" + + " multiple columns under a single feature name" + ) + + if is_single_vector_feature: + self.edata_feat_col_d[feat_name] = columns + else: + for col in columns: + self.edata_feat_col_d[col] = [col] + # Clear properties if set as data has changed + self.__clear_cached_properties() + + def get_node_storage(self, feat_name, ntype=None): if ntype is None: ntypes = self.ntypes if len(self.ntypes) > 1: @@ -195,8 +303,7 @@ def sample_neighbors( ) if isinstance(nodes_cap, dict): - nodes = [cudf.from_dlpack(nodes) for nodes in nodes_cap.values()] - nodes = cudf.concat(nodes, ignore_index=True) + nodes = {t: cudf.from_dlpack(n) for t, n in nodes_cap.items()} else: nodes = cudf.from_dlpack(nodes_cap) @@ -216,20 +323,24 @@ def sample_neighbors( # of the seed dtype is not same as the node dtype self.set_sg_node_dtype(list(sgs.values())[0]) - nodes = nodes.astype(self._sg_node_dtype) sampled_df = sample_multiple_sgs( - sgs, sample_f, nodes, fanout, replace + sgs, + sample_f, + nodes, + self._sg_node_dtype, + edge_dir, + fanout, + replace, ) else: if edge_dir == "in": sg = self.extracted_reverse_subgraph else: sg = self.extracted_subgraph - # Uniform sampling fails when the dtype - # of the seed dtype is not same as the node dtype self.set_sg_node_dtype(sg) - nodes = nodes.astype(self._sg_node_dtype) - sampled_df = sample_single_sg(sg, sample_f, nodes, fanout, replace) + sampled_df = sample_single_sg( + sg, sample_f, nodes, self._sg_node_dtype, fanout, replace + ) # we reverse directions when directions=='in' if edge_dir == "in": @@ -267,6 +378,10 @@ def get_vertex_ids(self): return self.gdata.vertices_ids() def _get_edgeid_type_d(self, edge_ids, etypes): + if isinstance(edge_ids, cudf.Series): + # Work around for below issue + # https://github.com/rapidsai/cudf/issues/11877 + edge_ids = edge_ids.values_host df = self.gdata.get_edge_data(edge_ids=edge_ids, columns=[type_n]) if isinstance(df, dask_cudf.DataFrame): df = df.compute() @@ -274,9 +389,7 @@ def _get_edgeid_type_d(self, edge_ids, etypes): @cached_property def extracted_subgraph(self): - edge_list = self.gdata.get_edge_data( - columns=[src_n, dst_n, type_n] - ) + edge_list = self.gdata.get_edge_data(columns=[src_n, dst_n, type_n]) edge_list = edge_list.reset_index(drop=True) return get_subgraph_from_edgelist( @@ -285,9 +398,7 @@ def extracted_subgraph(self): @cached_property def extracted_reverse_subgraph(self): - edge_list = self.gdata.get_edge_data( - columns=[src_n, dst_n, type_n] - ) + edge_list = self.gdata.get_edge_data(columns=[src_n, dst_n, type_n]) return get_subgraph_from_edgelist( edge_list, self.is_mg, reverse_edges=True ) @@ -318,6 +429,9 @@ def extracted_reverse_subgraphs_per_type(self): @cached_property def num_nodes_dict(self): + """ + Return num_nodes_dict of the graph + """ return {ntype: self.num_nodes(ntype) for ntype in self.ntypes} @cached_property @@ -361,7 +475,9 @@ def find_edges(self, edge_ids_cap, etype): The dst nodes for the given ids """ edge_ids = cudf.from_dlpack(edge_ids_cap) - subset_df = self.gdata.get_edge_data(edge_ids=edge_ids, columns=type_n) + subset_df = self.gdata.get_edge_data( + edge_ids=edge_ids, columns=type_n, types=[etype] + ) if isinstance(subset_df, dask_cudf.DataFrame): subset_df = subset_df.compute() return subset_df[src_n].to_dlpack(), subset_df[dst_n].to_dlpack() @@ -388,8 +504,9 @@ def node_subgraph( The sampled subgraph with the same node ID space with the original graph. """ - _g = self.gdata.extract_subgraph(create_using=create_using, - check_multi_edges=True) + _g = self.gdata.extract_subgraph( + create_using=create_using, check_multi_edges=True + ) if nodes is None: return _g @@ -398,6 +515,28 @@ def node_subgraph( _subg = cugraph.subgraph(_g, _n) return _subg + def __clear_cached_properties(self): + if hasattr(self, "has_multiple_etypes"): + del self.has_multiple_etypes + + if hasattr(self, "num_nodes_dict"): + del self.num_nodes_dict + + if hasattr(self, "num_edges_dict"): + del self.num_edges_dict + + if hasattr(self, "extracted_subgraph"): + del self.extracted_subgraph + + if hasattr(self, "extracted_reverse_subgraph"): + del self.extracted_reverse_subgraph + + if hasattr(self, "extracted_subgraphs_per_type"): + del self.extracted_subgraphs_per_type + + if hasattr(self, "extracted_reverse_subgraphs_per_type"): + del self.extracted_reverse_subgraphs_per_type + class CuFeatureStorage: """Storage for node/edge feature data. @@ -448,7 +587,12 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs): # Default implementation uses synchronous fetch. indices = cp.asarray(indices) - + if isinstance(self.pg, MGPropertyGraph): + # dask_cudf loc breaks if we provide cudf series/cupy array + # https://github.com/rapidsai/cudf/issues/11877 + indices = indices.get() + else: + indices = cudf.Series(indices) if self.storage_type == "node": subset_df = self.pg.get_vertex_data( vertex_ids=indices, columns=self.columns @@ -462,7 +606,10 @@ def fetch(self, indices, device=None, pin_memory=False, **kwargs): if isinstance(subset_df, dask_cudf.DataFrame): subset_df = subset_df.compute() - tensor = self.from_dlpack(subset_df.to_dlpack()) + if len(subset_df) == 0: + raise ValueError(f"{indices=} not found in FeatureStorage") + else: + tensor = self.from_dlpack(subset_df.to_dlpack()) if isinstance(tensor, cp.ndarray): # can not transfer to @@ -490,7 +637,15 @@ def return_dlpack_d(d): return dlpack_d -def sample_single_sg(sg, sample_f, start_list, fanout, with_replacement): +def sample_single_sg( + sg, sample_f, start_list, start_list_dtype, fanout, with_replacement +): + if isinstance(start_list, dict): + start_list = cudf.concat(list(start_list.values())) + + # Uniform sampling fails when the dtype + # of the seed dtype is not same as the node dtype + start_list = start_list.astype(start_list_dtype) sampled_df = sample_f( sg, start_list=start_list, @@ -502,33 +657,89 @@ def sample_single_sg(sg, sample_f, start_list, fanout, with_replacement): return sampled_df -def sample_multiple_sgs(sgs, sample_f, start_list, fanout, with_replacement): - output_dfs = [ - sample_single_sg(sg, sample_f, start_list, fanout, with_replacement) - for sg in sgs.values() - ] +def sample_multiple_sgs( + sgs, + sample_f, + start_list_d, + start_list_dtype, + edge_dir, + fanout, + with_replacement, +): + start_list_types = list(start_list_d.keys()) + output_dfs = [] + for can_etype, sg in sgs.items(): + can_etype = _convert_can_etype_s_to_tup(can_etype) + if _edge_types_contains_canonical_etype( + can_etype, start_list_types, edge_dir + ): + if edge_dir == "in": + subset_type = can_etype[2] + else: + subset_type = can_etype[0] + + output = sample_single_sg( + sg, + sample_f, + start_list_d[subset_type], + start_list_dtype, + fanout, + with_replacement, + ) + output_dfs.append(output) + + if len(output_dfs) == 0: + empty_df = cudf.DataFrame( + {"sources": [], "destinations": [], "indices": []} + ) + return empty_df.astype(cp.int32) + if isinstance(output_dfs[0], dask_cudf.DataFrame): return dask_cudf.concat(output_dfs, ignore_index=True) else: return cudf.concat(output_dfs, ignore_index=True) +def _edge_types_contains_canonical_etype(can_etype, edge_types, edge_dir): + src_type, _, dst_type = can_etype + if edge_dir == "in": + return dst_type in edge_types + else: + return src_type in edge_types + + +def _convert_can_etype_s_to_tup(canonical_etype_s): + src_type, etype, dst_type = canonical_etype_s.split(",") + src_type = src_type[2:-1] + dst_type = dst_type[2:-2] + etype = etype[2:-1] + return (src_type, etype, dst_type) + + def get_subgraph_from_edgelist(edge_list, is_mg, reverse_edges=False): if reverse_edges: edge_list = edge_list.rename(columns={src_n: dst_n, dst_n: src_n}) subgraph = cugraph.MultiGraph(directed=True) if is_mg: + # FIXME: Can not switch to renumber = False + # For MNMG Algos + # Remove when https://github.com/rapidsai/cugraph/issues/2437 + # lands create_subgraph_f = subgraph.from_dask_cudf_edgelist + renumber = True else: + # Note: We have to keep renumber = False + # to handle cases when the seed_nodes is not present in sugraph create_subgraph_f = subgraph.from_cudf_edgelist + renumber = False create_subgraph_f( edge_list, source=src_n, destination=dst_n, edge_attr=eid_n, - renumber=True, + renumber=renumber, # FIXME: renumber=False is not supported for MNMG algos legacy_renum_only=True, ) diff --git a/python/cugraph/cugraph/structure/property_graph.py b/python/cugraph/cugraph/structure/property_graph.py index 8be39d61503..13e8788bd00 100644 --- a/python/cugraph/cugraph/structure/property_graph.py +++ b/python/cugraph/cugraph/structure/property_graph.py @@ -500,7 +500,8 @@ def get_vertex_data(self, vertex_ids=None, types=None, columns=None): if isinstance(vertex_ids, int): vertex_ids = [vertex_ids] elif not isinstance(vertex_ids, - (list, slice, self.__series_type)): + (list, slice, np.ndarray, + self.__series_type)): vertex_ids = list(vertex_ids) df = df.loc[vertex_ids] @@ -749,7 +750,8 @@ def get_edge_data(self, edge_ids=None, types=None, columns=None): if isinstance(edge_ids, int): edge_ids = [edge_ids] elif not isinstance(edge_ids, - (list, slice, self.__series_type)): + (list, slice, np.ndarray, + self.__series_type)): edge_ids = list(edge_ids) df = df.loc[edge_ids] diff --git a/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py b/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py index 6250fb47acd..7fb9ae74511 100644 --- a/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py +++ b/python/cugraph/cugraph/tests/mg/test_mg_dgl_extensions.py @@ -39,7 +39,7 @@ def basic_mg_gs(dask_client): df_1 = dask_cudf.from_cudf(df_1, npartitions=2) - gs.add_edge_data(df_1, vertex_col_names=["src", "dst"], feat_name="edge_w") + gs.add_edge_data(df_1, node_col_names=["src", "dst"], feat_name="edge_w") df_2 = cudf.DataFrame( { @@ -56,9 +56,8 @@ def basic_mg_gs(dask_client): return gs -# @pytest.fixture(scope="module") -# def gs_heterogeneous_dgl_eg(dask_client): -def create_gs_heterogeneous_dgl_eg(dask_client): +@pytest.fixture(scope="module") +def gs_heterogeneous_dgl_eg(dask_client): pg = MGPropertyGraph() gs = CuGraphStore(pg) # Changing npartitions is leading to errors @@ -282,8 +281,8 @@ def test_sampling_homogeneous_gs_neg_one_fanout(dask_client): # Test against DGLs output # See below notebook # https://gist.github.com/VibhuJawa/f85fda8e1183886078f2a34c28c4638c -def test_sampling_dgl_heterogeneous_gs_m_fanouts(dask_client): - gs = create_gs_heterogeneous_dgl_eg(dask_client) +def test_sampling_dgl_heterogeneous_gs_m_fanouts(gs_heterogeneous_dgl_eg): + gs = gs_heterogeneous_dgl_eg expected_output = { 1: { "('nt.a', 'connects', 'nt.b')": 0, @@ -295,15 +294,11 @@ def test_sampling_dgl_heterogeneous_gs_m_fanouts(dask_client): "('nt.a', 'connects', 'nt.c')": 1, "('nt.c', 'connects', 'nt.c')": 2, }, - # TODO: replace=False - # leads to 4 neighbors - # instead of 3 with dask.UniformSampling - # Raise issue and link here - # 3: { - # "('nt.a', 'connects', 'nt.b')": 0, - # "('nt.a', 'connects', 'nt.c')": 1, - # "('nt.c', 'connects', 'nt.c')": 3, - # }, + 3: { + "('nt.a', 'connects', 'nt.b')": 0, + "('nt.a', 'connects', 'nt.c')": 1, + "('nt.c', 'connects', 'nt.c')": 3, + }, -1: { "('nt.a', 'connects', 'nt.b')": 0, "('nt.a', 'connects', 'nt.c')": 1, @@ -324,8 +319,8 @@ def test_sampling_dgl_heterogeneous_gs_m_fanouts(dask_client): assert expected_output[fanout][etype] == len(output_df) -def test_sampling_gs_heterogeneous_in_dir(dask_client): - gs = create_gs_heterogeneous_dgl_eg(dask_client) +def test_sampling_gs_heterogeneous_in_dir(gs_heterogeneous_dgl_eg): + gs = gs_heterogeneous_dgl_eg # DGL expected_output from # https://gist.github.com/VibhuJawa/f85fda8e1183886078f2a34c28c4638c expeced_val_d = { @@ -382,8 +377,8 @@ def test_sampling_gs_heterogeneous_in_dir(dask_client): cudf.testing.assert_frame_equal(output_df, expected_df) -def test_sampling_gs_heterogeneous_out_dir(dask_client): - gs = create_gs_heterogeneous_dgl_eg(dask_client) +def test_sampling_gs_heterogeneous_out_dir(gs_heterogeneous_dgl_eg): + gs = gs_heterogeneous_dgl_eg # DGL expected_output from # https://gist.github.com/VibhuJawa/f85fda8e1183886078f2a34c28c4638c expeced_val_d = { diff --git a/python/cugraph/cugraph/tests/test_graph_store.py b/python/cugraph/cugraph/tests/test_graph_store.py index 7056fab3c98..a321f85d8d2 100644 --- a/python/cugraph/cugraph/tests/test_graph_store.py +++ b/python/cugraph/cugraph/tests/test_graph_store.py @@ -59,7 +59,7 @@ def test_node_data_pg(graph_file): pG = PropertyGraph() gstore = cugraph.gnn.CuGraphStore(graph=pG, backend_lib="cupy") gstore.add_edge_data( - cu_M, vertex_col_names=("0", "1"), feat_name="edge_feat" + cu_M, node_col_names=("0", "1"), feat_name="edge_feat" ) edata_f = gstore.get_edge_storage("edge_feat") @@ -80,7 +80,7 @@ def test_egonet(graph_file): pG = PropertyGraph() gstore = cugraph.gnn.CuGraphStore(graph=pG, backend_lib="cupy") gstore.add_edge_data( - cu_M, vertex_col_names=("0", "1"), feat_name="edge_feat" + cu_M, node_col_names=("0", "1"), feat_name="edge_feat" ) nodes = [1, 2] @@ -100,7 +100,7 @@ def test_workflow(graph_file): columns={"src": "0", "dst": "1", "wgt": "2"}) pg = PropertyGraph() gstore = cugraph.gnn.CuGraphStore(graph=pg) - gstore.add_edge_data(cu_M, vertex_col_names=("0", "1"), feat_name="feat") + gstore.add_edge_data(cu_M, node_col_names=("0", "1"), feat_name="feat") nodes = gstore.get_vertex_ids() num_nodes = len(nodes) @@ -119,7 +119,7 @@ def test_sample_neighbors(graph_file): columns={"src": "0", "dst": "1", "wgt": "2"}) pg = PropertyGraph() gstore = cugraph.gnn.CuGraphStore(graph=pg) - gstore.add_edge_data(cu_M, feat_name="feat", vertex_col_names=("0", "1")) + gstore.add_edge_data(cu_M, feat_name="feat", node_col_names=("0", "1")) nodes = gstore.get_vertex_ids() num_nodes = len(nodes) @@ -142,7 +142,7 @@ def test_sample_neighbor_neg_one_fanout(graph_file): columns={"src": "0", "dst": "1", "wgt": "2"}) pg = PropertyGraph() gstore = cugraph.gnn.CuGraphStore(graph=pg) - gstore.add_edge_data(cu_M, feat_name="edge_k", vertex_col_names=("0", "1")) + gstore.add_edge_data(cu_M, feat_name="edge_k", node_col_names=("0", "1")) nodes = gstore.get_vertex_ids() sampled_nodes = nodes[:5].to_dlpack() @@ -165,7 +165,7 @@ def test_get_node_storage_graph_file(graph_file): gstore.add_edge_data( cu_M, feat_name="feat", - vertex_col_names=("0", "1"), + node_col_names=("0", "1"), ) num_nodes = gstore.num_nodes() @@ -191,7 +191,7 @@ def test_edge_storage_data_graph_file(graph_file): columns={"src": "0", "dst": "1", "wgt": "2"}) pg = PropertyGraph() gstore = cugraph.gnn.CuGraphStore(graph=pg, backend_lib="cupy") - gstore.add_edge_data(cu_M, vertex_col_names=("0", "1"), feat_name="edge_k") + gstore.add_edge_data(cu_M, node_col_names=("0", "1"), feat_name="edge_k") edata_s = gstore.get_edge_storage(feat_name="edge_k") edata = edata_s.fetch([0, 1, 2, 3], device="cuda") @@ -280,11 +280,12 @@ def create_df_from_dataset(col_n, rows): return cudf.DataFrame(data_d) -# @pytest.fixture() -# TODO: Creating a fixture seems to hang the pytests - - -def get_dataset1_CuGraphStore(): +@pytest.fixture(scope="module") +def dataset1_CuGraphStore(): + """ + Fixture which returns an instance of a CuGraphStore with vertex and edge + data added from dataset1, parameterized for different DataFrame types. + """ merchant_df = create_df_from_dataset( dataset1["merchants"][0], dataset1["merchants"][1] ) @@ -319,54 +320,52 @@ def get_dataset1_CuGraphStore(): graph.add_node_data(merchant_df, "merchant_id", "merchant_k", "merchant") graph.add_node_data(user_df, "user_id", "user_k", "user") graph.add_node_data(taxpayers_df, "payer_id", "taxpayers_k", "taxpayers") - graph.add_edge_data( referrals_df, ("user_id_1", "user_id_2"), "referrals_k", - "referrals", + "('user', 'refers', 'user')", ) graph.add_edge_data( relationships_df, ("user_id_1", "user_id_2"), "relationships_k", - "relationships", + "('user', 'relationship', 'user')", ) graph.add_edge_data( transactions_df, ("user_id", "merchant_id"), "transactions_k", - "transactions", + "('user', 'transactions', 'merchant')", ) return graph -def test_num_nodes_gs(): - gs = get_dataset1_CuGraphStore() +def test_num_nodes_gs(dataset1_CuGraphStore): # Added unique id in tax_payer so changed to 16 - assert gs.num_nodes() == 16 + assert dataset1_CuGraphStore.num_nodes() == 16 -def test_num_edges(): - gs = get_dataset1_CuGraphStore() +def test_num_edges(dataset1_CuGraphStore): + gs = dataset1_CuGraphStore assert gs.num_edges() == 14 -def test_etypes(): - dataset1_CuGraphStore = get_dataset1_CuGraphStore() - assert dataset1_CuGraphStore.etypes == [ - 'referrals', 'relationships', 'transactions' +def test_etypes(dataset1_CuGraphStore): + expected_types = [ + "('user', 'refers', 'user')", + "('user', 'relationship', 'user')", + "('user', 'transactions', 'merchant')", ] + assert dataset1_CuGraphStore.etypes == expected_types -def test_ntypes(): - dataset1_CuGraphStore = get_dataset1_CuGraphStore() - assert dataset1_CuGraphStore.ntypes == ['merchant', 'taxpayers', 'user'] +def test_ntypes(dataset1_CuGraphStore): + assert dataset1_CuGraphStore.ntypes == ["merchant", "taxpayers", "user"] -def test_get_node_storage_gs(): - dataset1_CuGraphStore = get_dataset1_CuGraphStore() +def test_get_node_storage_gs(dataset1_CuGraphStore): fs = dataset1_CuGraphStore.get_node_storage( feat_name="merchant_k", ntype="merchant" ) @@ -381,8 +380,7 @@ def test_get_node_storage_gs(): assert cp.allclose(cudf_ar, merchant_gs) -def test_get_edge_storage_gs(): - dataset1_CuGraphStore = get_dataset1_CuGraphStore() +def test_get_edge_storage_gs(dataset1_CuGraphStore): fs = dataset1_CuGraphStore.get_edge_storage( "relationships_k", "relationships" ) @@ -396,10 +394,9 @@ def test_get_edge_storage_gs(): assert cp.allclose(cudf_ar, relationship_t) -@pytest.mark.skip("Skipping because it tends to hang sometimes") -def test_sampling_gs_heterogeneous_ds1(): - node_d = {"merchant_id": cudf.Series([4], dtype="int64").to_dlpack()} - gs = get_dataset1_CuGraphStore() +def test_sampling_gs_heterogeneous_ds1(dataset1_CuGraphStore): + node_d = {"merchant": cudf.Series([4], dtype="int64").to_dlpack()} + gs = dataset1_CuGraphStore sampled_obj = gs.sample_neighbors(node_d, fanout=1) sampled_d = convert_dlpack_to_cudf_ser(sampled_obj) # Ensure we get sample from at at least one of the etypes @@ -407,11 +404,9 @@ def test_sampling_gs_heterogeneous_ds1(): assert len(src_ser) != 0 -@pytest.mark.skip("Skipping because it tends to hang sometimes") -def test_sampling_gs_heterogeneous_ds1_neg_one_fanout(): - - node_d = {"merchant_id": cudf.Series([4], dtype="int64").to_dlpack()} - gs = get_dataset1_CuGraphStore() +def test_sampling_gs_heterogeneous_ds1_neg_one_fanout(dataset1_CuGraphStore): + node_d = {"merchant": cudf.Series([4], dtype="int64").to_dlpack()} + gs = dataset1_CuGraphStore sampled_obj = gs.sample_neighbors(node_d, fanout=-1) sampled_d = convert_dlpack_to_cudf_ser(sampled_obj) # Ensure we get sample from at at least one of the etypes @@ -563,7 +558,6 @@ def create_gs_heterogeneous_dgl_eg(): return gs -@pytest.mark.skip("Skipping because tends to hang") def test_sampling_gs_heterogeneous_in_dir(): gs = create_gs_heterogeneous_dgl_eg() # DGL expected_output from @@ -622,7 +616,6 @@ def test_sampling_gs_heterogeneous_in_dir(): cudf.testing.assert_frame_equal(output_df, expected_df) -@pytest.mark.skip("Skipping because tends to hang") def test_sampling_gs_heterogeneous_out_dir(): gs = create_gs_heterogeneous_dgl_eg() # DGL expected_output from @@ -692,7 +685,6 @@ def test_sampling_gs_heterogeneous_out_dir(): cudf.testing.assert_frame_equal(output_df, expected_df) -@pytest.mark.skip("Skipping because tends to hang") def test_sampling_dgl_heterogeneous_gs_m_fanouts(): gs = create_gs_heterogeneous_dgl_eg() # Test against DGLs output @@ -733,6 +725,59 @@ def test_sampling_dgl_heterogeneous_gs_m_fanouts(): assert expected_output[fanout][etype] == len(output_df) +def test_clear_cache(): + gs = create_gs_heterogeneous_dgl_eg() + prev_nodes = gs.num_nodes_dict["nt.a"] + + df = cudf.DataFrame() + df["node_id"] = [1000, 2000, 3000] + df["new_node_feat"] = [float(i + 1) for i in range(len(df))] + gs.add_node_data( + df, feat_name="new_node_feat", node_col_name="node_id", ntype="nt.a" + ) + + new_nodes = gs.num_nodes_dict["nt.a"] + assert new_nodes == prev_nodes + 3 + + +def test_add_node_data_scaler_vector_feats(): + pg = PropertyGraph() + gs = CuGraphStore(pg, backend_lib="cupy") + df = cudf.DataFrame() + df["node_id"] = [1, 2, 3] + df["node_scaler_feat_1"] = [10, 20, 30] + df["node_scaler_feat_2"] = [15, 25, 35] + gs.add_node_data(df, "node_id", is_single_vector_feature=False) + + out_1 = gs.get_node_storage("node_scaler_feat_1").fetch([1, 3]) + exp_1 = cp.asarray([10, 30]) + cp.testing.assert_array_equal(exp_1, out_1) + + out_2 = gs.get_node_storage("node_scaler_feat_2").fetch([1, 2]) + exp_2 = cp.asarray([15, 25]) + cp.testing.assert_array_equal(exp_2, out_2) + + df = cudf.DataFrame() + df["node_id"] = [1, 2, 3] + df["v_s1"] = [10, 20, 30] + df["v_s2"] = [15, 25, 35] + gs.add_node_data( + df, "node_id", feat_name="vector_feat", is_single_vector_feature=True + ) + + out_vec = gs.get_node_storage("vector_feat").fetch([1, 2]) + exp_vec = cp.asarray([[10, 15], [20, 25]]) + cp.testing.assert_array_equal(out_vec, exp_vec) + + with pytest.raises(ValueError): + gs.add_node_data( + df, + "node_id", + feat_name="vector_feat", + is_single_vector_feature=False, + ) + + def assert_correct_eids(edge_df, sample_edge_id_df): # We test that all src, dst correspond to the correct # eids in the sample_edge_id_df