## 1. Instalación de Librerías y Carga de Módulos

In [None]:
%pip install semantic-link-labs

## Importaciones necesarias

In [2]:
import sempy_labs as labs
import sempy.fabric as fabric
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import col

StatementMeta(, f7602553-b675-4b1e-a7d9-ac04b965604b, 9, Finished, Available, Finished)

## 2. Definición de Funciones

In [3]:
def get_file_details_recursive(folder_path):
    """
    Escanea recursivamente una carpeta en OneLake y devuelve una lista con detalles
    (ruta completa ABFSS, tamaño en bytes) de cada archivo encontrado.

    Utiliza mssparkutils.fs.ls para listar el contenido.

    Args:
        folder_path (str): La ruta ABFSS de la carpeta a escanear.
                           Ej: "abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<item_id>/Files/"

    Returns:
        list: Una lista de diccionarios [{'path': str, 'size_bytes': int}],
              o una lista vacía si ocurre un error al listar la carpeta
              o si la carpeta está vacía o no contiene archivos directamente.
              Los errores en subcarpetas se registran pero no detienen el escaneo general.
    """
    
    file_details_list = []
    try:
        # print(f"DEBUG: Escaneando carpeta: {folder_path}") # Descomentar para depuración detallada
        items = notebookutils.fs.ls(folder_path)

        for item in items:
            # Asegurarse de que la ruta del item es completa (ABFSS)
            item_path_full = item.path
            if not item_path_full.startswith("abfss://"):
                 # Si la ruta no es completa, intentar reconstruirla (puede no ser siempre necesario/correcto)
                 # Esto es una suposición basada en cómo a veces se devuelven las rutas
                 if folder_path.endswith('/'):
                     item_path_full = folder_path + item.name
                 else:
                     item_path_full = folder_path + '/' + item.name
                 # print(f"DEBUG: Ruta reconstruida: {item_path_full}") # Descomentar para depuración

            if not item.isDir:
                # Es un archivo, añadir sus detalles
                file_details_list.append({'path': item_path_full, 'size_bytes': item.size})
            else:
                # Es un directorio, llamar recursivamente si no es la misma carpeta (evitar bucles)
                # Comprobamos la ruta completa normalizada para evitar errores por barras finales
                current_folder_normalized = folder_path.rstrip('/')
                item_folder_normalized = item_path_full.rstrip('/')

                if item_folder_normalized != current_folder_normalized:
                    # print(f"DEBUG: Entrando recursivamente en: {item_path_full}") # Descomentar para depuración
                    sub_dir_files = get_file_details_recursive(item_path_full)
                    if sub_dir_files:
                        file_details_list.extend(sub_dir_files)

    except Exception as e:
        # Imprime el error específico de esta carpeta pero permite que el proceso general continúe.
        # No se añadirán archivos de esta ruta específica si falla el 'ls'.
        #print(f"WARN: Error al escanear la carpeta '{folder_path}': {e}. Omitiendo esta ruta.")
        print(f"WARN: Error al escanear la carpeta '{folder_path}'. Omitiendo esta ruta.")
        return [] # Devuelve lista vacía en caso de error en esta carpeta específica

    return file_details_list

StatementMeta(, f7602553-b675-4b1e-a7d9-ac04b965604b, 10, Finished, Available, Finished)

## 3. Proceso Principal: Escaneo de Workspaces y Artefactos

Iteramos por todos los workspaces, obtenemos la lista de artefactos para cada uno,
y escaneamos el almacenamiento subyacente de los artefactos soportados.

In [None]:
# Lista para almacenar los datos de todos los archivos encontrados
all_files_data = []

# Contador para mostrar progreso
processed_workspaces = 0
total_workspaces = 0

print("Iniciando escaneo de workspaces...")

try:
    # Obtener todos los workspaces accesibles
    workspaces_pd = fabric.list_workspaces()
    total_workspaces = len(workspaces_pd)
    print(f"Se encontraron {total_workspaces} áreas de trabajo accesibles.")

    # Iterar sobre cada workspace encontrado
    for ws_index, ws_row in workspaces_pd.iterrows():
        workspace_name = ws_row['Name']
        workspace_id = ws_row['Id']
        processed_workspaces += 1
        print(f"\n[{processed_workspaces}/{total_workspaces}] 📂 Procesando Workspace: '{workspace_name}' (ID: {workspace_id})")

        try:
            # Listar todos los artefactos (items) dentro del workspace actual
            items_pd = fabric.list_items(workspace=workspace_id)
            print(f"  -> Se encontraron {len(items_pd)} artefactos escaneables en '{workspace_name}'.")

            # Iterar sobre cada artefacto del workspace
            for item_index, item_row in items_pd.iterrows():
                artifact_type = item_row['Type']
                artifact_name = item_row['Display Name']
                artifact_id = item_row['Id']

                # Construir la ruta raíz ABFSS para el artefacto
                # Nota: La estructura interna puede variar. Generalmente '/Files' o '/Tables' son puntos de entrada comunes.
                #       Probamos escanear desde la raíz del artefacto.
                artifact_root_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{artifact_id}"

                print(f"    -> 🔍 Escaneando artefacto: '{artifact_name}' (Tipo: {artifact_type}, ID: {artifact_id})")
                # print(f"       Ruta base: {artifact_root_path}") # Descomentar para depuración

                # Inicializar la lista de archivos para ESTE artefacto en CADA iteración
                files_in_artifact = []

                # Variable para almacenar el tamaño si se obtiene de forma especial (p.ej. Semantic Model)
                special_size_bytes = -1 # Usar -1 como indicador inicial (no determinado/error)

                try:
                    # Obtener la lista de archivos y sus tamaños recursivamente
                    if artifact_type == 'SemanticModel':
                        print(f"      -> Intentando obtener tamaño para Semantic Model '{artifact_name}'...")
                        size = labs.get_semantic_model_size(artifact_id, workspace_id)
                        if size is not None:
                            special_size_bytes = int(size)
                        
                        print(f"      => Tamaño obtenido de sempy_labs: {special_size_bytes} bytes.")

                        all_files_data.append({
                                "WorkspaceID": workspace_id, 
                                "WorkspaceName": workspace_name,
                                "ArtifactType": artifact_type, 
                                "ArtifactName": artifact_name, 
                                "ArtifactID": artifact_id,
                                "ScannedRootPath": artifact_root_path,
                                "FilePath": f"{artifact_root_path}",
                                "SizeBytes": special_size_bytes,
                                "SizeMB": float(special_size_bytes / (1024 * 1024)) if special_size_bytes > 0 else 0.0
                        })
                    else:
                        files_in_artifact = get_file_details_recursive(artifact_root_path + "/") # Añadir '/' por si acaso es necesario

                    if files_in_artifact and artifact_type != 'SemanticModel':
                        print(f"      => Se encontraron {len(files_in_artifact)} archivos para '{artifact_name}'.")
                        # Procesar y añadir la información de cada archivo a la lista global
                        for file_detail in files_in_artifact:
                            file_size_bytes = int(file_detail['size_bytes'])
                            # Evitar división por cero si el tamaño es 0
                            file_size_mb = float(file_size_bytes / (1024 * 1024)) if file_size_bytes > 0 else 0.0

                            all_files_data.append({
                                "WorkspaceID": workspace_id,
                                "WorkspaceName": workspace_name,
                                "ArtifactType": artifact_type,
                                "ArtifactName": artifact_name,
                                "ArtifactID": artifact_id,
                                "ScannedRootPath": artifact_root_path, # Ruta base desde donde se escaneó
                                "FilePath": file_detail['path'],       # Ruta completa del archivo
                                "SizeBytes": file_size_bytes,
                                "SizeMB": file_size_mb
                            })
                    else:
                        # Si no se encontraron archivos (puede ser normal para ciertos tipos o si está vacío)
                        print(f"      => No se encontraron archivos accesibles vía ABFSS para '{artifact_name}' o está vacío.")

                except Exception as scan_error:
                    # Captura errores durante el escaneo de un artefacto específico
                    print(f"      => ERROR escaneando el artefacto '{artifact_name}' en '{artifact_root_path}'.")
                    #print(f"      => ERROR escaneando el artefacto '{artifact_name}' en '{artifact_root_path}'. Error: {scan_error}")
                    # Opcional: Añadir un registro con error o tamaño -1 para indicar fallo
                    all_files_data.append({
                                "WorkspaceID": workspace_id,
                                "WorkspaceName": workspace_name,
                                "ArtifactType": artifact_type,
                                "ArtifactName": artifact_name,
                                "ArtifactID": artifact_id,
                                "ScannedRootPath": artifact_root_path,
                                "FilePath": "ERROR_SCANNING_ARTIFACT",
                                "SizeBytes": -1, # Indicador de error
                                "SizeMB": -1.0
                            })

        except Exception as item_error:
            # Captura errores al listar los artefactos de un workspace
            print(f"  -> ERROR al listar artefactos para el workspace '{workspace_name}': {item_error}")
            continue # Continuar con el siguiente workspace

except Exception as ws_error:
    # Captura errores al listar los workspaces
    print(f"FATAL: Error crítico al obtener la lista de workspaces: {ws_error}")

print(f"\n📊 Escaneo completado. Se recopilaron {len(all_files_data)} registros de archivos.")

## 4. Creación del DataFrame y Almacenamiento en Tabla Delta

Convertimos la lista de datos recopilados en un DataFrame de Spark y lo guardamos
en una tabla Delta en el Lakehouse por defecto.

In [None]:
# Verificar si se recopilaron datos antes de crear el DataFrame
if all_files_data:
    print("Creando DataFrame de Spark...")

    # Definir el esquema explícitamente para mayor robustez y control de tipos
    schema = StructType([
        StructField("WorkspaceID", StringType(), True),
        StructField("WorkspaceName", StringType(), True),
        StructField("ArtifactType", StringType(), True),
        StructField("ArtifactName", StringType(), True),
        StructField("ArtifactID", StringType(), True),
        StructField("ScannedRootPath", StringType(), True),
        StructField("FilePath", StringType(), True),
        StructField("SizeBytes", LongType(), True),      # Usar LongType para tamaños grandes
        StructField("SizeMB", DoubleType(), True)       # Usar DoubleType para precisión decimal
    ])

    # Crear el DataFrame de Spark
    try:
        spark_df = spark.createDataFrame(data=all_files_data, schema=schema)

        # --- Almacenamiento en Tabla Delta ---
        # Nombre de la tabla Delta de salida
        DELTA_TABLE_NAME = "OneLakeSize"
        # Ruta relativa en el Lakehouse por defecto (asegúrate que esté adjunto)
        DELTA_TABLE_PATH = f"Tables/{DELTA_TABLE_NAME}"

        print(f"Guardando datos en la tabla Delta: '{DELTA_TABLE_PATH}' (Modo: Overwrite)...")
        try:
            # Escribir el DataFrame en formato Delta, sobrescribiendo si ya existe
            spark_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(DELTA_TABLE_PATH)
            print(f"Tabla Delta '{DELTA_TABLE_NAME}' guardada/actualizada exitosamente.")

        except Exception as e:
            print(f"ERROR al guardar la tabla Delta '{DELTA_TABLE_NAME}': {e}")

    except Exception as df_error:
        print(f"ERROR al crear el DataFrame de Spark: {df_error}")

else:
    print("\nNo se recopilaron datos de archivos. No se creó ni guardó ninguna tabla.")


## 5. Visualización de Resultados (Opcional)

Mostramos las primeras filas del DataFrame resultante.

In [None]:
display(spark_df)