# Delta Table Maintenance
This notebook queries Spark for the list of Delta tables in the Lakehouse so that they can be automatically optimized and vacuumed. For more on this topic, see https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-table-maintenance

## Gather Schema information

In [None]:
from delta.tables import *
from tqdm.auto import tqdm
 
# List every schema (namespace) visible to this Spark session.
schemas_df = spark.sql("SHOW SCHEMAS")
print("Schema DataFrame columns:", schemas_df.columns)

# SHOW SCHEMAS returns a single column; reading it by position avoids
# depending on what that column happens to be called.
raw_schemas = [row[0] for row in schemas_df.collect()]
print(f"Raw schemas: {raw_schemas}")

# Reduce each fully qualified namespace to its bare schema name.
# Backticks are stripped and the value is split on dots; only namespaces with
# at least three parts are kept, and the last part is used as the schema name.
schemas = []
skipped_schemas = []
for raw_schema in raw_schemas:
    parts = raw_schema.replace('`', '').split('.')
    if len(parts) >= 3:
        schemas.append(parts[-1])
    else:
        # Remember what was filtered out so an empty result is explainable.
        skipped_schemas.append(raw_schema)

print(f"Extracted schema names: {schemas}")
if skipped_schemas:
    print(f"Skipped namespaces with fewer than 3 parts: {skipped_schemas}")
# Running totals used by the optimize and vacuum cells further down.
total_tables = 0
optimized_tables = 0
vacuumed_tables = 0

# Name of the SHOW TABLES column that holds the table name. It is set before
# the loop so it exists even when `schemas` is empty or every listing fails.
table_column_name = "tableName"
print(f"Using column '{table_column_name}' for table names")

# Count the tables in every schema; a schema that cannot be listed is reported
# and does not stop the remaining schemas from being counted.
for schema_name in schemas:
    try:
        # Backtick-quote the schema so names with special characters still parse.
        tables_df = spark.sql(f"SHOW TABLES IN `{schema_name}`")
        table_count = tables_df.count()
        print(f"Found {table_count} tables in schema {schema_name}")
        display(tables_df)
        total_tables += table_count
    except Exception as e:
        print(f"Error listing tables in schema {schema_name}: {str(e)}")

## Optimize tables across all schemas
Each table is optimized with `OPTIMIZE ... VORDER`, so V-Order is applied. For more on this topic, see https://learn.microsoft.com/en-us/fabric/data-engineering/delta-optimization-and-v-order?tabs=sparksql

In [None]:
from delta.tables import DeltaTable

# Start from zero so re-running this cell reports this run only instead of
# adding to the count left behind by a previous run.
optimized_tables = 0

for schema_name in schemas:
    # List the tables of this schema; a schema that cannot be listed is
    # reported and skipped so the remaining schemas are still processed.
    try:
        tables = spark.sql(f"SHOW TABLES IN `{schema_name}`").collect()
        print(f"Optimizing tables in schema: {schema_name}")
    except Exception as e:
        print(f"⚠ Error processing schema {schema_name}: {str(e)}")
        continue

    for table in tables:
        try:
            # Read the SHOW TABLES column directly rather than relying on a
            # variable set inside another cell's loop.
            table_name = table["tableName"]
            # Quote both identifiers so names with special characters parse.
            full_table_name = f"`{schema_name}`.`{table_name}`"

            try:
                # Preferred path: SQL OPTIMIZE with V-Order.
                spark.sql(f"OPTIMIZE {full_table_name} VORDER")
                method = "SQL"
            except Exception as sql_error:
                try:
                    # Fallback: compaction through the DeltaTable API.
                    DeltaTable.forName(spark, full_table_name).optimize().executeCompaction()
                    method = "DeltaTable"
                except Exception as api_error:
                    # Report both failures; the SQL error is usually the root cause.
                    print(
                        f"✖ Failed to optimize '{full_table_name}': "
                        f"SQL error: {sql_error}; DeltaTable error: {api_error}"
                    )
                    continue

            optimized_tables += 1
            print(f"✔ Optimized using {method}: {full_table_name} ({optimized_tables}/{total_tables})")
        except Exception as e:
            print(f"⚠ Error processing table in schema {schema_name}: {str(e)}")

print(f"✅ Optimization complete. Successfully optimized {optimized_tables} out of {total_tables} tables.")

## Vacuum tables across all schemas

In [None]:
from delta.tables import DeltaTable

# Retention window for VACUUM: 168 hours = 1 week. Kept as a number so it can
# be passed to DeltaTable.vacuum() as well as interpolated into the SQL text.
retention_hours = 168

# Start from zero so re-running this cell reports this run only instead of
# adding to the count left behind by a previous run.
vacuumed_tables = 0

for schema_name in schemas:
    # List the tables of this schema; a schema that cannot be listed is
    # reported and skipped so the remaining schemas are still processed.
    try:
        tables = spark.sql(f"SHOW TABLES IN `{schema_name}`").collect()
        print(f"Vacuuming tables in schema: {schema_name}")
    except Exception as e:
        print(f"⚠ Error processing schema {schema_name}: {str(e)}")
        continue

    for table in tables:
        try:
            # Read the SHOW TABLES column directly rather than relying on a
            # variable set inside another cell's loop.
            table_name = table["tableName"]
            # Quote both identifiers so names with special characters parse.
            full_table_name = f"`{schema_name}`.`{table_name}`"

            try:
                # Preferred path: SQL VACUUM with an explicit retention window.
                spark.sql(f"VACUUM {full_table_name} RETAIN {retention_hours} HOURS")
                method = "SQL"
            except Exception as sql_error:
                try:
                    # Fallback: vacuum through the DeltaTable API (hours as a float).
                    DeltaTable.forName(spark, full_table_name).vacuum(float(retention_hours))
                    method = "DeltaTable"
                except Exception as api_error:
                    # Report both failures; the SQL error is usually the root cause.
                    print(
                        f"✖ Failed to vacuum '{full_table_name}': "
                        f"SQL error: {sql_error}; DeltaTable error: {api_error}"
                    )
                    continue

            vacuumed_tables += 1
            print(f"✔ Vacuumed using {method}: {full_table_name} ({vacuumed_tables}/{total_tables})")
        except Exception as e:
            print(f"⚠ Vacuum - Error processing table in schema {schema_name}: {str(e)}")

print(f"✅ Vacuum complete. Successfully vacuumed {vacuumed_tables} out of {total_tables} tables.")

## Verify that the OPTIMIZE and VACUUM commands ran on a specific table
Before running the cell below, replace `nwd.Customers` with a valid `schema.table` in your Lakehouse.

In [None]:
%%sql
-- Show the table's history so the OPTIMIZE and VACUUM operations can be confirmed.
-- nwd.Customers is a placeholder: use a schema.table that exists in your Lakehouse.
DESCRIBE HISTORY nwd.Customers