In [None]:
from pdf2image import convert_from_bytes
from uuid import uuid4
from loguru import logger
import os

# Run configuration: source PDF, scratch root, and a fresh per-run id that
# names the sub-directory the rendered page images are written to.
FILE_PATH = '/home/naufal/soji_ai/documents/EASA_AD_2025-0254R1_1.pdf'
ROOT_TEMP_DIR = '/home/naufal/soji_ai/temp'
FILE_ID = uuid4().hex
SAVE_IMGS_DIR = os.path.join(ROOT_TEMP_DIR, FILE_ID)

# Render every page of the PDF to a 300-dpi PNG inside SAVE_IMGS_DIR and keep
# only the resulting file paths (paths_only=True).
with open(FILE_PATH, "rb") as doc:
    os.makedirs(SAVE_IMGS_DIR, exist_ok=True)
    # TODO: write the images to a temporary directory instead of SAVE_IMGS_DIR.
    img_paths = convert_from_bytes(
        doc.read(),
        dpi=300,
        fmt="png",
        output_folder=SAVE_IMGS_DIR,
        paths_only=True,
    )

# prepare messages function (will be)

def prepare_messages(img_paths: list[str]):
    """Build the ``contents`` payload for a Gemini ``generate_content`` call.

    The payload starts with a short text instruction and is followed by one
    PNG image part per entry of ``img_paths``, in the order given.

    Args:
        img_paths: Paths of the PNG page images to attach.

    Returns:
        A list whose first element is the instruction string and whose
        remaining elements are ``types.Part`` objects, one per image.

    Raises:
        OSError: If an image file cannot be opened or read (propagated as-is).
    """
    messages = [
        "Now, extract the following images!"
    ]

    for img_path in img_paths:
        with open(img_path, "rb") as f:
            logger.info(f"Processing image: {img_path}")
            # Part.from_bytes is documented to take the raw image bytes, so the
            # file content is handed over directly instead of being
            # base64-encoded into a str first.
            messages.append(
                types.Part.from_bytes(
                    data=f.read(),
                    mime_type="image/png"
                )
            )

    return messages

# extraction AD api calling part
# The imports come first: prepare_messages() uses `types` when it runs, so
# importing it only after the call fails with NameError on a fresh kernel.
from google import genai
from google.genai import types
import os

# Instruction text followed by one image part per rendered PDF page.
input_message = prepare_messages(
    img_paths=img_paths,
)

client = genai.Client(
    api_key=os.getenv("GOOGLE_API_KEY")
)

# NOTE: `system_prompt` and `ADApplicabilityExtraction` are defined in a later
# cell of this notebook; that cell has to be executed before this one.
model = "gemini-2.5-flash"
model_config = types.GenerateContentConfig(
    system_instruction=system_prompt,
    temperature=0.1,
    response_mime_type="application/json",
    response_json_schema=ADApplicabilityExtraction.model_json_schema()
)

# Single request carrying the instruction and all page images.
response = client.models.generate_content(
    model=model,
    config=model_config,
    contents=input_message
)

# save response to JSON in the temp dir
import json

# Validate the raw model reply against the schema and turn it into a plain dict.
outputs = ADApplicabilityExtraction.model_validate_json(response.text).model_dump()
# Despite its name, SAVE_JSON_DIR is the path of the result *file*.
SAVE_JSON_DIR = os.path.join(ROOT_TEMP_DIR, FILE_ID, f"{FILE_ID}_RESULTS.json")
# Make sure the per-run directory exists before writing into it.
os.makedirs(os.path.dirname(SAVE_JSON_DIR), exist_ok=True)
# ensure_ascii=False writes non-ASCII characters verbatim, so the file encoding
# is pinned to UTF-8 instead of depending on the platform default.
with open(SAVE_JSON_DIR, 'w', encoding="utf-8") as f:
    json.dump(outputs, f, indent=2, ensure_ascii=False)

# inference part
import pandas as pd
import json
from loguru import logger

# Aircraft records to classify, plus the two previously extracted AD result files.
test_data = pd.read_csv("/home/naufal/soji_ai/test/ad_test_data.csv", sep=",")
ad_23_file_path = "/home/naufal/soji_ai/temp/44fbed7dccdb44099a083f153f20a3f7/44fbed7dccdb44099a083f153f20a3f7_RESULTS.json"
ad_25_file_path = "/home/naufal/soji_ai/temp/1b333a987b10405e9e589c4f1ad0e67d/1b333a987b10405e9e589c4f1ad0e67d_RESULTS.json"

# The *_RESULTS.json files are written with ensure_ascii=False, so they are
# read back as UTF-8 explicitly rather than with the platform default encoding.
with open(ad_23_file_path, "r", encoding="utf-8") as f:
    ad_23 = json.load(f)

## load if there is more than 1 AD to compare
with open(ad_25_file_path, "r", encoding="utf-8") as f:
    ad_25 = json.load(f)

# Map a display label for each AD to its extracted content.
ad_file_dict = {
    "AD 2025-0254R1": ad_25,
    "AD 2025-23-53": ad_23
}

# NOTE(review): compare_to_ad is not defined in the visible part of this
# notebook; it has to be provided by another cell or module before this runs.
result_df = compare_to_ad(test_data, ad_file_dict=ad_file_dict)

# saving classification result to save dir
result_df.to_csv("/home/naufal/soji_ai/test/ad_test_data_result.csv", index=False)

In [48]:
import shutil

# One-off cleanup of a stale per-run scratch directory.
STALE_RUN_DIR = "/home/naufal/soji_ai/temp/327c89509c2a47b6838f0881e7834f5a"
try:
    shutil.rmtree(STALE_RUN_DIR)
except FileNotFoundError:
    # Already removed (e.g. the cell was run before) - nothing left to clean up,
    # so re-running the notebook must not fail here.
    pass

In [16]:
from enum import Enum
from typing import Optional, List
from pydantic import BaseModel, Field
from google.genai import types

class TimeUnit(str, Enum):
    # Units a ComplianceTime can be expressed in (see ComplianceTime.unit).
    # Mixing in `str` makes every member a real string equal to its value.
    FLIGHT_HOURS = "flight_hours"
    FLIGHT_CYCLES = "flight_cycles"
    DAYS = "days"
    MONTHS = "months"
    YEARS = "years"
    CALENDAR_DATE = "calendar_date"


class NumericRange(BaseModel):
    # A numeric interval with optional lower/upper bounds and a separate
    # inclusivity flag per bound; used as the type of MSNConstraint.range.
    # Both bounds default to None (unbounded) and both flags default to True.
    start: Optional[int] = Field(
        default=None,
        description=(
            "Lower bound of the MSN range (inclusive by default). "
            "Set to None if there is no lower bound."
        )
    )
    end: Optional[int] = Field(
        default=None,
        description=(
            "Upper bound of the MSN range (inclusive by default). "
            "Set to None if there is no upper bound."
        )
    )
    inclusive_start: bool = Field(
        default=True,
        description="True means >= (greater than or equal to start). False means > (strictly greater than)."
    )
    inclusive_end: bool = Field(
        default=True,
        description="True means <= (less than or equal to end). False means < (strictly less than)."
    )


class MSNConstraint(BaseModel):
    # One manufacturer-serial-number (MSN) rule: "all", a numeric range, or an
    # explicit list of MSNs, flagged through `excluded` as inclusion or exclusion.
    # NOTE(review): the descriptions ask for `range` and `specific_msns` not to be
    # combined, but nothing in this model validates that.
    all: Optional[bool] = Field(
        default=None,
        description=(
            "Set to True when the AD explicitly states 'all manufacturer serial numbers (MSN)' or 'all MSN'. "
            "IMPORTANT: Never leave this None when the AD explicitly uses the word 'all' for MSN applicability — "
            "even if other exclusions apply, the 'all' inclusion must still be captured here. "
            "Leave None only when applicability is defined purely by a specific range or list."
        )
    )
    range: Optional[NumericRange] = Field(
        default=None,
        description=(
            "A continuous numeric range of MSNs this constraint covers. "
            "Use when the AD specifies a span like 'MSN 100 through MSN 500'. "
            "Do not use together with specific_msns."
        )
    )
    specific_msns: Optional[List[int]] = Field(
        default=None,
        description=(
            "An explicit list of individual MSN integers this constraint covers. "
            "Use when the AD names specific serial numbers, e.g. 'MSN 364 or MSN 385'. "
            "Do not use together with range."
        )
    )
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when these MSNs are EXCLUDED from applicability "
            "(AD language like 'except MSN...', 'excluding MSN...'). "
            "Set to False when these MSNs are positively INCLUDED in applicability. "
            "Default is False (inclusion)."
        )
    )


class ModificationConstraint(BaseModel):
    # One modification-based applicability rule: the mod identifier (required),
    # its embodiment status, and whether it acts as an exclusion.
    # `embodied` is tri-state (True / False / None = unspecified).
    modification_id: str = Field(
        description=(
            "The exact modification identifier as written in the AD. "
            "Always an Airbus 'mod' number, e.g. 'mod 24591', 'mod 24977'. "
            "IMPORTANT: Modification numbers are never Service Bulletins — "
            "do not confuse with SB identifiers (e.g. 'A320-57-XXXX'). "
            "Copy the identifier verbatim from the AD text."
        )
    )
    embodied: Optional[bool] = Field(
        default=None,
        description=(
            "True = this modification IS embodied on the aircraft. "
            "False = this modification is NOT embodied on the aircraft. "
            "None = embodiment status is unspecified or not relevant to this constraint."
        )
    )
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when aircraft WITH this modification embodied are EXCLUDED from applicability "
            "(AD language like 'except those on which mod XXXXX has been embodied in production'). "
            "Set to False when this modification is a positive inclusion condition. "
            "Default is False (inclusion)."
        )
    )


class ServiceBulletinConstraint(BaseModel):
    # One Service-Bulletin-based applicability rule: the SB identifier (required),
    # an optional free-text revision qualifier, the incorporation status and
    # whether it acts as an exclusion.
    # `revision` is kept as text, so qualifiers such as "Revision 03 or later"
    # are stored verbatim rather than parsed.
    sb_identifier: str = Field(
        description=(
            "The exact Service Bulletin identifier as written in the AD, "
            "e.g. 'A320-57-1089', 'A320-57-1100'. "
            "IMPORTANT: Only actual Airbus Service Bulletins belong here (format: 'AXXX-XX-XXXX'). "
            "Airbus modification numbers ('mod XXXXX') must NEVER be placed here — "
            "those belong exclusively in ModificationConstraint. "
            "Copy the identifier verbatim from the AD text, without the 'SB' prefix."
        )
    )
    revision: Optional[str] = Field(
        default=None,
        description=(
            "The revision qualifier for this SB constraint, exactly as stated in the AD. "
            "Examples: 'Revision 04', 'any revision lower than Revision 04', 'Revision 03 or later'. "
            "Leave None if no specific revision is mentioned and any revision applies."
        )
    )
    incorporated: Optional[bool] = Field(
        default=None,
        description=(
            "True = this SB HAS been incorporated on the aircraft. "
            "False = this SB has NOT been incorporated on the aircraft. "
            "None = incorporation status is unspecified or not relevant to this constraint."
        )
    )
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when aircraft on which this SB HAS been embodied are EXCLUDED from applicability "
            "(AD language like 'except those on which SB XXXX has been embodied'). "
            "Set to False when this SB is a positive inclusion or compliance condition. "
            "Default is False (inclusion)."
        )
    )


class AircraftGroup(BaseModel):
    # Definition of one named aircraft group of an AD: its label plus optional
    # model, MSN, modification and SB constraints that describe membership.
    # Only `group_id` is required; every other field defaults to None.
    # NOTE(review): `description` is Optional with default None even though its
    # prompt text asks for it to always be populated.
    group_id: str = Field(
        description=(
            "The group label exactly as defined in the AD's Groups section. "
            "Examples: 'Group 1', 'Group 2', 'Group A', 'Group B'. "
            "Use verbatim from the AD — do not invent or rename groups."
        )
    )
    models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Aircraft model variants that belong to this group, "
            "derived from the group definition. "
            "Examples: ['A321-111', 'A321-112'] or ['A320']. "
            "Leave None if the group definition does not restrict by model "
            "(i.e. it applies to all models already listed in the top-level applicability)."
        )
    )
    msn_constraints: Optional[List[MSNConstraint]] = Field(
        default=None,
        description=(
            "MSN-based constraints that define or restrict membership in this group. "
            "Apply the same rules as top-level msn_constraints: "
            "if the group definition says 'all MSN', populate with MSNConstraint(all=True, excluded=False). "
            "If the group is defined by specific MSNs, list them in specific_msns. "
            "Leave None only if MSN is not a factor in this group's definition."
        )
    )
    modification_constraints: Optional[List[ModificationConstraint]] = Field(
        default=None,
        description=(
            "Modification-based constraints that define or exclude aircraft from this group. "
            "Only use ModificationConstraint here — never mix with SB identifiers. "
            "Examples: a group excluding aircraft with a specific mod embodied in production. "
            "Leave None if modifications are not a factor in this group's definition."
        )
    )
    sb_constraints: Optional[List[ServiceBulletinConstraint]] = Field(
        default=None,
        description=(
            "Service Bulletin constraints that define or exclude aircraft from this group. "
            "Only use actual SB identifiers here — never use mod numbers. "
            "Example: a group defined by aircraft on which a specific SB has NOT been embodied. "
            "Leave None if SBs are not a factor in this group's definition."
        )
    )
    description: Optional[str] = Field(
        default=None,
        description=(
            "Free-text fallback for group membership logic that cannot be fully expressed "
            "by the structured fields above. "
            "Transcribe the exact defining sentence from the AD. "
            "Always populate this field — it serves as a human-readable audit trail "
            "even when structured fields are also populated."
        )
    )


class ComplianceTime(BaseModel):
    # A single compliance limit: either a relative `value` + `unit` (with an
    # optional `reference` point) or an absolute `calendar_date`. `is_interval`
    # separates recurring intervals from one-time thresholds.
    # NOTE(review): the "value/unit XOR calendar_date" and "positive integer"
    # rules exist only in the description text; the model does not enforce them.
    value: Optional[int] = Field(
        default=None,
        description=(
            "The numeric value of this compliance time. Always a positive integer. "
            "Examples: 37300 for '37 300 flight hours', 24 for '24 months', 90 for '90 days'. "
            "Set to None only when a specific calendar_date is used instead of a relative time value."
        )
    )
    unit: Optional[TimeUnit] = Field(
        default=None,
        description=(
            "The unit of measurement corresponding to value. "
            "Must be one of the TimeUnit enum values. "
            "Set to None only when calendar_date is used instead of value+unit."
        )
    )
    reference: Optional[str] = Field(
        default=None,
        description=(
            "The reference point from which this time is measured, transcribed from the AD. "
            "Examples: 'since first flight of the aeroplane', "
            "'after the effective date of this AD', "
            "'since the last inspection', "
            "'from the effective date of this AD'. "
            "Leave None only if no reference point is stated and the context is self-evident."
        )
    )
    calendar_date: Optional[str] = Field(
        default=None,
        description=(
            "An absolute calendar deadline in ISO 8601 format (YYYY-MM-DD). "
            "Use only when the AD specifies a hard date rather than a relative time window. "
            "When populated, value and unit should be None. "
            "Example: '2026-06-01' for 'before 01 June 2026'."
        )
    )
    is_interval: bool = Field(
        default=False,
        description=(
            "Set to True for RECURRING intervals between repeated actions "
            "(AD language like 'thereafter, at intervals not exceeding X FH'). "
            "Set to False for one-time initial thresholds "
            "(AD language like 'before exceeding X FH since first flight'). "
            "Default is False."
        )
    )


class RequirementAction(BaseModel):
    # One numbered paragraph of an AD's required actions: what has to be done,
    # for which groups/models, by when (compliance_times), how often (interval),
    # and how it relates to other paragraphs (triggered_by_paragraph,
    # terminating_action_for).
    # Required fields: paragraph_id, action_type, description. All others have
    # defaults.
    # NOTE(review): action_type is a plain str - the allowed values are listed
    # only in its description and are not enforced. Likewise nothing checks that
    # is_terminating_action is True when terminating_action_for is populated.
    paragraph_id: str = Field(
        description=(
            "The paragraph identifier exactly as numbered in the AD's Required Actions section. "
            "Examples: '(1)', '(5)', '(8)', '(12)'. "
            "Used to cross-reference paragraphs (e.g. corrective actions referencing their "
            "triggering inspection paragraph)."
        )
    )
    action_type: str = Field(
        description=(
            "The category of this required action. Use exactly one of the following values: "
            "'inspection' — any DET, GVI, SDI, ESDI, or other inspection task; "
            "'modification' — a structural, design, or configuration change to the aircraft; "
            "'corrective_action' — a repair or follow-up action triggered by a finding during inspection; "
            "'terminating_action' — an action whose accomplishment ends one or more repetitive requirements; "
            "'prohibition' — an action that must NOT be accomplished (e.g. 'do not embody SB X below Rev Y'); "
            "'clarification' — a paragraph that clarifies scope or interaction between other paragraphs "
            "without itself requiring a physical action (e.g. 'accomplishment of paragraph X does not "
            "terminate paragraph Y')."
        )
    )
    applies_to_groups: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of group IDs, exactly as defined in the AD's Groups section, "
            "to which this requirement applies. "
            "Examples: ['Group 1'], ['Group 1', 'Group 4']. "
            "Leave None if the requirement is stated in terms of direct model references "
            "rather than group labels, or if it applies implicitly to all groups "
            "(e.g. clarification paragraphs)."
        )
    )
    applies_to_models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Direct aircraft model references for requirements that do not use group labels. "
            "Examples: ['A320-211', 'A320-212']. "
            "Leave None when applies_to_groups is populated — do not duplicate the same "
            "applicability in both fields."
        )
    )
    additional_applicability_condition: Optional[str] = Field(
        default=None,
        description=(
            "Any further condition within the stated group or model scope that narrows "
            "which aircraft this paragraph applies to, transcribed verbatim from the AD. "
            "Use when the paragraph adds a qualifier beyond the group definition itself. "
            "Examples: "
            "'except aeroplanes modified in accordance with the instructions of Airbus SB A320-57-1100', "
            "'having embodied SB A320-57-1089 at any revision lower than Revision 04 (for Group 4 aeroplanes)'. "
            "Leave None if no additional condition is stated."
        )
    )
    description: str = Field(
        description=(
            "A concise, self-contained human-readable summary of what action must be performed. "
            "Include: the inspection method or action type (e.g. DET, GVI, modification), "
            "the area or component involved, and the reference document(s) to follow. "
            "Write in plain language suitable for a maintenance engineer to understand at a glance. "
            "Example: 'Accomplish a detailed inspection (DET) of the LH and RH wing inner rear spars "
            "at the MLG anchorage fitting attachment holes, per SB A320-57-1101 Revision 04.'"
        )
    )
    compliance_times: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "One or more initial compliance thresholds by which this action must first be accomplished. "
            "When the AD states multiple limits with 'whichever occurs first', "
            "list each as a separate ComplianceTime entry — the whichever-first logic is implied "
            "by multiple entries in this list. "
            "Example: '37 300 FH or 20 000 FC whichever occurs first since first flight' → "
            "two ComplianceTime entries: one for 37300 FH and one for 20000 FC, "
            "both with reference 'since first flight of the aeroplane' and is_interval=False. "
            "Leave None for clarification paragraphs or terminating action notes with no time limit."
        )
    )
    interval: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "One or more recurring intervals for repetitive requirements. "
            "Populate only when the AD states 'thereafter, at intervals not exceeding...'. "
            "As with compliance_times, list each limit as a separate ComplianceTime entry "
            "when multiple limits apply with 'whichever occurs first'. "
            "All entries must have is_interval=True. "
            "Leave None for one-time actions (modifications, one-time inspections, corrective actions)."
        )
    )
    reference_documents: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of Airbus Service Bulletins or other technical documents whose instructions "
            "must be followed to accomplish this action. "
            "Include the revision where the AD specifies it. "
            "Examples: ['SB A320-57-1101 Revision 04', 'SB A320-57-1256']. "
            "Leave None for corrective actions where the repair instructions are obtained "
            "from Airbus on a case-by-case basis, or for clarification paragraphs."
        )
    )
    triggered_by_paragraph: Optional[str] = Field(
        default=None,
        description=(
            "For corrective_action paragraphs only: the paragraph_id of the inspection "
            "or action that triggers this corrective action when discrepancies are found. "
            "Example: '(1)' means this corrective action is triggered by findings during "
            "the inspection required by paragraph (1). "
            "Leave None for all non-corrective action types."
        )
    )
    terminating_action_for: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of paragraph_ids whose repetitive requirements are permanently terminated "
            "upon accomplishment of this action. "
            "Example: ['(5)'] means completing this action ends the recurring inspections "
            "required by paragraph (5) for that aircraft. "
            "Leave None if this action has no terminating effect on other paragraphs. "
            "Note: also set is_terminating_action=True when this field is populated."
        )
    )
    is_terminating_action: bool = Field(
        default=False,
        description=(
            "Set to True if accomplishing this action permanently terminates one or more "
            "repetitive requirements in this AD. "
            "Must be True whenever terminating_action_for is populated. "
            "Default is False."
        )
    )


class ADApplicabilityExtraction(BaseModel):
    # Top-level result of one AD extraction: header metadata, top-level
    # applicability constraints, group definitions and the per-paragraph
    # requirements. Only `ad_number` is required.
    # Elsewhere in this notebook its model_json_schema() is passed to the model
    # as response_json_schema and the reply is parsed with model_validate_json().
    # NOTE: notes are kept as `#` comments on purpose - pydantic includes a
    # model's docstring in the generated JSON schema as its description, which
    # would change the schema sent to the model.
    ad_number: str = Field(
        description=(
            "The full AD identifier including any revision suffix, exactly as it appears in the AD header. "
            "Examples: '2025-0254R1', '2023-0041', 'AD 2021-23-10'. "
            "Never omit the revision suffix if present."
        )
    )
    issuing_authority: Optional[str] = Field(
        default=None,
        description=(
            "The aviation authority that issued this AD. "
            "Examples: 'EASA', 'FAA', 'TCCA', 'CASA'. "
            "Taken from the AD header or introductory paragraph."
        )
    )
    effective_date: Optional[str] = Field(
        default=None,
        description=(
            "The effective date of this AD (or its most recent revision) in ISO 8601 format (YYYY-MM-DD). "
            "If multiple dates are listed (original issue and revision), use the revision's effective date. "
            "Example: '2025-12-08'."
        )
    )
    revision: Optional[str] = Field(
        default=None,
        description=(
            "The revision label of this AD exactly as stated in the document. "
            "Examples: 'Revision 01', 'R1', 'Amendment 2'. "
            "Leave None for original issue (no revision)."
        )
    )
    supersedes: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of AD identifiers that this AD supersedes, replaces, or revises, "
            "taken from the Revision field or the Reason section. "
            "Include all superseded ADs, not just the immediate predecessor. "
            "Examples: ['2025-0254', '2007-0162', '2014-0169']. "
            "Leave None if this is a first-issue AD that supersedes nothing."
        )
    )
    models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Complete list of every aircraft model variant explicitly named in the "
            "Applicability section of the AD. "
            "List each variant as a separate string, exactly as written. "
            "Examples: ['A320-211', 'A320-212', 'A320-214', 'A321-111', 'A321-112']. "
            "Do not collapse variants (e.g. do not write 'A320' if the AD lists 'A320-211', 'A320-212' etc.)."
        )
    )
    msn_constraints: Optional[List[MSNConstraint]] = Field(
        default=None,
        description=(
            "Top-level MSN constraints covering the entire AD applicability, before any group scoping. "
            "IMPORTANT — never leave this None when the AD mentions MSN applicability: "
            "If the AD says 'all manufacturer serial numbers (MSN)' or 'all MSN', "
            "always populate with at least one MSNConstraint(all=True, excluded=False). "
            "If specific MSN ranges or numbers are excluded (e.g. 'except MSN 001 to 099'), "
            "add a separate MSNConstraint with excluded=True for those. "
            "Only leave None if the AD makes absolutely no reference to MSN applicability."
        )
    )
    modification_constraints: Optional[List[ModificationConstraint]] = Field(
        default=None,
        description=(
            "Top-level Airbus modification constraints covering the entire AD applicability. "
            "IMPORTANT: Only 'mod XXXXX' numbers belong here — never SB identifiers. "
            "These are almost always exclusions: aircraft on which a specific mod has been "
            "embodied in production are excluded from the AD's scope. "
            "Capture each mod as a separate ModificationConstraint. "
            "Example: 'except those on which Airbus mod 24591 has been embodied in production' → "
            "ModificationConstraint(modification_id='mod 24591', embodied=True, excluded=True). "
            "Leave None only if no modification-based applicability constraints exist in this AD."
        )
    )
    sb_constraints: Optional[List[ServiceBulletinConstraint]] = Field(
        default=None,
        description=(
            "Top-level Service Bulletin constraints covering the entire AD applicability. "
            "IMPORTANT: Only actual Airbus SB identifiers (format 'AXXX-XX-XXXX') belong here. "
            "Airbus modification numbers ('mod XXXXX') must NEVER be placed here — "
            "those belong exclusively in modification_constraints. "
            "These are typically SB-based exclusions, e.g. aircraft on which a specific SB "
            "revision has been embodied are excluded from scope. "
            "Example: 'except those on which SB A320-57-1089 at Revision 04 has been embodied' → "
            "ServiceBulletinConstraint(sb_identifier='A320-57-1089', revision='Revision 04', "
            "incorporated=True, excluded=True). "
            "Leave None only if no SB-based applicability constraints exist in this AD."
        )
    )
    compliance_time: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "Top-level summary of the most immediate compliance deadline(s) imposed by this AD as a whole. "
            "The intent is to surface the AD's urgency at a glance, without requiring a consumer "
            "to parse every RequirementAction. "
            "Populate with the most restrictive (shortest) initial deadline across all requirements. "
            "When the shortest deadline is expressed as 'X or Y whichever occurs first', "
            "list both as separate ComplianceTime entries. "
            "This field is a summary — full per-paragraph compliance times are still "
            "captured in each RequirementAction.compliance_times. "
            "Leave None only if this AD contains no time-limited requirements "
            "(e.g. a purely prohibitive AD with no deadline)."
        )
    )
    groups: Optional[List[AircraftGroup]] = Field(
        default=None,
        description=(
            "Definitions of all aircraft groups declared in the AD's Groups section, "
            "one AircraftGroup entry per defined group. "
            "Groups are internal AD constructs that partition applicable aircraft for "
            "the purpose of applying different requirements to different subsets. "
            "Preserve the exact group labels and definitions from the AD. "
            "Leave None only if the AD does not define any named groups."
        )
    )
    requirements: Optional[List[RequirementAction]] = Field(
        default=None,
        description=(
            "Complete list of all required actions, one RequirementAction per numbered paragraph "
            "in the AD's Required Actions section. "
            "This is the primary output of the extraction. "
            "Every paragraph must be captured — inspections, modifications, corrective actions, "
            "prohibitions, terminating actions, and clarification notes alike. "
            "Preserve paragraph numbering exactly as in the AD. "
            "Leave None only if the AD contains no required actions (which should never occur "
            "for a valid AD)."
        )
    )

# System instruction for the extraction request; it is passed to the model via
# GenerateContentConfig(system_instruction=system_prompt). The text is runtime
# data - any edit changes what the model is told.
system_prompt = """
You are an aviation regulatory document parser specialized in Airworthiness Directives (ADs).
Extract structured applicability and compliance information from the provided AD document.

EXTRACTION RULES:
- Extract only information explicitly stated in the document. Never infer or assume.
- Preserve all identifiers verbatim (model names, SB numbers, mod numbers, MSNs).
- If a field has no corresponding information in the document, set it to null.
- Output valid JSON only. No markdown, no explanation, no commentary.

CRITICAL DISTINCTIONS:
- Airbus modification numbers (e.g. "mod 24591") → always go in modification_constraints. Never in sb_constraints.
- Service Bulletin identifiers (e.g. "A320-57-1089") → always go in sb_constraints. Never in modification_constraints.
- If the AD states "all MSN" or "all manufacturer serial numbers", always set MSNConstraint(all=True, excluded=False). Never leave msn_constraints null when MSN applicability is mentioned.
- When multiple compliance limits use "whichever occurs first", list each as a separate ComplianceTime entry.
- Recurring intervals ("thereafter, at intervals not exceeding...") → is_interval=True.
- One-time thresholds ("before exceeding...") → is_interval=False.

OUTPUT: Valid JSON strictly following the provided schema.
"""

def prepare_messages(img_paths: list[str]):
    """Build the ``contents`` payload for a Gemini ``generate_content`` call.

    The payload starts with a short text instruction and is followed by one
    PNG image part per entry of ``img_paths``, in the order given.

    Args:
        img_paths: Paths of the PNG page images to attach.

    Returns:
        A list whose first element is the instruction string and whose
        remaining elements are ``types.Part`` objects, one per image.

    Raises:
        OSError: If an image file cannot be opened or read (propagated as-is).
    """
    messages = [
        "Now, extract the following images!"
    ]

    for img_path in img_paths:
        with open(img_path, "rb") as f:
            logger.info(f"Processing image: {img_path}")
            # Part.from_bytes is documented to take the raw image bytes, so the
            # file content is handed over directly instead of being
            # base64-encoded into a str first.
            messages.append(
                types.Part.from_bytes(
                    data=f.read(),
                    mime_type="image/png"
                )
            )

    return messages

In [17]:
# Build the request payload from the page images rendered by the PDF
# conversion step. `img_paths` is the name that step assigns; the previous
# argument `result` is not defined anywhere in the visible notebook and only
# worked through leftover kernel state.
input_message = prepare_messages(
    img_paths=img_paths,
)

[32m2026-02-20 13:42:06.855[0m | [1mINFO    [0m | [36m__main__[0m:[36mprepare_messages[0m:[36m550[0m - [1mProcessing image: /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/dc34dff2-8c2d-4da8-b82c-b503ce77f182-1.png[0m
[32m2026-02-20 13:42:06.860[0m | [1mINFO    [0m | [36m__main__[0m:[36mprepare_messages[0m:[36m550[0m - [1mProcessing image: /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/dc34dff2-8c2d-4da8-b82c-b503ce77f182-2.png[0m
[32m2026-02-20 13:42:06.861[0m | [1mINFO    [0m | [36m__main__[0m:[36mprepare_messages[0m:[36m550[0m - [1mProcessing image: /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/dc34dff2-8c2d-4da8-b82c-b503ce77f182-3.png[0m
[32m2026-02-20 13:42:06.863[0m | [1mINFO    [0m | [36m__main__[0m:[36mprepare_messages[0m:[36m550[0m - [1mProcessing image: /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/dc34dff2-8c2d-4da8-b82c-b503ce77f182-4.png[0m
[32m2026-02-20 13:42:06.865[0m | 

In [None]:
from google import genai
from google.genai import types
import os

# Model name and API client; the key is read from the GOOGLE_API_KEY
# environment variable.
model = "gemini-2.5-flash"
client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))

# Generation settings: system prompt, temperature 0.1, and a JSON reply
# described by the ADApplicabilityExtraction schema.
model_config = types.GenerateContentConfig(
    response_json_schema=ADApplicabilityExtraction.model_json_schema(),
    response_mime_type="application/json",
    system_instruction=system_prompt,
    temperature=0.1,
)

# Send the instruction and page images in a single request.
response = client.models.generate_content(
    contents=input_message,
    config=model_config,
    model=model,
)

In [19]:
# Re-issue the extraction request with the current model, config and payload.
response = client.models.generate_content(contents=input_message, config=model_config, model=model)

[2026-02-20 13:42:13,231] [    INFO] models.py:5613 - AFC is enabled with max remote calls: 10.
[2026-02-20 13:42:59,491] [    INFO] _client.py:1025 - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"


In [74]:
import json

# Validate the raw model reply against the schema and turn it into a plain dict.
outputs = ADApplicabilityExtraction.model_validate_json(response.text).model_dump()
# Despite its name, SAVE_JSON_DIR is the path of the result *file*.
SAVE_JSON_DIR = os.path.join(ROOT_TEMP_DIR, FILE_ID, f"{FILE_ID}_RESULTS.json")
# Make sure the per-run directory exists before writing into it.
os.makedirs(os.path.dirname(SAVE_JSON_DIR), exist_ok=True)
# ensure_ascii=False writes non-ASCII characters verbatim, so the file encoding
# is pinned to UTF-8 instead of depending on the platform default.
with open(SAVE_JSON_DIR, 'w', encoding="utf-8") as f:
    json.dump(outputs, f, indent=2, ensure_ascii=False)

In [None]:
# Dump the validated extraction (the plain dict produced by model_dump()).
print(outputs)

In [None]:
# Show every top-level field of the extraction under its own "# <name>"
# heading, each followed by a "---" separator.
for key, value in outputs.items():
    print(f"# {key}", value, "\n---\n", sep="\n")

In [70]:

# Read the saved extraction back from disk. The file is written with
# ensure_ascii=False, so it is decoded as UTF-8 explicitly rather than with the
# platform default encoding.
with open(SAVE_JSON_DIR, "r", encoding="utf-8") as f:
    results = json.load(f)
print(results)

{'ad_number': '2025-0254R1', 'issuing_authority': 'EASA', 'effective_date': '2025-12-05', 'revision': 'Revision 01', 'supersedes': ['2025-0254', '2007-0162', '2014-0169'], 'models': ['A320-211', 'A320-212', 'A320-214', 'A320-215', 'A320-216', 'A320-231', 'A320-232', 'A320-233', 'A321-111', 'A321-112', 'A321-131'], 'msn_constraints': [{'all': True, 'range': None, 'specific_msns': None, 'excluded': False}], 'modification_constraints': [{'modification_id': 'mod 24991', 'embodied': True, 'excluded': True}, {'modification_id': 'mod 24977', 'embodied': True, 'excluded': True}], 'sb_constraints': [{'sb_identifier': 'A320-57-1089', 'revision': 'Revision 04', 'incorporated': True, 'excluded': True}], 'compliance_time': [{'value': 17300, 'unit': 'flight_cycles', 'reference': 'since aeroplane first flight', 'calendar_date': None, 'is_interval': False}, {'value': 32300, 'unit': 'flight_hours', 'reference': 'since aeroplane first flight', 'calendar_date': None, 'is_interval': False}, {'value': 24, 

In [2]:
import pandas as pd
import json
from loguru import logger

# Aircraft to be checked: one row per aircraft with model, MSN and applied mods/SBs.
test_data = pd.read_csv("/home/naufal/soji_ai/test/ad_test_data.csv", sep=",")

# Previously extracted AD results. Each path embeds the random FILE_ID (uuid4)
# of the extraction run that produced it, so these files only exist on a
# machine where exactly those runs were executed; otherwise the open() below
# raises FileNotFoundError. Point the paths at your own *_RESULTS.json files.
ad_23_file_path = "/home/naufal/soji_ai/temp/44fbed7dccdb44099a083f153f20a3f7/44fbed7dccdb44099a083f153f20a3f7_RESULTS.json"
ad_25_file_path = "/home/naufal/soji_ai/temp/1b333a987b10405e9e589c4f1ad0e67d/1b333a987b10405e9e589c4f1ad0e67d_RESULTS.json"

# JSON is UTF-8; decode explicitly instead of using the platform default encoding.
with open(ad_23_file_path, "r", encoding="utf-8") as f:
    ad_23 = json.load(f)

with open(ad_25_file_path, "r", encoding="utf-8") as f:
    ad_25 = json.load(f)

FileNotFoundError: [Errno 2] No such file or directory: '/home/naufal/soji_ai/temp/44fbed7dccdb44099a083f153f20a3f7/44fbed7dccdb44099a083f153f20a3f7_RESULTS.json'

In [6]:
# Show the test aircraft table read from ad_test_data.csv.
test_data

Unnamed: 0,aircraft_model,msn,modifications_applied
0,MD-11,48123,
1,DC-10-30F,47890,
2,Boeing 737-800,30123,
3,A320-214,5234,
4,A320-232,6789,mod 24591 (production)
5,A320-214,7456,SB A320-57-1089 Rev 04
6,A321-111,8123,
7,A321-112,364,mod 24977 (production)
8,A319-100,9234,
9,MD-10-10F,46234,


In [53]:
# DataFrame.info() prints the summary itself and returns None, so wrapping it
# in print() only appended a stray "None" line to the output.
test_data.info()

<class 'pandas.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 3 columns):
 #   Column                 Non-Null Count  Dtype
---  ------                 --------------  -----
 0   aircraft_model         10 non-null     str  
 1   msn                    10 non-null     int64
 2   modifications_applied  3 non-null      str  
dtypes: int64(1), str(2)
memory usage: 372.0 bytes
None


In [79]:
# Check how the MSN constraints came out for ad_23. compare_to_ad treats a
# None/empty value here as "no MSN restriction", i.e. every MSN is in scope.
print(ad_23["msn_constraints"])

None


In [1]:
# Probe: a bare model name is a substring of the longer label used in the AD,
# which is why the model check can rely on `in`.
print("DC-10-30F" in "DC-10-30F (KC-10A and KDC-10)")

True


In [55]:
# Display-friendly copy of the modifications column: missing values become 'N/A'.
test_data["mod_applied"] = test_data["modifications_applied"].fillna('N/A')
test_data

Unnamed: 0,aircraft_model,msn,modifications_applied,mod_applied
0,MD-11,48123,,
1,DC-10-30F,47890,,
2,Boeing 737-800,30123,,
3,A320-214,5234,,
4,A320-232,6789,mod 24591 (production),mod 24591 (production)
5,A320-214,7456,SB A320-57-1089 Rev 04,SB A320-57-1089 Rev 04
6,A321-111,8123,,
7,A321-112,364,mod 24977 (production),mod 24977 (production)
8,A319-100,9234,,
9,MD-10-10F,46234,,


In [None]:
import re
import pandas as pd


def _msn_matches_constraint(msn: int, msn_constraint: dict) -> bool:
    """Return True when ``msn`` belongs to the MSN set described by one constraint.

    The selectors are tried in the order ``all`` -> ``range`` -> ``specific_msns``
    and the first populated one decides. The ``excluded`` flag is NOT
    interpreted here; the caller decides what a match means.
    """
    if msn_constraint.get("all"):
        return True

    range_data = msn_constraint.get("range")
    if range_data:
        start = range_data.get("start")
        end = range_data.get("end")

        # A bound of None means "unbounded on that side" (see NumericRange),
        # so it must never be compared against the MSN.
        if start is None:
            lower_ok = True
        elif range_data.get("inclusive_start", True):
            lower_ok = msn >= start
        else:
            lower_ok = msn > start

        if end is None:
            upper_ok = True
        elif range_data.get("inclusive_end", True):
            upper_ok = msn <= end
        else:
            upper_ok = msn < end

        return lower_ok and upper_ok

    specific = msn_constraint.get("specific_msns")
    if specific:
        return msn in specific

    return False


def _msn_in_scope(msn: int, msn_constraints: list) -> bool:
    """Decide whether ``msn`` is inside the AD's MSN applicability.

    Rules:
      * no constraints at all            -> every MSN is in scope;
      * a matching ``excluded`` entry    -> out of scope, regardless of the
        order in which the constraints are listed;
      * otherwise the MSN must match at least one inclusion entry, unless
        the list holds exclusions only ("all except ..."), in which case
        every non-excluded MSN is in scope.
    """
    if not msn_constraints:
        return True

    inclusion_hits = []
    for msn_constraint in msn_constraints:
        matched = _msn_matches_constraint(msn, msn_constraint)
        if msn_constraint.get("excluded", False):
            if matched:
                # An exclusion always wins over any inclusion.
                return False
        else:
            inclusion_hits.append(matched)

    return any(inclusion_hits) if inclusion_hits else True


def _excluded_by_modification(mods_applied: list, ad_data: dict) -> bool:
    """Return True when any applied modification/SB removes the aircraft from scope.

    Entries containing "mod" (case-insensitive) are looked up in
    ``modification_constraints``; everything else is treated as a Service
    Bulletin and looked up in ``sb_constraints``. For each entry the first
    constraint whose identifier occurs in it decides.
    """
    for mod_applied in mods_applied:
        if "mod" in mod_applied.lower():
            constraints = ad_data.get("modification_constraints") or []
            id_key = "modification_id"
        else:
            constraints = ad_data.get("sb_constraints") or []
            id_key = "sb_identifier"

        for constraint in constraints:
            identifier = constraint.get(id_key) or ""
            if not identifier:
                # An empty identifier would build r'\b\b', which matches any
                # text containing a word character -> skip it.
                continue

            # Word boundaries stop "mod 245" from matching "mod 24591".
            # Case is ignored so that "Mod 24977" in the fleet data still
            # matches "mod 24977" from the AD, in line with the
            # case-insensitive routing above.
            pattern = r'\b' + re.escape(identifier) + r'\b'
            if re.search(pattern, mod_applied, flags=re.IGNORECASE):
                if constraint.get("excluded", False):
                    return True
                break  # matched a non-excluding constraint; go to next entry

    return False


def compare_to_ad(df: pd.DataFrame, ad_file_dict: dict) -> pd.DataFrame:
    """Evaluate every aircraft in ``df`` against every AD in ``ad_file_dict``.

    Parameters
    ----------
    df : pd.DataFrame
        One row per aircraft. Reads the columns ``aircraft_model``, ``msn``
        and ``modifications_applied`` (missing value, "none" or "n/a" mean
        no modification; several entries may be comma-separated).
    ad_file_dict : dict
        Maps a column label to an extracted AD dict. The keys ``models``,
        ``msn_constraints``, ``modification_constraints`` and
        ``sb_constraints`` are read from each AD dict.

    Returns
    -------
    pd.DataFrame
        ``df`` plus one status column per AD (named after the dict key),
        aligned on ``df``'s own index. Possible values:

        * "❌ Not applicable" - model or MSN is outside the AD's scope;
        * "❌ Not Affected"   - in scope, but an applied mod/SB excludes it;
        * "✅ Affected"       - in scope and not excluded.

    Raises
    ------
    KeyError
        If ``df`` lacks one of the required columns.
    ValueError
        If an ``msn`` value cannot be converted to ``int``.
    """
    ad_columns = list(ad_file_dict.keys())
    ad_rows = []

    for _, item in df.iterrows():
        model = str(item["aircraft_model"])
        msn = int(item["msn"])

        # Support multiple modifications/SBs as a comma-separated string or single value
        raw_mod = item["modifications_applied"]
        if pd.isna(raw_mod) or str(raw_mod).strip().lower() in ("none", "n/a", ""):
            mods_applied = []
        else:
            # Drop empty fragments produced by stray or trailing commas.
            mods_applied = [m.strip() for m in str(raw_mod).split(",") if m.strip()]

        logger.info(f"Checking AD status for model: {model}, msn: {msn}, mods applied: {mods_applied}")

        ad_status_rows = []

        for ad in ad_columns:

            logger.info(f"Checking in {ad} file")

            ad_data = ad_file_dict[ad]

            # STEP 1: model check.
            # Substring match on purpose: the AD may carry a longer label such
            # as "DC-10-30F (KC-10A and KDC-10)" for the model "DC-10-30F".
            # NOTE(review): this also lets a short name (e.g. "A320") match
            # every variant that contains it - confirm that is acceptable.
            ad_models = ad_data.get("models") or []
            if not ad_models:
                logger.warning(f"No models listed in {ad}; treating it as not applicable")

            if not any(model in m for m in ad_models):
                ad_status_rows.append("❌ Not applicable")
                continue

            # STEP 2: MSN check.
            if not _msn_in_scope(msn, ad_data.get("msn_constraints") or []):
                ad_status_rows.append("❌ Not applicable")
                continue

            # STEP 3: modification / SB exclusion check.
            # With no modifications on the aircraft, no exclusion can apply.
            if _excluded_by_modification(mods_applied, ad_data):
                ad_status_rows.append("❌ Not Affected")
            else:
                ad_status_rows.append("✅ Affected")

        ad_rows.append(ad_status_rows)

    # Reuse df's index so that concat(axis=1) pairs each status row with the
    # aircraft it was computed for, even when df is filtered or re-indexed.
    ad_df = pd.DataFrame(ad_rows, columns=ad_columns, index=df.index)

    combined_df = pd.concat([df, ad_df], axis=1)

    return combined_df

In [59]:
# Sanity check of the model matching rule on one real row: the aircraft model
# of test row 1 must appear as a substring of at least one model listed in ad_23.
ad_23_models = ad_23["models"]
test_x = test_data.loc[1, "aircraft_model"].strip()
print(
    test_x,
    ad_23_models,
    any(test_x in candidate for candidate in ad_23_models),
    sep="\n",
)


DC-10-30F
['MD-11', 'MD-11F', 'MD-10-10F', 'MD-10-30F', 'DC-10-10', 'DC-10-10F', 'DC-10-15', 'DC-10-30', 'DC-10-30F (KC-10A and KDC-10)', 'DC-10-40', 'DC-10-40F']
True


In [77]:
# Column label -> extracted AD dict. Each key becomes the name of one status
# column in the frame returned by compare_to_ad.
ad_file_dict = {
    "AD 2025-0254R1": ad_25,
    "AD 2025-23-53": ad_23
}

# Evaluate every test aircraft against both ADs.
result_df = compare_to_ad(test_data, ad_file_dict=ad_file_dict)

[32m2026-02-20 06:02:00.370[0m | [1mINFO    [0m | [36m__main__[0m:[36mcompare_to_ad[0m:[36m21[0m - [1mChecking AD status for model: MD-11, msn: 48123, mods applied: [][0m
[32m2026-02-20 06:02:00.370[0m | [1mINFO    [0m | [36m__main__[0m:[36mcompare_to_ad[0m:[36m27[0m - [1mChecking in AD 2025-0254R1 file[0m
[32m2026-02-20 06:02:00.371[0m | [1mINFO    [0m | [36m__main__[0m:[36mcompare_to_ad[0m:[36m27[0m - [1mChecking in AD 2025-23-53 file[0m
[32m2026-02-20 06:02:00.372[0m | [1mINFO    [0m | [36m__main__[0m:[36mcompare_to_ad[0m:[36m21[0m - [1mChecking AD status for model: DC-10-30F, msn: 47890, mods applied: [][0m
[32m2026-02-20 06:02:00.372[0m | [1mINFO    [0m | [36m__main__[0m:[36mcompare_to_ad[0m:[36m27[0m - [1mChecking in AD 2025-0254R1 file[0m
[32m2026-02-20 06:02:00.373[0m | [1mINFO    [0m | [36m__main__[0m:[36mcompare_to_ad[0m:[36m27[0m - [1mChecking in AD 2025-23-53 file[0m
[32m2026-02-20 06:02:00.373[0m | 

In [61]:
# Persist the comparison table in the same folder as the input CSV;
# index=False keeps the row index out of the file.
result_df.to_csv("/home/naufal/soji_ai/test/ad_test_data_result.csv", index=False)

In [62]:
# to_markdown() returns the table as a string, hence the print().
print(result_df.to_markdown())

|    | aircraft_model   |   msn | modifications_applied   | mod_applied            | AD 2025-0254R1    | AD 2025-23-53     |
|---:|:-----------------|------:|:------------------------|:-----------------------|:------------------|:------------------|
|  0 | MD-11            | 48123 | nan                     | N/A                    | ❌ Not applicable | ✅ Affected       |
|  1 | DC-10-30F        | 47890 | nan                     | N/A                    | ❌ Not applicable | ✅ Affected       |
|  2 | Boeing 737-800   | 30123 | nan                     | N/A                    | ❌ Not applicable | ❌ Not applicable |
|  3 | A320-214         |  5234 | nan                     | N/A                    | ✅ Affected       | ❌ Not applicable |
|  4 | A320-232         |  6789 | mod 24591 (production)  | mod 24591 (production) | ❌ Not Affected   | ❌ Not applicable |
|  5 | A320-214         |  7456 | SB A320-57-1089 Rev 04  | SB A320-57-1089 Rev 04 | ❌ Not Affected   | ❌ Not applicable |
|  6 | A321-

In [78]:
# Same table printed again after compare_to_ad was re-run (In [77]/[78]).
print(result_df.to_markdown())

|    | aircraft_model   |   msn | modifications_applied   | AD 2025-0254R1    | AD 2025-23-53     |
|---:|:-----------------|------:|:------------------------|:------------------|:------------------|
|  0 | MD-11            | 48123 | nan                     | ❌ Not applicable | ✅ Affected       |
|  1 | DC-10-30F        | 47890 | nan                     | ❌ Not applicable | ✅ Affected       |
|  2 | Boeing 737-800   | 30123 | nan                     | ❌ Not applicable | ❌ Not applicable |
|  3 | A320-214         |  5234 | nan                     | ✅ Affected       | ❌ Not applicable |
|  4 | A320-232         |  6789 | mod 24591 (production)  | ❌ Not Affected   | ❌ Not applicable |
|  5 | A320-214         |  7456 | SB A320-57-1089 Rev 04  | ❌ Not Affected   | ❌ Not applicable |
|  6 | A321-111         |  8123 | nan                     | ✅ Affected       | ❌ Not applicable |
|  7 | A321-112         |   364 | mod 24977 (production)  | ❌ Not Affected   | ❌ Not applicable |
|  8 | A319-100 

In [49]:
## schemas.py
from enum import Enum
from typing import Optional, List
from pydantic import BaseModel, Field


# Units accepted by ComplianceTime.unit. Mixing in `str` makes every member
# compare equal to its plain string value (e.g. TimeUnit.DAYS == "days").
# NOTE(review): CALENDAR_DATE overlaps with ComplianceTime.calendar_date, whose
# description asks for unit=None when an absolute date is given - confirm
# which of the two the extractor is expected to use.
class TimeUnit(str, Enum):
    FLIGHT_HOURS = "flight_hours"
    FLIGHT_CYCLES = "flight_cycles"
    DAYS = "days"
    MONTHS = "months"
    YEARS = "years"
    CALENDAR_DATE = "calendar_date"


# Integer interval used by MSNConstraint.range. Either bound may be None
# (open-ended on that side) and each bound carries its own inclusivity flag.
# Documented with comments rather than a class docstring on purpose: pydantic
# copies a model's docstring into the generated JSON schema, and that schema
# is what gets sent to the LLM.
class NumericRange(BaseModel):
    # Lower bound; None = no lower bound, so consumers must not compare against it blindly.
    start: Optional[int] = Field(
        default=None,
        description=(
            "Lower bound of the MSN range (inclusive by default). "
            "Set to None if there is no lower bound."
        )
    )
    # Upper bound; None = no upper bound.
    end: Optional[int] = Field(
        default=None,
        description=(
            "Upper bound of the MSN range (inclusive by default). "
            "Set to None if there is no upper bound."
        )
    )
    inclusive_start: bool = Field(
        default=True,
        description="True means >= (greater than or equal to start). False means > (strictly greater than)."
    )
    inclusive_end: bool = Field(
        default=True,
        description="True means <= (less than or equal to end). False means < (strictly less than)."
    )


# One MSN applicability rule. Exactly which serial numbers it covers is given
# by one of `all` / `range` / `specific_msns`; `excluded` says whether those
# serial numbers are added to or removed from the AD's scope.
# The field descriptions below are prompt text for the LLM (they end up in the
# JSON schema), so edit them with the same care as code.
class MSNConstraint(BaseModel):
    # NOTE: the field names `all` and `range` shadow the Python builtins inside
    # this class body only; they are kept because they are part of the schema.
    all: Optional[bool] = Field(
        default=None,
        description=(
            "Set to True when the AD explicitly states 'all manufacturer serial numbers (MSN)' or 'all MSN'. "
            "IMPORTANT: Never leave this None when the AD explicitly uses the word 'all' for MSN applicability — "
            "even if other exclusions apply, the 'all' inclusion must still be captured here. "
            "Leave None only when applicability is defined purely by a specific range or list."
        )
    )
    range: Optional[NumericRange] = Field(
        default=None,
        description=(
            "A continuous numeric range of MSNs this constraint covers. "
            "Use when the AD specifies a span like 'MSN 100 through MSN 500'. "
            "Do not use together with specific_msns."
        )
    )
    specific_msns: Optional[List[int]] = Field(
        default=None,
        description=(
            "An explicit list of individual MSN integers this constraint covers. "
            "Use when the AD names specific serial numbers, e.g. 'MSN 364 or MSN 385'. "
            "Do not use together with range."
        )
    )
    # True = the selected MSNs are carved OUT of the applicability.
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when these MSNs are EXCLUDED from applicability "
            "(AD language like 'except MSN...', 'excluding MSN...'). "
            "Set to False when these MSNs are positively INCLUDED in applicability. "
            "Default is False (inclusion)."
        )
    )


# Applicability rule keyed on an Airbus modification ("mod NNNNN") number.
# Service Bulletins are modelled separately in ServiceBulletinConstraint.
class ModificationConstraint(BaseModel):
    # Required field (no default): the identifier text copied from the AD.
    modification_id: str = Field(
        description=(
            "The exact modification identifier as written in the AD. "
            "Always an Airbus 'mod' number, e.g. 'mod 24591', 'mod 24977'. "
            "IMPORTANT: Modification numbers are never Service Bulletins — "
            "do not confuse with SB identifiers (e.g. 'A320-57-XXXX'). "
            "Copy the identifier verbatim from the AD text."
        )
    )
    # Tri-state: True / False / None (unspecified).
    embodied: Optional[bool] = Field(
        default=None,
        description=(
            "True = this modification IS embodied on the aircraft. "
            "False = this modification is NOT embodied on the aircraft. "
            "None = embodiment status is unspecified or not relevant to this constraint."
        )
    )
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when aircraft WITH this modification embodied are EXCLUDED from applicability "
            "(AD language like 'except those on which mod XXXXX has been embodied in production'). "
            "Set to False when this modification is a positive inclusion condition. "
            "Default is False (inclusion)."
        )
    )


# Applicability rule keyed on a Service Bulletin identifier, optionally
# qualified by a revision. Counterpart of ModificationConstraint for SBs.
class ServiceBulletinConstraint(BaseModel):
    # Required field (no default). Stored WITHOUT the leading "SB", per the description.
    sb_identifier: str = Field(
        description=(
            "The exact Service Bulletin identifier as written in the AD, "
            "e.g. 'A320-57-1089', 'A320-57-1100'. "
            "IMPORTANT: Only actual Airbus Service Bulletins belong here (format: 'AXXX-XX-XXXX'). "
            "Airbus modification numbers ('mod XXXXX') must NEVER be placed here — "
            "those belong exclusively in ModificationConstraint. "
            "Copy the identifier verbatim from the AD text, without the 'SB' prefix."
        )
    )
    # Free text, not parsed: may be a plain revision or a phrase such as
    # 'Revision 03 or later'.
    revision: Optional[str] = Field(
        default=None,
        description=(
            "The revision qualifier for this SB constraint, exactly as stated in the AD. "
            "Examples: 'Revision 04', 'any revision lower than Revision 04', 'Revision 03 or later'. "
            "Leave None if no specific revision is mentioned and any revision applies."
        )
    )
    # Tri-state: True / False / None (unspecified).
    incorporated: Optional[bool] = Field(
        default=None,
        description=(
            "True = this SB HAS been incorporated on the aircraft. "
            "False = this SB has NOT been incorporated on the aircraft. "
            "None = incorporation status is unspecified or not relevant to this constraint."
        )
    )
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when aircraft on which this SB HAS been embodied are EXCLUDED from applicability "
            "(AD language like 'except those on which SB XXXX has been embodied'). "
            "Set to False when this SB is a positive inclusion or compliance condition. "
            "Default is False (inclusion)."
        )
    )


# A named aircraft group from the AD's Groups section. Membership is expressed
# with the same building blocks as top-level applicability (models, MSN,
# modification and SB constraints) plus a free-text description.
class AircraftGroup(BaseModel):
    # Required field (no default); RequirementAction.applies_to_groups refers to these labels.
    group_id: str = Field(
        description=(
            "The group label exactly as defined in the AD's Groups section. "
            "Examples: 'Group 1', 'Group 2', 'Group A', 'Group B'. "
            "Use verbatim from the AD — do not invent or rename groups."
        )
    )
    models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Aircraft model variants that belong to this group, "
            "derived from the group definition. "
            "Examples: ['A321-111', 'A321-112'] or ['A320']. "
            "Leave None if the group definition does not restrict by model "
            "(i.e. it applies to all models already listed in the top-level applicability)."
        )
    )
    msn_constraints: Optional[List[MSNConstraint]] = Field(
        default=None,
        description=(
            "MSN-based constraints that define or restrict membership in this group. "
            "Apply the same rules as top-level msn_constraints: "
            "if the group definition says 'all MSN', populate with MSNConstraint(all=True, excluded=False). "
            "If the group is defined by specific MSNs, list them in specific_msns. "
            "Leave None only if MSN is not a factor in this group's definition."
        )
    )
    modification_constraints: Optional[List[ModificationConstraint]] = Field(
        default=None,
        description=(
            "Modification-based constraints that define or exclude aircraft from this group. "
            "Only use ModificationConstraint here — never mix with SB identifiers. "
            "Examples: a group excluding aircraft with a specific mod embodied in production. "
            "Leave None if modifications are not a factor in this group's definition."
        )
    )
    sb_constraints: Optional[List[ServiceBulletinConstraint]] = Field(
        default=None,
        description=(
            "Service Bulletin constraints that define or exclude aircraft from this group. "
            "Only use actual SB identifiers here — never use mod numbers. "
            "Example: a group defined by aircraft on which a specific SB has NOT been embodied. "
            "Leave None if SBs are not a factor in this group's definition."
        )
    )
    # NOTE(review): the description text says "Always populate this field", yet
    # the field is Optional with default None, so validation does not enforce it.
    description: Optional[str] = Field(
        default=None,
        description=(
            "Free-text fallback for group membership logic that cannot be fully expressed "
            "by the structured fields above. "
            "Transcribe the exact defining sentence from the AD. "
            "Always populate this field — it serves as a human-readable audit trail "
            "even when structured fields are also populated."
        )
    )


# A single time limit: either a relative one (value + unit + reference) or an
# absolute one (calendar_date). Used for both initial thresholds and
# repetitive intervals; `is_interval` tells the two apart.
class ComplianceTime(BaseModel):
    # NOTE(review): "always a positive integer" is stated in the description
    # only; there is no numeric constraint on the field, so 0 or negative
    # values pass validation.
    value: Optional[int] = Field(
        default=None,
        description=(
            "The numeric value of this compliance time. Always a positive integer. "
            "Examples: 37300 for '37 300 flight hours', 24 for '24 months', 90 for '90 days'. "
            "Set to None only when a specific calendar_date is used instead of a relative time value."
        )
    )
    unit: Optional[TimeUnit] = Field(
        default=None,
        description=(
            "The unit of measurement corresponding to value. "
            "Must be one of the TimeUnit enum values. "
            "Set to None only when calendar_date is used instead of value+unit."
        )
    )
    reference: Optional[str] = Field(
        default=None,
        description=(
            "The reference point from which this time is measured, transcribed from the AD. "
            "Examples: 'since first flight of the aeroplane', "
            "'after the effective date of this AD', "
            "'since the last inspection', "
            "'from the effective date of this AD'. "
            "Leave None only if no reference point is stated and the context is self-evident."
        )
    )
    # Kept as a plain string; the ISO 8601 format is requested in the
    # description but not validated by the type.
    calendar_date: Optional[str] = Field(
        default=None,
        description=(
            "An absolute calendar deadline in ISO 8601 format (YYYY-MM-DD). "
            "Use only when the AD specifies a hard date rather than a relative time window. "
            "When populated, value and unit should be None. "
            "Example: '2026-06-01' for 'before 01 June 2026'."
        )
    )
    is_interval: bool = Field(
        default=False,
        description=(
            "Set to True for RECURRING intervals between repeated actions "
            "(AD language like 'thereafter, at intervals not exceeding X FH'). "
            "Set to False for one-time initial thresholds "
            "(AD language like 'before exceeding X FH since first flight'). "
            "Default is False."
        )
    )


# One numbered paragraph of the AD's Required Actions section: what has to be
# done, to which aircraft, by when, how often, and how it relates to other
# paragraphs (trigger / terminating action).
# Required fields (no default): paragraph_id, action_type, description.
# Documented with comments rather than a class docstring on purpose: pydantic
# copies a model's docstring into the generated JSON schema.
class RequirementAction(BaseModel):
    paragraph_id: str = Field(
        description=(
            "The paragraph identifier exactly as numbered in the AD's Required Actions section. "
            "Examples: '(1)', '(5)', '(8)', '(12)'. "
            "Used to cross-reference paragraphs (e.g. corrective actions referencing their "
            "triggering inspection paragraph)."
        )
    )
    # NOTE(review): the six allowed values are listed in the description only;
    # the field is a plain str, so any other string passes validation.
    action_type: str = Field(
        description=(
            "The category of this required action. Use exactly one of the following values: "
            "'inspection' — any DET, GVI, SDI, ESDI, or other inspection task; "
            "'modification' — a structural, design, or configuration change to the aircraft; "
            "'corrective_action' — a repair or follow-up action triggered by a finding during inspection; "
            "'terminating_action' — an action whose accomplishment ends one or more repetitive requirements; "
            "'prohibition' — an action that must NOT be accomplished (e.g. 'do not embody SB X below Rev Y'); "
            "'clarification' — a paragraph that clarifies scope or interaction between other paragraphs "
            "without itself requiring a physical action (e.g. 'accomplishment of paragraph X does not "
            "terminate paragraph Y')."
        )
    )
    # applies_to_groups and applies_to_models are meant to be alternatives
    # (see the descriptions); nothing in the model prevents both being set.
    applies_to_groups: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of group IDs, exactly as defined in the AD's Groups section, "
            "to which this requirement applies. "
            "Examples: ['Group 1'], ['Group 1', 'Group 4']. "
            "Leave None if the requirement is stated in terms of direct model references "
            "rather than group labels, or if it applies implicitly to all groups "
            "(e.g. clarification paragraphs)."
        )
    )
    applies_to_models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Direct aircraft model references for requirements that do not use group labels. "
            "Examples: ['A320-211', 'A320-212']. "
            "Leave None when applies_to_groups is populated — do not duplicate the same "
            "applicability in both fields."
        )
    )
    additional_applicability_condition: Optional[str] = Field(
        default=None,
        description=(
            "Any further condition within the stated group or model scope that narrows "
            "which aircraft this paragraph applies to, transcribed verbatim from the AD. "
            "Use when the paragraph adds a qualifier beyond the group definition itself. "
            "Examples: "
            "'except aeroplanes modified in accordance with the instructions of Airbus SB A320-57-1100', "
            "'having embodied SB A320-57-1089 at any revision lower than Revision 04 (for Group 4 aeroplanes)'. "
            "Leave None if no additional condition is stated."
        )
    )
    description: str = Field(
        description=(
            "A concise, self-contained human-readable summary of what action must be performed. "
            "Include: the inspection method or action type (e.g. DET, GVI, modification), "
            "the area or component involved, and the reference document(s) to follow. "
            "Write in plain language suitable for a maintenance engineer to understand at a glance. "
            "Example: 'Accomplish a detailed inspection (DET) of the LH and RH wing inner rear spars "
            "at the MLG anchorage fitting attachment holes, per SB A320-57-1101 Revision 04.'"
        )
    )
    # Several entries in this list mean "whichever occurs first".
    compliance_times: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "One or more initial compliance thresholds by which this action must first be accomplished. "
            "When the AD states multiple limits with 'whichever occurs first', "
            "list each as a separate ComplianceTime entry — the whichever-first logic is implied "
            "by multiple entries in this list. "
            "Example: '37 300 FH or 20 000 FC whichever occurs first since first flight' → "
            "two ComplianceTime entries: one for 37300 FH and one for 20000 FC, "
            "both with reference 'since first flight of the aeroplane' and is_interval=False. "
            "Leave None for clarification paragraphs or terminating action notes with no time limit."
        )
    )
    # Same list convention as compliance_times, but for the repeat interval.
    interval: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "One or more recurring intervals for repetitive requirements. "
            "Populate only when the AD states 'thereafter, at intervals not exceeding...'. "
            "As with compliance_times, list each limit as a separate ComplianceTime entry "
            "when multiple limits apply with 'whichever occurs first'. "
            "All entries must have is_interval=True. "
            "Leave None for one-time actions (modifications, one-time inspections, corrective actions)."
        )
    )
    reference_documents: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of Airbus Service Bulletins or other technical documents whose instructions "
            "must be followed to accomplish this action. "
            "Include the revision where the AD specifies it. "
            "Examples: ['SB A320-57-1101 Revision 04', 'SB A320-57-1256']. "
            "Leave None for corrective actions where the repair instructions are obtained "
            "from Airbus on a case-by-case basis, or for clarification paragraphs."
        )
    )
    # Holds another paragraph's paragraph_id.
    triggered_by_paragraph: Optional[str] = Field(
        default=None,
        description=(
            "For corrective_action paragraphs only: the paragraph_id of the inspection "
            "or action that triggers this corrective action when discrepancies are found. "
            "Example: '(1)' means this corrective action is triggered by findings during "
            "the inspection required by paragraph (1). "
            "Leave None for all non-corrective action types."
        )
    )
    terminating_action_for: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of paragraph_ids whose repetitive requirements are permanently terminated "
            "upon accomplishment of this action. "
            "Example: ['(5)'] means completing this action ends the recurring inspections "
            "required by paragraph (5) for that aircraft. "
            "Leave None if this action has no terminating effect on other paragraphs. "
            "Note: also set is_terminating_action=True when this field is populated."
        )
    )
    # NOTE(review): the rule "must be True whenever terminating_action_for is
    # populated" exists in the description only; no validator enforces it.
    is_terminating_action: bool = Field(
        default=False,
        description=(
            "Set to True if accomplishing this action permanently terminates one or more "
            "repetitive requirements in this AD. "
            "Must be True whenever terminating_action_for is populated. "
            "Default is False."
        )
    )


class ADApplicabilityExtraction(BaseModel):
    # Top-level structured-output schema for one Airworthiness Directive (AD).
    #
    # The notebook passes `ADApplicabilityExtraction.model_json_schema()` to the
    # Gemini request as `response_json_schema`, so every Field description below
    # doubles as prompt text for the model: editing a description changes what
    # the LLM is told to extract.
    # NOTE(review): documented with `#` comments on purpose; a class docstring
    # would presumably be emitted into the generated JSON schema — confirm
    # before converting these notes into a docstring.
    #
    # Only `ad_number` is required; every other field is Optional and defaults
    # to None.

    # --- AD identity / header metadata ---
    ad_number: str = Field(
        description=(
            "The full AD identifier including any revision suffix, exactly as it appears in the AD header. "
            "Examples: '2025-0254R1', '2023-0041', 'AD 2021-23-10'. "
            "Never omit the revision suffix if present."
        )
    )
    issuing_authority: Optional[str] = Field(
        default=None,
        description=(
            "The aviation authority that issued this AD. "
            "Examples: 'EASA', 'FAA', 'TCCA', 'CASA'. "
            "Taken from the AD header or introductory paragraph."
        )
    )
    # Kept as a plain string; the ISO 8601 format is requested via the
    # description only and is not validated by this model.
    effective_date: Optional[str] = Field(
        default=None,
        description=(
            "The effective date of this AD (or its most recent revision) in ISO 8601 format (YYYY-MM-DD). "
            "If multiple dates are listed (original issue and revision), use the revision's effective date. "
            "Example: '2025-12-08'."
        )
    )
    revision: Optional[str] = Field(
        default=None,
        description=(
            "The revision label of this AD exactly as stated in the document. "
            "Examples: 'Revision 01', 'R1', 'Amendment 2'. "
            "Leave None for original issue (no revision)."
        )
    )
    supersedes: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of AD identifiers that this AD supersedes, replaces, or revises, "
            "taken from the Revision field or the Reason section. "
            "Include all superseded ADs, not just the immediate predecessor. "
            "Examples: ['2025-0254', '2007-0162', '2014-0169']. "
            "Leave None if this is a first-issue AD that supersedes nothing."
        )
    )

    # --- AD-wide applicability (before any group scoping) ---
    models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Complete list of every aircraft model variant explicitly named in the "
            "Applicability section of the AD. "
            "List each variant as a separate string, exactly as written. "
            "Examples: ['A320-211', 'A320-212', 'A320-214', 'A321-111', 'A321-112']. "
            "Do not collapse variants (e.g. do not write 'A320' if the AD lists 'A320-211', 'A320-212' etc.)."
        )
    )
    msn_constraints: Optional[List[MSNConstraint]] = Field(
        default=None,
        description=(
            "Top-level MSN constraints covering the entire AD applicability, before any group scoping. "
            "IMPORTANT — never leave this None when the AD mentions MSN applicability: "
            "If the AD says 'all manufacturer serial numbers (MSN)' or 'all MSN', "
            "always populate with at least one MSNConstraint(all=True, excluded=False). "
            "If specific MSN ranges or numbers are excluded (e.g. 'except MSN 001 to 099'), "
            "add a separate MSNConstraint with excluded=True for those. "
            "Only leave None if the AD makes absolutely no reference to MSN applicability."
        )
    )
    # Modification numbers and Service Bulletins are deliberately kept in two
    # separate fields; both descriptions repeat the "never mix them" rule.
    modification_constraints: Optional[List[ModificationConstraint]] = Field(
        default=None,
        description=(
            "Top-level Airbus modification constraints covering the entire AD applicability. "
            "IMPORTANT: Only 'mod XXXXX' numbers belong here — never SB identifiers. "
            "These are almost always exclusions: aircraft on which a specific mod has been "
            "embodied in production are excluded from the AD's scope. "
            "Capture each mod as a separate ModificationConstraint. "
            "Example: 'except those on which Airbus mod 24591 has been embodied in production' → "
            "ModificationConstraint(modification_id='mod 24591', embodied=True, excluded=True). "
            "Leave None only if no modification-based applicability constraints exist in this AD."
        )
    )
    sb_constraints: Optional[List[ServiceBulletinConstraint]] = Field(
        default=None,
        description=(
            "Top-level Service Bulletin constraints covering the entire AD applicability. "
            "IMPORTANT: Only actual Airbus SB identifiers (format 'AXXX-XX-XXXX') belong here. "
            "Airbus modification numbers ('mod XXXXX') must NEVER be placed here — "
            "those belong exclusively in modification_constraints. "
            "These are typically SB-based exclusions, e.g. aircraft on which a specific SB "
            "revision has been embodied are excluded from scope. "
            "Example: 'except those on which SB A320-57-1089 at Revision 04 has been embodied' → "
            "ServiceBulletinConstraint(sb_identifier='A320-57-1089', revision='Revision 04', "
            "incorporated=True, excluded=True). "
            "Leave None only if no SB-based applicability constraints exist in this AD."
        )
    )

    # --- Compliance summary, groups and per-paragraph requirements ---
    # `compliance_time` is a summary only; per-paragraph deadlines live on each
    # RequirementAction (see the description text).
    compliance_time: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "Top-level summary of the most immediate compliance deadline(s) imposed by this AD as a whole. "
            "The intent is to surface the AD's urgency at a glance, without requiring a consumer "
            "to parse every RequirementAction. "
            "Populate with the most restrictive (shortest) initial deadline across all requirements. "
            "When the shortest deadline is expressed as 'X or Y whichever occurs first', "
            "list both as separate ComplianceTime entries. "
            "This field is a summary — full per-paragraph compliance times are still "
            "captured in each RequirementAction.compliance_times. "
            "Leave None only if this AD contains no time-limited requirements "
            "(e.g. a purely prohibitive AD with no deadline)."
        )
    )
    groups: Optional[List[AircraftGroup]] = Field(
        default=None,
        description=(
            "Definitions of all aircraft groups declared in the AD's Groups section, "
            "one AircraftGroup entry per defined group. "
            "Groups are internal AD constructs that partition applicable aircraft for "
            "the purpose of applying different requirements to different subsets. "
            "Preserve the exact group labels and definitions from the AD. "
            "Leave None only if the AD does not define any named groups."
        )
    )
    requirements: Optional[List[RequirementAction]] = Field(
        default=None,
        description=(
            "Complete list of all required actions, one RequirementAction per numbered paragraph "
            "in the AD's Required Actions section. "
            "This is the primary output of the extraction. "
            "Every paragraph must be captured — inspections, modifications, corrective actions, "
            "prohibitions, terminating actions, and clarification notes alike. "
            "Preserve paragraph numbering exactly as in the AD. "
            "Leave None only if the AD contains no required actions (which should never occur "
            "for a valid AD)."
        )
    )


In [50]:
## prompt.py
# System instruction for the AD extraction model. It states the general
# extraction rules and repeats the mod-vs-SB, "all MSN" and interval-vs-threshold
# distinctions that the schema field descriptions also spell out.
# NOTE(review): the request cells in this notebook pass
# `system_instruction=system_prompt` (lowercase); confirm that name is bound to
# this constant, otherwise those cells fail on a fresh kernel.
SYSTEM_PROMPT = """
You are an aviation regulatory document parser specialized in Airworthiness Directives (ADs).
Extract structured applicability and compliance information from the provided AD document.

EXTRACTION RULES:
- Extract only information explicitly stated in the document. Never infer or assume.
- Preserve all identifiers verbatim (model names, SB numbers, mod numbers, MSNs).
- If a field has no corresponding information in the document, set it to null.
- Output valid JSON only. No markdown, no explanation, no commentary.

CRITICAL DISTINCTIONS:
- Airbus modification numbers (e.g. "mod 24591") → always go in modification_constraints. Never in sb_constraints.
- Service Bulletin identifiers (e.g. "A320-57-1089") → always go in sb_constraints. Never in modification_constraints.
- If the AD states "all MSN" or "all manufacturer serial numbers", always set MSNConstraint(all=True, excluded=False). Never leave msn_constraints null when MSN applicability is mentioned.
- When multiple compliance limits use "whichever occurs first", list each as a separate ComplianceTime entry.
- Recurring intervals ("thereafter, at intervals not exceeding...") → is_interval=True.
- One-time thresholds ("before exceeding...") → is_interval=False.

OUTPUT: Valid JSON strictly following the provided schema.
"""

In [51]:
## utils.py
def prepare_message(paths: list[str]):
    """Build the multimodal ``contents`` list for a Gemini extraction request.

    The returned list starts with a fixed text instruction followed by one
    ``types.Part`` per page image, in the order the paths were given.

    Args:
        paths: Paths of the PNG page images to attach. A single path given as
            a plain string is treated as a one-element list instead of being
            iterated character by character.

    Returns:
        list: ``[instruction_text, Part, Part, ...]`` suitable for the
        ``contents`` argument of ``client.models.generate_content``.

    Raises:
        OSError: If an image file cannot be opened or read.
    """
    if isinstance(paths, str):
        paths = [paths]

    messages = [
        "Now, extract the following images!"
    ]

    for img_path in paths:
        logger.info(f"Preparing image: {img_path}")
        with open(img_path, "rb") as f:
            logger.info(f"Processing image: {img_path}")
            # Part.from_bytes is documented to take the raw image bytes; the SDK
            # handles transport encoding itself. Passing a base64 *string*
            # relied on implicit str->bytes coercion and on a `b64encode`
            # import this cell never performs.
            messages.append(
                types.Part.from_bytes(
                    data=f.read(),
                    mime_type="image/png"
                )
            )

    return messages

In [None]:
## main.py

class ADClassification:
    """Skeleton for the AD extraction pipeline (work in progress).

    Holds the objects the pipeline needs; the analysis step itself is not
    written yet.

    Attributes:
        client: The GenAI client passed in as ``genai_client``.
        dpi: Value passed in as ``dpi`` — presumably the rasterisation
            resolution for PDF-to-image conversion; TODO confirm once
            ``run_analysis`` is implemented.
    """

    def __init__(
        self,
        genai_client,
        dpi
    ):
        self.dpi = dpi
        self.client = genai_client

    def run_analysis(
        self,
        test
    ):
        """Run the extraction pipeline (not implemented yet).

        The original stub had no colon and no body, which made the whole cell
        a SyntaxError. Raising explicitly keeps the cell runnable while making
        the missing implementation obvious to callers.

        Raises:
            NotImplementedError: Always, until the pipeline is written.
        """
        raise NotImplementedError("ADClassification.run_analysis is not implemented yet")

In [52]:
## config.py

# OCR & LLM

In [None]:
# Process like the previous cell, but add the OCR engine at init

from paddleocr import PaddleOCR

# Build the OCR engine once; it is reused for every page via `ocr_engine.predict`.
# The three `use_*` preprocessing switches are turned off, and the mobile
# PP-OCRv5 detection/recognition models are selected by name.
# NOTE(review): `device="gpu:0"` and `precision="fp16"` presumably require a
# CUDA-capable GPU on the host — confirm before running elsewhere.
ocr_engine = PaddleOCR(
    use_doc_orientation_classify=False,
    use_doc_unwarping=False,
    use_textline_orientation=False,
    device="gpu:0",
    precision="fp16",
    text_detection_model_name="PP-OCRv5_mobile_det",
    text_recognition_model_name="PP-OCRv5_mobile_rec",
)



In [None]:
from paddleocr import PaddleOCR, PPStructureV3

# Same engine configuration as the previous cell (this rebinds `ocr_engine`):
# preprocessing switches off, mobile PP-OCRv5 detection/recognition models.
# NOTE(review): `device="gpu:0"` / `precision="fp16"` presumably need a GPU — confirm.
ocr_engine = PaddleOCR(
    use_doc_orientation_classify=False,
    use_doc_unwarping=False,
    use_textline_orientation=False,
    device="gpu:0",
    precision="fp16",
    text_detection_model_name="PP-OCRv5_mobile_det",
    text_recognition_model_name="PP-OCRv5_mobile_rec",
)

# OCR predict: run the engine over all page images at once.
# The result is iterated one entry per page further down; each entry exposes the
# keys listed in the comment below (e.g. rec_texts, rec_boxes, rec_polys).
ocr_results = ocr_engine.predict(img_paths)

# dict_keys(['input_path', 'page_index', 'doc_preprocessor_res', 'dt_polys', 'model_settings', 'text_det_params', 'text_type', 'text_rec_score_thresh', 'return_word_box', 'rec_texts', 'rec_scores', 'rec_polys', 'vis_fonts', 'textline_orientation_angles', 'rec_boxes'])

# function to postprocess ocr & draw the bbox
import numpy as np
from typing import List, Dict, Any

def sort_ocr_reading_order(
    texts: List[str],
    boxes: List[np.ndarray],
    y_threshold: float = 15.0
) -> tuple[List[str], List[np.ndarray]]:
    """
    Sort OCR results in natural reading order (top-to-bottom, left-to-right).

    Boxes are ordered by vertical centre and bucketed into lines: a box joins
    the current line while its centre is within ``y_threshold`` of the centre
    of the line's first (topmost) box. Each line is then ordered by its left
    edge.

    Args:
        texts: List of recognized text strings.
        boxes: List of bounding boxes, each with shape (4,) as [x_min, y_min, x_max, y_max]
               or (4, 2) as [[x1,y1], [x2,y2], [x3,y3], [x4,y4]].
        y_threshold: Vertical pixel threshold to consider texts on the same line.

    Returns:
        Sorted (texts, boxes) in reading order. Empty input is returned as-is.

    Raises:
        ValueError: If ``texts`` and ``boxes`` differ in length, or a box has
            a shape other than (4,) or (4, 2).
    """
    # len() instead of truthiness so array-like inputs are accepted as well.
    if texts is None or len(texts) == 0:
        return texts, boxes

    # Previously a shorter `boxes` silently dropped the trailing texts and a
    # longer one failed with an opaque IndexError; fail loudly instead.
    if len(texts) != len(boxes):
        raise ValueError(
            f"texts and boxes must have the same length, got {len(texts)} and {len(boxes)}"
        )

    # (original index, left x, centre y) for every box
    coords = []
    for i, box in enumerate(boxes):
        box = np.asarray(box)
        if box.shape == (4,):
            # [x_min, y_min, x_max, y_max]
            x_left = box[0]
            y_center = (box[1] + box[3]) / 2
        elif box.shape == (4, 2):
            # [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
            x_left = box[:, 0].min()
            y_center = box[:, 1].mean()
        else:
            raise ValueError(f"Unexpected box shape: {box.shape}")
        coords.append((i, x_left, y_center))

    # Top-to-bottom first (stable sort keeps input order for ties)
    coords.sort(key=lambda c: c[2])

    # Group into lines, anchoring each line on its first (topmost) box
    lines = []
    current_line = [coords[0]]
    for item in coords[1:]:
        if abs(item[2] - current_line[0][2]) <= y_threshold:
            current_line.append(item)
        else:
            lines.append(current_line)
            current_line = [item]
    lines.append(current_line)

    # Left-to-right inside each line, then flatten back to original indices
    sorted_indices = [
        item[0]
        for line in lines
        for item in sorted(line, key=lambda c: c[1])
    ]

    sorted_texts = [texts[i] for i in sorted_indices]
    sorted_boxes = [boxes[i] for i in sorted_indices]

    return sorted_texts, sorted_boxes


def get_full_text(
    ocr_results: List[Dict[str, Any]],
    y_threshold: float = 15.0,
    page_separator: str = "\n\n{'='*60}\n  PAGE {page_num} / {total_pages}\n{'='*60}\n\n"
) -> str:
    """Render per-page OCR results as one plain-text document.

    For every page that has recognized text, the fragments are put in reading
    order with ``sort_ocr_reading_order`` and joined into lines; each page is
    preceded by a ``PAGE n / total`` banner.

    Args:
        ocr_results: One mapping per page; ``rec_texts`` and ``rec_boxes`` are read.
        y_threshold: Maximum vertical distance between box centres for two
            fragments to be placed on the same output line.
        page_separator: Accepted but never read by this function; the banner
            format is fixed in the body.

    Returns:
        The pages joined by newlines. Pages without text are skipped, although
        they still count towards ``total`` in the banner.
    """
    def _center_y(raw_box):
        # Vertical centre of an [x_min, y_min, x_max, y_max] box, otherwise the
        # mean of the second column (corner-point layout).
        arr = np.array(raw_box)
        if arr.shape == (4,):
            return (arr[1] + arr[3]) / 2
        return arr[:, 1].mean()

    def _flush(fragments, sink):
        # Join the non-blank fragments of one line; blank lines are dropped.
        joined = " ".join(frag for frag in fragments if frag.strip())
        if joined.strip():
            sink.append(joined)

    total_pages = len(ocr_results)
    rendered_pages = []

    for page_num, page in enumerate(ocr_results, start=1):
        raw_texts = page.get("rec_texts", [])
        raw_boxes = page.get("rec_boxes", [])
        if not raw_texts:
            continue

        ordered_texts, ordered_boxes = sort_ocr_reading_order(raw_texts, raw_boxes, y_threshold)
        centers = [_center_y(b) for b in ordered_boxes]

        page_lines = []
        pending = [ordered_texts[0]]
        anchor_y = centers[0]

        # A new output line starts whenever a fragment is further than
        # y_threshold from the first fragment of the line being built.
        for fragment, center in zip(ordered_texts[1:], centers[1:]):
            if abs(center - anchor_y) <= y_threshold:
                pending.append(fragment)
                continue
            _flush(pending, page_lines)
            pending = [fragment]
            anchor_y = center
        _flush(pending, page_lines)

        header = f"\n{'='*60}\n  PAGE {page_num} / {total_pages}\n{'='*60}\n"
        rendered_pages.append(header + "\n".join(page_lines))

    return "\n".join(rendered_pages)

from PIL import Image, ImageDraw, ImageFont
import numpy as np


def draw_ocr_bboxes(
    image_path: str,
    ocr_result: dict,
    use_polys: bool = True,
    output_path: str = "ocr_visualized.png",
    box_color: str = "red",
    text_color: str = "blue",
    show_text: bool = True,
    font_size: int = 14,
    font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
):
    """
    Draw OCR bounding boxes on the original image.

    Args:
        image_path: Path to the original image/page.
        ocr_result: Single page OCR result dict.
        use_polys: If True, use rec_polys (polygon). If False, use rec_boxes (rectangle).
        output_path: Where to save the visualized image.
        box_color: Color of the bounding box.
        text_color: Color of the text label.
        show_text: Whether to draw recognized text above each box.
        font_size: Font size for text labels.
        font_path: TrueType font used for the labels. If it cannot be loaded,
            Pillow's built-in default font is used instead.

    Returns:
        The annotated RGB ``PIL.Image.Image`` (also written to ``output_path``).
    """
    # convert() yields an independent in-memory copy, so the source file handle
    # can be released immediately instead of staying open.
    with Image.open(image_path) as source:
        img = source.convert("RGB")
    draw = ImageDraw.Draw(img)

    texts = ocr_result.get("rec_texts", [])
    polys = ocr_result.get("rec_polys" if use_polys else "rec_boxes", [])

    try:
        font = ImageFont.truetype(font_path, font_size)
    except Exception:
        # Best-effort fallback (missing font file, FreeType unavailable, ...).
        # `Exception` rather than a bare `except` so Ctrl-C / SystemExit still propagate.
        font = ImageFont.load_default()

    for i, poly in enumerate(polys):
        poly = np.asarray(poly)

        if poly.shape == (4,):
            # [x_min, y_min, x_max, y_max]
            x_min, y_min, x_max, y_max = poly.tolist()
            draw.rectangle([x_min, y_min, x_max, y_max], outline=box_color, width=2)
            text_pos = (x_min, y_min - font_size - 2)
        elif poly.shape == (4, 2):
            # Polygon: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
            points = [tuple(p) for p in poly.astype(int).tolist()]
            points.append(points[0])  # close the polygon
            draw.line(points, fill=box_color, width=2)
            text_pos = (float(poly[:, 0].min()), float(poly[:, 1].min()) - font_size - 2)
        else:
            # Unknown layout: skip this entry rather than abort the whole page.
            continue

        # Labels are matched to boxes by position; extra boxes get no label.
        if show_text and i < len(texts) and texts[i].strip():
            draw.text(text_pos, texts[i], fill=text_color, font=font)

    img.save(output_path)
    print(f"Saved to {output_path}")
    return img

# LLM EXTRACTION (TEXT ONLY)
from google import genai
from google.genai import types
import os

# Gemini client; the API key is read from the GOOGLE_API_KEY environment variable.
client = genai.Client(
    api_key=os.getenv("GOOGLE_API_KEY")
)

# Request JSON output constrained to the ADApplicabilityExtraction schema.
# NOTE(review): `system_prompt` (lowercase) must already be bound; the prompt
# cell in this notebook defines `SYSTEM_PROMPT` — confirm the two are linked.
model = "gemini-2.5-flash"
model_config = types.GenerateContentConfig(
    system_instruction=system_prompt,
    temperature=0.1,
    response_mime_type="application/json",
    response_json_schema=ADApplicabilityExtraction.model_json_schema()
)
# Text-only variant: the OCR'd document text is sent instead of page images.
# `full_page_result` is produced by `get_full_text(...)` in a later cell, so
# that cell has to run before this request.
response_ocr = client.models.generate_content(
    model=model,
    config=model_config,
    contents=f"Now extract the following ocrd text:\n\n{full_page_result}"
)

# THE REST is same
# Save the OCR bbox visualization too


  from .autonotebook import tqdm as notebook_tqdm
[33mChecking connectivity to the model hosters, this may take a while. To bypass this check, set `PADDLE_PDX_DISABLE_MODEL_SOURCE_CHECK` to `True`.[0m
[32mCreating model: ('PP-OCRv5_mobile_det', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/naufal/.paddlex/official_models/PP-OCRv5_mobile_det`.[0m
[32mCreating model: ('PP-OCRv5_mobile_rec', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/naufal/.paddlex/official_models/PP-OCRv5_mobile_rec`.[0m


In [4]:
# NOTE(review): `result` is not defined in any visible cell — presumably the
# list of page-image paths from the PDF-to-image step; confirm, since this
# cell fails on a fresh kernel otherwise.
img_paths = result

In [None]:
# Run OCR over every page image; later cells iterate the result once per page.
ocr_results = ocr_engine.predict(img_paths)

In [24]:
# Inspect which fields each per-page OCR result exposes.
for r in ocr_results:
    print(r.keys())

dict_keys(['input_path', 'page_index', 'doc_preprocessor_res', 'dt_polys', 'model_settings', 'text_det_params', 'text_type', 'text_rec_score_thresh', 'return_word_box', 'rec_texts', 'rec_scores', 'rec_polys', 'vis_fonts', 'textline_orientation_angles', 'rec_boxes'])
dict_keys(['input_path', 'page_index', 'doc_preprocessor_res', 'dt_polys', 'model_settings', 'text_det_params', 'text_type', 'text_rec_score_thresh', 'return_word_box', 'rec_texts', 'rec_scores', 'rec_polys', 'vis_fonts', 'textline_orientation_angles', 'rec_boxes'])
dict_keys(['input_path', 'page_index', 'doc_preprocessor_res', 'dt_polys', 'model_settings', 'text_det_params', 'text_type', 'text_rec_score_thresh', 'return_word_box', 'rec_texts', 'rec_scores', 'rec_polys', 'vis_fonts', 'textline_orientation_angles', 'rec_boxes'])
dict_keys(['input_path', 'page_index', 'doc_preprocessor_res', 'dt_polys', 'model_settings', 'text_det_params', 'text_type', 'text_rec_score_thresh', 'return_word_box', 'rec_texts', 'rec_scores', 're

In [25]:
from PIL import Image, ImageDraw, ImageFont
import numpy as np


def draw_ocr_bboxes(
    image_path: str,
    ocr_result: dict,
    use_polys: bool = True,
    output_path: str = "ocr_visualized.png",
    box_color: str = "red",
    text_color: str = "blue",
    show_text: bool = True,
    font_size: int = 14,
    font_path: str = "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
):
    """
    Draw OCR bounding boxes on the original image.

    Args:
        image_path: Path to the original image/page.
        ocr_result: Single page OCR result dict.
        use_polys: If True, use rec_polys (polygon). If False, use rec_boxes (rectangle).
        output_path: Where to save the visualized image.
        box_color: Color of the bounding box.
        text_color: Color of the text label.
        show_text: Whether to draw recognized text above each box.
        font_size: Font size for text labels.
        font_path: TrueType font used for the labels. If it cannot be loaded,
            Pillow's built-in default font is used instead.

    Returns:
        The annotated RGB ``PIL.Image.Image`` (also written to ``output_path``).
    """
    # convert() yields an independent in-memory copy, so the source file handle
    # can be released immediately instead of staying open.
    with Image.open(image_path) as source:
        img = source.convert("RGB")
    draw = ImageDraw.Draw(img)

    texts = ocr_result.get("rec_texts", [])
    polys = ocr_result.get("rec_polys" if use_polys else "rec_boxes", [])

    try:
        font = ImageFont.truetype(font_path, font_size)
    except Exception:
        # Best-effort fallback (missing font file, FreeType unavailable, ...).
        # `Exception` rather than a bare `except` so Ctrl-C / SystemExit still propagate.
        font = ImageFont.load_default()

    for i, poly in enumerate(polys):
        poly = np.asarray(poly)

        if poly.shape == (4,):
            # [x_min, y_min, x_max, y_max]
            x_min, y_min, x_max, y_max = poly.tolist()
            draw.rectangle([x_min, y_min, x_max, y_max], outline=box_color, width=2)
            text_pos = (x_min, y_min - font_size - 2)
        elif poly.shape == (4, 2):
            # Polygon: [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
            points = [tuple(p) for p in poly.astype(int).tolist()]
            points.append(points[0])  # close the polygon
            draw.line(points, fill=box_color, width=2)
            text_pos = (float(poly[:, 0].min()), float(poly[:, 1].min()) - font_size - 2)
        else:
            # Unknown layout: skip this entry rather than abort the whole page.
            continue

        # Labels are matched to boxes by position; extra boxes get no label.
        if show_text and i < len(texts) and texts[i].strip():
            draw.text(text_pos, texts[i], fill=text_color, font=font)

    img.save(output_path)
    print(f"Saved to {output_path}")
    return img

In [26]:
# Single page: smoke-test the drawing helper on the first OCR result only.
# NOTE(review): both paths are absolute and tied to one temp-run directory;
# they need updating for any other run or machine.
img = draw_ocr_bboxes(
    image_path="/home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/dc34dff2-8c2d-4da8-b82c-b503ce77f182-1.png",
    ocr_result=ocr_results[0],
    use_polys=True,       # Try both True/False to see which looks better
    output_path="/home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/dc34dff2-8c2d-4da8-b82c-b503ce77f182-1-drawed.png",
    show_text=False,
)

Saved to /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/dc34dff2-8c2d-4da8-b82c-b503ce77f182-1-drawed.png


In [28]:
# Directory that receives the annotated copies. It is the same for every page,
# so it is bound once here instead of being reassigned on each iteration.
ROOT_DIR = "/home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7"

# Pair each page image with its OCR result (same order) and save an annotated
# copy named "<original stem>-DRAWED.png".
for img_path, r in zip(img_paths, ocr_results):
    # File name without directory and extension, via os.path rather than
    # manual "/" splitting and ".png" substring replacement.
    img_name = os.path.splitext(os.path.basename(img_path))[0]

    output_path = os.path.join(ROOT_DIR, f"{img_name}-DRAWED.png")
    draw_ocr_bboxes(
        image_path=img_path,
        ocr_result=r,
        use_polys=True,
        output_path=output_path,
        show_text=False
    )

Saved to /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/2ba8039e-b036-4b9e-912d-76bbba4c6e08-1-DRAWED.png
Saved to /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/2ba8039e-b036-4b9e-912d-76bbba4c6e08-2-DRAWED.png
Saved to /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/2ba8039e-b036-4b9e-912d-76bbba4c6e08-3-DRAWED.png
Saved to /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/2ba8039e-b036-4b9e-912d-76bbba4c6e08-4-DRAWED.png
Saved to /home/naufal/soji_ai/temp/47746f73da404876842a05d2d66fa7c7/2ba8039e-b036-4b9e-912d-76bbba4c6e08-5-DRAWED.png


In [6]:
import numpy as np
from typing import List, Dict, Any

def sort_ocr_reading_order(
    texts: List[str],
    boxes: List[np.ndarray],
    y_threshold: float = 15.0
) -> tuple[List[str], List[np.ndarray]]:
    """
    Sort OCR results in natural reading order (top-to-bottom, left-to-right).

    Boxes are ordered by vertical centre and bucketed into lines: a box joins
    the current line while its centre is within ``y_threshold`` of the centre
    of the line's first (topmost) box. Each line is then ordered by its left
    edge.

    Args:
        texts: List of recognized text strings.
        boxes: List of bounding boxes, each with shape (4,) as [x_min, y_min, x_max, y_max]
               or (4, 2) as [[x1,y1], [x2,y2], [x3,y3], [x4,y4]].
        y_threshold: Vertical pixel threshold to consider texts on the same line.

    Returns:
        Sorted (texts, boxes) in reading order. Empty input is returned as-is.

    Raises:
        ValueError: If ``texts`` and ``boxes`` differ in length, or a box has
            a shape other than (4,) or (4, 2).
    """
    # len() instead of truthiness so array-like inputs are accepted as well.
    if texts is None or len(texts) == 0:
        return texts, boxes

    # Previously a shorter `boxes` silently dropped the trailing texts and a
    # longer one failed with an opaque IndexError; fail loudly instead.
    if len(texts) != len(boxes):
        raise ValueError(
            f"texts and boxes must have the same length, got {len(texts)} and {len(boxes)}"
        )

    # (original index, left x, centre y) for every box
    coords = []
    for i, box in enumerate(boxes):
        box = np.asarray(box)
        if box.shape == (4,):
            # [x_min, y_min, x_max, y_max]
            x_left = box[0]
            y_center = (box[1] + box[3]) / 2
        elif box.shape == (4, 2):
            # [[x1,y1], [x2,y2], [x3,y3], [x4,y4]]
            x_left = box[:, 0].min()
            y_center = box[:, 1].mean()
        else:
            raise ValueError(f"Unexpected box shape: {box.shape}")
        coords.append((i, x_left, y_center))

    # Top-to-bottom first (stable sort keeps input order for ties)
    coords.sort(key=lambda c: c[2])

    # Group into lines, anchoring each line on its first (topmost) box
    lines = []
    current_line = [coords[0]]
    for item in coords[1:]:
        if abs(item[2] - current_line[0][2]) <= y_threshold:
            current_line.append(item)
        else:
            lines.append(current_line)
            current_line = [item]
    lines.append(current_line)

    # Left-to-right inside each line, then flatten back to original indices
    sorted_indices = [
        item[0]
        for line in lines
        for item in sorted(line, key=lambda c: c[1])
    ]

    sorted_texts = [texts[i] for i in sorted_indices]
    sorted_boxes = [boxes[i] for i in sorted_indices]

    return sorted_texts, sorted_boxes


def get_full_text(
    ocr_results: List[Dict[str, Any]],
    y_threshold: float = 15.0,
    page_separator: str = "\n\n{'='*60}\n  PAGE {page_num} / {total_pages}\n{'='*60}\n\n"
) -> str:
    """Render per-page OCR results as one plain-text document.

    For every page that has recognized text, the fragments are put in reading
    order with ``sort_ocr_reading_order`` and joined into lines; each page is
    preceded by a ``PAGE n / total`` banner.

    Args:
        ocr_results: One mapping per page; ``rec_texts`` and ``rec_boxes`` are read.
        y_threshold: Maximum vertical distance between box centres for two
            fragments to be placed on the same output line.
        page_separator: Accepted but never read by this function; the banner
            format is fixed in the body.

    Returns:
        The pages joined by newlines. Pages without text are skipped, although
        they still count towards ``total`` in the banner.
    """
    def _center_y(raw_box):
        # Vertical centre of an [x_min, y_min, x_max, y_max] box, otherwise the
        # mean of the second column (corner-point layout).
        arr = np.array(raw_box)
        if arr.shape == (4,):
            return (arr[1] + arr[3]) / 2
        return arr[:, 1].mean()

    def _flush(fragments, sink):
        # Join the non-blank fragments of one line; blank lines are dropped.
        joined = " ".join(frag for frag in fragments if frag.strip())
        if joined.strip():
            sink.append(joined)

    total_pages = len(ocr_results)
    rendered_pages = []

    for page_num, page in enumerate(ocr_results, start=1):
        raw_texts = page.get("rec_texts", [])
        raw_boxes = page.get("rec_boxes", [])
        if not raw_texts:
            continue

        ordered_texts, ordered_boxes = sort_ocr_reading_order(raw_texts, raw_boxes, y_threshold)
        centers = [_center_y(b) for b in ordered_boxes]

        page_lines = []
        pending = [ordered_texts[0]]
        anchor_y = centers[0]

        # A new output line starts whenever a fragment is further than
        # y_threshold from the first fragment of the line being built.
        for fragment, center in zip(ordered_texts[1:], centers[1:]):
            if abs(center - anchor_y) <= y_threshold:
                pending.append(fragment)
                continue
            _flush(pending, page_lines)
            pending = [fragment]
            anchor_y = center
        _flush(pending, page_lines)

        header = f"\n{'='*60}\n  PAGE {page_num} / {total_pages}\n{'='*60}\n"
        rendered_pages.append(header + "\n".join(page_lines))

    return "\n".join(rendered_pages)

In [7]:
# Flatten all pages to text; y_threshold=12 overrides the function default of 15.0.
full_page_result = get_full_text(ocr_results=ocr_results, y_threshold=12)

In [8]:
# Eyeball the reconstructed document text (page banners included).
print(full_page_result)


  PAGE 1 / 5
EASA AD No.: 2025-0254R1
Airworthiness Directive
-
AD No.: 2025-0254R1
European Union Aviation Safety Agency
Issued: 05 December 2025
Note: This Airworthiness Directive (AD) is issued by EASA, acting in accordance with Regulation
(EU) 2018/1139 on behalf of the European Union, its Member States and of the European third
countries that participate in the activities of EASA under Article 129 of that Regulation.
This AD is issued in accordance with Regulation (EU) 748/2012, Part 21.A.3B. In accordance with Regulation (EU) 1321/2014 Annex I Part M.A.301, or
Annex Vb Part ML.A.301, as applicable, the continuing airworthiness of an aircraft shall be ensured by accomplishing any applicable ADs. Consequently,
no person may operate an aircraft to which an AD applies, except in accordance with the requirements of that AD, unless otherwise specified by the
Agency [Regulation (EU) 1321/2014 Annex I Part M.A.303, or Annex Vb Part ML.A.303, as applicable] or agreed with the Authority o

In [3]:
from enum import Enum
from typing import Optional, List
from pydantic import BaseModel, Field
from google.genai import types

class TimeUnit(str, Enum):
    # Units in which a compliance time can be expressed. Mixing in `str` makes
    # each member compare equal to (and serialize as) its plain string value.
    FLIGHT_HOURS = "flight_hours"
    FLIGHT_CYCLES = "flight_cycles"
    DAYS = "days"
    MONTHS = "months"
    YEARS = "years"
    CALENDAR_DATE = "calendar_date"


class NumericRange(BaseModel):
    # Integer interval with independently open/closed ends; used as the type of
    # MSNConstraint.range. Either bound may be None for a half-open range.
    # Both ends are inclusive unless the flags below say otherwise.
    start: Optional[int] = Field(
        default=None,
        description=(
            "Lower bound of the MSN range (inclusive by default). "
            "Set to None if there is no lower bound."
        )
    )
    end: Optional[int] = Field(
        default=None,
        description=(
            "Upper bound of the MSN range (inclusive by default). "
            "Set to None if there is no upper bound."
        )
    )
    inclusive_start: bool = Field(
        default=True,
        description="True means >= (greater than or equal to start). False means > (strictly greater than)."
    )
    inclusive_end: bool = Field(
        default=True,
        description="True means <= (less than or equal to end). False means < (strictly less than)."
    )


class MSNConstraint(BaseModel):
    # One manufacturer-serial-number (MSN) applicability rule: "all", a range,
    # or an explicit list, flagged as an inclusion or an exclusion.
    # The "do not use range together with specific_msns" rule exists only in
    # the description text; nothing in this model enforces it.
    # Note: the field names `all` and `range` shadow the builtins inside this
    # class body only.
    all: Optional[bool] = Field(
        default=None,
        description=(
            "Set to True when the AD explicitly states 'all manufacturer serial numbers (MSN)' or 'all MSN'. "
            "IMPORTANT: Never leave this None when the AD explicitly uses the word 'all' for MSN applicability — "
            "even if other exclusions apply, the 'all' inclusion must still be captured here. "
            "Leave None only when applicability is defined purely by a specific range or list."
        )
    )
    range: Optional[NumericRange] = Field(
        default=None,
        description=(
            "A continuous numeric range of MSNs this constraint covers. "
            "Use when the AD specifies a span like 'MSN 100 through MSN 500'. "
            "Do not use together with specific_msns."
        )
    )
    specific_msns: Optional[List[int]] = Field(
        default=None,
        description=(
            "An explicit list of individual MSN integers this constraint covers. "
            "Use when the AD names specific serial numbers, e.g. 'MSN 364 or MSN 385'. "
            "Do not use together with range."
        )
    )
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when these MSNs are EXCLUDED from applicability "
            "(AD language like 'except MSN...', 'excluding MSN...'). "
            "Set to False when these MSNs are positively INCLUDED in applicability. "
            "Default is False (inclusion)."
        )
    )


class ModificationConstraint(BaseModel):
    # Applicability rule keyed on an Airbus modification ("mod NNNNN").
    # `embodied` records the modification state the rule refers to; `excluded`
    # records whether matching aircraft are removed from or kept in scope.
    # Service Bulletins are modelled separately in ServiceBulletinConstraint.
    modification_id: str = Field(
        description=(
            "The exact modification identifier as written in the AD. "
            "Always an Airbus 'mod' number, e.g. 'mod 24591', 'mod 24977'. "
            "IMPORTANT: Modification numbers are never Service Bulletins — "
            "do not confuse with SB identifiers (e.g. 'A320-57-XXXX'). "
            "Copy the identifier verbatim from the AD text."
        )
    )
    embodied: Optional[bool] = Field(
        default=None,
        description=(
            "True = this modification IS embodied on the aircraft. "
            "False = this modification is NOT embodied on the aircraft. "
            "None = embodiment status is unspecified or not relevant to this constraint."
        )
    )
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when aircraft WITH this modification embodied are EXCLUDED from applicability "
            "(AD language like 'except those on which mod XXXXX has been embodied in production'). "
            "Set to False when this modification is a positive inclusion condition. "
            "Default is False (inclusion)."
        )
    )


# One Service-Bulletin-based applicability condition: SB identifier, optional
# revision qualifier, incorporation status and an inclusion/exclusion flag.
# Used as the element type of the `sb_constraints` lists on AircraftGroup and
# ADApplicabilityExtraction.
# NOTE: the Field descriptions are emitted into the JSON schema generated from
# ADApplicabilityExtraction and sent to the LLM as response_json_schema, so they
# are effectively prompt text — changing a word changes what the model is told.
class ServiceBulletinConstraint(BaseModel):
    # Required. Plain str: the 'AXXX-XX-XXXX' format described below is not validated here.
    sb_identifier: str = Field(
        description=(
            "The exact Service Bulletin identifier as written in the AD, "
            "e.g. 'A320-57-1089', 'A320-57-1100'. "
            "IMPORTANT: Only actual Airbus Service Bulletins belong here (format: 'AXXX-XX-XXXX'). "
            "Airbus modification numbers ('mod XXXXX') must NEVER be placed here — "
            "those belong exclusively in ModificationConstraint. "
            "Copy the identifier verbatim from the AD text, without the 'SB' prefix."
        )
    )
    # Free-text revision qualifier (may be a phrase, not just a revision number).
    revision: Optional[str] = Field(
        default=None,
        description=(
            "The revision qualifier for this SB constraint, exactly as stated in the AD. "
            "Examples: 'Revision 04', 'any revision lower than Revision 04', 'Revision 03 or later'. "
            "Leave None if no specific revision is mentioned and any revision applies."
        )
    )
    # Tri-state flag: True / False / None (status unspecified).
    incorporated: Optional[bool] = Field(
        default=None,
        description=(
            "True = this SB HAS been incorporated on the aircraft. "
            "False = this SB has NOT been incorporated on the aircraft. "
            "None = incorporation status is unspecified or not relevant to this constraint."
        )
    )
    # False (default) = inclusion/compliance condition; True = matching aircraft are excluded.
    excluded: bool = Field(
        default=False,
        description=(
            "Set to True when aircraft on which this SB HAS been embodied are EXCLUDED from applicability "
            "(AD language like 'except those on which SB XXXX has been embodied'). "
            "Set to False when this SB is a positive inclusion or compliance condition. "
            "Default is False (inclusion)."
        )
    )


# Definition of one named aircraft group from an AD: its label plus the model,
# MSN, modification and SB constraints that decide membership.
# Used as the element type of ADApplicabilityExtraction.groups; group labels are
# referenced by RequirementAction.applies_to_groups.
# NOTE: the Field descriptions are emitted into the JSON schema generated from
# ADApplicabilityExtraction and sent to the LLM as response_json_schema, so they
# are effectively prompt text — changing a word changes what the model is told.
class AircraftGroup(BaseModel):
    # Required; the only non-optional field of this model.
    group_id: str = Field(
        description=(
            "The group label exactly as defined in the AD's Groups section. "
            "Examples: 'Group 1', 'Group 2', 'Group A', 'Group B'. "
            "Use verbatim from the AD — do not invent or rename groups."
        )
    )
    # None means "no model restriction for this group".
    models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Aircraft model variants that belong to this group, "
            "derived from the group definition. "
            "Examples: ['A321-111', 'A321-112'] or ['A320']. "
            "Leave None if the group definition does not restrict by model "
            "(i.e. it applies to all models already listed in the top-level applicability)."
        )
    )
    # Same element type as the top-level ADApplicabilityExtraction.msn_constraints.
    msn_constraints: Optional[List[MSNConstraint]] = Field(
        default=None,
        description=(
            "MSN-based constraints that define or restrict membership in this group. "
            "Apply the same rules as top-level msn_constraints: "
            "if the group definition says 'all MSN', populate with MSNConstraint(all=True, excluded=False). "
            "If the group is defined by specific MSNs, list them in specific_msns. "
            "Leave None only if MSN is not a factor in this group's definition."
        )
    )
    modification_constraints: Optional[List[ModificationConstraint]] = Field(
        default=None,
        description=(
            "Modification-based constraints that define or exclude aircraft from this group. "
            "Only use ModificationConstraint here — never mix with SB identifiers. "
            "Examples: a group excluding aircraft with a specific mod embodied in production. "
            "Leave None if modifications are not a factor in this group's definition."
        )
    )
    sb_constraints: Optional[List[ServiceBulletinConstraint]] = Field(
        default=None,
        description=(
            "Service Bulletin constraints that define or exclude aircraft from this group. "
            "Only use actual SB identifiers here — never use mod numbers. "
            "Example: a group defined by aircraft on which a specific SB has NOT been embodied. "
            "Leave None if SBs are not a factor in this group's definition."
        )
    )
    # NOTE(review): the description asks the model to "always populate" this field,
    # but the type is Optional with default None, so validation accepts a missing value.
    description: Optional[str] = Field(
        default=None,
        description=(
            "Free-text fallback for group membership logic that cannot be fully expressed "
            "by the structured fields above. "
            "Transcribe the exact defining sentence from the AD. "
            "Always populate this field — it serves as a human-readable audit trail "
            "even when structured fields are also populated."
        )
    )


# One compliance limit: either a relative amount (value + unit, optionally with a
# reference point) or an absolute calendar date, flagged as a one-time threshold
# or a recurring interval.
# Used as the element type of RequirementAction.compliance_times,
# RequirementAction.interval and ADApplicabilityExtraction.compliance_time.
# NOTE: the Field descriptions are emitted into the JSON schema generated from
# ADApplicabilityExtraction and sent to the LLM as response_json_schema, so they
# are effectively prompt text — changing a word changes what the model is told.
# NOTE(review): the "value/unit XOR calendar_date" rule is stated only in the
# descriptions; no validator in this class enforces it.
class ComplianceTime(BaseModel):
    # Plain Optional[int]: the "positive" requirement is not enforced by the type.
    value: Optional[int] = Field(
        default=None,
        description=(
            "The numeric value of this compliance time. Always a positive integer. "
            "Examples: 37300 for '37 300 flight hours', 24 for '24 months', 90 for '90 days'. "
            "Set to None only when a specific calendar_date is used instead of a relative time value."
        )
    )
    # TimeUnit is declared elsewhere in the notebook.
    unit: Optional[TimeUnit] = Field(
        default=None,
        description=(
            "The unit of measurement corresponding to value. "
            "Must be one of the TimeUnit enum values. "
            "Set to None only when calendar_date is used instead of value+unit."
        )
    )
    reference: Optional[str] = Field(
        default=None,
        description=(
            "The reference point from which this time is measured, transcribed from the AD. "
            "Examples: 'since first flight of the aeroplane', "
            "'after the effective date of this AD', "
            "'since the last inspection', "
            "'from the effective date of this AD'. "
            "Leave None only if no reference point is stated and the context is self-evident."
        )
    )
    # Stored as a string; the YYYY-MM-DD format is requested but not validated.
    calendar_date: Optional[str] = Field(
        default=None,
        description=(
            "An absolute calendar deadline in ISO 8601 format (YYYY-MM-DD). "
            "Use only when the AD specifies a hard date rather than a relative time window. "
            "When populated, value and unit should be None. "
            "Example: '2026-06-01' for 'before 01 June 2026'."
        )
    )
    # False (default) = one-time initial threshold; True = recurring interval.
    is_interval: bool = Field(
        default=False,
        description=(
            "Set to True for RECURRING intervals between repeated actions "
            "(AD language like 'thereafter, at intervals not exceeding X FH'). "
            "Set to False for one-time initial thresholds "
            "(AD language like 'before exceeding X FH since first flight'). "
            "Default is False."
        )
    )


# One numbered paragraph of an AD's required actions: what must be done, to which
# groups/models, by when, how often, per which documents, and how it relates to
# other paragraphs (trigger / terminating action).
# Used as the element type of ADApplicabilityExtraction.requirements.
# Required fields: paragraph_id, action_type, description. Everything else is optional.
# NOTE: the Field descriptions are emitted into the JSON schema generated from
# ADApplicabilityExtraction and sent to the LLM as response_json_schema, so they
# are effectively prompt text — changing a word changes what the model is told.
class RequirementAction(BaseModel):
    # Key used by triggered_by_paragraph / terminating_action_for to cross-reference paragraphs.
    paragraph_id: str = Field(
        description=(
            "The paragraph identifier exactly as numbered in the AD's Required Actions section. "
            "Examples: '(1)', '(5)', '(8)', '(12)'. "
            "Used to cross-reference paragraphs (e.g. corrective actions referencing their "
            "triggering inspection paragraph)."
        )
    )
    # NOTE(review): typed as plain str, so the six values listed in the description
    # are a request to the model, not something validation enforces.
    action_type: str = Field(
        description=(
            "The category of this required action. Use exactly one of the following values: "
            "'inspection' — any DET, GVI, SDI, ESDI, or other inspection task; "
            "'modification' — a structural, design, or configuration change to the aircraft; "
            "'corrective_action' — a repair or follow-up action triggered by a finding during inspection; "
            "'terminating_action' — an action whose accomplishment ends one or more repetitive requirements; "
            "'prohibition' — an action that must NOT be accomplished (e.g. 'do not embody SB X below Rev Y'); "
            "'clarification' — a paragraph that clarifies scope or interaction between other paragraphs "
            "without itself requiring a physical action (e.g. 'accomplishment of paragraph X does not "
            "terminate paragraph Y')."
        )
    )
    # Group labels are expected to match AircraftGroup.group_id values; not cross-checked here.
    applies_to_groups: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of group IDs, exactly as defined in the AD's Groups section, "
            "to which this requirement applies. "
            "Examples: ['Group 1'], ['Group 1', 'Group 4']. "
            "Leave None if the requirement is stated in terms of direct model references "
            "rather than group labels, or if it applies implicitly to all groups "
            "(e.g. clarification paragraphs)."
        )
    )
    # Intended as an alternative to applies_to_groups (see description); exclusivity is not enforced.
    applies_to_models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Direct aircraft model references for requirements that do not use group labels. "
            "Examples: ['A320-211', 'A320-212']. "
            "Leave None when applies_to_groups is populated — do not duplicate the same "
            "applicability in both fields."
        )
    )
    # Free-text qualifier; kept verbatim rather than parsed into structured constraints.
    additional_applicability_condition: Optional[str] = Field(
        default=None,
        description=(
            "Any further condition within the stated group or model scope that narrows "
            "which aircraft this paragraph applies to, transcribed verbatim from the AD. "
            "Use when the paragraph adds a qualifier beyond the group definition itself. "
            "Examples: "
            "'except aeroplanes modified in accordance with the instructions of Airbus SB A320-57-1100', "
            "'having embodied SB A320-57-1089 at any revision lower than Revision 04 (for Group 4 aeroplanes)'. "
            "Leave None if no additional condition is stated."
        )
    )
    # Required human-readable summary of the action.
    description: str = Field(
        description=(
            "A concise, self-contained human-readable summary of what action must be performed. "
            "Include: the inspection method or action type (e.g. DET, GVI, modification), "
            "the area or component involved, and the reference document(s) to follow. "
            "Write in plain language suitable for a maintenance engineer to understand at a glance. "
            "Example: 'Accomplish a detailed inspection (DET) of the LH and RH wing inner rear spars "
            "at the MLG anchorage fitting attachment holes, per SB A320-57-1101 Revision 04.'"
        )
    )
    # Initial thresholds; multiple entries mean "whichever occurs first" (per the description).
    compliance_times: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "One or more initial compliance thresholds by which this action must first be accomplished. "
            "When the AD states multiple limits with 'whichever occurs first', "
            "list each as a separate ComplianceTime entry — the whichever-first logic is implied "
            "by multiple entries in this list. "
            "Example: '37 300 FH or 20 000 FC whichever occurs first since first flight' → "
            "two ComplianceTime entries: one for 37300 FH and one for 20000 FC, "
            "both with reference 'since first flight of the aeroplane' and is_interval=False. "
            "Leave None for clarification paragraphs or terminating action notes with no time limit."
        )
    )
    # Recurring intervals; the is_interval=True requirement on entries is not enforced here.
    interval: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "One or more recurring intervals for repetitive requirements. "
            "Populate only when the AD states 'thereafter, at intervals not exceeding...'. "
            "As with compliance_times, list each limit as a separate ComplianceTime entry "
            "when multiple limits apply with 'whichever occurs first'. "
            "All entries must have is_interval=True. "
            "Leave None for one-time actions (modifications, one-time inspections, corrective actions)."
        )
    )
    reference_documents: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of Airbus Service Bulletins or other technical documents whose instructions "
            "must be followed to accomplish this action. "
            "Include the revision where the AD specifies it. "
            "Examples: ['SB A320-57-1101 Revision 04', 'SB A320-57-1256']. "
            "Leave None for corrective actions where the repair instructions are obtained "
            "from Airbus on a case-by-case basis, or for clarification paragraphs."
        )
    )
    # Holds another paragraph's paragraph_id; not checked against the actual list of paragraphs.
    triggered_by_paragraph: Optional[str] = Field(
        default=None,
        description=(
            "For corrective_action paragraphs only: the paragraph_id of the inspection "
            "or action that triggers this corrective action when discrepancies are found. "
            "Example: '(1)' means this corrective action is triggered by findings during "
            "the inspection required by paragraph (1). "
            "Leave None for all non-corrective action types."
        )
    )
    terminating_action_for: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of paragraph_ids whose repetitive requirements are permanently terminated "
            "upon accomplishment of this action. "
            "Example: ['(5)'] means completing this action ends the recurring inspections "
            "required by paragraph (5) for that aircraft. "
            "Leave None if this action has no terminating effect on other paragraphs. "
            "Note: also set is_terminating_action=True when this field is populated."
        )
    )
    # NOTE(review): the "must be True whenever terminating_action_for is populated"
    # rule lives only in the description; no validator in this class enforces it.
    is_terminating_action: bool = Field(
        default=False,
        description=(
            "Set to True if accomplishing this action permanently terminates one or more "
            "repetitive requirements in this AD. "
            "Must be True whenever terminating_action_for is populated. "
            "Default is False."
        )
    )


# Root schema for one extracted Airworthiness Directive: header metadata,
# top-level applicability constraints, group definitions and required actions.
# This notebook uses it in two ways: model_json_schema() is passed to Gemini as
# response_json_schema, and model_validate_json() parses the model's reply.
# NOTE: because the schema is sent to the LLM, the Field descriptions are
# effectively prompt text — changing a word changes what the model is told.
# Only ad_number is required; every other field defaults to None.
class ADApplicabilityExtraction(BaseModel):
    ad_number: str = Field(
        description=(
            "The full AD identifier including any revision suffix, exactly as it appears in the AD header. "
            "Examples: '2025-0254R1', '2023-0041', 'AD 2021-23-10'. "
            "Never omit the revision suffix if present."
        )
    )
    issuing_authority: Optional[str] = Field(
        default=None,
        description=(
            "The aviation authority that issued this AD. "
            "Examples: 'EASA', 'FAA', 'TCCA', 'CASA'. "
            "Taken from the AD header or introductory paragraph."
        )
    )
    # Stored as a string; the YYYY-MM-DD format is requested but not validated.
    effective_date: Optional[str] = Field(
        default=None,
        description=(
            "The effective date of this AD (or its most recent revision) in ISO 8601 format (YYYY-MM-DD). "
            "If multiple dates are listed (original issue and revision), use the revision's effective date. "
            "Example: '2025-12-08'."
        )
    )
    revision: Optional[str] = Field(
        default=None,
        description=(
            "The revision label of this AD exactly as stated in the document. "
            "Examples: 'Revision 01', 'R1', 'Amendment 2'. "
            "Leave None for original issue (no revision)."
        )
    )
    supersedes: Optional[List[str]] = Field(
        default=None,
        description=(
            "List of AD identifiers that this AD supersedes, replaces, or revises, "
            "taken from the Revision field or the Reason section. "
            "Include all superseded ADs, not just the immediate predecessor. "
            "Examples: ['2025-0254', '2007-0162', '2014-0169']. "
            "Leave None if this is a first-issue AD that supersedes nothing."
        )
    )
    # NOTE(review): may be None, but ADRecognitionFullLLM.compare_to_ad iterates
    # ad_data["models"] without a None check — a None here would raise there.
    models: Optional[List[str]] = Field(
        default=None,
        description=(
            "Complete list of every aircraft model variant explicitly named in the "
            "Applicability section of the AD. "
            "List each variant as a separate string, exactly as written. "
            "Examples: ['A320-211', 'A320-212', 'A320-214', 'A321-111', 'A321-112']. "
            "Do not collapse variants (e.g. do not write 'A320' if the AD lists 'A320-211', 'A320-212' etc.)."
        )
    )
    # AD-wide MSN scope; inclusions and exclusions are separate list entries.
    msn_constraints: Optional[List[MSNConstraint]] = Field(
        default=None,
        description=(
            "Top-level MSN constraints covering the entire AD applicability, before any group scoping. "
            "IMPORTANT — never leave this None when the AD mentions MSN applicability: "
            "If the AD says 'all manufacturer serial numbers (MSN)' or 'all MSN', "
            "always populate with at least one MSNConstraint(all=True, excluded=False). "
            "If specific MSN ranges or numbers are excluded (e.g. 'except MSN 001 to 099'), "
            "add a separate MSNConstraint with excluded=True for those. "
            "Only leave None if the AD makes absolutely no reference to MSN applicability."
        )
    )
    # AD-wide modification constraints ('mod NNNNN' identifiers only).
    modification_constraints: Optional[List[ModificationConstraint]] = Field(
        default=None,
        description=(
            "Top-level Airbus modification constraints covering the entire AD applicability. "
            "IMPORTANT: Only 'mod XXXXX' numbers belong here — never SB identifiers. "
            "These are almost always exclusions: aircraft on which a specific mod has been "
            "embodied in production are excluded from the AD's scope. "
            "Capture each mod as a separate ModificationConstraint. "
            "Example: 'except those on which Airbus mod 24591 has been embodied in production' → "
            "ModificationConstraint(modification_id='mod 24591', embodied=True, excluded=True). "
            "Leave None only if no modification-based applicability constraints exist in this AD."
        )
    )
    # AD-wide Service Bulletin constraints (SB identifiers only).
    sb_constraints: Optional[List[ServiceBulletinConstraint]] = Field(
        default=None,
        description=(
            "Top-level Service Bulletin constraints covering the entire AD applicability. "
            "IMPORTANT: Only actual Airbus SB identifiers (format 'AXXX-XX-XXXX') belong here. "
            "Airbus modification numbers ('mod XXXXX') must NEVER be placed here — "
            "those belong exclusively in modification_constraints. "
            "These are typically SB-based exclusions, e.g. aircraft on which a specific SB "
            "revision has been embodied are excluded from scope. "
            "Example: 'except those on which SB A320-57-1089 at Revision 04 has been embodied' → "
            "ServiceBulletinConstraint(sb_identifier='A320-57-1089', revision='Revision 04', "
            "incorporated=True, excluded=True). "
            "Leave None only if no SB-based applicability constraints exist in this AD."
        )
    )
    # Summary field. Note the singular name holds a list, unlike
    # RequirementAction.compliance_times (plural).
    compliance_time: Optional[List[ComplianceTime]] = Field(
        default=None,
        description=(
            "Top-level summary of the most immediate compliance deadline(s) imposed by this AD as a whole. "
            "The intent is to surface the AD's urgency at a glance, without requiring a consumer "
            "to parse every RequirementAction. "
            "Populate with the most restrictive (shortest) initial deadline across all requirements. "
            "When the shortest deadline is expressed as 'X or Y whichever occurs first', "
            "list both as separate ComplianceTime entries. "
            "This field is a summary — full per-paragraph compliance times are still "
            "captured in each RequirementAction.compliance_times. "
            "Leave None only if this AD contains no time-limited requirements "
            "(e.g. a purely prohibitive AD with no deadline)."
        )
    )
    groups: Optional[List[AircraftGroup]] = Field(
        default=None,
        description=(
            "Definitions of all aircraft groups declared in the AD's Groups section, "
            "one AircraftGroup entry per defined group. "
            "Groups are internal AD constructs that partition applicable aircraft for "
            "the purpose of applying different requirements to different subsets. "
            "Preserve the exact group labels and definitions from the AD. "
            "Leave None only if the AD does not define any named groups."
        )
    )
    # One entry per numbered paragraph of the AD's required actions.
    requirements: Optional[List[RequirementAction]] = Field(
        default=None,
        description=(
            "Complete list of all required actions, one RequirementAction per numbered paragraph "
            "in the AD's Required Actions section. "
            "This is the primary output of the extraction. "
            "Every paragraph must be captured — inspections, modifications, corrective actions, "
            "prohibitions, terminating actions, and clarification notes alike. "
            "Preserve paragraph numbering exactly as in the AD. "
            "Leave None only if the AD contains no required actions (which should never occur "
            "for a valid AD)."
        )
    )

# System instruction for the extraction calls: it is passed as `system_instruction`
# in the GenerateContentConfig built in the cells below.
# It repeats the mod-vs-SB, "all MSN" and is_interval rules that the schema field
# descriptions also state. The whole string is runtime prompt text, so any edit
# (including whitespace) changes what the model receives.
system_prompt = """
You are an aviation regulatory document parser specialized in Airworthiness Directives (ADs).
Extract structured applicability and compliance information from the provided AD document.

EXTRACTION RULES:
- Extract only information explicitly stated in the document. Never infer or assume.
- Preserve all identifiers verbatim (model names, SB numbers, mod numbers, MSNs).
- If a field has no corresponding information in the document, set it to null.
- Output valid JSON only. No markdown, no explanation, no commentary.

CRITICAL DISTINCTIONS:
- Airbus modification numbers (e.g. "mod 24591") → always go in modification_constraints. Never in sb_constraints.
- Service Bulletin identifiers (e.g. "A320-57-1089") → always go in sb_constraints. Never in modification_constraints.
- If the AD states "all MSN" or "all manufacturer serial numbers", always set MSNConstraint(all=True, excluded=False). Never leave msn_constraints null when MSN applicability is mentioned.
- When multiple compliance limits use "whichever occurs first", list each as a separate ComplianceTime entry.
- Recurring intervals ("thereafter, at intervals not exceeding...") → is_interval=True.
- One-time thresholds ("before exceeding...") → is_interval=False.

OUTPUT: Valid JSON strictly following the provided schema.
"""

In [None]:
from google import genai
from google.genai import types
import os

# Gemini client; the API key is read from the GOOGLE_API_KEY environment variable
# (os.getenv returns None when it is unset).
client = genai.Client(
    api_key=os.getenv("GOOGLE_API_KEY")
)

# Generation settings: low temperature, JSON-only output constrained by the
# ADApplicabilityExtraction JSON schema.
model = "gemini-2.5-flash"
model_config = types.GenerateContentConfig(
    system_instruction=system_prompt,
    temperature=0.1,
    response_mime_type="application/json",
    response_json_schema=ADApplicabilityExtraction.model_json_schema()
)
# Text-only request: the OCR output is embedded directly in the prompt string.
# `full_page_result` is not defined in this cell — it must already exist in the
# kernel from an earlier cell.
# NOTE(review): the next cell issues this same request again and overwrites
# `response_ocr`, so a full run sends it twice.
response_ocr = client.models.generate_content(
    model=model,
    config=model_config,
    contents=f"Now extract the following ocrd text:\n\n{full_page_result}"
)

In [21]:
# Re-issues the OCR-text extraction request using `client`, `model`, `model_config`
# and `full_page_result` from earlier cells, and rebinds `response_ocr` to the new reply.
# NOTE(review): identical to the call at the end of the previous cell.
response_ocr = client.models.generate_content(
    model=model,
    config=model_config,
    contents=f"Now extract the following ocrd text:\n\n{full_page_result}"
)

[2026-02-20 13:44:05,958] [    INFO] models.py:5613 - AFC is enabled with max remote calls: 10.
[2026-02-20 13:44:45,140] [    INFO] _client.py:1025 - HTTP Request: POST https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent "HTTP/1.1 200 OK"


In [23]:
import json

# Validate the raw LLM reply against the schema and convert it to plain dicts/lists.
outputs = ADApplicabilityExtraction.model_validate_json(response_ocr.text).model_dump()

# Despite the name, SAVE_JSON_DIR is the path of the output *file* inside the run's temp folder.
SAVE_JSON_DIR = os.path.join(ROOT_TEMP_DIR, FILE_ID, f"{FILE_ID}_RESULTS_OCR.json")

# ensure_ascii=False writes non-ASCII characters verbatim, so the file encoding is
# pinned to UTF-8 instead of depending on the platform's default text encoding.
with open(SAVE_JSON_DIR, 'w', encoding="utf-8") as f:
    json.dump(outputs, f, indent=2, ensure_ascii=False)

# FINAL WORKFLOW

## WITHOUT OCR (FULLY VLM-BASED)

In [10]:
import os
import json
import shutil
from uuid import uuid4
from typing import Optional

import pandas as pd
from loguru import logger
from pydantic import BaseModel
from google import genai
from google.genai import types
from pdf2image import convert_from_bytes


class ADRecognitionFullLLM:
    def __init__(
        self,
        dpi: int,
        llm_model: str,
        llm_system_prompt: str,
        llm_temperature: float,
        llm_output_schema: type[BaseModel],
        temp_dir: Optional[str] = None,
    ):
        """Store the pipeline settings and make sure the scratch directory exists.

        Args:
            dpi: Resolution used when rendering PDF pages to images.
            llm_model: Model name passed to ``generate_content``.
            llm_system_prompt: Text sent as the system instruction.
            llm_temperature: Sampling temperature for the LLM call.
            llm_output_schema: Pydantic model class whose JSON schema constrains
                the response and which validates the returned JSON.
            temp_dir: Parent directory for per-run folders. When falsy,
                ``<current working dir>/tmp/ad_recognition`` is used.
        """
        # --- rendering ---
        self.dpi = dpi

        # --- LLM settings (API key comes from the GOOGLE_API_KEY env variable) ---
        api_key = os.getenv("GOOGLE_API_KEY")
        self.llm_client = genai.Client(api_key=api_key)
        self.llm_model = llm_model
        self.llm_system_prompt = llm_system_prompt
        self.llm_temperature = llm_temperature
        self.llm_output_schema = llm_output_schema

        # --- scratch space ---
        self.temp_dir = (
            temp_dir
            if temp_dir
            else os.path.join(os.getcwd(), "tmp/ad_recognition")
        )
        os.makedirs(self.temp_dir, exist_ok=True)

        # Run directories created by this instance, remembered for later cleanup.
        self._run_dirs: list[str] = []

    # ------------------------------------------------------------------ #
    #  Helper: Derive AD label from filename
    # ------------------------------------------------------------------ #
    @staticmethod
    def _label_from_path(pdf_path: str) -> str:
        return os.path.splitext(os.path.basename(pdf_path))[0]

    # ------------------------------------------------------------------ #
    #  Cleanup
    # ------------------------------------------------------------------ #
    def _cleanup_temp(self):
        """Remove all temporary run directories created during this session.

        Best effort: a directory that cannot be deleted is reported with a
        warning and skipped, and the tracking list is emptied either way.
        Afterwards the parent temp directory is removed too if it is empty.
        Nothing here raises; every failure is logged instead.
        """
        if not self._run_dirs:
            return

        logger.info(f"🧹 Cleaning up {len(self._run_dirs)} temp directories...")
        for run_dir in self._run_dirs:
            try:
                shutil.rmtree(run_dir)
                logger.debug(f"   🗑️  Removed: {run_dir}")
            except Exception as e:
                logger.warning(f"   ⚠️  Failed to remove {run_dir}: {e}")
        self._run_dirs.clear()

        # Remove parent temp dir if empty
        try:
            if os.path.exists(self.temp_dir) and not os.listdir(self.temp_dir):
                os.rmdir(self.temp_dir)
                logger.debug(f"   🗑️  Removed empty temp dir: {self.temp_dir}")
        except Exception as e:
            # Still non-fatal, but no longer silent: leave a trace of why the
            # parent directory stayed behind.
            logger.debug(f"   ⚠️  Could not remove temp dir {self.temp_dir}: {e}")

        logger.info("✅ Cleanup complete")

    # ------------------------------------------------------------------ #
    #  Step 1: PDF -> Images
    # ------------------------------------------------------------------ #
    def _pdf_to_images(self, pdf_path: str, run_dir: str) -> list[str]:
        """Render the PDF at ``pdf_path`` to PNG files under ``<run_dir>/pages``.

        The file is read fully into memory and handed to ``convert_from_bytes``
        with ``paths_only=True``; whatever that call returns is returned as-is.
        """
        logger.info(f"📄 Converting PDF to images: {pdf_path} (dpi={self.dpi})")

        pages_dir = os.path.join(run_dir, "pages")
        os.makedirs(pages_dir, exist_ok=True)

        with open(pdf_path, "rb") as pdf_file:
            pdf_bytes = pdf_file.read()

        page_paths = convert_from_bytes(
            pdf_bytes,
            output_folder=pages_dir,
            fmt="png",
            paths_only=True,
            dpi=self.dpi,
        )

        logger.info(f"🖼️  Generated {len(page_paths)} page images")
        return page_paths

    # ------------------------------------------------------------------ #
    #  Step 2: Prepare LLM messages
    # ------------------------------------------------------------------ #
    def _prepare_messages(self, img_paths: list[str]) -> list:
        """Build the ``contents`` list for the LLM call.

        The list starts with a fixed instruction string, followed by one
        ``types.Part`` per image, in the order given. Each part is created from
        the file's raw bytes with MIME type ``image/png``.
        """
        logger.info(f"📦 Preparing {len(img_paths)} images for LLM...")

        contents: list = ["Now, extract the following images!"]
        for page_path in img_paths:
            logger.debug(f"   🔗 Encoding: {os.path.basename(page_path)}")
            with open(page_path, "rb") as page_file:
                raw_png = page_file.read()
            image_part = types.Part.from_bytes(data=raw_png, mime_type="image/png")
            contents.append(image_part)

        logger.info("✅ All images encoded and ready")
        return contents

    # ------------------------------------------------------------------ #
    #  Step 3: Call Gemini for structured extraction
    # ------------------------------------------------------------------ #
    def _extract_with_llm(self, messages: list) -> dict:
        """Send ``messages`` to the configured model and return the validated result.

        The request asks for JSON constrained by the JSON schema of
        ``self.llm_output_schema``. The response text is then validated with
        that same model class and returned via ``model_dump()``.
        """
        logger.info(f"🤖 Calling LLM model: {self.llm_model}")

        schema_cls = self.llm_output_schema
        generation_config = types.GenerateContentConfig(
            system_instruction=self.llm_system_prompt,
            temperature=self.llm_temperature,
            response_mime_type="application/json",
            response_json_schema=schema_cls.model_json_schema(),
        )
        llm_response = self.llm_client.models.generate_content(
            model=self.llm_model,
            config=generation_config,
            contents=messages,
        )

        validated = schema_cls.model_validate_json(llm_response.text)
        logger.info("🎯 LLM extraction completed successfully")
        return validated.model_dump()

    # ------------------------------------------------------------------ #
    #  Step 4: Save extraction results
    # ------------------------------------------------------------------ #
    def _save_extraction(self, data: dict, run_dir: str, label: str) -> str:
        """Write ``data`` as pretty-printed JSON to ``<run_dir>/<label>_extraction.json``.

        Args:
            data: JSON-serialisable extraction result.
            run_dir: Existing directory to write into.
            label: Prefix for the output file name.

        Returns:
            The path of the file that was written.
        """
        out_path = os.path.join(run_dir, f"{label}_extraction.json")
        # ensure_ascii=False emits non-ASCII characters verbatim, so the encoding is
        # pinned to UTF-8 rather than left to the platform's default text encoding.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        logger.info(f"💾 Saved extraction: {out_path}")
        return out_path

    # ------------------------------------------------------------------ #
    #  Step 5: Extract a single AD PDF
    # ------------------------------------------------------------------ #
    def extract_ad(self, pdf_path: str, label: Optional[str] = None) -> dict:
        """Run the full pipeline for one AD PDF and return the extraction dict.

        Steps: create a fresh run directory under ``self.temp_dir`` and record it
        in ``self._run_dirs``, render the PDF to page images, build the LLM
        message list, call the LLM, and save the result as JSON in the run
        directory.

        Args:
            pdf_path: Path of the PDF to process.
            label: Name used in log lines and the output file name. Defaults to
                the PDF's file name without extension.
        """
        label = self._label_from_path(pdf_path) if label is None else label

        # Each call gets its own uniquely named working directory.
        run_id = uuid4().hex
        run_dir = os.path.join(self.temp_dir, run_id)
        os.makedirs(run_dir, exist_ok=True)
        self._run_dirs.append(run_dir)
        logger.info(f"🚀 [{label}] Starting extraction — run_id={run_id}")

        page_images = self._pdf_to_images(pdf_path, run_dir)
        llm_contents = self._prepare_messages(page_images)
        result = self._extract_with_llm(llm_contents)
        self._save_extraction(result, run_dir, label)

        logger.info(f"✅ [{label}] Extraction complete!")
        return result

    # ------------------------------------------------------------------ #
    #  Step 6: Compare test data against extracted AD(s)
    # ------------------------------------------------------------------ #
    def compare_to_ad(self, df: pd.DataFrame, ad_file_dict: dict) -> pd.DataFrame:
        """
        Classify every aircraft row in ``df`` against every extracted AD.

        For each (aircraft, AD) pair three checks run in order:

        1. Model  - the aircraft model must be a substring of one of the AD's
           ``models`` entries, otherwise the status is "❌ Not applicable".
        2. MSN    - the MSN must be in scope of ``msn_constraints``, otherwise
           the status is "❌ Not applicable". Exclusion constraints always win
           over inclusion constraints, independent of their order in the list.
           A list that contains only exclusions puts every other MSN in scope.
        3. Mod/SB - if an applied modification / service bulletin matches a
           constraint flagged ``excluded`` the status is "❌ Not Affected",
           otherwise "✅ Affected".

        Args:
            df: Fleet data with the columns ``aircraft_model``, ``msn`` and
                ``modifications_applied`` (comma-separated string; NaN, "",
                "none" or "n/a" mean nothing is applied).
            ad_file_dict: Mapping of AD label -> extraction dict. The keys read
                here are ``models``, ``msn_constraints``,
                ``modification_constraints`` and ``sb_constraints``; each may be
                missing or ``None``.

        Returns:
            ``df`` with one additional status column per AD label. The new
            columns are row-aligned with ``df`` whatever its index is.

        Raises:
            ValueError, TypeError: if an aircraft ``msn`` cannot be converted to int.
        """
        not_applicable = "❌ Not applicable"
        not_affected = "❌ Not Affected"
        affected = "✅ Affected"

        def _as_int(value):
            """Coerce ``value`` to int; return None when it is missing or not numeric."""
            try:
                return int(value)
            except (TypeError, ValueError, OverflowError):
                return None

        def _inclusive(range_data, key):
            """Read an ``inclusive_*`` flag; a missing or null flag means inclusive."""
            flag = range_data.get(key)
            return True if flag is None else bool(flag)

        def _msn_matches(msn_value, constraint):
            """Return True when ``msn_value`` is covered by one MSN constraint."""
            if constraint.get("all"):
                return True

            range_data = constraint.get("range")
            if range_data:
                raw_start = range_data.get("start")
                raw_end = range_data.get("end")
                start = _as_int(raw_start)
                end = _as_int(raw_end)

                unparsable = (raw_start is not None and start is None) or (
                    raw_end is not None and end is None
                )
                if unparsable or (start is None and end is None):
                    logger.warning(f"Ignoring unusable MSN range: {range_data}")
                    return False

                # A null bound means the range is open on that side
                # (e.g. "MSN 1234 and subsequent").
                lower_ok = True
                if start is not None:
                    lower_ok = (
                        msn_value >= start
                        if _inclusive(range_data, "inclusive_start")
                        else msn_value > start
                    )
                upper_ok = True
                if end is not None:
                    upper_ok = (
                        msn_value <= end
                        if _inclusive(range_data, "inclusive_end")
                        else msn_value < end
                    )
                return lower_ok and upper_ok

            specific = constraint.get("specific_msns")
            if specific:
                # Compare numerically so "364" and 364 are treated as the same MSN.
                return any(_as_int(candidate) == msn_value for candidate in specific)

            return False

        def _msn_in_scope(msn_value, constraints):
            """Evaluate the full constraint list; exclusions take precedence."""
            if not constraints:
                # No MSN constraints defined -> all MSNs in scope
                return True

            has_inclusion = False
            included = False
            for constraint in constraints:
                if not constraint:
                    continue
                is_exclusion = bool(constraint.get("excluded", False))
                if not is_exclusion:
                    has_inclusion = True
                if not _msn_matches(msn_value, constraint):
                    continue
                if is_exclusion:
                    # A matching exclusion removes the aircraft no matter which
                    # inclusion constraints matched before or after it.
                    return False
                included = True

            # Only exclusions listed -> everything not excluded is in scope.
            return included if has_inclusion else True

        def _mentions(identifier, text):
            """Whole-word, case-insensitive search for ``identifier`` in ``text``."""
            if identifier is None or not str(identifier).strip():
                # A blank identifier would produce the pattern "\\b\\b", which
                # matches any text and would wrongly exclude the aircraft.
                return False
            # Word boundaries avoid "mod 245" matching "mod 24591".
            pattern = r"\b" + re.escape(str(identifier).strip()) + r"\b"
            return re.search(pattern, text, flags=re.IGNORECASE) is not None

        def _excluded_by_applied(applied, ad_data):
            """Return True when one applied mod/SB removes the aircraft from AD scope."""
            if "mod" in applied.lower():
                # --- Airbus modification number check ---
                constraints = ad_data.get("modification_constraints") or []
                id_key = "modification_id"
            else:
                # --- Service Bulletin check ---
                constraints = ad_data.get("sb_constraints") or []
                id_key = "sb_identifier"

            for constraint in constraints:
                if constraint and _mentions(constraint.get(id_key), applied):
                    # The first constraint naming this mod/SB decides.
                    return bool(constraint.get("excluded", False))
            return False

        ad_columns = list(ad_file_dict.keys())

        for ad_label in ad_columns:
            if not (ad_file_dict[ad_label] or {}).get("models"):
                logger.warning(
                    f"{ad_label}: no models extracted; every aircraft will be marked not applicable"
                )

        ad_rows = []

        for _, item in df.iterrows():
            model = str(item["aircraft_model"])
            msn = int(item["msn"])

            # Support multiple modifications/SBs as a comma-separated string or single value
            raw_mod = item["modifications_applied"]
            if pd.isna(raw_mod) or str(raw_mod).strip().lower() in ("none", "n/a", ""):
                mods_applied = []
            else:
                mods_applied = [m.strip() for m in str(raw_mod).split(",") if m.strip()]

            logger.info(f"Checking AD status for model: {model}, msn: {msn}, mods applied: {mods_applied}")

            ad_status_rows = []

            for ad in ad_columns:
                logger.info(f"Checking in {ad} file")

                ad_data = ad_file_dict[ad] or {}

                # STEP 1: Model check
                ad_models = ad_data.get("models") or []
                if not any(model in str(m) for m in ad_models if m):
                    ad_status_rows.append(not_applicable)
                    continue

                # STEP 2: MSN check
                if not _msn_in_scope(msn, ad_data.get("msn_constraints") or []):
                    ad_status_rows.append(not_applicable)
                    continue

                # STEP 3: Modification / SB exclusion check
                # (no modifications on the aircraft -> no exclusion can apply)
                if any(_excluded_by_applied(applied, ad_data) for applied in mods_applied):
                    ad_status_rows.append(not_affected)
                else:
                    ad_status_rows.append(affected)

            ad_rows.append(ad_status_rows)

        # Reuse df's index: concat(axis=1) aligns on index labels, so a fresh
        # RangeIndex would misalign rows for a filtered or re-indexed df.
        ad_df = pd.DataFrame(ad_rows, columns=ad_columns, index=df.index)

        combined_df = pd.concat([df, ad_df], axis=1)

        return combined_df

    # ------------------------------------------------------------------ #
    #  Step 7: Full pipeline
    # ------------------------------------------------------------------ #
    def run_analysis(
        self,
        test_data_path: str,
        ad_file_paths: list[str],
        save_dir: str,
        cleanup: bool = True,
    ) -> str:
        """
        Run the complete AD recognition and comparison pipeline.

        Each PDF is extracted with ``self.extract_ad``, the test CSV is loaded,
        every row is classified with ``self.compare_to_ad`` and the outcome is
        written to ``save_dir`` as ``ad_classification_results.csv`` plus
        ``ad_extractions.json``.

        Args:
            test_data_path: Path to test CSV file.
            ad_file_paths: List of AD PDF file paths to extract and compare.
                The file name without extension is used as the AD label, so
                two files with the same base name share one label.
            save_dir: Directory to save final results (created if missing).
            cleanup: Whether to delete temp directories after the run, also
                when the pipeline fails.

        Returns:
            Path to the saved results CSV.
        """
        logger.info("🔰" + "=" * 58)
        logger.info(f"🛫 Starting AD Recognition Pipeline — {len(ad_file_paths)} AD(s)")
        logger.info("🔰" + "=" * 58)

        try:
            # Create the output directory first so an unwritable save_dir fails
            # before any extraction work has been done.
            os.makedirs(save_dir, exist_ok=True)

            # --- Extract all AD PDFs ---
            ad_extractions: dict[str, dict] = {}
            total_ads = len(ad_file_paths)
            for i, pdf_path in enumerate(ad_file_paths, 1):
                label = self._label_from_path(pdf_path)
                if label in ad_extractions:
                    logger.warning(
                        f"⚠️  Duplicate AD label '{label}' — the earlier extraction will be overwritten"
                    )
                logger.info(f"📋 [{i}/{total_ads}] Processing: {label}")
                ad_extractions[label] = self.extract_ad(pdf_path, label=label)

            # --- Load test data ---
            logger.info(f"📊 Loading test data: {test_data_path}")
            test_data = pd.read_csv(test_data_path, sep=",")
            logger.info(f"📐 Test data shape: {test_data.shape}")

            # --- Compare ---
            logger.info(f"⚙️  Running AD comparison against {len(ad_extractions)} AD(s)...")
            result_df = self.compare_to_ad(test_data, ad_file_dict=ad_extractions)
            logger.info(f"🏁 Comparison done — {len(result_df)} rows classified")

            # --- Save results ---
            result_path = os.path.join(save_dir, "ad_classification_results.csv")
            result_df.to_csv(result_path, index=False)
            logger.info(f"💾 Results saved: {result_path}")

            extractions_path = os.path.join(save_dir, "ad_extractions.json")
            # ensure_ascii=False emits raw non-ASCII characters, so the file
            # must be opened as UTF-8 rather than with the locale default.
            with open(extractions_path, "w", encoding="utf-8") as f:
                json.dump(ad_extractions, f, indent=2, ensure_ascii=False)
            logger.info(f"💾 Extractions saved: {extractions_path}")

        finally:
            # Always cleanup temp dirs, even if pipeline fails
            if cleanup:
                self._cleanup_temp()

        logger.info("🔰" + "=" * 58)
        logger.info("🎉 Pipeline complete!")
        logger.info("🔰" + "=" * 58)

        return result_path

In [11]:
# System prompt for the AD extraction LLM call. It is handed to the pipeline
# below through `llm_system_prompt`; the text is sent as-is, so any edit to it
# changes what the model is asked to do.
SYSTEM_PROMPT = """
You are an aviation regulatory document parser specialized in Airworthiness Directives (ADs).
Extract structured applicability and compliance information from the provided AD document.

EXTRACTION RULES:
- Extract only information explicitly stated in the document. Never infer or assume.
- Preserve all identifiers verbatim (model names, SB numbers, mod numbers, MSNs).
- If a field has no corresponding information in the document, set it to null.
- Output valid JSON only. No markdown, no explanation, no commentary.

CRITICAL DISTINCTIONS:
- Airbus modification numbers (e.g. "mod 24591") → always go in modification_constraints. Never in sb_constraints.
- Service Bulletin identifiers (e.g. "A320-57-1089") → always go in sb_constraints. Never in modification_constraints.
- If the AD states "all MSN" or "all manufacturer serial numbers", always set MSNConstraint(all=True, excluded=False). Never leave msn_constraints null when MSN applicability is mentioned.
- When multiple compliance limits use "whichever occurs first", list each as a separate ComplianceTime entry.
- Recurring intervals ("thereafter, at intervals not exceeding...") → is_interval=True.
- One-time thresholds ("before exceeding...") → is_interval=False.

OUTPUT: Valid JSON strictly following the provided schema.
"""

# Image-based (full-LLM) pipeline instance used by the run cell that follows.
# NOTE(review): `ADRecognitionFullLLM` and `ADApplicabilityExtraction` are
# defined in earlier cells; this cell fails on a fresh kernel unless they ran.
pipeline = ADRecognitionFullLLM(
    dpi=300,
    llm_model="gemini-2.5-flash",
    llm_system_prompt=SYSTEM_PROMPT,
    llm_temperature=0.1,
    llm_output_schema=ADApplicabilityExtraction,
)

In [12]:
# Project root is configurable through the SOJI_AI_ROOT environment variable so
# the notebook is not tied to one machine; the default is the original location.
PROJECT_ROOT = os.environ.get("SOJI_AI_ROOT", "/home/naufal/soji_ai")

# Extract both ADs with the full-LLM pipeline and classify the test fleet.
result_path = pipeline.run_analysis(
    test_data_path=os.path.join(PROJECT_ROOT, "test", "ad_test_data.csv"),
    ad_file_paths=[
        os.path.join(PROJECT_ROOT, "documents", "EASA_AD_2025-0254R1_1.pdf"),
        os.path.join(PROJECT_ROOT, "documents", "EASA_AD_US-2025-23-53_1.pdf"),
    ],
    save_dir=os.path.join(PROJECT_ROOT, "results"),
)

[32m2026-02-20 18:06:31.435[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_analysis[0m:[36m335[0m - [1m🛫 Starting AD Recognition Pipeline — 2 AD(s)[0m
[32m2026-02-20 18:06:31.435[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_analysis[0m:[36m343[0m - [1m📋 [1/2] Processing: EASA_AD_2025-0254R1_1[0m
[32m2026-02-20 18:06:31.437[0m | [1mINFO    [0m | [36m__main__[0m:[36mextract_ad[0m:[36m160[0m - [1m🚀 [EASA_AD_2025-0254R1_1] Starting extraction — run_id=c35751c2216e4246b66fd9e02b428978[0m
[32m2026-02-20 18:06:31.437[0m | [1mINFO    [0m | [36m__main__[0m:[36m_pdf_to_images[0m:[36m82[0m - [1m📄 Converting PDF to images: /home/naufal/soji_ai/documents/EASA_AD_2025-0254R1_1.pdf (dpi=300)[0m
[32m2026-02-20 18:06:38.273[0m | [1mINFO    [0m | [36m__main__[0m:[36m_pdf_to_images[0m:[36m94[0m - [1m🖼️  Generated 5 page images[0m
[32m2026-02-20 18:06:38.274[0m | [1mINFO    [0m | [36m__main__[0m:[36m_prepare_messages[0m:[36m101[0m - [1

In [53]:
# Load the classification table written by `run_analysis`. The project root is
# configurable through SOJI_AI_ROOT; the default is the original location.
PROJECT_ROOT = os.environ.get("SOJI_AI_ROOT", "/home/naufal/soji_ai")
df = pd.read_csv(os.path.join(PROJECT_ROOT, "results", "ad_classification_results.csv"))
df

Unnamed: 0,aircraft_model,msn,modifications_applied,EASA_AD_2025-0254R1_1,EASA_AD_US-2025-23-53_1
0,MD-11,48123,,❌ Not applicable,✅ Affected
1,DC-10-30F,47890,,❌ Not applicable,✅ Affected
2,Boeing 737-800,30123,,❌ Not applicable,❌ Not applicable
3,A320-214,5234,,✅ Affected,❌ Not applicable
4,A320-232,6789,mod 24591 (production),❌ Not Affected,❌ Not applicable
5,A320-214,7456,SB A320-57-1089 Rev 04,❌ Not Affected,❌ Not applicable
6,A321-111,8123,,✅ Affected,❌ Not applicable
7,A321-112,364,mod 24977 (production),❌ Not Affected,❌ Not applicable
8,A319-100,9234,,❌ Not applicable,❌ Not applicable
9,MD-10-10F,46234,,❌ Not applicable,✅ Affected


In [43]:
# Show the kernel's working directory and the temp folder path derived from it.
print(os.getcwd(), os.path.join(os.getcwd(), "tmp/ad_recognition"), sep="\n")

/home/naufal/soji_ai/notebooks
/home/naufal/soji_ai/notebooks/tmp/ad_recognition


In [None]:
import os
import json
import shutil
import numpy as np
import pandas as pd
from uuid import uuid4
from typing import Optional, List, Dict, Any
from PIL import Image, ImageDraw, ImageFont
from loguru import logger
from pydantic import BaseModel
from google import genai
from google.genai import types
from pdf2image import convert_from_bytes
from paddleocr import PaddleOCR
from src.core.utils import compare_to_ad

class ADRecognitionOCR:
    """
    AD recognition pipeline that OCRs every PDF page and sends only text to the LLM.

    Stages: PDF -> page PNGs -> PaddleOCR -> reading-order text -> LLM structured
    extraction -> comparison against the test fleet. Intermediate artefacts live
    in a per-run directory below ``temp_dir`` and are removed by ``_cleanup_temp``.
    """

    def __init__(
        self,
        dpi: int,
        llm_model: str,
        llm_system_prompt: str,
        llm_temperature: float,
        llm_output_schema: type[BaseModel],
        ocr_device: str = "gpu:0",
        ocr_precision: str = "fp16",
        ocr_det_model: str = "PP-OCRv5_mobile_det",
        ocr_rec_model: str = "PP-OCRv5_mobile_rec",
        y_threshold: float = 15.0,
        save_ocr_viz: bool = True,
        cpu_threads: int = 8,
        temp_dir: Optional[str] = None,
    ):
        """
        Create the LLM client and the OCR engine and prepare the temp directory.

        Args:
            dpi: Resolution passed to pdf2image when rasterising PDF pages.
            llm_model: Model name passed to ``generate_content``.
            llm_system_prompt: System instruction for the LLM call.
            llm_temperature: Sampling temperature for the LLM call.
            llm_output_schema: Pydantic model used as JSON schema and validator.
            ocr_device: Forwarded to PaddleOCR as ``device``. The exact string
                "cpu" (any case) enables ``cpu_threads``.
            ocr_precision: Forwarded to PaddleOCR as ``precision``.
            ocr_det_model: Forwarded as ``text_detection_model_name``.
            ocr_rec_model: Forwarded as ``text_recognition_model_name``.
            y_threshold: Maximum vertical distance between box centres (in box
                coordinate units) for two boxes to count as the same text line.
            save_ocr_viz: Whether ``run_analysis`` writes bbox visualisations.
            cpu_threads: Thread count passed to PaddleOCR in CPU mode only.
            temp_dir: Root for per-run temp directories; defaults to
                ``<cwd>/tmp/ad_recognition_ocr``.
        """
        self.dpi = dpi
        self.y_threshold = y_threshold
        self.save_ocr_viz = save_ocr_viz

        # --- LLM ---
        self.llm_client = genai.Client(api_key=os.getenv("GOOGLE_API_KEY"))
        self.llm_model = llm_model
        self.llm_system_prompt = llm_system_prompt
        self.llm_temperature = llm_temperature
        self.llm_output_schema = llm_output_schema

        # --- OCR Engine ---
        is_cpu = ocr_device.lower() == "cpu"

        if is_cpu:
            logger.info(f"🔧 Initializing PaddleOCR engine on CPU with {cpu_threads} threads...")
        else:
            logger.info(f"🔧 Initializing PaddleOCR engine on {ocr_device}...")

        self.ocr_engine = PaddleOCR(
            use_doc_orientation_classify=False,
            use_doc_unwarping=False,
            use_textline_orientation=False,
            device=ocr_device,
            precision=ocr_precision,
            text_detection_model_name=ocr_det_model,
            text_recognition_model_name=ocr_rec_model,
            cpu_threads=cpu_threads if is_cpu else None,
        )

        if is_cpu:
            logger.info(f"✅ PaddleOCR engine ready (CPU mode — {cpu_threads} threads)")
        else:
            logger.info(f"✅ PaddleOCR engine ready ({ocr_device})")

        # --- Temp dir ---
        if not temp_dir:
            self.temp_dir = os.path.join(os.getcwd(), "tmp/ad_recognition_ocr")
        else:
            self.temp_dir = temp_dir
        os.makedirs(self.temp_dir, exist_ok=True)
        # Every run directory created by extract_ad, so cleanup can remove them.
        self._run_dirs: list[str] = []

    # ================================================================== #
    #  Helpers
    # ================================================================== #
    @staticmethod
    def _label_from_path(pdf_path: str) -> str:
        """Return the file name of ``pdf_path`` without directory and extension."""
        return os.path.splitext(os.path.basename(pdf_path))[0]

    def _cleanup_temp(self):
        """Remove all temporary run directories created during this session."""
        if not self._run_dirs:
            return

        logger.info(f"🧹 Cleaning up {len(self._run_dirs)} temp directories...")
        for run_dir in self._run_dirs:
            try:
                shutil.rmtree(run_dir)
                logger.debug(f"   🗑️  Removed: {run_dir}")
            except Exception as e:
                logger.warning(f"   ⚠️  Failed to remove {run_dir}: {e}")
        self._run_dirs.clear()

        # Best effort: drop the temp root too, but only when nothing else is in it.
        try:
            if os.path.exists(self.temp_dir) and not os.listdir(self.temp_dir):
                os.rmdir(self.temp_dir)
                logger.debug(f"   🗑️  Removed empty temp dir: {self.temp_dir}")
        except Exception as e:
            logger.debug(f"   ⚠️  Could not remove temp dir {self.temp_dir}: {e}")

        logger.info("✅ Cleanup complete")

    # ================================================================== #
    #  Step 1: PDF -> Images
    # ================================================================== #
    def _pdf_to_images(self, pdf_path: str, run_dir: str) -> list[str]:
        """Rasterise ``pdf_path`` into PNG files under ``run_dir/pages`` and return their paths."""
        logger.info(f"📄 Converting PDF to images: {pdf_path} (dpi={self.dpi})")
        imgs_dir = os.path.join(run_dir, "pages")
        os.makedirs(imgs_dir, exist_ok=True)

        with open(pdf_path, "rb") as f:
            img_paths = convert_from_bytes(
                f.read(),
                output_folder=imgs_dir,
                fmt="png",
                paths_only=True,
                dpi=self.dpi,
            )
        logger.info(f"🖼️  Generated {len(img_paths)} page images")
        return img_paths

    # ================================================================== #
    #  Step 2: OCR
    # ================================================================== #
    def _run_ocr(self, img_paths: list[str]) -> list[dict]:
        """Run the OCR engine on all page images and return one result per page."""
        logger.info(f"🔍 Running OCR on {len(img_paths)} pages...")
        ocr_results = list(self.ocr_engine.predict(img_paths))
        logger.info(f"✅ OCR complete — {len(ocr_results)} pages processed")
        return ocr_results

    # ================================================================== #
    #  Step 3: OCR Postprocessing (sort + full text)
    # ================================================================== #
    @staticmethod
    def _group_ocr_lines(boxes, y_threshold: float = 15.0) -> List[List[int]]:
        """
        Group box indices into text lines in reading order.

        Boxes are sorted by vertical centre; a box joins the current line when
        its centre is within ``y_threshold`` of the line's first (topmost) box.
        Inside a line, indices are ordered by left edge.

        Args:
            boxes: Sequence of boxes, each either ``[x_min, y_min, x_max, y_max]``
                or a 4x2 polygon.
            y_threshold: Maximum centre distance for boxes on the same line.

        Returns:
            One list of box indices per line, top-to-bottom, left-to-right.

        Raises:
            ValueError: if a box has neither shape ``(4,)`` nor ``(4, 2)``.
        """
        anchors = []
        for idx, box in enumerate(boxes):
            # Float conversion keeps the centre arithmetic independent of the
            # integer dtype the OCR engine happens to return.
            arr = np.asarray(box, dtype=float)
            if arr.shape == (4,):
                x_left = arr[0]
                y_center = (arr[1] + arr[3]) / 2
            elif arr.shape == (4, 2):
                x_left = arr[:, 0].min()
                y_center = arr[:, 1].mean()
            else:
                raise ValueError(f"Unexpected box shape: {arr.shape}")
            anchors.append((idx, x_left, y_center))

        if not anchors:
            return []

        anchors.sort(key=lambda a: a[2])

        lines = [[anchors[0]]]
        for anchor in anchors[1:]:
            if abs(anchor[2] - lines[-1][0][2]) <= y_threshold:
                lines[-1].append(anchor)
            else:
                lines.append([anchor])

        return [
            [idx for idx, _, _ in sorted(line, key=lambda a: a[1])]
            for line in lines
        ]

    @staticmethod
    def _sort_ocr_reading_order(
        texts: List[str],
        boxes: List[np.ndarray],
        y_threshold: float = 15.0,
    ) -> tuple[List[str], List[np.ndarray]]:
        """Sort OCR results in natural reading order (top-to-bottom, left-to-right)."""
        if not texts:
            return texts, boxes

        order = [
            idx
            for line in ADRecognitionOCR._group_ocr_lines(boxes, y_threshold)
            for idx in line
        ]
        sorted_texts = [texts[i] for i in order]
        sorted_boxes = [boxes[i] for i in order]
        return sorted_texts, sorted_boxes

    def _get_full_text(self, ocr_results: List[Dict[str, Any]]) -> str:
        """
        Convert OCR results to full text in reading order with page headers.

        Line breaks come from the same grouping that defines the reading order,
        so two neighbouring lines can no longer be merged by a second grouping
        pass that uses a different reference box. Pages without recognised text
        are skipped.
        """
        all_pages_text = []
        total_pages = len(ocr_results)

        for page_idx, page in enumerate(ocr_results):
            texts = page.get("rec_texts", [])
            boxes = page.get("rec_boxes", [])
            if texts is None:
                texts = []
            if boxes is None:
                boxes = []

            if len(texts) == 0:
                continue

            if len(boxes) == 0:
                # No geometry to order by: keep detection order, one text per line.
                line_fragments = [[text] for text in texts]
            else:
                line_fragments = [
                    [texts[i] for i in line if i < len(texts)]
                    for line in self._group_ocr_lines(boxes, self.y_threshold)
                ]

            lines_text = []
            for fragments in line_fragments:
                line = " ".join(t for t in fragments if t.strip())
                if line.strip():
                    lines_text.append(line)

            page_num = page_idx + 1
            header = f"\n{'='*60}\n  PAGE {page_num} / {total_pages}\n{'='*60}\n"
            all_pages_text.append(header + "\n".join(lines_text))

        return "\n".join(all_pages_text)

    # ================================================================== #
    #  Step 4: Draw OCR bbox visualizations
    # ================================================================== #
    @staticmethod
    def _draw_ocr_bboxes(
        image_path: str,
        ocr_result: dict,
        output_path: str,
        use_polys: bool = True,
        box_color: str = "red",
        text_color: str = "blue",
        show_text: bool = False,
        font_size: int = 14,
    ) -> None:
        """
        Draw OCR bounding boxes on the original image and save.

        Args:
            image_path: Page image to annotate.
            ocr_result: OCR result for that page; reads ``rec_texts`` and either
                ``rec_polys`` or ``rec_boxes``.
            output_path: Where the annotated image is written.
            use_polys: Draw ``rec_polys`` when True, ``rec_boxes`` otherwise.
            box_color: Outline colour of the boxes.
            text_color: Colour of the optional text labels.
            show_text: Also draw the recognised text above each box.
            font_size: Label font size; also used to offset the label upwards.
        """
        # Image.open is lazy and keeps the file open; convert() yields an
        # independent in-memory copy, so the source handle can be closed here.
        with Image.open(image_path) as source:
            img = source.convert("RGB")
        draw = ImageDraw.Draw(img)

        texts = ocr_result.get("rec_texts", [])
        polys = ocr_result.get("rec_polys" if use_polys else "rec_boxes", [])

        try:
            font = ImageFont.truetype(
                "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", font_size
            )
        except Exception:
            # Font file not available on this machine: use Pillow's built-in font.
            font = ImageFont.load_default()

        for i, poly in enumerate(polys):
            poly = np.array(poly)

            if poly.shape == (4,):
                x_min, y_min, x_max, y_max = poly
                draw.rectangle([x_min, y_min, x_max, y_max], outline=box_color, width=2)
                text_pos = (x_min, y_min - font_size - 2)
            elif poly.shape == (4, 2):
                points = [tuple(p) for p in poly.astype(int)]
                points.append(points[0])  # close the polygon outline
                draw.line(points, fill=box_color, width=2)
                text_pos = (int(poly[:, 0].min()), int(poly[:, 1].min()) - font_size - 2)
            else:
                # Unknown geometry: skip this entry rather than abort the page.
                continue

            if show_text and i < len(texts) and texts[i].strip():
                draw.text(text_pos, texts[i], fill=text_color, font=font)

        img.save(output_path)

    def _save_ocr_visualizations(
        self,
        img_paths: list[str],
        ocr_results: list[dict],
        save_dir: str,
        label: str,
    ) -> list[str]:
        """Draw and save bbox visualizations for all pages; returns the written paths."""
        viz_dir = os.path.join(save_dir, f"{label}_ocr_viz")
        os.makedirs(viz_dir, exist_ok=True)
        viz_paths = []

        logger.info(f"🎨 Drawing OCR visualizations for {len(img_paths)} pages...")
        for i, (img_path, ocr_result) in enumerate(zip(img_paths, ocr_results)):
            viz_path = os.path.join(viz_dir, f"page_{i+1}_ocr_viz.png")
            self._draw_ocr_bboxes(
                image_path=img_path,
                ocr_result=ocr_result,
                output_path=viz_path,
            )
            viz_paths.append(viz_path)
            logger.debug(f"   🖍️  Saved viz: page {i+1}")

        logger.info(f"✅ All OCR visualizations saved to: {viz_dir}")
        return viz_paths

    # ================================================================== #
    #  Step 5: LLM extraction (text-only input)
    # ================================================================== #
    def _extract_with_llm(self, full_text: str) -> dict:
        """
        Send the OCR text to the LLM and return the validated extraction as a dict.

        Raises:
            ValueError: if the model returns no text, or (via pydantic) if the
                returned JSON does not validate against ``llm_output_schema``.
        """
        logger.info(f"🤖 Calling LLM model: {self.llm_model} (text-only mode)")

        config = types.GenerateContentConfig(
            system_instruction=self.llm_system_prompt,
            temperature=self.llm_temperature,
            response_mime_type="application/json",
            response_json_schema=self.llm_output_schema.model_json_schema(),
        )

        response = self.llm_client.models.generate_content(
            model=self.llm_model,
            config=config,
            contents=f"Now extract the following OCR'd text:\n\n{full_text}",
        )

        raw_json = response.text
        if not raw_json:
            # Fail with an explicit message instead of an opaque validation
            # error on a missing payload.
            raise ValueError(
                f"LLM model {self.llm_model} returned an empty response; nothing to parse"
            )

        parsed = self.llm_output_schema.model_validate_json(raw_json)
        logger.info("🎯 LLM extraction completed successfully")
        return parsed.model_dump()

    # ================================================================== #
    #  Step 6: Save extraction results
    # ================================================================== #
    def _save_extraction(self, data: dict, run_dir: str, label: str) -> str:
        """Write ``data`` as ``<label>_extraction.json`` in ``run_dir`` and return the path."""
        out_path = os.path.join(run_dir, f"{label}_extraction.json")
        # ensure_ascii=False emits raw non-ASCII characters, so the file must be
        # opened as UTF-8 rather than with the locale default encoding.
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        logger.info(f"💾 Saved extraction: {out_path}")
        return out_path

    # ================================================================== #
    #  Step 7: Extract a single AD PDF (full OCR pipeline)
    # ================================================================== #
    def extract_ad(
        self, pdf_path: str, label: Optional[str] = None
    ) -> tuple[dict, list[str], list[dict]]:
        """
        Full OCR extraction pipeline for a single AD PDF.

        Creates a fresh run directory under ``temp_dir`` that holds the page
        images, the raw OCR text and the extraction JSON.

        Args:
            pdf_path: AD PDF to process.
            label: Name used in logs and file names; defaults to the PDF's
                file name without extension.

        Returns:
            (extraction_dict, img_paths, ocr_results)
        """
        if label is None:
            label = self._label_from_path(pdf_path)

        run_id = uuid4().hex
        run_dir = os.path.join(self.temp_dir, run_id)
        os.makedirs(run_dir, exist_ok=True)
        self._run_dirs.append(run_dir)
        logger.info(f"🚀 [{label}] Starting OCR extraction — run_id={run_id}")

        # PDF -> Images
        img_paths = self._pdf_to_images(pdf_path, run_dir)

        # Images -> OCR
        ocr_results = self._run_ocr(img_paths)

        # OCR -> Sorted full text
        full_text = self._get_full_text(ocr_results)
        logger.info(f"📝 Full text extracted: {len(full_text)} characters")

        # Save raw OCR text for debugging
        text_path = os.path.join(run_dir, f"{label}_ocr_text.txt")
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(full_text)
        logger.debug(f"   📄 Raw OCR text saved: {text_path}")

        # Text -> LLM structured extraction
        extraction = self._extract_with_llm(full_text)
        self._save_extraction(extraction, run_dir, label)

        logger.info(f"✅ [{label}] OCR extraction complete!")
        return extraction, img_paths, ocr_results

    # ================================================================== #
    #  Step 8: Full pipeline
    # ================================================================== #
    def run_analysis(
        self,
        test_data_path: str,
        ad_file_paths: list[str],
        save_dir: str,
        cleanup: bool = True,
    ) -> str:
        """
        Run OCR extraction for every AD, classify the test data and save results.

        Args:
            test_data_path: Path to the test CSV file.
            ad_file_paths: AD PDF paths; the file name without extension is the
                AD label and the name of its result column.
            save_dir: Output directory (created if missing) for the results CSV,
                the extractions JSON and, if enabled, the OCR visualisations.
            cleanup: Remove the temp run directories afterwards, also on failure.

        Returns:
            Path to the saved results CSV.
        """
        logger.info("🔰" + "=" * 58)
        logger.info(f"🛫 Starting AD Recognition Pipeline (OCR) — {len(ad_file_paths)} AD(s)")
        logger.info("🔰" + "=" * 58)

        try:
            # Create the output directory first so an unwritable save_dir fails
            # before any OCR / LLM work has been done.
            os.makedirs(save_dir, exist_ok=True)

            # --- Extract all AD PDFs via OCR ---
            ad_extractions: dict[str, dict] = {}
            ad_ocr_data: dict[str, tuple[list[str], list[dict]]] = {}

            for i, pdf_path in enumerate(ad_file_paths, 1):
                label = self._label_from_path(pdf_path)
                logger.info(f"📋 [{i}/{len(ad_file_paths)}] Processing: {label}")
                extraction, img_paths, ocr_results = self.extract_ad(pdf_path, label=label)
                ad_extractions[label] = extraction
                ad_ocr_data[label] = (img_paths, ocr_results)

            # --- Save OCR visualizations to save_dir ---
            # Must happen before cleanup: the page images live in the temp run dirs.
            if self.save_ocr_viz:
                for label, (img_paths, ocr_results) in ad_ocr_data.items():
                    self._save_ocr_visualizations(
                        img_paths, ocr_results, save_dir, label
                    )

            # --- Load test data ---
            logger.info(f"📊 Loading test data: {test_data_path}")
            test_data = pd.read_csv(test_data_path, sep=",")
            logger.info(f"📐 Test data shape: {test_data.shape}")

            # --- Compare ---
            logger.info(f"⚙️  Running AD comparison against {len(ad_extractions)} AD(s)...")
            result_df = compare_to_ad(test_data, ad_file_dict=ad_extractions)
            logger.info(f"🏁 Comparison done — {len(result_df)} rows classified")

            # --- Save results ---
            result_path = os.path.join(save_dir, "ad_classification_results.csv")
            result_df.to_csv(result_path, index=False)
            logger.info(f"💾 Results saved: {result_path}")

            extractions_path = os.path.join(save_dir, "ad_extractions.json")
            with open(extractions_path, "w", encoding="utf-8") as f:
                json.dump(ad_extractions, f, indent=2, ensure_ascii=False)
            logger.info(f"💾 Extractions saved: {extractions_path}")

        finally:
            if cleanup:
                self._cleanup_temp()

        logger.info("🔰" + "=" * 58)
        logger.info("🎉 Pipeline complete!")
        logger.info("🔰" + "=" * 58)

        return result_path

In [7]:
# System prompt for the AD extraction LLM call of the OCR pipeline. It is
# handed to the pipeline below through `llm_system_prompt`; the text is sent
# as-is, so any edit to it changes what the model is asked to do.
SYSTEM_PROMPT = """
You are an aviation regulatory document parser specialized in Airworthiness Directives (ADs).
Extract structured applicability and compliance information from the provided AD document.

EXTRACTION RULES:
- Extract only information explicitly stated in the document. Never infer or assume.
- Preserve all identifiers verbatim (model names, SB numbers, mod numbers, MSNs).
- If a field has no corresponding information in the document, set it to null.
- Output valid JSON only. No markdown, no explanation, no commentary.

CRITICAL DISTINCTIONS:
- Airbus modification numbers (e.g. "mod 24591") → always go in modification_constraints. Never in sb_constraints.
- Service Bulletin identifiers (e.g. "A320-57-1089") → always go in sb_constraints. Never in modification_constraints.
- If the AD states "all MSN" or "all manufacturer serial numbers", always set MSNConstraint(all=True, excluded=False). Never leave msn_constraints null when MSN applicability is mentioned.
- When multiple compliance limits use "whichever occurs first", list each as a separate ComplianceTime entry.
- Recurring intervals ("thereafter, at intervals not exceeding...") → is_interval=True.
- One-time thresholds ("before exceeding...") → is_interval=False.

OUTPUT: Valid JSON strictly following the provided schema.
"""

# OCR-based pipeline instance: pages are OCR'd on the GPU and only the text is
# sent to the LLM. `y_threshold` is the line-grouping tolerance used when the
# OCR boxes are turned into reading-order text.
# NOTE(review): `temp_dir` is an absolute, machine-specific path — confirm it
# exists on the machine running this notebook.
pipeline_ocr = ADRecognitionOCR(
    dpi=300,
    llm_model="gemini-2.5-flash",
    llm_system_prompt=SYSTEM_PROMPT,
    llm_temperature=0.1,
    llm_output_schema=ADApplicabilityExtraction,
    ocr_device="gpu:0",
    ocr_precision="fp16",
    y_threshold=15.0,
    save_ocr_viz=True,
    temp_dir="/home/naufal/soji_ai/temp",
)

[32m2026-02-20 17:41:02.734[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m47[0m - [1m🔧 Initializing PaddleOCR engine...[0m
[32mCreating model: ('PP-OCRv5_mobile_det', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/naufal/.paddlex/official_models/PP-OCRv5_mobile_det`.[0m
[32mCreating model: ('PP-OCRv5_mobile_rec', None)[0m
[32mModel files already exist. Using cached files. To redownload, please delete the directory manually: `/home/naufal/.paddlex/official_models/PP-OCRv5_mobile_rec`.[0m
[32m2026-02-20 17:41:04.262[0m | [1mINFO    [0m | [36m__main__[0m:[36m__init__[0m:[36m57[0m - [1m✅ PaddleOCR engine ready[0m


In [8]:
# Project root is configurable through the SOJI_AI_ROOT environment variable so
# the notebook is not tied to one machine; the default is the original location.
PROJECT_ROOT = os.environ.get("SOJI_AI_ROOT", "/home/naufal/soji_ai")

# Extract both ADs with the OCR pipeline and classify the test fleet.
result_path = pipeline_ocr.run_analysis(
    test_data_path=os.path.join(PROJECT_ROOT, "test", "ad_test_data.csv"),
    ad_file_paths=[
        os.path.join(PROJECT_ROOT, "documents", "EASA_AD_2025-0254R1_1.pdf"),
        os.path.join(PROJECT_ROOT, "documents", "EASA_AD_US-2025-23-53_1.pdf"),
    ],
    save_dir=os.path.join(PROJECT_ROOT, "results"),
    cleanup=True,
)

[32m2026-02-20 17:41:11.127[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_analysis[0m:[36m494[0m - [1m🛫 Starting AD Recognition Pipeline (OCR) — 2 AD(s)[0m
[32m2026-02-20 17:41:11.128[0m | [1mINFO    [0m | [36m__main__[0m:[36mrun_analysis[0m:[36m504[0m - [1m📋 [1/2] Processing: EASA_AD_2025-0254R1_1[0m
[32m2026-02-20 17:41:11.130[0m | [1mINFO    [0m | [36m__main__[0m:[36mextract_ad[0m:[36m347[0m - [1m🚀 [EASA_AD_2025-0254R1_1] Starting OCR extraction — run_id=df76cf6faa24418a9084048d20aa278b[0m
[32m2026-02-20 17:41:11.130[0m | [1mINFO    [0m | [36m__main__[0m:[36m_pdf_to_images[0m:[36m101[0m - [1m📄 Converting PDF to images: /home/naufal/soji_ai/documents/EASA_AD_2025-0254R1_1.pdf (dpi=300)[0m
[32m2026-02-20 17:41:14.376[0m | [1mINFO    [0m | [36m__main__[0m:[36m_pdf_to_images[0m:[36m113[0m - [1m🖼️  Generated 5 page images[0m
[32m2026-02-20 17:41:14.376[0m | [1mINFO    [0m | [36m__main__[0m:[36m_run_ocr[0m:[36m120[0m - 

In [None]:
python src/run.py \
    --ad-files documents/EASA_AD_2025-0254R1_1.pdf documents/EASA_AD_US-2025-23-53_1.pdf \
    --test-data test/ad_test_data.csv \
    --save-dir results/

In [None]:
# Run the pipeline in LLM mode (relative paths: presumably started from the project root)
uv run python -m src.run \
    --mode llm \
    --ad-files \
        documents/EASA_AD_2025-0254R1_1.pdf \
        documents/EASA_AD_US-2025-23-53_1.pdf \
    --test-data test/ad_test_data.csv \
    --save-dir results/