# Enhanced Fabric Workspace Scanner v03 - With PowerBI Report Analysis

## New Features:
- **🆕 PowerBI Report Metadata Extraction**: Extracts used columns, tables, and measures from PowerBI reports
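- **🆕 Dual Analysis Approach**: Tries `list_semantic_model_objects()` first for PBIR reports and falls back to parsing the report JSON (used for PBIR-Legacy reports, or when the first method fails or returns nothing)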
- **🆕 PBIR vs PBIR-Legacy Support**: Handles both modern and legacy PowerBI report formats
- **Lakehouse Storage**: Saves all analysis results to dedicated lakehouse tables
- **Enhanced Context**: Additional context columns for Reports, Tables, Relationships, Dataflows
- **Column Usage Analysis**: Detailed column usage analysis with context from measures, relationships, and dependencies

## Tables Created in Lakehouse:
- `workspace_analysis` - Workspace information
- `dataset_analysis` - Datasets with Reports, Tables, Relationships, Dataflows context
- `table_analysis` - Tables with usage context from measures, relationships, dependencies
- `column_usage_analysis` - Columns with detailed usage analysis
- `usage_summary` - Summary of dataset usage patterns
- **🆕 `report_metadata_analysis`** - PowerBI report metadata extraction results
- **🆕 `report_objects_used`** - Objects (tables, columns, measures) used by each report

## Workflow:
1. **Object Discovery** - Find workspaces, datasets, reports, dataflows
2. **Dataset Processing** - Analyze tables, columns, measures, relationships
3. **🆕 Report Analysis** - Extract metadata from PowerBI reports
4. **Usage Analysis** - Cross-reference usage patterns
5. **Lakehouse Storage** - Save all results

In [None]:
# Install semantic-link-labs (the package behind the `sempy_labs` import used
# in the following cells) for extended Fabric analytics
!pip install semantic-link-labs

In [None]:
import pandas as pd
import sempy_labs
import sempy.fabric as fabric
from sempy_labs.report import ReportWrapper
import re
import sempy
import json
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, StringType, StructType, LongType, StructField, FloatType
from pyspark.sql.functions import col
from datetime import datetime
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Any

# Obtain the Spark session that save_to_lakehouse() later uses to write tables.
# getOrCreate() hands back the already-running session when one exists.
spark = SparkSession.builder.getOrCreate()

print("✅ All imports successful and Spark session initialized")

In [None]:
# ============================================================
# UTILITY FUNCTIONS AND DATA STRUCTURES
# ============================================================

@dataclass
class DatasetInfo:
    """Container for one dataset's identifiers and the metadata frames collected for it.

    The four identifier fields are required and positional. Every DataFrame
    field defaults to ``None`` and stays ``None`` until something assigns it
    (collect_dataset_info() fills them from the corresponding fabric calls).
    """
    # Dataset id / display name
    ds_id: str
    ds_name: str
    # Id / display name of the workspace that owns the dataset
    ws_id: str
    ws_name: str
    # Calculation dependencies (from fabric.get_model_calc_dependencies)
    dependencies_df: Optional[pd.DataFrame] = None
    # Tables of the dataset (from fabric.list_tables)
    tables_df: Optional[pd.DataFrame] = None
    # Relationships (from fabric.list_relationships)
    relationships_df: Optional[pd.DataFrame] = None
    # Measures (from fabric.list_measures)
    measures_df: Optional[pd.DataFrame] = None
    # Columns (from fabric.list_columns)
    columns_df: Optional[pd.DataFrame] = None

@dataclass
class ReportMetadata:
    """Result record of analysing a single Power BI report.

    Produced by extract_report_metadata(); one instance per report, whether
    or not the extraction succeeded.
    """
    # Identity of the report and of the workspace/dataset it belongs to
    report_id: str
    report_name: str
    workspace_id: str
    workspace_name: str
    dataset_id: str
    # Format reported by ReportWrapper.format; "Unknown" until it has been read
    report_format: str
    # "None" initially, then "sempy_labs_objects" or "json_parsing"
    extraction_method: str
    # Names of the model objects the report references
    tables: List[str]
    columns: List[str]
    measures: List[str]
    # Counts taken from the JSON parsing results; left at 0 by the other path
    visuals_count: int
    filters_count: int
    # True only when one of the extraction methods produced a result
    extraction_success: bool
    # Failure description; empty string when nothing went wrong
    error_message: str = ""

def sanitize_df_columns(df, extra_columns=False, ws_id=None, ds_id=None, ws_name=None, ds_name=None):
    """
    Normalize column names to prevent errors during Spark DataFrame creation.

    Every label is converted to text, stripped of surrounding whitespace,
    lower-cased, and each run of non-word characters (spaces, dashes, dots,
    ...) is replaced by a single underscore, e.g. ``"Dataset Id"`` becomes
    ``"dataset_id"``.

    Args:
        df: pandas DataFrame to normalize. It is modified in place.
        extra_columns: When True, add ``workspace_id``, ``dataset_id``,
            ``workspace_name`` and ``dataset_name`` columns filled with the
            values given below.
        ws_id: Workspace id stored in ``workspace_id``.
        ds_id: Dataset id stored in ``dataset_id``.
        ws_name: Workspace name stored in ``workspace_name``.
        ds_name: Dataset name stored in ``dataset_name``.

    Returns:
        The same DataFrame object. An empty DataFrame is returned untouched.
    """
    if df.empty:
        return df

    # str() keeps non-string labels (ints, tuples, ...) from raising on .strip()
    df.columns = [
        re.sub(r'\W+', "_", str(label).strip().lower())
        for label in df.columns
    ]

    if extra_columns:
        df['workspace_id'] = ws_id
        df['dataset_id'] = ds_id
        df['workspace_name'] = ws_name
        df['dataset_name'] = ds_name

    return df

def save_to_lakehouse(df, table_name, description=""):
    """
    Write a pandas DataFrame to a lakehouse table through the global Spark session.

    A copy of the data receives an ``analysis_date`` column holding the
    current timestamp and then replaces the target table ("overwrite" mode).
    Empty frames are skipped. Failures are printed instead of raised, so a
    single failing table does not abort the notebook.

    Args:
        df: pandas DataFrame to store.
        table_name: Name of the destination table.
        description: Optional text printed after a successful save.

    Returns:
        None.
    """
    try:
        if df.empty:
            print(f"  ⚠️ Skipping empty DataFrame for table: {table_name}")
            return

        # Stamp a copy so the caller's frame stays unchanged
        stamped = df.copy()
        stamped['analysis_date'] = datetime.now()

        # pandas -> Spark, then replace whatever the table held before
        writer = spark.createDataFrame(stamped).write
        writer.mode("overwrite").saveAsTable(table_name)

        print(f"  ✅ Saved {len(df)} records to '{table_name}' table")
        if description:
            print(f"     📝 {description}")

    except Exception as exc:
        print(f"  ❌ Error saving to {table_name}: {exc!s}")

print("✅ Utility functions and data structures defined")

In [None]:
# ============================================================
# POWERBI METADATA EXTRACTOR CLASS
# ============================================================

class PowerBIMetadataExtractor:
    """Extracts columns, tables, and measures from Power BI report metadata.

    The extractor walks the ``sections`` -> ``visualContainers`` structure of
    a report layout dictionary and records every table, column and measure
    it finds in filters, projections and prototype queries.

    ``config`` and ``filters`` values may arrive either as parsed objects or
    as JSON-encoded text (the form used by the legacy report.json layout);
    both are accepted.
    """

    # A function-style prefix such as "Sum(" at the start of a queryRef
    _CALL_PREFIX = re.compile(r'\w+\(')

    def __init__(self):
        self.tables = set()
        self.columns = set()
        self.measures = set()
        self.visual_details = []
        self.filter_details = []

    def extract_from_json_data(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Extract metadata from a report layout dictionary.

        Args:
            data: Report layout; only its ``sections`` entry is read.

        Returns:
            Dict with sorted ``tables``, ``columns`` and ``measures`` lists,
            a ``summary`` of their counts, and per-visual / per-filter
            ``visual_details`` and ``filter_details`` lists.
        """
        self._reset()

        for section_idx, section in enumerate(data.get('sections') or []):
            section_name = section.get('displayName', f'Section_{section_idx}')

            # Section-level filters
            self._extract_from_filters(section.get('filters', []), 'section', section_name)

            # Visuals placed on the section
            self._extract_from_visual_containers(section.get('visualContainers') or [], section_name)

        return {
            'tables': sorted(self.tables),
            'columns': sorted(self.columns),
            'measures': sorted(self.measures),
            'summary': {
                'total_tables': len(self.tables),
                'total_columns': len(self.columns),
                'total_measures': len(self.measures)
            },
            # Copies: the internal lists are cleared by the next extraction
            'visual_details': list(self.visual_details),
            'filter_details': list(self.filter_details)
        }

    def _reset(self):
        """Reset all collections for new extraction"""
        self.tables.clear()
        self.columns.clear()
        self.measures.clear()
        self.visual_details.clear()
        self.filter_details.clear()

    @staticmethod
    def _decode_json(value: Any, expected_type: type) -> Any:
        """Return *value* as an ``expected_type`` instance, parsing JSON text if needed.

        Anything that cannot be interpreted as ``expected_type`` (``None``,
        malformed JSON, a different container type) yields an empty instance.
        """
        if isinstance(value, (str, bytes, bytearray)):
            try:
                value = json.loads(value)
            except ValueError:
                return expected_type()
        return value if isinstance(value, expected_type) else expected_type()

    def _extract_from_visual_containers(self, visual_containers: List[Dict], section_name: str):
        """Extract from visualContainers array"""
        for visual_idx, visual_container in enumerate(visual_containers):
            # 'config' may be a dict or a JSON-encoded string
            visual_config = self._decode_json(visual_container.get('config', {}), dict)
            visual_name = visual_config.get('name', f'Visual_{visual_idx}')

            # Visual-level filters
            self._extract_from_filters(
                visual_container.get('filters', []),
                'visual',
                f"{section_name}->{visual_name}"
            )

            single_visual = visual_config.get('singleVisual', {})
            if single_visual:
                self._extract_from_single_visual(single_visual, section_name, visual_name)

    def _extract_from_single_visual(self, single_visual: Dict, section_name: str, visual_name: str):
        """Extract from singleVisual object and record a visual_details entry"""
        visual_type = single_visual.get('visualType', 'unknown')

        # Projections: role name -> list of {'queryRef': ...}
        projection_refs = []
        for projection_list in single_visual.get('projections', {}).values():
            for proj in projection_list:
                query_ref = proj.get('queryRef', '')
                if query_ref:
                    projection_refs.append(query_ref)
                    self._parse_query_ref(query_ref)

        prototype_query = single_visual.get('prototypeQuery', {})
        self._extract_from_prototype_query(prototype_query)

        self.visual_details.append({
            'section': section_name,
            'visual_name': visual_name,
            'visual_type': visual_type,
            'projection_refs': projection_refs,
            'has_prototype_query': bool(prototype_query)
        })

    def _extract_from_prototype_query(self, prototype_query: Dict):
        """Extract tables from 'From' and columns/measures from 'Select'"""
        for from_item in prototype_query.get('From', []):
            entity = from_item.get('Entity', '')
            if entity:
                self.tables.add(entity)

        for select_item in prototype_query.get('Select', []):
            name = select_item.get('Name', '')

            if 'Column' in select_item:
                kind, target = 'Column', self.columns
            elif 'Measure' in select_item:
                kind, target = 'Measure', self.measures
            else:
                continue

            if select_item[kind].get('Property', '') and name:
                target.add(name)  # Store full reference (table.column / table.measure)
                # Also extract table name
                if '.' in name:
                    self.tables.add(name.split('.')[0])

    def _extract_from_filters(self, filters: List[Dict], filter_type: str, context: str):
        """Extract from a filters array (or its JSON-encoded text form)"""
        filters = self._decode_json(filters, list)

        for filter_idx, filter_obj in enumerate(filters):
            filter_name = filter_obj.get('name', f'Filter_{filter_idx}')

            # Field the filter is defined on
            expression = filter_obj.get('expression', {})
            self._extract_from_expression(expression)

            # Nested filter definition (From / Where)
            filter_def = filter_obj.get('filter', {})
            if filter_def:
                for from_item in filter_def.get('From', []):
                    entity = from_item.get('Entity', '')
                    if entity:
                        self.tables.add(entity)

                # 'Where' conditions might contain column references
                for where_item in filter_def.get('Where', []):
                    self._extract_from_where_condition(where_item)

            self.filter_details.append({
                'filter_type': filter_type,
                'context': context,
                'filter_name': filter_name,
                'has_expression': bool(expression),
                'has_filter_def': bool(filter_def)
            })

    def _extract_from_expression(self, expression: Dict):
        """Extract table and column from a 'Column' expression object"""
        if 'Column' in expression:
            column_expr = expression['Column']
            source_ref = column_expr.get('Expression', {}).get('SourceRef', {})
            entity = source_ref.get('Entity', '')
            if entity:
                self.tables.add(entity)

            property_name = column_expr.get('Property', '')
            if property_name and entity:
                self.columns.add(f"{entity}.{property_name}")

    def _extract_from_where_condition(self, where_item: Dict):
        """Extract from WHERE condition (only 'In' conditions are inspected)"""
        condition = where_item.get('Condition', {})
        if 'In' in condition:
            for expr in condition['In'].get('Expressions', []):
                self._extract_from_expression(expr)

    @classmethod
    def _unwrap_call(cls, query_ref: str) -> str:
        """Strip function-style wrappers, e.g. 'Sum(Sales.Amount)' -> 'Sales.Amount'.

        A wrapper is removed only when the parenthesis opened by the leading
        'Name(' is closed by the very last character, so references such as
        'T(x).c(y)' are returned unchanged.
        """
        while query_ref.endswith(')'):
            head = cls._CALL_PREFIX.match(query_ref)
            if not head:
                break
            depth = 0
            close_at = -1
            for pos in range(head.end() - 1, len(query_ref)):
                char = query_ref[pos]
                if char == '(':
                    depth += 1
                elif char == ')':
                    depth -= 1
                    if depth == 0:
                        close_at = pos
                        break
            if close_at != len(query_ref) - 1:
                break
            query_ref = query_ref[head.end():-1]
        return query_ref

    def _parse_query_ref(self, query_ref: str):
        """Record the table of a queryRef ('table.column', 'table.measure', 'Sum(table.column)')"""
        query_ref = self._unwrap_call(query_ref)
        if '.' in query_ref:
            # Column vs. measure is decided from the prototype query, so only
            # the table part is recorded here
            self.tables.add(query_ref.split('.', 1)[0])

print("✅ PowerBI metadata extractor class defined")

In [None]:
def _summarize_report_objects(objects: pd.DataFrame):
    """Split a semantic-model-objects frame into (tables, columns, measures) name lists."""
    has_table_column = 'Table Name' in objects.columns

    def names_of(object_type):
        rows = objects[objects['Object Type'] == object_type]
        names = rows['Object Name'].astype(str)
        if has_table_column:
            # Qualify as "Table.Object", the same form the JSON parser stores
            names = rows['Table Name'].astype(str) + '.' + names
        return names.unique().tolist()

    if has_table_column:
        tables = objects['Table Name'].dropna().astype(str).unique().tolist()
    else:
        # Frame without a table column: fall back to explicit 'Table' rows
        tables = objects[objects['Object Type'] == 'Table']['Object Name'].unique().tolist()

    return tables, names_of('Column'), names_of('Measure')


def extract_report_metadata(report_id: str, report_name: str, workspace_id: str, workspace_name: str, dataset_id: str) -> ReportMetadata:
    """
    🆕 Extract metadata from Power BI reports using dual approach:
    1. For PBIR reports, ask the ReportWrapper for the semantic model objects
       the report uses (``ReportWrapper.list_semantic_model_objects()``)
    2. Fall back to JSON parsing for PBIR-Legacy format, or when step 1 fails
       or returns nothing

    Args:
        report_id: Id of the report to analyse.
        report_name: Display name, used for logging and in the result.
        workspace_id: Id of the workspace containing the report.
        workspace_name: Workspace display name, copied into the result.
        dataset_id: Id of the report's dataset, copied into the result.

    Returns:
        A ReportMetadata record. ``extraction_success`` is False and
        ``error_message`` is set when neither method produced a result;
        exceptions are reported in the record rather than raised.
    """
    print(f"  🔍 Analyzing report: {report_name}")

    # Initialize result object
    result = ReportMetadata(
        report_id=report_id,
        report_name=report_name,
        workspace_id=workspace_id,
        workspace_name=workspace_name,
        dataset_id=dataset_id,
        report_format="Unknown",
        extraction_method="None",
        tables=[],
        columns=[],
        measures=[],
        visuals_count=0,
        filters_count=0,
        extraction_success=False
    )

    try:
        # Step 1: Determine report format and use the appropriate method
        report = ReportWrapper(report=report_id, workspace=workspace_id)
        rep_format = report.format
        result.report_format = rep_format

        print(f"    Report format: {rep_format}")

        if rep_format == "PBIR":
            # Method 1: objects used by the report, listed by the wrapper.
            # (sempy_labs.list_semantic_model_objects() is the dataset-level
            # listing and does not accept a report.)
            try:
                print(f"    🔍 Trying sempy_labs.list_semantic_model_objects()...")
                objects = report.list_semantic_model_objects()

                if objects is not None and not objects.empty:
                    tables, columns, measures = _summarize_report_objects(objects)

                    result.tables = tables
                    result.columns = columns
                    result.measures = measures
                    result.extraction_method = "sempy_labs_objects"
                    result.extraction_success = True

                    print(f"    ✅ Extracted via sempy_labs: {len(tables)} tables, {len(columns)} columns, {len(measures)} measures")
                    return result

            except NotImplementedError as e:
                print(f"    ⚠️ sempy_labs method not supported: {str(e)}")
            except Exception as e:
                print(f"    ⚠️ sempy_labs method failed: {str(e)}")

        # Method 2: Fall back to JSON parsing (works for PBIR-Legacy and as backup)
        print(f"    🔍 Trying JSON metadata extraction...")

        # get_report_json is provided by the sempy_labs.report sub-package
        report_json = sempy_labs.report.get_report_json(report=report_id, workspace=workspace_id)

        if report_json:
            extractor = PowerBIMetadataExtractor()
            extraction_results = extractor.extract_from_json_data(report_json)

            result.tables = extraction_results.get('tables', [])
            result.columns = extraction_results.get('columns', [])
            result.measures = extraction_results.get('measures', [])
            result.visuals_count = len(extraction_results.get('visual_details', []))
            result.filters_count = len(extraction_results.get('filter_details', []))
            result.extraction_method = "json_parsing"
            result.extraction_success = True

            print(f"    ✅ Extracted via JSON: {len(result.tables)} tables, {len(result.columns)} columns, {len(result.measures)} measures")
            print(f"    🎨 Found {result.visuals_count} visuals, {result.filters_count} filters")
            return result
        else:
            result.error_message = "Could not retrieve report JSON"

    except Exception as e:
        result.error_message = f"Extraction failed: {str(e)}"
        print(f"    ❌ Error extracting metadata: {str(e)}")

    return result

print("✅ Report metadata extraction function defined")

In [None]:
def collect_dataset_info(ds_id: str, ds_name: str, ws_id: str, ws_name: str) -> DatasetInfo:
    """
    Gather dependencies, tables, relationships, measures and columns of one dataset.

    Each fabric call runs in its own try/except, so a failing call is
    reported on stdout and the remaining calls still run.

    Args:
        ds_id: Dataset id passed to the fabric calls.
        ds_name: Dataset name, used in messages and added to some frames.
        ws_id: Workspace id passed to the fabric calls.
        ws_name: Workspace name, used in messages and added to some frames.

    Returns:
        A DatasetInfo. ``dependencies_df`` is always a DataFrame (empty when
        nothing was found or the call failed); the other frames stay ``None``
        when their call failed or returned no rows.
    """
    print(f"🔹 Processing dataset: {ds_name} (Workspace: {ws_name})")

    info = DatasetInfo(ds_id, ds_name, ws_id, ws_name)

    # Keyword arguments shared by every sanitize call that adds id/name columns
    context = dict(extra_columns=True, ws_id=ws_id, ds_id=ds_id, ws_name=ws_name, ds_name=ds_name)

    # --- Model dependencies
    try:
        with fabric.get_model_calc_dependencies(dataset=ds_id, workspace=ws_id) as calc_deps:
            deps_frame = getattr(calc_deps, "dependencies_df", None)

        if deps_frame is None or deps_frame.empty:
            info.dependencies_df = pd.DataFrame()
            print(f"  No dependencies found for {ds_name}")
        else:
            deps_frame = sanitize_df_columns(df=deps_frame, **context)
            info.dependencies_df = deps_frame
            print(f"  Found {len(deps_frame)} dependencies")
    except Exception as err:
        print(f"  ⚠️ Dependencies unavailable for {ds_name}: {err}")
        info.dependencies_df = pd.DataFrame()

    # --- Tables
    try:
        table_frame = fabric.list_tables(dataset=ds_id, workspace=ws_id)
        if not table_frame.empty:
            table_frame = sanitize_df_columns(df=table_frame, **context)
            info.tables_df = table_frame
            print(f"  Found {len(table_frame)} tables")
    except Exception as err:
        print(f"  ⚠️ Tables unavailable for {ds_name}: {err}")

    # --- Relationships (no id/name columns added here)
    try:
        rel_frame = fabric.list_relationships(dataset=ds_id, workspace=ws_id, extended=True)
        if not rel_frame.empty:
            rel_frame = sanitize_df_columns(df=rel_frame)
            # Both ends of the relationship in 'table'[column] notation
            rel_frame['qualified_from'] = "'" + rel_frame['from_table'] + "'[" + rel_frame['from_column'] + "]"
            rel_frame['qualified_to'] = "'" + rel_frame['to_table'] + "'[" + rel_frame['to_column'] + "]"
            info.relationships_df = rel_frame
            print(f"  Found {len(rel_frame)} relationships")
    except Exception as err:
        print(f"  ⚠️ Relationships unavailable for {ds_name}: {err}")

    # --- Measures (no id/name columns added here)
    try:
        measure_frame = fabric.list_measures(dataset=ds_id, workspace=ws_id)
        if not measure_frame.empty:
            measure_frame = sanitize_df_columns(df=measure_frame)
            info.measures_df = measure_frame
            print(f"  Found {len(measure_frame)} measures")
    except Exception as err:
        print(f"  ⚠️ Measures unavailable for {ds_name}: {err}")

    # --- Columns
    try:
        column_frame = fabric.list_columns(dataset=ds_id, workspace=ws_id, extended=True)
        if not column_frame.empty:
            column_frame = sanitize_df_columns(df=column_frame, **context)
            # Column reference in 'table'[column] notation
            column_frame['qualified_name'] = "'" + column_frame['table_name'] + "'[" + column_frame['column_name'] + ']'
            info.columns_df = column_frame
            print(f"  Found {len(column_frame)} columns")
    except Exception as err:
        print(f"  ⚠️ Columns unavailable for {ds_name}: {err}")

    return info

print("✅ Dataset collection function defined")

In [None]:
# ------------------------------------------------------------
# STEP 1: Object Discovery
# ------------------------------------------------------------
# Lists every workspace, then collects the datasets, reports and dataflows of
# each one. A failing listing is reported and the scan carries on.

print("🔍 Discovering workspaces...")

# Only id, name and type are needed from the workspace listing
workspaces_df = sanitize_df_columns(fabric.list_workspaces())
workspaces_df = workspaces_df[['id', 'name', 'type']]
display(workspaces_df)

datasets_all = []
reports_all = []
dataflows_all = []

for _, ws in workspaces_df.iterrows():
    ws_id, ws_name, ws_type = ws['id'], ws['name'], ws['type']
    if ws_type == "AdminInsights":
        # Workspaces of this type are left out of the scan
        continue
    print(f"\n📦 Scanning workspace: {ws_name}")

    # Datasets, tagged with the workspace they came from
    try:
        ds = fabric.list_datasets(workspace=ws_id)
        if not ds.empty:
            ds['workspace_id'] = ws_id
            ds['workspace_name'] = ws_name
            datasets_all.append(ds)
    except Exception as e:
        print(f"  ⚠️ Datasets error in {ws_name}: {e}")

    # Reports (Power BI and Paginated alike), tagged the same way
    try:
        rep = fabric.list_reports(workspace=ws_id)
        if not rep.empty:
            rep['workspace_id'] = ws_id
            rep['workspace_name'] = ws_name
            reports_all.append(rep)
    except Exception as e:
        print(f"  ⚠️ Reports error in {ws_name}: {e}")

    # Dataflows, kept as returned
    try:
        dfs = fabric.list_items(type='Dataflow',workspace=ws_id)
        if not dfs.empty:
            dataflows_all.append(dfs)
    except Exception as e:
        print(f"  ⚠️ Dataflows error in {ws_name}: {e}")

# Merge the per-workspace frames; an empty frame stands in when nothing was found
if datasets_all:
    datasets_df = pd.concat(datasets_all, ignore_index=True)
else:
    datasets_df = pd.DataFrame()
datasets_df = sanitize_df_columns(datasets_df)

if reports_all:
    reports_df = pd.concat(reports_all, ignore_index=True)
else:
    reports_df = pd.DataFrame()
reports_df = sanitize_df_columns(reports_df)

if dataflows_all:
    dataflows_df = pd.concat(dataflows_all, ignore_index=True)
else:
    dataflows_df = pd.DataFrame()
dataflows_df = sanitize_df_columns(dataflows_df)

# Separate Power BI reports from paginated ones when the type is known
if reports_df.empty or "report_type" not in reports_df.columns:
    pbi_reports_df = reports_df
    paginated_reports_df = pd.DataFrame()
else:
    pbi_reports_df = reports_df[reports_df["report_type"] == "PowerBIReport"].copy()
    paginated_reports_df = reports_df[reports_df["report_type"] == "PaginatedReport"].copy()

print("\n✅ Object discovery complete")
print(f"  Workspaces: {len(workspaces_df)}")
print(f"  Datasets:   {len(datasets_df)}")
print(f"  Reports:    {len(reports_df)} (PBI: {len(pbi_reports_df)}, Paginated: {len(paginated_reports_df)})")
print(f"  Dataflows:  {len(dataflows_df)}")

# Persist the workspace listing
print("\n💾 Saving workspace data to lakehouse...")
save_to_lakehouse(workspaces_df, "workspace_analysis", "Workspace information")

In [None]:
# ------------------------------------------------------------
# STEP 2: PowerBI Report Metadata Extraction
# ------------------------------------------------------------
# Runs extract_report_metadata() on every Power BI report and flattens the
# result into one record per referenced table, column and measure.

print("\n" + "="*80)
print("📊 POWER BI REPORT METADATA EXTRACTION")
print("="*80)

# Container for report metadata
all_report_metadata = []
report_objects_used = []

# Process only PowerBI reports (not paginated reports)
if not pbi_reports_df.empty:
    total_reports = len(pbi_reports_df)
    print(f"\n🖼️ Processing {total_reports} PowerBI reports...")

    # enumerate() supplies the 1-based progress position: the index label
    # (idx) is not positional once the reports frame has been filtered
    for position, (idx, report_row) in enumerate(pbi_reports_df.iterrows(), start=1):
        report_id = report_row.get('id', '')
        report_name = report_row.get('name', f'Report_{idx}')
        workspace_id = report_row.get('workspace_id', '')
        workspace_name = report_row.get('workspace_name', '')
        dataset_id = report_row.get('dataset_id', '')

        print(f"\n📊 Processing report {position}/{total_reports}: {report_name}")

        # Extract metadata using our dual approach function
        report_metadata = extract_report_metadata(
            report_id, report_name, workspace_id, workspace_name, dataset_id
        )

        all_report_metadata.append(report_metadata)

        # Detailed records are only built for successful extractions
        if not report_metadata.extraction_success:
            continue

        # Fields shared by every record of this report
        report_fields = {
            'report_id': report_id,
            'report_name': report_name,
            'workspace_id': workspace_id,
            'workspace_name': workspace_name,
            'dataset_id': dataset_id,
        }
        extraction_method = report_metadata.extraction_method

        # Table records. 'table_name' is filled in as well so that every
        # record has the same keys and the column holds text only.
        for table in report_metadata.tables:
            report_objects_used.append({
                **report_fields,
                'object_type': 'Table',
                'object_name': table,
                'full_reference': table,
                'table_name': table,
                'extraction_method': extraction_method
            })

        # Column and measure records: "table.object" is split at the first dot;
        # a reference without a dot has no table part
        for object_type, references in (
            ('Column', report_metadata.columns),
            ('Measure', report_metadata.measures),
        ):
            for reference in references:
                table_name, separator, object_name = reference.partition('.')
                if not separator:
                    table_name, object_name = '', reference
                report_objects_used.append({
                    **report_fields,
                    'object_type': object_type,
                    'object_name': object_name,
                    'full_reference': reference,
                    'table_name': table_name,
                    'extraction_method': extraction_method
                })

    print(f"\n✅ Report metadata extraction complete!")
    print(f"  📊 Processed {len(all_report_metadata)} reports")
    print(f"  📊 Extracted {len(report_objects_used)} object references")

else:
    print("\n⚠️ No PowerBI reports found for metadata extraction.")

print("\n" + "="*80)

In [None]:
# ------------------------------------------------------------
# STEP 3: Save Report Analysis Results
# ------------------------------------------------------------
# Turns the Step 2 results into DataFrames, stores them in the lakehouse and
# shows a short summary of each.

print("\n💾 Processing and saving report analysis results to lakehouse...")

# --- Per-report metadata
if not all_report_metadata:
    print("⚠️ No report metadata to save")
else:
    # One flat record per ReportMetadata; the name lists become
    # comma-separated strings and are also stored as counts
    report_metadata_records = [
        {
            'report_id': metadata.report_id,
            'report_name': metadata.report_name,
            'workspace_id': metadata.workspace_id,
            'workspace_name': metadata.workspace_name,
            'dataset_id': metadata.dataset_id,
            'report_format': metadata.report_format,
            'extraction_method': metadata.extraction_method,
            'tables_count': len(metadata.tables),
            'columns_count': len(metadata.columns),
            'measures_count': len(metadata.measures),
            'visuals_count': metadata.visuals_count,
            'filters_count': metadata.filters_count,
            'extraction_success': metadata.extraction_success,
            'error_message': metadata.error_message,
            'tables_list': ','.join(metadata.tables) if metadata.tables else '',
            'columns_list': ','.join(metadata.columns) if metadata.columns else '',
            'measures_list': ','.join(metadata.measures) if metadata.measures else ''
        }
        for metadata in all_report_metadata
    ]

    report_metadata_df = pd.DataFrame(report_metadata_records)
    print("\n💾 Saving PowerBI report metadata analysis to lakehouse...")
    save_to_lakehouse(
        report_metadata_df,
        "report_metadata_analysis",
        "PowerBI report metadata extraction results with dual approach",
    )

    # First ten reports, key columns only
    print("\n📊 PowerBI Report Metadata Summary:")
    summary_columns = [
        'report_name', 'report_format', 'extraction_method',
        'tables_count', 'columns_count', 'measures_count',
        'extraction_success',
    ]
    display(report_metadata_df[summary_columns].head(10))

# --- Objects used by each report
if not report_objects_used:
    print("⚠️ No report objects usage data to save")
else:
    report_objects_df = pd.DataFrame(report_objects_used)
    print("\n💾 Saving detailed report objects usage to lakehouse...")
    save_to_lakehouse(
        report_objects_df,
        "report_objects_used",
        "Detailed breakdown of objects (tables, columns, measures) used by each PowerBI report",
    )

    # Number of records per object type
    print("\n📊 Report Objects Usage Summary:")
    objects_summary = report_objects_df.groupby(['object_type']).size().reset_index(name='count')
    display(objects_summary)

print("\n✅ All report analysis results saved to lakehouse!")

In [None]:
# ------------------------------------------------------------
# STEP 4: Final Summary
# ------------------------------------------------------------
# Prints the totals gathered by the previous steps.

print("\n" + "="*80)
print("🎉 ENHANCED FABRIC WORKSPACE ANALYSIS COMPLETE")
print("="*80)

# Discovery totals from Step 1
print("📊 Discovery Summary:")
print(f"  Workspaces: {len(workspaces_df)}")
print(f"  Datasets:   {len(datasets_df)}")
print(f"  Reports:    {len(reports_df)}")
print(f"  Dataflows:  {len(dataflows_df)}")

# Report analysis totals from Step 2
if all_report_metadata:
    succeeded = [r for r in all_report_metadata if r.extraction_success]
    successful_reports = len(succeeded)
    failed_reports = len(all_report_metadata) - successful_reports
    total_objects_extracted = sum(
        len(r.tables) + len(r.columns) + len(r.measures) for r in succeeded
    )

    print("\n🖼️ PowerBI Report Analysis:")
    print(f"  Reports Analyzed: {len(all_report_metadata)}")
    print(f"  Successful Extractions: {successful_reports}")
    print(f"  Failed Extractions: {failed_reports}")
    print(f"  Total Objects Extracted: {total_objects_extracted}")

    # Successful extractions per method, in order of first appearance
    methods_count = {}
    for r in succeeded:
        methods_count[r.extraction_method] = methods_count.get(r.extraction_method, 0) + 1

    print("  Extraction Methods Used:")
    for method, count in methods_count.items():
        print(f"    - {method}: {count} reports")

print("\n💾 Lakehouse Tables Created:")
print("  📊 workspace_analysis - Basic workspace information")
print("  🆕 report_metadata_analysis - PowerBI report metadata extraction results")
print("  🆕 report_objects_used - Objects (tables, columns, measures) used by each report")

print("\n" + "="*80)
print("✅ Check your lakehouse for detailed results.")
print("🆕 NEW: Report metadata analysis provides insights into PowerBI report object usage!")
print("="*80)