WIP: enable liquid clustering for delta lakes
mikix committed May 21, 2024
1 parent ceabc53 · commit b13924e
Showing 1 changed file with 23 additions and 28 deletions.
cumulus_etl/formats/deltalake.py (51 changes: 23 additions & 28 deletions)
@@ -93,38 +93,33 @@ def _write_one_batch(self, batch: Batch) -> None:
         delta_table.generate("symlink_format_manifest")
 
     def update_delta_table(self, updates: pyspark.sql.DataFrame, groups: set[str]) -> delta.DeltaTable:
-        full_path = self._table_path(self.dbname)
-
-        try:
-            # Load table -- this will trigger an AnalysisException if the table doesn't exist yet
-            table = delta.DeltaTable.forPath(self.spark, full_path)
-
-            # Determine merge condition
-            conditions = [f"table.{field} = updates.{field}" for field in self.uniqueness_fields]
-            condition = " AND ".join(conditions)
+        table = (
+            delta.DeltaTable.createIfNotExists(self.spark)
+            .addColumns(updates.schema)
+            .clusterBy(*self.uniqueness_fields)
+            .location(self._table_path(self.dbname))
+            .execute()
+        )
 
-            # Merge in new data
-            merge = (
-                table.alias("table")
-                .merge(source=updates.alias("updates"), condition=condition)
-                .whenNotMatchedInsertAll()
-            )
-            if self.update_existing:
-                update_condition = self._get_update_condition(updates.schema)
-                merge = merge.whenMatchedUpdateAll(condition=update_condition)
+        # Determine merge condition
+        conditions = [f"table.{field} = updates.{field}" for field in self.uniqueness_fields]
+        condition = " AND ".join(conditions)
 
-            if self.group_field and groups:
-                # Delete any entries for groups touched by this update that are no longer present in the group
-                # (we are guaranteed to have all members of each group in the `updates` dataframe).
-                condition_column = table.toDF()[self.group_field].isin(groups)
-                merge = merge.whenNotMatchedBySourceDelete(condition_column)
+        # Merge in new data
+        merge = (
+            table.alias("table").merge(source=updates.alias("updates"), condition=condition).whenNotMatchedInsertAll()
+        )
+        if self.update_existing:
+            update_condition = self._get_update_condition(updates.schema)
+            merge = merge.whenMatchedUpdateAll(condition=update_condition)
 
-            merge.execute()
+        if self.group_field and groups:
+            # Delete any entries for groups touched by this update that are no longer present in the group
+            # (we are guaranteed to have all members of each group in the `updates` dataframe).
+            condition_column = table.toDF()[self.group_field].isin(groups)
+            merge = merge.whenNotMatchedBySourceDelete(condition_column)
 
-        except AnalysisException:
-            # table does not exist yet, let's make an initial version
-            updates.write.save(path=full_path, format="delta")
-            table = delta.DeltaTable.forPath(self.spark, full_path)
+        merge.execute()
 
         return table
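
For context, below is a minimal standalone sketch of the pattern this commit adopts. DeltaTable.createIfNotExists() makes table creation idempotent, which is what lets the diff drop the old try/except AnalysisException bootstrap and run new and existing tables through the same merge path, while clusterBy() enables liquid clustering on the merge keys. Everything here is illustrative rather than taken from this repo: the session setup follows the delta-spark quickstart, and the schema, path, and "id" key column are hypothetical. Note that clusterBy() on the Python builder needs a delta-spark release with liquid clustering support (the 3.x line current when this commit was made).

    import delta
    import pyspark.sql
    from pyspark.sql import types

    # Hypothetical session setup, following the delta-spark quickstart.
    builder = (
        pyspark.sql.SparkSession.builder.appName("liquid-clustering-sketch")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()

    # Hypothetical schema and merge key.
    schema = types.StructType(
        [
            types.StructField("id", types.StringType()),
            types.StructField("value", types.StringType()),
        ]
    )

    # Idempotent create: returns the existing table if one is already at this
    # location, otherwise creates an empty one. clusterBy() requests liquid
    # clustering over the listed columns instead of Hive-style partitioning.
    table = (
        delta.DeltaTable.createIfNotExists(spark)
        .addColumns(schema)
        .clusterBy("id")
        .location("/tmp/demo_table")  # hypothetical path
        .execute()
    )

A merge against table (as in the diff above) then behaves the same on the first run as on the hundredth, since the table is guaranteed to exist by the time the merge condition is built.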
