[FEAT] Detect equi-join conditions in a blocking rule to count the number of comparisons without needing to perform the join #1388
Conversation
Test: test_2_rounds_1k_duckdb. Percentage change: -26.1%
Test: test_2_rounds_1k_sqlite. Percentage change: -24.1%
As expected, the new methodology is dramatically quicker, especially on loose blocking conditions (which are the ones that are most problematic and that we most need to be able to analyse easily):

Test script:

import datetime
import pandas as pd
from splink.duckdb.linker import DuckDBLinker

df = pd.read_parquet(
    "/Users/robinlinacre/Documents/data_linking/splink/synthetic_1m_clean.parquet"
)
blocking_rules = [
"l.first_name = r.first_name",
"l.first_name = r.first_name AND l.surname = r.surname",
"substr(l.first_name,2,3) = substr(r.first_name,3,4)",
"substr(l.first_name,1,3) = substr(r.first_name,1,3)",
]
settings = {"link_type": "dedupe_only"}
linker = DuckDBLinker(df, settings)
for br in blocking_rules:
    print("-------")
    print(br)
    start_time = datetime.datetime.now()
    print(linker.count_num_comparisons_from_blocking_rule_2(br))
    end_time = datetime.datetime.now()
    time_new = end_time - start_time
    print("Time taken for count_num_comparisons_from_blocking_rule_2: ", time_new)
    start_time = datetime.datetime.now()
    print(linker.count_num_comparisons_from_blocking_rule(br))
    end_time = datetime.datetime.now()
    time_old = end_time - start_time
    print(
        "Time taken for count_num_comparisons_from_blocking_rule: ",
        time_old,
    )
    print(f"Speed multiplier: {(time_old/time_new):,.1f}")

Output:
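For anyone following the conversation, here is a minimal sketch of why the new method avoids the join (an illustration only, not Splink's exact SQL). For an equi-join blocking rule such as l.first_name = r.first_name, the number of pairwise comparisons generated is the sum, over each distinct key value, of count_l * count_r, and a GROUP BY can compute that without ever materialising the joined pairs:

import duckdb
import pandas as pd

# Toy input: "ann" appears twice, contributing 2 * 2 = 4 pairs to the
# pre-filter count; "bob" and "cat" each contribute 1, giving 6 in total.
df = pd.DataFrame({"first_name": ["ann", "ann", "bob", "cat"]})

sql = """
SELECT SUM(frequency * frequency) AS count_of_pairwise_comparisons_generated
FROM (
    SELECT COUNT(*) AS frequency
    FROM df
    GROUP BY first_name
)
"""
print(duckdb.sql(sql).fetchone()[0])  # 6

A multi-column rule such as l.first_name = r.first_name AND l.surname = r.surname works the same way by grouping on both columns at once; where the two sides of the equality are different expressions, as in substr(l.first_name,2,3) = substr(r.first_name,3,4), the frequency of each side's expression can be computed separately and the products summed over matching keys.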
Spark test script (this also exercises backtick-quoted column names containing a space):

from pyspark.context import SparkConf, SparkContext
from pyspark.sql import SparkSession
from splink.spark.linker import SparkLinker
conf = SparkConf()
conf.set("spark.driver.memory", "12g")
conf.set("spark.sql.shuffle.partitions", "12")
sc = SparkContext.getOrCreate(conf=conf)
sc.setCheckpointDir("tmp_checkpoints/")
spark = SparkSession(sc)
settings = {
"link_type": "dedupe_only",
}
df = spark.read.csv("./tests/datasets/fake_1000_from_splink_demos.csv", header=True)
# df = df.withColumn("hi THERE", "email")
df = df.withColumnRenamed("surname", "hi THERE")
linker = SparkLinker(df, settings, input_table_aliases="fake_data_1")
br = "l.`first_name` = r.`first_name`"
linker._count_num_comparisons_from_blocking_rule_pre_filter_conditions(br)
br = "l.first_name = r.`first_name` and l.`hi THERE` = r.`hi THERE`"
linker._count_num_comparisons_from_blocking_rule_pre_filter_conditions(br)
def count_comparisons_from_blocking_rule_pre_filter_conditions_sqls(
    linker: "Linker", blocking_rule: Union[str, "BlockingRule"]
):
    if isinstance(blocking_rule, str):
FYI, you'll be able to use this new conversion function once some of my BR work has been merged in.
Co-authored-by: Tom Hepworth <45356472+ThomasHepworth@users.noreply.github.com>
if not join_conditions:
    if linker._two_dataset_link_only:
        sql = f"""
        SELECT
            (SELECT COUNT(*) FROM {input_tablename_l})
            *
            (SELECT COUNT(*) FROM {input_tablename_r})
                AS count_of_pairwise_comparisons_generated
        """
    else:
        sql = """
        select count(*) * count(*) as count_of_pairwise_comparisons_generated
        from __splink__df_concat
        """
This appears to be broken for "stacked" dataframes at present - i.e. a link job where we input a single dataframe with a column detailing which records belong to which dataset.
from splink.duckdb.linker import DuckDBLinker
import pandas as pd
df = pd.read_csv("./tests/datasets/fake_1000_from_splink_demos.csv")
df['source_dataset'] = pd.Series(['a'] * 500 + ['b'] * 500, name='source_dataset')
linker = DuckDBLinker(
    df,
    settings_dict={
        "link_type": "link_only",
    },
)
linker._initialise_df_concat(True).as_pandas_dataframe()
linker._two_dataset_link_only # False
br = "levenshtein(first_name, 3)"
display(
linker._count_num_comparisons_from_blocking_rule_pre_filter_conditions(br)
) # expecting 25k
Ouch, good spot, I will try and figure out how to fix.
Oh wait - I don't think this is actually broken. It's a little difficult to explain, but let me try:
- link_only works in the general n-dataset case, including n == 2
- In the general case, a self-join of the concatenated datasets is used to generate comparisons, filtering out records where source_dataset_l = source_dataset_r
- There is a specific optimisation where Splink observes two input datasets that allows an inner join of the raw (non-concatenated) input datasets
- This optimisation is not used if you pass Splink a single dataframe that implicitly has two source datasets
- 1,000,000 is therefore the correct result from your code example, because the optimisation is not being used
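To put illustrative numbers on this (assuming the 1,000-row input split 500/500 between datasets a and b, as in the snippet above):

# Illustrative arithmetic only, not Splink code.
n_total, n_a, n_b = 1000, 500, 500

# General n-dataset case: the pre-filter count comes from a self-join of
# the concatenated table, before the source_dataset_l <> source_dataset_r
# filter is applied.
print(f"{n_total * n_total:,}")  # 1,000,000 - the result returned here

# Two-dataset optimisation (not triggered for a stacked single dataframe):
# an inner join of the two raw, non-concatenated input frames.
print(f"{n_a * n_b:,}")  # 250,000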
Separately, there is something a bit weird here for which I will raise a different issue. Consider running the following on master:
from splink.duckdb.linker import DuckDBLinker
import pandas as pd
df = pd.read_csv("./tests/datasets/fake_1000_from_splink_demos.csv")
df['source_dataset'] = pd.Series(['a'] * 500 + ['b'] * 500, name='source_dataset')
linker = DuckDBLinker(
    df,
    settings_dict={
        "link_type": "link_and_dedupe",
    },
)
import logging
logging.getLogger("splink").setLevel(1)
linker._initialise_df_concat_with_tf(True).as_pandas_dataframe()
You still end up with a column called __splink_source_dataset, which seems unnecessary.
I will raise an issue - I think this probably needs to be fixed separately
Yeah, it's definitely unnecessary.
The fix that I put in place was merely intended as a bandage before a more permanent solution was implemented.
Apologies, I should've made an issue at the time.
I'm essentially happy to approve this once the "stacked dataframe" problem has been resolved. That may require its own PR, because some of the logic may need to be rewritten. Apologies, I meant to clean up some of that code a while back, but never found the time.
Needed because sqlglot.optimizer.eliminate_joins.join_condition is not available prior to v7
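For context, here is a hedged sketch of the equi-join detection idea using sqlglot directly (written for this discussion; the function names are hypothetical and this is neither Splink's implementation nor the vendored join_condition helper): split the rule on AND and keep the equalities where one side references only l.* columns and the other only r.* columns.

import sqlglot
from sqlglot import expressions as exp

def _split_ands(node):
    # Flatten a chain of ANDs into its leaf conditions
    if isinstance(node, exp.And):
        yield from _split_ands(node.this)
        yield from _split_ands(node.expression)
    else:
        yield node

def detect_equi_join_conditions(blocking_rule: str):
    tree = sqlglot.parse_one(blocking_rule)
    equi_join_conditions, other_conditions = [], []
    for cond in _split_ands(tree):
        if isinstance(cond, exp.EQ):
            left_tables = {c.table for c in cond.this.find_all(exp.Column)}
            right_tables = {c.table for c in cond.expression.find_all(exp.Column)}
            # Keep equalities where one side refers only to l.* and the
            # other only to r.* (the flipped r/l ordering is ignored here
            # for brevity)
            if left_tables == {"l"} and right_tables == {"r"}:
                equi_join_conditions.append(cond.sql())
                continue
        other_conditions.append(cond.sql())
    return equi_join_conditions, other_conditions

# Splits into one equi-join condition and one residual (non-equi) condition
print(detect_equi_join_conditions(
    "l.first_name = r.first_name and levenshtein(l.surname, r.surname) < 3"
))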
Thanks Robin, this is great!
Some caveats to be dealt with in a later PR.
Is your Pull Request linked to an existing Issue or Pull Request?
Will close #1376
Will also set the groundwork for a future method like linker.suggest_blocking_rules(target_comparisons=x).
Give a brief description for the solution you have provided
See issue #1376 for more detail.
To do:
- analyse_blocking.py
PR Checklist