In [4]:
import numpy as np
from sklearn.base import BaseEstimator

class LinearClassifier(BaseEstimator):
    """
    General class for binary linear classifiers. Implements the predict
    function, which is the same for all binary linear classifiers. There are
    also two utility functions.
    """

    def decision_function(self, X):
        """
        Computes the decision function for the inputs X. The inputs are assumed to be
        stored in a matrix, where each row contains the features for one
        instance.
        """
        return X.dot(self.w)

    def predict(self, X):
        """
        Predicts the outputs for the inputs X. The inputs are assumed to be
        stored in a matrix, where each row contains the features for one
        instance.
        """

        # First compute the output scores
        scores = self.decision_function(X)

        # Select the positive or negative class label, depending on whether
        # the score was positive or negative.
        out = np.select([scores >= 0.0, scores < 0.0],
                        [self.positive_class,
                         self.negative_class])
        return out

    def find_classes(self, Y):
        """
        Finds the set of output classes in the output part Y of the training set.
        If there are exactly two classes, one of them is associated to positive
        classifier scores, the other one to negative scores. If the number of
        classes is not 2, an error is raised.
        """
        classes = sorted(set(Y))
        if len(classes) != 2:
            raise Exception("this does not seem to be a 2-class problem")
        self.positive_class = classes[1]
        self.negative_class = classes[0]

    def encode_outputs(self, Y):
        """
        A helper function that converts all outputs to +1 or -1.
        """
        return np.array([1 if y == self.positive_class else -1 for y in Y])


class Perceptron(LinearClassifier):
    """
    A straightforward implementation of the perceptron learning algorithm.
    """

    def __init__(self, n_iter=20):
        """
        The constructor can optionally take a parameter n_iter specifying how
        many times we want to iterate through the training set.
        """
        self.n_iter = n_iter

    def fit(self, X, Y):
        """
        Train a linear classifier using the perceptron learning algorithm.
        """

        # First determine which output class will be associated with positive
        # and negative scores, respectively.
        self.find_classes(Y)

        # Convert all outputs to +1 (for the positive class) or -1 (negative).
        Ye = self.encode_outputs(Y)

        # If necessary, convert the sparse matrix returned by a vectorizer
        # into a normal NumPy matrix.
        if not isinstance(X, np.ndarray):
            X = X.toarray()

        # Initialize the weight vector to all zeros.
        n_features = X.shape[1]
        self.w = np.zeros(n_features)

        # Perceptron algorithm:
        for i in range(self.n_iter):
            for x, y in zip(X, Ye):

                # Compute the output score for this instance.
                score = x.dot(self.w)

                # If there was an error, update the weights.
                if y*score <= 0:
                    self.w += y*x


##### The following part is for the optional task.

### Sparse and dense vectors don't collaborate very well in NumPy/SciPy.
### Here are two utility functions that help us carry out some vector
### operations that we'll need.

def add_sparse_to_dense(x, w, factor):
    """
    Adds a sparse vector x, scaled by some factor, to a dense vector.
    This can be seen as the equivalent of w += factor * x when x is a dense
    vector.
    """
    w[x.indices] += factor * x.data

def sparse_dense_dot(x, w):
    """
    Computes the dot product between a sparse vector x and a dense vector w.
    """
    return np.dot(w[x.indices], x.data)


class SparsePerceptron(LinearClassifier):
    """
    A straightforward implementation of the perceptron learning algorithm,
    assuming that the input feature matrix X is sparse.
    """

    def __init__(self, n_iter=20):
        """
        The constructor can optionally take a parameter n_iter specifying how
        many times we want to iterate through the training set.
        """
        self.n_iter = n_iter

    def fit(self, X, Y):
        """
        Train a linear classifier using the perceptron learning algorithm.

        Note that this will only work if X is a sparse matrix, such as the
        output of a scikit-learn vectorizer.
        """
        self.find_classes(Y)

        # First determine which output class will be associated with positive
        # and negative scores, respectively.
        Ye = self.encode_outputs(Y)

        # Initialize the weight vector to all zeros.
        self.w = np.zeros(X.shape[1])

        # Iteration through sparse matrices can be a bit slow, so we first
        # prepare this list to speed up iteration.
        XY = list(zip(X, Ye))

        for i in range(self.n_iter):
            for x, y in XY:

                # Compute the output score for this instance.
                # (This corresponds to score = x.dot(self.w) above.)
                score = sparse_dense_dot(x, self.w)

                # If there was an error, update the weights.
                if y*score <= 0:
                    # (This corresponds to self.w += y*x above.)
                    add_sparse_to_dense(x, self.w, y)


In [2]:
import time

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import Normalizer
from sklearn.pipeline import make_pipeline
from sklearn.feature_selection import SelectKBest
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

# from aml_perceptron import Perceptron, SparsePerceptron

# This function reads the corpus, returns a list of documents, and a list
# of their corresponding polarity labels. 
def read_data(corpus_file):
    X = []
    Y = []
    with open(corpus_file, encoding='utf-8') as f:
        for line in f:
            _, y, _, x = line.split(maxsplit=3)
            X.append(x.strip())
            Y.append(y)
    return X, Y


if __name__ == '__main__':
    
    # Read all the documents.
    X, Y = read_data('data/all_sentiment_shuffled.txt')
    
    # Split into training and test parts.
    Xtrain, Xtest, Ytrain, Ytest = train_test_split(X, Y, test_size=0.2,
                                                    random_state=0)

    # Set up the preprocessing steps and the classifier.
    pipeline = make_pipeline(
        TfidfVectorizer(),
        SelectKBest(k=1000),
        Normalizer(),

        # NB that this is our Perceptron, not sklearn.linear_model.Perceptron
        Perceptron()  
    )

    # Train the classifier.
    t0 = time.time()
    pipeline.fit(Xtrain, Ytrain)
    t1 = time.time()
    print('Training time: {:.2f} sec.'.format(t1-t0))

    # Evaluate on the test set.
    Yguess = pipeline.predict(Xtest)
    print('Accuracy: {:.4f}.'.format(accuracy_score(Ytest, Yguess)))

FileNotFoundError: [Errno 2] No such file or directory: 'data/all_sentiment_shuffled.txt'

In [None]:
from scipy.sparse import issparse

class Pegasos(BaseEstimator):
    """
    Implementation of the Pegasos algorithm for linear classification.
    """

    def __init__(self, n_iter=20, lambda_=0.1):
        """
        Initializes the Pegasos classifier.

        Parameters:
        - n_iter: Number of iterations.
        - lambda_: Regularization parameter.
        """
        self.n_iter = n_iter
        self.lambda_ = lambda_

    def fit(self, X, Y):
        """
        Train the Pegasos classifier.

        Parameters:
        - X: Feature matrix.
        - Y: Labels.

        Note: X should be a sparse matrix.
        """
        self.find_classes(Y)

        # Convert all outputs to +1 (for the positive class) or -1 (negative).
        Ye = self.encode_outputs(Y)

        # Print out the shape of X and Ye
        print("Shape of X:", X.shape)
        print("Shape of Ye:", Ye.shape)

        # Initialize the weight vector to zeros.
        n_samples, n_features = X.shape
        self.w = np.zeros(n_features)

        for t in range(1, self.n_iter + 1):
            for i in range(n_samples):
                idx = np.random.randint(n_samples)  # Generate random index within range(n_samples)

                # Print out the index
                print("Index:", idx)

                # Fetch the randomly chosen instance and label.
                x_t, y_t = X[idx], Ye[idx]



                # Compute the learning rate.
                eta_t = 1 / (self.lambda_ * t)

                # Compute the inner product.
                inner_product = np.dot(self.w, x_t)

                # Update the weight vector.
                if y_t * inner_product < 1:
                    self.w = (1 - eta_t * self.lambda_) * self.w + eta_t * y_t * x_t
                else:
                    self.w = (1 - eta_t * self.lambda_) * self.w

        # Optionally normalize the weight vector.
        self.w /= np.linalg.norm(self.w)

    def predict(self, X):
        """
        Predicts the outputs for the inputs X.

        Parameters:
        - X: Feature matrix.

        Returns:
        - Predicted labels.
        """
        scores = X.dot(self.w)
        out = np.where(scores >= 0.0, self.positive_class, self.negative_class)
        return out


    def find_classes(self, Y):
        """
        Finds the set of output classes in the output part Y of the training set.
        If there are exactly two classes, one of them is associated to positive
        classifier scores, the other one to negative scores. If the number of
        classes is not 2, an error is raised.
        """
        classes = sorted(set(Y))
        if len(classes) != 2:
            raise ValueError("This does not seem to be a 2-class problem")
        self.positive_class = classes[1]
        self.negative_class = classes[0]

    def encode_outputs(self, Y):
        """
        A helper function that converts all outputs to +1 or -1.
        """
        encoded_Y = np.where(Y == self.positive_class, 1, -1)
        return np.squeeze(encoded_Y)  # Ensure the array is squeezed to remove extra dimensions





In [None]:
if __name__ == '__main__':
    
    # Read all the documents.
    X, Y = read_data('data/all_sentiment_shuffled.txt')
    
    # Split into training and test parts.
    Xtrain, Xtest, Ytrain, Ytest = train_test_split(X, Y, test_size=0.2,
                                                    random_state=0)

    # Set up the preprocessing steps and the classifier.
    pipeline = make_pipeline(
        TfidfVectorizer(),
        SelectKBest(k=1000),
        Normalizer(),

        # Using Pegasos algorithm
        Pegasos()  
    )

    # Train the classifier.
    t0 = time.time()
    pipeline.fit(Xtrain, Ytrain)
    t1 = time.time()
    print('Training time: {:.2f} sec.'.format(t1-t0))

    # Evaluate on the test set.
    Yguess = pipeline.predict(Xtest)
    print('Accuracy: {:.4f}.'.format(accuracy_score(Ytest, Yguess)))

Shape of X: (9531, 1000)
Shape of Ye: ()
Index: 8269


IndexError: too many indices for array: array is 0-dimensional, but 1 were indexed

In [None]:
if __name__ == '__main__':
    
    # Read all the documents.
    X, Y = read_data('data/all_sentiment_shuffled.txt')
    print("Shape of X:", len(X))
    print("Shape of Y:", len(Y))


Shape of X: 11914
Shape of Y: 11914


In [None]:
import numpy as np


class Pegasos1:
  """
  This class implements the Pegasos algorithm for binary classification.
  """

    def __init__(self, eta0=0.1, lambda_=0.001, max_iter=1000):
        """
        Initializes the Pegasos algorithm with hyperparameters.

        Args:
          eta0: Initial learning rate (float).
          lambda_: Regularization parameter (float).
          max_iter: Maximum number of iterations (int).
        """
        self.eta0 = eta0
        self.lambda_ = lambda_
        self.max_iter = max_iter
        self.w = None  # Initialize weight vector to None
    
    def fit(self, X, y):
        n_samples, n_features = X.shape
        self.w = np.zeros(n_features)

        for _ in range(self.max_iter):
          # Permutation with integer indices
            perm = np.random.permutation(n_samples).astype(int)
          # Use advanced indexing with perm
            X_shuffled = X[perm, :]  # Select all features for each shuffled index
            y_shuffled = y[perm]      # Select labels corresponding to shuffled indices

            for i in range(n_samples):
                x_i, y_i = X_shuffled[i], y_shuffled[i]
                eta_t = self.eta0 / (self.lambda_ * (i + 1))

                # Update rule based on hinge loss
                if y_i * np.dot(self.w, x_i) <= 1:
                    self.w = (1 - eta_t * self.lambda_) * self.w + eta_t * y_i * x_i

    def predict(self, X):
    """
    Predicts labels for new data points.

    Args:
      X: New data points as a numpy array (shape: n_samples, n_features).

    Returns:
      Predicted labels as a numpy array (shape: n_samples).
    """
        scores = np.dot(X, self.w)
        return np.sign(scores)




IndentationError: unexpected indent (3625784856.py, line 9)

In [None]:
if __name__ == '__main__':
    
    # Read all the documents.
    X, Y = read_data('data/all_sentiment_shuffled.txt')
    
    # Split into training and test parts.
    Xtrain, Xtest, Ytrain, Ytest = train_test_split(X, Y, test_size=0.2,
                                                    random_state=0)

    # Set up the preprocessing steps and the classifier.
    pipeline = make_pipeline(
        TfidfVectorizer(),
        SelectKBest(k=1000),
        Normalizer(),

        # Using Pegasos algorithm
        Pegasos1()  
    )

    # Train the classifier.
    t0 = time.time()
    pipeline.fit(Xtrain, Ytrain)
    t1 = time.time()
    print('Training time: {:.2f} sec.'.format(t1-t0))

    # Evaluate on the test set.
    Yguess = pipeline.predict(Xtest)
    print('Accuracy: {:.4f}.'.format(accuracy_score(Ytest, Yguess)))

TypeError: only integer scalar arrays can be converted to a scalar index