In [None]:

# =============================
# CÉLULA 1 — Configurações
# =============================
CSV_INPUT = 'resultado_tjma_v2.csv'  # ajuste para o seu caminho
CSV_ROTULADO = 'resultado_tjma_rotulado.csv'
CSV_MINIMAL = 'dataset_minimal_tjma.csv'
MODEL_NAME = 'neuralmind/bert-base-portuguese-cased'  # BERTimbau
MODEL_OUT_DIR = 'model_out_tjma'
USE_TRANSFORMERS = True  # se quiser desativar, coloque False (usa TF-IDF baseline)
SENSITIVE_COL = 'magistrado_genero'
SENSITIVE_BIN = ['Feminino','Masculino']  # filtra apenas estes valores para fairness
POSITIVE_CLASS = 'procedente'  # para métricas de paridade
ADV_LAMBDA = 0.3  # peso da loss adversária


In [None]:

# =============================
# CÉLULA 2 — Imports
# =============================
import re, os, json as pyjson
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (7, 5)

# Tenta importar Fairlearn; se não houver, seguimos com cálculo manual
try:
    from fairlearn.metrics import MetricFrame, selection_rate, true_positive_rate, false_positive_rate
    FAIRLEARN_OK = True
except Exception as e:
    FAIRLEARN_OK = False
    print('Fairlearn não disponível, usarei métricas manuais. Erro:', e)


In [None]:

# =============================
# CÉLULA 3 — Heurísticas de rotulagem por regex
# =============================
df = pd.read_csv(CSV_INPUT)
len(df), df.head()

# Regras heurísticas (iguais às usadas no pipeline anterior)
PROCEDENTE = [
    r'(julgo\s+procedente)', r'(proced\w+)', r'(dou\s+provimento)', r'(acolho)', r'(condeno)',
]
IMPROCEDENTE = [
    r'(julgo\s+improcedente)', r'(improced\w+)', r'(nego\s+provimento)', r'(rejeito)', r'(desprovido|desprovimento)', r'(improcedentes\s+os\s+pedidos)',
]
NEUTRO = [
    r'(julgo\s+extinto|declaro\s+extinto|extin\w+\s+do\s+processo)',
    r'(sem\s+resolu\w+\s+do\s+m\w+rito|art\.?\s*485)',
    r'(homologo\s+acordo|homologa\w+|homologo\s+desist\w+)',
    r'(perda\s+do\s+objeto|car\w+ncia\s+de\s+a\w+\w+o)',
    r'(ilegitimidade|incompet\w+ncia|litispend\w+ncia)',
    r'(indeferimento\s+da\s+peti\w+\s+inicial|indefiro\s+a\s+inicial|rejeito\s+a\s+inicial)',
]
MERITO = [r'(resolu\w+\s+do\s+m\w+rito)', r'(art\.?\s*487)']
SEM_MERITO = [r'(sem\s+resolu\w+\s+do\s+m\w+rito)', r'(art\.?\s*485)']


def norm(s):
    if not isinstance(s, str): return ''
    s = s.strip()
    return re.sub(r'\s+', ' ', s)


def find_first_match(text, patterns):
    for rx in patterns:
        if re.search(rx, text, flags=re.IGNORECASE):
            return rx
    return None


def rotular_decisao(text):
    t = norm(text)
    ev = find_first_match(t, PROCEDENTE)
    if ev:
        tipo = 'merito' if find_first_match(t, MERITO) else ('sem_merito' if find_first_match(t, SEM_MERITO) else 'merito')
        return ('procedente', ev, tipo, 0.95)
    ev = find_first_match(t, IMPROCEDENTE)
    if ev:
        tipo = 'merito' if find_first_match(t, MERITO) else ('sem_merito' if find_first_match(t, SEM_MERITO) else 'merito')
        return ('improcedente', ev, tipo, 0.95)
    ev = find_first_match(t, NEUTRO)
    if ev:
        return ('neutro', ev, 'sem_merito', 0.90)
    if find_first_match(t, MERITO):
        return ('neutro', None, 'merito', 0.60)
    if find_first_match(t, SEM_MERITO):
        return ('neutro', None, 'sem_merito', 0.70)
    return ('neutro', None, None, 0.50)

out = df.copy()
res = out['decisao'].apply(rotular_decisao)
out['sentimento'] = res.apply(lambda x: x[0])
out['evidencia'] = res.apply(lambda x: x[1])
out['tipo_resultado'] = res.apply(lambda x: x[2])
out['confianca'] = res.apply(lambda x: x[3])
print('Distribuição de rótulos:')
print(out['sentimento'].value_counts())

out.to_csv(CSV_ROTULADO, index=False)
minimal = out[['decisao','sentimento',SENSITIVE_COL]].rename(columns={'decisao':'text','sentimento':'label'})
minimal['text'] = minimal['text'].fillna('').astype(str).str.strip()
# CORREÇÃO: operador '>' (antes estava &gt;)
minimal = minimal[minimal['text'].str.len() > 3]
minimal.to_csv(CSV_MINIMAL, index=False)
CSV_ROTULADO, CSV_MINIMAL, len(minimal)


In [None]:

# =============================
# CÉLULA 4 — Mapeamento e splits
# =============================
label2id = {'procedente':0, 'improcedente':1, 'neutro':2}
id2label = {v:k for k,v in label2id.items()}
minimal['label_id'] = minimal['label'].map(label2id)
minimal = minimal.dropna(subset=['label_id'])

# filtra apenas F/M para fairness
assert len(SENSITIVE_BIN) == 2, "SENSITIVE_BIN deve ter exatamente dois grupos."
minimal = minimal[minimal[SENSITIVE_COL].isin(SENSITIVE_BIN)]

# Split train/test estratificado
train_df, test_df = train_test_split(minimal, test_size=0.2, random_state=42, stratify=minimal['label_id'])
# Split validação a partir do treino (correção: não usar 'test' como validação no Trainer)
train_df, val_df = train_test_split(train_df, test_size=0.1, random_state=42, stratify=train_df['label_id'])

len(train_df), len(val_df), len(test_df)


In [None]:

# =============================
# CÉLULA 5 — Treino base (Transformers ou TF-IDF)
# =============================
pred_base = None
if USE_TRANSFORMERS:
    from transformers import (AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments)
    from transformers import DataCollatorWithPadding
    from datasets import Dataset, DatasetDict

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    train_ds = Dataset.from_pandas(train_df[['text','label_id']].rename(columns={'label_id':'labels'}))
    val_ds   = Dataset.from_pandas(val_df[['text','label_id']].rename(columns={'label_id':'labels'}))
    test_ds  = Dataset.from_pandas(test_df[['text','label_id']].rename(columns={'label_id':'labels'}))
    ds = DatasetDict({'train':train_ds, 'val':val_ds, 'test':test_ds})

    def tok(batch):
        return tokenizer(batch['text'], truncation=True, max_length=512)

    ds_tok = ds.map(tok, batched=True)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=3, id2label=id2label, label2id=label2id)
    collator = DataCollatorWithPadding(tokenizer=tokenizer)

    import numpy as np
    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        preds = np.argmax(logits, axis=-1)
        return {
            'accuracy': accuracy_score(labels, preds),
            'precision_macro': precision_score(labels, preds, average='macro', zero_division=0),
            'recall_macro': recall_score(labels, preds, average='macro', zero_division=0),
            'f1_macro': f1_score(labels, preds, average='macro', zero_division=0),
        }

    args = TrainingArguments(
        output_dir=MODEL_OUT_DIR,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=32,
        num_train_epochs=2,
        learning_rate=2e-5,
        evaluation_strategy='epoch',
        save_strategy='epoch',
        report_to='none',
        load_best_model_at_end=True,
        metric_for_best_model='f1_macro',
        seed=42,
    )

    from transformers import EarlyStoppingCallback
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=ds_tok['train'],
        eval_dataset=ds_tok['val'],  # CORREÇÃO: usar validação, não 'test'
        tokenizer=tokenizer,
        data_collator=collator,
        compute_metrics=compute_metrics,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=1)]
    )

    trainer.train()
    eval_metrics = trainer.evaluate(ds_tok['val'])
    preds = trainer.predict(ds_tok['test'])
    y_true = np.array(preds.label_ids)
    y_pred = np.argmax(preds.predictions, axis=-1)
else:
    # Fallback rápido: TF-IDF + LogisticRegression
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.linear_model import LogisticRegression
    vec = TfidfVectorizer(max_features=30000, ngram_range=(1,2))
    Xtr = vec.fit_transform(train_df['text'])
    Xte = vec.transform(test_df['text'])
    clf = LogisticRegression(max_iter=200)
    clf.fit(Xtr, train_df['label_id'])
    y_pred = clf.predict(Xte)
    y_true = np.array(test_df['label_id'])
    eval_metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'precision_macro': precision_score(y_true, y_pred, average='macro', zero_division=0),
        'recall_macro': recall_score(y_true, y_pred, average='macro', zero_division=0),
        'f1_macro': f1_score(y_true, y_pred, average='macro', zero_division=0),
    }

# Verificação de alinhamento
assert len(test_df) == len(y_true) == len(y_pred), "Tamanhos de teste e vetores y_true/y_pred não batem."

eval_metrics


In [None]:

# =============================
# CÉLULA 6 — Fairness (modelo base)
# =============================
# Junta rótulos e grupos no teste
assert all(g in test_df[SENSITIVE_COL].unique() for g in SENSITIVE_BIN), "Nem todos os grupos de SENSITIVE_BIN estão presentes no teste."

test_eval = test_df.copy()
test_eval['y_true'] = y_true
test_eval['y_pred'] = y_pred

# Define 'positivo' como classe 0 (procedente)
POS_ID = label2id[POSITIVE_CLASS]

def rate_positive(y):
    return np.mean(np.array(y)==POS_ID)


def group_rates(df_in, col=SENSITIVE_COL):
    res = {}
    for g in SENSITIVE_BIN:
        dfg = df_in[df_in[col]==g]
        res[g] = {
            'selection_rate_true': rate_positive(dfg['y_true']),
            'selection_rate_pred': rate_positive(dfg['y_pred']),
        }
        # TPR/FPR por grupo no modelo (CORREÇÃO: operadores &)
        yt = (dfg['y_true'].values==POS_ID).astype(int)
        yp = (dfg['y_pred'].values==POS_ID).astype(int)
        TPR = (np.sum((yp==1) & (yt==1)) / max(np.sum(yt==1), 1))
        FPR = (np.sum((yp==1) & (yt==0)) / max(np.sum(yt==0), 1))
        res[g]['TPR'] = TPR
        res[g]['FPR'] = FPR
    return res

rates = group_rates(test_eval)

# Diferenças de disparidade (diferença absoluta entre grupos)
def disparity_diffs(rates):
    g0, g1 = SENSITIVE_BIN
    diffs = {
        'demographic_parity_true_diff': abs(rates[g0]['selection_rate_true'] - rates[g1]['selection_rate_true']),
        'demographic_parity_pred_diff': abs(rates[g0]['selection_rate_pred'] - rates[g1]['selection_rate_pred']),
        'equal_opportunity_diff': abs(rates[g0]['TPR'] - rates[g1]['TPR']),
        'equalized_odds_FPR_diff': abs(rates[g0]['FPR'] - rates[g1]['FPR']),
    }
    return diffs

diffs_base = disparity_diffs(rates)
diffs_base


In [None]:

# =============================
# CÉLULA 7 — Visualização (modelo base)
# =============================
g0, g1 = SENSITIVE_BIN
labels_plot = ['TPR','FPR']
vals0 = [rates[g0]['TPR'], rates[g0]['FPR']]
vals1 = [rates[g1]['TPR'], rates[g1]['FPR']]
x = np.arange(len(labels_plot))
w = 0.35
plt.bar(x-w/2, vals0, width=w, label=g0)
plt.bar(x+w/2, vals1, width=w, label=g1)
plt.xticks(x, labels_plot)
plt.ylim(0,1)
plt.title('TPR/FPR por gênero (modelo base)')
plt.legend()
plt.show()


In [None]:

# =============================
# CÉLULA 8 — GRL e AdvModel
# =============================
import torch
from torch import nn
from torch.utils.data import Dataset as TorchDataset, DataLoader

class GRL(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x, lambda_):
        ctx.lambda_ = lambda_
        return x.view_as(x)
    @staticmethod
    def backward(ctx, grad_output):
        return -ctx.lambda_ * grad_output, None

class AdvModel(nn.Module):
    def __init__(self, model_name, num_labels=3):
        super().__init__()
        from transformers import AutoModel
        self.encoder = AutoModel.from_pretrained(model_name)
        hidden = self.encoder.config.hidden_size
        self.classifier = nn.Linear(hidden, num_labels)
        self.adv_head = nn.Linear(hidden, 2)  # gênero binário
    def forward(self, **inputs):
        out = self.encoder(**inputs)
        h = out.last_hidden_state[:,0,:]  # CLS
        logits_cls = self.classifier(h)
        return logits_cls, h
    def adv(self, h, lambda_):
        h_grl = GRL.apply(h, lambda_)
        logits_adv = self.adv_head(h_grl)
        return logits_adv


In [None]:

# =============================
# CÉLULA 9 — Definição de TextDataset (correção)
# =============================
class TextDataset(TorchDataset):
    def __init__(self, df: pd.DataFrame, tokenizer):
        self.texts = df['text'].astype(str).tolist()
        self.labels = df['label_id'].astype(int).tolist()
        # mapeia gênero para {0,1} conforme ordem em SENSITIVE_BIN
        mapping = {SENSITIVE_BIN[0]:0, SENSITIVE_BIN[1]:1}
        self.gender = df[SENSITIVE_COL].map(mapping).astype(int).tolist()
        self.tokenizer = tokenizer
    def __len__(self):
        return len(self.texts)
    def __getitem__(self, idx):
        enc = self.tokenizer(
            self.texts[idx],
            truncation=True,
            padding='max_length',  # CORREÇÃO: garantir padding uniforme para DataLoader
            max_length=512,
            return_tensors='pt'
        )
        item = {
            'input_ids': enc['input_ids'].squeeze(0),
            'attention_mask': enc['attention_mask'].squeeze(0),
            'labels': torch.tensor(self.labels[idx], dtype=torch.long),
            'gender': torch.tensor(self.gender[idx], dtype=torch.long),
        }
        return item


In [None]:

# =============================
# CÉLULA 10 — Treino adversarial (correções aplicadas)
# =============================
if USE_TRANSFORMERS:
    from transformers import AutoTokenizer
    tok2 = AutoTokenizer.from_pretrained(MODEL_NAME)

    tr_ds = TextDataset(train_df[['text','label_id',SENSITIVE_COL]], tok2)
    te_ds = TextDataset(test_df[['text','label_id',SENSITIVE_COL]], tok2)

    tr_dl = DataLoader(tr_ds, batch_size=16, shuffle=True)
    te_dl = DataLoader(te_ds, batch_size=32)

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    adv_model = AdvModel(MODEL_NAME).to(device)
    opt = torch.optim.AdamW(adv_model.parameters(), lr=2e-5)
    ce = nn.CrossEntropyLoss()

    # Treino curto (1 época) só para demonstrar
    adv_model.train()
    for epoch in range(1):
        for batch in tr_dl:
            inputs = {k:batch[k].to(device) for k in ['input_ids','attention_mask']}
            y = batch['labels'].to(device)
            g = batch['gender'].to(device)

            logits_cls, h = adv_model(**inputs)
            loss_cls = ce(logits_cls, y)

            # CORREÇÃO: usar GRL com lambda=1.0 e ponderar loss adversária por ADV_LAMBDA (sem sinal negativo)
            logits_adv = adv_model.adv(h, 1.0)
            loss_adv = ce(logits_adv, g)
            loss = loss_cls + ADV_LAMBDA * loss_adv

            opt.zero_grad(); loss.backward(); opt.step()

    # Avaliação
    adv_model.eval()
    y_true2, y_pred2, groups2 = [], [], []
    with torch.no_grad():
        for batch in te_dl:
            inputs = {k:batch[k].to(device) for k in ['input_ids','attention_mask']}
            logits_cls, _ = adv_model(**inputs)
            pred = torch.argmax(logits_cls, dim=-1).cpu().numpy()
            y_pred2.extend(list(pred))
            y_true2.extend(list(batch['labels'].numpy()))
            groups2.extend(list(batch['gender'].numpy()))

    y_true2 = np.array(y_true2); y_pred2 = np.array(y_pred2); groups2 = np.array(groups2)

    # Verificação de alinhamento
    assert len(test_df) == len(y_true2) == len(y_pred2), "Tamanhos do teste e vetores y_true2/y_pred2 não batem."

    # Monta DF para métricas
    test_eval_adv = test_df.copy()
    test_eval_adv['y_true'] = y_true2
    test_eval_adv['y_pred'] = y_pred2
    test_eval_adv['bin_gender'] = np.where(test_eval_adv[SENSITIVE_COL]=='Feminino','Feminino','Masculino')

    # Calcula disparidades (CORREÇÃO: operadores '&')
    def compute_rates_df(df_in):
        res = {}
        for g in SENSITIVE_BIN:
            dfg = df_in[df_in['bin_gender']==g]
            sel_pred = np.mean(dfg['y_pred'].values==POS_ID)
            yt = (dfg['y_true'].values==POS_ID).astype(int); yp = (dfg['y_pred'].values==POS_ID).astype(int)
            TPR = (np.sum((yp==1) & (yt==1)) / max(np.sum(yt==1), 1))
            FPR = (np.sum((yp==1) & (yt==0)) / max(np.sum(yt==0), 1))
            res[g] = {'selection_rate_pred': sel_pred, 'TPR': TPR, 'FPR': FPR}
        return res

    rates_adv = compute_rates_df(test_eval_adv)
    diffs_adv = {
        'demographic_parity_pred_diff': abs(rates_adv[SENSITIVE_BIN[0]]['selection_rate_pred'] - rates_adv[SENSITIVE_BIN[1]]['selection_rate_pred']),
        'equal_opportunity_diff': abs(rates_adv[SENSITIVE_BIN[0]]['TPR'] - rates_adv[SENSITIVE_BIN[1]]['TPR']),
        'equalized_odds_FPR_diff': abs(rates_adv[SENSITIVE_BIN[0]]['FPR'] - rates_adv[SENSITIVE_BIN[1]]['FPR']),
    }
    rates_adv, diffs_adv
else:
    print('Adversarial requer Transformers; ative USE_TRANSFORMERS=True e instale dependências.')


In [None]:

# =============================
# CÉLULA 11 — Comparação Base vs Adversarial
# =============================
if USE_TRANSFORMERS:
    print('Base:', diffs_base)
    print('Adv :', diffs_adv)
    # Barras de comparação (paridade predita)
    dp_base = diffs_base['demographic_parity_pred_diff']
    eo_base = diffs_base['equal_opportunity_diff']
    eod_base = diffs_base['equalized_odds_FPR_diff']
    dp_adv = diffs_adv['demographic_parity_pred_diff']
    eo_adv = diffs_adv['equal_opportunity_diff']
    eod_adv = diffs_adv['equalized_odds_FPR_diff']
    lbls = ['Parity diff','EO diff (TPR)','EOdds diff (FPR)']
    base_vals = [dp_base, eo_base, eod_base]
    adv_vals = [dp_adv, eo_adv, eod_adv]
    x = np.arange(len(lbls)); w=0.35
    plt.bar(x-w/2, base_vals, width=w, label='Base')
    plt.bar(x+w/2, adv_vals, width=w, label='Adversarial')
    plt.xticks(x, lbls); plt.title('Disparidades por gênero — Base vs. Adversarial')
    plt.legend(); plt.show()
else:
    print('Sem Transformers, comparação adversarial não foi executada.')


In [None]:

# =============================
# CÉLULA 12 — Relatório JSON
# =============================
report = {
  'base': diffs_base,
}
if USE_TRANSFORMERS:
    report['adversarial'] = diffs_adv
with open('fairness_report.json','w',encoding='utf-8') as f:
    pyjson.dump(report, f, ensure_ascii=False, indent=2)
'fairness_report.json'
