In [1]:
# spark.conf.get("spark.microsoft.delta.optimizeWrite.binSize")

StatementMeta(, f49c7910-d820-4715-a4c8-32b743309882, 3, Finished, Available, Finished)

'1073741824'

In [1]:
# Session-level Spark settings for this notebook, applied in the order listed.
_session_conf = {
    'spark.sql.parquet.vorder.enabled': 'true',
    # Parquet rebase modes, relevant when handling dates before 1582-10-15 or
    # timestamps before 1900-01-01T00:00:00Z.
    "spark.sql.parquet.int96RebaseModeInRead": "CORRECTED",
    "spark.sql.parquet.datetimeRebaseModeInRead": "CORRECTED",
    "spark.sql.parquet.int96RebaseModeInWrite": "CORRECTED",
    "spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED",
    # The new parser fails to parse '0000-00-00 00:00:00'; pick the parser policy explicitly.
    "spark.sql.legacy.timeParserPolicy": "CORRECTED",
    # 268435456 bytes = 256 MiB.
    "spark.microsoft.delta.optimizeWrite.binSize": "268435456",
    "spark.databricks.delta.retentionDurationCheck.enabled": "false",
    "spark.databricks.delta.optimize.zorder.checkStatsCollection.enabled": "false",
}

for _conf_key, _conf_value in _session_conf.items():
    spark.conf.set(_conf_key, _conf_value)

StatementMeta(, e6943dc9-9dab-4e01-b00f-c47d650f4b33, 3, Finished, Available)

In [2]:
from delta.tables import *
from pyspark.sql.functions import col, current_timestamp, date_sub
import time

StatementMeta(, e6943dc9-9dab-4e01-b00f-c47d650f4b33, 4, Finished, Available)

In [1]:
import requests
import pandas as pd
from requests.exceptions import HTTPError

# Workspace ID read from the Spark configuration key "trident.workspace.id";
# get_lakehouse_list_api() interpolates it into the lakehouse-listing URL.
WORKSPACE_ID = spark.conf.get("trident.workspace.id")

def get_lakehouse_list_api():

    '''
    Sandeep Pawar  |   fabric.guru

    List the lakehouses of the workspace identified by WORKSPACE_ID using the Fabric REST API.

    The request URL is scoped to that single workspace, so lakehouses in other
    workspaces are not returned.

    Returns:
        list[dict]: the lakehouse descriptors gathered from the ``value`` array of
        every response page. An empty list is returned when any request fails; the
        error is printed instead of being raised, so callers can always iterate
        over the result.
    '''
    next_url = f"https://api.fabric.microsoft.com/v1/workspaces/{WORKSPACE_ID}/lakehouses"
    token = mssparkutils.credentials.getToken("https://api.fabric.microsoft.com/")
    headers = {"Authorization": f"Bearer {token}"}

    found = []
    try:
        # Follow the pagination link until the service stops returning one.
        while next_url:
            # A timeout keeps a stalled connection from blocking the notebook forever.
            response = requests.get(next_url, headers=headers, timeout=30)
            response.raise_for_status()

            data = response.json()
            found.extend(data.get('value') or [])
            next_url = data.get('continuationUri')

    except HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        return []
    except Exception as err:
        print(f"An error occurred: {err}")
        return []

    return found

# Fetch the lakehouse list once; later cells filter it and iterate over it.
lakehouses = get_lakehouse_list_api()

StatementMeta(, 6f0519b0-48f4-45e8-bc53-9ab2036a4418, 3, Finished, Available)

In [2]:
# Display name of the lakehouse to keep for the maintenance run.
TARGET_LAKEHOUSE_NAME = 'lh_bronze_d365fo'

# `lakehouses or []` guards against a failed listing call (None), which would
# otherwise raise "TypeError: 'NoneType' object is not iterable" here.
lakehouses = [lh for lh in (lakehouses or []) if lh.get('displayName') == TARGET_LAKEHOUSE_NAME]
lakehouses

StatementMeta(, 6f0519b0-48f4-45e8-bc53-9ab2036a4418, 4, Finished, Available)

[{'id': '1b52cbf8-1abc-4013-8a8d-c78be98979be',
  'type': 'Lakehouse',
  'displayName': 'lh_bronze_d365fo',
  'description': '',
  'workspaceId': '2f9a24c8-70f6-4b0f-85df-a8daaac8fa5f',
  'properties': {'oneLakeTablesPath': 'https://onelake.dfs.fabric.microsoft.com/2f9a24c8-70f6-4b0f-85df-a8daaac8fa5f/1b52cbf8-1abc-4013-8a8d-c78be98979be/Tables',
   'oneLakeFilesPath': 'https://onelake.dfs.fabric.microsoft.com/2f9a24c8-70f6-4b0f-85df-a8daaac8fa5f/1b52cbf8-1abc-4013-8a8d-c78be98979be/Files',
   'sqlEndpointProperties': {'connectionString': 'dmtaiefx343u7aypoq4mxcq3ne-zasjul7woahuxbo7vdnkvsh2l4.datawarehouse.fabric.microsoft.com',
    'id': '3cd626d6-4ced-4c7a-8d20-3e0b6f9256da',
    'provisioningStatus': 'Success'}}}]

In [6]:
max_retries = 1      # retry rounds for tables whose maintenance raised
retry_delay = 5      # seconds to wait before each retry round
vacuum_hours = 168   # 168 hours = 7 days


def _maintain_table(table_path, optimized_paths, retention_hours):
    """Z-order (when recently merged) and vacuum the Delta table at ``table_path``.

    Args:
        table_path: path of the Delta table.
        optimized_paths: set of table paths already Z-ordered in this run. A path is
            added only after its optimize call returns, and a path already in the set
            is not optimized again, so a retry caused by a failing vacuum neither
            repeats the optimize nor counts the table twice.
        retention_hours: value passed to ``DeltaTable.vacuum``.

    Raises:
        Whatever the Delta/Spark calls raise; the caller decides whether to retry.
    """
    deltaTable = DeltaTable.forPath(spark, table_path)
    if table_path not in optimized_paths:
        # Count MERGE commits among the last 20 history entries whose timestamp is
        # on or after date_sub(current_timestamp(), 1).
        recent_merges = deltaTable.history(20).filter(
            (col("timestamp") >= date_sub(current_timestamp(), 1))
            & (col("timestamp") <= current_timestamp())
            & (col('operation') == 'MERGE')
        ).count()
        # NOTE(review): strictly more than one MERGE is required; confirm that a
        # single recent MERGE is really meant to skip optimization.
        if recent_merges > 1:
            deltaTable.optimize().executeZOrderBy('__updated_at')
            optimized_paths.add(table_path)
    deltaTable.vacuum(retention_hours)


for lakehouse in lakehouses:
    tables_path = f"/{lakehouse.get('id')}/Tables"
    tables = mssparkutils.fs.ls(tables_path)
    print("-"*50)
    print(f"Lakehouse: {lakehouse.get('displayName')}: {len(tables)}")

    optimized_paths = set()
    retry_list = []   # tables whose latest attempt raised
    error_list = []   # (table name, exception) for every failed attempt

    # First pass over every Delta table in the lakehouse.
    for table in tables:
        if not DeltaTable.isDeltaTable(spark, table.path):
            continue
        try:
            _maintain_table(table.path, optimized_paths, vacuum_hours)
        except Exception as error:
            retry_list.append(table)
            error_list.append((table.name, error))

    # Retry rounds: wait first, then re-run maintenance for the tables still failing.
    for _ in range(max_retries):
        if not retry_list:
            break
        time.sleep(retry_delay)
        still_failing = []
        for table in retry_list:
            try:
                _maintain_table(table.path, optimized_paths, vacuum_hours)
            except Exception as error:
                still_failing.append(table)
                error_list.append((table.name, error))
        retry_list = still_failing

    # Report the tables that never succeeded, with the errors recorded for them.
    for table in retry_list:
        table_errors = [err for name, err in error_list if name == table.name]
        print(f"Failed: {table.name}: {table_errors}")

    count_table_maintenance = len(optimized_paths)
    print(f"Maintenance: {count_table_maintenance}")

StatementMeta(, e6943dc9-9dab-4e01-b00f-c47d650f4b33, 8, Finished, Available)

--------------------------------------------------
Lakehouse: lh_bronze_d365fo: 484
Maintenance: 0
--------------------------------------------------
Lakehouse: lh_validation: 17
Maintenance: 1
--------------------------------------------------
Lakehouse: lh_temp: 2
Maintenance: 0
