Merge pull request #142 from moj-analytical-services/histdiag2
Remove pandas dependency
mamonu committed Dec 5, 2020
2 parents 84f8e71 + 7d036d9 commit cad5e71
Showing 3 changed files with 76 additions and 101 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "splink"
version = "0.2.9"
version = "0.2.10"
description = "Implementation in Apache Spark of the EM algorithm to estimate parameters of Fellegi-Sunter's canonical model of record linkage."
authors = ["Robin Linacre <robinlinacre@hotmail.com>", "Sam Lindsay", "Theodore Manassis"]
license = "MIT"
143 changes: 59 additions & 84 deletions splink/diagnostics.py
@@ -1,8 +1,10 @@
from functools import reduce
from copy import copy
import warnings

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.session import SparkSession
from .check_types import check_types
import pandas as pd
import warnings


altair_installed = True
@@ -23,9 +25,7 @@
"title": "Histogram of splink scores",
"width": 700,
"encoding": {
"tooltip": [
{"field": "count_rows", "title": "count", "type": "quantitative"}
],
"tooltip": [{"field": "count_rows", "title": "count", "type": "quantitative"}],
"x": {
"axis": {"title": "splink score"},
"bin": "binned",
@@ -44,33 +44,27 @@

@check_types
def _calc_probability_density(
df_e: DataFrame, spark: SparkSession, buckets=None, score_colname=None,
df_e: DataFrame,
spark: SparkSession,
buckets=None,
score_colname=None,
):

"""perform splink score histogram calculations / internal function
"""perform splink score histogram calculations / internal function
Compute a histogram using the provided buckets.
Args:
df_e (DataFrame): A dataframe of record comparisons containing a splink score,
e.g. as produced by the .... function
df_e (DataFrame): A dataframe of record comparisons containing a
splink score, e.g. as produced by the .... function
spark (SparkSession): SparkSession object
score_colname : is the score in another column? defaults to None
buckets : accepts either a list of split points or an integer number that is used
to create equally spaced split points. It defaults to 100 equally spaced split points
from 0.0 to 1.0
score_colname: is the score in another column? defaults to None
buckets: accepts either a list of split points or an integer number that is used
to create equally spaced split points. It defaults to 100 equally
spaced split points from 0.0 to 1.0
Returns:
(DataFrame) : pandas dataframe of histogram bins for appropriate splink score variable ready to be plotted.
(list) : list of rows of histogram bins for appropriate splink score variable ready to be plotted.
"""

# if buckets is a list then use it; if None then create the default; if an integer then create that many equally spaced bins
@@ -96,11 +90,7 @@ def _calc_probability_density(
# Otherwise match_probability is used; if that doesn't exist either, a warning is raised and the function exits

if score_colname:
hist = (
df_e.select(score_colname)
.rdd.flatMap(lambda x: x)
.histogram(buckets)
)
hist = df_e.select(score_colname).rdd.flatMap(lambda x: x).histogram(buckets)

elif "tf_adjusted_match_prob" in df_e.columns:

@@ -113,57 +103,51 @@ def _calc_probability_density(
elif "match_probability" in df_e.columns:

hist = (
df_e.select("match_probability")
.rdd.flatMap(lambda x: x)
.histogram(buckets)
df_e.select("match_probability").rdd.flatMap(lambda x: x).histogram(buckets)
)

else:
warnings.warn("Cannot find score column")

# get bucket from and to points
bin_low = hist[0]
bin_high = copy(hist[0])
bin_low.pop()
bin_high.pop(0)
counts = hist[1]

hist[1].append(None)
hist_df = pd.DataFrame(
{"splink_score_bin_low": hist[0], "count_rows": hist[1]}
)
hist_df["splink_score_bin_high"] = hist_df["splink_score_bin_low"].shift(
-1
)
hist_df = hist_df.drop(hist_df.tail(1).index)
rows = []
for item in zip(bin_low, bin_high, counts):
new_row = {
"splink_score_bin_low": item[0],
"splink_score_bin_high": item[1],
"count_rows": item[2],
}
rows.append(new_row)

# take into account the bin width
for r in rows:
r["binwidth"] = r["splink_score_bin_high"] - r["splink_score_bin_low"]
r["freqdensity"] = r["count_rows"] / r["binwidth"]

hist_df["binwidth"] = (
hist_df["splink_score_bin_high"] - hist_df["splink_score_bin_low"]
)
hist_df["freqdensity"] = hist_df["count_rows"] / hist_df["binwidth"]
sumfreqdens = reduce(lambda a, b: a + b["freqdensity"], rows, 0)

sumfreqdens = hist_df.freqdensity.sum()
hist_df["normalised"] = hist_df["freqdensity"] / sumfreqdens
for r in rows:
r["normalised"] = r["freqdensity"] / sumfreqdens

return hist_df
return rows


def _create_probability_density_plot(hist_df):
"""plot score histogram
def _create_probability_density_plot(data):
"""plot score histogram
Args:
hist_df (pandas DataFrame): A pandas dataframe of histogram bins
Args:
data (list): A list of rows of histogram bins
as produced by the _calc_probability_density function
Returns:
if altair is installed a plot. if altair is not installed
Returns:
if altair is installed a plot. if altair is not installed
then it returns the vega lite chart spec as a dictionary
"""

data = hist_df.to_dict(orient="records")
hist_def_dict["data"]["values"] = data

if altair_installed:
@@ -173,39 +157,30 @@ def _create_probability_density_plot(hist_df):


def splink_score_histogram(
df_e: DataFrame, spark: SparkSession, buckets=None, score_colname=None,
df_e: DataFrame,
spark: SparkSession,
buckets=None,
score_colname=None,
):

"""splink score histogram diagnostic plot public API function
Compute a histogram using the provided buckets and plot the result.
Args:
df_e (DataFrame): A dataframe of record comparisons containing a splink score,
e.g. as produced by the `get_scored_comparisons` function
Args:
df_e (DataFrame): A dataframe of record comparisons containing a splink score,
e.g. as produced by the `get_scored_comparisons` function
spark (SparkSession): SparkSession object
score_colname : is the score in another column? defaults to None
buckets : accepts either a list of split points or an integer number that is used to
create equally spaced split points. It defaults to 100 equally spaced split points from 0.0 to 1.0
buckets : accepts either a list of split points or an integer number that is used to
create equally spaced split points. It defaults to 100 equally spaced split points from 0.0 to 1.0
Returns:
if altair library is installed this function returns a histogram plot. if altair is not installed
then it returns the vega lite chart spec as a dictionary
"""

pd_df = _calc_probability_density(
rows = _calc_probability_density(
df_e, spark=spark, buckets=buckets, score_colname=score_colname
)

return _create_probability_density_plot(pd_df)
return _create_probability_density_plot(rows)
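
For context, a minimal usage sketch of the reworked helpers (not part of the commit): it assumes an active SparkSession named `spark` and a scored comparisons dataframe `df_e` containing a `tf_adjusted_match_prob` column, and the bucket count of 10 is arbitrary. Under the hood the histogram still comes from pyspark's `RDD.histogram`, which returns a `(bin_edges, counts)` pair that `_calc_probability_density` zips into row dicts; the change is that those rows are now a plain Python list rather than a pandas DataFrame, while the public `splink_score_histogram` API is unchanged.

from splink.diagnostics import _calc_probability_density, splink_score_histogram

# Each bin is now a plain dict (no pandas involved)
rows = _calc_probability_density(df_e, spark=spark, buckets=10)
for r in rows:
    print(
        r["splink_score_bin_low"],
        r["splink_score_bin_high"],
        r["count_rows"],
        r["normalised"],
    )

# Public API is unchanged: returns an Altair chart if altair is installed,
# otherwise the vega lite chart spec as a dictionary
chart_or_spec = splink_score_histogram(df_e, spark=spark, buckets=10)
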
32 changes: 16 additions & 16 deletions tests/test_diagnostics.py
@@ -14,15 +14,14 @@ def test_score_hist_score(spark, gamma_settings_4, params_4, sqlite_con_4):

"""
test that a dataframe gets processed when the function is given a column name to take the splink score from
"""

dfpd = pd.read_sql("select * from df", sqlite_con_4)
df = spark.createDataFrame(dfpd)
df = df.withColumn("df_dummy", f.lit(1.0))

res = _calc_probability_density(df, spark=spark, score_colname="df_dummy")

res = pd.DataFrame(res)
assert all(value is not None for value in res.count_rows.values)
assert isinstance(res, pd.DataFrame)

@@ -39,15 +38,15 @@ def test_score_hist_tf(spark, gamma_settings_4, params_4, sqlite_con_4):

res = _calc_probability_density(df, spark=spark)

assert isinstance(res, pd.DataFrame)
assert isinstance(res, list)


def test_score_hist_splits(spark, gamma_settings_4, params_4, sqlite_con_4):

"""
test that a dataframe gets processed with a non-default splits list
test that binwidths and normalised probability densities sum to 1.0
"""

dfpd = pd.read_sql("select * from df", sqlite_con_4)
@@ -57,8 +56,8 @@ def test_score_hist_splits(spark, gamma_settings_4, params_4, sqlite_con_4):
mysplits = [0.3, 0.6]

res = _calc_probability_density(df, spark=spark, buckets=mysplits)
res = pd.DataFrame(res)

assert isinstance(res, pd.DataFrame)
assert res.count_rows.count() == 3
assert res.count_rows.sum() == res.count_rows.cumsum()[2]
assert res.binwidth.sum() == 1.0
@@ -67,7 +66,8 @@ def test_score_hist_splits(spark, gamma_settings_4, params_4, sqlite_con_4):
mysplits2 = [0.6, 0.3]

res2 = _calc_probability_density(df, spark=spark, buckets=mysplits2)

res2 = pd.DataFrame(res2)

assert isinstance(res2, pd.DataFrame)
assert res2.count_rows.count() == 3
assert res2.count_rows.sum() == res.count_rows.cumsum()[2]
@@ -78,16 +78,15 @@ def test_score_hist_splits(spark, gamma_settings_4, params_4, sqlite_con_4):
def test_score_hist_intsplits(spark, gamma_settings_4, params_4, sqlite_con_4):

"""
test integer value in splits variable
"""

dfpd = pd.read_sql("select * from df", sqlite_con_4)
df = spark.createDataFrame(dfpd)
df = df.withColumn("tf_adjusted_match_prob", 1.0 - (f.rand() / 10))

res3 = _calc_probability_density(df, spark=spark, buckets=5)

res3 = pd.DataFrame(res3)
assert res3.count_rows.count() == 5
assert res3.binwidth.sum() == 1.0
assert res3.normalised.sum() == 1.0
@@ -96,7 +95,7 @@ def test_score_hist_intsplits(spark, gamma_settings_4, params_4, sqlite_con_4):
def test_score_hist_output_json(spark, gamma_settings_4, params_4, sqlite_con_4):

"""
test chart exported as dictionary is in fact a valid dictionary
"""

@@ -121,15 +120,16 @@ def test_score_hist_output_json(spark, gamma_settings_4, params_4, sqlite_con_4)
def test_prob_density(spark, gamma_settings_4, params_4, sqlite_con_4):

"""
a test that checks that probability density is computed correctly.
explicitly define a dataframe with tf_adjusted_match_prob = [0.1, 0.3, 0.5, 0.7, 0.9]
and make sure that the probability density is the correct value (0.2) with all 5 bins
a test that checks that probability density is computed correctly.
explicitly define a dataframe with tf_adjusted_match_prob = [0.1, 0.3, 0.5, 0.7, 0.9]
and make sure that the probability density is the correct value (0.2) with all 5 bins
"""

dfpd = pd.DataFrame([0.1, 0.3, 0.5, 0.7, 0.9], columns=["tf_adjusted_match_prob"])
spdf = spark.createDataFrame(dfpd)

res = _calc_probability_density(spdf, spark=spark, buckets=5)
res = pd.DataFrame(res)
assert all(value == pytest.approx(0.2) for value in res.normalised.values)

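As a sanity check on the expected 0.2 in `test_prob_density` above, the arithmetic can be reproduced without Spark. This is a standalone sketch, assuming `buckets=5` yields the default equally spaced split points [0.0, 0.2, 0.4, 0.6, 0.8, 1.0] described in the docstrings, so that each of the five scores lands in its own bin; it mirrors the binwidth/freqdensity/normalised steps in `_calc_probability_density`.

bin_edges = [0.0, 0.2, 0.4, 0.6, 0.8, 1.0]
counts = [1, 1, 1, 1, 1]  # one score per bin: 0.1, 0.3, 0.5, 0.7, 0.9

rows = []
for low, high, count in zip(bin_edges[:-1], bin_edges[1:], counts):
    binwidth = high - low              # 0.2 for every bin
    freqdensity = count / binwidth     # 1 / 0.2 = 5.0
    rows.append({"binwidth": binwidth, "freqdensity": freqdensity})

sumfreqdens = sum(r["freqdensity"] for r in rows)            # 5 * 5.0 = 25.0
normalised = [r["freqdensity"] / sumfreqdens for r in rows]
print(normalised)  # approximately [0.2, 0.2, 0.2, 0.2, 0.2], up to float rounding
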