# MySQL Schema Parser with Gemini Flash

This notebook creates a parser for complex MySQL table schemas, handling features like:
- Type modifiers (`UNSIGNED`, etc.)
- COLLATE clauses
- Column COMMENTS
- Generated columns (AS expressions)
- Index definitions with an explicit index type (`USING BTREE`)
- Table-level constraints

We'll use this to parse schemas like the `plan_impacts` table defined in the next cell.

In [None]:
# Sample SQL schema to parse.
# A single CREATE TABLE statement that exercises: UNSIGNED / AUTO_INCREMENT
# columns, quoted COLLATE and COMMENT clauses (note that `batch_id` puts
# COMMENT *before* COLLATE), two generated (AS ... stored) columns, BTREE
# indexes, one foreign key constraint and trailing table options.
# NOTE: the literal starts with a newline (the text begins on the line after
# the opening triple quote), so the statement is not at offset 0 of the string.
sql_schema = """
CREATE TABLE `plan_impacts` (
    `id` INT UNSIGNED NOT NULL AUTO_INCREMENT,
    `plan_id` INT UNSIGNED NOT NULL,
    `file_path` VARCHAR(500) NOT NULL COLLATE 'utf8mb4_general_ci',
    `phase` VARCHAR(20) NOT NULL DEFAULT 'idea' COLLATE 'utf8mb4_general_ci',
    `change_type` VARCHAR(20) NOT NULL COLLATE 'utf8mb4_general_ci',
    `status` VARCHAR(20) NOT NULL DEFAULT 'pending' COLLATE 'utf8mb4_general_ci',
    `description` TEXT NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
    `batch_id` VARCHAR(36) NULL DEFAULT NULL COMMENT 'Groups related file changes together' COLLATE 'utf8mb4_general_ci',
    `affected_apps` JSON NULL DEFAULT NULL COMMENT 'Array of apps affected by the change',
    `auto_generated` TINYINT(1) NOT NULL DEFAULT '0' COMMENT 'True if auto-generated impact',
    `cross_app_analysis` JSON NULL DEFAULT NULL COMMENT 'Cross-app dependency analysis results',
    `fileName` VARCHAR(255) AS (substring_index(`file_path`,_utf8mb4'/',-(1))) stored,
    `fileFolder` VARCHAR(255) AS (substr(`file_path`,1,((length(`file_path`) - length(substring_index(`file_path`,_utf8mb4'/',-(1)))) - 1))) stored,
    `created_at` TIMESTAMP NULL DEFAULT NULL,
    `created_by` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
    `updated_at` TIMESTAMP NULL DEFAULT NULL,
    `updated_by` VARCHAR(50) NULL DEFAULT NULL COLLATE 'utf8mb4_general_ci',
    PRIMARY KEY (`id`) USING BTREE,
    INDEX `idx_plan_impacts_plan_id` (`plan_id`) USING BTREE,
    INDEX `idx_plan_impacts_file` (`file_path`) USING BTREE,
    INDEX `Idx_plan_impacts_phase` (`phase`) USING BTREE,
    INDEX `idx_plan_impacts_folder` (`fileFolder`) USING BTREE,
    INDEX `idx_plan_impacts_batch_id` (`batch_id`) USING BTREE,
    INDEX `idx_plan_impacts_auto_generated` (`auto_generated`) USING BTREE,
    CONSTRAINT `plan_impacts_ibfk_1` FOREIGN KEY (`plan_id`) REFERENCES `plans` (`id`) ON UPDATE NO ACTION ON DELETE NO ACTION
)
COLLATE='utf8mb4_general_ci'
ENGINE=InnoDB
AUTO_INCREMENT=153;
"""

In [None]:
import re
import json
from typing import Dict, List, Optional, TypedDict, Union

class ColumnDef(TypedDict):
    """Parsed representation of one column definition of a CREATE TABLE."""

    name: str  # column identifier without the surrounding backticks
    type: str  # base type name, upper-cased by the parser (e.g. "VARCHAR")
    length: Optional[int]  # integer given in parentheses after the type, if any
    unsigned: bool  # True when the UNSIGNED modifier is present
    nullable: bool  # False only when the column is declared NOT NULL
    default: Optional[str]  # DEFAULT value as text, quotes stripped; None if no DEFAULT clause
    collate: Optional[str]  # collation name from a COLLATE clause
    comment: Optional[str]  # text of the COMMENT clause
    autoIncrement: bool  # True for AUTO_INCREMENT columns
    # For generated columns: {"expression": <SQL expression>, "type": STORED/VIRTUAL};
    # None for ordinary columns.
    generated: Optional[Dict[str, str]]  # expression, type (STORED/VIRTUAL)

class IndexDef(TypedDict):
    """Parsed representation of a PRIMARY KEY or INDEX/KEY definition."""

    name: str  # index name; the parser uses "PRIMARY" for the primary key
    columns: List[str]  # indexed column names, in declaration order
    type: str  # "PRIMARY" or "INDEX", as set by the parser
    using: str  # index type from the USING clause; the parser falls back to "BTREE"

class ForeignKeyDef(TypedDict):
    """Parsed representation of a CONSTRAINT ... FOREIGN KEY clause."""

    name: str  # constraint name
    column: str  # referencing column in this table
    # Referenced side: {"table": <table name>, "column": <column name>}
    references: Dict[str, str]  # table, column
    onUpdate: str  # ON UPDATE action; the parser falls back to "NO ACTION"
    onDelete: str  # ON DELETE action; the parser falls back to "NO ACTION"

class TableSchema(TypedDict):
    """Top-level result returned by ``MySQLSchemaParser.parse``."""

    name: str  # table name without backticks
    columns: List[ColumnDef]  # column definitions, in declaration order
    primaryKey: Optional[IndexDef]  # None when the statement has no PRIMARY KEY
    indexes: List[IndexDef]  # INDEX / KEY definitions
    foreignKeys: List[ForeignKeyDef]  # CONSTRAINT ... FOREIGN KEY definitions
    # Table-level options; the parser fills "collate", "engine" and
    # "autoIncrement", each only when found in the statement.
    options: Dict[str, str]

class MySQLSchemaParser:
    """Regex-based parser for a single MySQL ``CREATE TABLE`` statement."""

    def parse(self, sql: str) -> TableSchema:
        """Parse a ``CREATE TABLE`` statement into a ``TableSchema`` dict.

        The statement may be surrounded by whitespace (a triple-quoted
        literal usually starts with a newline) and the trailing ``;`` is
        optional.  The body between the outermost parentheses is split into
        individual definitions, and each one is dispatched on its leading
        keyword to the matching ``_parse_*`` helper.  Definitions that match
        none of the known prefixes are skipped.

        Args:
            sql: Text containing one ``CREATE TABLE `name` (...) options``
                statement with a backtick-quoted table name.

        Returns:
            Dict with the keys ``name``, ``columns``, ``primaryKey``,
            ``indexes``, ``foreignKeys`` and ``options``.

        Raises:
            ValueError: If ``sql`` is not a recognisable CREATE TABLE
                statement, or if a helper rejects one of the definitions.
        """
        # Greedy body group: it extends to the LAST ')' of the text, so
        # nested parentheses in column definitions stay inside the body.
        table_match = re.match(
            r"\s*CREATE\s+TABLE\s+(?:IF\s+NOT\s+EXISTS\s+)?`([^`]+)`\s*"
            r"\((.*)\)"
            r"\s*(.*?)\s*;?\s*$",
            sql,
            re.DOTALL,
        )
        if not table_match:
            raise ValueError("Invalid CREATE TABLE statement")

        table_name, columns_part, table_options = table_match.groups()

        columns: List[ColumnDef] = []
        indexes: List[IndexDef] = []
        foreign_keys: List[ForeignKeyDef] = []
        primary_key: Optional[IndexDef] = None

        for part in self._split_definitions(columns_part):
            part = part.strip()
            if part.startswith('`'):  # Column definition
                columns.append(self._parse_column(part))
            elif part.startswith('PRIMARY KEY'):
                primary_key = self._parse_primary_key(part)
            elif part.startswith(('INDEX', 'KEY')):
                indexes.append(self._parse_index(part))
            elif part.startswith('CONSTRAINT') and 'FOREIGN KEY' in part:
                foreign_keys.append(self._parse_foreign_key(part))

        return {
            "name": table_name,
            "columns": columns,
            "primaryKey": primary_key,
            "indexes": indexes,
            "foreignKeys": foreign_keys,
            "options": self._parse_table_options(table_options),
        }

In [None]:
    def _split_definitions(self, content: str) -> List[str]:
        """Split the table content into individual column/constraint definitions"""
        parts = []
        current = []
        paren_level = 0
        
        for char in content:
            if char == '(' and len(current) > 0:
                paren_level += 1
            elif char == ')' and paren_level > 0:
                paren_level -= 1
            elif char == ',' and paren_level == 0:
                parts.append(''.join(current).strip())
                current = []
                continue
                
            current.append(char)
            
        if current:
            parts.append(''.join(current).strip())
            
        return parts
        
    def _parse_column(self, definition: str) -> ColumnDef:
        """Parse a column definition line"""
        # Base pattern for column name and type
        pattern = (
            r"`(\w+)`\s+"  # Column name
            r"(\w+)"       # Base type
            r"(?:\((\d+)\))?"  # Optional length
            r"(\s+UNSIGNED)?"  # UNSIGNED modifier
            r"(\s+(?:NOT\s+)?NULL)?"  # Nullability
            r"(?:\s+DEFAULT\s+([^,\s]+))?"  # Default value
            r"(?:\s+COLLATE\s+'([^']+)')?"  # Collation
            r"(?:\s+COMMENT\s+'([^']+)')?"  # Comment
            r"(?:\s+AUTO_INCREMENT)?"  # Auto increment
            r"(?:\s+AS\s+\((.*?)\)\s+(STORED|VIRTUAL))?"  # Generated column
        )
        
        match = re.match(pattern, definition, re.IGNORECASE | re.DOTALL)
        if not match:
            raise ValueError(f"Invalid column definition: {definition}")
            
        name, type_, length, unsigned, nullable, default, collate, comment, expr, gen_type = match.groups()
        
        return {
            "name": name,
            "type": type_.upper(),
            "length": int(length) if length else None,
            "unsigned": bool(unsigned),
            "nullable": not (nullable and "NOT NULL" in nullable.upper()),
            "default": default.strip("'") if default else None,
            "collate": collate,
            "comment": comment,
            "autoIncrement": "AUTO_INCREMENT" in definition.upper(),
            "generated": {"expression": expr, "type": gen_type} if expr else None
        }
        
    def _parse_primary_key(self, definition: str) -> IndexDef:
        """Parse PRIMARY KEY definition"""
        match = re.match(r"PRIMARY KEY\s*\(`([^`]+)`\)(?:\s+USING\s+(\w+))?", definition)
        if not match:
            raise ValueError(f"Invalid PRIMARY KEY definition: {definition}")
            
        column, using = match.groups()
        return {
            "name": "PRIMARY",
            "columns": [column],
            "type": "PRIMARY",
            "using": using or "BTREE"
        }
        
    def _parse_index(self, definition: str) -> IndexDef:
        """Parse INDEX definition"""
        pattern = r"(?:INDEX|KEY)\s+`([^`]+)`\s*\(`([^`]+)`\)(?:\s+USING\s+(\w+))?"
        match = re.match(pattern, definition)
        if not match:
            raise ValueError(f"Invalid INDEX definition: {definition}")
            
        name, column, using = match.groups()
        return {
            "name": name,
            "columns": [col.strip() for col in column.split(',')],
            "type": "INDEX",
            "using": using or "BTREE"
        }
        
    def _parse_foreign_key(self, definition: str) -> ForeignKeyDef:
        """Parse FOREIGN KEY constraint"""
        pattern = (
            r"CONSTRAINT\s+`([^`]+)`\s+"
            r"FOREIGN KEY\s*\(`([^`]+)`\)\s+"
            r"REFERENCES\s+`([^`]+)`\s*\(`([^`]+)`\)\s*"
            r"(?:ON DELETE\s+(\w+(?:\s+\w+)?))?\s*"
            r"(?:ON UPDATE\s+(\w+(?:\s+\w+)?))?")
            
        match = re.match(pattern, definition)
        if not match:
            raise ValueError(f"Invalid FOREIGN KEY definition: {definition}")
            
        name, column, ref_table, ref_column, on_delete, on_update = match.groups()
        return {
            "name": name,
            "column": column,
            "references": {
                "table": ref_table,
                "column": ref_column
            },
            "onDelete": on_delete or "NO ACTION",
            "onUpdate": on_update or "NO ACTION"
        }
        
    def _parse_table_options(self, options_str: str) -> Dict[str, str]:
        """Parse table-level options"""
        options = {}
        
        if "COLLATE" in options_str:
            collate_match = re.search(r"COLLATE='([^']+)'", options_str)
            if collate_match:
                options["collate"] = collate_match.group(1)
                
        if "ENGINE" in options_str:
            engine_match = re.search(r"ENGINE=(\w+)", options_str)
            if engine_match:
                options["engine"] = engine_match.group(1)
                
        if "AUTO_INCREMENT" in options_str:
            ai_match = re.search(r"AUTO_INCREMENT=(\d+)", options_str)
            if ai_match:
                options["autoIncrement"] = ai_match.group(1)
                
        return options

In [None]:
# Create a parser instance and parse the sample schema defined earlier in the
# notebook; `result` is the dict returned by MySQLSchemaParser.parse.
parser = MySQLSchemaParser()
result = parser.parse(sql_schema)

# Pretty-print the parsed schema as JSON with 2-space indentation.
print(json.dumps(result, indent=2))