In [1]:
import numpy as np
import torch
import torch.nn as nn
from collections import defaultdict, Counter
from itertools import chain
from math import log
from scipy.optimize import fmin_l_bfgs_b

# Constants for the beginning of a sentence token and its index
BOS_TOKEN, BOS_IDX = '<bos>', 0

# Global variables for logging and gradients
ITER_NUM = SUB_ITER_NUM = 0
GRAD = None

def read_corpus(filename):
    """
    Reads a corpus from a file and returns the data as a list of (X, Y) pairs.
    Each X is a list of tokens, and each Y is a list of labels.
    """
    data, X, Y = [], [], []

    # Read each line in the file
    for words in [line.strip().split() for line in open(filename)]:
        if words:
            X.append(words[:-1])  # Tokens except the last one
            Y.append(words[-1])   # The last token is the label
        elif X:
            data.append((X, Y))
            X, Y = [], []
    if X:
        data.append((X, Y))

    return data

def extract_text_features(text_tokens, index):
    """
    Extracts features from the text tokens at a given index.
    Returns a list of feature strings.
    """
    features = []

    # Extract features from positions relative to the current index (-2 to +2)
    for i in range(-2, 3):
        idx = index + i
        if 0 <= idx < len(text_tokens):
            token = text_tokens[idx][0]
            features.append(f'unigram[{i}]:' + token)
            if token.isupper():
                features.append(f'is_upper[{i}]')
            if token.isdigit():
                features.append(f'is_digit[{i}]')

    return features

class FeatureSet():
    """
    Class to manage features and their mappings to feature IDs.
    """
    def __init__(self):
        self.feature_dict = dict()
        self.empirical_counts = Counter()
        self.num_features = 0
        self.label2index = {BOS_TOKEN: BOS_IDX}
        self.label_array = [BOS_TOKEN]

    def process_corpus(self, data):
        """
        Processes the corpus to build the feature set and label mappings.
        """
        for X, Y in data:
            prev_y = BOS_IDX  # Start with the beginning of sentence index
            for i in range(len(X)):
                y = self.label2index.get(Y[i], len(self.label2index))
                if Y[i] not in self.label2index:
                    self.label2index[Y[i]] = y
                    self.label_array.append(Y[i])
                self._add(prev_y, y, X, i)
                prev_y = y

    def __len__(self):
        return self.num_features

    def get_labels(self):
        """
        Returns the label mappings.
        """
        return self.label2index, self.label_array

    def _add(self, prev_y, y, X, i):
        """
        Adds features to the feature set for a given position in the sentence.
        """
        for feature_string in extract_text_features(X, i):
            if feature_string not in self.feature_dict:
                self.feature_dict[feature_string] = {}

            for pair in [(prev_y, y), (-1, y)]:
                if pair not in self.feature_dict[feature_string]:
                    self.feature_dict[feature_string][pair] = self.num_features
                    self.num_features += 1

                feature_id = self.feature_dict[feature_string][pair]
                self.empirical_counts[feature_id] += 1

    def get_empirical_counts_numpy(self):
        """
        Returns the empirical feature counts as a NumPy array.
        """
        return np.array([self.empirical_counts.get(feature_id, 0) for feature_id in range(self.num_features)])

    def get_empirical_counts_torch(self):
        """
        Returns the empirical feature counts as a PyTorch tensor.
        """
        counts = torch.zeros(self.num_features)
        for feature_id, count in self.empirical_counts.items():
            counts[feature_id] = count
        return counts

    def get_feature_list(self, X, i):
        """
        Returns the list of features and their IDs for a given position.
        """
        feature_list_dict = defaultdict(set)
        for feature_string in extract_text_features(X, i):
            for (prev_y, y), feature_id in self.feature_dict[feature_string].items():
                feature_list_dict[(prev_y, y)].add(feature_id)
        return list(feature_list_dict.items())

    def calc_inner_products_numpy(self, params, X, i):
        """
        Calculates inner products for features at position i using NumPy arrays.
        """
        inner_products = defaultdict(float)
        features = chain.from_iterable(self.feature_dict.get(feature_string, {}).items() for feature_string in extract_text_features(X, i))

        for (prev_y, y), feature_id in features:
            inner_products[(prev_y, y)] += params[feature_id]

        return [((prev_y, y), score) for (prev_y, y), score in inner_products.items()]

    def calc_inner_products_torch(self, params, X, i):
        """
        Calculates inner products for features at position i using PyTorch tensors.
        """
        inner_products = defaultdict(float)
        features = chain.from_iterable(
            self.feature_dict.get(feature_string, {}).items()
            for feature_string in extract_text_features(X, i)
        )

        for (prev_y, y), feature_id in features:
            inner_products[(prev_y, y)] += params[feature_id]
        return [((prev_y, y), score) for (prev_y, y), score in inner_products.items()]

In [2]:
class LinearChainCRFNumpy():
    """
    Linear Chain CRF implemented using NumPy.
    """
    def __init__(self):
        self.training_data = None
        self.feature_set = None
        self.label2index = None
        self.label_array = None
        self.num_classes = None
        self.params = None
        self.squared_sigma = 10.0  # Regularization parameter

    def get_training_feature_data(self):
        """
        Prepares the training data by extracting features.
        """
        return [[self.feature_set.get_feature_list(X, i) for i in range(len(X))]
                for X, _ in self.training_data]

    def estimate_parameters(self):
        """
        Estimates the model parameters using L-BFGS optimization.
        """
        def _callback(params):
            global ITER_NUM, SUB_ITER_NUM
            ITER_NUM += 1
            SUB_ITER_NUM = 0

        self.params, _, _ = fmin_l_bfgs_b(
            func=self.negative_log_likelihood,
            fprime=self.gradient,
            x0=np.zeros(len(self.feature_set)),
            args=(
                self.feature_set,
                self.get_training_feature_data(),
                self.feature_set.get_empirical_counts_numpy(),
                self.label2index,
                self.squared_sigma
            ),
            maxiter=20,
            callback=_callback
        )

    def calc_transition_matrices(self, params, num_classes, feature_set, X, inference=True):
        """
        Calculates the transition matrices M for each position in the sequence.
        """
        M = [np.zeros((num_classes, num_classes)) for _ in range(len(X))]
        
        for i, xi in enumerate(X):
            # Get the feature scores for the current position
            feature_pairs = feature_set.calc_inner_products_numpy(params, X, i) if inference else xi
            for (prev_y, y), feature_scores in feature_pairs:
                # Sum the feature scores for each (prev_y, y) pair
                score = sum(params[fid] for fid in feature_scores) if not inference else feature_scores
                indices = prev_y if prev_y != -1 else slice(None)
                M[i][indices, y] += score

            # Exponentiate the scores to get probabilities
            M[i] = np.exp(M[i])
            # Apply transition constraints
            if i == 0:
                M[i][BOS_IDX + 1:, :] = 0
            else:
                M[i][:, BOS_IDX] = 0
                M[i][BOS_IDX, :] = 0
        
        return M

    def forward_backward(self, num_classes, seq_length, M):
        """
        Performs the forward-backward algorithm to compute alpha and beta.
        """
        alpha = np.zeros((seq_length, num_classes))
        beta = np.zeros((seq_length, num_classes))
        
        alpha[0, :] = M[0][BOS_IDX, :]  # Initial alpha values
        for i in range(1, seq_length):
            alpha[i] = np.dot(alpha[i - 1], M[i])

        beta[-1, :] = 1.0  # Initial beta values
        for i in range(seq_length - 2, -1, -1):
            beta[i] = np.dot(M[i + 1], beta[i + 1])

        return alpha, beta

    def negative_log_likelihood(self, params, feature_set, sequences, empirical_counts, label2index, squared_sigma):
        """
        Computes the negative log-likelihood for optimization.
        """
        expected_counts = np.zeros(len(feature_set))
        log_Z = 0
        
        for X in sequences:
            M = self.calc_transition_matrices(params, len(label2index), feature_set, X, inference=False)
            alpha, beta = self.forward_backward(len(label2index), len(X), M)
            Z = alpha[-1].sum()
            log_Z += log(Z)

            for i, xi in enumerate(X):
                for (prev_y, y), feature_ids in xi:
                    # Compute marginal probabilities
                    if prev_y == -1:
                        prob = alpha[i, y] * beta[i, y]
                    elif i == 0 and prev_y == BOS_IDX:
                        prob = M[i][BOS_IDX, y] * beta[i, y]
                    else:
                        prob = alpha[i - 1, prev_y] * M[i][prev_y, y] * beta[i, y]
                    for fid in feature_ids:
                        expected_counts[fid] += prob / Z

        # Compute the regularized negative log-likelihood
        log_likelihood = np.dot(empirical_counts, params) - log_Z - np.sum(params ** 2) / (2 * squared_sigma)

        # Store the gradient for use in optimization
        global GRAD, ITER_NUM, SUB_ITER_NUM
        GRAD = empirical_counts - expected_counts - params / squared_sigma
        print(f"{ITER_NUM:03d} {(f'({SUB_ITER_NUM:02d})' if SUB_ITER_NUM else '')}: {-log_likelihood:.4f}")
        SUB_ITER_NUM += 1

        return -log_likelihood

    def gradient(self, params, *args):
        """
        Returns the gradient for optimization.
        """
        return -GRAD

    def train(self, train_corpus_filename):
        """
        Trains the model using the training corpus.
        """
        global ITER_NUM, SUB_ITER_NUM
        ITER_NUM = SUB_ITER_NUM = 0
        self.training_data = read_corpus(train_corpus_filename)
        self.feature_set = FeatureSet()
        self.feature_set.process_corpus(self.training_data)
        self.label2index, self.label_array = self.feature_set.get_labels()
        self.index2label = {idx: label for label, idx in self.label2index.items()}
        self.num_classes = len(self.label_array)
        self.estimate_parameters()

    def test(self, test_corpus_filename):
        """
        Tests the model using the test corpus and reports accuracy.
        """
        test_data = read_corpus(test_corpus_filename)
        total_count, correct_count = 0, 0
        for X, Y in test_data:
            total_count += len(Y)
            correct_count += sum(y == yp for y, yp in zip(Y, self.inference(X)))
        print(f'LinearChainCRFNumpy Accuracy: {100 * correct_count / total_count:.2f}%')

    def inference(self, X):
        """
        Performs inference (Viterbi decoding) on a given sequence X.
        """
        return self.viterbi(X, self.calc_transition_matrices(self.params, self.num_classes, self.feature_set, X, inference=True))

    def viterbi(self, X, M):
        """
        Viterbi decoding algorithm to find the most probable sequence of labels.
        """
        seq_len = len(X)
        V = np.zeros((seq_len, self.num_classes))
        backptr = np.zeros((seq_len, self.num_classes), dtype='int64')
        V[0, :] = M[0][BOS_IDX, :]  # Initialization

        for i in range(1, seq_len):
            for y in range(self.num_classes):
                probabilities = V[i - 1] * M[i][:, y]
                backptr[i, y] = best_prev_y = np.argmax(probabilities)
                V[i, y] = probabilities[best_prev_y]

        last_y = np.argmax(V[-1])
        Y_pred = [last_y]
        
        for i in range(seq_len - 1, 0, -1):
            last_y = backptr[i, last_y]
            Y_pred.append(last_y)

        return [self.index2label[y] for y in reversed(Y_pred)]

In [19]:
class LinearChainCRFPytorch(nn.Module):
    """
    Linear Chain CRF implemented using PyTorch.
    """
    def __init__(self):
        super(LinearChainCRFPytorch, self).__init__()
        self.squared_sigma = 10.0  # Regularization parameter

    def get_training_feature_data(self):
        """
        Prepares the training data by extracting features.
        """
        return [
            [self.feature_set.get_feature_list(X, i) for i in range(len(X))]
            for X, _ in self.training_data
        ]

    def estimate_parameters(self):
        """
        Estimates the model parameters using PyTorch's LBFGS optimizer.
        """
        empirical_counts = self.feature_set.get_empirical_counts_torch()
        sequences = self.get_training_feature_data()

        self.params = nn.Parameter(torch.zeros(len(self.feature_set)))

        optimizer = torch.optim.LBFGS([self.params], max_iter=20)

        def closure():
            optimizer.zero_grad()
            loss = self.negative_log_likelihood(self.params, sequences, empirical_counts)
            loss.backward()
            return loss

        optimizer.step(closure)

    def calc_transition_matrices(self, params, num_classes, feature_set, X, inference=True):
        """
        Calculates the transition matrices M for each position in the sequence.
        """
        M = []
        
        ### BEGIN SOLUTION        
        for i in range(len(X)):
            # Initialize the transition matrix with -inf (log-space equivalent of 0 probability)
            M_i = torch.full((num_classes, num_classes), float('-inf'))
            
            # Compute feature pairs for the current position
            feature_pairs = feature_set.calc_inner_products_torch(params, X, i) if inference else X[i]
            
            for (prev_y, y), scores in feature_pairs:
                # Ensure scores are scalar (aggregate if needed)
                if isinstance(scores, set):
                    scores = sum(scores)  # Aggregate set into a single scalar (can also use mean or max)
                elif not isinstance(scores, (float, torch.Tensor)):
                    raise ValueError(f"Unexpected score type: {type(scores)} in feature_pairs: {feature_pairs}")
                
                if prev_y == -1:  # Unary features
                    M_i[:, y] = torch.where(M_i[:, y] == float('-inf'), scores, M_i[:, y] + scores)
                else:  # Pairwise features
                    M_i[prev_y, y] = torch.where(M_i[prev_y, y] == float('-inf'), scores, M_i[prev_y, y] + scores)
            
            # Handle special BOS constraints
            if i == 0:
                M_i[BOS_IDX + 1:, :] = float('-inf')  # Disallow transitions from non-BOS states at the start
            else:
                M_i[:, BOS_IDX] = float('-inf')  # Disallow transitions to BOS
                M_i[BOS_IDX, :] = float('-inf')  # Disallow transitions from BOS

            M.append(M_i)
        ### END SOLUTION
        
        return M

    def forward_backward(self, num_classes, seq_length, M):
        """
        Performs the forward-backward algorithm in log-space using PyTorch tensors.
        """
        alpha = []
        beta = []
        
        ### BEGIN SOLUTION
        alpha = torch.full((seq_length, num_classes), float('-inf'))
        beta = torch.full((seq_length, num_classes), float('-inf'))

        # Forward pass
        alpha[0, :] = M[0][BOS_IDX, :]
        for i in range(1, seq_length):
            alpha[i, :] = torch.logsumexp(alpha[i - 1].unsqueeze(1) + M[i], dim=0)

        # Backward pass
        beta[-1, :] = 0  # Log(1) = 0
        for i in range(seq_length - 2, -1, -1):
            beta[i, :] = torch.logsumexp(M[i + 1] + beta[i + 1].unsqueeze(0), dim=1)
        ### END SOLUTION
        
        return alpha, beta

    def negative_log_likelihood(self, params, sequences, empirical_counts):
        """
        Computes the negative log-likelihood for optimization.
        """
        expected_counts = torch.zeros_like(params)
        total_log_Z = 0.0
        
        ### BEGIN SOLUTION
        for X in sequences:
            M = self.calc_transition_matrices(params, self.num_classes, self.feature_set, X, inference=False)
            alpha, beta = self.forward_backward(self.num_classes, len(X), M)
            Z = torch.logsumexp(alpha[-1], dim=0)
            total_log_Z += Z

            # Compute expected counts using vectorized operations
            for i, feature_pairs in enumerate(X):
                alpha_broadcast = alpha[i - 1].unsqueeze(1) if i > 0 else torch.zeros(self.num_classes).unsqueeze(1)
                beta_broadcast = beta[i].unsqueeze(0)

                for (prev_y, y), feature_ids in feature_pairs:
                    if prev_y == -1:  # Unary feature
                        prob = alpha[i, y] + beta[i, y]
                    else:  # Pairwise feature
                        prob = alpha_broadcast[prev_y, 0] + M[i][prev_y, y] + beta_broadcast[0, y]

                    # Update expected counts
                    prob_normalized = torch.exp(prob - Z)
                    for fid in feature_ids:
                        expected_counts[fid] += prob_normalized
        ### END SOLUTION
        
        # Compute the regularized negative log-likelihood
        global ITER_NUM
        regularization = torch.sum(params ** 2) / (2 * self.squared_sigma)
        log_likelihood = torch.dot(empirical_counts, params) - total_log_Z - regularization
        loss = -log_likelihood
        print(f'{ITER_NUM:03d} : {loss.item()}')
        ITER_NUM += 1
        return loss

    def train(self, train_corpus_filename):
        """
        Trains the model using the training corpus.
        """
        global ITER_NUM
        ITER_NUM = 0
        self.training_data = read_corpus(train_corpus_filename)
        self.feature_set = FeatureSet()
        self.feature_set.process_corpus(self.training_data)
        self.label2index, self.label_array = self.feature_set.get_labels()
        self.index2label = {idx: label for label, idx in self.label2index.items()}
        self.num_classes = len(self.label_array)
        self.estimate_parameters()

    def test(self, test_corpus_filename):
        """
        Tests the model using the test corpus and reports accuracy.
        """
        test_data = read_corpus(test_corpus_filename)
        total_count, correct_count = 0, 0
        for X, Y in test_data:
            total_count += len(Y)
            predicted_Y = self.inference(X)
            correct_count += sum(y == yp for y, yp in zip(Y, predicted_Y))
        print(f'LinearChainCRFPytorch Accuracy: {100 * correct_count / total_count:.2f}%')

    def inference(self, X):
        """
        Performs inference (Viterbi decoding) on a given sequence X.
        """
        
        ### BEGIN SOLUTION
        M = self.calc_transition_matrices(self.params, self.num_classes, self.feature_set, X, inference=True)
        return self.viterbi(X, M)
        ### END SOLUTION

    def viterbi(self, M):
        """
        Viterbi decoding algorithm to find the most probable sequence of labels.
        """
        
        ### BEGIN SOLUTION
        seq_len = len(M)
        V = torch.full((seq_len, self.num_classes), float('-inf'))
        backptr = torch.zeros((seq_len, self.num_classes), dtype=torch.long)

        V[0, :] = M[0][BOS_IDX, :]  # Initialization
        for i in range(1, seq_len):
            probabilities = V[i - 1].unsqueeze(1) + M[i]
            backptr[i] = torch.argmax(probabilities, dim=0)
            V[i] = torch.max(probabilities, dim=0).values

        last_y = torch.argmax(V[-1]).item()
        Y_pred = [last_y]

        for i in range(seq_len - 1, 0, -1):
            last_y = backptr[i, last_y].item()
            Y_pred.append(last_y)

        ### END SOLUTION
        
        return [self.index2label[y] for y in reversed(Y_pred)]

In [4]:
# Train and test the NumPy model
numpy_model = LinearChainCRFNumpy()
numpy_model.train('data/train.txt')
numpy_model.test('data/test.txt')

000 : 5003.6527
000 (01): 4668.3695
000 (02): 3418.6408
001 : 1281.3727
002 : 603.3559
003 : 304.4369
004 : 199.3623
005 : 174.8886
006 : 150.4813
007 : 142.9658
008 : 135.2746
009 : 129.5327
010 : 126.7701
011 : 123.8661
012 : 121.4586
013 : 117.1418
014 : 115.5125
015 : 114.3371
016 : 113.9227
017 : 113.5398
018 : 113.2865
019 : 113.1469
LinearChainCRFNumpy Accuracy: 77.77%


In [20]:
# Train and test the PyTorch model
torch_model = LinearChainCRFPytorch()
torch_model.train('data/train.txt')
torch_model.test('data/test.txt')

000 : 80149800.0
001 : 80149792.0
002 : 79480368.0
003 : 79480360.0
004 : 79480360.0


TypeError: LinearChainCRFPytorch.viterbi() takes 2 positional arguments but 3 were given