## Summerizer input jpg --> .txt

In [3]:
import os, time
import cv2
import pytesseract

input_folder = "/Users/moon/Documents/dev-summarizer/dev-raw-ex"
output_file  = "/Users/moon/Documents/dev-summarizer/dev-raw-ex/extracted_texts.txt"

# Tesseract config: LSTM engine, assume block of text (psm 6). Try psm 4/6/7 depending on layout.
TESS_CONFIG = r"--oem 1 --psm 6"

def preprocess_fast(img_gray, max_w=1800):
    # optional downscale for speed
    h, w = img_gray.shape
    if w > max_w:
        scale = max_w / float(w)
        img_gray = cv2.resize(img_gray, (int(w*scale), int(h*scale)), interpolation=cv2.INTER_AREA)
    # quick denoise + binarize
    img_gray = cv2.GaussianBlur(img_gray, (3,3), 0)
    _, thresh = cv2.threshold(img_gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    return thresh

def extract_text(image_path):
    t0 = time.perf_counter()
    # read directly as grayscale (saves a conversion)
    gray = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    t_read = time.perf_counter()

    proc = preprocess_fast(gray)
    t_prep = time.perf_counter()

    text = pytesseract.image_to_string(proc, lang="eng", config=TESS_CONFIG)
    t_ocr = time.perf_counter()

    return text.strip(), (t_read - t0, t_prep - t_read, t_ocr - t_prep)

# --- only first 5 jpgs ---
jpg_files = sorted([f for f in os.listdir(input_folder) if f.lower().endswith(".jpg")])#[:5]

results = []
for file in jpg_files:
    path = os.path.join(input_folder, file)
    text, (t_read, t_prep, t_ocr) = extract_text(path)
    results.append(f"\n\n===== {file} =====\n{text}")
    print(f"Extracted: {file} | read {t_read:.2f}s, prep {t_prep:.2f}s, ocr {t_ocr:.2f}s")

with open(output_file, "w", encoding="utf-8") as f:
    f.write("\n".join(results))

print(f"\n✅ Done. Output: {output_file}")


Extracted: DEV-2022-038.jpg | read 0.22s, prep 0.01s, ocr 2.42s
Extracted: DEV-2022-039.jpg | read 0.14s, prep 0.01s, ocr 2.63s
Extracted: DEV-2022-069.jpg | read 0.14s, prep 0.01s, ocr 2.09s
Extracted: DEV-2022-070.jpg | read 0.13s, prep 0.01s, ocr 3.23s
Extracted: DEV-2022-071.jpg | read 0.14s, prep 0.01s, ocr 3.16s
Extracted: DEV-2022-072.jpg | read 0.14s, prep 0.01s, ocr 1.91s
Extracted: DEV-2022-074.jpg | read 0.14s, prep 0.01s, ocr 3.27s
Extracted: DEV-2022-084.jpg | read 0.14s, prep 0.01s, ocr 3.93s
Extracted: DEV-2022-085.jpg | read 0.14s, prep 0.01s, ocr 3.40s
Extracted: DEV-2022-108.jpg | read 0.13s, prep 0.01s, ocr 3.27s
Extracted: DEV-2022-122.jpg | read 0.21s, prep 0.01s, ocr 2.80s

✅ Done. Output: /Users/moon/Documents/dev-summarizer/dev-raw-ex/extracted_texts.txt


## Making DB

In [50]:
# ...existing code...
import datetime, itertools, typing, re

# tolerant date finder — picks first plausible date-like token and normalizes to YYYY-MM-DD if possible
DATE_RE = re.compile(r'\b(\d{1,2}[/-]\d{1,2}[/-]\d{2,4}|\d{4}[/-]\d{1,2}[/-]\d{1,2}|[A-Za-z]{3,9}\s+\d{1,2},\s*\d{4})\b')

def normalize_date_str(s: str) -> str:
    s = s.strip()
    for fmt in ("%m/%d/%Y","%m/%d/%y","%d/%m/%Y","%d/%m/%y","%Y-%m-%d","%b %d, %Y","%B %d, %Y"):
        try:
            dt = datetime.datetime.strptime(s, fmt)
            return dt.date().isoformat()
        except Exception:
            pass
    # best effort: try splitting numeric parts
    m = re.match(r'(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})', s)
    if m:
        mm, dd, yy = m.group(1), m.group(2), m.group(3)
        if len(yy) == 2: yy = '20' + yy if int(yy) <= 50 else '19' + yy
        try:
            return datetime.date(int(yy), int(mm), int(dd)).isoformat()
        except Exception:
            pass
    return s  # fallback: return raw

# Inject this small helper into extract_records post-loop: if a required date label is empty,
# search the entire doc body for a date and assign the normalized value.
# Add after building `record` (before extracted.append(...)):

# Example patch to insert inside extract_records just before `extracted.append(dict(record))`:
"""
        # --- date fallback: attempt to discover a date anywhere in the document if missing ---
        if required_labels:
            # canonical name used in your notebook: "Date of Occurrence"
            target = "Date of Occurrence"
            if target in required_labels and not record.get(target):
                # search header first, then body
                search_text = (head + "\n" + body) if head else body
                # remove some noisy OCR chars that break matches
                search_text = re.sub(r'[^\x00-\x7F]+', ' ', search_text)
                m = DATE_RE.search(search_text)
                if m:
                    record[target] = normalize_date_str(m.group(1))
"""

# ...existing code...

'\n        # --- date fallback: attempt to discover a date anywhere in the document if missing ---\n        if required_labels:\n            # canonical name used in your notebook: "Date of Occurrence"\n            target = "Date of Occurrence"\n            if target in required_labels and not record.get(target):\n                # search header first, then body\n                search_text = (head + "\n" + body) if head else body\n                # remove some noisy OCR chars that break matches\n                search_text = re.sub(r\'[^\x00-\x7f]+\', \' \', search_text)\n                m = DATE_RE.search(search_text)\n                if m:\n                    record[target] = normalize_date_str(m.group(1))\n'

In [51]:
from collections import defaultdict, Counter
import re, unicodedata, difflib

# --- helpers you already have (shown here for completeness) ---
def normalize_text(s:str)->str:
    s = unicodedata.normalize("NFKC", s)
    s = s.replace("：", ":").replace("|", ":").replace("—", "-").replace("–","-")
    s = re.sub(r"[ \t]+", " ", s)
    s = re.sub(r"\r\n?", "\n", s)
    return s

def is_labelish(line:str)->bool:
    if len(line) < 2 or len(line) > 80: return False
    if re.search(r"\s*[:\-]\s*$", line): return True
    if re.match(r"^\s*[\w /()%#&.,]+?\s*[:\-]\s+\S", line): return True
    if re.match(r"^[A-Z][A-Za-z0-9 /()%#&.,]{2,}$", line) and len(line.split())<=6:
        return True
    return False

def norm_key(s:str)->str:
    return re.sub(r"[^a-z0-9]+","", s.lower())

def best_canonical(label, canon_list, synonyms=None, thresh=0.82):
    raw = label.strip()
    if synonyms and raw in synonyms:
        return synonyms[raw]
    nk = norm_key(raw)
    if synonyms:
        for k,v in synonyms.items():
            if norm_key(k)==nk: return v
    if canon_list:
        m = difflib.get_close_matches(raw, canon_list, n=1, cutoff=thresh)
        if m: return m[0]
        norm_map = {c:norm_key(c) for c in canon_list}
        candidates = [c for c in canon_list if difflib.SequenceMatcher(None, nk, norm_map[c]).ratio()>=thresh]
        if candidates: return candidates[0]
    return raw

# -------- core extractor (with required labels) --------
def extract_records(
    raw_text: str,
    file_split_pattern: str = r"^=+ .+?\.jpg =+$",
    synonyms: dict | None = None,
    required_labels: list[str] | None = None,
):
    text = normalize_text(raw_text)

    # split into docs if you have separators; else one doc
    parts = re.split(file_split_pattern, text, flags=re.MULTILINE)
    headers = re.findall(file_split_pattern, text, flags=re.MULTILINE)
    docs = []
    for i,chunk in enumerate(parts):
        if not chunk.strip(): 
            continue
        header = headers[i-1] if i>0 and i-1 < len(headers) else ""
        docs.append((header.strip("= ").strip(), chunk.strip()))
    if not docs:
        docs = [("", text)]

    # seed canonical labels with required ones so fuzzy matching prefers them
    canonical_labels = list(dict.fromkeys(required_labels or []))

    extracted = []
    line_label_value = re.compile(r"^\s*([\w /()%#&.,]+?)\s*[:\-]\s*(.*)$")

    for head, body in docs:
        record = defaultdict(str)
        if head:
            record["source_header"] = head

        lines = body.split("\n")
        i = 0

        while i < len(lines):
            line = lines[i].rstrip()

            # 1) "Label: value" on one line
            m = line_label_value.match(line)
            if m and is_labelish(m.group(1)):
                raw_label = m.group(1).strip()
                value = m.group(2).strip()
                canon = best_canonical(raw_label, canonical_labels, synonyms)
                if canon not in canonical_labels:
                    canonical_labels.append(canon)
                # collect continuation until next label
                j = i+1
                cont = []
                while j < len(lines):
                    nxt = lines[j].rstrip()
                    if line_label_value.match(nxt) and is_labelish(line_label_value.match(nxt).group(1)):
                        break
                    if is_labelish(nxt) and nxt.strip().endswith(":"):
                        break
                    cont.append(nxt)
                    j += 1
                block = "\n".join([value] + cont).strip()
                record[canon] = (record[canon] + "\n" + block).strip() if record[canon] else block
                i = j
                continue

            # 2) "Label:" on its own line; value starts next line(s)
            if is_labelish(line) and line.strip().endswith(":"):
                raw_label = line.strip()[:-1].strip()
                canon = best_canonical(raw_label, canonical_labels, synonyms)
                if canon not in canonical_labels:
                    canonical_labels.append(canon)
                j = i+1
                cont = []
                while j < len(lines):
                    nxt = lines[j].rstrip()
                    if line_label_value.match(nxt) and is_labelish(line_label_value.match(nxt).group(1)):
                        break
                    if is_labelish(nxt) and nxt.strip().endswith(":"):
                        break
                    cont.append(nxt)
                    j += 1
                block = "\n".join(cont).strip()
                if block:
                    record[canon] = (record[canon] + "\n" + block).strip() if record[canon] else block
                i = j
                continue

            i += 1

        # ensure required labels exist in every record
        if required_labels:
            for rl in required_labels:
                record.setdefault(rl, "")

        extracted.append(dict(record))

    # also return canonical_labels with required ones at the front (unique order)
    canonical_labels = list(dict.fromkeys((required_labels or []) + canonical_labels))
    return extracted, canonical_labels


# --------- learning the schema (optional) ----------
def learn_schema(records, top_k=5):
    freq = Counter()
    for r in records:
        for k in r.keys():
            if k != "source_header":
                freq[k]+=1
    # descending by frequency then name
    return [k for k,_ in freq.most_common(top_k)]


def normalize_date_str(s: str) -> str:
    s = s.strip()
    for fmt in ("%m/%d/%Y","%m/%d/%y","%d/%m/%Y","%d/%m/%y","%Y-%m-%d","%b %d, %Y","%B %d, %Y"):
        try:
            dt = datetime.datetime.strptime(s, fmt)
            return dt.date().isoformat()
        except Exception:
            pass
    # best effort: try splitting numeric parts
    m = re.match(r'(\d{1,2})[/-](\d{1,2})[/-](\d{2,4})', s)
    if m:
        mm, dd, yy = m.group(1), m.group(2), m.group(3)
        if len(yy) == 2: yy = '20' + yy if int(yy) <= 50 else '19' + yy
        try:
            return datetime.date(int(yy), int(mm), int(dd)).isoformat()
        except Exception:
            pass
    return s  # fallback: return raw


In [None]:
raw = open("/Users/moon/Documents/dev-summarizer/dev-raw-ex/extracted_texts.txt", "r", encoding="utf-8").read()

# Seed a few synonyms so minor wording changes still map together:
synonyms = {
    "Deviation Number": "Deviation Number",
    "De viation Number": "Deviation Number",
    "Date of Occurrence": "Date of Occurrence",
    "Document / Record Number": "Document / Record Number",
    "Equipment Number": "Equipment Number",
    "Project Number": "Project Number",
    "Product Stock Number": "Product Stock Number",
    "Lot Number": "Lot Number",
    "Description of Deviation": "Description of Deviation",
    "Description of Deviation or Non-conformance": "Description of Deviation or Non-conformance",
    "Immediate Correction Employed": "Immediate Correction Employed",
    "Immediate Correction": "Immediate Correction Employed",
    "Corrective Action Required": "Corrective Action Required",
    "Corrective Action": "Corrective Action",
    "Corresponding CAPA Number": "CAPA Number",
    "CAPA Number": "CAPA Number",
    "No Impact Justification/Rationale": "No Impact Justification/Rationale",
    "Client": "Client",
    "Notification Required?": "Notification Required?",
    "Deviation Type (Check All that Apply)": "Deviation Type"
}


required = [
    "Deviation Number",
    "Date of Occurrence",
    "Document / Record Number",
    "Equipment Number",
    "Project Number",
    "Product Stock Number",
    "Lot Number",
    "Description of Deviation or Non-conformance",
    "Immediate Correction Employed",
    "Corrective Action Required",
    "Corrective Action",
    "CAPA Number",
    "No Impact Justification/Rationale",
    "Client",
    "Notification Required?",
]

records, canon_labels = extract_records(raw, synonyms=synonyms, required_labels=required)

# If you still want to learn extra labels (beyond required) for the CSV order:
schema_learned = learn_schema(records, top_k=25)
schema = list(dict.fromkeys(required + schema_learned))  # required first

import pandas as pd
rows = []
for rec in records:
    row = {k: rec.get(k,"") for k in schema}
    row["source_header"] = rec.get("source_header","")
    rows.append(row)
df = pd.DataFrame(rows)
df.to_csv("deviation_db.csv", index=False, encoding="utf-8")



## DB to summerizer

In [29]:
# make_summary_column.py
import os
import sys
import subprocess
import pandas as pd
from pathlib import Path
from typing import List

INPUT_CSV  = "deviation_db.csv"              # your existing DB
OUTPUT_CSV = "deviation_db_with_summary.csv" # output path
SUMMARY_COL = "summary"

# ---- 1) Choose which columns to include ----
# By default: include *all* object/text columns (auto-detected).
# If you want to pin specific columns, set INCLUDE_COLS = ["description", "immediate_correction", ...]
INCLUDE_COLS: List[str] = []   # leave empty to auto-detect

# ---- 2) Summarizer wiring ----
# We try to import summarize() from summarizer.py. If that fails, we fall back to a CLI call.
SUMMARIZER_PATH = Path("summarizer.py")

def try_import_summarizer():
    try:
        import importlib.util
        spec = importlib.util.spec_from_file_location("summarizer", SUMMARIZER_PATH)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)  # type: ignore
        if hasattr(mod, "summarize") and callable(mod.summarize):
            return mod.summarize
    except Exception:
        pass
    return None

def summarize_via_cli(text: str) -> str:
    """
    Fallback: run `python summarizer.py` and pass text on stdin.
    Modify this if your CLI expects flags, e.g. ['python','summarizer.py','--mode','short'].
    """
    try:
        proc = subprocess.run(
            [sys.executable, str(SUMMARIZER_PATH)],
            input=text.encode("utf-8"),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
        out = proc.stdout.decode("utf-8", errors="ignore").strip()
        if not out:
            # If your CLI prints JSON, adjust parsing here.
            out = "(empty summary)"
        return out
    except Exception as e:
        return f"(summary error: {e})"

def build_row_text(row: pd.Series, cols: List[str]) -> str:
    """Concatenate selected columns -> a single text block.
       Uses 'Label: value' lines, skips empties/NaN."""
    parts = []
    for c in cols:
        val = row.get(c, "")
        if pd.isna(val) or str(val).strip() == "":
            continue
        parts.append(f"{c}: {str(val).strip()}")
    return "\n".join(parts).strip()


# Load DB
if not Path(INPUT_CSV).exists():
    raise FileNotFoundError(f"Could not find {INPUT_CSV}")

df = pd.read_csv(INPUT_CSV)

# Auto-detect text columns if INCLUDE_COLS is empty
if not INCLUDE_COLS:
    text_cols = [c for c in df.columns if df[c].dtype == "object"]
else:
    text_cols = [c for c in INCLUDE_COLS if c in df.columns]

# Optional: put your important columns first so they appear earlier in the prompt
priority = ["deviation_number","date_of_occurrence","description","immediate_correction",
            "corrective_action_required","corrective_action","capa_number","no_impact_rationale"]
ordered = [c for c in priority if c in text_cols] + [c for c in text_cols if c not in priority]
text_cols = ordered

# Prepare summarizer
summarize_fn = try_import_summarizer()
use_import = summarize_fn is not None
if not use_import and not SUMMARIZER_PATH.exists():
    raise FileNotFoundError("summarizer.py not found and import failed. Put it next to this script.")

# Create summary column
summaries = []
for idx, row in df.iterrows():
    payload = build_row_text(row, text_cols)
    if not payload:
        summaries.append("")
        continue

    if use_import:
        try:
            out = summarize_fn(payload)  # type: ignore
        except Exception as e:
            out = f"(summary error: {e})"
    else:
        out = summarize_via_cli(payload)

    summaries.append(out)

df[SUMMARY_COL] = summaries

# Save output (keep a backup if overwriting)
if Path(OUTPUT_CSV).resolve() == Path(INPUT_CSV).resolve():
    Path(f"{INPUT_CSV}.bak").write_text(df.to_csv(index=False), encoding="utf-8")
df.to_csv(OUTPUT_CSV, index=False, encoding="utf-8")
print(f"Saved: {OUTPUT_CSV}")
print(f"Rows summarized: {len(df)}")
print(f"Columns used: {text_cols}")




Saved: deviation_db_with_summary.csv
Rows summarized: 11
Columns used: ['Description of Deviation or Non', 'Immediate Correction Employed', 'Corrective Action Required', 'Corrective Action', 'Deviation Number', 'Document / Record Number', 'CAPA Number', 'Equipment Number', 'Deviation Type', 'No Impact Justification/Rationale', 'Processing', 'SOP Number 24', 'Written By', 'Deviation Type (Check Ail that Apply)', 'Protocol', 'source_header']
