# Analysis and Validation of Power BI Semantic Models (TMDL comparison)

In [None]:
# cell 1
# Locations of the two TMDL "definition" folders to compare. Despite the
# "_file" suffix, each value is a directory path: both are handed to
# load_tmdl_definition further down, which scans sub-folders inside them.
original_file = "E:/OneDrive/OneDrive - lfernandes/03 - PROJETOS/Empresas/Oleon Brasil/Power BI/Original/RMP - UAT.SemanticModel/definition/"
modified_file = "E:/OneDrive/OneDrive - lfernandes/03 - PROJETOS/Empresas/Oleon Brasil/Power BI/Modificado/RMP - UAT.SemanticModel/definition/"

# cell 2
from pathlib import Path
from typing import Dict, List, Any, Tuple
import re
import json

# MODULE 1: LOADER
def load_tmdl_file(file_path: str) -> str:
    """
    Load the text content of a single .tmdl file.

    Args:
        file_path: path of the file to read

    Returns:
        the file content decoded as UTF-8

    Raises:
        FileNotFoundError: if file_path does not exist
        ValueError: if the extension is not .tmdl (compared case-insensitively)
        UnicodeDecodeError: if the content is not valid UTF-8
    """
    path = Path(file_path)

    if not path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    if path.suffix.lower() != ".tmdl":
        raise ValueError(f"Invalid extension. Expected .tmdl, got {path.suffix}")

    try:
        return path.read_text(encoding="utf-8")
    except UnicodeDecodeError as e:
        # Re-raise with the file path in the reason, explicitly chaining the
        # low-level error so the traceback reports it as the direct cause.
        raise UnicodeDecodeError(
            e.encoding, e.object, e.start, e.end,
            f"Error decoding {file_path}. Check UTF-8 encoding"
        ) from e


def load_tmdl_definition(definition_path: str) -> Dict[str, str]:
    """
    Load the .tmdl files found in the known sub-folders of a definition/ folder.

    Only the "tables", "relationships" and "cultures" sub-folders are scanned
    (non-recursively); sub-folders that do not exist are skipped.
    NOTE(review): .tmdl files located directly in definition_path are not
    loaded - confirm this matches the layout of the models being compared.

    Args:
        definition_path: path of the definition/ folder

    Returns:
        dictionary mapping "<folder>/<file name>" to the file content, ordered
        by folder (in the order listed above) and then by file path

    Raises:
        FileNotFoundError: if definition_path does not exist
    """
    base_path = Path(definition_path)

    if not base_path.exists():
        raise FileNotFoundError(f"Definition path not found: {definition_path}")

    tmdl_files = {}

    for folder in ["tables", "relationships", "cultures"]:
        folder_path = base_path / folder
        if not folder_path.exists():
            continue

        # glob() yields entries in arbitrary order; sorting keeps the result
        # (and everything derived from it) stable across runs and machines.
        for file in sorted(folder_path.glob("*.tmdl")):
            tmdl_files[f"{folder}/{file.name}"] = file.read_text(encoding="utf-8")

    return tmdl_files


# MODULE 2: PARSER
def parse_tmdl_content(content: str) -> Dict[str, List[Dict[str, Any]]]:
    """
    Scan TMDL text with regular expressions and collect the objects found.

    Returns a dictionary with the keys "tables", "measures", "columns" and
    "relationships"; each value is a list of dictionaries in match order.
    """
    tables = []
    measures = []
    columns = []
    relationships = []

    # Tables: "table <name> {" followed by a brace-delimited block.
    for hit in re.finditer(r'table\s+([^\s\{]+)\s*\{', content, re.MULTILINE):
        body = _extract_block(content, hit.start())
        tables.append({
            "name": hit.group(1).strip(),
            "content": body,
            "lineage_tag": _extract_property(body, "lineageTag"),
            "description": _extract_property(body, "description"),
        })

    # Measures: "measure <name> =", reported as "<table>.<name>" when a
    # preceding table declaration is found.
    for hit in re.finditer(r'measure\s+([^\s=]+)\s*=', content, re.MULTILINE):
        offset = hit.start()
        short_name = hit.group(1).strip()
        body = _extract_measure_block(content, offset)
        owner = _find_parent_table(content, offset)
        if owner:
            qualified = f"{owner}.{short_name}"
        else:
            qualified = short_name
        measures.append({
            "name": qualified,
            "expression": _extract_expression(body),
            "format": _extract_property(body, "formatString"),
            "description": _extract_property(body, "description"),
            "content": body,
        })

    # Columns: "column <name>"; the parent table is stored separately.
    for hit in re.finditer(r'column\s+([^\s:\{]+)', content, re.MULTILINE):
        offset = hit.start()
        body = _extract_block(content, offset)
        entry = {
            "name": hit.group(1).strip(),
            "table": _find_parent_table(content, offset),
        }
        for key, prop in (
            ("data_type", "dataType"),
            ("source_column", "sourceColumn"),
            ("expression", "expression"),
            ("description", "description"),
        ):
            entry[key] = _extract_property(body, prop)
        entry["content"] = body
        columns.append(entry)

    # Relationships: "relationship <name> {" followed by a block.
    for hit in re.finditer(r'relationship\s+([^\s\{]+)\s*\{', content, re.MULTILINE):
        body = _extract_block(content, hit.start())
        entry = {"name": hit.group(1).strip()}
        for key, prop in (
            ("from_table", "fromTable"),
            ("from_column", "fromColumn"),
            ("to_table", "toTable"),
            ("to_column", "toColumn"),
            ("cardinality", "crossFilteringBehavior"),
        ):
            entry[key] = _extract_property(body, prop)
        entry["content"] = body
        relationships.append(entry)

    return {
        "tables": tables,
        "measures": measures,
        "columns": columns,
        "relationships": relationships,
    }


def _extract_block(content: str, start_pos: int) -> str:
    """Extract block delimited by braces"""
    brace_count = 0
    in_block = False
    block_start = start_pos
    
    for i in range(start_pos, len(content)):
        if content[i] == '{':
            if not in_block:
                block_start = i
                in_block = True
            brace_count += 1
        elif content[i] == '}':
            brace_count -= 1
            if brace_count == 0 and in_block:
                return content[block_start:i+1]
    
    return content[start_pos:start_pos+200]


def _extract_measure_block(content: str, start_pos: int) -> str:
    """Extract measure block (including DAX expression)"""
    brace_count = 0
    in_block = False
    block_start = None

    for i in range(start_pos, len(content)):
        if content[i] == '{':
            if not in_block:
                block_start = i
                in_block = True
            brace_count += 1
        elif content[i] == '}':
            brace_count -= 1
            if in_block and brace_count == 0:
                return content[start_pos:i+1]

    return content[start_pos:]


def _extract_expression(block: str) -> str:
    """Extract DAX expression from measure or calculated column"""
    match = re.search(
        r'expression:\s*(.+?)(?=\n\s*(?:formatString|description|lineageTag|\}))',
        block,
        re.DOTALL
    )
    if match:
        return match.group(1).strip()
    
    match = re.search(
        r'measure\s+[^\s=]+\s*=\s*(.+)',
        block
    )
    return match.group(1).strip() if match else ""


def _extract_property(block: str, prop_name: str) -> str:
    """Extract TMDL property value"""
    pattern = rf'{prop_name}\s*[:=]\s*["\']?([^"\'\n]+)["\']?'
    match = re.search(pattern, block)
    return match.group(1).strip() if match else ""


def _find_parent_table(content: str, position: int) -> str:
    """Identify parent table of a column"""
    before_content = content[:position]
    table_matches = list(re.finditer(r'table\s+([^\s\{]+)', before_content))
    
    if table_matches:
        return table_matches[-1].group(1).strip()
    return ""


# MODULE 3: DIFF
def compare_tmdl_models(
    original_structure: Dict[str, List[Dict[str, Any]]],
    modified_structure: Dict[str, List[Dict[str, Any]]]
) -> Dict[str, Any]:
    """
    Compare two versions of a TMDL model.

    Objects are matched by their "name" together with their "table" value
    when they carry one (columns do), so that columns sharing a name in
    different tables are compared separately. Result lists are sorted by
    that identity, which keeps the output stable between runs.

    Args:
        original_structure: structure extracted from original model
        modified_structure: structure extracted from modified model

    Returns:
        dictionary with the keys "added", "removed" and "modified", each
        mapping "tables", "measures", "columns" and "relationships" to a
        list. Added/removed lists hold the object dictionaries themselves;
        modified lists hold {"name", "original", "modified"} dictionaries.
    """
    obj_types = ["tables", "measures", "columns", "relationships"]
    diff_result = {
        action: {obj_type: [] for obj_type in obj_types}
        for action in ("added", "removed", "modified")
    }

    def _identity(obj: Dict[str, Any]) -> Tuple[str, str]:
        # Objects without a "table" entry fall back to an empty table part.
        return (obj.get("table") or "", obj["name"])

    # Compare each object type
    for obj_type in obj_types:
        # A missing object type is treated as an empty list.
        original_objs = {_identity(obj): obj for obj in original_structure.get(obj_type, [])}
        modified_objs = {_identity(obj): obj for obj in modified_structure.get(obj_type, [])}

        # Added objects
        for key in sorted(modified_objs.keys() - original_objs.keys()):
            diff_result["added"][obj_type].append(modified_objs[key])

        # Removed objects
        for key in sorted(original_objs.keys() - modified_objs.keys()):
            diff_result["removed"][obj_type].append(original_objs[key])

        # Modified objects
        for key in sorted(original_objs.keys() & modified_objs.keys()):
            if _is_modified(original_objs[key], modified_objs[key]):
                diff_result["modified"][obj_type].append({
                    "name": original_objs[key]["name"],
                    "original": original_objs[key],
                    "modified": modified_objs[key]
                })

    return diff_result


def _is_modified(original: Dict[str, Any], modified: Dict[str, Any]) -> bool:
    fields = ["expression", "format", "description"]

    for f in fields:
        o = (original.get(f) or "").strip()
        m = (modified.get(f) or "").strip()
        if o != m:
            return True

    return False



# MODULE 4: UTILITIES
def print_diff_summary(diff_result: Dict[str, Any]) -> None:
    """Print, per action, the total number of objects and a per-type breakdown."""
    obj_types = ("tables", "measures", "columns", "relationships")

    for action in ("added", "removed", "modified"):
        counts = {kind: len(diff_result[action][kind]) for kind in obj_types}
        total = sum(counts.values())

        # Actions without any object produce no output at all.
        if total <= 0:
            continue

        print(f"\n{action.upper()}: {total} object(s)")
        for kind, count in counts.items():
            if count > 0:
                print(f"  - {kind}: {count}")


def export_modified_objects(diff_result: Dict[str, Any], output_path: str) -> None:
    """
    Export the comparison result to a JSON file and print the destination.

    The file holds a "summary" with the number of added, removed and modified
    objects, and the full diff_result under "details". It is written as
    UTF-8 with non-ASCII characters kept as-is.

    Args:
        diff_result: result from compare_tmdl_models
        output_path: output file path; missing parent folders are created
    """
    output_data = {
        "summary": {
            action: sum(len(objs) for objs in diff_result[action].values())
            for action in ("added", "removed", "modified")
        },
        "details": diff_result
    }

    target = Path(output_path)
    # Writing into a folder that does not exist yet (e.g. "data/") would
    # otherwise fail with FileNotFoundError.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(
        json.dumps(output_data, indent=2, ensure_ascii=False),
        encoding="utf-8"
    )

    print(f"\nResult exported to: {output_path}")

original_content = load_tmdl_definition(original_file)
modified_content = load_tmdl_definition(modified_file)

# load_tmdl_definition returns a {relative file name: text} dictionary, so
# len() of the result is a file count; the character total is summed over
# the file contents.
print(
    f"✓ Original model loaded: {len(original_content)} file(s), "
    f"{sum(len(text) for text in original_content.values())} characters"
)
print(
    f"✓ Modified model loaded: {len(modified_content)} file(s), "
    f"{sum(len(text) for text in modified_content.values())} characters"
)


# Cell 2: Parse structures
def aggregate_structure(tmdl_files: Dict[str, str]) -> Dict[str, list]:
    """Parse every file's content and concatenate the results per object type."""
    combined = {
        kind: [] for kind in ("tables", "measures", "columns", "relationships")
    }

    # Only the file contents are needed; the file names are not used.
    for text in tmdl_files.values():
        file_structure = parse_tmdl_content(text)
        for kind, bucket in combined.items():
            bucket.extend(file_structure[kind])

    return combined

original_structure = aggregate_structure(original_content)
modified_structure = aggregate_structure(modified_content)

# Object counts per model, one line per object type.
print(
    "\nOriginal model:",
    f"  - Tables: {len(original_structure['tables'])}",
    f"  - Measures: {len(original_structure['measures'])}",
    f"  - Columns: {len(original_structure['columns'])}",
    f"  - Relationships: {len(original_structure['relationships'])}",
    sep="\n",
)

print(
    "\nModified model:",
    f"  - Tables: {len(modified_structure['tables'])}",
    f"  - Measures: {len(modified_structure['measures'])}",
    f"  - Columns: {len(modified_structure['columns'])}",
    f"  - Relationships: {len(modified_structure['relationships'])}",
    sep="\n",
)

# cell 3
# Build the diff between both models and show the per-action counts
diff_result = compare_tmdl_models(original_structure, modified_structure)
print_diff_summary(diff_result)

# cell 4
# Write the full diff to a JSON file
export_modified_objects(diff_result, "data/tmdl_changes.json")

# cell 5
# Show the first 200 characters of each modified measure's expression,
# original version first
for measure in diff_result["modified"]["measures"]:
    print(
        f"\n{'='*60}",
        f"MODIFIED MEASURE: {measure['name']}",
        f"{'='*60}",
        "\nORIGINAL:",
        measure["original"]["expression"][:200],
        "\nMODIFIED:",
        measure["modified"]["expression"][:200],
        sep="\n",
    )