# SQL Endpoint Metadata Refresher for Lakehouses
# 
This notebook allows you to refresh the SQL endpoint metadata for each lakehouse in the workspace, including listing the synchronization status of each lakehouse table.

Author: [Rui Cunha]

GitHub: [https://github.com/ruippcunha/msfabric_samples]

Disclaimer
This notebook is provided as a sample implementation intended for development and testing purposes only. It is not designed or recommended for use in production environments. Although this notebook was developed by a Microsoft employee, it is not an officially supported solution, use it at your own risk. . Microsoft and the author assume no responsibility for any issues or data loss that may result from its use in live systems. Before applying any part of this solution in production, ensure it is thoroughly reviewed, tested, and adapted to meet your organization’s security, compliance, and operational requirements.

In [None]:
!pip install semantic-link-labs --quiet

In [None]:
import json
import sempy.fabric as fabric
import sempy_labs as labs
from sempy_labs import ConnectLakehouse
from sempy.fabric import evaluate_measure
from datetime import date, datetime , time
import pandas as pd

from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, StringType, IntegerType


import pandas as pd
from IPython.display import display, HTML


In [None]:

workspace_name = notebookutils.runtime.context.get("currentWorkspaceName")

print (f"📄Starting the synchronization of the SQL endpoint for the lakehouses in  workspace {workspace_name}")

# Get today's date
current_date = date.today()

# Initialize list to track lakehouses that failed validation
not_refreshed_endpoints = []

# List all lakehouses in the current workspace
lakehouses = notebookutils.lakehouse.list()


for lakehouse in lakehouses:
    lakehouse_name = lakehouse.displayName
    lakehouse_id = lakehouse.id

    # Only process Lakehouses explicitly listed in LAKEHOUSES parameter
    
    print(f"ℹ️ Processing Lakehouse '{lakehouse_name}' (ID: {lakehouse_id})")

    try:
        with ConnectLakehouse(lakehouse=lakehouse_name, workspace=workspace_name) as sql:
            print(f"ℹ️ Verifying that the Lakehouse SQL endpoint is active by executing a test query")
            df = sql.query("SELECT GETDATE() AS today;")
            #display(df)
                
            print(f"ℹ️ Starting refreshing the {lakehouse_name} SQL endpoint metadata")   

            # Refresh SQL endpoint metadata
            client = fabric.FabricRestClient()
            workspace_id = fabric.get_notebook_workspace_id()
            sql_endpoint_id = client.get(f"/v1/workspaces/{workspace_id}/lakehouses/{lakehouse_id}").json()['properties']['sqlEndpointProperties']['id']
            uri = f"v1/workspaces/{workspace_id}/sqlEndpoints/{sql_endpoint_id}/refreshMetadata?preview=true"
            response = client.post(uri, json={}, lro_wait=True)
            
            print(f"ℹ️ {lakehouse_name}  SQL endpoint metadata refresh completed")   
            
            # Convert response to JSON and pretty-print
            response_json = response.json()
            
            # Flatten the response
            flattened = []
            for entry in response_json:
                flattened.append({
                    "Table Name": entry.get("tableName"),
                    "Status": entry.get("status"),
                    "Start Time": entry.get("startDateTime"),
                    "End Time": entry.get("endDateTime"),
                    "Last Successful Sync": entry.get("lastSuccessfulSyncDateTime"),
                    "Error Code": entry.get("error", {}).get("errorCode"),
                    "Error Message": entry.get("error", {}).get("message")
                })

            # Create Spark session
            spark = SparkSession.builder.getOrCreate()

            # Define schema
            schema = StructType([
                StructField("Table Name", StringType(), True),
                StructField("Status", StringType(), True),
                StructField("Start Time", StringType(), True),
                StructField("End Time", StringType(), True),
                StructField("Last Successful Sync", StringType(), True),
                StructField("Error Code", StringType(), True),
                StructField("Error Message", StringType(), True)
            ])

            # Create DataFrame and show
            df = spark.createDataFrame(flattened, schema)
            #df.show(truncate=False)
            
            # Convert Spark DataFrame to Pandas
            pandas_df = df.toPandas()

            # Apply styling: red text for rows with an error code
            def highlight_errors(row):
                color = 'color: red;' if pd.notnull(row['Error Code']) else ''
                return [color] * len(row)

            styled_df = pandas_df.style.apply(highlight_errors, axis=1)

            # Display styled DataFrame
            display(HTML(styled_df.to_html()))


    except Exception as e:
            not_refreshed_endpoints.append(lakehouse_name)
            print(f"\❌ Failed to refresh or validate Lakehouse '{lakehouse_name}': {e}")

# Summary
if not_refreshed_endpoints:
    print("\❌ The following lakehouses failed validation:")
    for lh in not_refreshed_endpoints:
        print(f"  - {lh}")
else:
    print("\✅ All lakehouses validated successfully.")
