Merge pull request #86 from moj-analytical-services/graph_hashes_edges
add graph_hashes with edge attributes
mamonu committed Mar 12, 2022
2 parents b4a9eac + e3ff865 commit e926687
Showing 9 changed files with 280 additions and 188 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "splink_graph"
-version = "0.4.21"
+version = "0.5.0"
 description = "a small set of graph functions to be used from pySpark on top of networkx and graphframes"
 authors = ["Theodore Manassis <theodore.manassis@digital.justice.gov.uk>"]
 keywords = ["graph theory", "graph metrics"]
51 changes: 51 additions & 0 deletions splink_graph/cluster_metrics.py
@@ -123,6 +123,55 @@ def gh(pdf: pd.DataFrame) -> pd.DataFrame:
return out


def cluster_graph_hash_edge_attr(
    sparkdf, src="src", dst="dst", cluster_id_colname="cluster_id", edge_attr_col=None
):
    """calculate the Weisfeiler-Lehman graph hash of a cluster, taking edge attributes
    (such as edge weights) into account. Attribute values are converted to strings for hashing.

    Args:
        sparkdf: input edgelist Spark DataFrame
        src: src column name
        dst: dst column name
        cluster_id_colname: cluster_id created by GraphFrames connected components
        edge_attr_col: edge attribute (for example edge weight) column
    """
    psrc = src
    pdst = dst

    @pandas_udf(
        StructType(
            [
                StructField("cluster_id", LongType()),
                StructField("graphhash_ea", StringType()),
            ]
        ),
        functionType=PandasUDFType.GROUPED_MAP,
    )
    def gh_edge_attr(pdf: pd.DataFrame) -> pd.DataFrame:

        if edge_attr_col:
            # convert edge attribute values to strings before hashing
            pdf[edge_attr_col] = pdf[edge_attr_col].astype(str)

        nxGraph = nx.Graph()
        nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst, [edge_attr_col])

        ghe = nx.weisfeiler_lehman_graph_hash(nxGraph, edge_attr=edge_attr_col)
        co = pdf[cluster_id_colname].iloc[0]  # access component id

        return pd.DataFrame(
            [[co] + [ghe]],
            columns=[
                "cluster_id",
                "graphhash_ea",
            ],
        )

    out = sparkdf.groupby(cluster_id_colname).apply(gh_edge_attr)
    return out
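A minimal usage sketch of the new function (the SparkSession `spark` and the toy edgelist below are assumptions for illustration, not part of the diff):

from splink_graph.cluster_metrics import cluster_graph_hash_edge_attr

# hypothetical toy edgelist: two clusters with weighted edges
edges = spark.createDataFrame(
    [
        ("a", "b", 0.9, 0),
        ("b", "c", 0.4, 0),
        ("d", "e", 0.7, 1),
    ],
    ["src", "dst", "weight", "cluster_id"],
)

hashes = cluster_graph_hash_edge_attr(edges, edge_attr_col="weight")
hashes.show()
# one row per cluster_id with its weight-aware Weisfeiler-Lehman hash in graphhash_ea;
# clusters with identical weighted structure receive identical hashes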


def cluster_main_stats(sparkdf, src="src", dst="dst", cluster_id_colname="cluster_id"):
"""calculate diameter / transitivity(GCC) / triangle clustering coefficient LCC / square clustering coeff
@@ -224,6 +273,8 @@ def cluster_connectivity_stats(
        node_conn: Node connectivity measures the minimal number of vertices that can be removed to disconnect the graph.
        edge_conn: Edge connectivity measures the minimal number of edges that can be removed to disconnect the graph.
        degeneracy: a way to measure sparsity
        num_articulation_pts: number of articulation points. An articulation point is a node whose removal disconnects the graph.

    The larger these metrics are, the more connected the subgraph is.
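For illustration only (not part of the diff), these metrics on a hypothetical toy graph in networkx:

import networkx as nx

# two triangles sharing the single node "c"
g = nx.Graph([("a", "b"), ("b", "c"), ("a", "c"), ("c", "d"), ("d", "e"), ("e", "c")])

print(nx.node_connectivity(g))          # 1 -- removing "c" disconnects the graph
print(nx.edge_connectivity(g))          # 2 -- at least two edges must be cut
print(max(nx.core_number(g).values()))  # 2 -- degeneracy: every node is in a 2-core
print(list(nx.articulation_points(g)))  # ['c'] -- the only articulation point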
88 changes: 44 additions & 44 deletions splink_graph/edge_metrics.py
@@ -24,16 +24,16 @@ def edgebetweeness(
cluster_id_colname="cluster_id",
):
"""return edge betweenness
Args:
sparkdf: imput edgelist Spark DataFrame
src: src column name
dst: dst column name
distance_colname: distance column name
cluster_id_colname: Graphframes-created connected components created cluster_id
"""

psrc = src
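As an aside (not part of the diff), what edge betweenness captures, on a hypothetical path graph:

import networkx as nx

g = nx.path_graph(["a", "b", "c", "d"])
print(nx.edge_betweenness_centrality(g))
# the middle edge ("b", "c") lies on the most shortest paths, so it gets the highest score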
@@ -90,47 +90,47 @@ def bridge_edges(
):

"""return edges that are bridges
Args:
sparkdf: imput edgelist Spark DataFrame
src: src column name
dst: dst column name
distance_colname: distance column name
cluster_id_colname: Graphframes-created connected components created cluster_id
Returns:
src:
dst:
distance:
weight:
cluster_id: Graphframes-created connected components created cluster_id
example input spark dataframe
|src|dst|weight|cluster_id|distance|
+---|---|------|----------|--------|
| f| d| 0.67| 0|0.329 |
| f| g| 0.34| 0|0.659 |
| b| c| 0.56|8589934592| 0.439 |
| g| h| 0.99| 0|0.010 |
| a| b| 0.4|8589934592|0.6 |
| h| i| 0.5| 0|0.5 |
| h| j| 0.8| 0| 0.199 |
| d| e| 0.84| 0| 0.160 |
| e| f| 0.65| 0|0.35 |
example output spark dataframe
|src|dst|weight|cluster_id|distance|
+---|---|------|----------|--------|
| b| c| 0.56|8589934592|0.439 |
| f| g| 0.34| 0|0.659 |
| g| h| 0.99| 0|0.010 |
| h| i| 0.5| 0|0.5 |
| h| j| 0.8| 0|0.199 |
Args:
sparkdf: imput edgelist Spark DataFrame
src: src column name
dst: dst column name
distance_colname: distance column name
cluster_id_colname: Graphframes-created connected components created cluster_id
Returns:
src:
dst:
distance:
weight:
cluster_id: Graphframes-created connected components created cluster_id
example input spark dataframe
|src|dst|weight|cluster_id|distance|
+---|---|------|----------|--------|
| f| d| 0.67| 0|0.329 |
| f| g| 0.34| 0|0.659 |
| b| c| 0.56|8589934592| 0.439 |
| g| h| 0.99| 0|0.010 |
| a| b| 0.4|8589934592|0.6 |
| h| i| 0.5| 0|0.5 |
| h| j| 0.8| 0| 0.199 |
| d| e| 0.84| 0| 0.160 |
| e| f| 0.65| 0|0.35 |
example output spark dataframe
|src|dst|weight|cluster_id|distance|
+---|---|------|----------|--------|
| b| c| 0.56|8589934592|0.439 |
| f| g| 0.34| 0|0.659 |
| g| h| 0.99| 0|0.010 |
| h| i| 0.5| 0|0.5 |
| h| j| 0.8| 0|0.199 |
"""
26 changes: 15 additions & 11 deletions splink_graph/embedding/n2v.py
@@ -26,26 +26,26 @@ def _node2vec_embedding(
walk_length=8,
):
"""provide node2vec embedding of each subgraph/cluster
Args:
sparkdf: imput edgelist Spark DataFrame
src: src column name
dst: dst column name
cluster_id_colname: Graphframes connected components created cluster_id
dimensions (int): node2vec parameter
walk_length (int): node2vec parameter
num_walks (int):node2vec parameter
dimensions (int): node2vec parameter
walk_length (int): node2vec parameter
num_walks (int):node2vec parameter
Returns:
cluster_id: Graphframes connected components created cluster_id
n2vembed: node2vec embedded array casted as string
n2vembed: node2vec embedded array casted as string
Note: in order for the array to be used it needs to be casted back to a numpy array downstream
like this: `n2varray= np.array(ast.literal_eval(n2varraystring))`
This has been implemented like this because of limitations on return types of PANDAS_UDFs)
"""

psrc = src
@@ -85,7 +85,11 @@ def n2v(pdf: pd.DataFrame) -> pd.DataFrame:
         co = pdf[cluster_id_colname].iloc[0]  # access component id

         return pd.DataFrame(
-            [[co] + [str(embeddings)]], columns=["cluster_id", "n2vembed",]
+            [[co] + [str(embeddings)]],
+            columns=[
+                "cluster_id",
+                "n2vembed",
+            ],
         )

     out = sparkdf.groupby(cluster_id_colname).apply(n2v)
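A minimal sketch of the cast-back step the docstring's Note describes (the DataFrame and variable names here are hypothetical):

import ast
import numpy as np

# out_df is assumed to be the DataFrame returned by _node2vec_embedding
row = out_df.select("n2vembed").first()
n2varray = np.array(ast.literal_eval(row["n2vembed"]))
print(n2varray.shape)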