Merge pull request #78 from moj-analytical-services/degen
Degeneracy metric added
mamonu committed Oct 19, 2021
2 parents 121eafd + a917cc9 commit ffbd691
Showing 3 changed files with 110 additions and 25 deletions.
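Background on the new metric: the degeneracy of a graph is the largest k for which the graph has a non-empty k-core, equivalently the maximum core number over its nodes, which is exactly how the diff below computes it (max(nx.core_number(nxGraph).values())). A minimal standalone sketch in plain networkx (no Spark required), using the same barbell graph as the new test added in this commit:

    import networkx as nx

    # barbell_graph(5, 3): two K5 cliques joined by a 3-node path,
    # the same graph used in the new test below.
    g = nx.barbell_graph(5, 3)

    # core_number returns each node's core number; the graph's
    # degeneracy is the maximum of these values.
    degeneracy = max(nx.core_number(g).values())
    print(degeneracy)  # 4 -- each K5 end of the barbell forms a 4-core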
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "splink_graph"
version = "0.4.20"
version = "0.4.21"
description = "a small set of graph functions to be used from pySpark on top of networkx and graphframes"
authors = ["Theodore Manassis <theodore.manassis@digital.justice.gov.uk>"]
keywords = ["graph theory", "graph metrics"]
54 changes: 40 additions & 14 deletions splink_graph/cluster_metrics.py
@@ -85,12 +85,12 @@ def cluster_graph_hash(sparkdf, src="src", dst="dst", cluster_id_colname="cluste
"""calculate weisfeiler-lehman graph hash of a cluster
Args:
sparkdf: imput edgelist Spark DataFrame
src: src column name
dst: dst column name
cluster_id_colname: Graphframes-created connected components created cluster_id
"""
Args:
sparkdf: imput edgelist Spark DataFrame
src: src column name
dst: dst column name
cluster_id_colname: Graphframes-created connected components created cluster_id
"""
psrc = src
pdst = dst

@@ -110,7 +110,13 @@ def gh(pdf: pd.DataFrame) -> pd.DataFrame:
h = nx.weisfeiler_lehman_graph_hash(nxGraph)
co = pdf[cluster_id_colname].iloc[0] # access component id

return pd.DataFrame([[co] + [h]], columns=["cluster_id", "graphhash",],)
return pd.DataFrame(
[[co] + [h]],
columns=[
"cluster_id",
"graphhash",
],
)

out = sparkdf.groupby(cluster_id_colname).apply(gh)
return out
@@ -200,7 +206,10 @@ def drt(pdf: pd.DataFrame) -> pd.DataFrame:


def cluster_connectivity_stats(
sparkdf, src="src", dst="dst", cluster_id_colname="cluster_id",
sparkdf,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
):
"""outputs connectivity metrics per cluster_id
@@ -250,6 +259,7 @@ def cluster_connectivity_stats(
StructField("cluster_id", LongType()),
StructField("node_conn", IntegerType()),
StructField("edge_conn", IntegerType()),
StructField("degeneracy", IntegerType()),
]
),
functionType=PandasUDFType.GROUPED_MAP,
@@ -260,11 +270,13 @@ def conn_eff(pdf: pd.DataFrame) -> pd.DataFrame:
nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst)
nc = nx.algorithms.node_connectivity(nxGraph)
ec = nx.algorithms.edge_connectivity(nxGraph)
dg = max(nx.core_number(nxGraph).values())

co = pdf[cluster_id_colname].iloc[0] # access component id

return pd.DataFrame(
[[co] + [nc] + [ec]], columns=["cluster_id", "node_conn", "edge_conn",],
[[co] + [nc] + [ec] + [dg]],
columns=["cluster_id", "node_conn", "edge_conn", "degeneracy"],
)

out = sparkdf.groupby(cluster_id_colname).apply(conn_eff)
@@ -371,7 +383,11 @@ def largest_edge_betweenness(G):
co_eb_mod = -1.0

return pd.DataFrame(
[[co] + [co_eb_mod]], columns=["cluster_id", "cluster_eb_modularity",],
[[co] + [co_eb_mod]],
columns=[
"cluster_id",
"cluster_eb_modularity",
],
)

out = sparkdf.groupby(cluster_id_colname).apply(cluster_eb_m)
@@ -460,7 +476,11 @@ def cluster_lpg_m(pdf: pd.DataFrame) -> pd.DataFrame:
co_lpg_mod = nx_comm.modularity(nxGraph, gn)

return pd.DataFrame(
[[co] + [co_lpg_mod]], columns=["cluster_id", "cluster_lpg_modularity",],
[[co] + [co_lpg_mod]],
columns=[
"cluster_id",
"cluster_lpg_modularity",
],
)

out = sparkdf.groupby(cluster_id_colname).apply(cluster_lpg_m)
@@ -668,7 +688,7 @@ def cluster_assortativity(
),
functionType=PandasUDFType.GROUPED_MAP,
)
def drt(pdf: pd.DataFrame) -> pd.DataFrame:
def asrt(pdf: pd.DataFrame) -> pd.DataFrame:

nxGraph = nx.Graph()
nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst)
Expand All @@ -683,9 +703,15 @@ def drt(pdf: pd.DataFrame) -> pd.DataFrame:

co = pdf[cluster_id_colname].iloc[0] # access component id

return pd.DataFrame([[co] + [r]], columns=["cluster_id", "assortativity",],)
return pd.DataFrame(
[[co] + [r]],
columns=[
"cluster_id",
"assortativity",
],
)

out = sparkdf.groupby(cluster_id_colname).apply(drt)
out = sparkdf.groupby(cluster_id_colname).apply(asrt)

out = out.withColumn("assortativity", f.round(f.col("assortativity"), 3))

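After this change, cluster_connectivity_stats emits one row per cluster with node_conn, edge_conn and the new degeneracy column (see the schema above). A usage sketch, assuming an active SparkSession named spark and the import path implied by the file name; the triangle edgelist here is illustrative, not taken from the diff:

    from pyspark.sql import Row
    from splink_graph.cluster_metrics import cluster_connectivity_stats

    # An illustrative single-cluster edgelist: a triangle, so node_conn,
    # edge_conn and degeneracy should all come out as 2.
    edges = [
        Row(src="a", dst="b", cluster_id=1),
        Row(src="b", dst="c", cluster_id=1),
        Row(src="c", dst="a", cluster_id=1),
    ]
    e_df = spark.createDataFrame(edges)

    df_result = cluster_connectivity_stats(
        e_df,
        src="src",
        dst="dst",
        cluster_id_colname="cluster_id",
    ).toPandas()

    print(df_result[["cluster_id", "node_conn", "edge_conn", "degeneracy"]])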
79 changes: 69 additions & 10 deletions tests/test_cluster_metrics.py
@@ -395,11 +395,15 @@ def test_cluster_connectivity_stats_completegraph(spark):

e_df = spark.createDataFrame(Row(**x) for x in data_list)
df_result = cluster_connectivity_stats(
e_df, src="src", dst="dst", cluster_id_colname="cluster_id",
e_df,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
).toPandas()

assert df_result["node_conn"][0] == 4
assert df_result["edge_conn"][0] == 4
assert df_result["degeneracy"][0] == 4


def test_cluster_connectivity_stats_linegraph(spark):
@@ -416,11 +420,42 @@ def test_cluster_connectivity_stats_linegraph(spark):

e_df = spark.createDataFrame(Row(**x) for x in data_list)
df_result = cluster_connectivity_stats(
e_df, src="src", dst="dst", cluster_id_colname="cluster_id",
e_df,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
).toPandas()

assert df_result["node_conn"][0] == 1
assert df_result["edge_conn"][0] == 1
assert df_result["degeneracy"][0] == 1


def test_connectivity_degen_barbell(spark):

g = nx.barbell_graph(5, 3)
fourbridges = pd.DataFrame(list(g.edges), columns=["src", "dst"])
fourbridges["weight"] = 1.0
fourbridges["cluster_id"] = 1

# Create an Edge DataFrame with "src" and "dst" columns
e2_df = spark.createDataFrame(
fourbridges,
["src", "dst", "weight", "cluster_id"],
)

e2_df = e2_df.withColumn("distance", 1.0 - f.col("weight"))

df_result = cluster_connectivity_stats(
e2_df,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
).toPandas()

assert df_result["node_conn"][0] == 1
assert df_result["edge_conn"][0] == 1
assert df_result["degeneracy"][0] == 4


def test_number_of_bridges(spark):
@@ -464,7 +499,10 @@ def test_four_bridges(spark):
fourbridges["cluster_id"] = 1

# Create an Edge DataFrame with "src" and "dst" columns
e2_df = spark.createDataFrame(fourbridges, ["src", "dst", "weight", "cluster_id"],)
e2_df = spark.createDataFrame(
fourbridges,
["src", "dst", "weight", "cluster_id"],
)

e2_df = e2_df.withColumn("distance", 1.0 - f.col("weight"))

@@ -481,7 +519,10 @@ def test_0_bridges(spark):
zerobridges["cluster_id"] = 1

# Create an Edge DataFrame with "src" and "dst" columns
e2_df = spark.createDataFrame(zerobridges, ["src", "dst", "weight", "cluster_id"],)
e2_df = spark.createDataFrame(
zerobridges,
["src", "dst", "weight", "cluster_id"],
)

e2_df = e2_df.withColumn("distance", 1.0 - f.col("weight"))

@@ -501,7 +542,10 @@ def test_cluster_graph_hash(spark):

e_df = spark.createDataFrame(Row(**x) for x in data_list)
df_result = cluster_graph_hash(
e_df, src="src", dst="dst", cluster_id_colname="cluster_id",
e_df,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
).toPandas()

assert df_result["graphhash"][0] == "0f43d8cdd43b0b78727b192b6d6d0d0e"
@@ -518,7 +562,10 @@ def test_cluster_assortativity_neg(spark):

e_df = spark.createDataFrame(Row(**x) for x in data_list)
df_result = cluster_assortativity(
e_df, src="src", dst="dst", cluster_id_colname="cluster_id",
e_df,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
).toPandas()

assert df_result["assortativity"][0] < 0.0
@@ -532,10 +579,16 @@ def test_cluster_assortativity_pos(spark):
barb["cluster_id"] = 1

# Create a Spark Edge DataFrame with "src" and "dst" columns
e_df = spark.createDataFrame(barb, ["src", "dst", "cluster_id"],)
e_df = spark.createDataFrame(
barb,
["src", "dst", "cluster_id"],
)

df_result = cluster_assortativity(
e_df, src="src", dst="dst", cluster_id_colname="cluster_id",
e_df,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
).toPandas()

assert df_result["assortativity"][0] > 0.0
@@ -550,10 +603,16 @@ def test_cluster_assortativity_fully(spark):

# Create a Spark Edge DataFrame with "src" and "dst" columns

e_df = spark.createDataFrame(zerobridges, ["src", "dst", "cluster_id"],)
e_df = spark.createDataFrame(
zerobridges,
["src", "dst", "cluster_id"],
)

df_result = cluster_assortativity(
e_df, src="src", dst="dst", cluster_id_colname="cluster_id",
e_df,
src="src",
dst="dst",
cluster_id_colname="cluster_id",
).toPandas()

# when all degrees are equal (e.g. grids or complete graphs) assortativity is NaN
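The expectations in the new barbell test can be sanity-checked locally without Spark: nx.barbell_graph(5, 3) joins two K5 cliques by a path, so cutting a single path node (or edge) disconnects the graph, while each K5 end contributes a 4-core. A quick sketch of that check:

    import networkx as nx

    g = nx.barbell_graph(5, 3)

    assert nx.algorithms.node_connectivity(g) == 1  # removing one path node disconnects it
    assert nx.algorithms.edge_connectivity(g) == 1  # removing one path edge disconnects it
    assert max(nx.core_number(g).values()) == 4     # each K5 end is a 4-core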
