# Create From PDF: End-to-End Document Processing Orchestration

A complete orchestration workflow that transforms a PDF document into a full-stack M/TEXT application: datamodel, form, testcase, mappings, and templates—all AI-generated.

## Who This Is For

**Business & Product Teams**: This workflow demonstrates how to chain multiple AI services to automate the complete digitization of paper documents. Upload a PDF (invoice, form, contract), and the system generates:
- Data schema (datamodel XML)
- Web form (Form.io JSON)
- Test data (testcase XML)  
- Data transformation rules (XSLT)
- Document templates (M/TEXT models & template XML)

This is the "full stack AI" approach—one PDF in, complete working application out.

**Developers & Engineers**: This notebook shows how to:
- Orchestrate 8 sequential steps, most of them LLM-powered, with error handling
- Use streaming responses to show progress for long-running workflows
- Chain workflow outputs as inputs to subsequent steps
- Handle partial failures gracefully in multi-step pipelines
- Integrate OCR (Mistral), text generation (Claude/GPT), and structured output generation

This is a reference implementation for building complex, multi-agent workflows.


## How It Works: The 8-Step Pipeline

This workflow chains 8 steps in sequence, most of them AI-powered (step 7 is an algorithmic merge and step 8 writes files). Each step produces artifacts consumed by later steps:

1. **OCR (Mistral)** → Extract markdown text from PDF using Mistral OCR API
   - Input: PDF file (base64)
   - Output: Markdown text with page breaks
   - Duration: ~5-10s

2. **Datamodel Generation** → Create BusinessData.datamodel XML from markdown
   - Input: Markdown + optional instructions
   - Output: Datamodel XML with typed fields
   - Duration: ~10-20s

3. **Form.io Generation** → Build web form JSON schema
   - Input: Datamodel XML + markdown (for context)
   - Output: Form.io JSON component definition
   - Duration: ~15-25s

4. **Testcase Generation** → Generate sample data XML
   - Input: Datamodel XML + description
   - Output: Testcase XML with realistic values
   - Duration: ~8-15s

5. **Datamapping Generation** → Create XSLT transformation rules
   - Input: Datamodel XML + Testcase XML
   - Output: XSLT 1.0 stylesheet
   - Duration: ~10-20s

6. **Models Generation** → Extract M/TEXT DOM models (1-3 per page)
   - Input: Markdown + Datamodel + Testcase + instructions
   - Output: Array of M/TEXT XML models
   - Duration: ~20-40s

7. **Template Generation** → Merge models into single template
   - Input: Array of M/TEXT models
   - Output: Complete M/TEXT template XML
   - Duration: <1s (algorithmic merge)

8. **Export** → Write all artifacts to file storage
   - Input: All previous outputs
   - Output: 7+ files ready for download
   - Duration: ~2-5s

**Total Duration**: 70-140 seconds depending on PDF complexity and model availability.

The workflow uses **streaming responses** to show real-time progress on each step.
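
The total can be cross-checked by summing the per-step ranges listed above. This is a minimal sketch for a strictly sequential run; treating step 7's "<1s" as the range 0 to 1 second is an assumption, and the names `STEP_DURATIONS` and `total_range` are illustrative only.

```python
# Per-step duration ranges in seconds, copied from the step list above.
# Step 7 is listed as "<1s"; modelling it as (0, 1) is an assumption.
STEP_DURATIONS = {
    "ocr": (5, 10),
    "datamodel": (10, 20),
    "formio": (15, 25),
    "testcase": (8, 15),
    "datamapping": (10, 20),
    "models": (20, 40),
    "template": (0, 1),
    "export": (2, 5),
}


def total_range(durations):
    """Sum the lower and upper bounds for a strictly sequential run."""
    low = sum(lo for lo, _ in durations.values())
    high = sum(hi for _, hi in durations.values())
    return low, high


low, high = total_range(STEP_DURATIONS)
print(f"Sequential total: {low}-{high}s")
```

The sum comes to 70-136 seconds, which matches the rounded 70-140 second figure.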


## Inputs and Prerequisites

**Required:**
- **PDF Document**: Uploaded file (invoice, form, letter, etc.) stored in Redis with a `documentFileId`
- **Mistral API Key**: For OCR processing (`MISTRAL_API_KEY` environment variable)

**Optional:**
- **Instructions**: Free-text guidance for the AI (e.g., "Focus on invoice line items", "Ignore headers/footers")

**Example Request:**
```json
{
  "documentFileId": "file_abc123",
  "instructions": "Prioritize customer and product information"
}
```

The workflow makes internal calls to multiple endpoints, so ensure all required LLM provider API keys are configured (Anthropic, OpenAI, or Mistral).
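
Validating the request body and checking for the OCR key before starting can save a failed run. The sketch below assumes only the two request fields and the `MISTRAL_API_KEY` variable named above; `CreateFromPdfRequest`, `parse_request`, and `missing_env_keys` are hypothetical names, not part of the actual service.

```python
import json
import os
from dataclasses import dataclass
from typing import Optional


@dataclass
class CreateFromPdfRequest:
    """Hypothetical container for the request fields shown above."""
    document_file_id: str
    instructions: Optional[str] = None


def parse_request(body: str) -> CreateFromPdfRequest:
    """Parse the JSON body and reject missing or mistyped fields."""
    payload = json.loads(body)
    if not isinstance(payload, dict):
        raise ValueError("request body must be a JSON object")
    file_id = payload.get("documentFileId")
    if not isinstance(file_id, str) or not file_id:
        raise ValueError("documentFileId is required and must be a non-empty string")
    instructions = payload.get("instructions")
    if instructions is not None and not isinstance(instructions, str):
        raise ValueError("instructions must be a string when provided")
    return CreateFromPdfRequest(file_id, instructions)


def missing_env_keys(required=("MISTRAL_API_KEY",), env=None):
    """Return the names of required environment variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


request = parse_request(
    '{"documentFileId": "file_abc123", '
    '"instructions": "Prioritize customer and product information"}'
)
print(request)
print(missing_env_keys(env={"MISTRAL_API_KEY": "dummy-value"}))
```

Passing `env` explicitly keeps the check testable without touching the real environment.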


## Orchestration Pattern: Error Handling & State Management

This workflow doesn't use a single system prompt—it orchestrates 8 distinct API calls, each with its own prompt. The orchestration layer manages:

**State Tracking:**
```typescript
interface StepState {
  id: "ocr" | "datamodel" | "formio" | "testcase" | "datamapping" | "models" | "template" | "export";
  status: "pending" | "running" | "success" | "error";
  detail?: Record<string, any>;  // Step-specific metadata
}
```
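
For readers following along in Python, the same state shape can be sketched with plain dictionaries. The helper names mirror the `initializeSteps` and `updateStep` calls used in the orchestration pseudocode, but their bodies here are assumptions, not the actual implementation.

```python
from typing import Any, Dict, List, Optional

# Step ids and statuses as declared in the StepState interface above.
STEP_IDS = ["ocr", "datamodel", "formio", "testcase",
            "datamapping", "models", "template", "export"]
VALID_STATUSES = {"pending", "running", "success", "error"}


def initialize_steps() -> List[Dict[str, Any]]:
    """Create one state entry per step, all pending."""
    return [{"id": step_id, "status": "pending"} for step_id in STEP_IDS]


def update_step(steps: List[Dict[str, Any]], step_id: str, status: str,
                detail: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Set the status (and optional detail) of one step in place."""
    if status not in VALID_STATUSES:
        raise ValueError(f"unknown status: {status}")
    for step in steps:
        if step["id"] == step_id:
            step["status"] = status
            if detail is not None:
                step["detail"] = detail
            return step
    raise KeyError(step_id)


steps = initialize_steps()
update_step(steps, "ocr", "running")
update_step(steps, "ocr", "success", {"markdownLength": 1024})
print(steps[0])
```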

**Error Handling Strategy:**
- **Fail-fast**: If any step fails, immediately abort and stream error state
- **Detailed logging**: Each step logs duration, tokens, artifact sizes
- **Partial artifacts**: Even on failure, return all successfully completed artifacts
- **Retry logic**: Individual workflow endpoints (not orchestrator) handle retries
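
The fail-fast and partial-artifact rules combine naturally in a single loop. The following is a sketch under assumptions: `run_fail_fast` is a hypothetical helper, the step functions are stubs, and the `failedStep` and `artifacts` fields are illustrative additions that do not appear in the message shapes documented here.

```python
from typing import Any, Callable, Dict, List, Tuple

# A step is an id plus a function that receives the artifacts produced so far.
Step = Tuple[str, Callable[[Dict[str, Any]], Any]]


def run_fail_fast(steps: List[Step]) -> Dict[str, Any]:
    """Run steps in order; stop at the first failure and keep earlier artifacts."""
    artifacts: Dict[str, Any] = {}
    for step_id, run in steps:
        try:
            artifacts[step_id] = run(artifacts)
        except Exception as exc:
            # Abort immediately: later steps never start.
            return {
                "type": "error",
                "failedStep": step_id,   # hypothetical field
                "message": str(exc),
                "artifacts": artifacts,  # hypothetical field: completed outputs only
            }
    return {"type": "result", "data": artifacts}


def stub_ocr(artifacts):
    return "# Invoice"


def failing_datamodel(artifacts):
    raise RuntimeError("LLM timeout")


def stub_formio(artifacts):
    return {"components": []}


outcome = run_fail_fast([
    ("ocr", stub_ocr),
    ("datamodel", failing_datamodel),
    ("formio", stub_formio),
])
print(outcome)
```

Here the OCR output survives the failure, while the Form.io step is never invoked.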

**Streaming Progress:**
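The orchestrator sends NDJSON updates (one JSON object per line) whenever a step changes state. The message shapes, written in JavaScript object notation for brevity: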
```javascript
{ type: "steps", steps: [...] }           // Progress update
{ type: "result", data: {...} }           // Final success
{ type: "error", message: "...", steps: [...] }  // Failure with context
```
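
On the wire, each message is one JSON object followed by a newline. A client might consume the stream roughly as follows; this sketch assumes only the three message types listed above, and `encode_ndjson`, `decode_ndjson`, and `consume` are hypothetical helper names.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def encode_ndjson(message: Dict[str, Any]) -> str:
    """Serialize one message as a single line of JSON."""
    return json.dumps(message, separators=(",", ":")) + "\n"


def decode_ndjson(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Parse one JSON object per non-empty line."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def consume(lines: Iterable[str]) -> Dict[str, Any]:
    """Follow progress messages and return the final result data."""
    for message in decode_ndjson(lines):
        kind = message.get("type")
        if kind == "steps":
            running = [s["id"] for s in message["steps"] if s["status"] == "running"]
            print("running:", running)
        elif kind == "result":
            return message["data"]
        elif kind == "error":
            raise RuntimeError(message["message"])
    raise RuntimeError("stream ended without a result or error message")


wire = "".join([
    encode_ndjson({"type": "steps", "steps": [{"id": "ocr", "status": "running"}]}),
    encode_ndjson({"type": "steps", "steps": [{"id": "ocr", "status": "success"}]}),
    encode_ndjson({"type": "result", "data": {"markdown": "# Invoice"}}),
])
result = consume(wire.splitlines())
print(result)
```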


```typescript
// Orchestration pseudocode (TypeScript-style for clarity).
// initializeSteps, updateStep, createNDJSONStream, loadPdfFromStorage,
// mistralOCR, and postJson are assumed to be defined elsewhere.

async function orchestrateCreateFromPdf(documentFileId: string, instructions?: string) {
  const steps = initializeSteps();  // All pending
  const stream = createNDJSONStream();
  // Track the active step so the catch block can mark the right one as failed
  let currentStep: StepState["id"] = "ocr";

  try {
    // Step 1: OCR
    updateStep(steps, currentStep, "running");
    stream.send({ type: "steps", steps });
    const pdf = await loadPdfFromStorage(documentFileId);
    const markdown = await mistralOCR(pdf);
    updateStep(steps, currentStep, "success", { markdownLength: markdown.length });
    stream.send({ type: "steps", steps });

    // Step 2: Datamodel
    currentStep = "datamodel";
    updateStep(steps, currentStep, "running");
    stream.send({ type: "steps", steps });
    const datamodelXml = await postJson("/api/workflows/createfrompdf/datamodel", { markdown, instructions });
    updateStep(steps, currentStep, "success");
    stream.send({ type: "steps", steps });

    // Steps 3-8: Similar pattern...
    // Each step sets currentStep, updates state, sends progress, and passes artifacts
    // to the next step, producing formioJson, testcaseXml, xslt, models, and templateXml.

    // Final: Stream result
    stream.send({ type: "result", data: { markdown, datamodelXml, testcaseXml, formioJson, models, templateXml, xslt } });

  } catch (error) {
    // Fail-fast: mark the step that was running and report the accumulated state
    const message = error instanceof Error ? error.message : String(error);
    updateStep(steps, currentStep, "error", { message });
    stream.send({ type: "error", message, steps });
  }
}
```



## Step Dependencies & Data Flow

Each step consumes outputs from previous steps. The dependency graph:

```
PDF (input)
  ↓
[1. OCR] → markdown
  ↓
[2. Datamodel] → datamodelXml
  ├→ [3. Form.io] → formioJson
  ├→ [4. Testcase] → testcaseXml
  │     ├→ [5. Datamapping] → xslt
  │     └→ [6. Models] → models[]
  │            └→ [7. Template] → templateXml
  └─────────────→ [8. Export] → files
```

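**Critical Dependencies:**
- Steps 3 and 4 can theoretically run in parallel (both need only the datamodel, plus markdown or a description)
- Step 5 requires the testcase (from step 4)
- Step 6 takes the testcase as input in the pipeline above, so it can join steps 3 and 4 in parallel only if models are generated without testcase data
- Step 7 requires models (from step 6)
- Step 8 requires all previous outputs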
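
The groups of steps that could run concurrently can be derived mechanically from the inputs listed for each step in the pipeline section. This sketch uses the standard-library `graphlib` module (Python 3.9+); the `DEPENDENCIES` table is a reading of those listed inputs, and `parallel_stages` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Each step mapped to the steps whose outputs it takes as input,
# read off the "Input" lines in the pipeline description.
DEPENDENCIES = {
    "ocr": set(),
    "datamodel": {"ocr"},
    "formio": {"ocr", "datamodel"},
    "testcase": {"datamodel"},
    "datamapping": {"datamodel", "testcase"},
    "models": {"ocr", "datamodel", "testcase"},
    "template": {"models"},
    "export": {"ocr", "datamodel", "formio", "testcase",
               "datamapping", "models", "template"},
}


def parallel_stages(dependencies):
    """Group steps into stages; steps within a stage do not depend on each other."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted for stable output
        stages.append(ready)
        sorter.done(*ready)
    return stages


for number, stage in enumerate(parallel_stages(DEPENDENCIES), start=1):
    print(number, stage)
```

With these inputs the result is six stages: OCR, datamodel, then Form.io and testcase together, then datamapping and models together, then template, then export. Models lands in the later stage because its listed inputs include the testcase.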


## Best Practices for Orchestration Workflows

**1. Use Streaming for Long Workflows** (>30s)
- Keeps connections alive
- Provides real-time progress feedback
- Allows client to show partial results

**2. Fail-Fast with Detailed Errors**
- Abort immediately on first failure
- Include step name, error message, and partial state
- Log full context for debugging

**3. Make Steps Idempotent**
- Same inputs → same outputs
- No side effects except final export
- Enables retries without corruption

**4. Cache Intermediate Results**
- Store expensive step outputs (OCR, models) by content hash
- Retry from cached checkpoint on partial failure  
- Reduces cost for iterative refinement
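
A content-hash cache for practice 4 might look like the sketch below. It keeps results in memory for illustration; a real deployment would more likely use a shared store. `ContentHashCache` and `fake_ocr` are hypothetical names, and SHA-256 is one reasonable choice of hash, not necessarily the one used here.

```python
import hashlib
from typing import Callable, Dict


class ContentHashCache:
    """In-memory cache keyed by step id plus a SHA-256 digest of the input bytes."""

    def __init__(self):
        self._store: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    @staticmethod
    def key_for(step_id: str, payload: bytes) -> str:
        return f"{step_id}:{hashlib.sha256(payload).hexdigest()}"

    def get_or_compute(self, step_id: str, payload: bytes,
                       compute: Callable[[bytes], str]) -> str:
        key = self.key_for(step_id, payload)
        if key in self._store:
            self.hits += 1
            return self._store[key]
        # Cache miss: run the expensive step once and remember its output.
        self.misses += 1
        value = compute(payload)
        self._store[key] = value
        return value


ocr_calls = []


def fake_ocr(pdf_bytes: bytes) -> str:
    """Stand-in for an expensive OCR call; records each invocation."""
    ocr_calls.append(len(pdf_bytes))
    return f"markdown for {len(pdf_bytes)} bytes"


cache = ContentHashCache()
first = cache.get_or_compute("ocr", b"%PDF-sample", fake_ocr)
second = cache.get_or_compute("ocr", b"%PDF-sample", fake_ocr)
print(first == second, cache.hits, cache.misses, len(ocr_calls))
```

Keying on the step id as well as the content keeps outputs of different steps apart when they share the same input.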

**5. Monitor Step Performance**
- Track duration, token counts, error rates per step
- Identify bottlenecks for optimization
- Set SLOs for critical paths



```python
# Example: Monitoring orchestration progress
import time


class OrchestrationMonitor:
    def __init__(self):
        self.step_timings = {}
        # Start the overall clock at creation so get_total_duration() always has a reference
        self.start_time = time.time()

    def step_started(self, step_id):
        self.step_timings[step_id] = {"start": time.time()}

    def step_completed(self, step_id, detail=None):
        end = time.time()
        start = self.step_timings[step_id]["start"]
        self.step_timings[step_id].update({
            "end": end,
            "duration": end - start,
            "detail": detail or {}
        })

        # Log metrics
        print(f"{step_id}: {end - start:.1f}s", detail or "")

    def get_total_duration(self):
        # Seconds elapsed since the monitor was created
        return time.time() - self.start_time

# In production, send metrics to observability platform (Datadog, Prometheus, etc.)
```



## Adaptation Ideas

This orchestration pattern is highly adaptable. Here are ways to customize it:

**Different Document Types:**
- **Invoices**: Focus datamodel on line items, totals, tax
- **Contracts**: Extract parties, clauses, dates, terms
- **Forms**: Generate form fields directly from document structure
- **Letters**: Focus on personalization elements, addresses

**Different Output Formats:**
- Skip Form.io generation if you don't need web forms
- Skip template generation if you only need data extraction
- Add personalization step for custom content

**Parallel Execution:**
```typescript
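// After datamodel is ready, run these in parallel.
// Note: this variant generates models without testcase input, unlike step 6 in the pipeline above.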
const [formioJson, testcaseXml, models] = await Promise.all([
  postJson("/api/workflows/formio/generate", { datamodel, markdown }),
  postJson("/api/workflows/testcase", { datamodel }),
  postJson("/api/workflows/modelgeneration/from-markdown", { datamodel, markdown })
]);
```

**Checkpointing for Retries:**
Store each step's output in cache. On failure, resume from last successful checkpoint.
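
Resuming from a checkpoint can be as simple as skipping any step whose output is already stored. This is a sketch under assumptions: the checkpoint store is a plain dictionary, the step functions are stubs, and `run_with_checkpoints` is a hypothetical helper.

```python
from typing import Any, Callable, Dict, List, Tuple

Step = Tuple[str, Callable[[Dict[str, Any]], Any]]


def run_with_checkpoints(steps: List[Step], checkpoints: Dict[str, Any]) -> Dict[str, Any]:
    """Run steps in order, skipping any whose output is already checkpointed."""
    for step_id, run in steps:
        if step_id in checkpoints:
            continue  # completed in an earlier attempt
        # Store the output only after the step succeeds; an exception leaves
        # earlier checkpoints intact for the next attempt.
        checkpoints[step_id] = run(checkpoints)
    return checkpoints


calls = {"ocr": 0, "datamodel": 0}


def stub_ocr(done):
    calls["ocr"] += 1
    return "# Invoice"


def flaky_datamodel(done):
    calls["datamodel"] += 1
    if calls["datamodel"] == 1:
        raise RuntimeError("transient failure")
    return "datamodel built from " + done["ocr"]


PIPELINE = [("ocr", stub_ocr), ("datamodel", flaky_datamodel)]
checkpoints: Dict[str, Any] = {}

try:
    run_with_checkpoints(PIPELINE, checkpoints)   # first attempt fails at datamodel
except RuntimeError as exc:
    print("first attempt failed:", exc)

run_with_checkpoints(PIPELINE, checkpoints)       # second attempt resumes after OCR
print(calls, sorted(checkpoints))
```

The OCR stub runs only once across both attempts, which is where the cost saving comes from.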
