In [0]:
from pyspark.sql.functions import *

def _read_widget(name):
    """Create a text widget called `name`, then print and return its value.

    The widget is registered with an empty default and a label equal to its
    name. Surrounding whitespace is stripped so that a value consisting only
    of blanks is treated as missing by the validation at the end of this cell.
    """
    dbutils.widgets.text(name, "", name)
    value = dbutils.widgets.get(name).strip()
    print(f"{name} - {value}")
    return value


source_schema = _read_widget("source_schema")
backup_schema = _read_widget("backup_schema")
source_table = _read_widget("source_table")

# The backup table name is the source table name suffixed with today's date
# (yyyyMMdd) as evaluated by Spark.
# NOTE(review): `current_date` rebinds the function of the same name brought in
# by the star import at the top of the notebook; the name is kept unchanged in
# case other cells read it.
current_date = spark.sql("select current_date() as date")
date = current_date.withColumn("date", date_format(col("date"),"yyyyMMdd")).collect()[0][0]
backup_table = f"{source_table}_{date}"
print(f"backup_table - {backup_table}")

duplicate_column = _read_widget("duplicate_column")

# If there is no filter condition (full backup), provide 1=1 or TRUE in filter_condition
filter_condition = _read_widget("filter_condition")

# Fail fast: every parameter is interpolated into SQL by the cells below, and a
# blank one would produce a malformed statement (e.g. a dangling `WHERE `).
_missing_params = [
    name
    for name, value in (
        ("source_schema", source_schema),
        ("backup_schema", backup_schema),
        ("source_table", source_table),
        ("duplicate_column", duplicate_column),
        ("filter_condition", filter_condition),
    )
    if not value
]
if _missing_params:
    raise ValueError(
        f"Missing required widget value(s): {', '.join(_missing_params)}. "
        "For a full backup set filter_condition to 1=1 or TRUE."
    )

In [0]:
# Snapshot the rows of the source table selected by `filter_condition` into
# {backup_schema}.{backup_table}, a Delta table stored under the backup
# schema's location and partitioned like the source table.

# Partition columns of the source table; an empty list when it is unpartitioned.
partition_cols = spark.sql(f"DESC DETAIL {source_schema}.{source_table}").select('partitionColumns').collect()[0][0]

# If a backup with today's name already exists, drop it and remove its files so
# the CREATE below starts from an empty location.
# WARNING: this discards the existing backup unconditionally. Re-running this
# cell after the DELETE cell but before the INSERT cell would replace the only
# copy of the removed rows with a snapshot of the already-emptied source.
table_exists = spark.catalog._jcatalog.tableExists(backup_schema,backup_table)
if(table_exists):
  # The location must be read before the DROP; DESC DETAIL needs the table.
  location = spark.sql(f"DESC DETAIL {backup_schema}.{backup_table}").select('location').collect()[0][0]
  backup_table_drop = f"DROP TABLE IF EXISTS {backup_schema}.{backup_table}"
  print(backup_table_drop)
  display(spark.sql(backup_table_drop))
  print(f"\nDropped {backup_schema}.{backup_table} table")
  dbutils.fs.rm(f'{location}', True)
  print(f"\nDropped {backup_schema}.{backup_table} path '{location}'")

backup_db_location = spark.sql(f"DESC DATABASE {backup_schema}").filter("database_description_item='Location'").select('database_description_value').collect()[0][0]
# Legacy (misspelled) name, kept in case other cells reference it.
bkp_db_lcoation = backup_db_location

# Emit PARTITIONED BY only when the source has partition columns; an empty
# column list would render as `PARTITIONED BY ()`, which is not valid SQL.
create_partition_clause = f"PARTITIONED BY ({', '.join(partition_cols)})\n" if partition_cols else ""
backup_table_create = (
    f"CREATE TABLE IF NOT EXISTS {backup_schema}.{backup_table}\n"
    "USING DELTA\n"
    f"{create_partition_clause}"
    f"LOCATION '{backup_db_location}/{backup_table}'\n"
    f"AS SELECT * FROM {source_schema}.{source_table} WHERE {filter_condition}"
)
print(backup_table_create)
display(spark.sql(backup_table_create))
print(f"\nCreated {backup_schema}.{backup_table}")

num_affected_rows,num_inserted_rows


In [0]:
# Show the filtered row count of the source table next to the row count of the
# backup table so the two can be compared before any data is deleted.
# (The original query ended with an unmatched `)`, which made it unparseable.)
source_backup_count_check = f"""SELECT '{source_schema}.{source_table}' AS Table, COUNT(*) AS Count
                                FROM {source_schema}.{source_table} WHERE {filter_condition}
                                UNION ALL
                                SELECT '{backup_schema}.{backup_table}', COUNT(*) AS Count FROM {backup_schema}.{backup_table}
                                WHERE {filter_condition}"""
print(source_backup_count_check)
display(spark.sql(source_backup_count_check))

Table,Count
dw_xle_sz_delta_fdp.t_f_xle_ceded_claim_transaction_detail_as_is,8625889512
dw_xle_sz_delta_fdp.t_f_xle_ceded_claim_transaction_detail_as_is,8625889512


In [0]:
# Count the rows in the filtered source table that are not the first row of
# their `duplicate_column` group when ordered by dw_batch_id DESC,
# dw_last_update_dt DESC (i.e. every row ranked 2 or lower within its group).
# The query text is kept in `duplicate_count_check` so it can be re-run later.
duplicate_count_check = (
    "SELECT COUNT(*) AS duplicate_count FROM ( "
    "SELECT *, ROW_NUMBER() OVER ("
    f"PARTITION BY {duplicate_column} "
    "ORDER BY dw_batch_id DESC, dw_last_update_dt DESC) rnk "
    f"FROM {source_schema}.{source_table} "
    f"WHERE {filter_condition} "
    ")a WHERE rnk>1"
)
print(duplicate_count_check)
duplicate_counts_df = spark.sql(duplicate_count_check)
display(duplicate_counts_df)

duplicate_count
2916715242


In [0]:
# Remove the rows selected by `filter_condition` from the source table.
#
# Safety guard: the DELETE is only issued when the backup table holds exactly
# as many rows as are about to be deleted. A missing backup table makes the
# count query fail, and a count mismatch (incomplete backup, or rows added to
# the source after the backup was taken) raises, so nothing is deleted that
# the backup cannot restore.
source_row_count = spark.sql(
    f"SELECT COUNT(*) FROM {source_schema}.{source_table} WHERE {filter_condition}"
).collect()[0][0]
backup_row_count = spark.sql(
    f"SELECT COUNT(*) FROM {backup_schema}.{backup_table}"
).collect()[0][0]
if source_row_count != backup_row_count:
    raise RuntimeError(
        f"Refusing to delete: {source_schema}.{source_table} has {source_row_count} "
        f"row(s) matching the filter but backup {backup_schema}.{backup_table} "
        f"has {backup_row_count}."
    )

delete_from_table = f"""DELETE FROM {source_schema}.{source_table} WHERE {filter_condition}"""
print(delete_from_table)
display(spark.sql(delete_from_table))

num_affected_rows
8625889512


In [0]:
# Reload the source table from the backup, keeping one row per
# `duplicate_column` group: the row ranked first by dw_batch_id DESC,
# dw_last_update_dt DESC.

# Column names of the source table; used both as the INSERT column list and as
# the SELECT list so the helper `rnk` column is not inserted.
columns = spark.sql(f"SELECT * FROM {source_schema}.{source_table}").columns

# `partition_cols` comes from the backup-creation cell. Emit the PARTITION
# clause only when it is non-empty; an unpartitioned table would otherwise
# render `PARTITION ()`, which is not valid SQL.
insert_partition_clause = f"PARTITION ({', '.join(partition_cols)})\n" if partition_cols else ""
insert_into_table = (
    f"INSERT INTO {source_schema}.{source_table}\n"
    f"{insert_partition_clause}"
    f"({', '.join(columns)})\n"
    f"SELECT {', '.join(columns)} FROM (\n"
    f"SELECT *, ROW_NUMBER() OVER (PARTITION BY {duplicate_column} ORDER BY dw_batch_id DESC, dw_last_update_dt DESC) rnk FROM {backup_schema}.{backup_table} WHERE {filter_condition} )a WHERE rnk=1"
)
print(insert_into_table)
display(spark.sql(insert_into_table))

num_affected_rows,num_inserted_rows
5709174270,5709174270


In [0]:
# Run the duplicate-count query defined earlier in the notebook once more, now
# against the reloaded source table, and show the result.
# NOTE(review): presumably duplicate_count is expected to be 0 here — confirm.
print(duplicate_count_check)
remaining_duplicates_df = spark.sql(duplicate_count_check)
display(remaining_duplicates_df)