# APSIM SQLite to LLM-ready JSON Converter

Converts SQLite databases generated by APSIM (Agricultural Production Systems sIMulator) or APSIM-derived pipelines into clean, deterministic, LLM-readable JSON format while preserving scientific meaning.

## Features

- **Complete table enumeration**: Automatically discovers and exports all tables in the database
- **APSIM-aware processing**: Special handling for Daily, Report, _Messages, and metadata tables
- **Data cleaning**: Converts dates to ISO format, handles APSIM placeholder dates, and preserves data types
- **LLM-ready output**: Structured JSON format optimized for AI reasoning and analysis
- **Data dictionary**: Auto-generates APSIM-aware column descriptions
- **Year-grouped exports**: For large Daily tables (>5000 rows), creates additional year-grouped files

## Default Settings

- **Input directory**: `C:\Users\ibian\Desktop\ClimAdapt\Anameka\APSIM files`
- **Output location**: Same folder as input file (not a subdirectory)

In [1]:
# Import required libraries
import sqlite3
import json
import os
import sys
import re
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
from collections import defaultdict

# Default input directory, read by the batch-processing cell further down.
# NOTE: this is a machine-specific absolute Windows path; edit it to point at
# the folder that holds your APSIM .db files before running that cell.
DEFAULT_INPUT_DIR = r"C:\Users\ibian\Desktop\ClimAdapt\Anameka\APSIM files"

# Confirmation message for interactive (notebook) use.
print("✓ Imports successful")

✓ Imports successful


## APSIMSQLiteConverter Class

The main converter class that handles all database operations and JSON export.

In [2]:
class APSIMSQLiteConverter:
    """Converts APSIM SQLite databases to LLM-ready JSON format.

    Use as a context manager: the SQLite connection is opened in
    ``__enter__`` and closed in ``__exit__``, and every export method needs
    an open connection.

    Each table is written to ``<output_dir>/<table name>.json``, so
    converting several databases into the same directory overwrites the
    output of the earlier ones.
    """

    # APSIM placeholder dates that should be converted to null
    INVALID_DATES = [
        '0001-01-01',
        '0001-01-01 00:00:00',
        '0001-01-01T00:00:00',
        '0001-01-01T00:00:00Z'
    ]

    # A *whole* string that is an ISO date, optionally followed by a time
    # (seconds, fractional seconds and a UTC offset are optional).
    _ISO_DATE_RE = re.compile(
        r'\d{4}-\d{2}-\d{2}'
        r'(?:[ T]\d{2}:\d{2}(?::\d{2}(?:\.\d+)?)?(?:Z|[+-]\d{2}:?\d{2})?)?'
    )
    # A *whole* string of the form NN/NN/YYYY.
    _SLASH_DATE_RE = re.compile(r'\d{2}/\d{2}/\d{4}')

    def __init__(self, db_path: str, output_dir: Optional[str] = None):
        """
        Initialize converter.

        Args:
            db_path: Path to SQLite database file
            output_dir: Directory to write JSON files (default: same folder as input file)

        Raises:
            FileNotFoundError: If ``db_path`` does not exist.
        """
        self.db_path = Path(db_path)
        if not self.db_path.exists():
            raise FileNotFoundError(f"Database file not found: {db_path}")

        if output_dir is None:
            # Write to same folder as input file
            self.output_dir = self.db_path.parent
        else:
            self.output_dir = Path(output_dir)

        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.conn = None
        # Filled in by generate_data_dictionary().
        self.data_dictionary = {}

    def __enter__(self):
        """Context manager entry: open the database connection."""
        self.conn = sqlite3.connect(str(self.db_path))
        self.conn.row_factory = sqlite3.Row  # Enable column access by name
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: close the connection if one is open."""
        if self.conn:
            self.conn.close()
            # Drop the closed handle so later use fails clearly.
            self.conn = None

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier.

        Table names cannot be bound as SQL parameters, so they are quoted
        (embedded double quotes doubled) before being placed in a statement.
        This keeps names containing spaces, punctuation or keywords working.
        """
        return '"' + name.replace('"', '""') + '"'

    def get_tables(self) -> List[str]:
        """Enumerate all tables in the database, sorted by name."""
        cursor = self.conn.cursor()
        cursor.execute("""
            SELECT name FROM sqlite_master 
            WHERE type='table' 
            ORDER BY name
        """)
        return [row[0] for row in cursor.fetchall()]

    def get_table_schema(self, table_name: str) -> Dict[str, str]:
        """Get column names and normalized types for a table.

        Returns:
            Mapping of column name to one of 'integer', 'float', 'text' or
            'blob'. Declared types that match none of the known keywords
            are reported as 'text'.
        """
        cursor = self.conn.cursor()
        cursor.execute(f"PRAGMA table_info({self._quote_identifier(table_name)})")
        schema = {}
        for row in cursor.fetchall():
            col_name = row[1]
            col_type = (row[2] or '').upper()
            # Normalize SQLite types
            if 'INT' in col_type:
                schema[col_name] = 'integer'
            elif 'REAL' in col_type or 'FLOAT' in col_type or 'DOUBLE' in col_type:
                schema[col_name] = 'float'
            elif 'TEXT' in col_type or 'CHAR' in col_type:
                schema[col_name] = 'text'
            elif 'BLOB' in col_type:
                schema[col_name] = 'blob'
            else:
                schema[col_name] = 'text'  # Default
        return schema

    def clean_value(self, value: Any, col_type: str) -> Any:
        """
        Clean a single value according to APSIM rules.

        Strings that are empty, or that are an APSIM placeholder date, become
        None. Strings that are entirely a date/timestamp are reduced to
        ``YYYY-MM-DD``. Any other string (including text that merely starts
        with a date) is kept verbatim. Remaining values are coerced to the
        column's type where that is lossless.

        Args:
            value: Raw value from database
            col_type: Column type from schema

        Returns:
            Cleaned value
        """
        if value is None:
            return None

        if isinstance(value, str):
            stripped = value.strip()
            # Empty strings and the known placeholder spellings are nulls.
            if not stripped or stripped in self.INVALID_DATES:
                return None
            if self._looks_like_date(stripped):
                # May be None: a placeholder date written with a time format
                # that is not listed in INVALID_DATES.
                return self._clean_date(stripped)

        # Preserve types
        if col_type == 'integer':
            # SQLite lets an INTEGER column hold a REAL; int() would silently
            # truncate it (and raises OverflowError on infinity).
            if isinstance(value, float) and not value.is_integer():
                return value
            try:
                return int(value)
            except (ValueError, TypeError):
                return value
        if col_type == 'float':
            try:
                return float(value)
            except (ValueError, TypeError):
                return value
        if col_type == 'text':
            return str(value)

        return value

    def _looks_like_date(self, value: str) -> bool:
        """Check whether the whole (stripped) string is a date or timestamp.

        The complete string must match; text that only begins with a date,
        such as a log message, is not treated as a date.
        """
        if not isinstance(value, str):
            return False

        candidate = value.strip()
        return bool(
            self._ISO_DATE_RE.fullmatch(candidate)
            or self._SLASH_DATE_RE.fullmatch(candidate)
        )

    def _clean_date(self, value: str) -> Optional[str]:
        """
        Convert date string to ISO format YYYY-MM-DD.

        Returns None for empty input and for APSIM placeholder dates. A
        string that cannot be parsed as a date is returned unchanged
        (stripped), because it might not be a date at all.
        """
        if not value:
            return None

        value = value.strip()
        if not value or value in self.INVALID_DATES:
            return None

        # Already ISO (optionally with a time part): keep the date part only.
        if self._ISO_DATE_RE.fullmatch(value):
            date_part = value[:10]
            return None if date_part in self.INVALID_DATES else date_part

        # Try parsing various formats. Order matters for ambiguous
        # day/month values: month-first is tried before day-first.
        date_formats = [
            '%Y-%m-%d',
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%dT%H:%M:%S',
            '%Y-%m-%dT%H:%M:%SZ',
            '%m/%d/%Y',
            '%d/%m/%Y',
            '%Y/%m/%d',
        ]

        for fmt in date_formats:
            try:
                dt = datetime.strptime(value, fmt)
            except ValueError:
                continue
            # isoformat() always zero-pads the year, unlike strftime('%Y')
            # on some platforms for years below 1000.
            iso_date = dt.date().isoformat()
            return None if iso_date in self.INVALID_DATES else iso_date

        # If we can't parse it, return as-is (might not be a date)
        return value

    def export_table(self, table_name: str) -> Dict[str, Any]:
        """
        Export a single table to JSON structure.

        Returns:
            Dictionary with keys "table", "row_count", "columns" (name ->
            normalized type) and "rows" (list of cleaned row dicts)
        """
        cursor = self.conn.cursor()
        cursor.execute(f"SELECT * FROM {self._quote_identifier(table_name)}")

        schema = self.get_table_schema(table_name)

        cleaned_rows = [
            {
                col_name: self.clean_value(row[col_name], col_type)
                for col_name, col_type in schema.items()
            }
            for row in cursor.fetchall()
        ]

        return {
            "table": table_name,
            "row_count": len(cleaned_rows),
            "columns": schema,
            "rows": cleaned_rows
        }

    def _write_json(self, file_stem: str, payload: Any) -> Path:
        """Write ``payload`` to ``<output_dir>/<file_stem>.json`` (UTF-8, indented)."""
        output_path = self.output_dir / f"{file_stem}.json"
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)
        return output_path

    def _write_table(self, table_name: str, data: Dict[str, Any]) -> None:
        """Write an exported table to ``<table_name>.json`` and log the row count."""
        self._write_json(table_name, data)
        print(f"  ✓ Exported {table_name}.json ({data['row_count']} rows)")

    def export_daily_table(self, table_name: str) -> None:
        """
        Export Daily table with APSIM-specific handling.

        If row_count > 5000, also generates <table_name>_by_year.json.
        """
        data = self.export_table(table_name)
        self._write_table(table_name, data)

        # Generate the year-grouped companion file for large tables only.
        if data['row_count'] > 5000:
            self._write_json(f"{table_name}_by_year", self._group_daily_by_year(data))
            print(f"  ✓ Exported {table_name}_by_year.json")

    def _group_daily_by_year(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Group Daily table rows by Year column.

        Rows without a 'Year' value (or with a null one) are collected under
        the key 'null'.
        """
        grouped = defaultdict(list)

        for row in data['rows']:
            year = row.get('Year')
            grouped['null' if year is None else str(year)].append(row)

        return {
            "table": data['table'],
            "grouped_by": "Year",
            "data": dict(grouped)
        }

    def export_report_table(self, table_name: str) -> None:
        """Export Report table with APSIM-specific handling.

        Columns whose name mentions a date or a phenology event get an extra
        date-normalization pass. Only string values are touched: such
        columns may also hold numbers (e.g. a day count), which must stay
        numeric, and 0 must not be turned into null.
        """
        data = self.export_table(table_name)

        keywords = ('date', 'sowing', 'flowering', 'maturity')
        date_columns = [
            col for col in data['columns']
            if any(keyword in col.lower() for keyword in keywords)
        ]

        for row in data['rows']:
            for col in date_columns:
                value = row.get(col)
                if isinstance(value, str):
                    row[col] = self._clean_date(value)

        self._write_table(table_name, data)

    def export_messages_table(self, table_name: str) -> None:
        """Export _Messages table preserving verbatim message text."""
        self._write_table(table_name, self.export_table(table_name))

    def export_metadata_table(self, table_name: str) -> None:
        """Export metadata tables (_Simulations, _Checkpoints, etc.)."""
        self._write_table(table_name, self.export_table(table_name))

    def generate_data_dictionary(self, tables: List[str]) -> None:
        """
        Generate data_dictionary.json with APSIM-aware descriptions.

        The dictionary is also kept on ``self.data_dictionary``.

        Args:
            tables: List of table names to document
        """
        dictionary = {}

        for table_name in tables:
            schema = self.get_table_schema(table_name)
            dictionary[table_name] = {
                col_name: {
                    "type": col_type,
                    "description": self._infer_column_description(table_name, col_name, col_type)
                }
                for col_name, col_type in schema.items()
            }

        self.data_dictionary = dictionary
        self._write_json("data_dictionary", dictionary)

        print("  ✓ Generated data_dictionary.json")

    def _infer_column_description(self, table_name: str, col_name: str, col_type: str) -> str:
        """
        Infer APSIM-aware description for a column.

        Uses common APSIM terminology and patterns. Matching is by
        case-insensitive substring and the first matching rule wins, so the
        order of the checks below is significant and short fragments such as
        'sw', 'dm' or 'pan' can match unrelated names.
        """
        col_lower = col_name.lower()

        # Phenology-related
        if 'flowering' in col_lower and 'day' in col_lower:
            return "Number of days from sowing to flowering as simulated by APSIM phenology"
        if 'maturity' in col_lower and 'day' in col_lower:
            return "Number of days from sowing to maturity as simulated by APSIM phenology"
        if 'sowing' in col_lower and 'day' in col_lower:
            return "Number of days from start of simulation to sowing"

        # Dates
        if 'date' in col_lower:
            if 'sowing' in col_lower:
                return "Sowing date in ISO format (YYYY-MM-DD)"
            if 'flowering' in col_lower:
                return "Flowering date in ISO format (YYYY-MM-DD)"
            if 'maturity' in col_lower:
                return "Maturity/harvest date in ISO format (YYYY-MM-DD)"
            return "Date in ISO format (YYYY-MM-DD)"

        # Yield and biomass
        if 'yield' in col_lower:
            return "Crop yield as simulated by APSIM (units depend on crop and configuration)"
        if 'biomass' in col_lower:
            return "Plant biomass pool as simulated by APSIM"
        if 'drymatter' in col_lower or 'dm' in col_lower:
            return "Dry matter content as simulated by APSIM"

        # Water balance
        if 'water' in col_lower or 'sw' in col_lower:
            if 'stress' in col_lower:
                return "Water stress index (0-1, where 1 indicates no stress)"
            if 'content' in col_lower or 'mm' in col_lower:
                return "Soil water content (typically in mm)"
            return "Water-related variable from APSIM water balance module"

        # Nitrogen
        if 'nitrogen' in col_lower or 'n_' in col_lower or col_lower.startswith('n '):
            if 'stress' in col_lower:
                return "Nitrogen stress index (0-1, where 1 indicates no stress)"
            return "Nitrogen-related variable from APSIM nitrogen balance module"

        # Weather/climate
        if 'rain' in col_lower or 'precip' in col_lower:
            return "Precipitation/rainfall (typically in mm)"
        if 'temp' in col_lower or 'temperature' in col_lower:
            return "Temperature (typically in degrees Celsius)"
        if 'radn' in col_lower or 'radiation' in col_lower:
            return "Solar radiation (typically in MJ/m²)"
        if 'pan' in col_lower:
            return "Pan evaporation (typically in mm)"

        # Stress indices
        if 'stress' in col_lower:
            return "Stress index (0-1, where 1 indicates no stress, 0 indicates maximum stress)"

        # Survival
        if 'survival' in col_lower or 'survive' in col_lower:
            return "Crop survival flag (boolean or numeric indicator)"

        # Simulation identifiers
        if 'simulation' in col_lower or 'sim' in col_lower:
            return "Simulation identifier or name"
        if 'zone' in col_lower:
            return "Geographic zone or region identifier"
        if 'cultivar' in col_lower or 'variety' in col_lower:
            return "Crop cultivar or variety identifier"
        if col_lower == 'year':
            return "Simulation year"
        if col_lower in ['day', 'dayofyear', 'doy']:
            return "Day of year (1-365/366)"

        # Messages
        if table_name.startswith('_Messages') or 'message' in col_lower:
            if 'text' in col_lower or 'message' in col_lower:
                return "Message text from APSIM (warnings, errors, or informational)"
            if 'severity' in col_lower or 'type' in col_lower:
                return "Message severity or type (e.g., warning, error, info)"

        # Metadata tables
        if table_name.startswith('_'):
            return f"Metadata field from {table_name} table"

        # Generic fallback
        return f"{col_type} field from {table_name} table (meaning may be context-dependent)"

    def convert(self) -> None:
        """Main conversion method - processes entire database.

        Exports every table, then writes the data dictionary. The first
        table that fails is reported and its exception is re-raised.
        """
        print(f"Converting APSIM SQLite database: {self.db_path.name}")
        print(f"Output directory: {self.output_dir}")
        print()

        tables = self.get_tables()
        print(f"Found {len(tables)} table(s): {', '.join(tables)}")
        print()

        # Process each table
        for table_name in tables:
            print(f"Processing {table_name}...")

            try:
                if 'Daily' in table_name and not table_name.startswith('_'):
                    self.export_daily_table(table_name)
                elif 'Report' in table_name and not table_name.startswith('_'):
                    self.export_report_table(table_name)
                elif table_name.startswith('_Messages') or '_Messages' in table_name:
                    self.export_messages_table(table_name)
                elif table_name.startswith('_'):
                    self.export_metadata_table(table_name)
                else:
                    # Generic table export
                    self._write_table(table_name, self.export_table(table_name))

            except Exception as e:
                print(f"  ✗ Error processing {table_name}: {e}")
                raise

        print()
        print("Generating data dictionary...")
        self.generate_data_dictionary(tables)

        print()
        print(f"✓ Conversion complete! Output written to: {self.output_dir}")

print("✓ APSIMSQLiteConverter class loaded")

✓ APSIMSQLiteConverter class loaded


## Usage Examples

### Option 1: Convert a Single Database File

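Specify the path to a single `.db` file. Outputs will be written to the same folder as that file unless `output_dir` is given. For example: `with APSIMSQLiteConverter(r"C:\path\to\simulation.db") as converter: converter.convert()`.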

### Option 2: Batch Process All Databases in Default Directory

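Process all `.db` files in the default input directory automatically. Note that every database writes its tables to identically named files (`Daily.json`, `Report.json`, …) in its own folder, so when several `.db` files share one folder each conversion overwrites the output of the previous one; keep each database in its own folder (see "File Organization by Simulation") or pass a separate `output_dir` per database.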

In [None]:
# Batch process all .db files in the default directory
input_dir = Path(DEFAULT_INPUT_DIR)

if not input_dir.exists():
    print(f"Error: Input directory not found: {input_dir}")
else:
    # glob() order depends on the filesystem; sort for a reproducible run.
    db_files = sorted(input_dir.glob("*.db"))

    if not db_files:
        print(f"No .db files found in {input_dir}")
    else:
        print(f"Found {len(db_files)} database file(s) in {input_dir}")
        if len(db_files) > 1:
            # The converter writes <table>.json next to each database, so
            # databases sharing a folder share output file names.
            print("Warning: these databases share one output folder; tables with the "
                  "same name are written to the same <table>.json, so each "
                  "conversion overwrites the previous one's output.")
        print()

        failed_files = []
        for db_file in db_files:
            print("=" * 60)
            print(f"Processing: {db_file.name}")
            print("=" * 60)
            try:
                with APSIMSQLiteConverter(str(db_file)) as converter:
                    converter.convert()
            except Exception as e:
                # Best-effort batch: report the failure and keep going.
                print(f"Error processing {db_file.name}: {e}")
                failed_files.append(db_file.name)
            print()

        print("=" * 60)
        # Report only the files that actually converted.
        succeeded = len(db_files) - len(failed_files)
        print(f"Batch processing complete! Processed {succeeded} of {len(db_files)} file(s).")
        if failed_files:
            print(f"Failed: {', '.join(failed_files)}")

Found 6 database file(s) in C:\Users\ibian\Desktop\ClimAdapt\Anameka\APSIM files

Processing: ClimAdapt_Wheat_neg31.45_117.55_245_calibrated.db
Converting APSIM SQLite database: ClimAdapt_Wheat_neg31.45_117.55_245_calibrated.db
Output directory: C:\Users\ibian\Desktop\ClimAdapt\Anameka\APSIM files

Found 7 table(s): Daily, Report, _Checkpoints, _InitialConditions, _Messages, _Simulations, _Units

Processing Daily...
  ✓ Exported Daily.json (10574 rows)
  ✓ Exported Daily_by_year.json
Processing Report...
  ✓ Exported Report.json (27 rows)
Processing _Checkpoints...
  ✓ Exported _Checkpoints.json (1 rows)
Processing _InitialConditions...
  ✓ Exported _InitialConditions.json (198 rows)
Processing _Messages...
  ✓ Exported _Messages.json (1936 rows)
Processing _Simulations...
  ✓ Exported _Simulations.json (1 rows)
Processing _Units...
  ✓ Exported _Units.json (2 rows)

Generating data dictionary...
  ✓ Generated data_dictionary.json

✓ Conversion complete! Output written to: C:\Users\ibi

### Option 3: Custom Input Directory

Process all databases in a custom directory.

In [None]:
# Custom input directory
custom_input_dir = r"C:\path\to\your\databases"

input_dir = Path(custom_input_dir)

if not input_dir.exists():
    print(f"Error: Input directory not found: {input_dir}")
else:
    # glob() order depends on the filesystem; sort for a reproducible run.
    db_files = sorted(input_dir.glob("*.db"))

    if not db_files:
        print(f"No .db files found in {input_dir}")
    else:
        print(f"Found {len(db_files)} database file(s) in {input_dir}")
        if len(db_files) > 1:
            # The converter writes <table>.json next to each database, so
            # databases sharing a folder share output file names.
            print("Warning: these databases share one output folder; tables with the "
                  "same name are written to the same <table>.json, so each "
                  "conversion overwrites the previous one's output.")
        print()

        failed_files = []
        for db_file in db_files:
            print("=" * 60)
            print(f"Processing: {db_file.name}")
            print("=" * 60)
            try:
                with APSIMSQLiteConverter(str(db_file)) as converter:
                    converter.convert()
            except Exception as e:
                # Best-effort batch: report the failure and keep going.
                print(f"Error processing {db_file.name}: {e}")
                failed_files.append(db_file.name)
            print()

        print("=" * 60)
        # Report only the files that actually converted.
        succeeded = len(db_files) - len(failed_files)
        print(f"Batch processing complete! Processed {succeeded} of {len(db_files)} file(s).")
        if failed_files:
            print(f"Failed: {', '.join(failed_files)}")

Error: Input directory not found: C:\path\to\your\databases


## Output Structure

For each database processed, one JSON file per table (named `<table>.json`) is created in the same folder as the input file, plus a data dictionary. Typical files:

- **`Daily.json`**: Daily time-step simulation outputs
- **`Daily_by_year.json`**: Year-grouped version (if row count > 5000)
- **`Report.json`**: Annual/checkpoint summaries
- **`_Messages.json`**: Warnings, errors, and APSIM logs
- **`_Simulations.json`**: Simulation metadata
- **`_Checkpoints.json`**: Checkpoint metadata
- **`data_dictionary.json`**: APSIM-aware column descriptions

Each per-table JSON file (that is, every file except `Daily_by_year.json` and `data_dictionary.json`) follows this structure:

```json
{
  "table": "Daily",
  "row_count": 3650,
  "columns": {
    "Year": "integer",
    "Day": "integer",
    "Date": "text",
    "Rain": "float"
  },
  "rows": [
    {
      "Year": 2012,
      "Day": 1,
      "Date": "2012-01-01",
      "Rain": 0.0
    }
  ]
}
```

## File Organization by Simulation

This section provides functionality to automatically identify, classify, and organize database files by scenario and coordinate into simulation-specific folders.

In [None]:
import shutil
from collections import defaultdict

class APSIMFileOrganizer:
    """Organizes APSIM database files by scenario and coordinate into simulation folders."""
    
    # Known scenarios
    SCENARIOS = ['245', '558', '585', 'past']
    
    def __init__(self, input_dir: str):
        """
        Initialize organizer.
        
        Args:
            input_dir: Directory containing .db files
        """
        self.input_dir = Path(input_dir)
        if not self.input_dir.exists():
            raise FileNotFoundError(f"Input directory not found: {input_dir}")
    
    def parse_filename(self, filename: str) -> dict:
        """
        Parse filename to extract scenario and coordinate information.
        
        Examples:
            neg31.45_117.55_245.db -> {'coordinate': 'neg31.45_117.55', 'scenario': '245'}
            ClimAdapt_Wheat_neg31.45_117.55_245_calibrated.db -> {'coordinate': 'neg31.45_117.55', 'scenario': '245'}
            neg31.45_117.55_585_calibrated.db -> {'coordinate': 'neg31.45_117.55', 'scenario': '585'}
        
        Returns:
            Dictionary with 'coordinate', 'scenario', and 'full_name' keys
        """
        # Remove extension
        name = Path(filename).stem
        
        # Strip common prefixes that might be in the filename
        # Remove prefixes like "ClimAdapt_Wheat_", "ClimAdapt_", etc.
        name_cleaned = name
        prefixes_to_remove = [
            'ClimAdapt_Wheat_',
            'ClimAdapt_',
            'Wheat_',
        ]
        for prefix in prefixes_to_remove:
            if name_cleaned.startswith(prefix):
                name_cleaned = name_cleaned[len(prefix):]
        
        # Remove common suffixes like "_calibrated"
        if name_cleaned.endswith('_calibrated'):
            name_cleaned = name_cleaned[:-len('_calibrated')]
        
        # Now try to find scenario and coordinate
        scenario = None
        coordinate = None
        
        # First, try to find scenario at the end (most common pattern)
        for scen in self.SCENARIOS:
            if name_cleaned.endswith(f'_{scen}'):
                scenario = scen
                coordinate = name_cleaned[:-len(f'_{scen}')].strip('_')
                break
            elif name_cleaned.endswith(scen) and len(name_cleaned) > len(scen):
                # Check if it's really the scenario (not part of a number)
                if name_cleaned[-len(scen)-1] == '_' or name_cleaned[-len(scen)-1].isalpha():
                    scenario = scen
                    coordinate = name_cleaned[:-len(scen)].strip('_')
                    break
        
        # If scenario found, extract coordinate pattern
        if scenario:
            # Look for coordinate pattern: negXX.XX_XXX.XX
            coord_match = re.search(r'(neg\d+\.\d+_\d+\.\d+)', coordinate)
            if coord_match:
                coordinate = coord_match.group(1)
            else:
                # If no clear coordinate pattern, use the whole thing before scenario
                # but try to clean it up
                coordinate = coordinate.strip('_')
        else:
            # No scenario found, try to extract coordinate pattern first
            coord_match = re.search(r'(neg\d+\.\d+_\d+\.\d+)', name_cleaned)
            if coord_match:
                coordinate = coord_match.group(1)
                # Check what comes after the coordinate
                remaining = name_cleaned[coord_match.end():].strip('_')
                if remaining in self.SCENARIOS:
                    scenario = remaining
                else:
                    # Default to 'past' if no scenario found
                    scenario = 'past'
            else:
                # Try to find coordinate pattern in parts
                parts = name_cleaned.split('_')
                for i in range(len(parts) - 1):
                    potential_coord = '_'.join(parts[i:i+2])
                    if re.match(r'neg\d+\.\d+_\d+\.\d+', potential_coord):
                        coordinate = potential_coord
                        # Check next part for scenario
                        if i + 2 < len(parts):
                            potential_scenario = parts[i + 2]
                            if potential_scenario in self.SCENARIOS:
                                scenario = potential_scenario
                            else:
                                scenario = 'past'
                        else:
                            scenario = 'past'
                        break
        
        # Final fallback
        if coordinate is None:
            # Try one more time with the cleaned name
            coord_match = re.search(r'(neg\d+\.\d+_\d+\.\d+)', name_cleaned)
            if coord_match:
                coordinate = coord_match.group(1)
                scenario = 'past'  # Default scenario
            else:
                coordinate = name_cleaned
                scenario = 'past'
        
        return {
            'coordinate': coordinate,
            'scenario': scenario,
            'full_name': name,
            'original_filename': filename
        }
    
    def get_simulation_name(self, coordinate: str, scenario: str, crop: str = 'Wheat', 
                           prefix: str = 'ClimAdapt', suffix: str = 'calibrated') -> str:
        """
        Generate simulation folder name.
        
        Format: {prefix}_{crop}_{coordinate}_{scenario}_{suffix}
        Example: ClimAdapt_Wheat_neg31.45_117.55_245_calibrated
        
        Args:
            coordinate: Coordinate string (e.g., 'neg31.45_117.55')
            scenario: Scenario string ('245', '558', or 'past')
            crop: Crop name (default: 'Wheat')
            prefix: Prefix for folder name (default: 'ClimAdapt')
            suffix: Suffix for folder name (default: 'calibrated')
        
        Returns:
            Simulation folder name
        """
        return f"{prefix}_{crop}_{coordinate}_{scenario}_{suffix}"
    
    def organize_files(self, move_files: bool = False, crop: str = 'Wheat', 
                      prefix: str = 'ClimAdapt', suffix: str = 'calibrated') -> dict:
        """
        Organize .db files into simulation-specific folders.
        
        Args:
            move_files: If True, move files; if False, copy files
            crop: Crop name for folder naming
            prefix: Prefix for folder name
            suffix: Suffix for folder name
        
        Returns:
            Dictionary with organization results
        """
        db_files = list(self.input_dir.glob("*.db"))
        
        if not db_files:
            print(f"No .db files found in {self.input_dir}")
            return {}
        
        print(f"Found {len(db_files)} database file(s)")
        print()
        
        # Group files by simulation
        simulation_groups = defaultdict(list)
        file_info = {}
        
        for db_file in db_files:
            parsed = self.parse_filename(db_file.name)
            sim_name = self.get_simulation_name(
                parsed['coordinate'], 
                parsed['scenario'],
                crop=crop,
                prefix=prefix,
                suffix=suffix
            )
            
            simulation_groups[sim_name].append(db_file)
            file_info[db_file.name] = parsed
        
        print(f"Identified {len(simulation_groups)} unique simulation(s):")
        for sim_name, files in simulation_groups.items():
            print(f"  - {sim_name}: {len(files)} file(s)")
        print()
        
        # Create folders and organize files
        results = {
            'simulations': {},
            'total_files': len(db_files),
            'total_simulations': len(simulation_groups)
        }
        
        for sim_name, files in simulation_groups.items():
            sim_folder = self.input_dir / sim_name
            sim_folder.mkdir(exist_ok=True)
            
            print(f"Organizing {sim_name}...")
            moved_files = []
            
            for db_file in files:
                dest_path = sim_folder / db_file.name
                
                try:
                    if move_files:
                        if dest_path.exists():
                            print(f"  ⚠ Skipping {db_file.name} (already exists in destination)")
                        else:
                            shutil.move(str(db_file), str(dest_path))
                            print(f"  ✓ Moved {db_file.name}")
                            moved_files.append(db_file.name)
                    else:
                        if dest_path.exists():
                            print(f"  ⚠ Skipping {db_file.name} (already exists in destination)")
                        else:
                            shutil.copy2(str(db_file), str(dest_path))
                            print(f"  ✓ Copied {db_file.name}")
                            moved_files.append(db_file.name)
                except Exception as e:
                    print(f"  ✗ Error processing {db_file.name}: {e}")
            
            results['simulations'][sim_name] = {
                'folder': str(sim_folder),
                'files': moved_files,
                'file_count': len(moved_files),
                'coordinate': file_info[files[0].name]['coordinate'],
                'scenario': file_info[files[0].name]['scenario']
            }
            print()
        
        return results
    
    def analyze_files(self) -> dict:
        """
        Analyze files without organizing them.
        
        Returns:
            Dictionary with analysis results
        """
        db_files = list(self.input_dir.glob("*.db"))
        
        if not db_files:
            return {'files': [], 'simulations': {}, 'total_files': 0}
        
        analysis = {
            'files': [],
            'simulations': defaultdict(lambda: {'files': [], 'coordinates': set(), 'scenarios': set()}),
            'total_files': len(db_files)
        }
        
        for db_file in db_files:
            parsed = self.parse_filename(db_file.name)
            sim_name = self.get_simulation_name(
                parsed['coordinate'],
                parsed['scenario']
            )
            
            file_data = {
                'filename': db_file.name,
                'coordinate': parsed['coordinate'],
                'scenario': parsed['scenario'],
                'simulation': sim_name
            }
            
            analysis['files'].append(file_data)
            analysis['simulations'][sim_name]['files'].append(db_file.name)
            analysis['simulations'][sim_name]['coordinates'].add(parsed['coordinate'])
            analysis['simulations'][sim_name]['scenarios'].add(parsed['scenario'])
        
        # Convert sets to lists for JSON serialization
        for sim_name in analysis['simulations']:
            analysis['simulations'][sim_name]['coordinates'] = list(analysis['simulations'][sim_name]['coordinates'])
            analysis['simulations'][sim_name]['scenarios'] = list(analysis['simulations'][sim_name]['scenarios'])
        
        return analysis

# Notebook feedback: printed once the cell defining APSIMFileOrganizer has run
print("✓ APSIMFileOrganizer class loaded")

✓ APSIMFileOrganizer class loaded


### Step 1: Analyze Files (Preview Before Organizing)

First, analyze the files to see how they will be classified:

In [None]:
# Preview how the databases in the default directory would be grouped.
# analyze_files() only inspects filenames; nothing is moved or copied here.
PREVIEW_LIMIT = 20   # maximum number of per-file entries to print
RULE = "=" * 80      # horizontal separator used under each heading

organizer = APSIMFileOrganizer(DEFAULT_INPUT_DIR)
analysis = organizer.analyze_files()
classified = analysis['files']
groups = analysis['simulations']

# Headline counts
print(f"Total files found: {analysis['total_files']}")
print(f"Unique simulations identified: {len(groups)}")
print()

# Per-file classification (truncated to the first PREVIEW_LIMIT entries)
print("File Classification:")
print(RULE)
for entry in classified[:PREVIEW_LIMIT]:
    print(f"{entry['filename']:50s} -> {entry['simulation']}")
    print(f"  Coordinate: {entry['coordinate']}, Scenario: {entry['scenario']}")

hidden = len(classified) - PREVIEW_LIMIT
if hidden > 0:
    print(f"\n... and {hidden} more file(s)")

# Per-simulation grouping
print()
print("Simulation Groups:")
print(RULE)
for group_name, group in groups.items():
    print(f"{group_name}:")
    print(f"  Files: {len(group['files'])}")
    print(f"  Coordinates: {', '.join(group['coordinates'])}")
    print(f"  Scenarios: {', '.join(group['scenarios'])}")
    print()

Total files found: 6
Unique simulations identified: 6

File Classification:
ClimAdapt_Wheat_neg31.45_117.55_245_calibrated.db  -> ClimAdapt_Wheat_neg31.45_117.55_245_calibrated
  Coordinate: neg31.45_117.55, Scenario: 245
ClimAdapt_Wheat_neg31.45_117.55_585_calibrated.db  -> ClimAdapt_Wheat_neg31.45_117.55_585_calibrated
  Coordinate: neg31.45_117.55, Scenario: 585
ClimAdapt_Wheat_neg31.45_117.55_past.db            -> ClimAdapt_Wheat_neg31.45_117.55_past_calibrated
  Coordinate: neg31.45_117.55, Scenario: past
ClimAdapt_Wheat_neg31.75_117.60_245_calibrated.db  -> ClimAdapt_Wheat_neg31.75_117.60_245_calibrated
  Coordinate: neg31.75_117.60, Scenario: 245
ClimAdapt_Wheat_neg31.75_117.60_585_calibrated.db  -> ClimAdapt_Wheat_neg31.75_117.60_585_calibrated
  Coordinate: neg31.75_117.60, Scenario: 585
ClimAdapt_Wheat_neg31.75_117.60_past.db            -> ClimAdapt_Wheat_neg31.75_117.60_past_calibrated
  Coordinate: neg31.75_117.60, Scenario: past

Simulation Groups:
ClimAdapt_Wheat_neg31.45

### Step 2: Organize Files into Simulation Folders

Choose one of the options below:

- **Copy files** (recommended): Creates copies in organized folders, keeps originals
- **Move files**: Moves files to organized folders (removes from original location)

In [None]:
# OPTION A: Copy files (originals stay in the input directory)
# Recommended for safety: nothing is removed from the source location

organizer = APSIMFileOrganizer(DEFAULT_INPUT_DIR)

# Adjust these values if your files use a different naming scheme
copy_options = {
    'move_files': False,     # copy instead of move
    'crop': 'Wheat',
    'prefix': 'ClimAdapt',
    'suffix': 'calibrated',
}
results = organizer.organize_files(**copy_options)

print("=" * 80)
print("Organization Summary:")
print(f"Total files processed: {results['total_files']}")
print(f"Simulations created: {results['total_simulations']}")
print()

# (label shown to the user, key in the per-simulation result dict)
summary_fields = (
    ("Folder", 'folder'),
    ("Files", 'file_count'),
    ("Coordinate", 'coordinate'),
    ("Scenario", 'scenario'),
)
for sim_name, sim_data in results['simulations'].items():
    print(f"{sim_name}:")
    for label, key in summary_fields:
        print(f"  {label}: {sim_data[key]}")
    print()

Found 6 database file(s)

Identified 6 unique simulation(s):
  - ClimAdapt_Wheat_neg31.45_117.55_245_calibrated: 1 file(s)
  - ClimAdapt_Wheat_neg31.45_117.55_585_calibrated: 1 file(s)
  - ClimAdapt_Wheat_neg31.45_117.55_past_calibrated: 1 file(s)
  - ClimAdapt_Wheat_neg31.75_117.60_245_calibrated: 1 file(s)
  - ClimAdapt_Wheat_neg31.75_117.60_585_calibrated: 1 file(s)
  - ClimAdapt_Wheat_neg31.75_117.60_past_calibrated: 1 file(s)

Organizing ClimAdapt_Wheat_neg31.45_117.55_245_calibrated...
  ✓ Copied ClimAdapt_Wheat_neg31.45_117.55_245_calibrated.db

Organizing ClimAdapt_Wheat_neg31.45_117.55_585_calibrated...
  ✓ Copied ClimAdapt_Wheat_neg31.45_117.55_585_calibrated.db

Organizing ClimAdapt_Wheat_neg31.45_117.55_past_calibrated...
  ✓ Copied ClimAdapt_Wheat_neg31.45_117.55_past.db

Organizing ClimAdapt_Wheat_neg31.75_117.60_245_calibrated...
  ✓ Copied ClimAdapt_Wheat_neg31.75_117.60_245_calibrated.db

Organizing ClimAdapt_Wheat_neg31.75_117.60_585_calibrated...
  ✓ Copied ClimAdapt

In [None]:
# OPTION B: Move files (removes from original location)
# Use with caution - this will move files from the original location

# Uncomment the lines below to use this option:
# organizer = APSIMFileOrganizer(DEFAULT_INPUT_DIR)
# results = organizer.organize_files(
#     move_files=True,  # Move instead of copy
#     crop='Wheat',
#     prefix='ClimAdapt',
#     suffix='calibrated'
# )

### Step 3: Convert Organized Files to JSON

After organizing files, you can convert each simulation's databases to JSON. The JSON files will be created in each simulation's folder.

In [None]:
# Convert all databases in organized simulation folders to JSON.
# Each database is converted independently: a failure is reported and the
# batch moves on to the next file, then all failures are summarised at the end.
input_dir = Path(DEFAULT_INPUT_DIR)

# Find all simulation folders (folders containing .db files).
# sorted(): iterdir() yields entries in arbitrary order, so sort to keep the
# processing order (and the log) reproducible.
# any(): stops at the first matching database instead of listing them all.
simulation_folders = sorted(
    d for d in input_dir.iterdir()
    if d.is_dir() and any(d.glob("*.db"))
)

print(f"Found {len(simulation_folders)} simulation folder(s)")
print()

# (folder name, file name, error text) for every conversion that raised
failed_conversions = []

for sim_folder in simulation_folders:
    print("=" * 80)
    print(f"Processing simulation: {sim_folder.name}")
    print("=" * 80)

    db_files = sorted(sim_folder.glob("*.db"))
    print(f"Found {len(db_files)} database file(s) in this folder")
    print()

    folder_failures = 0
    for db_file in db_files:
        print(f"Converting: {db_file.name}")
        try:
            # Output will be in the same folder as the .db file
            with APSIMSQLiteConverter(str(db_file)) as converter:
                converter.convert()
        except Exception as e:
            # Deliberately broad: one unreadable database must not abort the
            # whole batch. The failure is recorded for the final summary.
            print(f"Error converting {db_file.name}: {e}")
            folder_failures += 1
            failed_conversions.append((sim_folder.name, db_file.name, str(e)))
        print()

    # Do not report success for a folder in which a conversion failed
    if folder_failures:
        print(f"⚠ Completed processing {sim_folder.name} with {folder_failures} error(s)")
    else:
        print(f"✓ Completed processing {sim_folder.name}")
    print()

print("=" * 80)
if failed_conversions:
    print(f"Finished with {len(failed_conversions)} failed conversion(s):")
    for folder_name, file_name, message in failed_conversions:
        print(f"  ✗ {folder_name}/{file_name}: {message}")
else:
    print("All simulations processed!")

Found 6 simulation folder(s)

Processing simulation: ClimAdapt_Wheat_neg31.45_117.55_245_calibrated
Found 1 database file(s) in this folder

Converting: ClimAdapt_Wheat_neg31.45_117.55_245_calibrated.db
Converting APSIM SQLite database: ClimAdapt_Wheat_neg31.45_117.55_245_calibrated.db
Output directory: C:\Users\ibian\Desktop\ClimAdapt\Anameka\APSIM files\ClimAdapt_Wheat_neg31.45_117.55_245_calibrated

Found 7 table(s): Daily, Report, _Checkpoints, _InitialConditions, _Messages, _Simulations, _Units

Processing Daily...
  ✓ Exported Daily.json (10574 rows)
  ✓ Exported Daily_by_year.json
Processing Report...
  ✓ Exported Report.json (27 rows)
Processing _Checkpoints...
  ✓ Exported _Checkpoints.json (1 rows)
Processing _InitialConditions...
  ✓ Exported _InitialConditions.json (198 rows)
Processing _Messages...
  ✓ Exported _Messages.json (1936 rows)
Processing _Simulations...
  ✓ Exported _Simulations.json (1 rows)
Processing _Units...
  ✓ Exported _Units.json (2 rows)

Generating dat