In [1]:
# Run-level settings shared by the cells below
from datetime import datetime, timezone

# Targets. The workspace name is not used currently: the run is scoped to the current workspace.
workspace_name = "Semantic Link for Power BI Folks"
lakehouse_name = "ModelDocumenter"

# One UTC timestamp (ISO 8601 string) captured once, so all rows written by this run share it
run_timestamp = datetime.now(timezone.utc).isoformat()

StatementMeta(, 0d7c956f-4123-4ec4-8c36-0a3b6cfe9c92, 3, Finished, Available, Finished)

In [2]:
# To avoid duplication and pollution in the lakehouse, empty all tables before reloading

# List everything registered in the current (default) database
tables_df = spark.sql("SHOW TABLES")

# Collect the rows; each row carries the table name and an isTemporary flag
tables = tables_df.collect()

# Iterate through each table and remove all of its rows
for table in tables:
    # SHOW TABLES also lists temporary views; those cannot be emptied with DELETE, so skip them
    if table.isTemporary:
        continue

    table_name = table.tableName
    print(f"Truncating table: {table_name}")

    # DELETE removes every row; backticks keep table names with unusual characters valid
    spark.sql(f"DELETE FROM `{table_name}`")

StatementMeta(, 0d7c956f-4123-4ec4-8c36-0a3b6cfe9c92, 4, Finished, Available, Finished)

In [3]:
# Define functions
import pyspark.sql.functions as F
import sempy.fabric as fabric
import time

def remove_special_chars_from_col_names(dataframe, tokens = r" .,;{}()\="):
    """Return ``dataframe`` with every character in ``tokens`` removed from its column names.

    Args:
        dataframe: Object exposing ``columns`` and ``toDF(*names)`` (a Spark DataFrame in
            this notebook).
        tokens: Iterable of single characters to delete from each column name. The default
            is a raw string, so it removes space . , ; { } ( ) backslash and =.

    Returns:
        Whatever ``dataframe.toDF`` returns for the cleaned names, in the original column order.
    """
    # Build the deletion table once: it does not depend on the column being processed, and
    # doing it up front also keeps a one-shot iterable of tokens from being used up on the
    # first column only.
    deletion_table = dict.fromkeys(map(ord, tokens))
    cleaned_names = [name.translate(deletion_table) for name in dataframe.columns]
    return dataframe.toDF(*cleaned_names)

def save_table(dataframe, dataset_name, entity, dataset_id=None):
    """Append one frame of semantic-model metadata to the lakehouse table ``MD_<entity>``.

    Args:
        dataframe: DataFrame exposing ``.empty`` that ``spark.createDataFrame`` can convert
            (the loop in this notebook passes the results of ``fabric.list_*`` calls).
            Nothing is written when it is empty.
        dataset_name: Name of the semantic model; stored in the ``DatasetName`` column.
        entity: Kind of metadata (for example "tables"); used as the table-name suffix.
        dataset_id: Id of the semantic model; stored in the ``DatasetId`` column. When
            omitted, the notebook-level ``dataset_id`` variable is used, which is how
            calls that pass only three arguments supply it.

    Raises:
        NameError: If ``dataset_id`` is not passed and no notebook-level ``dataset_id`` exists.
    """
    if dataframe.empty:
        print(f"no results found for {entity}")
        return

    if dataset_id is None:
        # Backward compatibility: fall back to the variable bound by the calling cell.
        try:
            dataset_id = globals()["dataset_id"]
        except KeyError:
            raise NameError("name 'dataset_id' is not defined") from None

    target_table = f"{lakehouse_name}.MD_{entity}"

    df = spark.createDataFrame(dataframe)
    # Column names coming from the source frame may contain characters such as spaces
    df = remove_special_chars_from_col_names(df)
    df = (
        df.withColumn("DatasetName", F.lit(dataset_name))
        .withColumn("DatasetId", F.lit(dataset_id))
        .withColumn("_load_datetime", F.lit(run_timestamp))
    )

    print(f"Writing entity '{entity}' to table {target_table}")
    df.write.mode("append").saveAsTable(target_table)

StatementMeta(, 0d7c956f-4123-4ec4-8c36-0a3b6cfe9c92, 5, Finished, Available, Finished)

In [4]:
# List all semantic models in the current workspace
semanticmodels = fabric.list_datasets()

# Later cells read 'Dataset Name' and 'Dataset ID' from this listing, so fail early if either
# column is missing. To restrict the run to certain models, filter `semanticmodels` here,
# for example on 'Dataset Name' with .str.endswith('_SM').
required_columns = ['Dataset Name', 'Dataset ID']
missing_columns = [name for name in required_columns if name not in semanticmodels.columns]
if missing_columns:
    raise KeyError(f"fabric.list_datasets() result is missing columns: {missing_columns}")

# Create the Spark DataFrame that backs the datasets table
df_datasets = spark.createDataFrame(semanticmodels)

StatementMeta(, 0d7c956f-4123-4ec4-8c36-0a3b6cfe9c92, 6, Finished, Available, Finished)

In [5]:
# Save datasets table to lakehouse
# The table name is qualified with the lakehouse, like the tables written by save_table and
# like the clean-up query that reads {lakehouse_name}.md_datasets, so the write does not
# depend on which lakehouse happens to be the notebook default.
df_datasets = remove_special_chars_from_col_names(df_datasets)
df_datasets.write.mode("append").saveAsTable(f"{lakehouse_name}.MD_datasets")

StatementMeta(, 0d7c956f-4123-4ec4-8c36-0a3b6cfe9c92, 7, Finished, Available, Finished)

In [6]:
# Clean-up dataset list: keep only the most recent row per dataset

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# One fully qualified name for the read, the overwrite and the validation, so all three
# always address the same table
datasets_table = f"{lakehouse_name}.MD_datasets"

# Get current list of datasets
cleandatsets = spark.sql(f"SELECT * FROM {datasets_table}")
cleandatasetsrowcount = cleandatsets.count()

# Partition by DatasetID and order by LastUpdate descending, so row number 1 is the newest record
window_spec = Window.partitionBy("DatasetID").orderBy(col("LastUpdate").desc())

# Add a row number column to identify the most recent record
df_with_row_number = cleandatsets.withColumn("row_number", row_number().over(window_spec))

# Filter to keep only the most recent record for each DatasetID
df_no_duplicates = df_with_row_number.filter(col("row_number") == 1).drop("row_number")
noduplicatesrowcount = df_no_duplicates.count()

# Show row count difference
print(f"Original row count: {cleandatasetsrowcount}, cleaned set: {noduplicatesrowcount}")

# Save the de-duplicated list back to the table it was read from
df_no_duplicates.write.mode("overwrite").saveAsTable(datasets_table)

# Validate the result: spark.sql only builds a lazy DataFrame, so the count has to be
# fetched explicitly before it can be compared
updatedrowcount = spark.sql(f"SELECT COUNT(*) AS VALIDATEDROWCOUNTAFTERUPDATE FROM {datasets_table}")
validatedrowcount = updatedrowcount.first()["VALIDATEDROWCOUNTAFTERUPDATE"]
if validatedrowcount != noduplicatesrowcount:
    raise RuntimeError(
        f"Clean-up of {datasets_table} left {validatedrowcount} rows, expected {noduplicatesrowcount}"
    )
print("Table cleaned successfully")

StatementMeta(, 0d7c956f-4123-4ec4-8c36-0a3b6cfe9c92, 8, Finished, Available, Finished)

Original row count: 6, cleaned set: 6
Table cleaned successfully


In [7]:
# Document every semantic model in the listing.
# Each entry pairs the entity label (used as the target table suffix) with the call that
# fetches that kind of metadata for one model.
metadata_sources = [
    ("tables", lambda model: fabric.list_tables(model)),
    ("columns", lambda model: fabric.list_tables(model, include_columns=True)),
    ("relationships", lambda model: fabric.list_relationships(model)),
    ("measures", lambda model: fabric.list_measures(model)),
]

# `dataset_id` is deliberately bound at notebook level here: the save_table calls below do
# not pass it, and save_table picks it up from the notebook's global scope.
for dataset_name, dataset_id in zip(semanticmodels['Dataset Name'], semanticmodels['Dataset ID']):

    # Print name and dataset id to screen at start
    print(f"Starting Dataset Name: {dataset_name}, Dataset ID: {dataset_id}...")

    # Get semantic model metadata and save it to the lakehouse, one entity at a time
    for entity, fetch_metadata in metadata_sources:
        save_table(fetch_metadata(dataset_name), dataset_name, entity)

    # Print name and dataset id to screen after completion
    print(f"...Finished Dataset Name: {dataset_name}, Dataset ID: {dataset_id}")

StatementMeta(, 0d7c956f-4123-4ec4-8c36-0a3b6cfe9c92, 9, Finished, Available, Finished)

Starting Dataset Name: SempyDemo, Dataset ID: 20512a78-371d-4a9f-a6cc-c806ee080040...
no results found for tables
no results found for columns
no results found for relationships
no results found for measures
...Finished Dataset Name: SempyDemo, Dataset ID: 20512a78-371d-4a9f-a6cc-c806ee080040
Starting Dataset Name: Example Semantic Model, Dataset ID: 8ef42108-5df7-4222-85ee-76d8b8c40bc5...
Writing entity 'tables' to table ModelDocumenter.MD_tables
Writing entity 'columns' to table ModelDocumenter.MD_columns
Writing entity 'relationships' to table ModelDocumenter.MD_relationships
Writing entity 'measures' to table ModelDocumenter.MD_measures
...Finished Dataset Name: Example Semantic Model, Dataset ID: 8ef42108-5df7-4222-85ee-76d8b8c40bc5
Starting Dataset Name: ModelDocumentation, Dataset ID: e26c306a-7c56-4ddc-a1e5-28ca7b2bfa47...
Writing entity 'tables' to table ModelDocumenter.MD_tables
Writing entity 'columns' to table ModelDocumenter.MD_columns
Writing entity 'relationships' to tab