In [1]:
!pip install adapters

Collecting adapters
  Downloading adapters-1.1.0-py3-none-any.whl.metadata (16 kB)
Collecting transformers~=4.47.1 (from adapters)
  Downloading transformers-4.47.1-py3-none-any.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.1/44.1 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
Downloading adapters-1.1.0-py3-none-any.whl (293 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m293.4/293.4 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading transformers-4.47.1-py3-none-any.whl (10.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m31.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: transformers, adapters
  Attempting uninstall: transformers
    Found existing installation: transformers 4.48.3
    Uninstalling transformers-4.48.3:
      Successfully uninstalled transformers-4.48.3
Successfully installed adapters-1.1.0 transformers-4.47.1


In [2]:
# should ideally be using spaCy

import re
def simple_sent_tokenize(text):
    """Split ``text`` into sentences with a lightweight regex heuristic.

    A boundary is any run of whitespace that directly follows ``.``, ``!`` or
    ``?``; the punctuation stays attached to the sentence it terminates.
    Fragments that are empty after stripping are discarded.

    Args:
        text: The string to split.

    Returns:
        A list of stripped, non-empty sentence strings in original order.
    """
    fragments = (piece.strip() for piece in re.split(r'(?<=[.!?])\s+', text))
    return [fragment for fragment in fragments if fragment]

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel, AdamW
import random
import json

# Use the GPU when PyTorch can see one; otherwise run on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Module-level tokenizer for the Spanish BERT checkpoint that the model cell also loads.
tokenizer = BertTokenizer.from_pretrained('dccuchile/bert-base-spanish-wwm-uncased')

# Make sure there are at least 2 sentences per paragraph
class ContrastiveSentenceDataset(Dataset):
    """Dataset of ``(anchor, positive, negatives)`` sentence triples.

    For every source in ``json_data`` each paragraph is split into sentences.
    Each pair of consecutive sentences inside a paragraph yields one example:
    the earlier sentence is the anchor, the following one is the positive, and
    the negatives are sampled without replacement from all sentences of the
    same source that are not equal to the anchor or the positive.

    Args:
        json_data: Mapping from a source key to a list of paragraph strings.
        n_negatives: Maximum number of negatives to draw per example.

    Notes:
        * Paragraphs with fewer than two sentences contribute no examples.
        * When fewer than ``n_negatives`` candidates exist (small source, or
          many duplicates of the anchor/positive text), the example receives
          as many negatives as are available instead of raising. Consumers
          that need a fixed number of negatives per example should filter on
          ``len(negatives)``.
        * Sampling uses the global ``random`` module state; seed it beforehand
          for reproducible datasets.
    """

    def __init__(self, json_data, n_negatives):
        self.data = []
        for paragraphs in json_data.values():
            # Tokenize each paragraph once and reuse the result both for the
            # negative pool and for the anchor/positive pairs.
            tokenized = [
                [s.strip() for s in simple_sent_tokenize(para) if s.strip()]
                for para in paragraphs
            ]
            all_sentences = [s for sentences in tokenized for s in sentences]

            for sentences in tokenized:
                for anchor, positive in zip(sentences, sentences[1:]):
                    candidates = [s for s in all_sentences if s not in (positive, anchor)]
                    # Bound the sample size by the *filtered* pool: repeated
                    # sentences equal to the anchor/positive shrink it below
                    # len(all_sentences) - 2, which would make sample() raise.
                    negatives = random.sample(candidates, min(n_negatives, len(candidates)))
                    self.data.append((anchor, positive, negatives))

    def __len__(self):
        """Return the number of (anchor, positive, negatives) examples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(anchor, positive, negatives)`` tuple stored at ``idx``."""
        return self.data[idx]

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/310 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/248k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/134 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/486k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/650 [00:00<?, ?B/s]

In [4]:
import torch
import torch.nn as nn
from transformers import BertModel, BertTokenizer, BertConfig
from peft import get_peft_model, LoraConfig, TaskType

class SentenceBERTContrastive(nn.Module):
    """BERT encoder with LoRA adapters and a linear projection head.

    The pretrained encoder's own parameters are frozen, LoRA adapters are
    attached to the modules named ``query`` and ``value``, and the hidden
    state at sequence position 0 is projected to a lower-dimensional
    embedding.

    Args:
        model_name: Hugging Face checkpoint to load with ``BertModel``.
            Defaults to the Spanish checkpoint used throughout this notebook.
        embedding_dim: Output size of the projection head (default 128).
    """

    def __init__(self, model_name='dccuchile/bert-base-spanish-wwm-uncased', embedding_dim=128):
        super().__init__()
        base_model = BertModel.from_pretrained(model_name)

        # Freeze all base model params before the adapters are attached.
        for param in base_model.parameters():
            param.requires_grad = False

        lora_config = LoraConfig(
            task_type=TaskType.FEATURE_EXTRACTION,
            r=8,
            lora_alpha=16,
            lora_dropout=0.1,
            target_modules=["query", "value"]
        )
        self.bert = get_peft_model(base_model, lora_config)

        # Project the encoder's hidden size down to the contrastive embedding space.
        self.projection = nn.Linear(self.bert.config.hidden_size, embedding_dim)

    def forward(self, input_ids, attention_mask):
        """Encode a batch and return its projected embeddings.

        Args:
            input_ids: Token id tensor passed through to the encoder.
            attention_mask: Attention mask tensor passed through to the encoder.

        Returns:
            Tensor with one ``embedding_dim``-sized row per input sequence,
            computed from ``last_hidden_state[:, 0, :]``.
        """
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        cls_embeddings = outputs.last_hidden_state[:, 0, :]
        return self.projection(cls_embeddings)

In [5]:
def contrastive_loss(anchor_emb, positive_emb, negative_embs, temperature=0.07):
    """Cross-entropy contrastive loss over one positive and several negatives.

    All embeddings are L2-normalised, so the scores are cosine similarities.
    For each anchor the positive score is placed in column 0, followed by the
    negative scores; the row is divided by ``temperature`` and scored with
    cross-entropy against target class 0.

    Args:
        anchor_emb: Tensor ``[batch_size, dim]``.
        positive_emb: Tensor ``[batch_size, dim]``.
        negative_embs: Tensor ``[batch_size, n_neg, dim]``.
        temperature: Divisor applied to the similarities (default 0.07).

    Returns:
        Scalar tensor holding the mean loss over the batch.
    """
    normalize = nn.functional.normalize
    anchors = normalize(anchor_emb, dim=1)
    positives = normalize(positive_emb, dim=1)
    negatives = normalize(negative_embs, dim=2)

    # Batched dot products: row-vector @ column-vector for the positive pair,
    # [n_neg, dim] @ [dim, 1] for the negatives.
    anchor_columns = anchors.unsqueeze(2)
    pos_scores = torch.bmm(anchors.unsqueeze(1), positives.unsqueeze(2)).squeeze(-1)  # [batch_size, 1]
    neg_scores = torch.bmm(negatives, anchor_columns).squeeze(-1)  # [batch_size, n_neg]

    scaled_scores = torch.cat((pos_scores, neg_scores), dim=1) / temperature  # [batch_size, 1 + n_neg]
    # The positive always sits in column 0.
    targets = torch.zeros(anchor_emb.size(0), dtype=torch.long, device=anchors.device)
    return nn.functional.cross_entropy(scaled_scores, targets)

In [6]:
# Number of negatives requested per anchor when building the train/validation datasets.
N_NEGATIVES = 3
# Number of examples per DataLoader batch.
batch_size = 8

In [8]:
import tqdm

def _contrastive_batch_loss(model, tokenizer, batch):
    """Embed one collated batch and return its contrastive loss.

    ``batch`` is ``(anchor_texts, positive_texts, negatives_list)`` as produced
    by a ``DataLoader`` using the default collate function on
    ``(anchor, positive, [neg, ...])`` examples. With that collate,
    ``negatives_list`` is organised by negative slot: ``negatives_list[j][i]``
    is the j-th negative of the i-th anchor.
    NOTE(review): a loader with a custom ``collate_fn`` that yields a
    per-example layout instead would need a different reshape — confirm.
    """
    anchor_texts, positive_texts, negatives_list = batch
    n_anchors = len(anchor_texts)
    n_negatives = len(negatives_list)

    # Flattening keeps the slot-major order: slot 0 for every anchor, then slot 1, ...
    flat_negatives = [sent for negs in negatives_list for sent in negs]
    all_texts = list(anchor_texts) + list(positive_texts) + flat_negatives
    encodings = tokenizer(all_texts, return_tensors='pt', padding=True, truncation=True)
    encodings = {k: v.to(device) for k, v in encodings.items()}

    embeddings = model(encodings['input_ids'], encodings['attention_mask'])
    anchor_emb = embeddings[:n_anchors]
    positive_emb = embeddings[n_anchors:2 * n_anchors]
    # Rows are slot-major, so view as [n_neg, batch, dim] and swap the first
    # two axes to obtain [batch, n_neg, dim]. Viewing directly as
    # [batch, n_neg, dim] would pair anchors with other examples' negatives.
    negative_embs = (
        embeddings[2 * n_anchors:]
        .view(n_negatives, n_anchors, embeddings.size(-1))
        .transpose(0, 1)
    )
    return contrastive_loss(anchor_emb, positive_emb, negative_embs)


def train_contrastive_model(train_loader, val_loader, batch_size=8, epochs=3, lr=2e-5, val_ratio=0.1):
    """Train a ``SentenceBERTContrastive`` model and validate after every epoch.

    Args:
        train_loader: Iterable of ``(anchor_texts, positive_texts, negatives_list)``
            batches used for optimisation.
        val_loader: Iterable of batches of the same form, used for validation.
        batch_size: Unused; kept so existing calls keep working (the loaders
            already define the batch size).
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate for AdamW.
        val_ratio: Unused; kept so existing calls keep working.

    Returns:
        The trained model. A ``state_dict`` checkpoint named
        ``contrastive_beto_epoch{N}.pt`` is written after each epoch.
    """
    # The tokenizer must come from the same checkpoint as the encoder inside
    # SentenceBERTContrastive; a different vocabulary would feed it unrelated ids.
    tokenizer = BertTokenizer.from_pretrained('dccuchile/bert-base-spanish-wwm-uncased')
    model = SentenceBERTContrastive().to(device)

    # torch.optim.AdamW replaces the deprecated transformers.AdamW; eps and
    # weight_decay are set to the values that implementation used by default.
    trainable_params = [p for p in model.parameters() if p.requires_grad]
    optimizer = optim.AdamW(trainable_params, lr=lr, eps=1e-6, weight_decay=0.0)

    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        progress = tqdm.tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} [train]")

        for batch in progress:
            loss = _contrastive_batch_loss(model, tokenizer, batch)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            progress.set_postfix(loss=loss.item())

        # max(..., 1) avoids a ZeroDivisionError when a loader is empty.
        avg_loss = total_loss / max(len(train_loader), 1)
        print(f"Epoch {epoch+1} finished. Train loss: {avg_loss:.4f}")

        # Validation loop
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for batch in tqdm.tqdm(val_loader, desc=f"Epoch {epoch+1} [val]"):
                val_loss += _contrastive_batch_loss(model, tokenizer, batch).item()

        avg_val_loss = val_loss / max(len(val_loader), 1)
        print(f"Epoch {epoch+1} finished. Validation loss: {avg_val_loss:.4f}")

        torch.save(model.state_dict(), f"contrastive_beto_epoch{epoch+1}.pt")

    return model

In [9]:
# A source is kept only if it has more than `sample_threshold` eligible paragraphs;
# at most `sample_max` paragraphs are then sampled from it.
sample_threshold = 1000
sample_max = 5000
random.seed(42)

# Read explicitly as UTF-8 so the Spanish text decodes the same way on every platform.
with open('chilean_text.json', 'r', encoding='utf-8') as f:
    chilean_text_data = json.load(f)

filtered_data = {}
for key, paragraphs in chilean_text_data.items():
    # Filter paragraphs with at least 3 sentences
    filtered_paragraphs = [
        para for para in paragraphs if len(simple_sent_tokenize(para)) >= 3
    ]

    if len(filtered_paragraphs) > sample_threshold:
        # Inside this branch the count already exceeds the threshold, so the
        # sample size is simply the count capped at sample_max.
        sampled_paragraphs = random.sample(filtered_paragraphs, min(len(filtered_paragraphs), sample_max))
        filtered_data[key] = sampled_paragraphs

In [10]:
# Report how many sampled paragraphs each retained source contributes.
for source_url, source_paragraphs in filtered_data.items():
    print(source_url, len(source_paragraphs))

https://www.fotech.cl/ 2127
https://www.df.cl/ 5000
https://www.wwf.cl/ 3974
https://www.elmostrador.cl/ 4947
https://www.biobiochile.cl/ 1922
https://fundacionsol.cl/ 4609
https://www.portalnet.cl/ 2483


In [11]:
# Split by source so that no site feeds more than one split; this prevents
# peeking at evaluation/test material during training.
# NOTE(review): 'https://www.elmostrador.cl/' is present in the sampled data but
# is not assigned to any split — confirm whether it should be a training source.
train_srcs = {'https://www.fotech.cl/', 'https://www.df.cl/', 'https://www.wwf.cl/', 'https://www.portalnet.cl/'}
eval_srcs = {'https://fundacionsol.cl/'}
test_srcs = {'https://www.biobiochile.cl/'}

# Fail fast if a source is ever added to more than one split.
assert train_srcs.isdisjoint(eval_srcs | test_srcs) and eval_srcs.isdisjoint(test_srcs), \
    "train/eval/test sources must not overlap"

In [None]:
# Partition the sampled paragraphs by the source each split is allowed to use.
train_data = {src: paras for src, paras in filtered_data.items() if src in train_srcs}
val_data = {src: paras for src, paras in filtered_data.items() if src in eval_srcs}
test_data = {src: paras for src, paras in filtered_data.items() if src in test_srcs}

# Build the triple datasets in train -> val -> test order; each one draws its
# negatives from the global `random` state, so the order affects the samples.
train_subset = ContrastiveSentenceDataset(train_data, n_negatives=N_NEGATIVES)
val_subset = ContrastiveSentenceDataset(val_data, n_negatives=N_NEGATIVES)
# The test split uses a single negative per anchor.
test_subset = ContrastiveSentenceDataset(test_data, n_negatives=1)

# Only the training loader shuffles; validation and test keep dataset order.
train_loader = DataLoader(dataset=train_subset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(dataset=val_subset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=test_subset, batch_size=batch_size, shuffle=False)

In [None]:
# Number of batches each loader will yield per pass.
print(f"Train loader length (batches): {len(train_loader)}")
print(f"Validation loader length (batches): {len(val_loader)}")
# This line reports the test loader, so label it as such.
print(f"Test loader length (batches): {len(test_loader)}")

In [None]:
# Train for three epochs on the training loader, validating after each epoch.
trained_model = train_contrastive_model(
    train_loader,
    val_loader,
    batch_size=8,
    epochs=3,
    lr=2e-5,
)

In [None]:
def test_contrastive_model(model, test_loader):
    """Evaluate pairwise accuracy on anchor/positive and anchor/negative pairs.

    For each anchor two cosine similarities are computed: against its positive
    and against its first negative. A similarity above 0 is read as predicting
    "positive"; accuracy counts positives predicted 1 and negatives predicted 0.
    Every pairwise prediction is written to
    ``contrastive_model_pairwise_predictions.json``.

    Args:
        model: Callable taking ``(input_ids, attention_mask)`` and returning
            one embedding row per input sequence.
        test_loader: Iterable of ``(anchor_texts, positive_texts, negatives_list)``
            batches. With the default ``DataLoader`` collate, ``negatives_list``
            is organised by negative slot: ``negatives_list[0][i]`` is the
            first negative of the i-th anchor.
            NOTE(review): a custom ``collate_fn`` with a per-example layout
            would need different indexing — confirm.

    Returns:
        The overall accuracy as a float in ``[0, 1]`` (``0.0`` when the loader
        yields no batches).

    Raises:
        ValueError: If a batch carries no negatives at all.
    """
    model.eval()
    correct = 0
    total = 0
    all_results = []

    with torch.no_grad():
        # The file imports the `tqdm` module, so the progress bar class is `tqdm.tqdm`.
        progress = tqdm.tqdm(test_loader, desc="Testing Accuracy")
        for anchor_texts, positive_texts, negatives_list in progress:
            if not negatives_list:
                raise ValueError("test_loader must provide at least one negative per anchor")

            n_anchors = len(anchor_texts)
            # Slot 0 holds the first negative of every anchor in the batch.
            first_negatives = list(negatives_list[0])

            # Flatten all sentences
            all_texts = list(anchor_texts) + list(positive_texts) + first_negatives
            encodings = tokenizer(all_texts, return_tensors='pt', padding=True, truncation=True)
            encodings = {k: v.to(device) for k, v in encodings.items()}

            embeddings = model(encodings['input_ids'], encodings['attention_mask'])  # [total_sentences, emb_dim]

            # Extract embeddings
            anchor_emb = embeddings[:n_anchors]
            positive_emb = embeddings[n_anchors:2 * n_anchors]
            negative_emb = embeddings[2 * n_anchors:]

            # Normalize for cosine similarity
            anchor_emb = nn.functional.normalize(anchor_emb, dim=1)
            positive_emb = nn.functional.normalize(positive_emb, dim=1)
            negative_emb = nn.functional.normalize(negative_emb, dim=1)

            # Positive pair scoring
            pos_sim = torch.sum(anchor_emb * positive_emb, dim=1)  # [n_anchors]
            pos_preds = (pos_sim > 0).int()  # treat sim > 0 as positive label prediction

            # Negative pair scoring
            neg_sim = torch.sum(anchor_emb * negative_emb, dim=1)  # [n_anchors]
            neg_preds = (neg_sim > 0).int()  # sim > 0 predicts "positive"; the correct answer here is 0

            # Accuracy update
            correct_batch = pos_preds.sum().item() + (1 - neg_preds).sum().item()
            correct += correct_batch
            total += 2 * n_anchors  # each anchor has 2 evals: one pos, one neg

            # Save pairwise results
            for i in range(n_anchors):
                all_results.append({
                    "anchor": anchor_texts[i],
                    "candidate": positive_texts[i],
                    "label": 1,
                    "prediction": int(pos_preds[i].item())
                })
                all_results.append({
                    "anchor": anchor_texts[i],
                    "candidate": first_negatives[i],
                    "label": 0,
                    "prediction": int(neg_preds[i].item())
                })

            progress.set_postfix(batch_acc=correct_batch / (2 * n_anchors))

    # Guard against an empty loader.
    accuracy = correct / total if total else 0.0
    print(f"Test Accuracy: {accuracy * 100:.2f}%")

    # Save results as JSON; ensure_ascii=False emits raw non-ASCII characters,
    # so the file encoding is fixed to UTF-8.
    with open("contrastive_model_pairwise_predictions.json", "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)
    print(f"Saved {len(all_results)} predictions to contrastive_model_pairwise_predictions.json")

    return accuracy

In [None]:
# Score the trained model on the held-out test source.
test_contrastive_model(model=trained_model, test_loader=test_loader)