Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor mg symmetrize tests #2278

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
290 changes: 169 additions & 121 deletions python/cugraph/cugraph/tests/mg/test_mg_symmetrize.py
Expand Up @@ -16,104 +16,81 @@
import pytest

import pandas as pd
import cudf
import cugraph
import dask_cudf
from cugraph.testing import utils
from cugraph.dask.common.mg_utils import (is_single_gpu,
setup_local_dask_cluster,
teardown_local_dask_cluster)


def test_version():
# =============================================================================
# Pytest Setup / Teardown - called for each test function
# =============================================================================
def setup_function():
    """
    Pytest per-test setup hook for this module.

    Runs a full garbage collection before each test function.
    """
    gc.collect()


def test_version():
    """
    Smoke test: accessing ``cugraph.__version__`` must not raise.

    The value itself is not checked; the test only fails if the attribute
    lookup raises.
    """
    cugraph.__version__


def compare(src1, dst1, val1, src2, dst2, val2):
def compare(ddf1, ddf2, src_col_name, dst_col_name, val_col_name):
#
# We will do comparison computations by using dataframe
# merge functions (essentially doing fast joins). We
# start by making two data frames
#
df1 = cudf.DataFrame()
df1["src1"] = src1
df1["dst1"] = dst1
if val1 is not None:
df1["val1"] = val1

df2 = cudf.DataFrame()
df2["src2"] = src2
df2["dst2"] = dst2
if val2 is not None:
df2["val2"] = val2

#
# merge functions (essentially doing fast joins).
# Check to see if all pairs in the original data frame
# still exist in the new data frame. If we join (merge)
# the data frames where (src1[i]=src2[i]) and (dst1[i]=dst2[i])
# then we should get exactly the same number of entries in
# the data frame if we did not lose any data.
#
join = df1.merge(df2, left_on=["src1", "dst1"], right_on=["src2", "dst2"])

if len(df1) != len(join):
join2 = df1.merge(df2, how='left',
left_on=["src1", "dst1"], right_on=["src2", "dst2"])
pd.set_option('display.max_rows', 500)
print('df1 = \n', df1.sort_values(["src1", "dst1"]))
print('df2 = \n', df2.sort_values(["src2", "dst2"]))
print('join2 = \n', join2.sort_values(["src1", "dst1"])
.to_pandas().query('src2.isnull()', engine='python'))

assert len(df1) == len(join)

if val1 is not None:
#
# Check the values. In this join, if val1 and val2 are
# the same then we are good. If they are different then
# we need to check if the value is selected from the opposite
# direction, so we'll merge with the edges reversed and
# check to make sure that the values all match
#
diffs = join.query("val1 != val2")
diffs_check = diffs.merge(
df1, left_on=["src1", "dst1"], right_on=["dst1", "src1"]
)
query = diffs_check.query("val1_y != val2")
if len(query) > 0:
print("differences: ")
print(query)
assert 0 == len(query)

ddf1 = ddf1.add_suffix("_x")
ddf2 = ddf2.add_suffix("_y")

if not isinstance(src_col_name, list) and not isinstance(
dst_col_name, list):
src_col_name = [src_col_name]
dst_col_name = [dst_col_name]

# Column names for ddf1
src_col_name1 = [f"{src}_x" for src in src_col_name]
dst_col_name1 = [f"{dst}_x" for dst in dst_col_name]
col_names1 = src_col_name1 + dst_col_name1

# Column names for ddf2
src_col_name2 = [f"{src}_y" for src in src_col_name]
dst_col_name2 = [f"{dst}_y" for dst in dst_col_name]
col_names2 = src_col_name2 + dst_col_name2

if val_col_name is not None:
val_col_name = [val_col_name]
val_col_name1 = [f"{val}_x" for val in val_col_name]
val_col_name2 = [f"{val}_y" for val in val_col_name]
col_names1 += val_col_name1
col_names2 += val_col_name2
#
# Now check the symmetrized edges are present. If the original
# data contains (u,v) we want to make sure that (v,u) is present
# data contains (u,v), we want to make sure that (v,u) is present
# in the new data frame.
#
# We can accomplish this by doing the join (merge) where
# (src1[i] = dst2[i]) and (dst1[i] = src2[i]), and verifying
# that we get exactly the same number of entries in the data frame.
#
join = df1.merge(df2, left_on=["src1", "dst1"], right_on=["dst2", "src2"])
assert len(df1) == len(join)

if val1 is not None:
#
# Check the values. In this join, if val1 and val2 are
# the same then we are good. If they are different then
# we need to check if the value is selected from the opposite
# direction, so we'll merge with the edges reversed and
# check to make sure that the values all match
#
diffs = join.query("val1 != val2")
diffs_check = diffs.merge(
df1, left_on=["src2", "dst2"], right_on=["src1", "dst1"]
)
query = diffs_check.query("val1_y != val2")
if len(query) > 0:
print("differences: ")
print(query)
assert 0 == len(query)
join = ddf1.merge(ddf2, left_on=[*col_names1], right_on=[*col_names2])

if len(ddf1) != len(join):
# The code below is for debugging purposes only. It will print
# edges in the original dataframe that are missing from the
# symmetrized dataframe
join2 = ddf1.merge(ddf2, how='left',
left_on=[*col_names1], right_on=[*col_names2])
# FIXME: Didn't find a cudf alternative for the function below
pd.set_option('display.max_rows', 500)
print('join2 = \n', join2.sort_values([*col_names1])
.compute().to_pandas().query(
f"{src_col_name[0]}_y.isnull()", engine='python'))

assert len(ddf1) == len(join)

#
# Finally, let's check (in both directions) backwards.
Expand All @@ -125,11 +102,11 @@ def compare(src1, dst1, val1, src2, dst2, val2):
# for some edge (u,v) ALREADY contain the edge (v,u). The
# symmetrized graph will not duplicate any edges, so the edge
# (u,v) will only be present once. So we can't simply check
# counts of df2 joined with df1.
# counts of ddf2 joined with ddf1.
#
# join1 will contain the join (merge) of df2 to df1 in the
# join1 will contain the join (merge) of ddf2 to ddf1 in the
# forward direction
# join2 will contain the join (merge) of df2 to df1 in the
# join2 will contain the join (merge) of ddf2 to ddf1 in the
# reverse direction
#
# Finally, we'll do an outer join of join1 and join2, which
Expand All @@ -138,64 +115,135 @@ def compare(src1, dst1, val1, src2, dst2, val2):
# in both data frames as single rows. This gives us a data frame
# with the same number of rows as the symmetrized data.
#
join1 = df2.merge(df1, left_on=["src2", "dst2"], right_on=["src1", "dst1"])
join2 = df2.merge(df1, left_on=["src2", "dst2"], right_on=["dst1", "src1"])
joinM = join1.merge(join2, how="outer", on=["src2", "dst2"])

assert len(df2) == len(joinM)
swap_columns = dst_col_name1 + src_col_name1
if val_col_name is not None:
swap_columns += val_col_name1

join1 = ddf2.merge(ddf1, left_on=[*col_names2], right_on=[*col_names1])
join2 = ddf2.merge(ddf1, left_on=[*col_names2], right_on=[*swap_columns])

# Ensure join2["weight_*"] and join1["weight"] are of the same type.
# Failing to do that can trigger ddf to return a warning if the two ddf
# being merged are of different types
join2 = join2.astype(join1.dtypes.to_dict())

joinM = join1.merge(join2, how="outer", on=[*ddf2.columns])

assert len(ddf2) == len(joinM)

#
# Note, we don't need to check the reverse values... we checked
# them in both directions earlier.
#


@pytest.fixture(scope="module")
def client_connection():
    """
    Module-scoped fixture that provides a Dask client to the tests.

    Setup calls ``setup_local_dask_cluster(p2p=True)`` and yields the
    client it returns. Once the requesting tests are done, the code after
    the ``yield`` hands both the cluster and the client to
    ``teardown_local_dask_cluster``.
    """
    cluster, client = setup_local_dask_cluster(p2p=True)
    yield client
    teardown_local_dask_cluster(cluster, client)

# Input files for the parametrized fixtures: karate-asymmetric.csv from the
# RAPIDS dataset root directory, followed by utils.DATASETS_UNDIRECTED.
# Each entry is wrapped in pytest.param as its POSIX-style path string.
input_data_path = (
    [utils.RAPIDS_DATASET_ROOT_DIR_PATH / "karate-asymmetric.csv"]
    + utils.DATASETS_UNDIRECTED
)
datasets = [pytest.param(path.as_posix()) for path in input_data_path]

@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
def test_mg_symmetrize(graph_file, client_connection):
gc.collect()

ddf = utils.read_dask_cudf_csv_file(graph_file)
sym_src, sym_dst = cugraph.symmetrize(ddf["src"], ddf["dst"])
# Parameter sets consumed by the input_combo fixture: each dataset is
# combined with edge values on/off and multi-column vertices on/off
# (presumably a full cross product, going by the helper's name - confirm
# against cugraph.testing.utils.genFixtureParamsProduct).
fixture_params = utils.genFixtureParamsProduct(
(datasets, "graph_file"),
([True, False], "edgevals"),
([True, False], "multi_columns"),
)

# convert to regular cudf to facilitate comparison
df = ddf.compute()

compare(
df["src"], df["dst"], None, sym_src.compute(), sym_dst.compute(), None
)
@pytest.fixture(scope="module", params=fixture_params)
def input_combo(request):
    """
    Return the current combination of fixture params as a dictionary.

    The values in ``request.param`` are paired positionally with the keys
    "graph_file", "edgevals" and "multi_columns", for use in tests or in
    other parameterized fixtures.
    """
    param_names = ("graph_file", "edgevals", "multi_columns")
    return dict(zip(param_names, request.param))


@pytest.mark.skipif(
is_single_gpu(), reason="skipping MG testing on Single GPU system"
)
@pytest.mark.parametrize("graph_file", utils.DATASETS_UNDIRECTED)
def test_mg_symmetrize_df(graph_file, client_connection):
gc.collect()
@pytest.fixture(scope="module")
def read_datasets(input_combo):
"""
This fixture reads the datasets and returns a dictionary containing all
input params required to run the symmetrize function
"""

pd.set_option('display.max_rows', 500)
graph_file = input_combo["graph_file"]
edgevals = input_combo["edgevals"]
multi_columns = input_combo["multi_columns"]

ddf = utils.read_dask_cudf_csv_file(graph_file)
sym_ddf = cugraph.symmetrize_ddf(ddf, "src", "dst", "weight")

# convert to regular cudf to facilitate comparison
df = ddf.compute()
sym_df = sym_ddf.compute()

compare(
df["src"],
df["dst"],
df["weight"],
sym_df["src"],
sym_df["dst"],
sym_df["weight"],
)

src_col_name = "src"
dst_col_name = "dst"
val_col_name = None

if edgevals:
val_col_name = "weight"

if multi_columns:
# Generate multicolumn from the ddf
ddf = ddf.rename(columns={"src": "src_0", "dst": "dst_0"})
ddf["src_1"] = ddf["src_0"] + 100
ddf["dst_1"] = ddf["dst_0"] + 100

src_col_name = ["src_0", "src_1"]
dst_col_name = ["dst_0", "dst_1"]

input_combo["ddf"] = ddf
input_combo["src_col_name"] = src_col_name
input_combo["dst_col_name"] = dst_col_name
input_combo["val_col_name"] = val_col_name

return input_combo


# =============================================================================
# Tests
# =============================================================================
# @pytest.mark.skipif(
# is_single_gpu(), reason="skipping MG testing on Single GPU system"
# )
def test_mg_symmetrize(dask_client, read_datasets):
    """
    Check ``cugraph.symmetrize`` on a dask dataframe against its input.

    The input frame and column names come from the ``read_datasets``
    fixture. The pieces returned by ``symmetrize`` are reassembled into a
    single dask dataframe, which is then validated with ``compare``.

    ``dask_client`` is not referenced in the body; presumably it is a
    fixture defined outside this file that is requested so a Dask client
    exists while the test runs - confirm.
    """
    ddf = read_datasets["ddf"]
    src_col_name = read_datasets["src_col_name"]
    dst_col_name = read_datasets["dst_col_name"]
    val_col_name = read_datasets["val_col_name"]

    if val_col_name is not None:
        sym_src, sym_dst, sym_val = cugraph.symmetrize(
            ddf, src_col_name, dst_col_name, val_col_name)
    else:
        # Without edge values, keep only the vertex columns in the input
        # frame before symmetrizing and comparing.
        if not isinstance(src_col_name, list):
            vertex_col_names = [src_col_name, dst_col_name]
        else:
            vertex_col_names = src_col_name + dst_col_name
        ddf = ddf[[*vertex_col_names]]
        sym_src, sym_dst = cugraph.symmetrize(ddf, src_col_name, dst_col_name)

    # create a dask DataFrame from the dask Series
    if isinstance(sym_src, dask_cudf.Series):
        ddf2 = sym_src.to_frame()
        ddf2 = ddf2.rename(columns={sym_src.name: "src"})
        ddf2["dst"] = sym_dst
    else:
        # Not a Series: join the two results column-wise (presumably the
        # multi-column case, where both are dataframes - confirm).
        ddf2 = dask_cudf.concat([sym_src, sym_dst], axis=1)

    if val_col_name is not None:
        ddf2["weight"] = sym_val

    compare(ddf, ddf2, src_col_name, dst_col_name, val_col_name)


# @pytest.mark.skipif(
# is_single_gpu(), reason="skipping MG testing on Single GPU system"
# )
def test_mg_symmetrize_df(dask_client, read_datasets):
    """
    Check ``cugraph.symmetrize_ddf`` on a dask dataframe against its input.

    The input frame and column names come from the ``read_datasets``
    fixture. The dataframe returned by ``symmetrize_ddf`` is validated
    against the input with ``compare``.

    ``dask_client`` is not referenced in the body; presumably it is a
    fixture defined outside this file that is requested so a Dask client
    exists while the test runs - confirm.
    """
    ddf = read_datasets["ddf"]
    src_col_name = read_datasets["src_col_name"]
    dst_col_name = read_datasets["dst_col_name"]
    val_col_name = read_datasets["val_col_name"]

    sym_ddf = cugraph.symmetrize_ddf(
        ddf, src_col_name, dst_col_name, val_col_name)

    compare(ddf, sym_ddf, src_col_name, dst_col_name, val_col_name)