## Document Current Workspace
Written by Prathy Kamasani

https://prathy.com/

https://www.linkedin.com/in/prathy/

## Clean Up

If the tables already exist, clean up by dropping them. Useful in development mode.

In [None]:
# Drop every table registered in the documentation Lakehouse so the notebook
# can be re-run from a clean state (intended for development only).
LH_Name = "LH_Fabric_Documentation"

# Get all tables in the database
tables = spark.sql(f"SHOW TABLES IN `{LH_Name}`")
# display(tables)

# Loop through the tables and drop each one
for table in tables.collect():
    # SHOW TABLES also lists session-scoped temporary views; they do not
    # belong to the Lakehouse and cannot be dropped as `<LH>.<name>`.
    if table['isTemporary']:
        continue
    table_name = table['tableName']
    # Back-ticks keep names with special characters valid; IF EXISTS makes a
    # re-run (or a table removed in the meantime) harmless.
    spark.sql(f"DROP TABLE IF EXISTS `{LH_Name}`.`{table_name}`")

print("All Delta tables have been deleted.")

## Import all necessary items

In [None]:
! pip install semantic-link
! pip install semantic-link-labs
! pip install delta-spark

In [None]:
import sempy.fabric as fabric

import sempy_labs as sempy_labs
from sempy_labs import migration, report, directlake
from sempy_labs import lakehouse as lake
from sempy_labs.tom import connect_semantic_model

from pyspark.sql.functions import current_timestamp
from pyspark.sql.functions import lit

from delta.tables import DeltaTable

import pandas as pd

## Define items needed for the documentation

In [None]:
# Name and description of the Lakehouse in which all documentation tables
# produced by this notebook are saved.
LH_Name = "LH_Fabric_Documentation"
LH_desc = "Lakehouse for Fabric Documentation"

In [None]:
# Resolve the ID and the display name of the workspace this notebook runs in
current_workspace_id = fabric.get_workspace_id()
current_workspace_name = fabric.resolve_workspace_name(current_workspace_id)
print(f'Current workspace ID: {current_workspace_id}')
print(f'Current workspace name: {current_workspace_name}')

# Make sure the documentation Lakehouse exists: look for its name among the
# listed lakehouses and create it only when it is missing
lakehouse_list = sempy_labs.list_lakehouses()
lakehouse_exists = LH_Name in lakehouse_list['Lakehouse Name'].values

if not lakehouse_exists:
    mssparkutils.lakehouse.create(name=LH_Name, description=LH_desc, workspaceId=current_workspace_id)
    print("Lakehouse created successfully")
else:
    print("Lakehouse already exists")

# Look up the Lakehouse and mount its ABFS path for direct file system access
lakehouse = mssparkutils.lakehouse.get(LH_Name)
mount_point = f"/{LH_Name}"
mssparkutils.fs.mount(lakehouse.get("properties").get("abfsPath"), mount_point)

# Keep both the local mount path and the ABFS path; the ABFS path is the
# prefix used for every table written by the following cells
local_path = mssparkutils.fs.getMountPath(mount_point)
lh_abfs_path = lakehouse.get("properties").get("abfsPath")
print(f'Local path: {local_path}')
print(f'ABFS path: {lh_abfs_path}')


#### Give a list of workspaces or select the current workspace

In this notebook, I am working with only the current workspace, but the idea is to be able to work with multiple workspaces.

In [None]:
# Workspaces to document. Leave the list empty to fall back to the workspace
# this notebook runs in (identified by its workspace ID).
list_of_workspaces = []
if len(list_of_workspaces) == 0:
    list_of_workspaces += [fabric.get_workspace_id()]

print("Number of workspaces: ", len(list_of_workspaces))
print(list_of_workspaces)


## Fabric Items

To begin with, I am getting data for all Fabric items of the current workspace. If you don't give a workspace name, it will bring back all tenant data based on your access. <mark>This could be slow if you are working on a big tenant. BE CAREFUL!</mark>


In [None]:
# Collect the items of every workspace in `list_of_workspaces` and append them
# to the raw (R_) Delta tables of the documentation Lakehouse.
# NOTE: after the loop `Fabric_all_items` holds the items of the LAST workspace
# only; later cells that filter it therefore see a single workspace.
for Current_workspace_name in list_of_workspaces:
    # Retrieve all items from the workspace being processed
    Fabric_all_items = fabric.list_items(workspace=Current_workspace_name)

    # Transform column names for LH compatibility: a single pass removes every
    # non-alphanumeric character, spaces included
    Fabric_all_items.columns = Fabric_all_items.columns.str.replace('[^a-zA-Z0-9]', '', regex=True)

    # Spark cannot reliably infer a schema from an empty pandas DataFrame, so
    # a workspace without items is reported and skipped instead of crashing
    if Fabric_all_items.empty:
        print("No items found in workspace:", Current_workspace_name)
        continue

    # Create a Spark DataFrame from the fabric items and save it to the Lakehouse
    sparkdf = spark.createDataFrame(Fabric_all_items)
    sparkdf = sparkdf.withColumn("LoadDate", current_timestamp())
    Table_Name = "R_fabric_all_items"
    sparkdf.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{Table_Name}")
    print(Table_Name, "created at :", f"{lh_abfs_path}/Tables/{Table_Name}")

    # Distinct item types, kept as a one-column pandas DataFrame so the column
    # is named 'Type' explicitly instead of relying on the implicit 'value'
    # column Spark assigns to a NumPy array
    Fabric_all_items_type = Fabric_all_items[['Type']].drop_duplicates()

    # Create a Spark DataFrame for item types and save it to the Lakehouse
    spark_df_Fabric_all_items_type = spark.createDataFrame(Fabric_all_items_type)
    spark_df_Fabric_all_items_type = spark_df_Fabric_all_items_type.withColumn("LoadDate", current_timestamp())
    Table_Name = "R_fabric_item_types"
    spark_df_Fabric_all_items_type.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{Table_Name}")
    print(Table_Name, "created at :", f"{lh_abfs_path}/Tables/{Table_Name}")


### Create Presentation layer for Fabric Items  

In [None]:
# Build the presentation tables for Fabric items: de-duplicated copies of the
# raw (R_) tables without the LoadDate audit column.
fabric_all_items_path = f"{lh_abfs_path}/Tables/R_fabric_all_items"
fabric_item_types_path = f"{lh_abfs_path}/Tables/R_fabric_item_types"
distinct_fabric_all_items_path = f"{lh_abfs_path}/Tables/Fabric_all_items"
distinct_fabric_item_types_path = f"{lh_abfs_path}/Tables/Fabric_item_types"

# The raw tables are written as Delta, so they are read through the Delta
# reader. Reading the folder with spark.read.parquet bypasses the transaction
# log and can pick up data files that are no longer part of the table.

# Get distinct fabric all items
df_fabric_all_items = spark.read.format("delta").load(fabric_all_items_path)
df_fabric_all_items = df_fabric_all_items.drop("LoadDate").dropDuplicates()
df_fabric_all_items.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(distinct_fabric_all_items_path)

# Get distinct fabric item types
df_fabric_item_types = spark.read.format("delta").load(fabric_item_types_path)
df_fabric_item_types = df_fabric_item_types.drop("LoadDate").dropDuplicates()
df_fabric_item_types.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(distinct_fabric_item_types_path)


## Lakehouses

In [None]:
# Filter for Lakehouse items and show them (a bare expression in the middle of
# a cell produces no output, so display() is called explicitly)
fabric_all_lakehouses = Fabric_all_items[Fabric_all_items['Type'] == 'Lakehouse']
display(fabric_all_lakehouses)

# e.g. Get Lakehouse tables. "ReleasePlan" is only an example name, so the
# lookup is skipped when the workspace has no Lakehouse with that name.
lakehouse_name = "ReleasePlan"
if lakehouse_name in fabric_all_lakehouses['DisplayName'].values:
    lakehouse_tables = sempy_labs.lakehouse.get_lakehouse_tables(lakehouse_name, current_workspace_name)
    display(lakehouse_tables)
else:
    print(f"Lakehouse '{lakehouse_name}' not found in workspace '{current_workspace_name}'; skipping the example.")


In [None]:
import pandas as pd

# Collect the table list of every Lakehouse found in the workspace and append
# it to the raw R_Lakehouse_Tables Delta table.
lakehouse_table_frames = []

# Loop through each lakehouse and get its tables
for lakehouse_name in fabric_all_lakehouses['DisplayName']:
    try:
        lakehouse_tables = sempy_labs.lakehouse.get_lakehouse_tables(lakehouse_name, current_workspace_name)
    except Exception as e:
        # One unreadable Lakehouse should not stop the documentation of the rest
        print(f"Error fetching tables for lakehouse {lakehouse_name}: {e}")
        continue
    lakehouse_table_frames.append(pd.DataFrame(lakehouse_tables))

# Concatenate once after the loop instead of re-copying the accumulated
# DataFrame on every iteration
if lakehouse_table_frames:
    all_tables_df = pd.concat(lakehouse_table_frames, ignore_index=True)
else:
    all_tables_df = pd.DataFrame()

if all_tables_df.empty:
    # Nothing to rename or save; Spark cannot build a DataFrame from an empty frame
    print("No Lakehouse tables found; nothing to save.")
else:
    # Transform column names for LH compatibility: a single pass removes every
    # non-alphanumeric character, spaces included
    all_tables_df.columns = all_tables_df.columns.str.replace('[^a-zA-Z0-9]', '', regex=True)
    # Display the combined DataFrame
    display(all_tables_df)

    sdf_df_LHTables = spark.createDataFrame(all_tables_df)
    # Stamp the load time like the other raw tables do; the presentation layer
    # drops this column before de-duplicating
    sdf_df_LHTables = sdf_df_LHTables.withColumn("LoadDate", current_timestamp())

    table_name = "R_Lakehouse_Tables"
    sdf_df_LHTables.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{table_name}")




Select only a subset of lakehouses to keep the example small.

In [None]:
# Count the tables of each Lakehouse: one row per (workspace, Lakehouse) pair
grouped_df = (
    all_tables_df
    .groupby(['WorkspaceName', 'LakehouseName'])
    .size()
    .reset_index(name='Number_of_Tables')
)

# Keep only the Lakehouses with at most 5 tables so the following cells work
# on a small set of data
has_few_tables = grouped_df['Number_of_Tables'] <= 5
filtered_LH_df = grouped_df[has_few_tables]

# Show the selection as the cell output
filtered_LH_df

In [None]:
from delta.tables import DeltaTable
import pandas as pd
import json

# For every table of the selected Lakehouses, read the Delta table detail and
# append it to the raw R_Lakehouse_Tables_Details table.
# Target table name and path do not change inside the loops, so set them once.
table_name = "R_Lakehouse_Tables_Details"
details_path = f"{lh_abfs_path}/Tables/{table_name}"

for _, lakehouse_row in filtered_LH_df.iterrows():
    lakehouse_name = lakehouse_row['LakehouseName']
    Workspace_name = lakehouse_row['WorkspaceName']

    try:
        lakehouse_tables = sempy_labs.lakehouse.get_lakehouse_tables(lakehouse_name, Workspace_name)
    except Exception as e:
        print(f"Error fetching tables for lakehouse {lakehouse_name}: {e}")
        continue

    for _, table_row in lakehouse_tables.iterrows():
        path = table_row['Location']
        LH_table_name = table_row['Table Name']

        # Read the detail exactly once; tables that cannot be opened as Delta
        # are reported and skipped
        try:
            delta_table = DeltaTable.forPath(spark, path)
            detail_df = delta_table.detail().toPandas()
        except Exception as e:
            print(f"Error processing table {LH_table_name} at {path}: {e}")
            continue

        # display(detail_df)
        # Every column is cast to string before handing the frame to Spark
        # (presumably to avoid type-inference problems with nested columns —
        # TODO confirm)
        detail_df = detail_df.astype(str)

        # Add new columns to the DataFrame
        detail_df['Lakehouse_name'] = lakehouse_name
        detail_df['Table_Name'] = LH_table_name

        try:
            spark_pandas_df = spark.createDataFrame(detail_df)
            spark_pandas_df = spark_pandas_df.withColumn("LoadDate", current_timestamp())
            # spark_pandas_df.show()
            spark_pandas_df.write.format("delta").option("mergeSchema", "true").mode("append").save(details_path)
            # Print the table name itself (the original wrapped it in a set literal)
            print(table_name, "created at:", details_path, "for LH table - ", LH_table_name)
        except Exception as e:
            print(f"Error saving table {table_name}: {e}")



Lakehouse Presentation Layer

In [None]:
# Build the presentation tables for Lakehouses: de-duplicated copies of the
# raw (R_) tables without the LoadDate audit column.
# The raw tables are written as Delta, so they are read through the Delta
# reader. Reading the folder with spark.read.parquet bypasses the transaction
# log and can pick up data files that are no longer part of the table.
R_LakehouseTables_path = f"{lh_abfs_path}/Tables/R_Lakehouse_Tables"
Distinct_LakehouseTables_path = f"{lh_abfs_path}/Tables/Lakehouse_Tables"

# Get distinct LH tables
df_LH_Tables = spark.read.format("delta").load(R_LakehouseTables_path)
df_LH_Tables = df_LH_Tables.drop("LoadDate").dropDuplicates()
df_LH_Tables.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(Distinct_LakehouseTables_path)


R_LakehouseTablesDetails_path = f"{lh_abfs_path}/Tables/R_Lakehouse_Tables_Details"
Distinct_LakehouseTablesDetails_path = f"{lh_abfs_path}/Tables/Lakehouse_Tables_Details"


# Get distinct LH table details (the variable name is reused from the step above)
df_LH_Tables = spark.read.format("delta").load(R_LakehouseTablesDetails_path)
df_LH_Tables = df_LH_Tables.drop("LoadDate").dropDuplicates()
df_LH_Tables.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(Distinct_LakehouseTablesDetails_path)


## Semantic models

In [None]:
from delta.tables import DeltaTable
import pandas as pd
from pyspark.sql.functions import lit, current_timestamp

# Cleaned object list of every semantic model that returned objects; used
# after the loop to derive the object types across ALL models
list_semantic_model_objects = []

# Filter the DataFrame to include only items of type 'SemanticModel'
# (the 'Report Usage Metrics Model' is excluded)
df_semantic_models = Fabric_all_items[(Fabric_all_items['Type'] == 'SemanticModel') & (Fabric_all_items['DisplayName'] != 'Report Usage Metrics Model')]

table_name = "R_semantic_models"
if df_semantic_models.empty:
    # Spark cannot build a DataFrame from an empty pandas frame
    print("No semantic models found; nothing to save.")
else:
    sdf_df_semantic_models = spark.createDataFrame(df_semantic_models)
    # Stamp the load time like the other raw tables do; the presentation layer
    # drops this column before de-duplicating
    sdf_df_semantic_models = sdf_df_semantic_models.withColumn("LoadDate", current_timestamp())
    sdf_df_semantic_models.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{table_name}")

# Iterate through each semantic model, retrieve its objects and save them
for dataset_name in df_semantic_models['DisplayName']:
    try:
        semantic_model_objects = sempy_labs.list_semantic_model_objects(dataset_name, current_workspace_name)
    except Exception as e:
        print(f"Error fetching semantic model objects for {dataset_name}: {e}")
        continue

    if semantic_model_objects.empty:
        continue

    df_semantic_model_objects = pd.DataFrame(semantic_model_objects)
    # A single pass removes every non-alphanumeric character, spaces included
    df_semantic_model_objects.columns = df_semantic_model_objects.columns.str.replace('[^a-zA-Z0-9]', '', regex=True)
    list_semantic_model_objects.append(df_semantic_model_objects)

    spark_df_semantic_models = spark.createDataFrame(df_semantic_model_objects)
    spark_df_semantic_models = spark_df_semantic_models.withColumn("LoadDate", current_timestamp())
    spark_df_semantic_models = spark_df_semantic_models.withColumn("WorkspaceName", lit(current_workspace_name))
    spark_df_semantic_models = spark_df_semantic_models.withColumn("DatasetName", lit(dataset_name))

    table_name = "R_semantic_model_objects"
    spark_df_semantic_models.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{table_name}")
    print(f"{table_name} created/updated at: {lh_abfs_path}/Tables/{table_name} for {dataset_name}")

# Extract and save the distinct object types of all processed semantic models.
# Previously only the last model's frame was used, which also raised a
# NameError when no model had returned any objects.
table_name = "R_semantic_model_object_types"
if list_semantic_model_objects:
    all_semantic_model_objects = pd.concat(list_semantic_model_objects, ignore_index=True)
    # The column is named 'value' on purpose: that is the name the raw table
    # already uses and the presentation layer renames it to 'ObjectType'
    df_semantic_model_objects_type = pd.DataFrame({'value': all_semantic_model_objects['ObjectType'].unique()})

    # Create a Spark DataFrame for object types and save it to the Lakehouse
    spark_df_semantic_model_objects_type = spark.createDataFrame(df_semantic_model_objects_type)
    spark_df_semantic_model_objects_type = spark_df_semantic_model_objects_type.withColumn("LoadDate", current_timestamp())
    spark_df_semantic_model_objects_type.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{table_name}")
    print(f"{table_name} created at: {lh_abfs_path}/Tables/{table_name}")
else:
    print(f"No semantic model objects found; {table_name} was not updated.")


### **Create data for Semantic models and create presentation Layer**

In [None]:
# Get distinct semantic model object types
df_semantic_model_object_types = spark.read.parquet(f"{lh_abfs_path}/Tables/R_semantic_model_object_types")
df_semantic_model_object_types = df_semantic_model_object_types.drop("LoadDate").dropDuplicates()

# Rename column 'value' to 'ObjectType'
df_semantic_model_object_types = df_semantic_model_object_types.withColumnRenamed('value', 'ObjectType')
df_semantic_model_object_types.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(f"{lh_abfs_path}/Tables/semantic_model_object_types")

# Get distinct object types from semantic model objects
df_object_types = spark.read.parquet(f"{lh_abfs_path}/Tables/R_semantic_model_objects")
df_object_types = df_object_types.drop("LoadDate").dropDuplicates()
df_object_types.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(f"{lh_abfs_path}/Tables/semantic_model_objects")

# Get distinct semantic models
df_semantic_models = spark.read.parquet(f"{lh_abfs_path}/Tables/R_semantic_models")
df_semantic_models = df_semantic_models.drop("LoadDate").dropDuplicates()

# Rename 'DisplayName' to 'DatasetName' and select relevant columns
df_semantic_models = df_semantic_models.withColumnRenamed('DisplayName', 'DatasetName').select('DatasetName', 'Id', 'WorkspaceID', 'Description')
df_semantic_models.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(f"{lh_abfs_path}/Tables/semantic_models")


### Semantic models and underlying data

In [None]:
# from pyspark.sql.functions import col, lit

# # Read the parquet file
# df = spark.read.parquet(f"{lh_abfs_path}/Tables/R_semantic_model_objects")

# # Select all columns except 'LoadDate' and remove duplicates
# df = df.drop("LoadDate").dropDuplicates()

# # Get distinct object types
# object_types = df.select('ObjectType').distinct().rdd.flatMap(lambda x: x).collect()

# # Function to process each object type
# def process_object_type(object_type):
#     df_object_type = df.filter(col('ObjectType') == object_type)
    
#     # Rename 'ObjectName' to 'ObjectType_Name' and drop 'ObjectType' column
#     df_object_type = df_object_type.withColumnRenamed('ObjectName', f'{object_type}_Name').drop('ObjectType')
    
#     # Remove special characters from column names
#     for column in df_object_type.columns:
#         new_column = column.replace(' ', '_').replace('(', '').replace(')', '').replace('.', '')
#         df_object_type = df_object_type.withColumnRenamed(column, new_column)
    
#     # Replace spaces with underscores in the object type for the filename
#     object_type_filename = object_type.replace(' ', '_')
    
#     # Save the DataFrame as a Delta table
#     df_object_type.write.format("delta").option("mergeSchema", "true").mode("append").save(f"{lh_abfs_path}/Tables/{object_type_filename}")

# # Loop through each object type and process the data
# for object_type in object_types:
#     process_object_type(object_type)

# # Combine Table and Calculated_Table to create a single table with a 'Type' column to differentiate between the two
# df_tables = spark.read.parquet(f"{lh_abfs_path}/Tables/Table")
# df_calculated_tables = spark.read.parquet(f"{lh_abfs_path}/Tables/Calculated_Table")

# # Rename column 'Calculated_Table' to 'Table_Name'
# df_calculated_tables = df_calculated_tables.withColumnRenamed('Calculated_Table', 'Table_Name')

# # Add 'Type' column to each DataFrame
# df_tables = df_tables.withColumn('Type', lit('Table'))
# df_calculated_tables = df_calculated_tables.withColumn('Type', lit('Calculated Table'))

# # Combine the DataFrames and remove duplicates
# df_combined_tables = df_tables.union(df_calculated_tables).drop("LoadDate").dropDuplicates()

# # Save the combined DataFrame as a Delta table
# df_combined_tables.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(f"{lh_abfs_path}/Tables/SM_Tables")

# # Get distinct columns and save as a Delta table
# df_columns = spark.read.parquet(f"{lh_abfs_path}/Tables/Column")
# df_columns.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(f"{lh_abfs_path}/Tables/SM_Columns")


## **Vertipaq Analyser for Semantic Models data**

In [None]:
# Run the Vertipaq Analyzer for every semantic model of the workspace
# (the 'Report Usage Metrics Model' is excluded).
df_semantic_models = Fabric_all_items[(Fabric_all_items['Type'] == 'SemanticModel') & (Fabric_all_items['DisplayName'] != 'Report Usage Metrics Model')]

# Kept for backward compatibility with cells that may reference it; this cell
# does not populate it
list_semantic_model_objects_vertipaq_analyzer = []

# Iterate through each semantic model and analyse it
for Dataset_Name in df_semantic_models['DisplayName']:
    print(f'Dataset_Name - {Dataset_Name}')
    try:
        # Use `current_workspace_name` like the other cells do, instead of the
        # `Current_workspace_name` loop variable leaked from the Fabric items
        # cell (which holds a workspace ID, not a name).
        # NOTE(review): export='table' is expected to persist the results to
        # the lakehouse attached to the notebook — confirm in the sempy_labs docs.
        sempy_labs.vertipaq_analyzer(dataset=Dataset_Name, workspace=current_workspace_name, export='table')
    except Exception as e:
        # Report the failing model and carry on with the remaining ones
        print(f"An error occurred for {Dataset_Name}: {e}")

print("Processing complete.")