
[KED-2891] Implement spark.DeltaTable dataset #964

Merged
56 commits merged on Dec 3, 2021

Conversation

@jiriklein (Contributor) commented Oct 14, 2021

Description

This PR adds support for Databricks DeltaTable in a Kedro pipeline. Unlike a regular dataset, whose save fully overwrites (or creates) the underlying data, a DeltaTable is mutable underneath: it can be appended to, updated, merged into or deleted from, like a regular SQL table (along with commit logs etc.).

This means that, unlike with other datasets, Kedro needs to be very careful in defining what the save method does. In my opinion, for a DeltaTable this means modifying the underlying dataset. The actual implementation is described below.

Development notes

In my opinion, the DeltaTable method calls are best abstracted away from the users. The actual API then comes in through a well-designed conf/catalog entry and a strategy pattern (or similar) that resolves into the right call in the _save method (see the sketch below).
Happy to take onboard any design contributions on this.
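
As a rough sketch only (the handler method names and the save_args layout here are illustrative assumptions, not the implementation in this PR), such a strategy-style dispatch could look like:

from kedro.io.core import DataSetError

class DeltaTableDataSet:  # illustrative shell; the real class would extend the appropriate Kedro base class
    def _save(self, data) -> None:
        # resolve the catalog-configured write mode to a handler (hypothetical helper methods)
        mode = self._save_args.get("mode", "overwrite")
        handlers = {
            "overwrite": self._overwrite,
            "append": self._append,
            "update": self._update,
            "upsert": self._upsert,
            "delete": self._delete,
        }
        if mode not in handlers:
            raise DataSetError(f"Unsupported write mode: {mode!r}")
        handlers[mode](data)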

Checklist

  • Read the contributing guidelines
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change and added my name to the list of supporting contributions in the RELEASE.md file
  • Added tests to cover my changes

Notice

  • I acknowledge and agree that, by checking this box and clicking "Submit Pull Request":

  • I submit this contribution under the Apache 2.0 license and represent that I am entitled to do so on behalf of myself, my employer, or relevant third parties, as applicable.

  • I certify that (a) this contribution is my original creation and / or (b) to the extent it is not my original creation, I am authorised to submit this contribution on behalf of the original creator(s) or their licensees.

  • I certify that the use of this contribution as authorised by the Apache 2.0 license does not violate the intellectual property rights of anyone else.

@jiriklein jiriklein self-assigned this Oct 14, 2021
@jiriklein (Contributor, Author)

@datajoely @limdauto

I am going to update the PR but I can foresee that the YAML would take a similar shape. Please let me know what you think about that.

_anchor: &anchor
  type: spark.DeltaTableDataset

delta_overwrite_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/overwrite/dataset
  delta_options:
    replaceWhere: "start_date >= '2017-01-01' AND end_date <= '2021-01-01'"
    overwriteSchema: true
  save_args:
    mode: overwrite

delta_append_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/append/dataset
  save_args:
    mode: append

delta_update_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/update/dataset
  delta_options:
    timestampAsOf: '2019-01-01'
    versionAsOf: 2
  save_args:
    mode: update
    update_predicates: 
      - 'col = "foo"'
    update_values: 
      'event': 'baz'
      'another': 'foobar' 

delta_upsert_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/upsert/dataset
  save_args:
    mode: upsert
    key: 'a.id = b.id'
    whenMatchedUpdate: 
      'a.data': 'b.new_data'
    whenNotMatchedInsert:
      'a.date': 'b.new_date'
      'a.id': 'b.id'

delta_delete_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/delete/dataset
  save_args:
    mode: delete
    delete_predicates:
      - 'date < "2017-01-01"'

@datajoely (Contributor) commented Oct 15, 2021

So I have a few thoughts - this is very related to the SQL conversation I had with @idanov and @MerelTheisenQB a while back, which ended with the conclusion that we don't want to encourage many transformation side effects outside of the Kedro DAG.

The central question here is what sort of declarative CRUD functionality we allow in the catalog. Of the two options, (1) catalog definition and (2) within a node, option (2) remains possible either way. I don't know if that means we shouldn't do (1), but it highlights that (1) adds implementation complexity which we could simply delegate to the existing PySpark API.

Upsert example - the syntax doesn't make it especially clear to the reader where exactly A and B come from.
Merge example - not available yet, but the syntax needed to support merging two catalog entries feels like it will be cumbersome.

The more I write on this comment, the less convinced I am that we should expose this sort of functionality in the catalog definition.

@jiriklein (Contributor, Author)

So I have a few thoughts - this is very related to the SQL conversation I had with @idanov and @MerelTheisenQB a while back […] The more I write on this comment, the less convinced I am that we should expose this sort of functionality in the catalog definition.

In my opinion, this fully aligns with not allowing IO ops outside of the node. Therefore the only place where this could live is the _save and _load methods respectively - thus in the dataset.
My bad - upsert is merge. The other keyword would lead to either

  • a dataset entry (more kedro-like, but it gives the dataset access to the catalog, which might be a circular dependency?)
  • a filepath (easier, but it means not catching this in the DAG, and therefore runtime errors)

So I fully see your point.

THAT BEING SAID - we could restrict the upsert operation to merging the dataframe into an earlier version of itself only - which is the point of the upsert. If the user wants to use the merge functionality, they can do that within a node. But upserting to self is a valid CRUD transformation that doesn't violate the DAG and I would absolutely support it.

Similar with delete - it feels weird, but I think it's perfectly valid for terminus datasets OR for interactive IO within notebook/ipython.
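
For reference, a delete like this maps onto a single DeltaTable call; a minimal sketch, assuming the spark session, filepath and predicate handling shown here:

from delta.tables import DeltaTable

def _delete(self, spark, filepath, delete_predicates):
    # combine the catalog-configured predicates and delete matching rows in place
    condition = " AND ".join(delete_predicates)  # e.g. "date < '2017-01-01'"
    DeltaTable.forPath(spark, filepath).delete(condition)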

@datajoely (Contributor)

THAT BEING SAID - we could restrict the upsert operation to merging the dataframe into an earlier version of itself only - which is the point of the upsert. If the user wants to use the merge functionality, they can do that within a node. But upserting to self is a valid CRUD transformation that doesn't violate the DAG and I would absolutely support it.

I think this is reasonable for an upsert, but I'm not sure what the implementation would look like.

Similar with delete - it feels weird, but I think it's perfectly valid for terminus datasets OR for interactive IO within notebook/ipython.

I also like this restriction, but the dataset isn't aware of where it sits in the pipeline - I guess the runner is, but that's not a great help. Would this be enforced or just encouraged?

Comment on lines 62 to 67
self._update_options = save_args.pop(
    "insert_options"
)  # TBD - maybe better solution?
self._insert_options = save_args.pop(
    "update_options"
)  # TBD - maybe better solution?
Contributor:

Are these the wrong way round?

@jiriklein (Contributor, Author):

Yes 😬 it was testing whether reviewers are paying attention 😂

@antonymilne (Contributor)

Initial comments from someone with no understanding of what a Delta table is, what upsert is, or really what Spark is... Would the strategy pattern work similarly to SparkHiveDataSet, where we switch between different write modes? If so, I think that's a good pattern to use, but then should this inherit just from AbstractDataSet instead?

@IamSandeepGunda

I am going to update the PR but I can foresee that the YAML would take a similar shape. Please let me know what you think about that. […]

The dates in replaceWhere are static. That defeats the purpose of replaceWhere, because we'd need it to be updated whenever we're doing an incremental load, with the dates being set based on whatever incremental load strategy the user is applying.

@jiriklein (Contributor, Author)

The dates in replaceWhere are static. That defeats the purpose of replaceWhere, because we'd need it to be updated whenever we're doing an incremental load, with the dates being set based on whatever incremental load strategy the user is applying.

I see what you mean @IamSandeepGunda - that being said, in the Kedro framework, the logical ops live at node-function level (e.g. your aggregations etc.), which returns a pyspark.sql.DataFrame instance and delegates (and abstracts) the save method to the DataSet instance.

The individual Delta writer options seem like they should be a property of the dataset rather than the node. What would you suggest as an implementation?

@IamSandeepGunda commented Oct 19, 2021

I see what you mean @IamSandeepGunda - that being said, in the Kedro framework, the logical ops live at node-function level (e.g. your aggregations etc.), which returns a pyspark.sql.DataFrame instance and delegates (and abstracts) the save method to the DataSet instance.

The individual Delta writer options seem like they should be a property of the dataset rather than the node. What would you suggest as an implementation?

Well, I gave up finding an elegant solution and hacked it a little bit.
Example catalog:

delta_overwrite_dataset:
  type: "${datasets.spark}" ## That's just my custom spark dataset
  filepath: /dbfs/path/to/overwrite/dataset
  file_format: delta
  save_args:
    mode: overwrite
    option:
        overwriteSchema: true

Now, inside my custom _save() method:

from pyspark.sql.functions import col, min as min_

# derive the lower bound of the partition column from the incoming data
partition_col = self._save_args.get("partitionBy")
start_date = data.agg(min_(col(partition_col))).collect()[0][0]
data.write \
    .option("replaceWhere", f"{partition_col} >= '{start_date}'") \
    .save(
        path=save_path,
        format=self._file_format,
        mode=self._save_args.get("mode"),
        partitionBy=partition_col,
    )

This is how I'm getting around setting replaceWhere dynamically. I'd love to have a decent implementation of a Spark Delta dataset where the node can maybe pass the dates to the save method.

@jiriklein (Contributor, Author) commented Oct 20, 2021

Summary

@datajoely @AntonyMilneQB @limdauto @idanov @lorenabalan

So after our discussions, I believe the best use case for this dataset is to smooth over the non-self-consistent DeltaTable API. For each of the write modes, this would look as follows.

First of all, the DeltaTableDataSet._save method would take the following shape:

from typing import Union

from pyspark.sql import DataFrame

from kedro.io.core import DataSetError, NodeStatus  # NodeStatus is proposed below

def _save(self, io: Union[DataFrame, NodeStatus]) -> None:
    # align the Spark and DeltaTable APIs
    if isinstance(io, DataFrame):
        super()._save(data=io)  # there is still `_strip_dbfs_prefix` -- will this work?
    # allow the user to handle DeltaTable IO in the node and return a success status
    elif isinstance(io, NodeStatus):
        if io != NodeStatus.SUCCESS:
            raise DataSetError("`NodeStatus` returned something other than SUCCESS")
    else:
        raise DataSetError("Incorrect return type from node function.")

Append & Overwrite

Consistent with the Kedro Principles & DAG: requires the user to return a pyspark.sql.DataFrame to the _save method, and the filepath is extrinsic to the dataset. DeltaTable does not have a save method that supports this directly and delegates to the DataFrame API (https://docs.delta.io/latest/delta-batch.html#append&language-python).

Leverages: https://github.com/delta-io/delta/blob/master/python/delta/tables.py#L43

The node function would look like this

from delta.tables import DeltaTable
from pyspark.sql import DataFrame

def node_func(input: DeltaTable) -> DataFrame:
  # processing happens here
  return input.toDF()

The _save() method picks this up, adds options if necessary, and saves as format("delta").
This would work for the modes error, errorifexists, append and overwrite, in line with SparkDataSet.
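
A minimal sketch of that append/overwrite path, assuming hypothetical attributes such as self._delta_options and self._filepath (DBFS prefix handling omitted):

from pyspark.sql import DataFrame

def _save(self, data: DataFrame) -> None:
    # write the DataFrame as a Delta table, applying any catalog-configured options
    writer = data.write.format("delta").mode(self._save_args.get("mode", "overwrite"))
    for key, value in (self._delta_options or {}).items():
        writer = writer.option(key, value)  # e.g. ("overwriteSchema", "true")
    writer.save(self._filepath)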

Update, Upsert/Merge, Delete

These are not directly consistent with the Kedro Principles & DAG, as

  1. The filepath is intrinsic to the DeltaTable
  2. The update, merge and delete methods are methods on the DeltaTable and are materialised immediately (on call, or on a subsequent execute call on a merge builder)
  3. We still need to inform the Kedro pipeline and DAG that this node has succeeded in a meaningful way

Therefore, the node function would look like this

# in kedro.io.core
class NodeStatus(Enum):
    SUCCESS = 0
    FAILURE = 1

# in node func module
from delta.tables import DeltaTable
from kedro.io.core import NodeStatus
import pyspark.sql.functions as f

def node_func(input: DeltaTable) -> NodeStatus:
  # processing happens here, such as
  input.update(f.col("col_1") == "foo", { "col_1": f.lit("bar") } )
  return NodeStatus.SUCCESS

This implementation has the following benefits:

  1. Aligns and smooths over the API in a more consistent manner
  2. Allows for delta specific options to be added via _add_options on the new DataSet, such as .option("overwriteSchema", "true")
  3. YAML entries would be consistent (at least on surface), as documented below
  4. We can allow for write_mode checking and logging of warnings that the write happens at node, rather than dataset, level (see the sketch after this list)
  5. Fairly easy to change, e.g. if DeltaTable offers overwrite and insert methods in the future (probably thin wrappers).
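
A hedged sketch of point 4, assuming a hypothetical self._write_mode attribute and a _log_write_mode helper on the dataset:

import logging

logger = logging.getLogger(__name__)

def _log_write_mode(self) -> None:
    # warn when the configured mode can only be executed inside the node via the DeltaTable API
    if self._write_mode in ("update", "upsert", "delete"):
        logger.warning(
            "Write mode %r is performed inside the node via the DeltaTable API; "
            "the catalog `mode` is only checked and logged at save time.",
            self._write_mode,
        )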

Constraints, disadvantages and questions:

  1. replaceWhere and other Delta-specific options are static in YAML - this is an option on the DataFrameWriter rather than anywhere on the DataFrame, which is a pain. One user pointed out that this is key for incremental data loads, so maybe we allow the user to return a DataFrameWriter instead?
  2. Node returns are not consistent - albeit there is precedent for this in SQL datasets, where they often return nothing.
  3. Should mode keys sit under save_args or have a specific first-class write_mode?
  4. Should we handle the user passing a parquet file to the DeltaTable dataset? This is technically possible, because there is a convertToDelta method available (see the snippet after this list; https://github.com/delta-io/delta/blob/master/python/delta/tables.py#L237). However, this would change the property of the underlying dataset and convert it permanently, therefore breaking the DAG if the user also transcodes the original parquet file for use with pandas.
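
For reference, the conversion mentioned in point 4 is a one-line call in the Delta API; the path below is purely illustrative:

from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.getOrCreate()

# converts the parquet files at the given path to a Delta table in place;
# afterwards the location is no longer a plain parquet dataset
DeltaTable.convertToDelta(spark, "parquet.`/dbfs/path/to/parquet/dataset`")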

YAML

_anchor: &anchor
  type: spark.DeltaTableDataset

delta_overwrite_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/overwrite/dataset
  delta_options:  # this may need to be dynamic
    replaceWhere: "start_date >= '2017-01-01' AND end_date <= '2021-01-01'"  
    overwriteSchema: true
  save_args:
    mode: overwrite

delta_append_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/append/dataset
  save_args:
    mode: append

delta_update_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/update/dataset
  delta_options:  # this may need to be dynamic
    timestampAsOf: '2019-01-01'
    versionAsOf: 2
  save_args:
    mode: update  # the only point of this is to check or log

delta_upsert_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/upsert/dataset
  save_args:
    mode: upsert  # the only point of this is to check or log

delta_delete_dataset:
  <<: *anchor
  filepath: /dbfs/path/to/delete/dataset
  save_args:
    mode: delete  # the only point of this is to check or log

Alternatives

  • Given the restriction on the update, upsert and delete modes, the only way to further align the two groups of modes is to extract the filepath out of the DeltaTable object. Unfortunately, I can't see it in https://github.com/delta-io/delta/blob/master/python/delta/tables.py#L25, so I am assuming it lives in self._jdt, which is the Java DeltaTable representation. Even then, we would just provide the user with boilerplate on how to pass that into the dataframe, and not much beyond that.
  • Pause the work on this and research whether this will change in the future. Even then, however, this would not be much help, as these are surface changes with minimal migration headaches.
  • Our own subclassing of DeltaTable, adding thin wrappers for the append, overwrite and errorifexists write methods? Not sure how this would work, because we would be asking the user to operate on a kedro-specific object that is a subclass of DeltaTable.

@jiriklein (Contributor, Author) commented Oct 20, 2021

Alternatively, this can be visualised as:

[Diagram: DeltaTableDataset]

@datajoely (Contributor)

Thank you for this @jiriklein - the graphic is very helpful, but I need more time to think about the prose. Could you also think about transcoding between @pandas, @spark and @delta, as I think that's a valid use case.

@jiriklein (Contributor, Author)

@datajoely @idanov @limdauto @lorenabalan @SajidAlamQB

Further discussion

After yesterday's discussion, we arrived at the following:

  1. The DeltaTableDataSet should handle only DeltaTable-specific operations (i.e. those from its own API). This means the user should use DeltaTableDataSet for the update, upsert and delete write modes.
  2. For the other write modes (overwrite, append, error, ignore), the user should use SparkDataSet with format: delta, as these align with the pyspark API.
  3. The resolution between the two within the Kedro DAG should be handled by transcoding the datasets. The graph below describes this, but it will need extensive documentation.

[Diagram: Transcribing and dataset proposal]

@datajoely (Contributor)

One last point - perhaps it's worth adding a note under the image saying something like:

Note: This pattern of creating 'dummy' datasets to preserve the data flow also applies to other 'out of DAG' execution operations, such as SQL operations within a node.

@lorenabalan lorenabalan force-pushed the feature/databricks-deltatable-dataset branch from 284401e to 56ea747 Compare November 25, 2021 10:39
@datajoely datajoely self-requested a review November 25, 2021 12:07
@lorenabalan lorenabalan force-pushed the feature/databricks-deltatable-dataset branch from 08cb366 to c837288 Compare November 29, 2021 12:45
@merelcht (Member) left a comment
Great work everyone! Thanks @lorenabalan for persevering 👏 🏆 The docs look really good as well!
