# Analytic Contract Schema - Comprehensive Example

This notebook provides a **detailed, step-by-step demonstration** of the `contract_schema` library using the bundled `analytic_schema.json` contract.

## What This Notebook Demonstrates

1. **Contract Loading**: How to load and access contract metadata
2. **Input Parsing & Validation**: Multiple ways to parse inputs (dict, CLI args)
3. **Failure Modes**: Validation errors when required inputs are missing
4. **Field Mapping (data_map)**: Transforming non-SchemaONE data to compliant format
5. **Document Creation**: Setting all required and optional output fields
6. **Message Logging**: Using `add_message()` for structured execution logging
7. **Finding Generation**: Creating findings with all required fields
8. **Finalisation**: Auto-computed fields (hashes, timestamps, environment)
9. **Export**: Saving documents as JSON and Markdown reports

---

## 1. Setup and Imports

First, let's import the necessary libraries and configure logging.

In [None]:
from __future__ import annotations

# Standard library imports
from pathlib import Path
import uuid
import logging
import json

# Third-party imports
import pandas as pd
from sklearn.datasets import load_iris

# contract_schema imports
from contract_schema import Contract, SchemaError, to_markdown_card
from contract_schema import utils

# Configure logging
logging.basicConfig(
    level="INFO",
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger("contract_schema.examples")

print("All imports successful!")

## 2. Contract Loading

The `Contract` class is the main entry point for working with schema contracts. Loading a contract:
- Validates the schema against the meta-schema
- Extracts metadata (title, description, version)
- Provides access to input and output schemas

### Available Bundled Contracts
- `analytic_schema.json` - For security analytics and data analysis pipelines
- `model_schema.json` - For ML model training manifests

In [None]:
# Load the analytic contract from the bundled schemas
contract = Contract.load("analytic_schema.json")

# Access contract metadata
print("=" * 60)
print("CONTRACT METADATA")
print("=" * 60)
print(f"Title:       {contract.title}")
print(f"Version:     {contract.version}")
print(f"Description: {contract.description}")

### 2.1 Exploring the Input Schema

The input schema defines all the parameters that can be passed to an analytic. Let's examine each field:

In [None]:
print("=" * 60)
print("INPUT SCHEMA FIELDS")
print("=" * 60)

input_fields = contract.input_schema.get("fields", {})
for field_name, field_spec in input_fields.items():
    required = "REQUIRED" if field_spec.get("required", False) else "optional"
    field_type = field_spec.get("type", ["any"])
    default = field_spec.get("default", "N/A")
    enum_vals = field_spec.get("enum", None)
    desc = field_spec.get("description", "No description")
    
    print(f"\n{field_name} ({required})")
    print(f"  Type: {field_type}")
    if default != "N/A":
        print(f"  Default: {default}")
    if enum_vals:
        print(f"  Enum: {enum_vals}")
    suffix = "..." if len(desc) > 80 else ""  # only mark descriptions that were actually cut
    print(f"  Description: {desc[:80]}{suffix}")

### 2.2 Exploring the Output Schema

The output schema defines the structure of the result document. Let's see all the fields:

In [None]:
print("=" * 60)
print("OUTPUT SCHEMA FIELDS")
print("=" * 60)

output_fields = contract.output_schema.get("fields", {})
required_count = sum(1 for f in output_fields.values() if f.get("required", False))
optional_count = len(output_fields) - required_count

print(f"Total fields: {len(output_fields)}")
print(f"Required: {required_count}, Optional: {optional_count}")
print()

# List required fields
print("REQUIRED OUTPUT FIELDS:")
for field_name, field_spec in output_fields.items():
    if field_spec.get("required", False):
        print(f"  - {field_name}")

print()
print("OPTIONAL OUTPUT FIELDS:")
for field_name, field_spec in output_fields.items():
    if not field_spec.get("required", False):
        print(f"  - {field_name}")

## 3. Failure Modes - Validation Errors

This section demonstrates how the library raises `SchemaError` when input validation fails. Knowing these error cases helps you build analytics that reject bad input early instead of failing partway through a run.

### 3.1 Missing Required Input Fields

The analytic schema requires the following input fields:
- `start_dtg` (required)
- `end_dtg` (required)
- `data_source_type` (required)
- `data_source` (required)

Let's see what happens when we try to validate without these:

In [None]:
# FAILURE MODE 1: Missing all required fields
print("=" * 60)
print("FAILURE MODE 1: Missing all required fields")
print("=" * 60)

try:
    # Attempt to validate an empty input
    inputs = contract.parse_and_validate_input({})
    print("ERROR: This should not print - validation should fail!")
except SchemaError as e:
    print("SchemaError raised as expected!")
    print(f"  Error message: {e}")

In [None]:
# FAILURE MODE 2: Missing some required fields (partial input)
print("=" * 60)
print("FAILURE MODE 2: Missing some required fields")
print("=" * 60)

try:
    # Only provide start_dtg, missing end_dtg, data_source_type, data_source
    inputs = contract.parse_and_validate_input({
        "start_dtg": utils._now_iso(),
    })
    print("ERROR: This should not print - validation should fail!")
except SchemaError as e:
    print("SchemaError raised as expected!")
    print(f"  Error message: {e}")

In [None]:
# FAILURE MODE 3: Invalid enum value for data_source_type
print("=" * 60)
print("FAILURE MODE 3: Invalid enum value")
print("=" * 60)

now_iso = utils._now_iso()

try:
    # data_source_type must be one of: "file", "IONIC", "api"
    inputs = contract.parse_and_validate_input({
        "start_dtg": now_iso,
        "end_dtg": now_iso,
        "data_source_type": "invalid_type",  # This is not a valid enum value!
        "data_source": "/path/to/data.csv",
    })
    print("ERROR: This should not print - validation should fail!")
except SchemaError as e:
    print("SchemaError raised as expected!")
    print(f"  Error message: {e}")

In [None]:
# FAILURE MODE 4: Invalid verbosity level
print("=" * 60)
print("FAILURE MODE 4: Invalid verbosity enum value")
print("=" * 60)

try:
    # verbosity must be one of: "DEBUG", "INFO", "WARN", "ERROR", "FATAL"
    inputs = contract.parse_and_validate_input({
        "start_dtg": now_iso,
        "end_dtg": now_iso,
        "data_source_type": "file",
        "data_source": "/path/to/data.csv",
        "verbosity": "VERBOSE",  # This is not valid!
    })
    print("ERROR: This should not print - validation should fail!")
except SchemaError as e:
    print("SchemaError raised as expected!")
    print(f"  Error message: {e}")

In [None]:
# FAILURE MODE 5: Invalid date-time format
print("=" * 60)
print("FAILURE MODE 5: Invalid date-time format")
print("=" * 60)

try:
    # start_dtg and end_dtg must be valid ISO 8601 date-time strings
    inputs = contract.parse_and_validate_input({
        "start_dtg": "not-a-valid-datetime",  # Invalid format!
        "end_dtg": now_iso,
        "data_source_type": "file",
        "data_source": "/path/to/data.csv",
    })
    print("ERROR: This should not print - validation should fail!")
except SchemaError as e:
    print("SchemaError raised as expected!")
    print(f"  Error message: {e}")
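
The five failure modes above come down to three kinds of check: presence of required fields, enum membership, and date-time format. The following is a minimal stand-alone sketch of those checks, not the library's implementation. `SketchSchemaError`, `FIELDS`, `validate`, and the `format` key are hypothetical names introduced for illustration; only the `required`, `enum`, and `default` keys appear in the schema exploration earlier in this notebook.

```python
from datetime import datetime


class SketchSchemaError(ValueError):
    """Stand-in for the library's SchemaError (hypothetical, for this sketch only)."""


# Hypothetical field specs, shaped like the entries printed in Section 2.1.
FIELDS = {
    "start_dtg": {"required": True, "format": "date-time"},
    "end_dtg": {"required": True, "format": "date-time"},
    "data_source_type": {"required": True, "enum": ["file", "IONIC", "api"]},
    "data_source": {"required": True},
    "verbosity": {"enum": ["DEBUG", "INFO", "WARN", "ERROR", "FATAL"], "default": "INFO"},
}


def validate(raw: dict, fields: dict = FIELDS) -> dict:
    """Return a copy of raw with defaults applied, or raise SketchSchemaError."""
    out = dict(raw)
    for name, spec in fields.items():
        if name not in out:
            if spec.get("required", False):
                raise SketchSchemaError(f"missing required field: {name}")
            if "default" in spec:
                out[name] = spec["default"]
            continue
        value = out[name]
        if "enum" in spec and value not in spec["enum"]:
            raise SketchSchemaError(f"{name}: {value!r} not in {spec['enum']}")
        if spec.get("format") == "date-time":
            try:
                # fromisoformat() rejects a trailing "Z" before Python 3.11.
                datetime.fromisoformat(str(value).replace("Z", "+00:00"))
            except ValueError as exc:
                raise SketchSchemaError(f"{name}: invalid date-time {value!r}") from exc
    return out


try:
    validate({})
except SketchSchemaError as exc:
    print(exc)  # missing required field: start_dtg
```

The sketch stops at the first problem it finds. A real validator may instead collect every violation before raising, which is friendlier when several fields are wrong at once.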

## 4. Successful Input Validation

Now let's see how to properly construct and validate inputs with ALL fields (required + optional).

### 4.1 Input Fields Reference

| Field | Required | Type | Description |
|-------|----------|------|-------------|
| `start_dtg` | Yes | string (date-time) | Inclusive UTC timestamp marking data window start |
| `end_dtg` | Yes | string (date-time) | Exclusive UTC timestamp marking data window end |
| `data_source_type` | Yes | string (enum) | Transport mechanism: "file", "IONIC", "api" |
| `data_source` | Yes | string | Path, identifier, or URL for data |
| `log_path` | No | string | Where to write logs (default: "stdout") |
| `output` | No | string | Destination for findings (default: "stdout") |
| `analytic_parameters` | No | object/string | Analytic-specific tuning knobs (default: {}) |
| `data_map` | No | object/string | Field mapping for non-SchemaONE data (default: {}) |
| `verbosity` | No | string (enum) | Log level: DEBUG/INFO/WARN/ERROR/FATAL (default: "INFO") |

In [None]:
# Prepare timestamps
now_iso = utils._now_iso()

# Define the complete input with ALL fields
complete_inputs = {
    # --- Required fields ---
    "start_dtg": now_iso,          # Inclusive UTC timestamp for data window start
    "end_dtg": now_iso,            # Exclusive UTC timestamp for data window end
    "data_source_type": "file",    # Transport mechanism: "file", "IONIC", or "api"
    "data_source": "/data/iris_dataset.csv",  # Path to the data file

    # --- Optional fields (explicitly set for demonstration) ---
    "log_path": "stdout",          # Default: "stdout"
    "output": "./output/results",  # Where to write findings
    "analytic_parameters": {       # Custom tuning parameters
        "min_samples": 10,
        "threshold": 0.95,
        "include_summary": True,
    },
    "data_map": {},                # Empty for now, will demo later
    "verbosity": "INFO",           # Log level
}

# Parse and validate
inputs = contract.parse_and_validate_input(complete_inputs)

print("=" * 60)
print("INPUT VALIDATION SUCCESSFUL")
print("=" * 60)
print(f"Validated {len(inputs)} fields:")
for key, value in inputs.items():
    print(f"  {key}: {value}")
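
The cells above pass inputs as a dict. For an analytic launched from a shell, one possible approach is to build the same dict with the standard library's `argparse` and validate it afterwards. The flag names and the `build_parser` and `cli_to_inputs` helpers below are assumptions made for this sketch; they are not part of `contract_schema`.

```python
import argparse
import json


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical CLI whose flags mirror the input field names."""
    parser = argparse.ArgumentParser(description="Run an analytic (sketch).")
    parser.add_argument("--start-dtg", required=True)
    parser.add_argument("--end-dtg", required=True)
    parser.add_argument("--data-source-type", required=True)
    parser.add_argument("--data-source", required=True)
    parser.add_argument("--verbosity", default="INFO")
    # Object-valued fields arrive as JSON text on the command line.
    parser.add_argument("--analytic-parameters", type=json.loads, default={})
    parser.add_argument("--data-map", type=json.loads, default={})
    return parser


def cli_to_inputs(argv: list) -> dict:
    """Parse argv and return a plain dict keyed by input field name."""
    return vars(build_parser().parse_args(argv))


raw_inputs = cli_to_inputs([
    "--start-dtg", "2025-01-15T10:00:00Z",
    "--end-dtg", "2025-01-15T11:00:00Z",
    "--data-source-type", "file",
    "--data-source", "/data/iris_dataset.csv",
    "--analytic-parameters", '{"threshold": 0.95}',
])
print(raw_inputs["analytic_parameters"])  # {'threshold': 0.95}
```

`argparse` converts `--start-dtg` to the key `start_dtg`, so the resulting dict uses the same names as the input schema. Enum and date-time checks are deliberately left out here so that the contract stays the single place where validation happens.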

## 5. Field Mapping (data_map) Feature

The `data_map` field enables analytics to work with non-SchemaONE data sources by defining mappings from vendor-specific field names to standardized SchemaONE equivalents.

### Why Field Mapping?
- Ingest data from legacy systems with proprietary field names
- Work with third-party APIs that use different naming conventions
- Normalize data from multiple vendors into a common schema

### SchemaONE Common Fields
| Field | Description |
|-------|-------------|
| `src_ip` | Source IP address |
| `dst_ip` | Destination IP address |
| `src_port` | Source port number |
| `dst_port` | Destination port number |
| `timestamp` | Event timestamp (ISO 8601) |
| `protocol` | Network protocol |
| `bytes_in` | Bytes received |
| `bytes_out` | Bytes sent |

In [None]:
# Simulate a Zeek conn.log data source with vendor-specific field names
zeek_sample_data = pd.DataFrame({
    # Zeek uses different field names than SchemaONE
    "ts": ["2025-01-15T10:30:00Z", "2025-01-15T10:30:01Z"],  # Zeek timestamp
    "id.orig_h": ["192.168.1.100", "10.0.0.50"],             # Zeek source IP
    "id.resp_h": ["8.8.8.8", "1.1.1.1"],                     # Zeek dest IP
    "id.orig_p": [54321, 12345],                             # Zeek source port
    "id.resp_p": [443, 80],                                  # Zeek dest port
    "proto": ["tcp", "tcp"],                                 # Protocol
    "service": ["ssl", "http"],                              # Service type
    "orig_bytes": [1500, 2048],                              # Bytes out
    "resp_bytes": [3200, 4096],                              # Bytes in
})

print("=" * 60)
print("ORIGINAL ZEEK DATA (Vendor-Specific Field Names)")
print("=" * 60)
print(zeek_sample_data.to_string(index=False))

In [None]:
# Define the field mapping from Zeek format to SchemaONE
zeek_to_schemaone_map = {
    "ts": "timestamp",
    "id.orig_h": "src_ip",
    "id.resp_h": "dst_ip",
    "id.orig_p": "src_port",
    "id.resp_p": "dst_port",
    "proto": "protocol",
    "service": "service",
    "orig_bytes": "bytes_out",
    "resp_bytes": "bytes_in",
}

print("=" * 60)
print("FIELD MAPPING: Zeek -> SchemaONE")
print("=" * 60)
for zeek_field, schema_field in zeek_to_schemaone_map.items():
    print(f"  {zeek_field:15} -> {schema_field}")

In [None]:
def apply_field_mapping(data: pd.DataFrame, field_map: dict) -> pd.DataFrame:
    """
    Apply a field mapping to transform vendor-specific column names to SchemaONE.
    
    Parameters
    ----------
    data : pd.DataFrame
        Input DataFrame with vendor-specific column names
    field_map : dict
        Mapping from vendor field names to SchemaONE field names
        
    Returns
    -------
    pd.DataFrame
        DataFrame with columns renamed to SchemaONE equivalents
    """
    rename_map = {
        old_name: new_name
        for old_name, new_name in field_map.items()
        if old_name in data.columns
    }
    return data.rename(columns=rename_map)


# Apply the mapping
schemaone_data = apply_field_mapping(zeek_sample_data, zeek_to_schemaone_map)

print("=" * 60)
print("TRANSFORMED DATA (SchemaONE Field Names)")
print("=" * 60)
print(schemaone_data.to_string(index=False))

print()
print("Column transformation successful!")
print(f"  Original columns: {list(zeek_sample_data.columns)}")
print(f"  SchemaONE columns: {list(schemaone_data.columns)}")
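
`apply_field_mapping` silently skips source fields that are absent from the data, and a plain rename does not complain if two vendor fields map to the same target. A pre-flight check can surface both problems before the rename runs. The helper below is a hypothetical sketch that works on plain column lists, so it does not need pandas.

```python
from collections import Counter


def check_field_map(columns: list, field_map: dict) -> dict:
    """Report problems that a plain rename would silently ignore or mangle."""
    # Mapped source fields that the data does not contain.
    missing = [src for src in field_map if src not in columns]
    # Data columns that the mapping does not mention.
    unmapped = [col for col in columns if col not in field_map]
    # Column names after the rename: mapped sources take the target name,
    # everything else keeps its own name.
    renamed = [field_map.get(col, col) for col in columns]
    collisions = sorted(name for name, n in Counter(renamed).items() if n > 1)
    return {"missing": missing, "unmapped": unmapped, "collisions": collisions}


report = check_field_map(
    columns=["ts", "id.orig_h", "id.resp_h", "uid"],
    field_map={
        "ts": "timestamp",
        "id.orig_h": "src_ip",
        "id.resp_h": "src_ip",  # deliberate mistake: should be dst_ip
        "proto": "protocol",
    },
)
print(report)
# {'missing': ['proto'], 'unmapped': ['uid'], 'collisions': ['src_ip']}
```

Whether an unmapped column is an error depends on the analytic; the sketch only reports it.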

In [None]:
# Now let's validate inputs WITH the data_map field
inputs_with_mapping = contract.parse_and_validate_input({
    "start_dtg": now_iso,
    "end_dtg": now_iso,
    "data_source_type": "file",
    "data_source": "zeek_conn.log",
    "data_map": zeek_to_schemaone_map,  # Store the mapping for audit purposes
    "verbosity": "INFO",
})

print("=" * 60)
print("INPUT VALIDATION WITH data_map")
print("=" * 60)
print(f"data_map stored: {len(inputs_with_mapping['data_map'])} field mappings")
print(json.dumps(inputs_with_mapping['data_map'], indent=2))

## 6. Document Creation

The `Document` class is a dict subclass that:
- Tracks the output schema for validation
- Auto-populates `initialization_dtg` on creation
- Provides `add_message()` for structured logging
- Computes hashes and environment info on `finalise()`
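
To make the lifecycle in the list above concrete, here is a toy dict subclass that stamps a creation time, appends messages, and ignores further messages once finalised. It is a sketch under stated assumptions, not the library's `Document`: `SketchDocument` and `now_iso` are hypothetical names, schema tracking and hashing are left out, and the message keys follow the ones this notebook prints later (`timestamp`, `level`, `text`).

```python
from datetime import datetime, timezone


def now_iso() -> str:
    """Current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


class SketchDocument(dict):
    """Toy dict subclass: stamps creation time, logs messages, freezes on finalise."""

    def __init__(self, **fields):
        super().__init__(**fields)
        self["initialization_dtg"] = now_iso()
        self._finalised = False

    def add_message(self, level: str, text: str) -> None:
        if self._finalised:
            return  # no-op once finalised
        self.setdefault("messages", []).append(
            {"timestamp": now_iso(), "level": level, "text": text}
        )

    def finalise(self) -> None:
        self["finalization_dtg"] = now_iso()
        self._finalised = True


sketch = SketchDocument(author="Notebook Author")
sketch.add_message("INFO", "started")
sketch.finalise()
sketch.add_message("INFO", "ignored")
print(len(sketch["messages"]))  # 1
```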

### Output Fields Reference

The analytic output schema has many required and optional fields. Let's create a document that demonstrates ALL of them.

In [None]:
# Load Iris dataset for demonstration
iris = load_iris(as_frame=True)
df = iris.frame

print("=" * 60)
print("DATASET: Iris (for demonstration)")
print("=" * 60)
print(f"Shape: {df.shape}")
print(f"Features: {iris.feature_names}")
print(f"Target classes: {list(iris.target_names)}")
print()
print(df.head())

### 6.1 Generating Findings

Each finding represents a detection or observation from the analytic. ALL fields are required:

| Field | Type | Description |
|-------|------|-------------|
| `finding_id` | string | Unique identifier (UUID v4 recommended) |
| `title` | string | Concise summary of detection |
| `description` | string | Detailed explanation |
| `event_dtg` | string (date-time) | UTC timestamp of the event |
| `severity` | string | Impact level: low/medium/high/critical |
| `confidence` | string | Probability the finding is valid |
| `observables` | list[string] | Artifacts (IPs, hashes, usernames) |
| `mitre_attack_tactics` | list[string] | MITRE ATT&CK tactics |
| `mitre_attack_techniques` | list[string] | MITRE ATT&CK techniques |
| `recommended_actions` | string | Response guidance |
| `recommended_pivots` | string | Suggested data sources for context |
| `classification` | string | Data handling classification (U, CUI, etc.) |

In [None]:
# Analyze the dataset
class_counts = df["target"].value_counts().to_dict()
feature_stats = df.describe()

# Generate multiple findings to demonstrate the structure
findings = [
    {
        # Finding 1: Class distribution analysis
        "finding_id": str(uuid.uuid4()),
        "title": "Class distribution analysis",
        "description": (
            f"Dataset contains {len(df)} samples across {len(class_counts)} classes. "
            f"Class distribution: setosa={class_counts.get(0, 0)}, "
            f"versicolor={class_counts.get(1, 0)}, "
            f"virginica={class_counts.get(2, 0)}. "
            "Classes are balanced with 50 samples each."
        ),
        "event_dtg": utils._now_iso(),
        "severity": "low",
        "confidence": "high",
        "observables": ["iris-setosa", "iris-versicolor", "iris-virginica"],
        "mitre_attack_tactics": [],  # Non-security analytic
        "mitre_attack_techniques": [],
        "recommended_actions": "None - informational finding only.",
        "recommended_pivots": "Review feature correlation analysis.",
        "classification": "U",  # Unclassified
    },
    {
        # Finding 2: Feature range summary
        "finding_id": str(uuid.uuid4()),
        "title": "Feature value ranges",
        "description": (
            f"Feature ranges: sepal_length [{df['sepal length (cm)'].min():.1f}, "
            f"{df['sepal length (cm)'].max():.1f}], "
            f"sepal_width [{df['sepal width (cm)'].min():.1f}, "
            f"{df['sepal width (cm)'].max():.1f}], "
            f"petal_length [{df['petal length (cm)'].min():.1f}, "
            f"{df['petal length (cm)'].max():.1f}], "
            f"petal_width [{df['petal width (cm)'].min():.1f}, "
            f"{df['petal width (cm)'].max():.1f}]"
        ),
        "event_dtg": utils._now_iso(),
        "severity": "low",
        "confidence": "high",
        "observables": list(iris.feature_names),
        "mitre_attack_tactics": [],
        "mitre_attack_techniques": [],
        "recommended_actions": "Consider normalizing features before ML training.",
        "recommended_pivots": "N/A",
        "classification": "U",
    },
]

print("=" * 60)
print("GENERATED FINDINGS")
print("=" * 60)
print(f"Total findings: {len(findings)}")
for i, finding in enumerate(findings, 1):
    print(f"\n{i}. {finding['title']}")
    print(f"   ID: {finding['finding_id']}")
    print(f"   Severity: {finding['severity']}, Confidence: {finding['confidence']}")
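    shown = finding['observables'][:3]
    more = "..." if len(finding['observables']) > 3 else ""  # ellipsis only when items were cut
    print(f"   Observables: {shown}{more}")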

### 6.2 Creating the Document with ALL Fields

Now we create the output document with all required and optional fields:

In [None]:
# Use the validated inputs from earlier
inputs = inputs_with_mapping

# Create the document with ALL fields
doc = contract.create_document(
    # =========================================================================
    # PROVENANCE & AUTHORSHIP (Required)
    # =========================================================================
    input_schema_version="1.0.1",
    output_schema_version=contract.version,
    author="Notebook Author",
    author_organization="Example Organization",
    contact="author@example.com",
    license="Apache-2.0",
    documentation_link="https://scikit-learn.org/stable/datasets/toy_dataset.html#iris-dataset",

    # =========================================================================
    # CONTRIBUTORS (Optional)
    # =========================================================================
    contributors={
        "Alice Smith": "Data preprocessing and validation",
        "Bob Jones": "Code review and testing",
        "Charlie Brown": "Documentation",
    },

    # =========================================================================
    # EXECUTION STATUS (Required)
    # =========================================================================
    status="success",
    exit_code=0,

    # =========================================================================
    # DATASET METADATA (Required)
    # =========================================================================
    dataset_description=(
        "Fisher's Iris flower data set (1936) containing 150 samples of iris "
        "flowers with 4 features each. Features include sepal length/width and "
        "petal length/width measurements."
    ),
    dataset_size=len(df),
    dataset_hash=utils._hash(df),
    data_schema={
        **{c: "number" for c in iris.feature_names},
        "target": "integer"
    },
    feature_names=list(iris.feature_names),

    # =========================================================================
    # ANALYTIC METADATA (Required)
    # =========================================================================
    inputs=inputs,
    analytic_id="example_analytic.ipynb",
    analytic_name="Iris Dataset Analyzer",
    analytic_version="1.0.0",
    analytic_description=(
        "Demonstration analytic that analyzes the Iris dataset distribution "
        "and feature characteristics. Generates findings about class balance "
        "and feature value ranges."
    ),

    # =========================================================================
    # FINDINGS (Required)
    # =========================================================================
    findings=findings,

    # =========================================================================
    # ADDITIONAL RUN PROPERTIES (Optional)
    # =========================================================================
    additional_run_properties={
        "class_counts": class_counts,
        "notebook_environment": "Jupyter",
        "field_mapping_applied": True,
        "source_format": "Zeek conn.log",
        "target_format": "SchemaONE",
        "ci_job_url": "https://example.com/ci/job/12345",
        "git_commit": "abc123def456",
    },
)

print("=" * 60)
print("DOCUMENT CREATED SUCCESSFULLY")
print("=" * 60)
print(f"Document has {len(doc)} fields set")
print(f"initialization_dtg: {doc.get('initialization_dtg', 'Not set')}")

## 7. Message Logging with `add_message()`

The `add_message()` method appends timestamped, leveled log entries to the document's `messages` field. This provides a structured audit trail separate from console logging.

### Supported Levels
- `DEBUG` - Detailed diagnostic information
- `INFO` - General informational messages
- `WARN` - Warning messages
- `ERROR` - Error messages (non-fatal)
- `FATAL` - Fatal error messages
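
Since the levels form an ordered scale, stored messages can be filtered by a minimum level when reviewing a document. The following sketch assumes each message is a dict with `level`, `timestamp`, and `text` keys, as the printing code in this section suggests; `LEVELS` and `at_least` are hypothetical names.

```python
# Severity order taken from the list above, lowest first.
LEVELS = ["DEBUG", "INFO", "WARN", "ERROR", "FATAL"]


def at_least(messages: list, minimum: str) -> list:
    """Keep messages whose level is at or above the given minimum."""
    floor = LEVELS.index(minimum)
    return [m for m in messages if LEVELS.index(m["level"]) >= floor]


messages = [
    {"level": "DEBUG", "timestamp": "2025-01-15T10:30:00Z", "text": "loaded 150 records"},
    {"level": "INFO", "timestamp": "2025-01-15T10:30:01Z", "text": "run started"},
    {"level": "ERROR", "timestamp": "2025-01-15T10:30:02Z", "text": "source unreachable"},
]
print([m["text"] for m in at_least(messages, "INFO")])
# ['run started', 'source unreachable']
```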

In [None]:
# Add structured messages at different levels
doc.add_message("INFO", "Analytic execution started")
doc.add_message("DEBUG", f"Loaded {len(df)} records from iris dataset")
doc.add_message("DEBUG", f"Dataset hash: {utils._hash(df)[:16]}...")
doc.add_message("INFO", f"Applied field mapping: {len(zeek_to_schemaone_map)} fields")
doc.add_message("INFO", f"Analyzing {len(class_counts)} distinct classes")
doc.add_message("DEBUG", f"Class distribution: {class_counts}")
doc.add_message("INFO", f"Generated {len(findings)} finding(s)")
doc.add_message("WARN", "No anomalies detected in this dataset")
doc.add_message("INFO", "Analytic execution completed successfully")

print("=" * 60)
print("MESSAGES ADDED TO DOCUMENT")
print("=" * 60)
for i, msg in enumerate(doc.get("messages", []), 1):
    print(f"{i}. [{msg['level']:5}] {msg['timestamp']} - {msg['text'][:50]}...")

### 7.1 Message Logging After Finalisation (Failure Mode)

Once a document is finalised, `add_message()` becomes a no-op. The cell below only records the current message count; the comparison itself runs after `finalise()` in Section 8.

In [None]:
# Save the current message count
message_count_before = len(doc.get("messages", []))
print(f"Message count before finalise: {message_count_before}")
print("(We will check this again after finalise to show add_message is a no-op)")

## 8. Finalisation and Auto-Computed Fields

The `finalise()` method performs several important operations:

1. Records `finalization_dtg` (current UTC timestamp)
2. Computes `total_runtime_seconds` from init to finalization
3. Generates a unique `run_id` (UUID v4)
4. Computes `input_hash` from the inputs dict
5. Computes `findings_hash` from the findings list
6. Captures `execution_environment` (Python version, libraries, OS, hardware)
7. Validates the complete document against the output schema
8. Marks the document as immutable
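
Steps 1 to 5 of the list can be sketched with the standard library alone. This is an illustration under assumptions, not the library's code: the hash is assumed to be SHA-256 over a canonical JSON encoding (the 64-character placeholder hash used later in this notebook is consistent with SHA-256 but does not confirm it), and `canonical_hash` and `finalise_fields` are hypothetical names. Environment capture, schema validation, and immutability are left out.

```python
import hashlib
import json
import uuid
from datetime import datetime, timezone


def canonical_hash(obj) -> str:
    """SHA-256 of a canonical JSON encoding (sorted keys, compact separators)."""
    payload = json.dumps(obj, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def finalise_fields(doc: dict) -> dict:
    """Return the auto-computed fields for doc; doc itself is left unchanged."""
    started = datetime.fromisoformat(doc["initialization_dtg"])
    finished = datetime.now(timezone.utc)
    return {
        "finalization_dtg": finished.isoformat(),
        "total_runtime_seconds": (finished - started).total_seconds(),
        "run_id": str(uuid.uuid4()),
        "input_hash": canonical_hash(doc.get("inputs", {})),
        "findings_hash": canonical_hash(doc.get("findings", [])),
    }


example = {
    "initialization_dtg": datetime.now(timezone.utc).isoformat(),
    "inputs": {"b": 2, "a": 1},
    "findings": [],
}
computed = finalise_fields(example)
# Key order does not affect the hash because keys are sorted before encoding.
print(computed["input_hash"] == canonical_hash({"a": 1, "b": 2}))  # True
```

Sorting keys before hashing is the design choice that matters here: two runs with the same inputs produce the same `input_hash` regardless of the order in which the dict was built.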

In [None]:
# Finalise the document
doc.finalise()

print("=" * 60)
print("DOCUMENT FINALISED")
print("=" * 60)

print("\nAuto-computed fields:")
print(f"  run_id:                {doc['run_id']}")
print(f"  initialization_dtg:    {doc['initialization_dtg']}")
print(f"  finalization_dtg:      {doc['finalization_dtg']}")
print(f"  total_runtime_seconds: {doc['total_runtime_seconds']}")
print(f"  input_hash:            {doc['input_hash'][:32]}...")
print(f"  findings_hash:         {doc['findings_hash'][:32]}...")

print("\nExecution Environment:")
env = doc["execution_environment"]
print(f"  Python version: {env['python_version']}")
print(f"  OS: {env['operating_system']}")
print(f"  User: {env['username']}")
print(f"  Hardware: CPU={env['hardware_specs']['cpu']}, RAM={env['hardware_specs']['ram']}")
print(f"  Libraries: {env['library_dependencies']}")

In [None]:
# Demonstrate that add_message is a no-op after finalise
doc.add_message("INFO", "This message will NOT be added (document is immutable)")
message_count_after = len(doc.get("messages", []))

print("=" * 60)
print("VERIFYING IMMUTABILITY")
print("=" * 60)
print(f"Messages before finalise: {message_count_before}")
print(f"Messages after finalise:  {message_count_after}")
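print(f"Message was added: {message_count_after > message_count_before}")
# Fail loudly if the count changed, instead of printing the claim unconditionally
assert message_count_after == message_count_before, "add_message() should be a no-op after finalise()"
print("Document is immutable after finalise()!")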

### 8.1 Finalisation Failure Mode - Missing Required Fields

If you try to finalise a document without all required fields, validation will fail:

In [None]:
# Create an incomplete document to demonstrate validation failure
print("=" * 60)
print("FAILURE MODE: Incomplete document finalisation")
print("=" * 60)

try:
    # Create a document with missing required fields
    incomplete_doc = contract.create_document(
        # Only set a few fields, missing many required ones
        author="Test Author",
        status="success",
        exit_code=0,
    )
    incomplete_doc.finalise()
    print("ERROR: This should not print!")
except SchemaError as e:
    print("SchemaError raised as expected!")
    print(f"  Error: {str(e)[:100]}...")

### 8.2 Saving Before Finalisation (Failure Mode)

The document must be finalised before it can be saved; calling `save()` on an unfinalised document raises `RuntimeError`:

In [None]:
# Create a new document and try to save without finalising
print("=" * 60)
print("FAILURE MODE: Save without finalise")
print("=" * 60)

try:
    unsaved_doc = contract.create_document(
        input_schema_version="1.0.1",
        output_schema_version=contract.version,
        author="Test",
        author_organization="Test Org",
        contact="test@test.com",
        license="MIT",
        documentation_link="https://example.com",
        status="success",
        exit_code=0,
        dataset_description="Test",
        dataset_size=100,
        dataset_hash="0" * 64,
        data_schema={},
        feature_names=[],
        inputs={},
        analytic_id="test",
        analytic_name="Test",
        analytic_version="1.0.0",
        analytic_description="Test",
        findings=[],
    )
    # Try to save without finalising
    unsaved_doc.save("/tmp/test_output.json")
    print("ERROR: This should not print!")
except RuntimeError as e:
    print("RuntimeError raised as expected!")
    print(f"  Error: {e}")

## 9. Exporting Results

The finalised document can be exported to:
1. **JSON** - For programmatic consumption
2. **Markdown** - For human-readable reports

In [None]:
import tempfile
import os

# Create a temporary directory for output files
output_dir = tempfile.mkdtemp()
json_path = os.path.join(output_dir, "iris_analytic_report.json")
md_path = os.path.join(output_dir, "iris_analytic_report.md")

# Save to JSON
doc.save(json_path)
print(f"JSON saved to: {json_path}")

# Generate and save Markdown report
md_content = to_markdown_card(doc)
with open(md_path, "w") as f:
    f.write(md_content)
print(f"Markdown saved to: {md_path}")

# Show file sizes
json_size = os.path.getsize(json_path)
md_size = os.path.getsize(md_path)
print("\nFile sizes:")
print(f"  JSON: {json_size:,} bytes")
print(f"  Markdown: {md_size:,} bytes")

In [None]:
# Preview the JSON output
print("=" * 60)
print("JSON OUTPUT PREVIEW")
print("=" * 60)

with open(json_path, "r") as f:
    json_content = json.load(f)

# Show the top-level keys
print("Top-level fields in output:")
for key in json_content.keys():
    value = json_content[key]
    if isinstance(value, dict):
        print(f"  {key}: {{...}} ({len(value)} keys)")
    elif isinstance(value, list):
        print(f"  {key}: [...] ({len(value)} items)")
    elif isinstance(value, str) and len(value) > 50:
        print(f'  {key}: "{value[:50]}..."')
    else:
        print(f"  {key}: {value}")

In [None]:
# Preview the Markdown output
print("=" * 60)
print("MARKDOWN OUTPUT PREVIEW (first 50 lines)")
print("=" * 60)

with open(md_path, "r") as f:
    md_lines = f.readlines()

for line in md_lines[:50]:
    print(line, end="")
    
if len(md_lines) > 50:
    print(f"\n... ({len(md_lines) - 50} more lines)")
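
The export cells above create their output directory with `tempfile.mkdtemp()` and rely on the final cell of the notebook to delete it. If that cell never runs, the directory stays behind. One alternative, sketched here with a stand-in dict rather than a real finalised document, is `tempfile.TemporaryDirectory`, which removes the directory when the `with` block exits.

```python
import json
import tempfile
from pathlib import Path

report = {"status": "success", "findings": []}  # stand-in for a finalised document

with tempfile.TemporaryDirectory() as tmp:
    json_path = Path(tmp) / "report.json"
    json_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
    round_trip = json.loads(json_path.read_text(encoding="utf-8"))
    existed_inside = json_path.exists()

print(round_trip == report)                # True
print(existed_inside, json_path.exists())  # True False
```

The trade-off is that the files are gone as soon as the block ends, so this pattern suits tests and previews rather than reports that need to be kept.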

## 10. Summary

This notebook demonstrated the complete workflow for using the `contract_schema` library with the analytic contract:

### Features Demonstrated

| Feature | Status |
|---------|--------|
| Contract Loading | Done |
| Input Schema Exploration | Done |
| Output Schema Exploration | Done |
| Input Validation (Success) | Done |
| Input Validation (Failure - Missing Required) | Done |
| Input Validation (Failure - Invalid Enum) | Done |
| Input Validation (Failure - Invalid DateTime) | Done |
| Field Mapping (data_map) | Done |
| Document Creation (All Fields) | Done |
| Message Logging (add_message) | Done |
| Finding Generation | Done |
| Finalisation | Done |
| Finalisation (Failure - Missing Required) | Done |
| Auto-computed Fields | Done |
| Immutability After Finalise | Done |
| Save Without Finalise (Failure) | Done |
| Export to JSON | Done |
| Export to Markdown | Done |

### Key Classes and Methods

```python
from contract_schema import Contract, SchemaError, to_markdown_card
from contract_schema import utils

# Load a contract
contract = Contract.load("analytic_schema.json")

# Validate inputs
inputs = contract.parse_and_validate_input({...})

# Create and populate a document
doc = contract.create_document(**fields)

# Add structured log messages
doc.add_message("INFO", "message text")

# Finalise and validate
doc.finalise()

# Export
doc.save("output.json")
to_markdown_card(doc)  # Returns markdown string
```

### Error Handling

Always wrap input validation and finalisation in try/except blocks to handle `SchemaError`:

```python
try:
    inputs = contract.parse_and_validate_input(user_input)
except SchemaError as e:
    log.error("Input validation failed: %s", e)
    raise  # stop here rather than continue with unvalidated inputs
```

In [None]:
# Cleanup temporary files
import shutil
shutil.rmtree(output_dir)
print("Temporary files cleaned up")
print("\n" + "=" * 60)
print("NOTEBOOK COMPLETED SUCCESSFULLY")
print("=" * 60)