In [20]:
!apt-get -qq update
!apt-get -qq install -y tesseract-ocr tesseract-ocr-por poppler-utils
!pip -q install pytesseract pdf2image pymupdf pillow python-dateutil pandas

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)


In [31]:
import io, re, json, math, hashlib
from datetime import datetime, date

import pandas as pd
from PIL import Image
import pytesseract
from pdf2image import convert_from_bytes
import fitz  # pymupdf
from dateutil import parser as dateparser
from google.colab import files

def sha256_bytes(b: bytes) -> str:
    return hashlib.sha256(b).hexdigest()

uploaded = files.upload()
file_name = next(iter(uploaded.keys()))
file_bytes = uploaded[file_name]
doc_hash = sha256_bytes(file_bytes)

print("Arquivo:", file_name)
print("SHA256:", doc_hash[:16], "...")

Saving documento_fraude_simulada.pdf to documento_fraude_simulada.pdf
Arquivo: documento_fraude_simulada.pdf
SHA256: a5968e5f7fa1e988 ...


In [32]:
def extract_text_offline(file_name: str, file_bytes: bytes, lang="por"):
    ext = file_name.lower().split(".")[-1]
    pages_text = []
    used = None

    if ext == "pdf":
        # 1) tenta extrair texto digital (sem OCR)
        try:
            doc = fitz.open(stream=file_bytes, filetype="pdf")
            digital_text = []
            for i in range(len(doc)):
                digital_text.append(doc[i].get_text("text"))
            digital_text = "\n".join(digital_text).strip()
            if len(digital_text) >= 300:  # heurística: tem texto mesmo
                used = "PDF_DIGITAL_TEXT"
                return {"text": digital_text, "method": used, "page_count": len(doc)}
        except Exception:
            pass

        # 2) fallback: OCR em páginas renderizadas
        images = convert_from_bytes(file_bytes, dpi=250)
        for img in images:
            t = pytesseract.image_to_string(img, lang=lang)
            pages_text.append(t)
        used = "PDF_OCR"
        return {"text": "\n".join(pages_text).strip(), "method": used, "page_count": len(images)}

    else:
        # imagem
        img = Image.open(io.BytesIO(file_bytes)).convert("RGB")
        text = pytesseract.image_to_string(img, lang=lang)
        used = "IMAGE_OCR"
        return {"text": text.strip(), "method": used, "page_count": 1}

doc_ocr = extract_text_offline(file_name, file_bytes, lang="por")
doc_ocr["doc_hash"] = doc_hash
doc_ocr["avg_word_confidence"] = None   # offline não tem confidence
doc_ocr["word_confidence_count"] = 0

print("Método:", doc_ocr["method"])
print("Páginas:", doc_ocr["page_count"])
print("\nAmostra do texto:\n", doc_ocr["text"][:800])

Método: PDF_DIGITAL_TEXT
Páginas: 1

Amostra do texto:
 CONTRATO DE PRESTAÇÃO DE SERVIÇOS
Cliente: João da Silva
CPF: 123.456.789-00
CNPJ Empresa: 12.345.678/0001-95
Data de Emissão: 30/12/2099
Data de Vencimento: 01/01/2100
Valor do Serviço: R$ 1.234,56
Valor Ajustado: R$1234.56
Desconto aplicado: 1234,56
Taxa adicional: R$ 1.235,00
Valor Final TOTAL: R$1234.56 R$ 1.234,56 R$ 1234,56
Observação: Este documento foi gerado automaticamente.
Itens:
•
Item 1 - Serviço A - R$ 199,90
•
Item 2 - Serviço B - R$199.90
•
Item 3 - Serviço C - 199,90
•
Item 4 - Serviço D - R$ 200,00
•
Item 5 - Serviço E - R$199,00
•
Item 6 - Serviço F - R$ 198,90


In [33]:
def only_digits(x):
    return re.sub(r"\D", "", x or "")

def safe_float(s):
    if s is None:
        return None
    s = str(s).strip().replace("R$", "").replace(" ", "").replace("\u00A0", "")
    if "," in s and "." in s:
        s = s.replace(".", "").replace(",", ".")
    else:
        s = s.replace(",", ".")
    try:
        return float(s)
    except:
        return None

def parse_date(s):
    if not s:
        return None
    try:
        return dateparser.parse(str(s), dayfirst=True).date()
    except:
        return None

def find_cpf_cnpj(text):
    cpf_pat = r"\b(\d{3}\.?\d{3}\.?\d{3}-?\d{2})\b"
    cnpj_pat = r"\b(\d{2}\.?\d{3}\.?\d{3}/?\d{4}-?\d{2})\b"
    cpfs = re.findall(cpf_pat, text)
    cnpjs = re.findall(cnpj_pat, text)
    cpfs = list({only_digits(x) for x in cpfs})
    cnpjs = list({only_digits(x) for x in cnpjs})
    return cpfs, cnpjs

def find_money_values(text):
    pat = r"(?:R\$\s*)?(\d{1,3}(?:\.\d{3})*,\d{2}|\d+(?:[.,]\d{2}))"
    vals = re.findall(pat, text)
    floats = [safe_float(v) for v in vals]
    return [v for v in floats if v is not None]

def find_money_raw_tokens(text):
    pat = r"(R\$\s*)?(\d{1,3}(?:\.\d{3})*,\d{2}|\d+(?:[.,]\d{2}))"
    return re.findall(pat, text)

def find_dates(text):
    pats = [
        r"\b\d{1,2}[\/\-]\d{1,2}[\/\-]\d{2,4}\b",
        r"\b\d{4}[\/\-]\d{1,2}[\/\-]\d{1,2}\b",
    ]
    found = set()
    for p in pats:
        for m in re.findall(p, text):
            found.add(m)
    parsed = [parse_date(x) for x in found]
    return sorted(set([d for d in parsed if d]))

def mask_cpf(cpf_digits):
    d = only_digits(cpf_digits)
    if len(d) == 11:
        return f"***.***.***-{d[-2:]}"
    return "*" * max(0, len(d)-2) + d[-2:]

def mask_cnpj(cnpj_digits):
    d = only_digits(cnpj_digits)
    if len(d) == 14:
        return f"**.***.***/****-{d[-2:]}"
    return "*" * max(0, len(d)-2) + d[-2:]

def mask_text_pii(text):
    cpf_pat = r"\b(\d{3}\.?\d{3}\.?\d{3}-?\d{2})\b"
    cnpj_pat = r"\b(\d{2}\.?\d{3}\.?\d{3}/?\d{4}-?\d{2})\b"
    text = re.sub(cpf_pat, lambda m: mask_cpf(m.group(1)), text)
    text = re.sub(cnpj_pat, lambda m: mask_cnpj(m.group(1)), text)
    return text

In [34]:
def validate_cpf(cpf_digits: str) -> bool:
    cpf = only_digits(cpf_digits)
    if len(cpf) != 11 or cpf == cpf[0]*11:
        return False
    def dv_calc(base):
        s = sum(int(d)*w for d, w in zip(base, range(len(base)+1, 1, -1)))
        r = (s*10) % 11
        return 0 if r == 10 else r
    d1 = dv_calc(cpf[:9])
    d2 = dv_calc(cpf[:9] + str(d1))
    return cpf.endswith(f"{d1}{d2}")

def validate_cnpj(cnpj_digits: str) -> bool:
    cnpj = only_digits(cnpj_digits)
    if len(cnpj) != 14 or cnpj == cnpj[0]*14:
        return False
    def calc(base, weights):
        s = sum(int(d)*w for d, w in zip(base, weights))
        r = s % 11
        return 0 if r < 2 else 11 - r
    w1 = [5,4,3,2,9,8,7,6,5,4,3,2]
    w2 = [6] + w1
    d1 = calc(cnpj[:12], w1)
    d2 = calc(cnpj[:12] + str(d1), w2)
    return cnpj.endswith(f"{d1}{d2}")

In [35]:
def build_extracted_from_text(text, doc_hash):
    cpfs, cnpjs = find_cpf_cnpj(text)
    money_vals = find_money_values(text)
    dates = find_dates(text)
    money_tokens = find_money_raw_tokens(text)

    return {
        "doc_hash": doc_hash,
        "cpfs": cpfs,
        "cnpjs": cnpjs,
        "money_values": money_vals[:40],
        "dates": [d.isoformat() for d in dates[:40]],
        "_money_tokens": money_tokens,  # interno
    }

def logistic(z):
    return 1.0 / (1.0 + math.exp(-z))

def detect_suspicious_value_editing(text, money_vals, money_tokens):
    evidence = []

    # 1) muitos valores
    if len(money_vals) >= 15:
        evidence.append(f"Muitos valores monetários detectados (count={len(money_vals)})")

    # 2) valores muito próximos (diferença <= 1 real)
    vals_sorted = sorted(money_vals)
    close_pairs = 0
    for i in range(1, len(vals_sorted)):
        if abs(vals_sorted[i] - vals_sorted[i-1]) <= 1.00:
            close_pairs += 1
    if close_pairs >= 5:
        evidence.append(f"Muitos valores muito próximos entre si (close_pairs={close_pairs})")

    # 3) mistura de formatos
    has_pt, has_en = False, False
    for pref, num in money_tokens:
        if "." in num and "," in num:
            has_pt = True
        elif "." in num and "," not in num:
            has_en = True
        elif "," in num and "." not in num:
            has_pt = True
    if has_pt and has_en:
        evidence.append("Mistura de formatos pt-BR e en-US no mesmo documento (pode indicar colagem/edição)")

    # 4) presença de TOTAL + muitos valores
    if re.search(r"\b(total|valor\s+total)\b", text, flags=re.IGNORECASE) and len(money_vals) >= 8:
        evidence.append("Palavra-chave de TOTAL presente com muitos valores — revisar consistência do total")

    # 5) ruído: muitos "R$" colados (ex: R$123,45 sem espaço) -> pode ser normal, mas sinaliza se exagerado
    glued_rs = len(re.findall(r"R\$\d", text))
    if glued_rs >= 10:
        evidence.append(f"Muitos padrões 'R$' colados ao número (count={glued_rs})")

    return evidence

def build_flag(rule, severity, points, evidence=None, recommendation=None):
    return {
        "rule": rule,
        "severity": severity,
        "points": points,
        "evidence": evidence or [],
        "recommendation": recommendation or ""
    }

def fraud_rules_probabilistic(doc_ocr, extracted, cpf_validity, cnpj_validity, seen_hashes=None):
    flags = []
    z = -2.0  # base baixo risco

    text = doc_ocr.get("text","") or ""
    text_len = len(text)

    # Texto muito curto
    if text_len < 200:
        flags.append(build_flag(
            "VERY_SHORT_TEXT","MEDIUM",15,
            evidence=[f"text_len={text_len}"],
            recommendation="Possível documento incompleto/ilegível. Solicitar reenvio em melhor qualidade."
        ))
        z += 0.8

    # OCR confidence (offline = None)
    avg_conf = doc_ocr.get("avg_word_confidence")
    if avg_conf is not None and avg_conf < 0.85:
        flags.append(build_flag(
            "LOW_OCR_CONFIDENCE","MEDIUM",25,
            evidence=[f"avg_word_confidence={avg_conf:.3f}"],
            recommendation="Revisar campos críticos; considerar reenvio do documento."
        ))
        z += 1.2

    # CPF/CNPJ inválidos
    invalid_cpfs = [c for c,v in cpf_validity.items() if not v]
    invalid_cnpjs = [c for c,v in cnpj_validity.items() if not v]

    if invalid_cpfs:
        flags.append(build_flag(
            "INVALID_CPF","HIGH",35,
            evidence=[f"cpfs_invalidos={ [mask_cpf(c) for c in invalid_cpfs] }"],
            recommendation="Bloquear automação e enviar para revisão humana."
        ))
        z += 1.8

    if invalid_cnpjs:
        flags.append(build_flag(
            "INVALID_CNPJ","HIGH",35,
            evidence=[f"cnpjs_invalidos={ [mask_cnpj(c) for c in invalid_cnpjs] }"],
            recommendation="Bloquear automação e validar cadastro/empresa."
        ))
        z += 1.8

    # Datas futuras
    parsed_dates = [parse_date(d) for d in extracted.get("dates", [])]
    parsed_dates = [d for d in parsed_dates if d]
    today = date.today()
    future_dates = [d for d in parsed_dates if d > today]
    if future_dates:
        flags.append(build_flag(
            "FUTURE_DATE_FOUND","HIGH",20,
            evidence=[f"datas_futuras={ [d.isoformat() for d in future_dates[:8]] }"],
            recommendation="Revisar datas; pode ser adulteração ou OCR incorreto."
        ))
        z += 1.1

    # Duplicidade por hash (se tiver histórico)
    if seen_hashes is not None and extracted.get("doc_hash") in seen_hashes:
        flags.append(build_flag(
            "DUPLICATE_DOCUMENT_HASH","HIGH",30,
            evidence=[f"hash={extracted.get('doc_hash','')[:16]}..."],
            recommendation="Possível reutilização de documento."
        ))
        z += 1.3

    # Suspeita de edição de valores
    money_vals = extracted.get("money_values", []) or []
    money_tokens = extracted.get("_money_tokens", []) or []
    susp_ev = detect_suspicious_value_editing(text, money_vals, money_tokens)
    if susp_ev:
        flags.append(build_flag(
            "SUSPECTED_VALUE_TAMPERING","MEDIUM",20,
            evidence=susp_ev[:6],
            recommendation="Verificar consistência de itens/totais e sinais de colagem."
        ))
        z += 0.9

    # Probabilidade e score
    prob = logistic(z)
    score = int(round(prob * 100))

    # Nível
    if score < 25:
        level = "LOW"
    elif score < 60:
        level = "MEDIUM"
    else:
        level = "HIGH"

    # Auditoria (resumo)
    sev_order = {"HIGH": 0, "MEDIUM": 1, "LOW": 2}
    flags_sorted = sorted(flags, key=lambda f: (sev_order.get(f["severity"], 9), -f["points"]))

    lines = [f"Risco estimado: {score}/100 (prob={prob:.2f}) | Nível: {level}"]
    if flags_sorted:
        lines.append("Principais achados:")
        for f in flags_sorted[:8]:
            ev = "; ".join(f["evidence"][:3]) if f["evidence"] else "sem evidência adicional"
            lines.append(f"- [{f['severity']}] {f['rule']} (+{f['points']}): {ev}")
    else:
        lines.append("Nenhuma regra crítica acionada.")
    audit_summary = "\n".join(lines)

    return score, prob, level, flags_sorted, audit_summary

def mask_final_output(final):
    # deep copy
    out = json.loads(json.dumps(final))

    # mascarar preview
    if "text_preview" in out:
        out["text_preview"] = mask_text_pii(out["text_preview"])

    # mascarar extracted
    ex = out.get("extracted", {})
    ex["cpfs_masked"] = [mask_cpf(c) for c in ex.get("cpfs", [])]
    ex["cnpjs_masked"] = [mask_cnpj(c) for c in ex.get("cnpjs", [])]
    ex.pop("cpfs", None)
    ex.pop("cnpjs", None)
    ex.pop("_money_tokens", None)

    # mascarar validations
    vals = out.get("validations", {})
    if "cpf" in vals:
        vals["cpf_masked"] = {mask_cpf(k): v for k, v in vals["cpf"].items()}
        vals.pop("cpf", None)
    if "cnpj" in vals:
        vals["cnpj_masked"] = {mask_cnpj(k): v for k, v in vals["cnpj"].items()}
        vals.pop("cnpj", None)

    return out

In [36]:
text = doc_ocr["text"]

extracted = build_extracted_from_text(text, doc_hash)

cpf_validity = {c: validate_cpf(c) for c in extracted["cpfs"]}
cnpj_validity = {c: validate_cnpj(c) for c in extracted["cnpjs"]}

score, prob, level, flags, audit_summary = fraud_rules_probabilistic(
    doc_ocr, extracted, cpf_validity, cnpj_validity, seen_hashes=set()
)

from datetime import timezone

final = {
    "file_name": file_name,
    "mode": "OFFLINE_DEMO",
    "doc_hash": doc_hash,
    "ocr": {"method": doc_ocr["method"], "page_count": doc_ocr["page_count"]},
    "extracted": extracted,
    "validations": {"cpf": cpf_validity, "cnpj": cnpj_validity},
    "risk": {
        "score_0_100": score,
        "probability_0_1": prob,
        "level": level,
        "flags": flags,
        "audit_summary": audit_summary
    },
    "text_preview": text[:1200],
    "generated_at": datetime.now(timezone.utc).isoformat()
}

final_masked = mask_final_output(final)

print(final_masked["risk"]["audit_summary"])
final_masked

Risco estimado: 86/100 (prob=0.86) | Nível: HIGH
Principais achados:
- [HIGH] INVALID_CPF (+35): cpfs_invalidos=['***.***.***-00']
- [HIGH] FUTURE_DATE_FOUND (+20): datas_futuras=['2099-12-30', '2100-01-01']
- [MEDIUM] SUSPECTED_VALUE_TAMPERING (+20): Muitos valores monetários detectados (count=17); Muitos valores muito próximos entre si (close_pairs=11); Mistura de formatos pt-BR e en-US no mesmo documento (pode indicar colagem/edição)


{'file_name': 'documento_fraude_simulada.pdf',
 'mode': 'OFFLINE_DEMO',
 'doc_hash': 'a5968e5f7fa1e98887cfcc7139e316b3d3d80aaa23702ac8afd6c50319f44672',
 'ocr': {'method': 'PDF_DIGITAL_TEXT', 'page_count': 1},
 'extracted': {'doc_hash': 'a5968e5f7fa1e98887cfcc7139e316b3d3d80aaa23702ac8afd6c50319f44672',
  'money_values': [123.45,
   6.78,
   12.34,
   5.67,
   1234.56,
   1234.56,
   1234.56,
   1235.0,
   1234.56,
   1234.56,
   1234.56,
   199.9,
   199.9,
   199.9,
   200.0,
   199.0,
   198.9],
  'dates': ['2099-12-30', '2100-01-01'],
  'cpfs_masked': ['***.***.***-00'],
  'cnpjs_masked': ['**.***.***/****-95']},
 'validations': {'cpf_masked': {'***.***.***-00': False},
  'cnpj_masked': {'**.***.***/****-95': True}},
 'risk': {'score_0_100': 86,
  'probability_0_1': 0.8581489350995123,
  'level': 'HIGH',
  'flags': [{'rule': 'INVALID_CPF',
    'severity': 'HIGH',
    'points': 35,
    'evidence': ["cpfs_invalidos=['***.***.***-00']"],
    'recommendation': 'Bloquear automação e env

In [37]:
from google.colab import files

with open("report_masked.json", "w", encoding="utf-8") as f:
    json.dump(final_masked, f, ensure_ascii=False, indent=2)

df_flags = pd.DataFrame(final_masked["risk"]["flags"])
df_flags.to_csv("flags.csv", index=False)

files.download("report_masked.json")
files.download("flags.csv")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>