In [1]:
from typing import List, Dict, Union
import pandas as pd
import torch

In [2]:

# Read the CSV file
df = pd.read_csv("/home/chudeo/coding-evidence-extraction-main/pos_sentence.csv")

# Extract unique labels from the word_labels column
unique_labels = set()
for labels in df['word_labels']:
    unique_labels.update(labels.split(','))

# Generate label2id and id2label dictionaries
label2id = {label: id for id, label in enumerate(unique_labels)}
id2label = {id: label for label, id in label2id.items()}

# Example usage:
print("label2id:", label2id)
print("id2label:", id2label)

label2id: {'I': 0, 'B': 1, 'O': 2}
id2label: {0: 'I', 1: 'B', 2: 'O'}


In [3]:
def get_tags_with_positions(labels: List[str], id2label: Dict[int, str]) -> List[Dict[str, Union[str, List[int]]]]:
    tags_pos = []
    ind = 0
    while ind < len(labels):
        if id2label[labels[ind]].startswith("B"):
            tag = id2label[labels[ind]].split("-")[1]
            start_pos = ind
            ind += 1
            while ind < len(labels) and id2label[labels[ind]].startswith("I"):
                ind += 1
            end_pos = ind
            tags_pos.append({"tag": tag, "pos": [start_pos, end_pos]})
        else:
            ind += 1
    return tags_pos

In [4]:
def get_mean_vector_from_segment(embeddings: torch.Tensor, start_pos: int, end_pos: int) -> torch.Tensor:
    return embeddings[start_pos:end_pos].mean(dim=0)

In [5]:
def process_csv(file_path: str):
    df = pd.read_csv(file_path)
    for _, row in df.iterrows():
        sentence = row['sentence']
        word_labels = row['word_labels'].split(',')
        pos_tags = row['pos_tags'].split(',')
        # Assuming you have id2label dictionary defined somewhere
        tags_with_positions = get_tags_with_positions(word_labels, id2label)
        # Perform further processing as needed

In [6]:
csv_file_path = "/home/chudeo/coding-evidence-extraction-main/pos_sentence.csv"
process_csv(csv_file_path)

KeyError: 'O'

In [None]:
import os
from typing import List, Tuple

from torch import nn
from transformers import AutoModel

In [None]:
LOG_INF = 10e5


class BertCrf(nn.Module):
    def __init__(
        self,
        num_labels: int,
        bert_name: str,
        dropout: float = 0.2,
        use_crf: bool = True,
    ):
        super().__init__()
        self.num_labels = num_labels
        self.use_crf = use_crf
        self.cross_entropy = nn.CrossEntropyLoss()

        self.bert = AutoModel.from_pretrained(bert_name)

        self.dropout = nn.Dropout(dropout)
        self.hidden2label = nn.Linear(self.bert.config.hidden_size, num_labels)

        self.start_transitions = nn.Parameter(torch.empty(num_labels))
        self.end_transitions = nn.Parameter(torch.empty(num_labels))
        self.transitions = nn.Parameter(torch.empty(num_labels, num_labels))

        nn.init.uniform_(self.start_transitions, -0.1, 0.1)
        nn.init.uniform_(self.end_transitions, -0.1, 0.1)
        nn.init.uniform_(self.transitions, -0.1, 0.1)

    def _compute_log_denominator(self, features: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
        seq_len = features.shape[0]

        log_score_over_all_seq = self.start_transitions + features[0]

        for i in range(1, seq_len):
            next_log_score_over_all_seq = torch.logsumexp(
                log_score_over_all_seq.unsqueeze(2) + self.transitions + features[i].unsqueeze(1),
                dim=1,
            )
            log_score_over_all_seq = torch.where(
                mask[i].unsqueeze(1),
                next_log_score_over_all_seq,
                log_score_over_all_seq,
            )
        log_score_over_all_seq += self.end_transitions
        return torch.logsumexp(log_score_over_all_seq, dim=1)

    def _compute_log_numerator(self, features: torch.Tensor, labels: torch.Tensor, mask: torch.Tensor) -> torch.Tensor:
        seq_len, bs, _ = features.shape

        score_over_seq = self.start_transitions[labels[0]] + features[0, torch.arange(bs), labels[0]]

        for i in range(1, seq_len):
            score_over_seq += (
                self.transitions[labels[i - 1], labels[i]] + features[i, torch.arange(bs), labels[i]]
            ) * mask[i]
        seq_lens = mask.sum(dim=0) - 1
        last_tags = labels[seq_lens.long(), torch.arange(bs)]
        score_over_seq += self.end_transitions[last_tags]
        return score_over_seq

    def get_bert_features(
        self, input_ids: torch.Tensor, attention_mask: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        hidden = self.bert(input_ids, attention_mask=attention_mask)["last_hidden_state"]
        hidden = self.dropout(hidden)
        return self.hidden2label(hidden), hidden

    def forward(
        self,
        input_ids: torch.Tensor,
        attention_mask: torch.Tensor,
        labels: torch.Tensor,
    ) -> torch.Tensor:
        features, _ = self.get_bert_features(input_ids=input_ids, attention_mask=attention_mask)
        attention_mask = attention_mask.bool()

        if self.use_crf:
            features = torch.swapaxes(features, 0, 1)
            attention_mask = torch.swapaxes(attention_mask, 0, 1)
            labels = torch.swapaxes(labels, 0, 1)

            log_numerator = self._compute_log_numerator(features=features, labels=labels, mask=attention_mask)
            log_denominator = self._compute_log_denominator(features=features, mask=attention_mask)

            return torch.mean(log_denominator - log_numerator)
        else:
            return self.cross_entropy(
                features.flatten(end_dim=1),
                torch.where(attention_mask.bool(), labels, -100).flatten(end_dim=1),
            )

    def _viterbi_decode(self, features: torch.Tensor, mask: torch.Tensor) -> List[List[int]]:
        seq_len, bs, _ = features.shape

        log_score_over_all_seq = self.start_transitions + features[0]

        backpointers = torch.empty_like(features)

        for i in range(1, seq_len):
            next_log_score_over_all_seq = (
                log_score_over_all_seq.unsqueeze(2) + self.transitions + features[i].unsqueeze(1)
            )

            next_log_score_over_all_seq, indices = next_log_score_over_all_seq.max(dim=1)

            log_score_over_all_seq = torch.where(
                mask[i].unsqueeze(1),
                next_log_score_over_all_seq,
                log_score_over_all_seq,
            )
            backpointers[i] = indices

        backpointers = backpointers[1:].int()

        log_score_over_all_seq += self.end_transitions
        seq_lens = mask.sum(dim=0) - 1

        best_paths = []
        for seq_ind in range(bs):
            best_label_id = torch.argmax(log_score_over_all_seq[seq_ind]).item()
            best_path = [best_label_id]

            for backpointer in reversed(backpointers[: seq_lens[seq_ind]]):
                best_path.append(backpointer[seq_ind][best_path[-1]].item())

            best_path.reverse()
            best_paths.append(best_path)

        return best_paths

    def decode(self, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> List[List[int]]:
        features, _ = self.get_bert_features(input_ids=input_ids, attention_mask=attention_mask)
        attention_mask = attention_mask.bool()

        if self.use_crf:
            features = torch.swapaxes(features, 0, 1)
            mask = torch.swapaxes(attention_mask, 0, 1)
            return self._viterbi_decode(features=features, mask=mask)
        else:
            labels = torch.argmax(features, dim=2)
            predictions = []
            for i in range(len(labels)):
                predictions.append(labels[i][attention_mask[i]].tolist())
            return predictions

    def save_to(self, path: str):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        torch.save(self.state_dict(), path)

    def load_from(self, path: str):
        self.load_state_dict(torch.load(path))
