Merge pull request #15 from moj-analytical-services/customfields
custom fields: allow custom src / dst / distance column names to be passed into the vectorised graph functions
mamonu committed Jun 3, 2021
2 parents b1771bb + 5b0613c commit 91c3775
Showing 3 changed files with 100 additions and 14 deletions.
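The substance of the change: edgebetweeness, bridgesgroupedmap, eigencentrality, harmoniccentrality and diameter_radius_transitivity now accept optional src, dst and, where relevant, distance keyword arguments instead of hard-coding the edge column names. A minimal usage sketch, mirroring the new tests below (the SparkSession setup is an assumption here; the tests get it from a fixture):

from pyspark.sql import Row, SparkSession

from splink_graph.vectorised import diameter_radius_transitivity

spark = SparkSession.builder.getOrCreate()

# Edge list whose endpoint columns are "id_l"/"id_r" instead of the
# default "src"/"dst"
data_list = [
    {"id_l": "a", "id_r": "b", "distance": 0.4, "component": 1},
    {"id_l": "b", "id_r": "c", "distance": 0.56, "component": 1},
]
e_df = spark.createDataFrame(Row(**x) for x in data_list)

# Point the function at the custom column names
df_result = diameter_radius_transitivity(e_df, src="id_l", dst="id_r").toPandas()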
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "splink_graph"
version = "0.3.11"
version = "0.3.12"
description = "a small set of graph functions to be used from pySpark on top of networkx and graphframes"
authors = ["Theodore Manassis <theodore.manassis@digital.justice.gov.uk>"]
keywords = ["graph theory", "graph metrics"]
41 changes: 28 additions & 13 deletions splink_graph/vectorised.py
@@ -38,7 +38,7 @@
)


-def edgebetweeness(sparkdf):
+def edgebetweeness(sparkdf, src="src", dst="dst", distance="distance"):
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SparkSession

@@ -47,17 +47,18 @@ def edgebetweeness(sparkdf):
conf.set("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
sc = SparkContext.getOrCreate(conf=conf)

+    psrc = src
+    pdst = dst
+    pdistance = distance

@pandas_udf(eboutSchema, PandasUDFType.GROUPED_MAP)
def ebdf(pdf):
src = "src"
dst = "dst"
distance = "distance"

srclist = []
dstlist = []
eblist = []
nxGraph = nx.Graph()
-        nxGraph = nx.from_pandas_edgelist(pdf, src, dst, distance)
+        nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst, pdistance)
eb = edge_betweenness_centrality(nxGraph, normalized=True, weight=distance)
currentcomp = pdf["component"].iloc[0] # access current component
compsize = pdf["component"].size # how many nodes does this cluster have?
@@ -79,7 +80,7 @@ def ebdf(pdf):
return out


-def bridgesgroupedmap(sparkdf):
+def bridgesgroupedmap(sparkdf, src="src", dst="dst", distance="distance"):

"""
@@ -113,6 +114,9 @@ def bridgesgroupedmap(sparkdf):
"""
+    psrc = src
+    pdst = dst
+    pdistance = distance

bridgesoutSchema = StructType(
[
@@ -128,7 +132,7 @@
def br_p_udf(pdf):

nxGraph = nx.Graph()
nxGraph = nx.from_pandas_edgelist(pdf, "src", "dst", "distance")
nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst, pdistance)

b = bridges(nxGraph)
bpdf = pd.DataFrame(b, columns=["src", "dst",])
@@ -139,7 +143,7 @@ def br_p_udf(pdf):
return out


-def eigencentrality(sparkdf):
+def eigencentrality(sparkdf, src="src", dst="dst", distance="distance"):

"""
@@ -196,10 +200,14 @@ def eigencentrality(sparkdf):
]
)

+    psrc = src
+    pdst = dst
+    pdistance = distance

@pandas_udf(ecschema, PandasUDFType.GROUPED_MAP)
def eigenc(pdf):
nxGraph = nx.Graph()
nxGraph = nx.from_pandas_edgelist(pdf, "src", "dst", "distance")
nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst, pdistance)
ec = eigenvector_centrality(nxGraph)
return (
pd.DataFrame.from_dict(ec, orient="index", columns=["eigen_centrality"])
@@ -211,7 +219,7 @@ def eigenc(pdf):
return out


-def harmoniccentrality(sparkdf):
+def harmoniccentrality(sparkdf, src="src", dst="dst", distance="distance"):

"""
@@ -264,10 +272,14 @@ def harmoniccentrality(sparkdf):
]
)

+    psrc = src
+    pdst = dst
+    pdistance = distance

@pandas_udf(hcschema, PandasUDFType.GROUPED_MAP)
def harmc(pdf):
nxGraph = nx.Graph()
nxGraph = nx.from_pandas_edgelist(pdf, "src", "dst", "distance")
nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst, pdistance)
hc = harmonic_centrality(nxGraph)
return (
pd.DataFrame.from_dict(hc, orient="index", columns=["harmonic_centrality"])
@@ -281,7 +293,7 @@ def harmc(pdf):
return out


-def diameter_radius_transitivity(sparkdf):
+def diameter_radius_transitivity(sparkdf, src="src", dst="dst"):
"""
input spark dataframe:
@@ -308,6 +320,9 @@ def diameter_radius_transitivity(sparkdf):
"""

+    psrc = src
+    pdst = dst

@pandas_udf(
StructType(
[
@@ -325,7 +340,7 @@
def drt(pdf):

nxGraph = nx.Graph()
nxGraph = nx.from_pandas_edgelist(pdf, "src", "dst")
nxGraph = nx.from_pandas_edgelist(pdf, psrc, pdst)
d = diameter(nxGraph)
r = radius(nxGraph)
t = transitivity(nxGraph)
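The pattern repeated in every function above: the keyword arguments are rebound to local names (psrc, pdst, pdistance) before the pandas UDF is defined, so the grouped-map closure captures plain strings; the old code instead shadowed the names inside the UDF with hard-coded values. A self-contained sketch of the same shape (node_degrees and its schema are hypothetical, not part of splink_graph):

import networkx as nx
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import LongType, StringType, StructField, StructType


def node_degrees(sparkdf, src="src", dst="dst"):
    # Rebind keyword arguments to locals so the UDF closure captures
    # plain strings rather than reaching back into the signature
    psrc = src
    pdst = dst

    out_schema = StructType(
        [
            StructField("node", StringType()),
            StructField("degree", LongType()),
        ]
    )

    @pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
    def degrees(pdf):
        # One networkx graph per connected component, as in the diff
        g = nx.from_pandas_edgelist(pdf, psrc, pdst)
        return pd.DataFrame(list(g.degree()), columns=["node", "degree"])

    return sparkdf.groupby("component").apply(degrees)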
71 changes: 71 additions & 0 deletions tests/test_vectorised.py
@@ -34,6 +34,27 @@ def test_diameter_radius_transitivity(spark):
assert df_result["transitivity"][0] == pytest.approx(0, 0.01)
assert df_result["transitivity"][1] == pytest.approx(0, 0.01)

assert df_result["graphhash"][0] == "0f43d8cdd43b0b78727b192b6d6d0d0e"
assert df_result["graphhash"][1] == "0f43d8cdd43b0b78727b192b6d6d0d0e"


+def test_diameter_radius_transitivity_customcolname(spark):
+    # Create an Edge DataFrame with "id_l" and "id_r" columns
+    data_list = [
+        {"id_l": "a", "id_r": "b", "distance": 0.4, "component": 1},
+        {"id_l": "b", "id_r": "c", "distance": 0.56, "component": 1},
+        {"id_l": "d", "id_r": "e", "distance": 0.2, "component": 2},
+        {"id_l": "f", "id_r": "e", "distance": 0.8, "component": 2},
+    ]
+
+    e_df = spark.createDataFrame(Row(**x) for x in data_list)
+    e_df = e_df.withColumn("weight", 1.0 - f.col("distance"))
+
+    df_result = diameter_radius_transitivity(e_df, src="id_l", dst="id_r").toPandas()
+
+    assert df_result["diameter"][0] == 2
+    assert df_result["diameter"][1] == 2

def test_edgebetweeness_samecomponentsinout(spark):
data_list = [
@@ -72,6 +93,22 @@ def test_edgebetweeness_simple(spark):
assert df_result["eb"].values == pytest.approx(0.666667, 0.1)


+def test_edgebetweeness_simple_customcolname(spark):
+    data_list = [
+        {"id_l": "a", "id_r": "b", "distance": 0.4, "component": 1},
+        {"id_l": "b", "id_r": "c", "distance": 0.56, "component": 1},
+        {"id_l": "d", "id_r": "e", "distance": 0.2, "component": 2},
+        {"id_l": "f", "id_r": "e", "distance": 0.8, "component": 2},
+    ]
+
+    e_df = spark.createDataFrame(Row(**x) for x in data_list)
+    e_df = e_df.withColumn("weight", 1.0 - f.col("distance"))
+
+    df_result = edgebetweeness(e_df, src="id_l", dst="id_r").toPandas()
+
+    assert df_result["eb"].values == pytest.approx(0.666667, 0.1)


def test_eigencentrality_simple(spark):
data_list = [
{"src": "a", "dst": "b", "distance": 0.4, "component": 1},
@@ -88,6 +125,23 @@ def test_eigencentrality_simple(spark):
assert df_result["eigen_centrality"][0] == pytest.approx(0.50000, 0.01)


+def test_eigencentrality_customcolname(spark):
+    data_list = [
+        {"id_l": "a", "id_r": "b", "distance": 0.4, "component": 1},
+        {"id_l": "b", "id_r": "c", "distance": 0.56, "component": 1},
+        {"id_l": "d", "id_r": "e", "distance": 0.2, "component": 2},
+        {"id_l": "f", "id_r": "e", "distance": 0.8, "component": 2},
+    ]
+
+    e_df = spark.createDataFrame(Row(**x) for x in data_list)
+    e_df = e_df.withColumn("weight", 1.0 - f.col("distance"))
+
+    df_result = eigencentrality(e_df, src="id_l", dst="id_r").toPandas()
+
+    assert df_result["eigen_centrality"][0] == pytest.approx(0.50000, 0.01)


def test_harmoniccentrality_simple(spark):
data_list = [
{"src": "a", "dst": "b", "distance": 0.4, "component": 1},
@@ -103,3 +157,20 @@ def test_harmoniccentrality_simple(spark):

assert df_result["harmonic_centrality"][0] == pytest.approx(1.50, 0.1)
assert df_result["harmonic_centrality"][4] == pytest.approx(2.0, 0.1)


+def test_harmoniccentrality_customcolname(spark):
+    data_list = [
+        {"id_l": "a", "id_r": "b", "distance": 0.4, "component": 1},
+        {"id_l": "b", "id_r": "c", "distance": 0.56, "component": 1},
+        {"id_l": "d", "id_r": "e", "distance": 0.2, "component": 2},
+        {"id_l": "f", "id_r": "e", "distance": 0.8, "component": 2},
+    ]
+
+    e_df = spark.createDataFrame(Row(**x) for x in data_list)
+    e_df = e_df.withColumn("weight", 1.0 - f.col("distance"))
+
+    df_result = harmoniccentrality(e_df, src="id_l", dst="id_r").toPandas()
+
+    assert df_result["harmonic_centrality"][0] == pytest.approx(1.50, 0.1)
+    assert df_result["harmonic_centrality"][4] == pytest.approx(2.0, 0.1)

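To run just the new tests, something like the following should work, assuming pytest is installed and the project's conftest.py provides the spark fixture:

import pytest

# Select the new custom-column-name tests by keyword
pytest.main(["tests/test_vectorised.py", "-k", "customcolname"])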