In [184]:
%pip install boto3 PyPDF2 pydantic

Note: you may need to restart the kernel to use updated packages.


In [185]:
import boto3, json, io, base64, re, datetime, logging
from __future__ import annotations
from pathlib import Path
# from pypdf import PdfReader, PdfWriter
from PyPDF2 import PdfWriter, PdfReader, PdfFileMerger
from botocore.config import Config
from pydantic import BaseModel, ValidationError
from typing   import Literal, Optional, List, Dict, Any, Union
from dataclasses import dataclass
# Verbose logging for the whole notebook: the root logger and botocore are both
# set to DEBUG, so every AWS call prints its request/response trace.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger("botocore").setLevel(logging.DEBUG)

# botocore client timeouts: connect_timeout=30 and read_timeout=300 (seconds).
# NOTE(review): the name `cfg` is rebound to the sampling parameters in the
# model-invocation cell; the client built here keeps this Config object.
cfg = Config(connect_timeout=30, read_timeout=300)
# Credentials are resolved from the named AWS profile "par_servicios".
session = boto3.Session(profile_name="par_servicios")
# Create the Bedrock runtime client that invoke_nova() and converse_with_nova() use.
region = "us-east-1"
bedrock = session.client(
    "bedrock-runtime",
    region_name=region,
    config=cfg
)

DEBUG:botocore.hooks:Changing event name from creating-client-class.iot-data to creating-client-class.iot-data-plane
DEBUG:botocore.hooks:Changing event name from before-call.apigateway to before-call.api-gateway
DEBUG:botocore.hooks:Changing event name from request-created.machinelearning.Predict to request-created.machine-learning.Predict
DEBUG:botocore.hooks:Changing event name from before-parameter-build.autoscaling.CreateLaunchConfiguration to before-parameter-build.auto-scaling.CreateLaunchConfiguration
DEBUG:botocore.hooks:Changing event name from before-parameter-build.route53 to before-parameter-build.route-53
DEBUG:botocore.hooks:Changing event name from request-created.cloudsearchdomain.Search to request-created.cloudsearch-domain.Search
DEBUG:botocore.hooks:Changing event name from docs.*.autoscaling.CreateLaunchConfiguration.complete-section to docs.*.auto-scaling.CreateLaunchConfiguration.complete-section
DEBUG:botocore.hooks:Changing event name from before-parameter-buil

In [186]:
@dataclass
class NovaRequest:
    """Arguments for one Bedrock Converse call, as consumed by converse_with_nova().

    Only ``model_id``, ``messages`` and ``params`` are required; ``system`` and
    ``toolConfig`` are forwarded to the API only when they are truthy.
    """
    # Bedrock model (or inference-profile) identifier, e.g. "us.amazon.nova-pro-v1:0".
    model_id: str
    # Conversation turns; each item is a {"role": ..., "content": [...]} dict.
    messages: List[Dict[str, Any]]
    # Sampling parameters; converse_with_nova() decides which request field they go in.
    params: Dict[str, Any]
    # System prompt blocks, e.g. [{"text": "..."}].  The previous annotation,
    # List[str, Any], is not a valid List parameterisation and raises TypeError
    # as soon as annotations are evaluated eagerly.
    system: Optional[List[Dict[str, Any]]] = None
    # Optional tool configuration, passed through unchanged.
    toolConfig: Optional[Dict[str, Any]] = None

def read_document(file_to_read):
    """Return the full contents of ``file_to_read`` as bytes.

    The file is opened in binary mode and closed before returning.
    """
    with open(file_to_read, mode="rb") as handle:
        return handle.read()


def markdown_to_plain(md_path: Path) -> str:
    """Read a Markdown file and return its text with most Markdown chrome removed.

    Removed, in order: HTML comments, ``<details>`` blocks, fenced code blocks,
    heading markers, bold/italic markers, table rows, and runs of blank lines.

    Underscore emphasis is only stripped when the underscores are not glued to
    a word character on the outside.  Without that guard, snake_case names
    mentioned in the prompt (``document_type ... document_number``) were
    treated as ``_italic_`` spans and lost their underscores.

    :param md_path: path of the UTF-8 encoded Markdown file.
    :return: the cleaned, stripped text.
    """
    text = md_path.read_text(encoding="utf-8")

    # 1) Remove HTML comments (often used for model delimiters)
    text = re.sub(r"<!--.*?-->", "", text, flags=re.DOTALL)

    # 2) Drop <details> … </details> blocks
    text = re.sub(r"<details>.*?</details>", "", text,
                  flags=re.DOTALL | re.IGNORECASE)

    # 3) Remove fenced-code blocks ```…``` (they tend to confuse the model)
    text = re.sub(r"```.*?```", "", text, flags=re.DOTALL)

    # 4) Strip Markdown headings, bold, italics, tables
    text = re.sub(r"^#+\s*", "", text, flags=re.MULTILINE)       # headings
    text = re.sub(r"\*\*(.+?)\*\*", r"\1", text)                # **bold** → plain
    # __bold__: the lookarounds keep intraword runs such as a__b__c intact
    text = re.sub(r"(?<!\w)__(.+?)__(?!\w)", r"\1", text)
    text = re.sub(r"\*(.+?)\*", r"\1", text)                    # *italics*
    # _italics_: never fires inside identifiers like document_type
    text = re.sub(r"(?<!\w)_([^_]+)_(?!\w)", r"\1", text)
    text = re.sub(r"^\|.*\|\s*$", "", text, flags=re.MULTILINE)  # table rows

    # 5) Collapse multiple blank lines
    text = re.sub(r"\n{2,}", "\n", text).strip()

    return text


# def set_model_params(max_tokens=300, top_p=0.1, temperature=0.3):
#     return {
#         "maxTokens":    max_tokens,
#         "topP":         top_p,
#         "temperature":  temperature
#     }

def set_model_params(model_id: str,
                     max_tokens: int = 300,
                     top_p: float = 0.9,
                     temperature: float = 0.3):
    """
    Return the sampling-parameter dict using the key names chosen for the
    provider found in ``model_id``.

    ``".meta."`` in the id selects ``max_gen_len`` / ``top_p``;
    ``".anthropic."`` selects ``max_tokens`` / ``top_p``; any other id gets the
    camelCase ``maxTokens`` / ``topP`` pair.  ``temperature`` is always keyed
    as ``temperature``.
    """
    if ".meta." in model_id:
        token_key, nucleus_key = "max_gen_len", "top_p"
    elif ".anthropic." in model_id:
        token_key, nucleus_key = "max_tokens", "top_p"
    else:
        # Default naming used for the remaining (Amazon) models
        token_key, nucleus_key = "maxTokens", "topP"

    return {
        token_key:     max_tokens,
        nucleus_key:   top_p,
        "temperature": temperature,
    }

def invoke_nova(model_id: str, messages: list, inference_cfg: dict):
    """Call ``bedrock.invoke_model`` with a JSON body and return the decoded reply.

    The request body is ``{"messages": ..., "inferenceConfig": ...}`` encoded
    as UTF-8 JSON; the response body stream is read fully and JSON-decoded.
    """
    request_body = json.dumps(
        {"messages": messages, "inferenceConfig": inference_cfg}
    ).encode("utf-8")

    response = bedrock.invoke_model(
        modelId=model_id,
        contentType="application/json",
        accept="application/json",
        body=request_body,
    )

    raw_reply = response["body"].read()
    return json.loads(raw_reply.decode("utf-8"))

def _redact_bytes(value: Any) -> Any:
    """Return a copy of ``value`` in which every bytes payload is replaced by a size tag."""
    if isinstance(value, (bytes, bytearray)):
        return f"<{len(value)} bytes>"
    if isinstance(value, dict):
        return {key: _redact_bytes(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_redact_bytes(item) for item in value]
    return value


def converse_with_nova(req: Union[NovaRequest, Dict[str, Any]]):
    """Send one request through ``bedrock.converse`` and return its response.

    :param req: a ``NovaRequest`` or a dict with the same field names
        (a dict is expanded into ``NovaRequest(**req)``).
    :return: whatever ``bedrock.converse`` returns.

    ``req.params`` is placed under ``additionalModelRequestFields`` when the
    model id contains ``.meta.``, ``.anthropic.`` or ``.mistral.``; otherwise
    it is sent as ``inferenceConfig``.  ``system`` and ``toolConfig`` are only
    included when truthy.
    """
    # Accept dict or dataclass
    if isinstance(req, dict):
        req = NovaRequest(**req)

    payload = {
        "modelId":  req.model_id,
        "messages": req.messages,
    }

    # 1) Provider routing: decide where to place the knobs
    uses_additional_fields = any(
        tag in req.model_id for tag in (".meta.", ".anthropic.", ".mistral.")
    )
    if uses_additional_fields:
        payload["additionalModelRequestFields"] = req.params
    else:
        payload["inferenceConfig"] = req.params

    # 2) Optionals
    if req.system:
        payload["system"] = req.system
    if req.toolConfig:
        payload["toolConfig"] = req.toolConfig

    # 3) Call Bedrock.  Only a redacted copy is logged: the real payload can
    #    carry whole PDF documents as raw bytes, which would flood the output
    #    and expose document contents.  The request itself is sent untouched.
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        logging.debug("payload:\n%s", _redact_bytes(payload))
    return bedrock.converse(**payload)

In [187]:
# Directory that build_folder_path() treats as the root when computing a
# document's relative folder.
LOCAL_ROOT = Path("../../shared/file_examples")
# Bucket name build_folder_path() uses when asked for an s3:// URI.
S3_BUCKET  = "par-servicios-docs"

# def create_messages(prompt: str, pdf_bytes: bytes):

#     msg = {
#         "role": "user",
#         "content": [
#             # 1) prompt about what you want done
#             {"text": prompt},

#             # 2) the PDF itself, under a "document" key:
#             {
#                 "document": {
#                     "name":   "document_to_evaluate.pdf",  # an arbitrary label
#                     "format": "pdf",                       # file format
#                     "source": {
#                         # "bytes": pdf_bytes                # raw bytes → SDK handles the rest
#                         "bytes": base64.b64encode(pdf_bytes).decode("utf-8")
#                     }
#                 }
#             }
#         ]
#     }
#     return [msg]

def sanitize_name(raw_name: str) -> str:
    """
    Turn a filename into a label usable as a Bedrock document ``name``.

    Only the stem (no extension) is used.  Every character that is not
    alphanumeric, a space, a hyphen, a parenthesis or a square bracket is
    dropped, so underscores and periods disappear.  Runs of spaces are then
    collapsed to one and the ends trimmed, because the Bedrock document name
    field does not allow consecutive whitespace and dropping a separator
    (``"a _ b"``) would otherwise leave a double space.

    :param raw_name: filename or path, e.g. ``"231_CA_2020-02-29.pdf"``.
    :return: the cleaned label, or ``"document"`` when nothing usable is left.
    """
    stem = Path(raw_name).stem  # e.g. "231_CA_2020-02-29"
    kept = "".join(ch for ch in stem
                   if ch.isalnum()
                   or ch in " -()[]")
    # Collapse space runs and trim so a whitespace-only result hits the fallback
    collapsed = re.sub(r" {2,}", " ", kept).strip()
    return collapsed or "document"

def create_message( prompt: str,
                    role: str,
                    pdf_bytes: bytes | None = None,
                    pdf_path: str | None = None
                    ):
    """
    Build a single Bedrock message that contains:
        • main prompt (instructions + result envelope)
        • schema.json as plain text
        • N example JSON files as plain text
        • optional PDF document block
    """
    content = []

    content.append({"text": prompt})

    # 4) Optional PDF document
    if pdf_bytes is not None:
        raw_name = pdf_path or "document.pdf"
        name = sanitize_name(raw_name)

        content.append({
            "document": {
                "name":   name,
                "format": "pdf",
                "source": {
                    "bytes": pdf_bytes
                }
            }
        })

    # Assemble the message envelope Bedrock expects
    msg = {"role": role, "content": content}
    return msg

def add_now_process(folder_path):
    """Return the trailing "NOW PROCESS" prompt section for ``folder_path``.

    The section starts with a newline so it can be appended directly to a prompt.
    """
    lines = [
        "",
        "NOW PROCESS:",
        f"Folder path: `{folder_path}`",
        "Extracted text follows the PDF below.",
    ]
    return "\n".join(lines)


def build_folder_path(pdf_path: Path, use_s3: bool = False) -> str:
    """Return the folder of ``pdf_path`` relative to ``LOCAL_ROOT`` as a string.

    The filename is dropped and the remaining parts are joined with ``/``
    (e.g. ``ACC/800216686``).  The result is prefixed with
    ``s3://<S3_BUCKET>/`` when ``use_s3`` is true, otherwise with
    ``file_examples/``.  ``Path.relative_to`` raises ``ValueError`` when
    ``pdf_path`` is not under ``LOCAL_ROOT``.
    """
    relative = pdf_path.relative_to(LOCAL_ROOT)
    folder_key = "/".join(relative.parent.parts)
    if use_s3:
        return f"s3://{S3_BUCKET}/{folder_key}"
    return f"file_examples/{folder_key}"

In [188]:
def get_first_pdf_page(file_name):
    """Return a new single-page PDF, as bytes, containing page 1 of ``file_name``.

    The source file is opened with a context manager so its handle is closed
    once the page has been copied; previously the handle passed to
    ``PdfReader`` was never closed.  The output is written while the source is
    still open because the page object is read from that stream.

    :param file_name: path of the source PDF.
    :return: bytes of a PDF holding only the first page.
    :raises IndexError: if the source PDF has no pages.
    """
    with open(file_name, "rb") as pdf_file:
        reader = PdfReader(pdf_file)
        writer = PdfWriter()
        writer.add_page(reader.pages[0])
        buffer = io.BytesIO()
        writer.write(buffer)
    return buffer.getvalue()

In [189]:
def _normalise(raw_obj: dict, *, file_path: str | None = None) -> dict:
    """
    • rename documenttype → document_type (case-insensitive)
    • ensure document_number and path exist (fallbacks)
    • keep snippet only if present
    """
    norm = {k.lower(): v for k, v in raw_obj.items()}          # case-fold keys

    # key mapping
    if "documenttype" in norm and "document_type" not in norm:
        norm["document_type"] = norm.pop("documenttype")

    # fallback values
    if "document_number" not in norm:
        # try to derive from the file path    e.g.  .../800035887/...
        if file_path:
            m = re.search(r"/(\d{6,})/", file_path)
            norm["document_number"] = m.group(1) if m else "UNKNOWN"
        else:
            norm["document_number"] = "UNKNOWN"

    if "path" not in norm and file_path:
        norm["path"] = file_path
    elif "path" not in norm:
        norm["path"] = "UNKNOWN"

    return norm

def _strip_fences(text: str) -> str:
    """
    Removes ```json ... ``` or ``` ... ``` even if the opening fence is
    immediately followed by '{'.
    """
    text = text.strip()

    # opening fence
    text = re.sub(r'^```(?:json)?', '', text, flags=re.IGNORECASE).lstrip()
    # closing fence
    text = re.sub(r'```$', '', text).rstrip()
    return text

In [190]:
# ──────────────────────────────────────────────────────────────────
# 1)  Pydantic schema – guarantees we got the 5 required keys
# ──────────────────────────────────────────────────────────────────
class ClassMeta(BaseModel):
    # Schema for the classification object parsed from the model's JSON answer;
    # parse_classification() validates against it and build_payload() reads it.
    # All five fields have no default, so each key must be present in the input.

    # Identifier of the document owner; _normalise() fills it from the file
    # path, or with "UNKNOWN", when the model omits it.
    document_number: str
    # Restricted to exactly these two values.
    document_type:   Literal["person", "company"]
    # One of the known document categories.
    category:        Literal["CERL", "CECRL", "RUT", "RUB", "ACC",
                            "BLANK", "LINK_ONLY"]
    # Location of the classified document; _normalise() supplies a fallback.
    path:            str
    # May be None, but has no default — NOTE(review): under pydantic v2 the
    # "text" key is therefore still required; confirm that is intended.
    text: Optional[str]

# ──────────────────────────────────────────────────────────────────
# 2)  Pull the assistant’s line of JSON out of Bedrock’s response
# ──────────────────────────────────────────────────────────────────
def _extract_text(resp_json: dict) -> str:
    """
    Bedrock Nova returns:
        {"output":{"message":{"content":[{"text":"..."}]}}}
    """
    try:
        return resp_json["output"]["message"]["content"][0]["text"].strip()
    except (KeyError, IndexError, TypeError):
        raise RuntimeError("Unexpected response shape from Bedrock") from None

# ──────────────────────────────────────────────────────────────────
# 3)  Validate + return a ClassMeta instance (raises on error)
# ──────────────────────────────────────────────────────────────────
def parse_classification(resp_json: dict, *, pdf_path: str | None = None) -> ClassMeta:
    """Extract, repair and validate the classification JSON in a Bedrock response.

    Steps: take the assistant text, strip Markdown fences, parse it as JSON
    (falling back to the span from the first '{' to the last '}'), normalise
    the keys with ``_normalise`` and validate against ``ClassMeta``.

    :param resp_json: response dict as returned by the Converse call.
    :param pdf_path: source document path, used for fallback field values.
    :return: the validated ``ClassMeta`` instance.
    :raises RuntimeError: if the response shape is unexpected, no JSON object
        can be decoded (the fallback slice included), or the decoded JSON is
        not an object.  Previously a failing fallback leaked a raw
        ``JSONDecodeError`` and a non-object result failed inside ``_normalise``.
    :raises pydantic.ValidationError: if the object does not match ``ClassMeta``.
    """
    raw_text = _strip_fences(_extract_text(resp_json))   # remove ```json … ```

    try:
        raw_obj = json.loads(raw_text)
    except json.JSONDecodeError as first_err:
        # last-chance: grab first '{' … last '}'
        start, end = raw_text.find("{"), raw_text.rfind("}")
        if start == -1 or end <= start:
            raise RuntimeError(f"Assistant did not return JSON: {first_err}") from None
        try:
            raw_obj = json.loads(raw_text[start:end + 1])
        except json.JSONDecodeError as second_err:
            raise RuntimeError(f"Assistant did not return JSON: {second_err}") from None

    # _normalise() needs a mapping; a bare list/string/number is not a valid answer
    if not isinstance(raw_obj, dict):
        raise RuntimeError(
            f"Assistant did not return a JSON object: got {type(raw_obj).__name__}"
        )

    patched = _normalise(raw_obj, file_path=pdf_path)

    return ClassMeta.model_validate(patched)

def parse_bedrock_response(resp: dict) -> dict:
    """
    Decode the ```json fenced object embedded in a Bedrock response.

    :param resp: The JSON-decoded response from Bedrock.
    :return: The object decoded from the first ```json ... ``` block found in
             the first content block's text.
    :raises ValueError: if no such block is found or it can’t be parsed.
    """
    assistant_text = resp["output"]["message"]["content"][0]["text"]

    # Locate the {...} snippet sitting between ```json and ```
    fenced = re.search(r"```json\s*(\{[\s\S]*?\})\s*```", assistant_text)
    if fenced is None:
        raise ValueError("Embedded JSON not found in Bedrock response")

    try:
        return json.loads(fenced.group(1))
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to parse embedded JSON: {e}")

def create_payload_data(data):
    """Return a shallow copy of ``data["result"]``, or of ``data`` itself when
    there is no ``"result"`` key.  Every key of the chosen mapping is kept."""
    source = data.get("result", data)
    return dict(source.items())

# ──────────────────────────────────────────────────────────────────
# 4)  Build the payload that Phase-2 expects
# ──────────────────────────────────────────────────────────────────
def build_payload(meta: ClassMeta) -> dict:
    """Return the payload dict built from a validated ``ClassMeta``.

    Keys, in order: ``path``, ``result`` (the full model dumped in JSON mode),
    ``document_type``, ``document_number`` and ``category``.
    """
    payload = {"path": meta.path}
    payload["result"] = meta.model_dump(mode="json")   # dict
    for field in ("document_type", "document_number", "category"):
        payload[field] = getattr(meta, field)
    return payload

In [191]:
def save_to_json(response, output_path="response_indented.json", indent=2):
    """
    Write ``response`` as JSON to ``output_path`` and print a status line.

    Missing parent directories are created.  If ``json.dump`` raises
    ``TypeError``, a warning is printed and the file is rewritten with
    ``repr(response)`` instead.
    """
    target = Path(output_path)
    target.parent.mkdir(parents=True, exist_ok=True)

    try:
        with target.open("w", encoding="utf-8") as handle:
            json.dump(response, handle, indent=indent, ensure_ascii=False)
    except TypeError as e:
        print(f"⚠️ Could not JSON-serialize response: {e}")
        # Fall back to the raw repr; reopening with "w" discards any partial JSON
        with target.open("w", encoding="utf-8") as handle:
            handle.write(repr(response))
        print(f"🔧 Wrote raw Python repr to {target}")
    else:
        print(f"✅ Saved JSON to {target}")

In [192]:
def get_instructions(action: str) -> str:
    """Return the UTF-8 text of the instruction file selected by ``action``.

    ``"user"`` and ``"system"`` map to ``user.txt`` and ``system.txt`` under
    ``./instructions``; any other value reads ``clasification.txt``.
    """
    known_files = {
        "user": "user.txt",
        "system": "system.txt",
    }
    file_name = known_files.get(action, "clasification.txt")
    return (Path("./instructions") / file_name).read_text(encoding="utf-8")

In [195]:
# --- Build the request inputs for one sample document -----------------------
# Alternative sample documents; uncomment exactly one `pdf_path` to switch.
# pdf_path = Path("../../shared/file_examples/CERL/860006752/22_CamCom_2020-02-28.pdf")
# pdf_path = Path("../../shared/file_examples/CERL/800035887/9_CamCom_2020-02-28.pdf")
# pdf_path = Path("../../shared/file_examples/ACC/800216686/231_CA_2020-02-29.pdf")
# pdf_path = Path("../../shared/file_examples/RUT/900285194/InfoRUT/468_2020-02-29.pdf")
# pdf_path = Path("../../shared/file_examples/RUB/860013951/374416_RUB_2024-08-20.pdf")
# pdf_path = Path("../../shared/file_examples/RUB/830067329/377850_RUB_2024-11-05.pdf")
pdf_path = Path("../../shared/file_examples/CECRL/5120000874/L75GVP89V_2024-09-12.pdf")
# Folder of the document relative to LOCAL_ROOT, rendered as "file_examples/<...>".
folder_path = build_folder_path(pdf_path)
# Only the first page is sent: get_first_pdf_page() returns it as PDF bytes.
first_page = get_first_pdf_page(pdf_path)
# file_to_read  = read_document(first_page)
file_to_read = first_page

# clasification_prompt_raw = get_instructions("clasification")
# base_prompt = markdown_to_plain(clasification_prompt_raw)
# clasification_prompt = base_prompt + add_now_process(folder_path)
# User prompt = instruction file text + the "NOW PROCESS" trailer for this folder.
user_prompt = get_instructions("user") + add_now_process(folder_path)
system_prompt = get_instructions("system")
# The system prompt is wrapped as a list holding a single text block.
system_parameter = [{"text": system_prompt}]

# No pdf_path is passed here, so create_message() names the document block
# from its "document.pdf" fallback.
message_created = create_message(user_prompt, "user", file_to_read)
messages = [message_created]

In [196]:
# --- Call the model and persist every artefact of this run -----------------
modelId = "us.amazon.nova-pro-v1:0"
# modelId = "us.meta.llama4-maverick-17b-instruct-v1:0"
temperature = 0.1
top_p = 0.9
max_tokens = 8192

# NOTE(review): this rebinds `cfg`, which the setup cell used for the botocore
# Config object; from here on `cfg` holds the sampling parameters instead.
cfg = set_model_params(modelId, max_tokens, top_p, temperature)
# `version` only labels the output folder and file names of this run.
version = 28
folder_route = f"outputs/classification/prompt_txt/v{version}"
# folder_route = "outputs/prompt_md"
# Folder that receives the per-category meta file written at the end of the cell.
folder_next_step_route = f"../../shared/clasification_results_examples/"

# resp_json = invoke_nova(modelId, messages, cfg)

req_params = {
    "model_id":  modelId,
    "messages":  messages,
    "params":    {**cfg},          # shallow copy of the sampling parameters
    "system":    system_parameter,
}
resp_json = converse_with_nova(NovaRequest(**req_params))

# Raw response first, so it is on disk even if parsing below raises.
save_to_json(resp_json, f"{folder_route}/response_model_test_v{version}.json", 2)

# Parse + validate the model's answer, then write the derived artefacts.
meta    = parse_classification(resp_json, pdf_path = str(pdf_path) )
category = meta.category
payload = build_payload(meta)
save_to_json(meta.model_dump(), f"{folder_route}/meta_v{version}.json",   2)
save_to_json(payload, f"{folder_route}/payload_v{version}.json",2)
# The file name depends only on the category, so a later run that yields the
# same category overwrites this file.
save_to_json(meta.model_dump(), f"{folder_next_step_route}/meta_{category}.json",  2)

DEBUG:root:payload:
{'modelId': 'us.amazon.nova-pro-v1:0', 'messages': [{'role': 'user', 'content': [{'text': 'I\'ll provide you with PDF document text and a folder path. Your task is to classify the document into one of our legal document categories and return a properly formatted JSON response.\n\n# Document Categories\n\nPlease classify each document into one of these categories:\n\n- CERL: Certificados de Existencia y Representación Legal\n  These are certificates that prove a company\'s legal existence and representation.\n\n- CECRL: Copia de cédulas de ciudadanía del Representante Legal\n  These are copies of the legal representative\'s citizenship ID cards.\n\n- RUT: Registro Único Tributario\n  These are tax registration documents.\n\n- RUB: Registro Único de Beneficiarios\n  These are documents registering the beneficiaries of a company or entity.\n\n- ACC: Composiciones Accionarias\n  These documents detail the shareholder composition of a company.\n\n# Document Path Structur

✅ Saved JSON to outputs/classification/prompt_txt/v28/response_model_test_v28.json
✅ Saved JSON to outputs/classification/prompt_txt/v28/meta_v28.json
✅ Saved JSON to outputs/classification/prompt_txt/v28/payload_v28.json
✅ Saved JSON to ../../shared/clasification_results_examples/meta_CECRL.json
