# Power BI Semantic Layer Generator: Notebook Covering Most DAX Functions

This notebook automates the creation of a comprehensive semantic layer in Databricks by extracting and transforming your Power BI dataset definitions. It serves as a bridge between Power BI's modeling capabilities and Databricks' scalable compute environment.

## Key Features
- **Automated View Generation**: Creates SQL views for all base tables from your Power BI dataset
- **DAX to PySpark Translation**: Converts Power BI DAX measures into equivalent PySpark code
  - Aggregate Functions:
    - Basic (SUM, COUNT, AVERAGE, MIN, MAX)
    - Statistical (VAR.P, STDEV.P)
    - Distinct aggregations (DISTINCTCOUNT)
  - Time Intelligence Functions:
    - Period comparisons (DATEADD, SAMEPERIODLASTYEAR)
    - Period-to-date (DATESYTD, DATESQTD, DATESMTD)
    - Period navigation (PREVIOUSMONTH, PREVIOUSYEAR)
  - Text Functions:
    - String operations (CONCATENATE, UPPER, LOWER)
    - Text manipulation (LEN, TRIM, SUBSTITUTE)
  - Logical Functions:
    - Conditional (IF, SWITCH)
    - Boolean operations (AND, OR, NOT)
  - Mathematical Functions:
    - Basic math (ABS, ROUND, FLOOR, CEILING)
    - Advanced calculations (POWER, SQRT)
  - Filter Functions:
    - Context modification (CALCULATE, FILTER)
    - Table operations (ALL, ALLEXCEPT)
  - Window Functions:
    - Rankings (RANKX, TOPN)
    - First/Last values (FIRSTNONBLANK, LASTNONBLANK)
- **Relationship Handling**: 
  - Automated join view generation
  - Preservation of Power BI relationship cardinality
  - Support for multiple relationship paths
- **Delta Lake Integration**: 
  - Optional materialization of views as Delta tables
  - Performance optimization through materialized views
  - Automatic refresh handling
- **Documentation Generation**:
  - Measure definitions and translations
  - Relationship documentation
  - Data lineage tracking
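
To make the year-, quarter-, and month-to-date functions listed above concrete, the sketch below computes the date range each one covers for a given reference date. It uses only the standard library. `period_to_date_range` is a hypothetical helper that is not part of this notebook, and it assumes a calendar year starting on 1 January (DAX also accepts a custom year-end date, which is not modeled here).

```python
from datetime import date

def period_to_date_range(as_of: date, period: str) -> tuple[date, date]:
    """Return (start, end) of the period-to-date window ending at `as_of`.

    Hypothetical helper for illustration; `period` is 'year', 'quarter' or 'month'.
    """
    if period == "year":
        start = date(as_of.year, 1, 1)
    elif period == "quarter":
        # Quarters start in months 1, 4, 7 and 10
        first_month = 3 * ((as_of.month - 1) // 3) + 1
        start = date(as_of.year, first_month, 1)
    elif period == "month":
        start = date(as_of.year, as_of.month, 1)
    else:
        raise ValueError(f"Unsupported period: {period}")
    return start, as_of

ytd = period_to_date_range(date(2024, 5, 17), "year")
qtd = period_to_date_range(date(2024, 5, 17), "quarter")
mtd = period_to_date_range(date(2024, 5, 17), "month")
print(ytd, qtd, mtd)
```

A translated measure would then keep only the rows whose date falls inside the returned range.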

## Prerequisites
1. **Databricks Setup**:
   - Workspace with appropriate permissions
   - Ability to create tables and views
   - PySpark environment configuration
2. **Power BI Access**:
   - Workspace admin privileges
   - Dataset access permissions
   - XMLA endpoint access
3. **Azure AD Application**:
   - Registered app with Power BI API permissions
   - Client ID and secret
   - API scope configuration

## Configuration Guide
1. **Azure AD Setup**:
   - Register app in Azure Active Directory
   - Grant Power BI API permissions
   - Generate client secret
2. **Power BI Setup**:
   - Note workspace ID and dataset ID
   - Ensure dataset is accessible
   - Configure XMLA endpoints
3. **Parameters**:
   - Fill in credentials below
   - Set materialization preference
   - Configure refresh schedules
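
Before running the notebook, it can help to confirm that no parameter still holds its placeholder. The following is a minimal sketch of such a check, assuming placeholders follow the `YOUR_...` pattern used in the configuration cell. `find_unset_parameters` and the example values are hypothetical and not part of the notebook.

```python
def find_unset_parameters(params: dict) -> list:
    """Return the names of parameters that are empty or still hold a YOUR_... placeholder."""
    unset = []
    for key, value in params.items():
        if not isinstance(value, str):
            continue  # non-string settings such as booleans are not checked
        if not value.strip() or value.startswith("YOUR_"):
            unset.append(key)
    return unset

example_params = {
    "client_id": "YOUR_CLIENT_ID",
    "tenant_id": "00000000-0000-0000-0000-000000000000",  # made-up value
    "workspace_id": "",
    "materialize": True,
}
missing = find_unset_parameters(example_params)
print(missing)
```

Raising an error when the returned list is non-empty would stop the run before any API call is attempted.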

In [None]:
%pip install msal requests nbformat

In [None]:
# Configuration Parameters
# Defined after the %pip cell: on Databricks, %pip install restarts the Python
# process, which clears any variables set earlier in the notebook
client_id = "YOUR_CLIENT_ID"  # Azure AD application ID
client_secret = "YOUR_CLIENT_SECRET"  # Azure AD client secret; prefer dbutils.secrets.get(...) over a literal
tenant_id = "YOUR_TENANT_ID"  # Azure AD tenant ID
workspace_id = "YOUR_WORKSPACE_ID"  # Power BI workspace ID
dataset_id = "YOUR_DATASET_ID"  # Power BI dataset ID
semantic_model_name = "Sales"  # Name for generated semantic model
materialize = True  # True: also create Delta tables; False: views only

In [None]:
# Authentication Setup
import msal
import requests
import json

def get_access_token():
    """Get Power BI API access token using MSAL"""
    app = msal.ConfidentialClientApplication(
        client_id,
        authority=f"https://login.microsoftonline.com/{tenant_id}",
        client_credential=client_secret
    )
    
    result = app.acquire_token_for_client(
        scopes=["https://analysis.windows.net/powerbi/api/.default"]
    )
    
    if "access_token" not in result:
        raise Exception(f"Failed to get token: {result.get('error_description')}")
        
    return result["access_token"]

In [None]:
# Power BI API Functions
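def get_dataset_info():
    """Retrieve dataset definition from Power BI API"""
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}"}
    
    # Note: for most datasets this endpoint typically returns top-level metadata
    # only (id, name, ...). If "tables" or "relationships" come back empty, the
    # model definition has to be read another way, e.g. through the XMLA endpoint.
    url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}"
    response = requests.get(url, headers=headers, timeout=30)
    
    if response.status_code != 200:
        raise Exception(f"API call failed ({response.status_code}): {response.text}")
        
    return response.json()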

def get_dataset_tables():
    """Extract tables from dataset"""
    dataset = get_dataset_info()
    return dataset.get("tables", [])

def get_dataset_relationships():
    """Extract relationships from dataset"""
    dataset = get_dataset_info()
    return dataset.get("relationships", [])

def get_dataset_measures():
    """Extract DAX measures from dataset"""
    dataset = get_dataset_info()
    measures = []
    for table in dataset.get("tables", []):
        measures.extend(table.get("measures", []))
    return measures
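
The flattened list returned by `get_dataset_measures` no longer records which table each measure came from, although that is needed to pick the right source table when translating. The sketch below keeps that information. `collect_measures_with_table` and the sample dictionary are hypothetical; the sample only mirrors the keys used in the functions above and is not real API output.

```python
def collect_measures_with_table(dataset: dict) -> list:
    """Flatten measures from all tables, recording which table each one belongs to."""
    measures = []
    for table in dataset.get("tables", []):
        for measure in table.get("measures", []):
            # Copy so the input dictionary is left unchanged
            measures.append({**measure, "table": table["name"]})
    return measures

sample_dataset = {  # made-up structure mirroring the keys used above
    "tables": [
        {
            "name": "FactSales",
            "measures": [
                {"name": "Total Sales", "expression": "SUM(FactSales[Amount])"}
            ],
        },
        {"name": "DimDate"},  # a table without measures
    ]
}
flat = collect_measures_with_table(sample_dataset)
print(flat)
```

Each entry then carries a `table` key that a generator could use instead of a fixed table name.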

In [None]:
# Semantic Layer Generation
import nbformat as nbf
from pyspark.sql import functions as F

def generate_view_definitions(tables):
    """Generate SQL view definitions for tables"""
    # Assumes the `raw` and `semantic` schemas already exist
    view_cells = []
    for table in tables:
        view_name = table["name"]
        view_cells.extend([
            nbf.v4.new_markdown_cell(f"## Base View: {view_name}"),
            # %sql marks the cell as SQL; backticks allow names with spaces
            nbf.v4.new_code_cell(
                "%sql\n"
                f"CREATE OR REPLACE VIEW semantic.`{view_name}` AS\n"
                f"SELECT * FROM raw.`{view_name}`;"
            )
        ])
        if materialize:
            view_cells.append(nbf.v4.new_code_cell(
                "%sql\n"
                f"CREATE OR REPLACE TABLE semantic.`{view_name}_materialized` AS\n"
                f"SELECT * FROM semantic.`{view_name}`;"
            ))
    return view_cells

def generate_relationships(relationships):
    """Generate joined views based on relationships"""
    join_cells = []
    for rel in relationships:
        from_table = rel["fromTable"]
        to_table = rel["toTable"]
        from_column = rel["fromColumn"]
        to_column = rel["toColumn"]
        
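        view_name = f"vw_{from_table}_{to_table}"
        join_cells.extend([
            nbf.v4.new_markdown_cell(f"## Joined View: {view_name}"),
            # The join key is dropped from the joined side to avoid a duplicate
            # column (Databricks SQL `* EXCEPT`); other shared column names
            # would still need explicit aliases.
            nbf.v4.new_code_cell(
                "%sql\n"
                f"CREATE OR REPLACE VIEW semantic.`{view_name}` AS\n"
                f"SELECT f.*, d.* EXCEPT (`{to_column}`)\n"
                f"FROM semantic.`{from_table}` f\n"
                f"LEFT JOIN semantic.`{to_table}` d ON f.`{from_column}` = d.`{to_column}`;"
            )
        ])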
    return join_cells

def translate_dax_to_pyspark(measure):
    """Translate DAX measures to PySpark code"""
    name = measure["name"]
    expression = measure["expression"]
    
    # Aggregate Functions (the alias belongs to the column, not the DataFrame)
    agg_functions = {
        'SUM': lambda col: f"df.agg(F.sum('{col}').alias('{name}'))",
        'COUNT': lambda col: f"df.agg(F.count('{col}').alias('{name}'))",
        'DISTINCTCOUNT': lambda col: f"df.agg(F.countDistinct('{col}').alias('{name}'))",
        'MIN': lambda col: f"df.agg(F.min('{col}').alias('{name}'))",
        'MAX': lambda col: f"df.agg(F.max('{col}').alias('{name}'))",
        'AVERAGE': lambda col: f"df.agg(F.avg('{col}').alias('{name}'))",
        'VAR.P': lambda col: f"df.agg(F.var_pop('{col}').alias('{name}'))",
        'STDEV.P': lambda col: f"df.agg(F.stddev_pop('{col}').alias('{name}'))"
    }

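    # Time Intelligence Functions
    # Period-to-date filters are anchored on the current date, a simplification of
    # DAX, which anchors them on the last date in the filter context
    months_per_interval = {'MONTH': 1, 'QUARTER': 3, 'YEAR': 12}

    def dateadd(date_col, num, interval):
        # DAY shifts by days; the other intervals are expressed in months
        if interval.upper() == 'DAY':
            return f"df.withColumn('{name}', F.date_add(F.col('{date_col}'), {num}))"
        months = months_per_interval[interval.upper()]
        return f"df.withColumn('{name}', F.add_months(F.col('{date_col}'), {num} * {months}))"

    def period_to_date(date_col, unit):
        # Keep rows from the start of the current year/quarter/month up to today
        return (f"df.filter(F.col('{date_col}').between("
                f"F.trunc(F.current_date(), '{unit}'), F.current_date()))")

    time_functions = {
        'DATEADD': dateadd,
        'DATESYTD': lambda date_col: period_to_date(date_col, 'year'),
        'DATESMTD': lambda date_col: period_to_date(date_col, 'month'),
        'DATESQTD': lambda date_col: period_to_date(date_col, 'quarter'),
        'SAMEPERIODLASTYEAR': lambda date_col: f"df.withColumn('{name}', F.add_months(F.col('{date_col}'), -12))",
        'PREVIOUSMONTH': lambda date_col: f"df.withColumn('{name}', F.add_months(F.col('{date_col}'), -1))",
        'PREVIOUSYEAR': lambda date_col: f"df.withColumn('{name}', F.add_months(F.col('{date_col}'), -12))"
    }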

    # Text Functions
    text_functions = {
        # Built by concatenation: backslashes inside f-string expressions are a
        # syntax error before Python 3.12
        'CONCATENATE': lambda *cols: (
            f"df.withColumn('{name}', F.concat("
            + ", ".join(f"F.col('{c}')" for c in cols)
            + "))"
        ),
        'UPPER': lambda col: f"df.withColumn('{name}', F.upper(F.col('{col}')))",
        'LOWER': lambda col: f"df.withColumn('{name}', F.lower(F.col('{col}')))",
        'LEN': lambda col: f"df.withColumn('{name}', F.length(F.col('{col}')))",
        'TRIM': lambda col: f"df.withColumn('{name}', F.trim(F.col('{col}')))",
        # Note: regexp_replace treats the search text as a regular expression
        'SUBSTITUTE': lambda text, old, new: f"df.withColumn('{name}', F.regexp_replace(F.col('{text}'), '{old}', '{new}'))"
    }

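    # Logical Functions
    # Arguments are inserted verbatim, so they must already be valid PySpark expressions
    def switch(expr, *cases):
        # Arguments come as (value, result) pairs; a trailing unpaired one is the default
        pairs = list(zip(cases[::2], cases[1::2]))
        if not pairs:
            raise ValueError("SWITCH needs at least one value/result pair")
        # PySpark has no F.case; chain F.when(...).when(...).otherwise(...) instead
        chain = "F" + "".join(f".when({expr} == {value}, {result})" for value, result in pairs)
        if len(cases) % 2:
            chain += f".otherwise({cases[-1]})"
        return f"df.withColumn('{name}', {chain})"

    logical_functions = {
        'IF': lambda cond, true_val, false_val="None": f"df.withColumn('{name}', F.when({cond}, {true_val}).otherwise({false_val}))",
        'SWITCH': switch,
        'AND': lambda *conds: f"df.filter({' & '.join([f'({c})' for c in conds])})",
        'OR': lambda *conds: f"df.filter({' | '.join([f'({c})' for c in conds])})",
        'NOT': lambda cond: f"df.filter(~({cond}))"
    }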

    # Mathematical Functions
    math_functions = {
        'ABS': lambda col: f"df.withColumn('{name}', F.abs(F.col('{col}')))",
        'ROUND': lambda col, decimals: f"df.withColumn('{name}', F.round(F.col('{col}'), {decimals}))",
        'FLOOR': lambda col: f"df.withColumn('{name}', F.floor(F.col('{col}')))",
        'CEILING': lambda col: f"df.withColumn('{name}', F.ceil(F.col('{col}')))",
        'POWER': lambda col, power: f"df.withColumn('{name}', F.pow(F.col('{col}'), {power}))",
        'SQRT': lambda col: f"df.withColumn('{name}', F.sqrt(F.col('{col}')))"
    }

    # Filter and Calculate Functions
    def calculate(expr, *filters):
        # Built on one line: leading whitespace would break the generated cell
        filtered = f"df.filter({' & '.join(f'({cond})' for cond in filters)})" if filters else "df"
        return f"{filtered}.agg(({expr}).alias('{name}'))"

    filter_functions = {
        'CALCULATE': calculate,
        'FILTER': lambda table, condition: f"df.filter({condition})",
        'ALL': lambda table: f"df.select('*')",
        'ALLEXCEPT': lambda table, *cols: "df.select(" + ", ".join(f"F.col('{c}')" for c in cols) + ")"
    }

    # Window Functions
    window_functions = {
        'RANKX': lambda table, expr: f"df.withColumn('{name}', F.rank().over(Window.orderBy(F.desc('{expr}'))))",
        'TOPN': lambda n, table, orderBy: f"df.orderBy(F.desc('{orderBy}')).limit({n})",
        'FIRSTNONBLANK': lambda col, expr: f"df.filter(F.col('{col}').isNotNull()).first()",
        'LASTNONBLANK': lambda col, expr: f"df.filter(F.col('{col}').isNotNull()).orderBy(F.desc('{col}')).first()"
    }

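    # Parse expression and identify the outermost function
    import re
    # [\w.]+ so that dotted names such as VAR.P and STDEV.P are matched
    func_match = re.match(r'\s*([\w.]+)\s*\((.*)\)\s*$', expression, re.DOTALL)
    if not func_match:
        return "# Could not parse DAX expression"

    func_name = func_match.group(1).upper()

    # Split arguments on top-level commas only, so nested calls stay intact
    args, depth, current = [], 0, ""
    for ch in func_match.group(2):
        if ch == ',' and depth == 0:
            args.append(current.strip())
            current = ""
            continue
        depth += (ch == '(') - (ch == ')')
        current += ch
    if current.strip():
        args.append(current.strip())

    # Reduce plain column references such as Sales[Amount] to the column name
    args = [re.sub(r"^'?[\w ]*'?\[([^\]]+)\]$", r"\1", arg) for arg in args]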

    # Find and execute appropriate translation
    all_functions = {
        **agg_functions,
        **time_functions,
        **text_functions,
        **logical_functions,
        **math_functions,
        **filter_functions,
        **window_functions
    }

    if func_name in all_functions:
        try:
            return all_functions[func_name](*args)
        except Exception as e:
            return f"# Error translating {func_name}: {str(e)}"
    
    return f"# Translation not implemented for function: {func_name}"

def generate_measures(measures):
    """Generate PySpark code for DAX measures"""
    measure_cells = []
    for measure in measures:
        measure_cells.extend([
            nbf.v4.new_markdown_cell(
                f"### Measure: {measure['name']}\n"
                f"**DAX:** `{measure['expression']}`"
            ),
            nbf.v4.new_code_cell(
                "from pyspark.sql import functions as F\n"
                "from pyspark.sql import Window  # needed by RANKX translations\n"
                "df = spark.table('semantic.FactSales')  # source table is hardcoded\n"
                f"{translate_dax_to_pyspark(measure)}"
            )
        ])
    return measure_cells

def generate_semantic_notebook():
    """Generate complete semantic layer notebook"""
    nb = nbf.v4.new_notebook()
    
    # Add documentation
    nb.cells.append(nbf.v4.new_markdown_cell(
        f"# Generated Semantic Layer: {semantic_model_name}\n\n"
        "This notebook contains the generated semantic layer based on your Power BI dataset."
    ))
    
    # Generate components
    tables = get_dataset_tables()
    relationships = get_dataset_relationships()
    measures = get_dataset_measures()
    
    nb.cells.extend(generate_view_definitions(tables))
    nb.cells.extend(generate_relationships(relationships))
    nb.cells.extend(generate_measures(measures))
    
    # Add materialization status
    if materialize:
        nb.cells.extend([
            nbf.v4.new_markdown_cell("### ✅ Materialization Active"),
            nbf.v4.new_code_cell(
                "print('Delta tables have been created for improved performance.')"
            )
        ])
    
    # Save notebook
    output_path = f"semantic_{semantic_model_name.lower()}.ipynb"
    nbf.write(nb, output_path)
    print(f"Generated semantic layer notebook: {output_path}")

In [None]:
# Execute generation
generate_semantic_notebook()
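
Because the translator emits PySpark code as plain strings, a malformed translation (an unbalanced parenthesis, for example) only surfaces when the generated notebook is run. One way to catch this earlier is to parse each snippet with the standard-library `ast` module, as sketched below. `find_invalid_snippets` and the sample snippets are hypothetical. The check covers Python syntax only, so cells containing SQL or notebook magics would need to be skipped, and it says nothing about whether the code is semantically right.

```python
import ast

def find_invalid_snippets(snippets: dict) -> dict:
    """Map each snippet name to its syntax error message; valid snippets are omitted."""
    errors = {}
    for name, source in snippets.items():
        try:
            ast.parse(source)
        except SyntaxError as exc:
            errors[name] = exc.msg
    return errors

generated = {  # hypothetical translator output
    "Total Sales": "df.agg(F.sum('Amount').alias('Total Sales'))",
    # The closing parenthesis of withColumn is missing on purpose
    "Full Name": "df.withColumn('Full Name', F.concat(F.col('First'), F.col('Last'))",
    "Unsupported": "# Translation not implemented for function: FOO",
}
problems = find_invalid_snippets(generated)
print(sorted(problems))
```

Comment-only snippets parse cleanly, so untranslated measures are not reported as errors; only the snippet with the missing parenthesis is flagged.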