Remove `_pipeline` from linker and refactor CTE pipeline #2069
Conversation
I think this is great, looks like a big QoL improvement! Treating input frames in this way also feels a lot clearer - I used to get a bit tripped up by the old way, as it didn't line up between when you need them and when you use them.
Haven't looked at everything in detail, but I'm happy that the shape of this is good, and sure we can pick up any small issues if any arise.
This PR:
- Removes `linker._pipeline`, so that all SQL operations create and use fresh pipeline(s)
- Removes `linker._enqueue_sql`
- Removes `linker._execute_sql_pipeline`
- Removes `_initialise_df_concat` and `_initialise_df_concat_with_tf`, instead relying on more explicit calculations using functions in `vertically_concatenate.py`
- Removes the need to pass `input_dataframes` to `sql_pipeline_to_splink_dataframe`; instead, input dataframes can be added to a `CTEPipeline` with `.append_input_dataframe`. Users can therefore add dataframes in places that make the logic flow more clearly
- Makes each pipeline single-use; the `pipeline.spent` property enforces this
Motivation for this PR
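To make the single-use behaviour concrete, here is a minimal sketch of a pipeline with `append_input_dataframe` and a `spent` guard. Only the names `CTEPipeline`, `.append_input_dataframe`, and `.spent` come from this PR; the class body, the `SpentPipelineError`, and the SQL-assembly logic are hypothetical, not Splink's actual implementation.

```python
class SpentPipelineError(Exception):
    pass


class CTEPipeline:
    """Hypothetical single-use SQL pipeline (sketch, not Splink's code)."""

    def __init__(self):
        self._input_dataframes = []
        self._queued_sql = []
        self._spent = False

    @property
    def spent(self):
        return self._spent

    def _check_not_spent(self):
        if self._spent:
            raise SpentPipelineError(
                "This pipeline has already been executed; create a fresh one"
            )

    def append_input_dataframe(self, df):
        self._check_not_spent()
        self._input_dataframes.append(df)

    def enqueue_sql(self, sql, output_table_name):
        self._check_not_spent()
        self._queued_sql.append((sql, output_table_name))

    def generate_cte_sql(self):
        # Fold the queued steps into one WITH ... chain and mark the
        # pipeline as spent so it cannot be reused afterwards.
        self._check_not_spent()
        self._spent = True
        *ctes, (final_sql, _) = self._queued_sql
        if not ctes:
            return final_sql
        with_clause = ", ".join(f"{name} AS ({sql})" for sql, name in ctes)
        return f"WITH {with_clause} {final_sql}"
```

Because every use-site creates its own pipeline, a stale or half-consumed pipeline can no longer leak between operations; any attempt to reuse one fails loudly.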
Consider the existing `_initialise_df_concat_with_tf`. The return type and the mutations of state it performs are confusing:
- With `materialise=True`, it returns a Splink dataframe
- With `materialise=False`, it enqueues SQL on `linker._pipeline` and returns None

So it relies on:
- mutating the shared `linker._pipeline`
- the caller later executing `linker._pipeline`, and reusing it

This function is not really compatible with the idea of using a fresh SQL pipeline each time we want to queue SQL. You'd need to pass a pipeline in, but it's not clear what comes out.
By allowing input tables to be queued directly onto the `CTEPipeline`, we can write a new function `linker._enqueue_df_concat_with_tf` which:
- With `materialise=True`, runs the SQL and returns a `CTEPipeline` with the result already enqueued
- With `materialise=False`, enqueues the SQL to the pipeline without running it, and returns a `CTEPipeline`

This allows us to replace all uses of `_initialise_df_concat_with_tf`.
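A sketch of the new shape, where both branches hand back a pipeline. The `FakeDBAPI` and the stripped-down `CTEPipeline` are hypothetical stand-ins so the example is self-contained; this is not Splink's actual code.

```python
class CTEPipeline:
    """Stripped-down stand-in pipeline for illustration only."""

    def __init__(self):
        self.input_dataframes = []
        self.queued_sql = []

    def append_input_dataframe(self, df):
        self.input_dataframes.append(df)

    def enqueue_sql(self, sql, name):
        self.queued_sql.append((sql, name))


class FakeDBAPI:
    def sql_pipeline_to_splink_dataframe(self, pipeline):
        # Stand-in for executing the pipeline against a real backend.
        return f"df({len(pipeline.queued_sql)} steps)"


def enqueue_df_concat_with_tf(db_api, pipeline, materialise=True):
    pipeline.enqueue_sql("select ...", "__splink__df_concat_with_tf")
    if not materialise:
        # SQL is queued but not run; the caller gets the pipeline back.
        return pipeline
    # Run the queued SQL, then return a fresh pipeline with the
    # materialised result already attached as an input dataframe.
    result = db_api.sql_pipeline_to_splink_dataframe(pipeline)
    fresh = CTEPipeline()
    fresh.append_input_dataframe(result)
    return fresh
```

Either way the caller holds a `CTEPipeline` it can keep building on, so the return type no longer depends on the `materialise` flag.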
Also closes #1696
Reviewing
The main changes have been on:
All the other changes are just downstream consequences of changing those files.
I was getting weirdness with the tests, so you'll see I had to reset the cache.
Related possible future PRs
Add a `pipeline.to_splink_dataframe(db_api)` method? This feels clearer than having to call `db_api.sql_pipeline_to_splink_dataframe(pipeline)`.