
# Pipeline de Coleta → Extração → Limpeza → Deduplicação → Shards (Notebook)

Este notebook executa o pipeline em **etapas**, permitindo inspeção e depuração fase a fase.

> **Tema**: violência doméstica no contexto da atuação policial  
> **Entrada**: `crawl_manifest.csv` (manifesto deduplicado com URLs)  
> **Saída**: diretório `OUTDIR/` com brutos (`data/raw`), textos limpos (`data/text`), shards JSONL (`data/shards`) e relatórios (`logs/`)


## 1) Configurações

In [None]:

from pathlib import Path
import os

# Paths
MANIFEST = Path("crawl_manifest.csv")  # adjust if needed
OUTDIR = Path("corpus_out_nb")

# Parameters
RATE = 1.0                # requests per second (global)
MAX_WORKERS = 2           # download/extraction threads
TIMEOUT = 15              # per-request timeout (s)
MIN_CHARS = 300           # minimum characters per document (after cleaning)
SHARD_SIZE_MB = 100.0     # target size of each shard
LOG_LEVEL = "INFO"        # "DEBUG" for more verbosity
USER_AGENT = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119 Safari/537.36"

# Create the output directories (raw downloads, cleaned text, shards, logs)
for sub in ["data/raw", "data/text", "data/shards", "logs"]:
    (OUTDIR / sub).mkdir(parents=True, exist_ok=True)

# Show the absolute output location so it is easy to find after the run
print("OUTDIR:", OUTDIR.resolve())


## 2) Imports e utilitários

In [None]:

import csv, re, json, time, hashlib, logging
from urllib.parse import urlparse, urlunparse
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from bs4 import BeautifulSoup

# Optional dependencies (handled with fallbacks): each name is bound to None
# when its import fails, and the extraction functions test for None before use.
try:
    import trafilatura
except Exception:
    trafilatura = None

try:
    import fitz  # PyMuPDF
except Exception:
    fitz = None

try:
    from pdfminer.high_level import extract_text as pdfminer_extract_text
except Exception:
    pdfminer_extract_text = None

# Logging: LOG_LEVEL is looked up by name on the logging module; an unknown
# name falls back to logging.INFO.
logging.basicConfig(level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
                    format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("pipeline_nb")

# Sessão HTTP com retries e headers de navegador
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session(timeout: int, user_agent: str):
    """Create a ``requests`` session with retries, browser headers and a default timeout.

    Args:
        timeout: Timeout in seconds applied to every request that does not
            pass its own ``timeout`` keyword.
        user_agent: Value sent in the ``User-Agent`` header.

    Returns:
        The configured ``requests.Session``.
    """
    browser_headers = {
        "User-Agent": user_agent,
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "pt-BR,pt;q=0.9,en;q=0.8",
        "Connection": "keep-alive",
    }
    retry_policy = Retry(total=3, backoff_factor=0.8, status_forcelist=[429,500,502,503,504])

    http = requests.Session()
    # One adapter per scheme, both sharing the same retry policy
    for prefix in ("http://", "https://"):
        http.mount(prefix, HTTPAdapter(max_retries=retry_policy))
    http.headers.update(browser_headers)

    # Wrap Session.request so callers get the default timeout unless they override it
    send = http.request

    def request_with_default_timeout(method, url, **kwargs):
        if "timeout" not in kwargs:
            kwargs["timeout"] = timeout
        return send(method, url, **kwargs)

    http.request = request_with_default_timeout
    return http

# Module-wide HTTP session, used by both the robots.txt check and the page downloads
session = build_session(TIMEOUT, USER_AGENT)

def sha1(s: str) -> str:
    """Return the hexadecimal SHA-1 digest of *s* encoded as UTF-8."""
    digest = hashlib.sha1()
    digest.update(s.encode("utf-8"))
    return digest.hexdigest()
def sha256(s: str) -> str:
    """Return the hexadecimal SHA-256 digest of *s* encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(s.encode("utf-8"))
    return digest.hexdigest()

def norm_url(u: str) -> str:
    """Return a canonical form of *u* used as the URL deduplication key.

    The host is lower-cased, trailing slashes are removed from the path and
    the fragment is dropped (it never reaches the server). Path parameters
    and the query string are kept: they routinely select a different
    document (``?id=1`` vs ``?id=2``), so discarding them would make the
    dedup step silently drop distinct pages.

    Args:
        u: Absolute URL.

    Returns:
        The normalized URL string.
    """
    p = urlparse(u.strip())
    path = p.path.rstrip("/")
    return urlunparse((p.scheme, p.netloc.lower(), path, p.params, p.query, ""))

# Simple rate limiter (token bucket)
class RateLimiter:
    """Blocking token-bucket limiter shared by all worker threads.

    Tokens accrue at ``rate`` per second up to ``capacity``; each
    ``acquire()`` consumes one whole token, sleeping until one is available.

    Args:
        rate_per_sec: Sustained number of acquisitions per second. Values
            below 0.1 are raised to 0.1.
    """

    def __init__(self, rate_per_sec: float):
        # Local import: the file's import block does not bring in threading
        import threading

        self.rate = max(rate_per_sec, 0.1)
        # The bucket must be able to hold at least one whole token; capping it
        # at `rate` would make acquire() spin forever for any rate below 1/s.
        self.capacity = max(self.rate, 1.0)
        self.tokens = self.capacity
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        """Block until one token is available, then consume it."""
        # The lock is held while sleeping on purpose: waiting callers are
        # served one at a time, which keeps the limit global across threads.
        with self.lock:
            while True:
                now = time.monotonic()
                elapsed = now - self.last
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.last = now
                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return
                # Sleep just long enough for the missing fraction of a token to accrue
                time.sleep((1.0 - self.tokens) / self.rate)

# Single limiter instance for the whole notebook; fetch_and_extract calls acquire() before each download
limiter = RateLimiter(RATE)


## 3) Checagem de `robots.txt` (com timeout)

In [None]:

from urllib import robotparser

class RobotsCache:
    """Per-origin cache of parsed robots.txt rules, failing open.

    robots.txt is downloaded once per ``scheme://host`` through the shared
    ``session``. If it cannot be fetched (network error, non-200 status,
    empty body) the origin is cached as ``None`` and every URL on it is
    treated as allowed.

    Args:
        user_agent: Agent name matched against the robots.txt rules.
        timeout: Timeout in seconds for the robots.txt request.
    """

    def __init__(self, user_agent: str, timeout: int = 15):
        self.user_agent = user_agent
        self.timeout = timeout
        # origin ("scheme://netloc") -> RobotFileParser, or None when unavailable
        self.cache = {}

    def _load(self, base: str):
        """Fetch and parse robots.txt for *base*; return None if unavailable."""
        try:
            # Pass the timeout explicitly so the constructor argument is honoured
            resp = session.get(base + "/robots.txt", timeout=self.timeout)
            if resp.status_code != 200 or not resp.content:
                return None
            rp = robotparser.RobotFileParser()
            rp.parse(resp.text.splitlines())
            return rp
        except Exception as exc:
            # Best effort: a broken robots.txt must not stop the crawl
            log.debug("robots.txt unavailable for %s (%s); assuming allowed", base, exc)
            return None

    def can_fetch(self, url: str) -> bool:
        """Return True if *url* may be fetched according to its host's robots.txt."""
        p = urlparse(url)
        base = f"{p.scheme}://{p.netloc}"
        if base not in self.cache:
            self.cache[base] = self._load(base)
        rp = self.cache[base]
        if rp is None:
            return True
        try:
            return rp.can_fetch(self.user_agent, url)
        except Exception:
            return True

# Shared robots.txt cache for the whole run
robots = RobotsCache(USER_AGENT, timeout=TIMEOUT)


## 4) Carregar manifesto e deduplicar URLs

In [None]:

import pandas as pd

df = pd.read_csv(MANIFEST)
# Normalize header names so "URL", " Url " etc. are all found as "url"
df.columns = [c.strip().lower() for c in df.columns]
if "url" not in df.columns:
    raise KeyError(f"{MANIFEST} has no 'url' column; found: {list(df.columns)}")

# Rows with a missing or blank URL cannot be fetched, and a NaN value would
# make norm_url raise inside apply(); drop them before normalizing.
df = df[df["url"].notna()].copy()
df["url"] = df["url"].astype(str).str.strip()
df = df[df["url"] != ""].copy()
df["url_norm"] = df["url"].apply(norm_url)

# Keep the first row for each normalized URL
dedup = df.drop_duplicates(subset=["url_norm"]).reset_index(drop=True)
len(df), len(dedup), dedup.head(3)


## 5) Funções de extração e limpeza

In [None]:

# Patterns for personal identifiers that get masked, and for whitespace runs
EMAIL_RE = re.compile(r"\b[\w\.-]+@[\w\.-]+\.\w{2,}\b", re.IGNORECASE)
CPF_RE = re.compile(r"\b\d{3}\.\d{3}\.\d{3}-\d{2}\b")
CNPJ_RE = re.compile(r"\b\d{2}\.\d{3}\.\d{3}/\d{4}-\d{2}\b")
MULTISPACE = re.compile(r"[ \t]+")
MULTINEWLINE = re.compile(r"\n{3,}")


def clean_text(s: str) -> str:
    """Normalize whitespace in *s* and mask e-mails, CPF and CNPJ numbers.

    Steps, in order: remove BOM characters, join words hyphenated across a
    line break, collapse runs of spaces/tabs to one space, collapse three or
    more newlines to two, then replace e-mail addresses, CPFs and CNPJs with
    ``<EMAIL>``, ``<CPF>`` and ``<CNPJ>``.

    Args:
        s: Raw extracted text; a falsy value yields ``""``.

    Returns:
        The cleaned text with leading/trailing whitespace stripped.
    """
    if not s:
        return ""
    cleaned = s.replace("\ufeff", "").replace("-\n", "")
    substitutions = (
        (MULTISPACE, " "),
        (MULTINEWLINE, "\n\n"),
        (EMAIL_RE, "<EMAIL>"),
        (CPF_RE, "<CPF>"),
        (CNPJ_RE, "<CNPJ>"),
    )
    for pattern, replacement in substitutions:
        cleaned = pattern.sub(replacement, cleaned)
    return cleaned.strip()

def extract_html_main(html_bytes: bytes) -> str:
    """Extract the main readable text from an HTML document.

    trafilatura is tried first when it is installed. If it is missing,
    fails, or returns nothing, the page is parsed with BeautifulSoup after
    removing boilerplate tags. The ``lxml`` parser is preferred, with the
    standard-library ``html.parser`` as a fallback so that a missing lxml
    installation does not silently turn every page into an empty string.

    Args:
        html_bytes: Raw HTML as downloaded.

    Returns:
        The extracted text, stripped; ``""`` if every strategy fails.
    """
    if trafilatura is not None:
        try:
            txt = trafilatura.extract(html_bytes, include_comments=False, include_tables=False, favor_recall=True)
        except Exception as exc:
            log.debug("trafilatura failed, falling back to BeautifulSoup: %s", exc)
        else:
            if txt and txt.strip():
                return txt.strip()
    for parser in ("lxml", "html.parser"):
        try:
            soup = BeautifulSoup(html_bytes, parser)
            # Drop elements that carry navigation/boilerplate rather than content
            for tag in soup(["script","style","nav","header","footer","noscript","aside"]):
                tag.decompose()
            return soup.get_text("\n").strip()
        except Exception as exc:
            log.debug("BeautifulSoup with parser %r failed: %s", parser, exc)
    return ""

def extract_pdf_text(pdf_path: str) -> str:
    """Extract text from a PDF file, trying PyMuPDF first and then pdfminer.

    Each backend is used only if it was importable. If PyMuPDF fails or
    yields only whitespace, pdfminer is tried next.

    Args:
        pdf_path: Path to the PDF file on disk.

    Returns:
        The extracted text, stripped; ``""`` if no backend produced text.
    """
    if fitz is not None:
        try:
            # Context manager closes the document (and its file handle) even on error
            with fitz.open(pdf_path) as doc:
                out = "\n".join(page.get_text("text") for page in doc)
            if out.strip():
                return out.strip()
        except Exception as exc:
            log.debug("PyMuPDF failed on %s: %s", pdf_path, exc)
    if pdfminer_extract_text is not None:
        try:
            return (pdfminer_extract_text(pdf_path) or "").strip()
        except Exception as exc:
            log.debug("pdfminer failed on %s: %s", pdf_path, exc)
    return ""


## 6) Baixar e extrair (HTML/PDF)

In [None]:

from tqdm import tqdm

# Where downloaded bytes and cleaned text files are written
RAW_DIR = OUTDIR / "data" / "raw"
TEXT_DIR = OUTDIR / "data" / "text"

# One status dict per processed URL, filled by the parallel run
results = []

def fetch_and_extract(row, *, ignore_robots: bool = False):
    """Download one manifest row, extract its text and store both on disk.

    Args:
        row: Mapping with at least ``"url"`` and ``"url_norm"``.
        ignore_robots: Skip the robots.txt check. Meant for diagnostics only;
            the default honours robots.txt.

    Returns:
        A dict with ``"url"`` and ``"status"``. Status is one of
        ``"robots_disallow"``, ``"fetch_error:<msg>"``, ``"http_<code>"``
        (also used for a 200 response with an empty body),
        ``"save_raw_error:<msg>"``, ``"too_short"`` (with ``"chars"``),
        ``"save_text_error:<msg>"`` or ``"ok"`` (with ``"text_path"`` and
        ``"chars"``).
    """
    url = row["url"]
    # The previous code tested the undefined name `true`, which raised
    # NameError on every call; the bypass is now an explicit parameter.
    if not ignore_robots and not robots.can_fetch(url):
        return {"url": url, "status": "robots_disallow"}
    limiter.acquire()
    try:
        resp = session.get(url)
        ct = resp.headers.get("Content-Type", "")
        content = resp.content
        status = resp.status_code
    except Exception as e:
        return {"url": url, "status": f"fetch_error:{e}"}
    if status != 200 or not content:
        return {"url": url, "status": f"http_{status}"}
    is_pdf = ("pdf" in (ct or "").lower()) or url.lower().endswith(".pdf")
    ext = ".pdf" if is_pdf else ".html"
    # File name derives from the normalized URL so re-runs overwrite the same file
    fname = sha256(row["url_norm"])[:20] + ext
    raw_path = RAW_DIR / fname
    try:
        raw_path.write_bytes(content)
    except Exception as e:
        return {"url": url, "status": f"save_raw_error:{e}"}
    try:
        text = extract_pdf_text(str(raw_path)) if is_pdf else extract_html_main(content)
    except Exception as e:
        # An extraction failure is reported as an empty (too short) document
        log.debug("extraction failed for %s: %s", url, e)
        text = ""
    text = clean_text(text)
    if len(text) < MIN_CHARS:
        return {"url": url, "status": "too_short", "chars": len(text)}
    text_path = (TEXT_DIR / fname).with_suffix(".txt")
    try:
        text_path.write_text(text, encoding="utf-8")
    except Exception as e:
        return {"url": url, "status": f"save_text_error:{e}"}
    return {"url": url, "status": "ok", "text_path": str(text_path), "chars": len(text)}

# Run (parallel)
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
    # Map each future back to its URL so a crashed worker can still be reported
    futs = {ex.submit(fetch_and_extract, r): r.get("url") for r in dedup.to_dict(orient="records")}
    for fut in tqdm(as_completed(futs), total=len(futs), desc="Coletando/Extraindo"):
        try:
            results.append(fut.result())
        except Exception as e:
            # fut.result() re-raises whatever the worker raised; record it
            # instead of aborting the loop and losing the remaining results.
            log.warning("worker failed for %s: %s", futs[fut], e)
            results.append({"url": futs[fut], "status": f"worker_error:{e}"})

len(results), results[:3]


## 7) Deduplicar por hash e gerar shards JSONL

In [None]:

import json

# Destination directory for the JSONL shards
SHARDS_DIR = OUTDIR / "data" / "shards"
SHARDS_DIR.mkdir(parents=True, exist_ok=True)

# Content hashes already emitted, and the shard size threshold in bytes
seen_hash = set()
shard_bytes_limit = int(SHARD_SIZE_MB * 1024 * 1024)

# Pending JSON lines, their accumulated UTF-8 size, and the next shard index
buf = []
size = 0
shard_id = 0

def flush():
    """Write the buffered JSON lines to the next shard file and reset the buffer.

    Does nothing when the buffer is empty. Uses and updates the module-level
    ``buf``, ``size`` and ``shard_id``.
    """
    global buf, size, shard_id
    if buf:
        out_path = SHARDS_DIR / f"corpus_{shard_id:04d}.jsonl"
        with out_path.open("w", encoding="utf-8") as fh:
            fh.writelines(f"{record}\n" for record in buf)
        buf = []
        size = 0
        shard_id += 1
        print("Shard salvo:", out_path)

# Only successfully extracted documents go into the shards
ok_items = [
    r
    for r in results
    if r.get("status") == "ok"
]
for item in ok_items:
    try:
        text = Path(item["text_path"]).read_text(encoding="utf-8")
    except Exception:
        # Unreadable text file: skip this document
        continue
    digest = sha1(text)
    if digest not in seen_hash:
        # First time this exact text is seen: queue it for the current shard
        seen_hash.add(digest)
        record = json.dumps({"text": text}, ensure_ascii=False)
        buf.append(record)
        size += len(record.encode("utf-8"))
        if size >= shard_bytes_limit:
            flush()
# Write whatever is left in the buffer as the last shard
flush()

len(ok_items), len(seen_hash)


## 8) Relatório final

In [None]:

import json
REPORT = OUTDIR / "logs" / "report.json"
REPORT.parent.mkdir(parents=True, exist_ok=True)

# Status string of every result, computed once and reused for all counters
statuses = [str(r.get("status", "")) for r in results]
stats = {
    "total_urls": len(dedup),
    "ok": statuses.count("ok"),
    "too_short": statuses.count("too_short"),
    # URLs skipped by the robots.txt check; without this the counters cannot be reconciled
    "robots_disallow": statuses.count("robots_disallow"),
    # Every "http_<code>" status, which includes a 200 response with an empty body
    "http_non200": sum(s.startswith("http_") for s in statuses),
    "errors": [r for r, s in zip(results, statuses) if "error" in s],
    "shards": sum(1 for _ in (OUTDIR / "data" / "shards").glob("corpus_*.jsonl")),
}
REPORT.write_text(json.dumps(stats, indent=2, ensure_ascii=False), encoding="utf-8")
stats



## 9) Dicas
- Se um host estiver lento, ajuste: `MAX_WORKERS = 1`, `RATE = 0.5`, `TIMEOUT = 10`.
- Para debug detalhado: `LOG_LEVEL = "DEBUG"`.
- Se precisar **ignorar robots** para diagnóstico rápido, troque `robots.can_fetch(url)` para sempre `True` (apenas para teste).
- Para enriquecer metadados, crie um `index.csv` registrando `url`, `hash`, `path` e `chars` durante o loop.
