<a href="https://colab.research.google.com/github/renzungo/Clarin_Covers_Sent_Analysis/blob/main/Clarin_Cover_Text_Extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# =====================
# 0) ENV & INSTALL
# =====================
!apt-get -y install tesseract-ocr tesseract-ocr-spa > /dev/null
!pip -q install opencv-python-headless pytesseract pandas numpy rapidfuzz pillow tqdm unidecode > /dev/null
# Optional (uncomment if you want PaddleOCR fallback)
# !pip -q install paddlepaddle==2.6.1 paddleocr==2.7.0.3 > /dev/null

import os, sys, json, math, shutil, glob, re, io
from pathlib import Path
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from PIL import Image
import cv2
import pytesseract
from rapidfuzz import fuzz
from unidecode import unidecode

# Upgrade to tessdata_best for Spanish (improves accuracy).
# Colab tessdata path is typically /usr/share/tesseract-ocr/4.00/tessdata
TESSDATA_DIR = "/usr/share/tesseract-ocr/4.00/tessdata"
os.makedirs(TESSDATA_DIR, exist_ok=True)

# Download the tessdata_best model for Spanish and (optionally useful) English
# when no file with that name is present in TESSDATA_DIR.
# NOTE(review): the check is by file name only. If the apt packages installed
# above already placed spa/eng.traineddata in this directory, the "best"
# models are presumably never fetched -- confirm on a fresh runtime.
for _lang in ("spa", "eng"):
    _model_path = Path(TESSDATA_DIR, f"{_lang}.traineddata")
    if not _model_path.exists():
        import urllib.request
        url = f"https://github.com/tesseract-ocr/tessdata_best/raw/main/{_lang}.traineddata"
        print(f"Downloading tessdata_best {_lang}...")
        urllib.request.urlretrieve(url, str(_model_path))

# Derive the prefix from TESSDATA_DIR so the two can never point at
# different directories.
os.environ["TESSDATA_PREFIX"] = TESSDATA_DIR


In [2]:
# =====================
# 1) CONFIG
# =====================
# If using Google Drive, mount and set INPUT_DIR to your folder of covers.
# Set USE_GOOGLE_DRIVE to False when the images live on the local runtime.
USE_GOOGLE_DRIVE = True
if USE_GOOGLE_DRIVE:
    from google.colab import drive
    drive.mount('/content/drive')


# Directory containing the cover images (change this!). list_images() picks up
# .jpg/.jpeg/.png/.webp/.tif/.tiff files found here.
INPUT_DIR = "/content/drive/MyDrive/Data Justicialista/Clarin Cover Sentiment Analysis/Covers" # e.g., your folder with 600+ images
# Where outputs go: txt/ and json/ per image, optional debug/ images, plus the
# CSV log and state file written by the batch runners.
OUTPUT_DIR = "/content/drive/MyDrive/Data Justicialista/Clarin Cover Sentiment Analysis/OCR_Out"


# Batch options
RECURSIVE = True              # scan subfolders
# NOTE(review): N_WORKERS is not referenced by any code visible in this
# notebook; processing is sequential regardless of its value.
N_WORKERS = 0                 # 0=single-thread (Colab CPU can struggle with cv2 in threads)
SAVE_DEBUG_VIS = False        # if True, saves preprocessed and box overlays
USE_EAST_DETECTOR = False     # True: detect text boxes first (better layout, slower)
# NOTE(review): this URL has not been verified -- confirm that it resolves to
# an EAST .pb model before enabling USE_EAST_DETECTOR.
EAST_MODEL_URL = "https://github.com/oyyd/frozendict/releases/download/v0.0.0/frozen_east_text_detection.pb"  # small mirror; replace if needed

# Tesseract options
LANGS = "spa+eng"
# DEFAULT_PSM is only the default of ocr_tesseract(); process_one() passes an
# explicit psm (4 for full pages, 6 or 7 for detected boxes).
DEFAULT_PSM = 6               # for single blocks/paragraphs; 4 for multi-column full page
DEFAULT_OEM = 1               # LSTM only
# Words whose Tesseract confidence is below MIN_CONF are dropped by
# ocr_tesseract().
MIN_CONF = 58

# Optional: PaddleOCR fallback (set to True after installing)
USE_PADDLE = False

Mounted at /content/drive


In [3]:
# =====================
# 2) UTILITIES
# =====================

def imread_unicode(path):
    """Load the image at *path* and return it as a BGR array.

    The file is read as raw bytes and decoded with PIL rather than with
    cv2's own reader, because cv2 sometimes has trouble with unicode paths.
    """
    with open(path, 'rb') as fh:
        raw = fh.read()
        pil_img = Image.open(io.BytesIO(raw))
        rgb = np.array(pil_img.convert('RGB'))
        return cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)


def save_txt(path, text):
    """Write *text* to *path* as UTF-8, creating missing parent folders."""
    target = Path(path)
    target.parent.mkdir(exist_ok=True, parents=True)
    target.write_text(text, encoding='utf-8')


def ensure_dir(p):
    """Create directory *p* and any missing parents; do nothing if it exists."""
    target = Path(p)
    target.mkdir(exist_ok=True, parents=True)


In [4]:
# =====================
# 3) PREPROCESSING
# =====================
from math import degrees

def preprocess_for_ocr(img_bgr, target_long_edge=2600):
    """Prepare a BGR cover image for Tesseract.

    Steps, in order: rescale so the longer edge equals ``target_long_edge``,
    denoise, apply CLAHE to the LAB lightness channel, adaptive-threshold,
    unsharp-mask and finally deskew using Hough lines.

    Args:
        img_bgr: 3-channel BGR image array (as produced by imread_unicode).
        target_long_edge: length in pixels the longer image edge is scaled to.

    Returns:
        The processed single-channel image.
    """
    h, w = img_bgr.shape[:2]
    scale = target_long_edge / max(h, w)
    if scale != 1.0:
        # Clamp to 1 px: int() truncation would otherwise yield a zero-sized
        # target for extreme aspect ratios, which cv2.resize rejects.
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        img_bgr = cv2.resize(img_bgr, new_size, interpolation=cv2.INTER_CUBIC)

    # Denoise JPEG artifacts
    img_bgr = cv2.fastNlMeansDenoisingColored(img_bgr, None, 7, 7, 7, 21)

    # LAB -> L channel + CLAHE (only the lightness channel is used)
    lab = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2LAB)
    lightness = cv2.split(lab)[0]
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
    gray = clahe.apply(lightness)

    # Adaptive threshold
    bin_img = cv2.adaptiveThreshold(gray, 255,
                                    cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                    cv2.THRESH_BINARY, 35, 11)

    # Unsharp mask
    blur = cv2.GaussianBlur(bin_img, (0, 0), 1.0)
    sharp = cv2.addWeighted(bin_img, 1.5, blur, -0.5, 0)

    # Deskew via Hough lines
    edges = cv2.Canny(sharp, 80, 160)
    lines = cv2.HoughLines(edges, 1, np.pi/180, threshold=150)
    angle = 0.0
    if lines is not None:
        angles = []
        for rho, theta in lines[:, 0]:
            a = degrees(theta)
            # HoughLines reports theta as the angle of the line's normal, so
            # theta near 0 or 180 degrees selects near-vertical lines; their
            # lean away from vertical is used as the skew estimate.
            if a < 45 or a > 135:
                ang = a - 180 if a > 90 else a
                angles.append(ang)
        if len(angles):
            angle = float(np.median(angles))

    # Ignore skews of half a degree or less to avoid needless resampling.
    if abs(angle) > 0.5:
        (h2, w2) = sharp.shape[:2]
        M = cv2.getRotationMatrix2D((w2//2, h2//2), angle, 1.0)
        sharp = cv2.warpAffine(sharp, M, (w2, h2), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)

    return sharp


In [5]:
# =====================
# 4) (OPTIONAL) TEXT DETECTION WITH EAST
# =====================
EAST_PATH = "/content/east_text_detection.pb"

def ensure_east_model():
    """Download the EAST text-detection model to ``EAST_PATH`` if it is missing.

    The model is fetched from ``EAST_MODEL_URL`` into a temporary sibling file
    and only moved into place once the download has finished, so an
    interrupted download cannot leave a truncated file that later calls would
    treat as a valid model.

    Raises:
        Any exception raised by ``urllib.request.urlretrieve`` or by the file
        rename; the temporary file is removed in that case.
    """
    if Path(EAST_PATH).exists():
        return

    import urllib.request
    tmp_path = EAST_PATH + ".part"
    print("Downloading EAST model (1.4MB)...")
    try:
        urllib.request.urlretrieve(EAST_MODEL_URL, tmp_path)
        os.replace(tmp_path, EAST_PATH)
    finally:
        # After a successful replace the temp file no longer exists.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)


def detect_text_boxes_east(image_bin, conf=0.55, nms=0.35):
    """Detect text regions with the EAST model stored at ``EAST_PATH``.

    Args:
        image_bin: single-channel image; it is converted to 3 channels because
            EAST expects a 3-channel input.
        conf: minimum score a candidate must reach to be kept.
        nms: overlap threshold passed to non-maximum suppression.

    Returns:
        A list of ``(x1, y1, x2, y2)`` boxes in ``image_bin`` pixel
        coordinates, each padded by 4 px and clipped to the image. Boxes that
        end up with no area are dropped. The list is empty when nothing is
        detected or the image is empty.
    """
    H, W = image_bin.shape[:2]
    if H == 0 or W == 0:
        return []

    image = cv2.cvtColor(image_bin, cv2.COLOR_GRAY2BGR)
    net = cv2.dnn.readNet(EAST_PATH)
    # Network input dimensions are rounded down to multiples of 32. Clamp to
    # at least 32 so images smaller than that do not produce a zero size
    # (which would divide by zero on the next line).
    newW, newH = max(32, (W//32)*32), max(32, (H//32)*32)
    rW, rH = W / float(newW), H / float(newH)
    blob = cv2.dnn.blobFromImage(image, 1.0, (newW, newH), (123.68, 116.78, 103.94), swapRB=True, crop=False)
    net.setInput(blob)
    (scores, geometry) = net.forward(["feature_fusion/Conv_7/Sigmoid", "feature_fusion/concat_3"])

    # decode
    numRows, numCols = scores.shape[2:4]
    rects, confidences = [], []
    for y in range(numRows):
        scoresData = scores[0, 0, y]
        xData0 = geometry[0, 0, y]
        xData1 = geometry[0, 1, y]
        xData2 = geometry[0, 2, y]
        xData3 = geometry[0, 3, y]
        angles = geometry[0, 4, y]
        for x in range(numCols):
            if scoresData[x] < conf:
                continue
            # Each output cell is mapped back to network-input pixels with a
            # factor of 4.
            offsetX, offsetY = x*4.0, y*4.0
            angle = angles[x]
            cos, sin = np.cos(angle), np.sin(angle)
            h = xData0[x] + xData2[x]
            w = xData1[x] + xData3[x]
            endX = int(offsetX + (cos * xData1[x]) + (sin * xData2[x]))
            endY = int(offsetY - (sin * xData1[x]) + (cos * xData2[x]))
            startX = int(endX - w)
            startY = int(endY - h)
            # scale back up to image_bin coordinates
            startX = int(startX * rW); startY = int(startY * rH)
            endX = int(endX * rW); endY = int(endY * rH)
            rects.append((startX, startY, endX, endY))
            confidences.append(float(scoresData[x]))

    if not rects:
        return []

    keep = cv2.dnn.NMSBoxes(
        bboxes=[(x, y, ex-x, ey-y) for (x, y, ex, ey) in rects],
        scores=confidences, score_threshold=conf, nms_threshold=nms)
    # The shape of the NMSBoxes result differs between OpenCV releases
    # (nested array, flat array, or an empty tuple); normalise to flat ints.
    keep = np.asarray(keep, dtype=int).reshape(-1)

    pad = 4
    out = []
    for i in keep:
        x1, y1, x2, y2 = rects[i]
        box = (max(0, x1-pad), max(0, y1-pad), min(W, x2+pad), min(H, y2+pad))
        # Skip boxes with no area; slicing them yields an empty ROI.
        if box[2] > box[0] and box[3] > box[1]:
            out.append(box)
    return out

In [6]:
# =====================
# 5) OCR + CLEANUP
# =====================

def ocr_tesseract(img_bin, lang=LANGS, psm=DEFAULT_PSM, oem=DEFAULT_OEM, min_conf=MIN_CONF):
    """Run Tesseract on *img_bin* and return ``(text, word_dataframe)``.

    Rows containing missing values are dropped, then words whose confidence is
    below *min_conf* are removed. The remaining words are grouped by
    page/block/paragraph/line, ordered left to right and joined with spaces;
    lines are joined with newlines. The filtered DataFrame is returned as the
    second element.
    """
    tess_config = f"--oem {oem} --psm {psm} -c preserve_interword_spaces=1"
    df = pytesseract.image_to_data(
        img_bin,
        lang=lang,
        config=tess_config,
        output_type=pytesseract.Output.DATAFRAME,
    ).dropna()

    if 'conf' in df:
        confident = df['conf'].astype(int) >= min_conf
        df = df[confident]

    line_keys = ['page_num', 'block_num', 'par_num', 'line_num']
    collected = []
    for _, group in df.groupby(line_keys):
        ordered_words = group.sort_values('left')['text'].astype(str).tolist()
        joined = ' '.join(word for word in ordered_words if word.strip()).strip()
        if joined:
            collected.append(joined)

    return '\n'.join(collected), df


def cleanup_text(text: str) -> str:
    """Normalise raw OCR text while keeping sentence and paragraph breaks.

    Steps, in order:
      1. Normalise line endings and collapse runs of non-newline whitespace.
      2. Re-join words hyphenated across a line break.
      3. Replace curly quotes with straight ones.
      4. Merge a single line break into a space unless it follows ``.``,
         ``!`` or ``?`` or is part of a blank-line separator.
      5. Collapse three or more consecutive newlines into two.
      6. In ALL-CAPS tokens that contain digits, replace the look-alike
         digits 0/1/5 with O/I/S.

    The token fix is applied in place, so the newlines kept by steps 4-5
    survive in the result (re-joining the tokens with single spaces would
    flatten the whole text onto one line).

    Args:
        text: raw OCR output.

    Returns:
        The cleaned text with leading/trailing whitespace removed.
    """
    def fix_token(tok):
        if tok.isupper() and any(c.isdigit() for c in tok):
            tok = tok.replace('0', 'O').replace('1', 'I').replace('5', 'S')
        return tok

    t = text.replace('\r\n', '\n').replace('\r', '\n')
    t = re.sub(r'[^\S\n]+', ' ', t)          # any whitespace run except newlines
    t = re.sub(r' ?\n ?', '\n', t)           # no stray spaces around line breaks
    t = re.sub(r'(\w)-\n(\w)', r'\1\2', t)  # join hyphenated at EOL
    t = t.replace('“', '"').replace('”', '"').replace('’', "'").replace('‘', "'")
    # The lookbehind also excludes '\n' so the second newline of a blank-line
    # separator is not mistaken for a single break.
    t = re.sub(r'(?<![.!?\n])\n(?!\n)', ' ', t)
    t = re.sub(r'\n{3,}', '\n\n', t)
    # token fixes for ALL-CAPS with digits, leaving whitespace untouched
    t = re.sub(r'\S+', lambda m: fix_token(m.group(0)), t)
    return t.strip()

# Optional PaddleOCR fallback.
# PADDLE_OCR stays None unless USE_PADDLE is enabled; ocr_paddle() relies on
# this module-level instance. The import is kept inside the `if` so the
# notebook runs without paddleocr installed.
PADDLE_OCR = None
if USE_PADDLE:
    from paddleocr import PaddleOCR
    PADDLE_OCR = PaddleOCR(use_angle_cls=True, lang='es', show_log=False)


def ocr_paddle(img_bgr):
    """OCR *img_bgr* with the module-level PaddleOCR instance.

    Paddle works better on a color/gray image than on a binarised one, so the
    caller passes the original image rather than the preprocessed one.

    Args:
        img_bgr: image array passed straight to ``PADDLE_OCR.ocr``.

    Returns:
        Recognised lines with probability >= 0.45, joined by newlines; an
        empty string when nothing is recognised.

    Raises:
        RuntimeError: if ``PADDLE_OCR`` was never initialised (USE_PADDLE off).
    """
    if PADDLE_OCR is None:
        raise RuntimeError(
            "PaddleOCR is not initialised; install paddleocr and set USE_PADDLE = True"
        )

    result = PADDLE_OCR.ocr(img_bgr, cls=True)
    lines = []
    for res in result or []:
        # A page entry can be None/empty when no text was detected
        # (seen in some PaddleOCR releases); skip it instead of crashing.
        if not res:
            continue
        for box, (txt, prob) in res:
            if prob >= 0.45:
                lines.append(txt)
    return '\n'.join(lines)

In [7]:
# =====================
# 7) BATCH RUNNER
# =====================

def list_images(root, recursive=True):
    """Return the sorted paths (as strings) of image files under *root*.

    A path qualifies when its lower-cased suffix is one of the known image
    extensions. With ``recursive=True`` sub-folders are scanned as well;
    otherwise only the direct children of *root* are considered.
    """
    allowed = {'.jpg', '.jpeg', '.png', '.webp', '.tif', '.tiff'}
    base = Path(root)
    candidates = base.rglob('*') if recursive else base.glob('*')
    return sorted(str(entry) for entry in candidates if entry.suffix.lower() in allowed)


def run_batch():
    """Process every image under INPUT_DIR once and write a summary CSV.

    Each image goes through process_one(); an exception for one image is
    recorded as a failed row instead of stopping the run. The collected rows
    are saved to ``batch_summary.csv`` inside OUTPUT_DIR.
    """
    ensure_dir(OUTPUT_DIR)
    images = list_images(INPUT_DIR, RECURSIVE)
    print(f"Found {len(images)} images")

    def _attempt(path):
        # One failing image must not abort the whole batch.
        try:
            return process_one(path)
        except Exception as exc:
            return {"image": path, "ok": False, "error": str(exc)}

    results = [_attempt(path) for path in tqdm(images)]

    csv_path = Path(OUTPUT_DIR, "batch_summary.csv")
    pd.DataFrame(results).to_csv(csv_path, index=False)
    print("Saved:", csv_path)


In [8]:
# =====================
# 6) IMAGE → TEXT PIPELINE (ONE FILE)
# =====================

def _box_records(df, x_off=0, y_off=0):
    """Turn Tesseract word rows into plain dicts, shifting left/top by the given offsets."""
    if df is None or not len(df):
        return []
    return [
        {"left": int(x_off + int(r.get('left', 0))),
         "top": int(y_off + int(r.get('top', 0))),
         "width": int(r.get('width', 0)),
         "height": int(r.get('height', 0)),
         "conf": float(r.get('conf', 0)),
         "text": r.get('text', '')}
        for _, r in df.iterrows()
    ]


def process_one(image_path, out_dir=OUTPUT_DIR, save_debug=SAVE_DEBUG_VIS):
    """OCR a single image and write its text and JSON outputs.

    The image is read, preprocessed and OCR'd either as a whole page (psm 4)
    or, when USE_EAST_DETECTOR is on and boxes are found, box by box. The
    text is cleaned, optionally replaced by the PaddleOCR result when it is
    very short, and saved under ``out_dir/txt``; word boxes and stats go to
    ``out_dir/json``. Output paths mirror the image's path relative to
    INPUT_DIR.

    Args:
        image_path: path of the image to process.
        out_dir: root folder for the txt/json/debug outputs.
        save_debug: when True, also save the preprocessed image (and the EAST
            box overlay if the detector is enabled) under ``out_dir/debug``.

    Returns:
        ``{"image", "ok": True, "text_path", "n_chars"}`` on success, or
        ``{"image", "ok": False, "error"}`` when the image cannot be read.
        Errors after the read step propagate as exceptions.
    """
    try:
        img_bgr = imread_unicode(image_path)
    except Exception as e:
        return {"image": image_path, "ok": False, "error": f"read_error: {e}"}

    pre = preprocess_for_ocr(img_bgr)

    all_text = []
    box_data = []

    boxes = []
    if USE_EAST_DETECTOR:
        ensure_east_model()
        # Sort boxes top-to-bottom, then left-to-right
        boxes = sorted(detect_text_boxes_east(pre), key=lambda b: (b[1], b[0]))

    if boxes:
        for (x1, y1, x2, y2) in boxes:
            # A box without area gives an empty ROI, which would divide by
            # zero in the aspect-ratio test below.
            if x2 <= x1 or y2 <= y1:
                continue
            roi = pre[y1:y2, x1:x2]
            h, w = roi.shape[:2]
            if h == 0 or w == 0:
                continue
            # choose PSM by aspect ratio: psm 6 for regions wider than 1.2x
            # their height, psm 7 otherwise
            psm = 6 if w/h > 1.2 else 7
            text, df = ocr_tesseract(roi, psm=psm)
            if text.strip():
                all_text.append(text)
            # Word coordinates are relative to the ROI; shift to page space.
            box_data.extend(_box_records(df, x1, y1))
    else:
        # Detector disabled or nothing detected: whole-page OCR, multi-column
        text, df = ocr_tesseract(pre, psm=4)
        all_text.append(text)
        box_data.extend(_box_records(df))

    raw_text = '\n'.join([t for t in all_text if t.strip()])
    cleaned = cleanup_text(raw_text)

    # Optional Paddle fallback if very short and Paddle is enabled
    if USE_PADDLE and len(cleaned) < 30:
        paddle_txt = ocr_paddle(img_bgr)
        if len(paddle_txt) > len(cleaned):
            cleaned = cleanup_text(paddle_txt)

    rel = os.path.relpath(image_path, INPUT_DIR)
    stem = Path(rel).with_suffix("")

    txt_out = Path(out_dir, "txt", f"{stem}.txt")
    json_out = Path(out_dir, "json", f"{stem}.json")
    dbg_dir = Path(out_dir, "debug")

    save_txt(txt_out, cleaned)

    # Save JSON with boxes + stats
    rec = {
        "image": image_path,
        "text_path": str(txt_out),
        "n_chars": len(cleaned),
        "n_lines": cleaned.count('\n') + 1 if cleaned else 0,
        "use_east": USE_EAST_DETECTOR,
        "lang": LANGS,
        "box_data": box_data[:5000]  # avoid overly large JSONs
    }
    json_out.parent.mkdir(parents=True, exist_ok=True)
    with open(json_out, 'w', encoding='utf-8') as f:
        json.dump(rec, f, ensure_ascii=False, indent=2)

    # Save debug images
    if save_debug:
        ensure_dir(dbg_dir)
        pre_path = Path(dbg_dir, f"{stem}_pre.png")
        # stem may contain sub-folders, so create them under debug/ too
        pre_path.parent.mkdir(parents=True, exist_ok=True)
        cv2.imwrite(str(pre_path), pre)
        if USE_EAST_DETECTOR:
            vis = cv2.cvtColor(pre, cv2.COLOR_GRAY2BGR)
            # Reuse the boxes detected above instead of running EAST again.
            for (x1, y1, x2, y2) in boxes:
                cv2.rectangle(vis, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.imwrite(str(Path(dbg_dir, f"{stem}_boxes.png")), vis)

    return {"image": image_path, "ok": True, "text_path": str(txt_out), "n_chars": len(cleaned)}


In [9]:
# ================= RESUMABLE, BATCH-SAVING RUNNER (drop-in) =================
import os, json, time, csv
from pathlib import Path
from datetime import datetime
from tqdm.auto import tqdm

# --- Where we commit progress (inside OUTPUT_DIR) ---
# TXT_DIR:    folder holding the per-image .txt outputs.
# LOG_CSV:    append-only log with one row per processed image.
# STATE_JSON: file holding the list of processed image paths.
TXT_DIR     = Path(OUTPUT_DIR) / "txt"
LOG_CSV     = Path(OUTPUT_DIR) / "batch_log.csv"
STATE_JSON  = Path(OUTPUT_DIR) / "state.json"
BATCH_SIZE  = 50   # commit every N files (tweak to 25 if your session is flaky)

# Created when this cell runs, before any image is processed.
TXT_DIR.mkdir(parents=True, exist_ok=True)

def _expected_txt_for_image(image_path: str) -> Path:
    """Return the .txt path that process_one() writes for *image_path*.

    The image path is taken relative to INPUT_DIR (sub-folders are kept), its
    final suffix is removed and ``.txt`` is appended, all under
    ``OUTPUT_DIR/txt``.
    """
    relative = Path(os.path.relpath(image_path, INPUT_DIR))
    without_ext = relative.with_suffix("")
    return Path(OUTPUT_DIR, "txt", f"{without_ext}.txt")

def _append_rows_csv(csv_path: Path, rows: list, header: list):
    file_exists = csv_path.exists()
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    with open(csv_path, "a", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        if not file_exists:
            w.writerow(header)
        w.writerows(rows)
        f.flush()
        os.fsync(f.fileno())

def _load_state(state_path: Path):
    if state_path.exists():
        with open(state_path, "r", encoding="utf-8") as f:
            return json.load(f)
    return {"processed_files": []}

def _save_state(state_path: Path, state: dict):
    tmp = state_path.with_suffix(".json.tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
        f.flush()
        os.fsync(f.fileno())
    tmp.replace(state_path)

def run_resumable_batches():
    """OCR every not-yet-processed image, committing progress in batches.

    An image is skipped when it is listed in the state file or when its
    expected .txt output already exists. Each processed image yields one log
    row; rows and the state file are committed every BATCH_SIZE images and
    once more when the loop ends -- including when it is interrupted -- so
    re-running the function resumes where it stopped.

    An image counts as processed only when process_one() reports success;
    results carrying ``"ok": False`` and raised exceptions are logged with
    status ``ERROR`` and retried on the next run.
    """
    from datetime import timezone  # only `datetime` is imported at cell level

    # Discover inputs (reuses list_images + config flags)
    images = list_images(INPUT_DIR, RECURSIVE)
    print(f"Found {len(images)} images under {INPUT_DIR}")

    # Load state and build skip set (also skip if TXT already exists)
    state = _load_state(STATE_JSON)
    already = set(state.get("processed_files", []))

    to_do = [
        img for img in images
        if img not in already and not _expected_txt_for_image(img).exists()
    ]

    print(f"Already processed (state or files): {len(images) - len(to_do)} | Remaining: {len(to_do)}")

    header = ["timestamp_utc","file","txt_out","chars","lines","status","msg","duration_s"]
    buffer_rows = []
    t_batch = time.time()

    def _commit():
        """Flush buffered log rows and persist the processed set."""
        if buffer_rows:
            _append_rows_csv(LOG_CSV, buffer_rows, header)
            buffer_rows.clear()
        # Sorted so the state file is deterministic between runs.
        state["processed_files"] = sorted(already)
        _save_state(STATE_JSON, state)
        # small sleep helps Drive indexing
        time.sleep(0.4)

    try:
        for img in tqdm(to_do, desc="OCR (resumable)"):
            t0 = time.time()
            # Naive UTC timestamp, same format as before, without the
            # deprecated datetime.utcnow().
            ts = datetime.now(timezone.utc).replace(tzinfo=None).isoformat(timespec="seconds")
            txt_out = _expected_txt_for_image(img)

            # Ensure subfolder exists (mirrors process_one behavior)
            txt_out.parent.mkdir(parents=True, exist_ok=True)

            n_chars, n_lines = 0, 0
            try:
                # process_one() ALREADY writes the TXT + JSON for this image
                res = process_one(img, out_dir=OUTPUT_DIR, save_debug=SAVE_DEBUG_VIS)
            except Exception as e:
                status, msg = "ERROR", f"{type(e).__name__}: {e}"
            else:
                if res.get("ok", True):
                    status, msg = "OK", ""
                    n_chars = res.get("n_chars", 0)
                    # process_one() does not report a line count, so read it
                    # from the text it just wrote (best-effort).
                    try:
                        written = Path(res.get("text_path") or txt_out).read_text(encoding="utf-8")
                        n_lines = written.count("\n") + 1 if written else 0
                    except OSError:
                        n_lines = 0
                    already.add(img)
                else:
                    # e.g. unreadable image: log it and leave it unmarked so
                    # a later run retries it.
                    status, msg = "ERROR", str(res.get("error", "unknown error"))

            buffer_rows.append([ts, img, str(txt_out), n_chars, n_lines, status, msg, round(time.time()-t0, 3)])

            # Commit every BATCH_SIZE files
            if len(buffer_rows) >= BATCH_SIZE:
                _commit()
    finally:
        # Final partial batch, also reached on interruption or error.
        if buffer_rows:
            _commit()

    print(f"Done chunk. Elapsed ~{round(time.time()-t_batch,1)}s. You can safely re-run to resume.")


In [10]:
# =====================
# 8) QUICK TEST (OPTIONAL): download one sample cover and run
# =====================
# When TEST_ONE is True, a single cover is downloaded to /content/sample_data
# and INPUT_DIR is re-pointed at that folder, so the batch run below only sees
# that one image. OUTPUT_DIR is left unchanged.
TEST_ONE = False
if TEST_ONE:
    import urllib.request
    # NOTE(review): the file name ends in "_thumb", so this is presumably a
    # reduced-size image -- confirm it is large enough for a meaningful test.
    url = "https://tapas.clarin.com/tapa/2025/08/06/20250806_thumb.jpg"
    Path('/content/sample_data').mkdir(exist_ok=True, parents=True)
    local = "/content/sample_data/clarin_20250806.jpg"
    urllib.request.urlretrieve(url, local)
    INPUT_DIR = "/content/sample_data"

In [11]:

# =====================
# 9) GO — run the batch
# =====================
# Safe to re-run: images listed in the state file or with an existing .txt
# output are skipped.
run_resumable_batches()


Found 654 images under /content/drive/MyDrive/Data Justicialista/Clarin Cover Sentiment Analysis/Covers
Already processed (state or files): 354 | Remaining: 300


OCR (resumable):   0%|          | 0/300 [00:00<?, ?it/s]

Done chunk. Elapsed ~2924.4s. You can safely re-run to resume.
