# Microsoft Dataflows to Databricks Pipeline Migration

This comprehensive notebook automates the migration of Microsoft Dataflows into a Databricks pipeline. It retrieves the full dataflow definition from the source API, translates each transformation step from the M language (Power Query) into equivalent PySpark code, and then automatically generates a new Databricks notebook that implements the complete pipeline.

## Key Features

- **Full Dataflow Extraction**: Retrieves all transformation steps (extract, filter, join, aggregate, pivot, etc.)
- **Comprehensive Translator**: Converts a wide array of common Dataflow functions to PySpark:
  - **Extract & Load**: Reading source files in various formats
  - **Filtering & Conditional Logic**
  - **Renaming & Type Conversion**
  - **Joins, Merge, and Append**
  - **Aggregation, Grouping and Sorting**
  - **Pivot and Unpivot operations**
  - **Date/Time transformations**
  - **Text Manipulations and Replacements**
  - **Column Splitting and Custom Scripts**
- **Delta Lake Integration**: Optionally materializes the final pipeline output as a Delta table
- **Automated Notebook Generation**: Builds a complete pipeline notebook with detailed markdown documentation for each step

## Prerequisites

1. **Databricks Environment** with PySpark configured
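2. **API Access**: an Azure AD app registration (client ID and client secret) that can read your Microsoft Dataflow definition through the Power BI REST API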
3. **Python Dependencies**: `msal`, `requests`, `nbformat`


In [None]:
# Configuration Parameters
import os

# Identity and workspace settings are read from environment variables when they
# are set, so secrets do not have to be typed into (and saved with) the notebook.
# The placeholder strings are used when a variable is unset.
client_id = os.environ.get("DATAFLOW_CLIENT_ID", "YOUR_CLIENT_ID")              # Azure AD Application (client) ID
client_secret = os.environ.get("DATAFLOW_CLIENT_SECRET", "YOUR_CLIENT_SECRET")  # Azure AD Client Secret
tenant_id = os.environ.get("DATAFLOW_TENANT_ID", "YOUR_TENANT_ID")              # Azure AD Tenant ID
workspace_id = os.environ.get("DATAFLOW_WORKSPACE_ID", "YOUR_WORKSPACE_ID")     # Power BI / Power Platform Workspace ID
dataflow_id = os.environ.get("DATAFLOW_ID", "YOUR_DATAFLOW_ID")                  # Dataflow ID to migrate
dataflow_name = "SalesDataFlow"  # Name for the generated Databricks pipeline notebook
materialize = True               # Set to True to enable Delta table materialization

# Uncomment if running interactively to install dependencies
# (this is a notebook magic, not Python syntax, so it stays opt-in):
# %pip install msal requests nbformat

In [None]:
# Authentication Setup
import msal
import requests
import json

def get_access_token():
    """
    Acquire an app-only bearer token for the Power BI REST API.

    Uses MSAL's client-credentials flow with the module-level ``client_id``,
    ``client_secret`` and ``tenant_id`` settings.

    Returns:
        The ``access_token`` entry of the MSAL response.

    Raises:
        Exception: If the MSAL response has no ``access_token`` key; the
            message carries MSAL's ``error_description``.
    """
    authority_url = f"https://login.microsoftonline.com/{tenant_id}"
    confidential_app = msal.ConfidentialClientApplication(
        client_id,
        authority=authority_url,
        client_credential=client_secret,
    )

    powerbi_scopes = ["https://analysis.windows.net/powerbi/api/.default"]
    token_response = confidential_app.acquire_token_for_client(scopes=powerbi_scopes)

    # Success path first; anything without a token is reported as a failure.
    if "access_token" in token_response:
        return token_response["access_token"]
    raise Exception(f"Failed to get token: {token_response.get('error_description')}")

print("Authentication module loaded.")

In [None]:
# Dataflow API Functions
def get_dataflow_info(timeout=30):
    """
    Retrieve the complete Dataflow definition from the Power BI REST API.

    Args:
        timeout: Seconds to wait for the HTTP response. ``requests`` applies no
            timeout by default, so without one a stalled connection blocks forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: If the API returns a status code other than 200.
    """
    token = get_access_token()
    headers = {"Authorization": f"Bearer {token}"}

    # Adjust URL if your API endpoint is different
    url = f"https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/dataflows/{dataflow_id}"
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        # Include the status code so auth failures (401/403) and a wrong ID (404)
        # can be told apart without inspecting the response body.
        raise Exception(f"API call failed (HTTP {response.status_code}): {response.text}")
    return response.json()

def get_dataflow_steps():
    """
    Extract the transformation steps from the Dataflow definition.

    Returns:
        The value stored under the definition's ``"steps"`` key (expected to be
        a list of step dicts). Returns an empty list when the key is missing or
        null, or when the definition is not a JSON object. A warning is printed
        when no steps are found.
    """
    dataflow = get_dataflow_info()
    # Adjust this extraction logic according to the dataflow JSON schema.
    # NOTE(review): confirm the endpoint's response really has a top-level
    # "steps" key; if not, every run will produce an empty step list.
    steps = dataflow.get("steps") if isinstance(dataflow, dict) else None
    # Normalise a null/missing value to [] so callers can always iterate.
    steps = steps or []
    if not steps:
        print("Warning: No transformation steps found in the dataflow definition.")
    return steps

print('Dataflow API functions are ready.')

In [None]:
# Comprehensive Transformation Translation Functions
import re
from pyspark.sql import functions as F

def _py_args(values):
    """Render *values* as comma-separated Python literals, e.g. ``'a', 'b'``."""
    return ", ".join(repr(v) for v in values)


def _sql_string_literal(value):
    """Quote *value* as a Spark SQL string literal (backslashes and single quotes escaped)."""
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def _sql_identifier(name):
    """Quote *name* as a backticked Spark SQL identifier (embedded backticks doubled)."""
    return "`" + str(name).replace("`", "``") + "`"


def translate_mquery_to_pyspark(step):
    """
    Translate a single Dataflow transformation step into PySpark source code.

    ``step`` is a dict with an optional ``name``, a ``transformationType``
    (matched case-insensitively; a missing or empty value means ``"custom"``)
    and a ``details`` dict whose keys depend on the type. The generated code
    reads and rebinds a DataFrame named ``df``. It uses ``spark``, and most
    branches use ``F`` (``pyspark.sql.functions``); both must be in scope where
    the code runs. The aggregate branch imports ``F`` itself.

    Names, paths, conditions and values are embedded with ``repr()``. Quotes or
    spaces in them therefore still yield valid Python, and each generated
    snippet compiles on any Python 3 version.

    Args:
        step: Mapping describing one transformation step.

    Returns:
        str: PySpark code that starts with a ``# Transformation Step:`` comment
        line and ends with a newline.
    """
    step_name = step.get("name", "UnnamedStep")
    trans_type = str(step.get("transformationType") or "custom").lower()
    details = step.get("details") or {}
    # Collapse newlines so a step name cannot break out of the comment line.
    comment_name = " ".join(str(step_name).splitlines())
    code = f"# Transformation Step: {comment_name}\n"

    if trans_type == "extract":
        # Read a source file (csv, json, parquet, ...) with optional reader options.
        source_type = details.get("sourceType", "csv")
        path = details.get("path", "<source_path>")
        options = details.get("options") or {}
        # Each option is a chained .option(...) call. They must be concatenated
        # directly, because separating them with ", " produces invalid Python.
        opts = "".join(f".option({str(k)!r}, {str(v)!r})" for k, v in options.items())
        code += f"df = spark.read.format({source_type!r}){opts}.load({path!r})\n"

    elif trans_type == "filter":
        # The condition is passed through as a Spark SQL expression string.
        condition = details.get("condition", "1=1")
        code += f"df = df.filter({str(condition)!r})\n"

    elif trans_type == "rename":
        old_name = details.get("oldName", "old")
        new_name = details.get("newName", "new")
        code += f"df = df.withColumnRenamed({old_name!r}, {new_name!r})\n"

    elif trans_type == "select":
        # Select only specific columns
        cols = details.get("columns") or []
        code += f"df = df.select({_py_args(cols)})\n"

    elif trans_type == "distinct":
        code += "df = df.distinct()\n"

    elif trans_type == "sort":
        # Order by one or more columns
        sort_cols = details.get("columns") or []
        ascending = details.get("ascending", True)
        sort_args = [repr(c) for c in sort_cols]
        sort_args.append(f"ascending={ascending!r}")
        sort_text = ", ".join(sort_args)
        code += f"df = df.orderBy({sort_text})\n"

    elif trans_type == "aggregate":
        group_by = details.get("groupBy") or []
        # Maps source column -> (aggregate function name, output alias).
        aggregations = details.get("aggregations") or {}
        code += "from pyspark.sql import functions as F\n"
        gb_cols = ", ".join(f"F.col({c!r})" for c in group_by)
        agg_expr = ", ".join(
            f"F.{str(func).lower()}(F.col({c!r})).alias({alias!r})"
            for c, (func, alias) in aggregations.items()
        )
        code += f"df = df.groupBy({gb_cols}).agg({agg_expr})\n"

    elif trans_type == "join":
        # Join two tables based on a key
        left_table = details.get("leftTable", "left")
        right_table = details.get("rightTable", "right")
        left_key = details.get("leftKey", "id")
        right_key = details.get("rightKey", "id")
        join_type = details.get("joinType", "inner")
        code += (
            f"left_df = spark.table({left_table!r})\n"
            f"right_df = spark.table({right_table!r})\n"
            f"df = left_df.join(right_df, left_df[{left_key!r}] == right_df[{right_key!r}], {join_type!r})\n"
        )

    elif trans_type == "pivot":
        group_by = details.get("groupBy") or []
        pivot_column = details.get("pivotColumn", "")
        value_column = details.get("valueColumn", "")
        agg_func = details.get("aggFunc", "sum")
        code += (
            f"df = df.groupBy({_py_args(group_by)}).pivot({pivot_column!r})"
            f".agg(F.{agg_func}(F.col({value_column!r})))\n"
        )

    elif trans_type == "unpivot":
        # Unpivot via the SQL stack() generator: each pivot column contributes a
        # ('name', `column`) pair, producing (pivot_column, pivot_value) rows.
        # NOTE(review): stack() generally needs the pivot columns to have
        # compatible types; confirm this for your data.
        static_cols = details.get("staticColumns") or []
        pivot_cols = details.get("pivotColumns") or []
        if not pivot_cols:
            code += "# Unpivot skipped: no pivot columns specified\n"
        else:
            pairs = ", ".join(
                _sql_string_literal(c) + ", " + _sql_identifier(c) for c in pivot_cols
            )
            stack_expr = f"stack({len(pivot_cols)}, {pairs}) as (pivot_column, pivot_value)"
            select_args = [repr(c) for c in static_cols]
            select_args.append(f"F.expr({stack_expr!r})")
            select_text = ", ".join(select_args)
            code += f"df = df.select({select_text})\n"

    elif trans_type == "date_transform":
        # Add or subtract a number of days into a new "<col>_plus"/"<col>_minus" column.
        date_col = details.get("dateColumn", "date")
        operation = details.get("operation", "add")
        days = details.get("days", 0)
        if operation == "add":
            out_col = f"{date_col}_plus"
            code += f"df = df.withColumn({out_col!r}, F.date_add(F.col({date_col!r}), {days}))\n"
        elif operation == "subtract":
            out_col = f"{date_col}_minus"
            code += f"df = df.withColumn({out_col!r}, F.date_sub(F.col({date_col!r}), {days}))\n"
        else:
            code += "# Unknown date operation\n"

    elif trans_type == "type_conversion":
        col = details.get("column", "col")
        target_type = details.get("targetType", "string")
        code += f"df = df.withColumn({col!r}, F.col({col!r}).cast({target_type!r}))\n"

    elif trans_type in ("merge", "append"):
        # Union two tables by column name.
        table1 = details.get("table1", "table1")
        table2 = details.get("table2", "table2")
        code += (
            f"df1 = spark.table({table1!r})\n"
            f"df2 = spark.table({table2!r})\n"
            "df = df1.unionByName(df2)\n"
        )

    elif trans_type == "replace":
        # Values are embedded as literals (strings quoted, numbers as-is).
        col = details.get("column", "col")
        old_value = details.get("oldValue", "")
        new_value = details.get("newValue", "")
        code += f"df = df.replace({old_value!r}, {new_value!r}, subset=[{col!r}])\n"

    elif trans_type == "split_column":
        # F.split takes a regex pattern. The delimiter is escaped so characters
        # such as "|" or "." split literally.
        col = details.get("column", "col")
        delimiter = details.get("delimiter", ",")
        new_cols = details.get("newColumns") or []
        split_col = f"{col}_split"
        pattern = re.escape(str(delimiter))
        code += f"df = df.withColumn({split_col!r}, F.split(F.col({col!r}), {pattern!r}))\n"
        for idx, new_col in enumerate(new_cols):
            code += f"df = df.withColumn({new_col!r}, F.col({split_col!r}).getItem({idx}))\n"

    elif trans_type == "conditional":
        # The condition is treated as a Spark SQL expression (like "filter"), and
        # the true/false values are wrapped as literal columns.
        col = details.get("column", "col")
        condition = details.get("condition", "")
        true_val = details.get("trueValue", "")
        false_val = details.get("falseValue", "")
        out_col = f"{col}_cond"
        code += (
            f"df = df.withColumn({out_col!r}, F.when(F.expr({str(condition)!r}), F.lit({true_val!r}))"
            f".otherwise(F.lit({false_val!r})))\n"
        )

    elif trans_type == "custom":
        # Fallback: the step supplies its own code verbatim.
        script = details.get("script", "# Custom transformation logic not provided")
        code += str(script) + "\n"

    else:
        code += f"# No translation logic implemented for transformation type: {trans_type}\n"

    return code

print('Comprehensive translator functions loaded.')

In [None]:
# Notebook Generation: Create a comprehensive Databricks Pipeline Notebook
import nbformat as nbf

def generate_pipeline_notebook():
    """
    Generate a Databricks notebook that re-creates the Dataflow pipeline using translated PySpark code.

    Uses the module-level ``dataflow_name`` and ``materialize`` settings. It
    fetches the steps with ``get_dataflow_steps()``, translates each one with
    ``translate_mquery_to_pyspark()``, and writes the notebook to the current
    working directory.

    Returns:
        str: Filename of the written ``.ipynb`` notebook.
    """
    import re  # local import so this cell does not rely on another cell's imports

    nb = nbf.v4.new_notebook()
    cells = []

    # Introductory Markdown Cell
    intro_md = (
        f"# Generated Databricks Pipeline: {dataflow_name}\n\n"
        "This notebook was autogenerated from your Microsoft Dataflow definition. Each cell below corresponds to a transformation step translated into PySpark code.\n\n"
        "Please review and adjust the generated code as necessary."
    )
    cells.append(nbf.v4.new_markdown_cell(intro_md))

    # Most translated steps reference ``F`` (pyspark.sql.functions), but only
    # aggregate steps import it themselves, so import it once before any step.
    cells.append(nbf.v4.new_code_cell("from pyspark.sql import functions as F\n"))

    # Retrieve the transformation steps; treat a missing (None) result as empty.
    steps = get_dataflow_steps() or []
    if not steps:
        cells.append(nbf.v4.new_markdown_cell("**Warning:** No transformation steps found in the dataflow definition."))

    # Create a Markdown cell and a Code cell for each transformation step
    for idx, step in enumerate(steps, start=1):
        step_name = step.get("name", f"Step {idx}")
        desc = step.get("description", "")
        md_text = f"## Transformation {idx}: {step_name}\n{desc}"
        cells.append(nbf.v4.new_markdown_cell(md_text))

        code = translate_mquery_to_pyspark(step)
        cells.append(nbf.v4.new_code_cell(code))

    # The materialization cell writes ``df``, which only exists after at least
    # one step has run, so it is skipped when there are no steps.
    if materialize and steps:
        mat_md = "### Delta Table Materialization Enabled\nThe following cell creates a Delta table based on the pipeline output."
        cells.append(nbf.v4.new_markdown_cell(mat_md))
        mat_code = (
            "# Sample Delta table creation\n"
            "df.write.format('delta').mode('overwrite').saveAsTable('semantic.materialized_output')\n"
            "print('Delta table created.')\n"
        )
        cells.append(nbf.v4.new_code_cell(mat_code))

    nb['cells'] = cells

    # Replace every character that is not a word character, dot or hyphen
    # (e.g. spaces or path separators) so the result is a single safe filename.
    safe_name = re.sub(r"[^\w.-]", "_", dataflow_name).lower()
    output_filename = f"pipeline_{safe_name}_databricks.ipynb"
    with open(output_filename, 'w', encoding='utf-8') as f:
        nbf.write(nb, f)

    print(f"Generated Databricks pipeline notebook: {output_filename}")
    return output_filename

print('Pipeline notebook generation functions ready.')

In [None]:
# Execute Pipeline Notebook Generation: build the notebook, then report where it was written.
generated_notebook = generate_pipeline_notebook()
print(
    f"Migration complete. The generated Databricks pipeline notebook is saved as: {generated_notebook}"
)