In [None]:
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

### Functions

In [None]:
def get_table_list():
    """Return the names of every table in the notebook's default lakehouse.

    Reads the default lakehouse name from the notebook runtime context, asks
    ``notebookutils.lakehouse.listTables`` for that lakehouse's tables, and
    returns the ``name`` attribute of each entry as a list.
    """
    # The runtime context exposes the attached default lakehouse by name.
    default_lakehouse = notebookutils.runtime.context.get("defaultLakehouseName")

    # Collect just the table names from the listing.
    names = []
    for entry in notebookutils.lakehouse.listTables(default_lakehouse):
        names.append(entry.name)
    return names

In [None]:
def refresh_table(table_name):
    """Run the maintenance statements for a single table and report the outcome.

    Executes, in order, ``OPTIMIZE``, ``VACUUM ... RETAIN 168 HOURS``,
    ``REFRESH TABLE`` and ``ANALYZE TABLE ... COMPUTE STATISTICS`` against the
    table through ``spark.sql``. Progress is printed with a timestamp.

    Args:
        table_name: Name of the table to maintain. It is emitted as a single
            backtick-quoted identifier.

    Returns:
        ``"Success: <table_name>"`` when every statement ran, otherwise
        ``"Error: Failed to refresh <table_name>: <exception text>"``.
        Exceptions are caught and reported rather than raised, so one failing
        table does not abort the other refreshes.
    """
    try:
        print(f"[{datetime.now()}] Starting refresh for table: {table_name}")

        # A backtick inside a backtick-quoted identifier must be doubled;
        # without this a name containing "`" would end the quoting early and
        # produce malformed (or unintended) SQL.
        quoted_name = "`" + table_name.replace("`", "``") + "`"

        # Order matters: compact files first, then remove unreferenced files,
        # then refresh cached metadata and recompute statistics.
        statements = (
            f"OPTIMIZE {quoted_name}",
            f"VACUUM {quoted_name} RETAIN 168 HOURS",
            f"REFRESH TABLE {quoted_name}",
            f"ANALYZE TABLE {quoted_name} COMPUTE STATISTICS",
        )
        for statement in statements:
            spark.sql(statement)

        print(f"[{datetime.now()}] Completed refresh for table: {table_name}")
        return f"Success: {table_name}"

    except Exception as e:
        error_msg = f"Failed to refresh {table_name}: {e}"
        print(f"[{datetime.now()}] {error_msg}")
        return f"Error: {error_msg}"

### Set Variables To Be Used

1. max_workers - Adjust this based on your Fabric capacity or Notebook Environment limits
2. timeout_limit - Maximum time (in seconds) a worker is allowed to run


In [None]:
# Upper bound on the number of threads in the refresh pool.
max_workers = 5

# Seconds used as the timeout when collecting each table's result.
timeout_limit = 300

# Tables to refresh; a set, so each name is processed once.
table_list = {"fake_erp_dbo_customer", "fake_erp_dbo_purchase_order"}

# Alternative approach: get ALL the tables in the lakehouse
# table_list = get_table_list()


### Process Tables Concurrently

For more information on `concurrent.futures`, see https://docs.python.org/3/library/concurrent.futures.html

In [None]:
job_start_time = datetime.now()
print(f"[{job_start_time}] Job Starting")

# When each table's refresh actually began. Recorded by the worker thread
# itself so that time spent waiting in the executor queue does not count
# against timeout_limit.
refresh_started_at = {}


def _tracked_refresh(table):
    """Record when the refresh of `table` begins, then run refresh_table on it."""
    refresh_started_at[table] = datetime.now()
    return refresh_table(table)


# Final outcome string per table, used for the summary printed after the run.
job_results = {}

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_to_table = {
        executor.submit(_tracked_refresh, table): table for table in table_list
    }

    # Futures yielded by as_completed() are already finished, so passing a
    # timeout to future.result() on them can never fire. Poll instead and
    # compare each running refresh's elapsed time against timeout_limit.
    pending = set(future_to_table)
    while pending:
        # Wake up at least once per second so overdue refreshes are noticed.
        done, pending = concurrent.futures.wait(
            pending, timeout=1, return_when=concurrent.futures.FIRST_COMPLETED
        )

        for future in done:
            table_name = future_to_table[future]
            try:
                job_results[table_name] = future.result()
            except Exception as exc:
                error_msg = f"Exception for {table_name}: {exc}"
                print(f"[{datetime.now()}] {error_msg}")
                job_results[table_name] = f"Error: {error_msg}"

        now = datetime.now()
        for future in list(pending):
            table_name = future_to_table[future]
            started = refresh_started_at.get(table_name)
            # started is None while the task is still queued; queued time is
            # not charged to the table.
            if started is not None and (now - started).total_seconds() > timeout_limit:
                error_msg = f"Timeout: {table_name} took longer than {timeout_limit} seconds"
                print(f"[{datetime.now()}] {error_msg}")
                job_results[table_name] = f"Error: {error_msg}"
                # Stop tracking it. A running thread cannot be interrupted, so
                # the refresh itself keeps going in the background.
                pending.discard(future)

    # NOTE: leaving the `with` block waits for every worker thread, including
    # refreshes already reported as timed out, so the job duration below
    # includes them.

job_end_time = datetime.now()
job_duration = (job_end_time - job_start_time).total_seconds()

failed_tables = sorted(
    name for name, outcome in job_results.items()
    if not str(outcome).startswith("Success")
)
print(
    f"[{datetime.now()}] {len(job_results) - len(failed_tables)} succeeded, "
    f"{len(failed_tables)} failed"
    + (f": {', '.join(failed_tables)}" if failed_tables else "")
)

print(f"[{datetime.now()}] Job Completed in {job_duration:.1f} seconds")