# Data Contract Generator

**Purpose:** Generate ODCS v3.0.2-aligned data contracts for Nova Framework transformations.

This notebook is a standalone tool for ad-hoc contract generation. It is **not** integrated into the main framework; use it independently when you need to create a new data contract for a transformation.

## How to Use

1. **Configure the settings** in the Configuration cell below
2. **Run all cells** to generate your contract
3. **Review the output** and make any manual adjustments
4. **Save the contract** to your desired location

## What This Generates

- Complete YAML data contract with all required sections
- Schema definition based on your output columns
- Transformation configuration linking to your Python/SQL code
- Basic data quality rules

---

## 1. Configuration

**Edit the values below to configure your data contract.**
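
Before generating a contract, it can help to check that the transformation settings agree with the chosen type. The sketch below assumes that a Python transformation names a module plus exactly one of a class or a function, and that a SQL transformation supplies a query. `check_transformation_settings` is a hypothetical helper for illustration, not part of Nova Framework.

```python
from typing import List, Optional


def check_transformation_settings(
    transformation_type: str,
    module: Optional[str] = None,
    class_name: Optional[str] = None,
    function_name: Optional[str] = None,
    sql: Optional[str] = None,
) -> List[str]:
    """Return a list of human-readable problems (empty if none were found)."""
    problems = []

    if transformation_type not in ("python", "sql", "scala"):
        problems.append(f"Unknown transformation type: {transformation_type!r}")
    elif transformation_type == "python":
        if not module:
            problems.append("Python transformations need TRANSFORMATION_MODULE")
        # Assumption: class-based and function-based styles are mutually exclusive
        if bool(class_name) == bool(function_name):
            problems.append(
                "Set exactly one of TRANSFORMATION_CLASS or TRANSFORMATION_FUNCTION"
            )
    elif transformation_type == "sql":
        if not sql:
            problems.append("SQL transformations need TRANSFORMATION_SQL")

    return problems


# Values mirror the defaults in the Configuration cell
print(check_transformation_settings("python", "my_transformation", "MyTransformation"))
print(check_transformation_settings("sql"))
```

Returning a list of messages rather than raising on the first problem lets you see every misconfiguration in one pass.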

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# CONTRACT CONFIGURATION - Edit these values
# ════════════════════════════════════════════════════════════════════════════

# Contract Identity
CONTRACT_NAME = "my_transformation"  # Name of the contract (no spaces, use underscores)
CONTRACT_VERSION = "1.0.0"           # Semantic version

# Domain & Data Product
DOMAIN = "sales"                      # Business domain (e.g., sales, marketing, finance)
DATA_PRODUCT = "analytics"            # Data product name

# Team Information
TEAM = {
    "dataOwner": "data-owner@example.com",
    "seniorManager": "senior-manager@example.com",
    "dataSteward": "data-steward@example.com"
}

# Output Schema Configuration
OUTPUT_CATALOG = "gold"               # Target catalog (gold, silver)
OUTPUT_SCHEMA = "sales"               # Target schema
OUTPUT_TABLE = "my_transformation"    # Target table name
OUTPUT_FORMAT = "delta"               # Output format (delta, parquet)
SCHEMA_DESCRIPTION = "Description of what this data represents"

# Transformation Type: "python", "sql", or "scala"
TRANSFORMATION_TYPE = "python"

# Python Transformation Settings (if TRANSFORMATION_TYPE == "python")
TRANSFORMATION_MODULE = "my_transformation"      # Python module name (without .py)
TRANSFORMATION_CLASS = "MyTransformation"        # Class name (if class-based)
TRANSFORMATION_FUNCTION = None                   # Function name (if function-based, set CLASS to None)

# SQL Transformation (if TRANSFORMATION_TYPE == "sql")
# Set TRANSFORMATION_SQL to your SQL query or leave as None for Python/Scala
TRANSFORMATION_SQL = None

# Pipeline Configuration
PIPELINE_TYPE = "transformation"      # transformation, ingestion
WRITE_STRATEGY = "overwrite"          # overwrite, append, merge, scd2
SOFT_DELETE = False                   # Enable soft delete

# Transformation Config (passed to your transformation as **kwargs)
TRANSFORMATION_CONFIG = {
    "source_catalog": "bronze",
    # Add your custom config parameters here
    # "lookback_days": 30,
    # "min_amount": 10.0,
}

## 2. Define Output Schema

Define the columns that your transformation will produce.
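
A quick sanity check on the column list can catch mistakes before they reach the generated YAML. The following is a minimal sketch: `find_column_problems` is a hypothetical helper, and the rule that primary key columns should be non-nullable is an assumption rather than a documented Nova Framework requirement.

```python
from typing import Any, Dict, List

REQUIRED_KEYS = ("name", "type")  # the two properties marked as required


def find_column_problems(columns: List[Dict[str, Any]]) -> List[str]:
    """Return a list of problems found in a column definition list."""
    problems = []
    seen = set()

    for index, column in enumerate(columns):
        missing = [key for key in REQUIRED_KEYS if not column.get(key)]
        if missing:
            problems.append(f"Column #{index} is missing: {', '.join(missing)}")

        name = column.get("name")
        if name in seen:
            problems.append(f"Duplicate column name: {name}")
        elif name:
            seen.add(name)

        # Assumption: a primary key column should not be nullable.
        # isNullable defaults to True when omitted.
        if column.get("isPrimaryKey") and column.get("isNullable", True):
            problems.append(f"Primary key column {name} should set isNullable to False")

    return problems


example_columns = [
    {"name": "id", "type": "bigint", "isPrimaryKey": True, "isNullable": False},
    {"name": "amount", "type": "decimal(18,2)"},
    {"name": "amount"},  # duplicate name and no type
]

for problem in find_column_problems(example_columns):
    print(problem)
```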

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# OUTPUT SCHEMA DEFINITION
# ════════════════════════════════════════════════════════════════════════════
# Define each column your transformation produces.
#
# Supported types:
#   string, int, integer, bigint, double, float, decimal(p,s),
#   date, timestamp, boolean, array<type>, map<key,value>, struct<...>
#
# Properties:
#   name         - Column name (required)
#   type         - Data type (required)
#   description  - Column description (recommended)
#   isPrimaryKey - True if part of natural key (optional)
#   isNullable   - True if nullable, False otherwise (default: True)
# ════════════════════════════════════════════════════════════════════════════

OUTPUT_COLUMNS = [
    {
        "name": "id",
        "type": "bigint",
        "description": "Unique identifier",
        "isPrimaryKey": True,
        "isNullable": False
    },
    {
        "name": "name",
        "type": "string",
        "description": "Name field",
        "isNullable": False
    },
    {
        "name": "amount",
        "type": "decimal(18,2)",
        "description": "Amount value",
        "isNullable": False
    },
    {
        "name": "created_date",
        "type": "date",
        "description": "Date of creation",
        "isNullable": True
    },
    # Add more columns as needed...
]

## 3. Define Data Quality Rules (Optional)

Add validation rules for your output data.
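
Each rule type expects specific parameters (for example, `between` needs both `min` and `max`). As a rough sketch, the mapping below restates the parameter requirements listed in the rule comments of this notebook and reports any that a rule omits. `missing_rule_parameters` is a hypothetical helper, not a Nova Framework function.

```python
from typing import Any, Dict, List

# Parameters each rule type expects, as described in the rule-type comments
RULE_PARAMETERS = {
    "not_null": [],
    "not_blank": [],
    "unique": [],
    "min": ["value"],
    "max": ["value"],
    "between": ["min", "max"],
    "allowed_values": ["values"],
    "regex": ["pattern"],
    "min_length": ["value"],
    "max_length": ["value"],
}


def missing_rule_parameters(rule: Dict[str, Any]) -> List[str]:
    """Return the parameter names a validation rule should define but does not."""
    rule_type = rule.get("rule")
    if rule_type not in RULE_PARAMETERS:
        raise ValueError(f"Unsupported rule type: {rule_type!r}")
    return [param for param in RULE_PARAMETERS[rule_type] if param not in rule]


incomplete_rule = {"column": "amount", "rule": "between", "min": 0, "severity": "error"}
print(missing_rule_parameters(incomplete_rule))
```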

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# DATA QUALITY RULES (Optional)
# ════════════════════════════════════════════════════════════════════════════
# Define validation rules for your output data.
#
# Supported rule types:
#   not_null       - Column must not be null
#   not_blank      - Column must not be empty string
#   unique         - Column values must be unique
#   min            - Minimum value (use 'value' parameter)
#   max            - Maximum value (use 'value' parameter)
#   between        - Value between range (use 'min' and 'max' parameters)
#   allowed_values - Column must be one of specified values (use 'values' list)
#   regex          - Column must match regex pattern (use 'pattern' parameter)
#   min_length     - Minimum string length (use 'value' parameter)
#   max_length     - Maximum string length (use 'value' parameter)
#
# Severity levels: error, warning, info
# ════════════════════════════════════════════════════════════════════════════

QUALITY_VALIDATION_RULES = [
    {
        "column": "id",
        "rule": "unique",
        "severity": "error"
    },
    {
        "column": "id",
        "rule": "not_null",
        "severity": "error"
    },
    {
        "column": "amount",
        "rule": "min",
        "value": 0,
        "severity": "error"
    },
    # Add more validation rules as needed...
]

# Cleansing rules (applied before validation)
QUALITY_CLEANSING_RULES = [
    # Example: Trim all string fields
    # {
    #     "type": "transformation",
    #     "rule": "trim_string_fields"
    # },
    # Example: Uppercase specific columns
    # {
    #     "type": "transformation",
    #     "rule": "uppercase",
    #     "columns": ["status"]
    # },
]

## 4. Contract Generator Functions

Run this cell to load the generator functions.

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# CONTRACT GENERATOR - Core Functions
# ════════════════════════════════════════════════════════════════════════════

import yaml
from typing import Dict, List, Any, Optional
from datetime import datetime


def generate_contract(
    name: str,
    version: str,
    domain: str,
    data_product: str,
    team: Dict[str, str],
    schema_config: Dict[str, Any],
    columns: List[Dict[str, Any]],
    transformation_type: str,
    transformation_config: Dict[str, Any],
    pipeline_config: Dict[str, Any],
    quality_validation: Optional[List[Dict[str, Any]]] = None,
    quality_cleansing: Optional[List[Dict[str, Any]]] = None,
    transformation_module: Optional[str] = None,
    transformation_class: Optional[str] = None,
    transformation_function: Optional[str] = None,
    transformation_sql: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Build a complete ODCS v3.0.2 data contract as a dictionary.
    
    Returns:
        Contract dictionary; pass it to contract_to_yaml() to get the YAML string
    """
    
    # Build the contract structure
    contract = {
        "apiVersion": "v3.0.2",
        "kind": "DataContract",
        "name": name,
        "version": version,
        "domain": domain,
        "dataProduct": data_product,
        "team": team,
    }
    
    # Build schema section
    contract["schema"] = {
        "name": f"{schema_config['catalog']}_{schema_config['schema']}.{schema_config['table']}",
        "table": schema_config['table'],
        "format": schema_config['format'],
        "description": schema_config.get('description', ''),
        "properties": columns
    }
    
    # Build customProperties section
    custom_properties = {
        "pipelineType": pipeline_config.get('type', 'transformation'),
        "writeStrategy": pipeline_config.get('write_strategy', 'overwrite'),
        "softDelete": pipeline_config.get('soft_delete', False),
        "transformationType": transformation_type,
    }
    
    # Add transformation-specific config
    if transformation_type == "python":
        if transformation_module:
            custom_properties["transformationModule"] = transformation_module
        if transformation_class:
            custom_properties["transformationClass"] = transformation_class
        if transformation_function:
            custom_properties["transformationFunction"] = transformation_function
    elif transformation_type == "sql" and transformation_sql:
        custom_properties["transformationSql"] = transformation_sql
    elif transformation_type == "scala":
        if transformation_module:
            custom_properties["transformationModule"] = transformation_module
        if transformation_class:
            custom_properties["transformationClass"] = transformation_class
    
    # Add transformation config if provided
    if transformation_config:
        custom_properties["transformationConfig"] = transformation_config
    
    contract["customProperties"] = custom_properties
    
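    # Build quality section. Cleansing rules are applied before validation,
    # so they are emitted first.
    # NOTE: the "cleansing" key name is an assumption; confirm it against the
    # quality schema your Nova Framework version expects.
    quality = {}
    
    if quality_cleansing:
        quality["cleansing"] = quality_cleansing
    
    if quality_validation:
        quality["validation"] = quality_validation
    
    if quality:
        contract["quality"] = quality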
    
    return contract


def contract_to_yaml(contract: Dict[str, Any], add_header: bool = True) -> str:
    """
    Convert contract dictionary to formatted YAML string.
    
    Args:
        contract: Contract dictionary
        add_header: Whether to add documentation header
        
    Returns:
        Formatted YAML string
    """
    
    # Custom YAML representer to handle multiline strings properly
    def str_representer(dumper, data):
        if '\n' in data:
            return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
        return dumper.represent_scalar('tag:yaml.org,2002:str', data)
    
    yaml.add_representer(str, str_representer)
    
    # Generate YAML
    yaml_content = yaml.dump(
        contract,
        default_flow_style=False,
        sort_keys=False,
        allow_unicode=True,
        width=120
    )
    
    if add_header:
        header = f"""# ════════════════════════════════════════════════════════════════════════════
# Data Contract: {contract.get('name', 'unnamed')}
# ════════════════════════════════════════════════════════════════════════════
# Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
# API Version: ODCS v3.0.2
#
# This contract was generated using the Contract Generator tool.
# Review and customize as needed before deploying.
# ════════════════════════════════════════════════════════════════════════════

"""
        return header + yaml_content
    
    return yaml_content


def save_contract(yaml_content: str, file_path: str) -> None:
    """
    Save contract YAML to file.
    
    Args:
        yaml_content: YAML string to save
        file_path: Path to save the file
    """
    # The generated header contains non-ASCII characters, so write UTF-8 explicitly
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(yaml_content)
    print(f"Contract saved to: {file_path}")


print("Contract generator functions loaded.")

## 5. Generate Contract

Run this cell to generate your data contract based on the configuration above.

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# GENERATE THE CONTRACT
# ════════════════════════════════════════════════════════════════════════════

# Build schema config
schema_config = {
    "catalog": OUTPUT_CATALOG,
    "schema": OUTPUT_SCHEMA,
    "table": OUTPUT_TABLE,
    "format": OUTPUT_FORMAT,
    "description": SCHEMA_DESCRIPTION
}

# Build pipeline config
pipeline_config = {
    "type": PIPELINE_TYPE,
    "write_strategy": WRITE_STRATEGY,
    "soft_delete": SOFT_DELETE
}

# Generate contract
contract = generate_contract(
    name=CONTRACT_NAME,
    version=CONTRACT_VERSION,
    domain=DOMAIN,
    data_product=DATA_PRODUCT,
    team=TEAM,
    schema_config=schema_config,
    columns=OUTPUT_COLUMNS,
    transformation_type=TRANSFORMATION_TYPE,
    transformation_config=TRANSFORMATION_CONFIG,
    pipeline_config=pipeline_config,
    quality_validation=QUALITY_VALIDATION_RULES,
    quality_cleansing=QUALITY_CLEANSING_RULES,
    transformation_module=TRANSFORMATION_MODULE,
    transformation_class=TRANSFORMATION_CLASS,
    transformation_function=TRANSFORMATION_FUNCTION,
    transformation_sql=TRANSFORMATION_SQL,
)

# Convert to YAML
contract_yaml = contract_to_yaml(contract)

# Display the generated contract
print("Generated Contract:")
print("=" * 80)
print(contract_yaml)

## 6. Save Contract (Optional)

Uncomment and run this cell to save the contract to a file.
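
If the target folder may not exist yet, or you want to avoid silently replacing an existing contract, a slightly more defensive save routine can help. This is a sketch using only the standard library; `save_contract_safely` and its `overwrite` parameter are hypothetical and not part of this notebook's `save_contract()`.

```python
import tempfile
from pathlib import Path


def save_contract_safely(yaml_content: str, file_path: str, overwrite: bool = False) -> Path:
    """Write the contract as UTF-8, creating parent folders as needed."""
    target = Path(file_path)
    if target.exists() and not overwrite:
        raise FileExistsError(f"Refusing to overwrite existing contract: {target}")
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(yaml_content, encoding="utf-8")
    return target


# Demonstration in a temporary directory so nothing is left behind
with tempfile.TemporaryDirectory() as tmp_dir:
    saved = save_contract_safely(
        "name: my_transformation\n",
        f"{tmp_dir}/contracts/my_transformation.yaml",
    )
    round_trip = saved.read_text(encoding="utf-8")
    print(f"Wrote {len(round_trip)} characters to {saved.name}")
```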

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# SAVE THE CONTRACT TO FILE
# ════════════════════════════════════════════════════════════════════════════

# Uncomment the lines below to save the contract

# Option 1: Save to samples folder
# save_contract(contract_yaml, f"../samples/transformation_examples/{CONTRACT_NAME}.yaml")

# Option 2: Save to Databricks Volumes (update path as needed)
# save_contract(contract_yaml, f"/Volumes/your_catalog/nova_framework/data_contracts/{CONTRACT_NAME}.yaml")

# Option 3: Save to current directory
# save_contract(contract_yaml, f"{CONTRACT_NAME}.yaml")

print("To save the contract, uncomment one of the save_contract() lines above.")

---

## Appendix: Extract Schema from Existing DataFrame (Databricks)

If you have an existing DataFrame, you can extract its schema to populate the OUTPUT_COLUMNS list.

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# HELPER: Extract schema from DataFrame
# ════════════════════════════════════════════════════════════════════════════
# Run this in Databricks if you have an existing DataFrame and want to
# generate the OUTPUT_COLUMNS list automatically.
# ════════════════════════════════════════════════════════════════════════════

def extract_columns_from_dataframe(df) -> List[Dict[str, Any]]:
    """
    Extract column definitions from a PySpark DataFrame.
    
    Args:
        df: PySpark DataFrame
        
    Returns:
        List of column definitions for OUTPUT_COLUMNS
    """
    columns = []
    
    for field in df.schema.fields:
        col_def = {
            "name": field.name,
            # simpleString() yields Spark SQL type names such as "bigint" or "decimal(18,2)"
            "type": field.dataType.simpleString(),
            "description": f"Column: {field.name}",  # Update with actual description
            "isNullable": field.nullable
        }
        columns.append(col_def)
    
    return columns


def print_columns_as_python(columns: List[Dict[str, Any]]) -> None:
    """
    Print column definitions as Python code for copy-paste.
    """
    print("OUTPUT_COLUMNS = [")
    for col in columns:
        print("    {")
        for key, value in col.items():
            # repr() quotes and escapes strings; booleans print as True/False
            print(f'        "{key}": {value!r},')
        print("    },")
    print("]")


# Example usage (uncomment in Databricks):
# df = spark.table("bronze.my_table")
# columns = extract_columns_from_dataframe(df)
# print_columns_as_python(columns)

print("DataFrame schema extraction functions loaded.")
print("To use: uncomment the example lines above in a Databricks notebook.")

---

## Appendix: Parse Existing Python Transformation

Extract metadata from an existing Python transformation file.
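
To illustrate the idea, the sketch below walks the syntax tree of a small inline source string and collects every `kwargs.get(...)` call together with its literal default. The sample source and the `find_kwargs_defaults` name are made up for this example; the parser in the cell that follows does the same detection on a real file.

```python
import ast
from typing import Any, Dict

# Hypothetical transformation source, used only for demonstration
SAMPLE_SOURCE = '''
def transform(df, **kwargs):
    lookback_days = kwargs.get("lookback_days", 30)
    source_catalog = kwargs.get("source_catalog")
    return df
'''


def find_kwargs_defaults(source: str) -> Dict[str, Any]:
    """Map each kwargs.get() key to its literal default (None if absent)."""
    params = {}
    for node in ast.walk(ast.parse(source)):
        is_kwargs_get = (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "get"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id == "kwargs"
        )
        if is_kwargs_get and node.args and isinstance(node.args[0], ast.Constant):
            default = node.args[1] if len(node.args) > 1 else None
            # Only literal defaults can be recovered statically
            params[node.args[0].value] = (
                default.value if isinstance(default, ast.Constant) else None
            )
    return params


found = find_kwargs_defaults(SAMPLE_SOURCE)
print(found)
```

Because the analysis is static, defaults computed at runtime (for example, a function call) cannot be recovered and are reported as `None` here.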

In [None]:
# ════════════════════════════════════════════════════════════════════════════
# HELPER: Parse Python transformation file
# ════════════════════════════════════════════════════════════════════════════

import ast
from pathlib import Path


def parse_transformation_file(file_path: str) -> Dict[str, Any]:
    """
    Parse a Python transformation file and extract metadata.
    
    Args:
        file_path: Path to Python transformation file
        
    Returns:
        Dictionary with extracted metadata
    """
    with open(file_path, 'r') as f:
        source = f.read()
    
    tree = ast.parse(source)
    
    result = {
        "module_name": Path(file_path).stem,
        "classes": [],
        "functions": [],
        "docstring": ast.get_docstring(tree),
        "config_params": []
    }
    
    for node in ast.walk(tree):
        # Find classes
        if isinstance(node, ast.ClassDef):
            class_info = {
                "name": node.name,
                "docstring": ast.get_docstring(node),
                "methods": []
            }
            
            for item in node.body:
                if isinstance(item, ast.FunctionDef):
                    class_info["methods"].append(item.name)
            
            result["classes"].append(class_info)
        
        # Find top-level functions (direct children of the module, not methods)
        elif isinstance(node, ast.FunctionDef) and node in tree.body:
            result["functions"].append({
                "name": node.name,
                "docstring": ast.get_docstring(node)
            })
        
        # Find kwargs.get() calls to extract config params
        if isinstance(node, ast.Call):
            if (isinstance(node.func, ast.Attribute) and 
                node.func.attr == 'get' and
                isinstance(node.func.value, ast.Name) and
                node.func.value.id == 'kwargs'):
                if node.args:
                    param_name = node.args[0]
                    if isinstance(param_name, ast.Constant):
                        param_info = {"name": param_name.value}
                        if len(node.args) > 1:
                            default = node.args[1]
                            if isinstance(default, ast.Constant):
                                param_info["default"] = default.value
                        result["config_params"].append(param_info)
    
    return result


def print_transformation_info(info: Dict[str, Any]) -> None:
    """
    Print extracted transformation information.
    """
    print(f"Module: {info['module_name']}")
    print(f"Docstring: {info['docstring'][:100] if info['docstring'] else 'None'}...")
    print()
    
    if info['classes']:
        print("Classes:")
        for cls in info['classes']:
            print(f"  - {cls['name']}")
            if cls['methods']:
                print(f"    Methods: {', '.join(cls['methods'])}")
    
    if info['config_params']:
        print("\nConfig Parameters (from kwargs.get()):")
        for param in info['config_params']:
            default = param.get('default', 'N/A')
            print(f"  - {param['name']}: default={default}")
        
        print("\nSuggested TRANSFORMATION_CONFIG:")
        print("TRANSFORMATION_CONFIG = {")
        for param in info['config_params']:
            # repr() keeps strings quoted and prints a missing default as None
            default = param.get('default')
            print(f'    "{param["name"]}": {default!r},')
        print("}")


# Example usage:
# info = parse_transformation_file("../nova_framework/transformations/python/product_sales_summary_example.py")
# print_transformation_info(info)

print("Transformation file parser loaded.")
print("To use: uncomment the example lines above and provide your file path.")