In [1]:
import torch
from torch import nn
import numpy as np
# 在BiLSTM+CRF模型中，BiLSTM部分可以使用Pytorch等深度学习框架，CRF部分必须手写完成。
# https://github.com/phipleg/keras/blob/crf/keras/layers/crf.py

class Model(nn.Module):
    """BiLSTM tagger with a hand-written CRF transition matrix.

    The BiLSTM produces per-token emission scores; ``forward`` then runs the
    Viterbi forward recursion over those scores with the learned transition
    matrix ``crf_transition`` (indexed ``[previous_tag, current_tag]``).

    Args:
        vocab_size: Number of rows of the embedding table (used only when
            ``embed_matrix`` is ``None``).
        embed_size: Embedding width. When ``embed_matrix`` is given, the width
            of that matrix is used instead so the LSTM input size always
            matches the embedding.
        embed_matrix: Optional pretrained embedding matrix
            ``[vocab_size, embed_size]``; loaded frozen.
        hidden_size: Total BiLSTM output width (split over both directions).
        tagset_size: Number of tags.
        num_layers: Number of stacked LSTM layers.
        dropout: Inter-layer LSTM dropout; only applied when
            ``num_layers > 1``.
    """

    def __init__(self, vocab_size, embed_size,embed_matrix, hidden_size, tagset_size, num_layers=1, dropout=0.1):
        super(Model, self).__init__()
        if embed_matrix is not None:
            self.embedding = nn.Embedding.from_pretrained(
                torch.as_tensor(embed_matrix, dtype=torch.float), freeze=True)
            # Trust the matrix, not the argument, for the LSTM input width.
            embed_size = self.embedding.embedding_dim
        else:
            self.embedding = nn.Embedding(vocab_size, embed_size)

        per_direction = hidden_size // 2
        # nn.LSTM only applies dropout between stacked layers and warns when
        # a non-zero value is passed with a single layer.
        lstm_dropout = dropout if num_layers > 1 else 0.0
        self.bilstm = nn.LSTM(embed_size, per_direction,
                              num_layers, dropout=lstm_dropout, bidirectional=True)
        # Maps the output of the LSTM into tag space. The LSTM emits
        # 2 * per_direction features, which equals hidden_size when it is even.
        self.hidden2tag = nn.Linear(2 * per_direction, tagset_size)
        self.crf_transition = nn.Parameter(
            torch.rand((tagset_size, tagset_size)))

    def forward(self, x):
        """Run the Viterbi forward recursion over the BiLSTM emissions.

        Args:
            x: Token indices of shape ``[seq_length, batch_size]`` (the LSTM
                is not ``batch_first``).

        Returns:
            A tuple ``(score, path)``, both ``[seq_length, batch_size, tagset_size]``:
            ``score[t, b, k]`` is the best score of any tag sequence ending in
            tag ``k`` at step ``t``; ``path[t, b, k]`` (dtype ``long``) is the
            previous tag on that best sequence (``k`` itself at ``t == 0``).
        """
        embed = self.embedding(x)
        lstmout, _ = self.bilstm(embed)
        emissions = self.hidden2tag(lstmout)
        seq_length, batch_size, output_size = emissions.shape

        if seq_length == 0:
            return emissions, emissions.new_zeros(emissions.shape, dtype=torch.long)

        scores = [emissions[0]]
        paths = [torch.arange(output_size, device=emissions.device)
                 .expand(batch_size, output_size)]
        transitions = self.crf_transition.unsqueeze(0)  # [1, prev, cur]
        for t in range(1, seq_length):
            # candidates[b, prev, cur] = score[t-1, b, prev] + U[prev, cur]
            candidates = scores[-1].unsqueeze(2) + transitions
            best_score, best_prev = candidates.max(dim=1)
            scores.append(best_score + emissions[t])
            paths.append(best_prev)

        return torch.stack(scores, dim=0), torch.stack(paths, dim=0)


SyntaxError: expected ':' (3706709772.py, line 18)

In [23]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class ChainCRF(nn.Module):
    """Linear-chain CRF layer working on score-valued emissions.

    A tag sequence ``y`` for emissions ``e`` is scored as::

        b_start[y_0] + sum_t e[t, y_t] + sum_t U[y_{t-1}, y_t] + b_end[y_{T-1}]

    Higher scores are better. The same scoring function is used by the gold
    path score, the partition function and Viterbi decoding, so the resulting
    loss is a proper (non-negative) negative log-likelihood.

    Args:
        num_classes: Number of tags.
    """

    def __init__(self, num_classes):
        super(ChainCRF, self).__init__()
        self.num_classes = num_classes
        # Transition matrix U[prev, next], start boundary b_start, end boundary b_end.
        self.U = nn.Parameter(torch.rand(num_classes, num_classes))
        self.b_start = nn.Parameter(torch.zeros(num_classes))
        self.b_end = nn.Parameter(torch.zeros(num_classes))

    def forward(self, emissions, true_tags):
        """Mean negative log-likelihood of the gold tag sequences.

        :param emissions: [batch_size, seq_len, num_classes] - emission scores
        :param true_tags: [batch_size, seq_len] - true tag indices
        :return: scalar loss, ``mean(log Z - score(gold path))``
        """
        gold_score = self.path_energy(emissions, true_tags)
        # free_energy returns -log Z, so negate it to get the log-partition.
        log_partition = -self.free_energy(emissions)
        return torch.mean(log_partition - gold_score)

    def path_energy(self, emissions, true_tags):
        """Score of the given tag sequences.

        :param emissions: [batch_size, seq_len, num_classes] - emission scores
        :param true_tags: [batch_size, seq_len] - tag indices
        :return: [batch_size] - emission + transition + boundary score per sequence
        """
        tag_mask = F.one_hot(true_tags, self.num_classes).to(emissions.dtype)
        # Sum over classes (picks the emission of the chosen tag), then over time.
        energy = (tag_mask * emissions).sum(dim=2).sum(dim=1)

        prev_tags = true_tags[:, :-1]
        next_tags = true_tags[:, 1:]
        energy = energy + self.U[prev_tags, next_tags].sum(dim=1)

        energy = energy + self.b_start[true_tags[:, 0]] + self.b_end[true_tags[:, -1]]
        return energy

    def free_energy(self, emissions):
        """Compute the free energy using the forward algorithm.

        :param emissions: [batch_size, seq_len, num_classes] - emission scores
        :return: [batch_size] - free energy ``-log Z`` for each sequence in the batch
        """
        seq_len = emissions.size(1)

        # Initialize alpha with the first-step emissions plus the start boundary.
        # Built out-of-place so the caller's emissions are never modified.
        alpha = emissions[:, 0, :] + self.b_start

        transitions = self.U.unsqueeze(0)  # [1, prev, next]
        for t in range(1, seq_len):
            # alpha [batch, prev, 1] + U [1, prev, next], reduced over prev.
            alpha = torch.logsumexp(
                alpha.unsqueeze(2) + transitions, dim=1) + emissions[:, t, :]

        alpha = alpha + self.b_end

        # Reduce over the last tag to obtain log Z; free energy is its negation.
        return -torch.logsumexp(alpha, dim=1)

    def viterbi_decode(self, emission):
        """Decode the highest scoring sequence of tags using the Viterbi algorithm.

        :param emission: [batch_size, seq_len, num_classes] - emission scores
        :return: [batch_size, seq_len] - the tag indices (long) of the highest scoring sequence
        """
        seq_len = emission.size(1)

        with torch.no_grad():
            score = emission[:, 0, :] + self.b_start
            transitions = self.U.unsqueeze(0)  # [1, prev, cur]
            backpointers = []
            for t in range(1, seq_len):
                # candidates[b, prev, cur] = score[b, prev] + U[prev, cur]
                candidates = score.unsqueeze(2) + transitions
                best_score, best_prev = candidates.max(dim=1)
                score = best_score + emission[:, t, :]
                backpointers.append(best_prev)
            score = score + self.b_end

            # Backtrack from the best final tag.
            current = score.argmax(dim=1)
            reversed_path = [current]
            for pointers in reversed(backpointers):
                current = pointers.gather(1, current.unsqueeze(1)).squeeze(1)
                reversed_path.append(current)
            reversed_path.reverse()
            return torch.stack(reversed_path, dim=1)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class BiLSTM_CRF(nn.Module):
    """Sequence tagger: embedding -> bidirectional LSTM -> linear -> ChainCRF.

    Args:
        num_classes: Number of tags.
        embedding_dim: Width of the word embeddings.
        hidden_dim: Total BiLSTM output width (half per direction).
        vocab_size: Number of rows in the embedding table.
    """

    def __init__(self, num_classes, embedding_dim, hidden_dim, vocab_size):
        super().__init__()
        self.num_classes = num_classes
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        # Sub-modules are created in pipeline order: embed, encode, project, CRF.
        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim // 2,
            num_layers=1,
            batch_first=True,
            bidirectional=True,
        )
        self.hidden2tag = nn.Linear(hidden_dim, num_classes)
        self.crf = ChainCRF(num_classes)

    def forward(self, sentence, targets=None):
        """Return the CRF loss when ``targets`` is given, else decoded tags.

        Args:
            sentence: Token indices, batch-first (the LSTM is ``batch_first``).
            targets: Optional gold tag indices passed to the CRF together with
                the emissions.

        Returns:
            ``self.crf(emissions, targets)`` when ``targets`` is provided,
            otherwise ``self.crf.viterbi_decode(emissions)``.
        """
        encoded, _ = self.lstm(self.word_embeddings(sentence))
        emissions = self.hidden2tag(encoded)

        # Inference path: no gold tags, so decode with Viterbi.
        if targets is None:
            return self.crf.viterbi_decode(emissions)

        # Training path: the CRF layer computes the loss (no padding mask is applied).
        return self.crf(emissions, targets)


# Example usage: build a small BiLSTM_CRF and run one loss computation and one decode.
num_classes = 5
embedding_dim = 50
hidden_dim = 50
vocab_size = 10000
model = BiLSTM_CRF(num_classes, embedding_dim, hidden_dim, vocab_size)

# Example input
sentence = torch.randint(1, vocab_size, (2, 10))  # 2 sentences of 10 tokens each

# Training: passing targets makes the model return the CRF loss.
# NOTE(review): the trailing zeros look like padding, but the model call passes
# no mask to the CRF, so they are scored as ordinary tag 0 — confirm intent.
targets = torch.tensor([[1, 2, 3, 4, 0, 0, 0, 0, 0, 0],
                       [3, 1, 4, 0, 0, 0, 0, 0, 0, 0]])
loss = model(sentence, targets)
print("训练损失:", loss.item())

# Testing: without targets the model returns Viterbi-decoded tags.
tags = model(sentence)
print("预测标签:", tags)


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ChainCRF(nn.Module):
    """Linear-chain CRF layer (PyTorch port of a Keras ChainCRF layer).

    Inputs ``x`` are emission scores of shape ``[batch, time, classes]``
    (higher is better). ``U[i, j]`` scores the transition from tag ``i`` to
    tag ``j``; ``b_start`` / ``b_end`` are added to the first / last unmasked
    step of every sequence. An optional ``mask`` of shape ``[batch, time]``
    (non-zero = real token) disables emissions and transitions at masked steps.

    Note: this notebook defines another class with the same name in an earlier
    cell whose ``forward`` takes ``(emissions, true_tags)``; whichever cell is
    executed last wins.

    Args:
        num_classes: Number of tags.
    """

    def __init__(self, num_classes):
        super(ChainCRF, self).__init__()
        self.U = nn.Parameter(torch.rand(num_classes, num_classes))
        self.b_start = nn.Parameter(torch.zeros(num_classes))
        self.b_end = nn.Parameter(torch.zeros(num_classes))

    def path_energy(self, y, x, mask=None):
        """Score of tag sequences ``y`` ([batch, time]) including boundary terms.

        Returns a ``[batch]`` tensor.
        """
        x = self.add_boundary_energy(x, mask)
        return self.path_energy0(y, x, mask)

    def path_energy0(self, y, x, mask=None):
        """Score of tag sequences ``y`` for emissions ``x`` that already include
        the boundary terms (see ``add_boundary_energy``).

        Returns a ``[batch]`` tensor.
        """
        n_classes = x.size(2)
        if mask is not None:
            # Masked labels are ignored; map them to a valid index so that
            # padding labels such as -1 do not break the one-hot encoding.
            y = torch.where(mask.bool(), y, torch.zeros_like(y))
        y_one_hot = F.one_hot(y, n_classes).to(x.dtype)

        # Tag path energy: pick the emission of the chosen tag, sum over time.
        energy = torch.sum(x * y_one_hot, dim=2).sum(dim=1)

        # Transition energy
        y_t = y[:, :-1]
        y_tp1 = y[:, 1:]
        U_y_t_tp1 = self.U[y_t, y_tp1]

        if mask is not None:
            mask = mask.to(x.dtype)
            # A transition counts only when both of its endpoints are unmasked.
            U_y_t_tp1 = U_y_t_tp1 * (mask[:, :-1] * mask[:, 1:])

        return energy + torch.sum(U_y_t_tp1, dim=1)

    def sparse_chain_crf_loss(self, y, x, mask=None):
        """Per-sequence negative log-likelihood ``log Z - score(y)``.

        Returns a ``[batch, 1]`` tensor (reduce it, e.g. with ``.mean()``,
        before calling ``.item()`` or ``.backward()``).
        """
        x = self.add_boundary_energy(x, mask)
        energy = self.path_energy0(y, x, mask) - self.free_energy0(x, mask)
        return -energy.unsqueeze(-1)

    def add_boundary_energy(self, x, mask=None):
        """Return a copy of ``x`` with ``b_start`` / ``b_end`` added at the first /
        last unmasked step and masked steps zeroed. ``x`` itself is not modified.
        """
        if mask is None:
            x = torch.cat([x[:, :1, :] + self.b_start, x[:, 1:, :]], dim=1)
            x = torch.cat([x[:, :-1, :], x[:, -1:, :] + self.b_end], dim=1)
        else:
            mask = mask.to(x.dtype)
            # Out-of-place and broadcast over the class axis.
            x = x * mask.unsqueeze(-1)
            # A step starts a segment when it is unmasked and its predecessor is not.
            start_mask = torch.cat([torch.zeros_like(mask[:, :1]), mask[:, :-1]], dim=1)
            start_mask = (mask > start_mask).to(x.dtype)
            x = x + start_mask.unsqueeze(-1) * self.b_start
            # A step ends a segment when it is unmasked and its successor is not.
            end_mask = torch.cat([mask[:, 1:], torch.zeros_like(mask[:, -1:])], dim=1)
            end_mask = (mask > end_mask).to(x.dtype)
            x = x + end_mask.unsqueeze(-1) * self.b_end
        return x

    def viterbi_decode(self, x, mask=None):
        """Return the highest-scoring tag sequence, ``[batch, time]`` (long).

        Masked positions are set to ``-1``.
        """
        def _viterbi_reduce(scores):
            # scores[b, prev, cur]; keep the best previous tag and its score.
            best_score, best_prev = scores.max(dim=1)
            return [best_prev, best_score]

        with torch.no_grad():
            x = self.add_boundary_energy(x, mask)
            alpha_0 = x[:, 0, :]
            gamma_0 = torch.zeros_like(alpha_0, dtype=torch.long)
            initial_states = [gamma_0, alpha_0]
            _, gamma = self._forward(x, initial_states, mask,
                                     reduce_step=_viterbi_reduce)
            return self._backward(gamma, mask)

    def free_energy(self, x, mask=None):
        """Log-partition ``log Z`` per sequence (``[batch]``), boundary terms included."""
        x = self.add_boundary_energy(x, mask)
        return self.free_energy0(x, mask)

    def free_energy0(self, x, mask=None):
        """Log-partition ``log Z`` for emissions ``x`` that already include the
        boundary terms. Returns a ``[batch]`` tensor.
        """
        initial_states = [x[:, 0, :]]
        last_alpha, _ = self._forward(x, initial_states, mask)
        # After the trailing all-zero step every class holds logsumexp(alpha).
        log_z = last_alpha[:, 0]
        if mask is not None:
            # Each masked step has zero emissions and transitions, so the
            # recursion sums over C identical values there and adds log(C).
            # Remove that constant so log Z covers the unmasked steps only.
            n_masked = (1 - mask.to(x.dtype)).sum(dim=1)
            log_z = log_z - n_masked * x.new_tensor(float(x.size(2))).log()
        return log_z

    def _forward(self, x, states, mask=None, reduce_step=None):
        """Run the forward recursion over time.

        ``reduce_step`` maps ``scores[batch, prev, cur]`` to the list of new
        states (the last one being the new alpha); it defaults to log-sum-exp
        over ``prev``. Returns ``(last_output, outputs)`` where ``outputs`` is
        ``[batch, time, classes]`` and the final step uses an all-zero energy
        matrix, i.e. it only reduces the last alpha.
        """
        if reduce_step is None:
            def reduce_step(scores):
                return [torch.logsumexp(scores, dim=1)]

        def _forward_step(energy_matrix_t, step_states):
            alpha_tm1 = step_states[-1]
            new_states = reduce_step(alpha_tm1.unsqueeze(2) + energy_matrix_t)
            return new_states[0], new_states

        U_shared = self.U.unsqueeze(0).unsqueeze(0)  # [1, 1, prev, cur]

        if mask is not None:
            mask = mask.to(x.dtype)
            mask_U = (mask[:, :-1] * mask[:, 1:]).unsqueeze(2).unsqueeze(3)
            U_shared = U_shared * mask_U

        # energy[b, t, prev, cur] = x[b, t + 1, cur] + U[prev, cur]
        inputs = x[:, 1:, :].unsqueeze(2) + U_shared
        final_step = x.new_zeros(x.size(0), 1, x.size(2), x.size(2))
        inputs = torch.cat([inputs, final_step], dim=1)

        last, values = self._rnn(_forward_step, inputs, states)
        return last, values

    def _rnn(self, fn, inputs, initial_states, go_backwards=False):
        """Scan ``fn(input_t, states) -> (output_t, new_states)`` over dim 1.

        Returns ``(last_output, outputs)`` with ``outputs`` stacked along dim 1
        in iteration order (i.e. time-reversed when ``go_backwards`` is true).
        """
        n_steps = inputs.size(1)
        if n_steps == 0:
            raise ValueError("_rnn needs at least one time step")
        time_steps = range(n_steps - 1, -1, -1) if go_backwards else range(n_steps)

        states = list(initial_states)
        outputs = []
        for t in time_steps:
            output, states = fn(inputs[:, t], states)
            outputs.append(output)
        return outputs[-1], torch.stack(outputs, dim=1)

    def _backward(self, gamma, mask):
        """Follow the Viterbi backpointers ``gamma`` from the last step to the first.

        Returns ``[batch, time]`` tag indices, ``-1`` at masked positions.
        """
        gamma = gamma.long()

        def _backward_step(gamma_t, step_states):
            y_next = step_states[0]
            y_t = gamma_t.gather(1, y_next.unsqueeze(1)).squeeze(1)
            return y_t, [y_t]

        # Every column of the final gamma holds the best last tag, so index 0
        # is a valid starting point.
        initial_states = [torch.zeros_like(gamma[:, 0, 0])]
        _, y_rev = self._rnn(_backward_step, gamma, initial_states,
                             go_backwards=True)
        y = torch.flip(y_rev, [1])

        if mask is not None:
            mask = mask.to(y.dtype)
            # Zero masked outputs, then set them to -1.
            y = y * mask - (1 - mask)
        return y

    def forward(self, x, mask=None):
        """Pass ``x`` through unchanged in training mode; Viterbi-decode in eval mode."""
        if self.training:
            return x
        return self.viterbi_decode(x, mask)

# Example usage of the ChainCRF layer on random emissions.
num_classes = 5
crf = ChainCRF(num_classes)
x = torch.rand(2, 4, num_classes)  # Batch size of 2, sequence length of 4
y_true = torch.tensor([[1, 2, 3, 4], [0, 2, 1, 3]])  # Example true tag sequences

# Training: sparse_chain_crf_loss returns one value per sequence ([batch, 1]),
# so reduce it to a scalar before calling .item().
loss = crf.sparse_chain_crf_loss(y_true, x).mean()
print("Training Loss:", loss.item())

# Testing: switch to eval mode so the layer decodes instead of passing the
# emissions through, and skip gradient tracking while predicting.
crf.eval()
with torch.no_grad():
    y_pred = crf(x)
print("Predicted Tags:", y_pred.numpy())
