In [19]:
# Lakehouse schemas to run table hygiene (VACUUM + OPTIMIZE) against.
# Each entry needs the keys 'workspace', 'lakehouse' and 'schema'.
lakehouses = [
    {
        'workspace': 'inventory-analytics-data',
        'lakehouse': 'lh_bronze',
        'schema': 'availability',
    },
]

StatementMeta(, 0fed8eff-cfe1-4e1c-9a76-43a369f39d2d, 7, Finished, Available, Finished)

In [30]:
from pyspark.sql.functions import map_concat, col, lit, collect_list, concat_ws, concat, expr, col, max, min, current_timestamp, datediff, desc, map_entries, map_from_entries, struct

def vacuum_and_optimize_all_tables(workspace, lakehouse, schema, retention_period=None):
    """
    A function to vacuum and optimize all delta tables in a specific lakehouse/schema

    Every table listed by ``get_all_tables`` is checked with ``do_vacuum_and_optimize``;
    tables that qualify are vacuumed and then optimized. Failures reported by
    ``vacuum_table`` / ``optimize_table`` are collected and returned rather than
    being silently discarded.

    Parameters
    ----------
        workspace : string
            The name of the fabric workspace where the lakehouse exists
        lakehouse : string
            The name of the Lakehouse where the schema/table(s) are located
        schema : string
            The name of the Schema where tables require hygiene
        retention_period: int, in hours, default=None
            The number of hours of history to retain (7 days is 168 hours). If None,
            no RETAIN clause is passed to VACUUM, so the default retention applies
    Returns
    -------
        list of dict
            One ``{"error-message": ...}`` entry per failed vacuum/optimize call;
            an empty list when every operation succeeded
    """
    errors = []
    table_rows = get_all_tables(workspace, lakehouse, schema).selectExpr('tableName').collect()
    for row in table_rows:
        table_name = row[0]
        if not do_vacuum_and_optimize(workspace, lakehouse, schema, table_name, retention_period):
            continue
        # vacuum_table / optimize_table return None on success and an error dict on failure.
        vacuum_result = vacuum_table(workspace, lakehouse, schema, table_name, retention_period)
        if vacuum_result is not None:
            errors.append(vacuum_result)
        optimize_result = optimize_table(workspace, lakehouse, schema, table_name)
        if optimize_result is not None:
            errors.append(optimize_result)
    return errors

def get_all_tables(workspace, lakehouse, schema):
    """
    Run ``SHOW TABLES`` against one lakehouse schema and return the result

    Parameters
    ----------
        workspace : string
            Fabric workspace that owns the lakehouse (back-quoted in the SQL)
        lakehouse : string
            Lakehouse containing the schema
        schema : string
            Schema whose tables should be listed
    Returns
    -------
        dataframe
            The Spark DataFrame produced by ``SHOW TABLES``
    """
    qualified_schema = f"`{workspace}`.{lakehouse}.{schema}"
    return spark.sql(f"show tables from {qualified_schema}")

def do_vacuum_and_optimize(workspace, lakehouse, schema, delta_table,
                           retention_period=None) -> bool:
    """
    A function to check whether a table has enough versions AND is old enough to optimize and vacuum

    The retention period is converted to whole days (truncated). The table qualifies
    only when BOTH of these hold:
      * its latest version number is >= that many days, and
      * its first history entry is strictly more than that many days old.

    Parameters
    ----------
        workspace : string
            The name of the fabric workspace where the lakehouse exists
        lakehouse : string
            The name of the Lakehouse where the schema/table(s) are located
        schema : string
            The name of the Schema where the table requires hygiene
        delta_table: string
            The name of the delta table that requires hygiene
        retention_period: int, in hours, default=None
            The number of hours you would like to retain (7 days is 168 hours);
            None is treated as 168 hours for this check
    Returns
    -------
        bool:
            whether to vacuum and optimize the table; False if the table history
            cannot be read or evaluated
    """
    # Explicit alias: the notebook-level `from pyspark.sql.functions import ... max, min`
    # shadows the Python builtins; using F.* keeps this function correct regardless.
    from pyspark.sql import functions as F

    retention_hours = 168 if retention_period is None else retention_period
    # Retention expressed in whole days (truncated), used for both thresholds below.
    ret_period = int(retention_hours / 24)
    try:
        table_delta_info = (
            spark.sql(f"describe history `{workspace}`.{lakehouse}.{schema}.{delta_table}")
            .groupBy()
            .agg(
                F.max('version').alias('version'),
                F.datediff(F.current_timestamp(), F.min('timestamp')).alias('table_age'),
            )
            .collect()[0]
        )
        return (table_delta_info['version'] >= ret_period
                and table_delta_info['table_age'] > ret_period)
    except Exception:
        # Deliberate best effort: any object whose history cannot be read or compared
        # is skipped rather than aborting the whole run.
        return False

def vacuum_table(workspace, lakehouse, schema, delta_table, retention_period=None):
    """
    A function to vacuum a delta table

    Parameters
    ----------
        workspace : string
            The name of the fabric workspace where the lakehouse exists
        lakehouse : string
            The name of the Lakehouse where the schema/table(s) are located
        schema : string
            The name of the Schema where the table requires hygiene
        delta_table: string
            The name of the delta table that requires hygiene
        retention_period: int, in hours, default=None
            The number of hours you would like to retain (7 days is 168 hours). If None,
            no RETAIN clause is emitted and the default retention is used
    Returns
    -------
        None on success. If the VACUUM statement raises, a dict describing the error:
            {"error-message": "error vacuuming table `<workspace>`.<lakehouse>.<schema>.<delta_table> <RETAIN clause> - <exception>"}
    """
    # Built before the try block so the except handler can always reference it.
    retention_string = f"RETAIN {retention_period} HOURS" if retention_period is not None else ""
    try:
        spark.sql(f"vacuum  `{workspace}`.{lakehouse}.{schema}.{delta_table} {retention_string}")
    except Exception as e:
        return {"error-message": f"error vacuuming table `{workspace}`.{lakehouse}.{schema}.{delta_table} {retention_string} - {e}"}

def optimize_table(workspace, lakehouse, schema, delta_table):
    """
    A function to optimize a delta table

    Parameters
    ----------
        workspace : string
            The name of the fabric workspace where the lakehouse exists
        lakehouse : string
            The name of the Lakehouse where the schema/table(s) are located
        schema : string
            The name of the Schema where the table requires hygiene
        delta_table : string
            The name of the table to be optimized
    Returns
    -------
        None on success. If the OPTIMIZE statement raises, a dict describing the error:
            {"error-message": "error optimizing table  `<workspace>`.<lakehouse>.<schema>.<delta_table> - <exception>"}
    """
    try:
        spark.sql(f"OPTIMIZE `{workspace}`.{lakehouse}.{schema}.{delta_table} ")
    except Exception as e:
        # Include the underlying exception so failures are diagnosable (as vacuum_table does).
        return {
            "error-message": f"error optimizing table  `{workspace}`.{lakehouse}.{schema}.{delta_table} - {e}"}

StatementMeta(, 0fed8eff-cfe1-4e1c-9a76-43a369f39d2d, 8, Finished, Available, Finished)

In [None]:
# Turn off Delta's retention-duration safety check for this Spark session so VACUUM
# accepts RETAIN values shorter than the default threshold.
# NOTE(review): this is session-wide and only needed for retentions below the default
# (presumably 168 hours); the run cell passes 168 hours — confirm it is still required.
spark.conf.set(key="spark.databricks.delta.retentionDurationCheck.enabled", value="false")

StatementMeta(, 0fed8eff-cfe1-4e1c-9a76-43a369f39d2d, 5, Finished, Available, Finished)

In [31]:
# Hours of table history to retain for every configured schema (168 hours = 7 days).
RETENTION_HOURS = 168

for v_and_o_task in lakehouses:
    task_errors = vacuum_and_optimize_all_tables(
        v_and_o_task.get('workspace'),
        v_and_o_task.get('lakehouse'),
        v_and_o_task.get('schema'),
        RETENTION_HOURS,
    )
    # Surface per-table failures instead of discarding them; a None or empty result
    # means there is nothing to report.
    if task_errors:
        for error in task_errors:
            print(error)


StatementMeta(, 0fed8eff-cfe1-4e1c-9a76-43a369f39d2d, 9, Finished, Available, Finished)