# 02 - OPM Flow Integration

**CLARISSA** uses OPM Flow as its open-source simulation backend. This notebook covers:

1. OPM Flow architecture and capabilities
2. Docker containerization for isolated execution
3. Running simulations programmatically
4. Parsing simulation results (SMSPEC, UNSMRY)
5. Error handling and recovery strategies
6. Integration with CLARISSA's simulation layer

---

## 1. What is OPM Flow?

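**OPM** (Open Porous Media) is an open-source initiative that develops tools for modeling and simulating flow in porous media. Its reservoir simulator, **OPM Flow**, is a fully implicit black-oil simulator that reads ECLIPSE-format input decks.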

### Key Features

| Feature | OPM Flow | ECLIPSE (Commercial) |
|---------|----------|----------------------|
| License | GPL v3 (free) | Commercial (paid) |
| Black-Oil | ✅ Full | ✅ Full |
| Compositional | ⚠️ Limited | ✅ Full |
| Thermal | ❌ No | ✅ Yes |
| Input Format | ECLIPSE-compatible | Native |
| Output Format | ECLIPSE-compatible | Native |

### Why OPM for CLARISSA?

1. **No licensing barriers** - Democratizes access to simulation
2. **ECLIPSE compatibility** - Uses same input/output formats
3. **Docker-friendly** - Easy to containerize and scale
4. **Active development** - Backed by SINTEF, Equinor, and others

In [None]:
# OPM Flow version and capabilities check
import subprocess
import shutil

def check_opm_installation() -> dict:
    """
    Check if OPM Flow is installed and get version info.
    """
    result = {
        'installed': False,
        'version': None,
        'path': None,
        'capabilities': []
    }
    
    # Check if flow binary exists
    flow_path = shutil.which('flow')
    if flow_path:
        result['installed'] = True
        result['path'] = flow_path
        
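        # Get version, invoking the resolved path rather than a bare name
        try:
            proc = subprocess.run(
                [flow_path, '--version'],
                capture_output=True,
                text=True,
                timeout=10
            )
            # Fall back to stderr in case the version is printed there
            result['version'] = proc.stdout.strip() or proc.stderr.strip()
        except (OSError, subprocess.SubprocessError) as e:
            result['version'] = f"Error: {e}"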
    
    return result

# Check installation
opm_info = check_opm_installation()
print(f"OPM Flow installed: {opm_info['installed']}")
if opm_info['installed']:
    print(f"Path: {opm_info['path']}")
    print(f"Version: {opm_info['version']}")
else:
    print("OPM Flow not found - will use Docker container")
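The check above only reports whether `flow` is on the `PATH`. A minimal sketch of the resulting backend choice (native binary first, Docker as the fallback); `select_backend` is a hypothetical helper, and it assumes the Docker CLI is installed as `docker`:

```python
import shutil
from typing import Optional


def select_backend(flow_binary: str = "flow", docker_binary: str = "docker") -> Optional[str]:
    """Return 'native', 'docker', or None when neither executable is found.

    Hypothetical helper: prefers a native OPM Flow install, then the Docker CLI.
    """
    if shutil.which(flow_binary):
        return "native"
    if shutil.which(docker_binary):
        return "docker"
    return None


backend = select_backend()
print(f"Selected backend: {backend or 'none available'}")
```

The result could drive the `use_docker` flag of the `SimulationRunner` in section 3, e.g. `SimulationRunner(use_docker=(backend == "docker"))`.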

## 2. Docker Configuration for OPM Flow

For production deployments, we containerize OPM Flow for:
- Isolation from host system
- Reproducible builds
- Easy scaling with Kubernetes

In [None]:
# Dockerfile for OPM Flow
OPM_DOCKERFILE = '''
# ============================================================
# OPM Flow Docker Image for CLARISSA
# ============================================================
FROM ubuntu:22.04 as base

# Avoid interactive prompts during build
ENV DEBIAN_FRONTEND=noninteractive

# Install OPM from official repository
RUN apt-get update && apt-get install -y \\
    software-properties-common \\
    curl \\
    && add-apt-repository ppa:opm/ppa \\
    && apt-get update \\
    && apt-get install -y \\
        libopm-simulators-bin \\
        python3 \\
        python3-pip \\
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies for result parsing
RUN pip3 install --no-cache-dir \\
    numpy \\
    pandas \\
    ecl-data-io \\
    resdata

# Create working directories
RUN mkdir -p /simulation/input /simulation/output /simulation/logs

WORKDIR /simulation

# Copy helper scripts
COPY scripts/run_simulation.py /usr/local/bin/
COPY scripts/parse_results.py /usr/local/bin/
RUN chmod +x /usr/local/bin/*.py

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \\
    CMD flow --version || exit 1

# Default command shows version
CMD ["flow", "--version"]
'''

print(OPM_DOCKERFILE)

In [None]:
# Docker Compose configuration for local development
DOCKER_COMPOSE = '''

services:
  opm-flow:
    build:
      context: .
      dockerfile: Dockerfile.opm
    image: clarissa/opm-flow:latest
    volumes:
      - ./data/input:/simulation/input:ro
      - ./data/output:/simulation/output
      - ./data/logs:/simulation/logs
    environment:
      - OMP_NUM_THREADS=4
    deploy:
      resources:
        limits:
          cpus: "4"
          memory: 8G
        reservations:
          cpus: "2"
          memory: 4G
    command: ["flow", "--help"]

  # API wrapper for simulation jobs
  opm-api:
    build:
      context: .
      dockerfile: Dockerfile.opm-api
    ports:
      - "8080:8080"
    volumes:
      - ./data:/simulation
    depends_on:
      - opm-flow
    environment:
      - OPM_FLOW_BINARY=/usr/bin/flow
      - MAX_CONCURRENT_JOBS=4
'''

print(DOCKER_COMPOSE)
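The compose limits apply when the stack is started with Compose. When the runner launches a one-off container instead, roughly equivalent limits can be passed to `docker run`. A sketch, assuming the image layout from the Dockerfile above (`/simulation/input`, `/simulation/output`) and a deck named `MODEL.DATA`; `docker_run_args` is hypothetical, and `--network none` is an extra isolation choice that is not part of the original configuration:

```python
from pathlib import Path
from typing import List


def docker_run_args(
    image: str,
    input_dir: Path,
    output_dir: Path,
    cpus: float = 4.0,
    memory: str = "8g",
    threads: int = 4,
) -> List[str]:
    """Build a `docker run` argument list with limits similar to the compose file.

    Hypothetical helper; --rm, --cpus, --memory, --network, -e and -v are
    standard Docker CLI options.
    """
    return [
        "docker", "run", "--rm",
        "--cpus", str(cpus),            # compose: deploy.resources.limits.cpus
        "--memory", memory,             # compose: deploy.resources.limits.memory
        "--network", "none",            # a simulation run needs no network access
        "-e", f"OMP_NUM_THREADS={threads}",
        # Absolute host paths, as expected by -v bind mounts
        "-v", f"{input_dir.resolve()}:/simulation/input:ro",
        "-v", f"{output_dir.resolve()}:/simulation/output",
        image,
        "flow", "--output-dir=/simulation/output", "/simulation/input/MODEL.DATA",
    ]


args = docker_run_args("clarissa/opm-flow:latest", Path("data/input"), Path("data/output"))
print(" ".join(args))
```

The returned list can be passed directly to `subprocess.run` or `asyncio.create_subprocess_exec`, like the command built by `SimulationRunner._build_command`.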

## 3. Simulation Runner Service

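The `SimulationRunner` manages OPM Flow execution, including:
- Job creation and tracking, with one working directory per job
- Execution natively or in Docker, either synchronously (with a timeout) or asynchronously
- Error capture (return code, stdout, stderr)
- Basic metrics parsed from the solver log (timesteps, linear iterations, convergence failures)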
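The runner below starts each job as soon as it is called and has no built-in queue. One way to cap the number of simultaneous runs (compare `MAX_CONCURRENT_JOBS` in the compose file) is an `asyncio.Semaphore`. In this sketch `run_limited` is a hypothetical helper and `asyncio.sleep` stands in for a simulation:

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple, TypeVar

T = TypeVar("T")


async def run_limited(
    factories: List[Callable[[], Awaitable[T]]], max_concurrent: int
) -> List[T]:
    """Await each factory's coroutine, with at most max_concurrent in flight.

    Results come back in the same order as `factories`.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(factory: Callable[[], Awaitable[T]]) -> T:
        async with semaphore:  # waits here while the limit is reached
            return await factory()

    return list(await asyncio.gather(*(guarded(f) for f in factories)))


async def demo(num_jobs: int = 10, max_concurrent: int = 3) -> Tuple[List[int], int]:
    """Run fake jobs and report the peak number running at once."""
    state = {"running": 0, "peak": 0}

    async def fake_job(i: int) -> int:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for a simulation run
        state["running"] -= 1
        return i

    results = await run_limited([lambda i=i: fake_job(i) for i in range(num_jobs)], max_concurrent)
    return results, state["peak"]


results, peak = asyncio.run(demo())
print(results, "peak concurrency:", peak)
```

Inside Jupyter, where an event loop is already running, replace `asyncio.run(demo())` with `await demo()`. Passing factories rather than coroutine objects means a job's coroutine is only created once a slot is free.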

In [None]:
import asyncio
import os
import tempfile
import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional, List, Dict, Any
import subprocess
import json

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class SimulationJob:
    """Represents a simulation job submitted to OPM Flow"""
    job_id: str
    deck_path: Path
    output_dir: Path
    status: JobStatus = JobStatus.PENDING
    
    # Timing
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    
    # Results
    return_code: Optional[int] = None
    stdout: str = ""
    stderr: str = ""
    
    # Parsed metrics
    timesteps_completed: int = 0
    solver_iterations: int = 0
    convergence_failures: int = 0
    
    @property
    def runtime_seconds(self) -> Optional[float]:
        if self.started_at and self.completed_at:
            return (self.completed_at - self.started_at).total_seconds()
        return None
    
    @property
    def succeeded(self) -> bool:
        return self.status == JobStatus.COMPLETED and self.return_code == 0

class SimulationRunner:
    """
    Manages OPM Flow simulation execution.
    
    This is CLARISSA's interface to the simulation backend.
    """
    
    def __init__(
        self,
        flow_binary: str = "flow",
        work_dir: Path = None,
        use_docker: bool = False,
        docker_image: str = "clarissa/opm-flow:latest"
    ):
        self.flow_binary = flow_binary
        self.work_dir = work_dir or Path(tempfile.gettempdir()) / "clarissa_sim"
        self.use_docker = use_docker
        self.docker_image = docker_image
        
        # Job tracking
        self.jobs: Dict[str, SimulationJob] = {}
        
        # Ensure work directory exists
        self.work_dir.mkdir(parents=True, exist_ok=True)
    
    def create_job(
        self,
        deck_content: str,
        job_name: str = None
    ) -> SimulationJob:
        """
        Create a new simulation job from deck content.
        """
        job_id = job_name or str(uuid.uuid4())[:8]
        job_dir = self.work_dir / job_id
        job_dir.mkdir(parents=True, exist_ok=True)
        
        # Write deck file
        deck_path = job_dir / "MODEL.DATA"
        deck_path.write_text(deck_content)
        
        # Create output directory
        output_dir = job_dir / "output"
        output_dir.mkdir(exist_ok=True)
        
        job = SimulationJob(
            job_id=job_id,
            deck_path=deck_path,
            output_dir=output_dir
        )
        
        self.jobs[job_id] = job
        return job
    
    def _build_command(self, job: SimulationJob) -> List[str]:
        """
        Build the command line for OPM Flow execution.
        """
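        if self.use_docker:
            # Docker's -v bind mounts expect absolute host paths
            input_dir = job.deck_path.parent.resolve()
            output_dir = job.output_dir.resolve()
            return [
                "docker", "run", "--rm",
                "-v", f"{input_dir}:/simulation/input:ro",
                "-v", f"{output_dir}:/simulation/output",
                self.docker_image,
                "flow",
                "--output-dir=/simulation/output",
                f"/simulation/input/{job.deck_path.name}"
            ]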
        else:
            return [
                self.flow_binary,
                f"--output-dir={job.output_dir}",
                str(job.deck_path)
            ]
    
    async def run_async(self, job: SimulationJob) -> SimulationJob:
        """
        Run simulation asynchronously.
        """
        job.status = JobStatus.RUNNING
        job.started_at = datetime.utcnow()
        
        cmd = self._build_command(job)
        
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            
            stdout, stderr = await process.communicate()
            
            job.return_code = process.returncode
            job.stdout = stdout.decode('utf-8', errors='replace')
            job.stderr = stderr.decode('utf-8', errors='replace')
            
            if process.returncode == 0:
                job.status = JobStatus.COMPLETED
            else:
                job.status = JobStatus.FAILED
                
        except Exception as e:
            job.status = JobStatus.FAILED
            job.stderr = str(e)
        
        finally:
            job.completed_at = datetime.utcnow()
        
        # Parse output for metrics
        self._parse_output_metrics(job)
        
        return job
    
    def run_sync(
        self,
        job: SimulationJob,
        timeout: int = 300
    ) -> SimulationJob:
        """
        Run simulation synchronously with timeout.
        """
        job.status = JobStatus.RUNNING
        job.started_at = datetime.utcnow()
        
        cmd = self._build_command(job)
        
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                timeout=timeout,
                text=True
            )
            
            job.return_code = result.returncode
            job.stdout = result.stdout
            job.stderr = result.stderr
            
            if result.returncode == 0:
                job.status = JobStatus.COMPLETED
            else:
                job.status = JobStatus.FAILED
                
        except subprocess.TimeoutExpired:
            job.status = JobStatus.FAILED
            job.stderr = f"Simulation timed out after {timeout} seconds"
            
        except Exception as e:
            job.status = JobStatus.FAILED
            job.stderr = str(e)
        
        finally:
            job.completed_at = datetime.utcnow()
        
        self._parse_output_metrics(job)
        return job
    
    def _parse_output_metrics(self, job: SimulationJob):
        """
        Extract metrics from simulation output.
        
        The regular expressions assume specific OPM Flow log wording;
        verify them against output from your OPM version.
        """
        import re
        
        # Parse timesteps from stdout
        timestep_matches = re.findall(
            r'Time step\s+(\d+)',
            job.stdout
        )
        if timestep_matches:
            job.timesteps_completed = max(int(t) for t in timestep_matches)
        
        # Parse solver iterations
        iter_matches = re.findall(
            r'(\d+)\s+linear iterations',
            job.stdout,
            re.IGNORECASE
        )
        if iter_matches:
            job.solver_iterations = sum(int(i) for i in iter_matches)
        
        # Count convergence failures in both streams, ignoring case
        combined = (job.stdout + "\n" + job.stderr).lower()
        job.convergence_failures = combined.count('convergence failure')

# Example usage
print("SimulationRunner class defined successfully")
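`run_async` collects output only after the process exits, so it cannot report progress while a run is underway. A minimal sketch of line-by-line streaming with `asyncio.create_subprocess_exec`; the `Report step N/M` wording is an assumption about OPM Flow's terminal output (check it against your version), `stream_progress` is a hypothetical helper, and the child process is a Python one-liner standing in for `flow`:

```python
import asyncio
import re
import sys
from typing import Callable, List, Tuple

# Assumed wording of OPM Flow progress lines; verify against your version
REPORT_STEP = re.compile(r"Report step\s+(\d+)\s*/\s*(\d+)")


async def stream_progress(
    cmd: List[str], on_progress: Callable[[int, int], None]
) -> Tuple[int, List[str]]:
    """Run cmd, calling on_progress(step, total) for each matching output line.

    Returns (return_code, captured output lines).
    """
    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,  # merge stderr so neither pipe fills up unread
    )
    lines: List[str] = []
    async for raw in process.stdout:  # yields one line at a time as it arrives
        line = raw.decode("utf-8", errors="replace").rstrip()
        lines.append(line)
        match = REPORT_STEP.search(line)
        if match:
            on_progress(int(match.group(1)), int(match.group(2)))
    return_code = await process.wait()
    return return_code, lines


# Stand-in for `flow`: a Python one-liner printing three progress lines
fake_flow = [
    sys.executable, "-c",
    "for i in range(1, 4): print(f'Report step {i}/3 at day {30 * i}')",
]
progress: List[Tuple[int, int]] = []
code, output = asyncio.run(
    stream_progress(fake_flow, lambda step, total: progress.append((step, total)))
)
print(code, progress)
```

In Jupyter, use `await stream_progress(...)` instead of `asyncio.run(...)`. The callback could update a job record or push a status message to the user.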

In [None]:
# Create a simple test deck
TEST_DECK = '''
-- Minimal test deck for OPM Flow
RUNSPEC

TITLE
CLARISSA Test Model

DIMENS
  5 5 2 /

OIL
WATER

FIELD

WELLDIMS
-- max wells, connections per well, groups, wells per group
  1 2 2 1 /

START
  1 'JAN' 2025 /

GRID

DX
  50*100 /

DY
  50*100 /

DZ
  50*20 /

TOPS
  25*8000 /

PERMX
  50*100 /

PERMY
  50*100 /

PERMZ
  50*10 /

PORO
  50*0.2 /

PROPS

DENSITY
  50.0 62.4 0.06 /

PVTW
  4000 1.01 3.0E-6 0.5 0 /

PVDO
  100  1.05 2.0
  2000 1.02 1.5
  4000 1.01 1.2 /

ROCK
  4000 4.0E-6 /

SWOF
  0.2 0.0   1.0  0
  0.5 0.15  0.3  0
  0.8 0.35  0.0  0 /

SOLUTION

EQUIL
  8010 4000 8100 0 0 0 /

SUMMARY

FOPR
FWPR
FPR

SCHEDULE

WELSPECS
  'PROD' 'G' 5 5 8010 'OIL' /
/

COMPDAT
  'PROD' 5 5 1 2 'OPEN' 2* 0.5 /
/

WCONPROD
  'PROD' 'OPEN' 'ORAT' 500 4* 1000 /
/

TSTEP
  10*30 /

END
'''

print("Test deck created")
print(f"Deck size: {len(TEST_DECK)} characters")

In [None]:
# Demonstrate job creation (without actual execution)
runner = SimulationRunner(work_dir=Path('/tmp/clarissa_demo'))

job = runner.create_job(TEST_DECK, job_name="test_model")

print(f"Job ID: {job.job_id}")
print(f"Deck path: {job.deck_path}")
print(f"Output dir: {job.output_dir}")
print(f"Status: {job.status.value}")

# Show the command that would be executed
cmd = runner._build_command(job)
print(f"\nCommand: {' '.join(cmd)}")

## 4. Parsing Simulation Results

OPM Flow produces ECLIPSE-compatible output files:

| File Extension | Content |
|----------------|----------|
| .SMSPEC | Summary specification (vector names, units) |
| .UNSMRY | Summary data (time series) |
| .EGRID | Grid geometry |
| .INIT | Initial state |
| .UNRST | Restart file (cell data over time) |
| .PRT | Print file (text log) |
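OPM Flow names its output files after the input deck, so `MODEL.DATA` produces `MODEL.SMSPEC`, `MODEL.UNSMRY`, `MODEL.PRT`, and so on. When several decks write to one directory, selecting files by case name avoids mixing them up. A sketch with a hypothetical `case_output_files` helper:

```python
import tempfile
from pathlib import Path
from typing import Dict

# Extensions from the table above, as OPM Flow writes them (upper case)
OUTPUT_EXTENSIONS = {".SMSPEC", ".UNSMRY", ".EGRID", ".INIT", ".UNRST", ".PRT"}


def case_output_files(output_dir: Path, case_name: str) -> Dict[str, Path]:
    """Map extension -> file for one case, e.g. 'MODEL' -> MODEL.SMSPEC, MODEL.UNSMRY, ..."""
    found: Dict[str, Path] = {}
    for path in Path(output_dir).iterdir():
        if (
            path.is_file()
            and path.stem.upper() == case_name.upper()
            and path.suffix.upper() in OUTPUT_EXTENSIONS
        ):
            found[path.suffix.upper()] = path
    return found


# Example: two cases written to the same directory
with tempfile.TemporaryDirectory() as tmp:
    for name in ["MODEL.SMSPEC", "MODEL.UNSMRY", "MODEL.PRT", "OTHER.SMSPEC", "MODEL.DATA"]:
        (Path(tmp) / name).touch()
    files = case_output_files(Path(tmp), "MODEL")
    print(sorted(files))
```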

In [None]:
import struct
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import numpy as np

@dataclass
class SummaryVector:
    """A single summary output vector (time series)"""
    keyword: str        # e.g., 'FOPR', 'WBHP'
    wgname: str         # Well/group name (or ':+:+:+:+' for field)
    unit: str           # e.g., 'STB/DAY', 'PSIA'
    values: np.ndarray  # Time series values

@dataclass
class SummaryData:
    """Parsed summary file data"""
    times: np.ndarray           # Time values (days)
    vectors: Dict[str, SummaryVector]  # keyword -> vector
    
    @property
    def num_timesteps(self) -> int:
        return len(self.times)
    
    def get_vector(self, keyword: str, wgname: str = None) -> Optional[np.ndarray]:
        """
        Get values for a summary vector.
        
        Args:
            keyword: Vector keyword (e.g., 'FOPR', 'WOPR')
            wgname: Well/group name for well- or group-level vectors;
                omit for field-level vectors
        """
        key = keyword if wgname is None else f"{keyword}:{wgname}"
        vector = self.vectors.get(key)
        return vector.values if vector is not None else None

class EclipseBinaryReader:
    """
    Reader for ECLIPSE binary file format.
    
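    ECLIPSE binary files are big-endian Fortran unformatted files:
    - Header record: keyword (8 chars) + element count (int32) + type (4 chars)
    - Data: one or more records holding at most 1000 numeric elements
      (105 for character strings) each
    - Every record is wrapped in 4-byte Fortran record-length markers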
    """
    
    TYPE_MAP = {
        'INTE': ('i', 4),   # Integer
        'REAL': ('f', 4),   # Float (single)
        'DOUB': ('d', 8),   # Double
        'CHAR': ('s', 8),   # Character string
        'LOGI': ('i', 4),   # Logical (stored as int)
        'MESS': (None, 0),  # Message (no data)
    }
    
    def __init__(self, filepath: Path):
        self.filepath = Path(filepath)
        self.file = None
    
    def __enter__(self):
        self.file = open(self.filepath, 'rb')
        return self
    
    def __exit__(self, *args):
        if self.file:
            self.file.close()
    
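    def _read_fortran_record(self) -> Optional[bytes]:
        """
        Read one Fortran record: length marker, payload, matching marker.
        Returns None at end of file.
        """
        head = self.file.read(4)
        if len(head) < 4:
            return None
        (nbytes,) = struct.unpack('>i', head)
        payload = self.file.read(nbytes)
        tail = self.file.read(4)
        if len(payload) != nbytes or tail != head:
            raise IOError(f"Truncated or corrupt record in {self.filepath}")
        return payload
    
    def read_record(self) -> Tuple[Optional[str], Optional[List[Any]]]:
        """
        Read one keyword (its header record plus all of its data records).
        
        Returns:
            Tuple of (keyword, values), or (None, None) at end of file
        """
        header = self._read_fortran_record()
        if header is None:
            return None, None
        
        keyword = header[0:8].decode('ascii').strip()
        count = struct.unpack('>i', header[8:12])[0]
        data_type = header[12:16].decode('ascii').strip()
        
        # Element format and size; 'C0nn' denotes strings of length nn
        if data_type in self.TYPE_MAP:
            fmt, size = self.TYPE_MAP[data_type]
        elif data_type.startswith('C0'):
            fmt, size = 's', int(data_type[1:])
        else:
            raise ValueError(f"Unsupported data type {data_type!r} for {keyword}")
        
        # Large arrays are split across several records, so keep reading
        # until all count * size bytes have been consumed
        payload = bytearray()
        while len(payload) < count * size:
            block = self._read_fortran_record()
            if block is None:
                raise IOError(f"Unexpected end of file in {keyword} data")
            payload.extend(block)
        
        if count == 0 or fmt is None:
            values = []
        elif fmt == 's':
            values = [
                payload[i:i + size].decode('ascii', errors='replace').strip()
                for i in range(0, count * size, size)
            ]
        else:
            values = list(struct.unpack(f'>{count}{fmt}', bytes(payload[:count * size])))
        
        return keyword, values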
    
    def read_all(self) -> List[Tuple[str, List[Any]]]:
        """Read all records from file"""
        records = []
        while True:
            keyword, values = self.read_record()
            if keyword is None:
                break
            records.append((keyword, values))
        return records

print("EclipseBinaryReader class defined")
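To exercise a reader without running OPM Flow, one can generate a small file in the same layout. This sketch writes a header record followed by data records of at most 1000 elements, the block size commonly used for numeric arrays in ECLIPSE-format files. `write_eclipse_record` is a hypothetical helper and handles only `INTE`, `REAL`, and `DOUB`:

```python
import io
import struct
from typing import BinaryIO, Sequence

# Element format and byte size for the numeric types handled here
NUMERIC_TYPES = {"INTE": ("i", 4), "REAL": ("f", 4), "DOUB": ("d", 8)}
BLOCK_SIZE = 1000  # max elements per numeric data record


def _write_fortran_record(stream: BinaryIO, payload: bytes) -> None:
    """Wrap payload in big-endian 4-byte record-length markers."""
    marker = struct.pack(">i", len(payload))
    stream.write(marker + payload + marker)


def write_eclipse_record(
    stream: BinaryIO, keyword: str, data_type: str, values: Sequence[float]
) -> None:
    """Write one keyword: a 16-byte header record, then its data records."""
    fmt, _ = NUMERIC_TYPES[data_type]
    header = (
        keyword.ljust(8)[:8].encode("ascii")
        + struct.pack(">i", len(values))
        + data_type.encode("ascii")
    )
    _write_fortran_record(stream, header)
    for start in range(0, len(values), BLOCK_SIZE):
        chunk = values[start:start + BLOCK_SIZE]
        _write_fortran_record(stream, struct.pack(f">{len(chunk)}{fmt}", *chunk))


buffer = io.BytesIO()
write_eclipse_record(buffer, "PARAMS", "REAL", [float(i) for i in range(2500)])
data = buffer.getvalue()
print(len(data))  # 24 (header) + 4008 + 4008 + 2008 (data records) = 10048
```

Writing the buffer to a file and opening it with `EclipseBinaryReader` gives a quick round-trip check, in particular for arrays longer than one block.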

In [None]:
# High-level result parser
class ResultParser:
    """
    Parse OPM Flow simulation results.
    
    This is CLARISSA's interface for extracting meaningful
    information from simulation output.
    """
    
    def __init__(self, output_dir: Path):
        self.output_dir = Path(output_dir)
    
    def find_output_files(self) -> Dict[str, Path]:
        """Find all output files in the directory"""
        files = {}
        
        extensions = [
            '.SMSPEC', '.UNSMRY', '.EGRID', '.INIT', 
            '.UNRST', '.PRT', '.RSM'
        ]
        
        for ext in extensions:
            matches = list(self.output_dir.glob(f'*{ext}'))
            if matches:
                files[ext] = matches[0]
        
        return files
    
    def parse_print_file(self) -> Dict[str, Any]:
        """
        Parse the .PRT (print) file for run statistics.
        """
        files = self.find_output_files()
        prt_file = files.get('.PRT')
        
        if not prt_file or not prt_file.exists():
            return {}
        
        content = prt_file.read_text(errors='replace')
        
        stats = {
            'errors': [],
            'warnings': [],
            'timesteps': 0,
            'solver_time': 0.0
        }
        
        import re
        
        # Extract errors
        stats['errors'] = re.findall(
            r'\*\*\*ERROR\*\*\*(.+?)(?=\n|$)', 
            content,
            re.IGNORECASE
        )
        
        # Extract warnings
        stats['warnings'] = re.findall(
            r'Warning:(.+?)(?=\n|$)',
            content,
            re.IGNORECASE
        )
        
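        # Count "Report step" lines: an approximation of progress, not of
        # solver timesteps (the wording is an assumption about the PRT format)
        stats['timesteps'] = len(re.findall(r'Report step', content))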
        
        return stats
    
    def get_summary_vectors(self) -> List[str]:
        """
        List available summary vectors.
        """
        files = self.find_output_files()
        smspec = files.get('.SMSPEC')
        
        if not smspec:
            return []
        
        vectors = []
        try:
            with EclipseBinaryReader(smspec) as reader:
                records = reader.read_all()
                
                keywords = None
                wgnames = None
                
                for kw, values in records:
                    if kw == 'KEYWORDS':
                        keywords = values
                    elif kw in ('WGNAMES', 'NAMES'):  # some files use NAMES
                        wgnames = values
                
                if keywords and wgnames:
                    for k, w in zip(keywords, wgnames):
                        if w and w != ':+:+:+:+':
                            vectors.append(f"{k}:{w}")
                        else:
                            vectors.append(k)
        except Exception as e:
            print(f"Error reading SMSPEC: {e}")
        
        return vectors
    
    def get_production_summary(self) -> Dict[str, Any]:
        """
        Get high-level production summary.
        
        This is what CLARISSA reports back to the user.
        """
        summary = {
            'status': 'unknown',
            'timesteps': 0,
            'final_time_days': 0,
            'cumulative_oil': 0,
            'cumulative_water': 0,
            'cumulative_gas': 0,
            'final_pressure': 0,
            'water_breakthrough': None,
            'errors': [],
            'warnings': []
        }
        
        # Parse PRT file for status
        prt_stats = self.parse_print_file()
        summary['errors'] = prt_stats.get('errors', [])
        summary['warnings'] = prt_stats.get('warnings', [])
        summary['timesteps'] = prt_stats.get('timesteps', 0)
        
        if summary['errors']:
            summary['status'] = 'failed'
        elif summary['timesteps'] > 0:
            summary['status'] = 'completed'
        
        # TODO: Parse UNSMRY for actual values
        # This requires full implementation of binary reader
        
        return summary

print("ResultParser class defined")
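The `TODO` in `get_production_summary` needs the UNSMRY data. In the ECLIPSE summary format, each ministep in the UNSMRY file carries a `PARAMS` record with one value per SMSPEC vector, in SMSPEC `KEYWORDS` order (usually including `TIME`). Assuming those records have already been read, e.g. with a binary reader like the one above, this sketch turns them into named time series and estimates a cumulative volume when no total such as `FOPT` was requested. `assemble_vectors` and `cumulative` are hypothetical names, and the naming ignores SMSPEC `NUMS`, so block or region vectors would collide:

```python
from typing import Dict, Sequence

import numpy as np

FIELD_NAME = ":+:+:+:+"  # WGNAMES placeholder for field-level vectors


def assemble_vectors(
    keywords: Sequence[str],
    wgnames: Sequence[str],
    params: Sequence[Sequence[float]],
) -> Dict[str, np.ndarray]:
    """Turn PARAMS rows (one per ministep) into named time series.

    Names follow the notebook's convention: 'FOPR' for field vectors,
    'WOPR:PROD' for well or group vectors.
    """
    table = np.asarray(params, dtype=float)  # shape: (ministeps, vectors)
    vectors: Dict[str, np.ndarray] = {}
    for col, (kw, wg) in enumerate(zip(keywords, wgnames)):
        name = kw if (not wg or wg == FIELD_NAME) else f"{kw}:{wg}"
        vectors[name] = table[:, col]
    return vectors


def cumulative(rate: np.ndarray, time_days: np.ndarray) -> float:
    """Approximate a cumulative volume from a rate vector.

    Assumes each reported rate applies over the step that ends at that time,
    with the first step starting at t = 0. Prefer a total vector (e.g. FOPT)
    when one was requested in the SUMMARY section.
    """
    step_lengths = np.diff(np.concatenate(([0.0], time_days)))
    return float(np.sum(rate * step_lengths))


# Synthetic example: three monthly ministeps
keywords = ["TIME", "FOPR", "WOPR"]
wgnames = [FIELD_NAME, FIELD_NAME, "PROD"]
params = [[30.0, 500.0, 500.0], [60.0, 500.0, 500.0], [90.0, 400.0, 400.0]]

vectors = assemble_vectors(keywords, wgnames, params)
print(sorted(vectors))                               # ['FOPR', 'TIME', 'WOPR:PROD']
print(cumulative(vectors["FOPR"], vectors["TIME"]))  # 500*30 + 500*30 + 400*30 = 42000.0
```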

## 5. Error Handling and Recovery

Simulations can fail for many reasons. CLARISSA must:
1. Detect failures accurately
2. Diagnose the root cause
3. Suggest fixes to the user
4. Automatically retry with corrections when appropriate

In [None]:
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional, Tuple
import re

class ErrorCategory(Enum):
    """Categories of simulation errors"""
    SYNTAX = "syntax"              # Deck parsing errors
    INITIALIZATION = "init"        # EQUIL/initial conditions
    CONVERGENCE = "convergence"    # Solver convergence
    WELL = "well"                  # Well-related errors
    GRID = "grid"                  # Grid/geometry issues
    PVT = "pvt"                    # Fluid property issues
    NUMERICAL = "numerical"        # Numerical instability
    UNKNOWN = "unknown"            # Unclassified

@dataclass
class DiagnosedError:
    """A diagnosed simulation error with suggested fix"""
    category: ErrorCategory
    message: str
    line_number: Optional[int] = None
    keyword: Optional[str] = None
    suggested_fix: Optional[str] = None
    auto_fixable: bool = False

class ErrorDiagnostics:
    """
    Diagnose simulation errors and suggest fixes.
    
    This is critical for CLARISSA's error recovery capability.
    """
    
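    # Error patterns and their diagnoses. The message formats are illustrative;
    # tune the regexes against output from your OPM Flow version.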
    ERROR_PATTERNS = [
        # Syntax errors
        (
            r'Unknown keyword[:\s]+([A-Z]+)',
            ErrorCategory.SYNTAX,
            lambda m: f"Unrecognized keyword: {m.group(1)}",
            lambda m: f"Remove or replace keyword {m.group(1)} with a supported alternative"
        ),
        (
            r'Error in keyword ([A-Z]+).*line (\d+)',
            ErrorCategory.SYNTAX,
            lambda m: f"Syntax error in {m.group(1)} at line {m.group(2)}",
            lambda m: f"Check syntax of {m.group(1)} keyword near line {m.group(2)}"
        ),
        
        # Initialization errors
        (
            r'Negative (pressure|saturation) in cell \((\d+),(\d+),(\d+)\)',
            ErrorCategory.INITIALIZATION,
            lambda m: f"Negative {m.group(1)} at cell ({m.group(2)},{m.group(3)},{m.group(4)})",
            lambda m: "Check EQUIL datum pressure or contact depths"
        ),
        (
            r'Water saturation.*above 1|Saturation.*out of bounds',
            ErrorCategory.INITIALIZATION,
            lambda m: "Invalid saturation values during initialization",
            lambda m: "Verify SWOF/SGOF endpoint saturations and EQUIL contacts"
        ),
        
        # Convergence errors
        (
            r'Convergence failure at report step (\d+)',
            ErrorCategory.CONVERGENCE,
            lambda m: f"Convergence failure at step {m.group(1)}",
            lambda m: "Reduce timestep size or add TUNING keyword"
        ),
        (
            r'Maximum number of iterations exceeded',
            ErrorCategory.CONVERGENCE,
            lambda m: "Solver exceeded maximum iterations",
            lambda m: "Increase MAXITER or improve initial guess via smaller timesteps"
        ),
        
        # Well errors
        (
            r"Well '?([A-Za-z0-9_]+)'?.*outside.*grid",
            ErrorCategory.WELL,
            lambda m: f"Well {m.group(1)} is outside grid boundaries",
            lambda m: f"Check WELSPECS I,J indices for well {m.group(1)}"
        ),
        (
            r"Well '?([A-Za-z0-9_]+)'?.*no.*(connection|perforation)",
            ErrorCategory.WELL,
            lambda m: f"Well {m.group(1)} has no valid connections",
            lambda m: f"Check COMPDAT for well {m.group(1)} - verify layer indices"
        ),
        
        # Grid errors
        (
            r'Invalid cell volume|Zero.*pore volume',
            ErrorCategory.GRID,
            lambda m: "Invalid cell geometry (zero or negative volume)",
            lambda m: "Check DX, DY, DZ values - ensure all positive"
        ),
        
        # PVT errors (matches "PVDO ... not monotonic" and "Non-monotonic ... PVDO")
        (
            r'(?=.*(?:non-monotonic|not.*monotonic)).*?\b(PVTO|PVDO|PVTW|PVDG)\b',
            ErrorCategory.PVT,
            lambda m: f"Non-monotonic data in {m.group(1)} table",
            lambda m: f"Ensure {m.group(1)} pressure values are strictly increasing"
        ),
    ]
    
    @classmethod
    def diagnose(
        cls,
        stderr: str,
        stdout: str = ""
    ) -> List[DiagnosedError]:
        """
        Diagnose errors from simulation output.
        """
        errors = []
        combined = stderr + "\n" + stdout
        
        for pattern, category, msg_func, fix_func in cls.ERROR_PATTERNS:
            matches = re.finditer(pattern, combined, re.IGNORECASE)
            for match in matches:
                errors.append(DiagnosedError(
                    category=category,
                    message=msg_func(match),
                    suggested_fix=fix_func(match)
                ))
        
        # If no specific errors found but simulation failed
        if not errors and 'error' in combined.lower():
            errors.append(DiagnosedError(
                category=ErrorCategory.UNKNOWN,
                message="Simulation failed with unrecognized error",
                suggested_fix="Review full error output for details"
            ))
        
        return errors
    
    @classmethod
    def suggest_deck_fix(
        cls,
        error: DiagnosedError,
        deck_content: str
    ) -> Tuple[str, str]:
        """
        Suggest specific deck modifications to fix an error.
        
        Returns:
            Tuple of (modified_deck, description)
        """
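        if error.category == ErrorCategory.CONVERGENCE:
            # Add TUNING keyword if not present
            if 'TUNING' not in deck_content:
                # Insert before the first TSTEP keyword line (in SCHEDULE).
                # Note: OPM Flow honors only parts of TUNING and may ignore it
                # unless run with --enable-tuning=true; check your version.
                first_tstep = re.search(r'^[ \t]*TSTEP\b', deck_content, re.MULTILINE)
                if first_tstep:
                    # TUNING has exactly three records; no closing slash follows
                    tuning_block = '''-- Added by CLARISSA for convergence improvement
TUNING
  1.0 5.0 0.1 /    -- Record 1: TSINIT, TSMAXZ, TSMINZ (initial, max, min timestep)
  /                -- Record 2: defaults
  12 1 20 /        -- Record 3: NEWTMX, NEWTMN, LITMAX (max/min Newton, max linear iterations)

'''
                    pos = first_tstep.start()
                    modified = deck_content[:pos] + tuning_block + deck_content[pos:]
                    return modified, "Added TUNING keyword for convergence control"
        
        # Default: return unchanged
        return deck_content, "No automatic fix available"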

# Test error diagnosis
test_stderr = '''
***ERROR*** Convergence failure at report step 5
***ERROR*** Well 'PROD1' is outside grid boundaries
Warning: Non-monotonic data in PVDO table
'''

diagnosed = ErrorDiagnostics.diagnose(test_stderr)
print(f"Found {len(diagnosed)} errors:\n")
for err in diagnosed:
    print(f"Category: {err.category.value}")
    print(f"Message: {err.message}")
    print(f"Fix: {err.suggested_fix}")
    print()
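Some of these failures can be caught before a run is submitted. As an example, here is a pre-flight check for the rule behind the PVT pattern (PVDO pressures strictly increasing). The parser is a deliberately simple assumption: a single three-column `PVDO` table ending at the first `/`, `--` comments, and no repeat counts such as `3*`. `parse_pvdo_pressures` and `pvdo_pressures_increasing` are hypothetical helpers:

```python
from typing import List, Optional


def parse_pvdo_pressures(deck: str) -> Optional[List[float]]:
    """Return the pressure column of the first PVDO table, or None if absent.

    Simplified: assumes one table with three numeric columns ending at the
    first '/', and no repeat counts such as '3*'.
    """
    lines = [line.split("--", 1)[0] for line in deck.splitlines()]  # strip comments
    start = next((i for i, line in enumerate(lines) if line.strip() == "PVDO"), None)
    if start is None:
        return None
    numbers: List[float] = []
    for line in lines[start + 1:]:
        body, slash, _ = line.partition("/")
        numbers.extend(float(token) for token in body.split())
        if slash:  # the first '/' terminates the table
            break
    return numbers[0::3]  # columns: pressure, Bo, viscosity


def pvdo_pressures_increasing(deck: str) -> bool:
    """True if PVDO pressures are strictly increasing (or there is no PVDO)."""
    pressures = parse_pvdo_pressures(deck)
    if not pressures:
        return True
    return all(later > earlier for earlier, later in zip(pressures, pressures[1:]))


good = "PVDO\n  100 1.05 2.0\n  2000 1.02 1.5\n  4000 1.01 1.2 /\n"
bad = "PVDO\n  100 1.05 2.0\n  4000 1.01 1.2\n  2000 1.02 1.5 /\n"
print(pvdo_pressures_increasing(good), pvdo_pressures_increasing(bad))  # True False
```

A check like this could run on a generated deck (for example `TEST_DECK` from section 3) before it reaches the simulator, turning a failed run into an immediate, specific message.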

## 6. Complete Simulation Service

This section combines the runner, result parser, and error diagnostics into a single service that CLARISSA can call.

In [None]:
@dataclass
class SimulationResult:
    """
    Complete result from a simulation run.
    
    This is what CLARISSA returns to the user.
    """
    success: bool
    job_id: str
    runtime_seconds: float
    
    # Summary metrics
    timesteps_completed: int
    final_time_days: float
    
    # Production totals (if available)
    cumulative_oil_stb: Optional[float] = None
    cumulative_water_stb: Optional[float] = None
    cumulative_gas_mscf: Optional[float] = None
    
    # Key events
    water_breakthrough_days: Optional[float] = None
    peak_oil_rate_stb_day: Optional[float] = None
    final_pressure_psi: Optional[float] = None
    
    # Errors and warnings
    errors: List[DiagnosedError] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    
    # Raw data paths
    output_directory: Optional[Path] = None

class SimulationService:
    """
    High-level simulation service for CLARISSA.
    
    Orchestrates:
    - Deck validation
    - Simulation execution
    - Result parsing
    - Error diagnosis
    - Automatic retry with fixes
    """
    
    def __init__(
        self,
        runner: SimulationRunner,
        max_retries: int = 2
    ):
        self.runner = runner
        self.max_retries = max_retries
    
    async def run_simulation(
        self,
        deck_content: str,
        auto_fix: bool = True
    ) -> SimulationResult:
        """
        Run a simulation with automatic error recovery.
        """
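        current_deck = deck_content
        all_errors = []
        # Unique prefix so repeated calls do not reuse (and mix) job directories
        run_id = uuid.uuid4().hex[:8]
        
        for attempt in range(self.max_retries + 1):
            # Create and run job
            job = self.runner.create_job(
                current_deck,
                job_name=f"sim_{run_id}_attempt_{attempt}"
            )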
            
            job = await self.runner.run_async(job)
            
            # Parse results
            parser = ResultParser(job.output_dir)
            
            if job.succeeded:
                # Success! Parse and return results
                summary = parser.get_production_summary()
                
                return SimulationResult(
                    success=True,
                    job_id=job.job_id,
                    runtime_seconds=job.runtime_seconds or 0,
                    timesteps_completed=job.timesteps_completed,
                    final_time_days=summary.get('final_time_days', 0),
                    cumulative_oil_stb=summary.get('cumulative_oil'),
                    cumulative_water_stb=summary.get('cumulative_water'),
                    warnings=summary.get('warnings', []),
                    output_directory=job.output_dir
                )
            
            # Diagnose errors
            errors = ErrorDiagnostics.diagnose(job.stderr, job.stdout)
            all_errors.extend(errors)
            
            # Try auto-fix if enabled and we have retries left
            if auto_fix and attempt < self.max_retries and errors:
                for error in errors:
                    current_deck, fix_desc = ErrorDiagnostics.suggest_deck_fix(
                        error, current_deck
                    )
                    if fix_desc != "No automatic fix available":
                        print(f"Auto-fix applied: {fix_desc}")
                        break
                else:
                    # No fixable errors found
                    break
            else:
                break
        
        # Return failure result
        return SimulationResult(
            success=False,
            job_id=job.job_id,
            runtime_seconds=job.runtime_seconds or 0,
            timesteps_completed=job.timesteps_completed,
            final_time_days=0,
            errors=all_errors,
            output_directory=job.output_dir
        )
    
    def format_result_summary(self, result: SimulationResult) -> str:
        """
        Format result for user presentation.
        
        This is what CLARISSA shows to the engineer.
        """
        lines = []
        
        if result.success:
            lines.append(f"✓ Simulation completed successfully")
            lines.append(f"  Runtime: {result.runtime_seconds:.1f} seconds")
            lines.append(f"  Timesteps: {result.timesteps_completed}")
            
            if result.cumulative_oil_stb:
                lines.append(f"  Cumulative oil: {result.cumulative_oil_stb:,.0f} STB")
            if result.cumulative_water_stb:
                lines.append(f"  Cumulative water: {result.cumulative_water_stb:,.0f} STB")
            if result.water_breakthrough_days:
                lines.append(f"  Water breakthrough: Day {result.water_breakthrough_days:.0f}")
        else:
            lines.append(f"✗ Simulation failed")
            lines.append(f"  Completed {result.timesteps_completed} timesteps before failure")
            
            if result.errors:
                lines.append("")
                lines.append("Errors diagnosed:")
                for err in result.errors:
                    lines.append(f"  • {err.message}")
                    if err.suggested_fix:
                        lines.append(f"    Fix: {err.suggested_fix}")
        
        if result.warnings:
            lines.append("")
            lines.append("Warnings:")
            for warn in result.warnings[:5]:  # Limit to 5
                lines.append(f"  ⚠ {warn}")
        
        return "\n".join(lines)

print("SimulationService class defined")
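The `opm-api` service in the compose file would need to return results over HTTP. `dataclasses.asdict` combined with a `default=` hook for `json.dumps` handles the `Path` and `Enum` fields. To keep the sketch self-contained it uses trimmed-down, hypothetical stand-ins (`ResultStub`, `ErrorStub`, `Category`) for `SimulationResult`, `DiagnosedError`, and `ErrorCategory`:

```python
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, List, Optional


class Category(Enum):  # stand-in for ErrorCategory
    CONVERGENCE = "convergence"


@dataclass
class ErrorStub:  # stand-in for DiagnosedError
    category: Category
    message: str


@dataclass
class ResultStub:  # stand-in for SimulationResult
    success: bool
    job_id: str
    errors: List[ErrorStub] = field(default_factory=list)
    output_directory: Optional[Path] = None


def _json_default(obj: Any) -> Any:
    """Convert values that json cannot encode natively."""
    if isinstance(obj, Enum):
        return obj.value
    if isinstance(obj, Path):
        return str(obj)
    raise TypeError(f"Not JSON serializable: {type(obj).__name__}")


result = ResultStub(
    success=False,
    job_id="sim_attempt_0",
    errors=[ErrorStub(Category.CONVERGENCE, "Convergence failure at step 5")],
    output_directory=Path("/tmp/clarissa_sim/sim_attempt_0/output"),
)
payload = json.dumps(asdict(result), default=_json_default)
print(payload)
```

`asdict` recurses into nested dataclasses such as the error list, so only leaf types like `Enum` and `Path` need the hook.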

## Summary: OPM Flow Integration

### Key Components

1. **SimulationRunner**: Executes OPM Flow (native or Docker)
2. **ResultParser**: Reads ECLIPSE binary output files
3. **ErrorDiagnostics**: Classifies errors and suggests fixes
4. **SimulationService**: Orchestrates the complete workflow

### CLARISSA Integration Points

```python
# In CLARISSA's workflow:

# 1. User describes model
# 2. CLARISSA generates deck
deck = generate_complete_deck(...)

# 3. Submit to simulation service
service = SimulationService(runner)
result = await service.run_simulation(deck)

# 4. Report back to user
summary = service.format_result_summary(result)
```

### Next Notebook

In **03_Knowledge_Layer.ipynb**, we'll cover:
- Vector database setup with pgvector
- Embedding generation for simulator documentation
- Semantic search for keyword assistance
- Analog database for reservoir properties