In [193]:
import os
import json
import numpy as np
from sklearn.model_selection import KFold

In [2]:
def fetch_data():
    with open("./data/train.jsonl", 'r') as f:
        train_data = [json.loads(line) for line in f]

    tag_mapping = {1: "NN", 2: "DT", 3: "JJ", 4: "OT"}

    # Preprocess training data
    train_sentences = []
    train_labels = []
    for entry in train_data:
        tokens = entry['tokens']
        pos_tags = entry['pos_tags']
        chunk_tags = np.array(entry['chunk_tags'])
        
        train_sentences.append(pos_tags)
        train_labels.append(chunk_tags)

    with open("./data/test.jsonl", 'r') as f:
        test_data = [json.loads(line) for line in f]
    # Preprocess training data
    test_sentences = []
    test_labels = []
    for entry in test_data:
        tokens = entry['tokens']
        pos_tags = entry['pos_tags']
        chunk_tags = np.array(entry['chunk_tags'])
        
        test_sentences.append(pos_tags)
        test_labels.append(chunk_tags)
    
    return train_sentences, test_sentences,  train_labels, test_labels

X_train, X_test, y_train, y_test= fetch_data()


In [3]:
y_train[0]

array([1, 1, 1, 0, 1, 1, 1, 0, 1])

In [4]:
def one_hot_encode(input_list):
    encoded_list = []
    for item in input_list:
        one_hot_vector = np.zeros(4)
        one_hot_vector[item - 1] = 1  # Adjust index to start from 0
        encoded_list.append(one_hot_vector.tolist())
    return np.array(encoded_list)

In [5]:
one_hot_encode(X_train[0])

array([[1., 0., 0., 0.],
       [0., 0., 0., 1.],
       [0., 0., 1., 0.],
       [1., 0., 0., 0.],
       [0., 0., 0., 1.],
       [0., 0., 0., 1.],
       [0., 0., 1., 0.],
       [1., 0., 0., 0.],
       [0., 0., 0., 1.]])

In [6]:
np.concatenate([np.array([1,0,0]), np.array([1,0])])

array([1, 0, 0, 1, 0])

In [7]:
def into_ho(X_train):
    X_train_ho = []
    for i in range(len(X_train)):
        X = one_hot_encode(X_train[i])
        temp = []
        for j in range(len(X)):
            if j==0:
                temp.append(np.concatenate([np.array([1.0,0.0,0.0,0.0,0.0]), X[j]]))
            else:
                temp.append((np.concatenate([np.array([0]), X[j-1], X[j]] )))
        X_train_ho.append(np.array(temp))
    return X_train_ho

In [8]:
X_train_ho = into_ho(X_train)
X_test_ho = into_ho(X_test)


In [9]:
# print(X_train[0],"\n",X_train_ho[0])
# X_train_ho[0][0].shape

In [141]:
def sigmoid(x):
    return 1/(1.0+(np.exp(-x)))
def sigmoid_derivative(x):
    return sigmoid(x) * (1 - sigmoid(x))

In [100]:
def softmax(x):
    exp_vals = np.exp(x - np.max(x, axis=-1, keepdims=True))
    return exp_vals / np.sum(exp_vals, axis=-1, keepdims=True)

In [101]:
def relu(x):
    return np.maximum(0, x)

def relu_derivative(x):
    return np.where(x > 0, 1, 0)

In [11]:
np.concatenate([np.array([0]), X_train_ho[0][0]])

array([0., 1., 0., 0., 0., 0., 1., 0., 0., 0.])

In [12]:
8.9*np.ones(10)

array([8.9, 8.9, 8.9, 8.9, 8.9, 8.9, 8.9, 8.9, 8.9, 8.9])

In [227]:
def cross_entropy_loss(y_true, y_pred, epsilon=1e-15):
    y_pred = np.clip(y_pred, epsilon, 1 - epsilon)
    loss = -(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))
    total_loss = np.sum(loss)
    return total_loss

In [300]:


def calculate_accuracy(y_true, y_pred):
    np.mean(y_train[18]==(predict[18]>0.5).astype(int))
    acc =  np.mean(y_true == (y_pred>0.5 ).astype(int))
    print(acc)
    return acc

In [325]:
class SingleRecurrentPerceptron:
    def __init__(self, vec_len, lr):
          
        # Initialize weights and bias
        self.weights = np.random.randn(vec_len)
        self.threshold = np.random.randn(1)
        self.lr = lr

   
    def forward(self, inputs):
        """inputs-- (B, Tx, 10)"""   
        prediction= []    #(B, Tx)
        X_i_b = []      #(B, Tx, 10)
        for j in range(len(inputs)):
            out=[]
            X_i_j = []
            Tx, _ = inputs[j].shape
            y_prev=0
            for i in range(Tx):
                x = np.concatenate([inputs[j][i], np.array([y_prev])])
                X_i_j.append(x)
                net = x.T @ self.weights - self.threshold[0]
                oi = sigmoid(net)
                y_prev = oi
                out.append(oi)
            prediction.append(np.array(out))
            X_i_b.append(np.array(X_i_j))
        return X_i_b, prediction
    
    # def forward_per_input(self, inputs):
    #     """inputs-- (Tx, 10) """   
    #     out=[]    #(Tx, 1)
    #     X_i_j = []  #(Tx, 10)
    #     Tx = len(inputs)
    #     y_prev=0
    #     for i in range(Tx):
    #         x = np.concatenate([inputs[i], np.array([y_prev])])
    #         X_i_j.append(x)
    #         x = x.T @ self.weights - self.threshold[0]
    #         x = sigmoid(x)
    #         out.append(x)

    #     return np.array(X_i_j), out

    # def backward(self, inputs, target):
    #     """inputs-- (B, Tx, 10)
    #        target-- (B, Tx)
    #         """   
    #     X, prediction = self.forward(inputs)

    #     for i in range(len(inputs)):      # iterate over each example
    #         delta_w = np.zeros(10)
    #         for j in range(len(inputs[i])):     # iterate over each time
    #             x = X[i][j]
    #             delta_w += -self.lr * (target[i][j]-prediction[i][j]) * (x)
    #         self.weights += delta_w
    

            
    def backward_gpt(self, inputs, targets):

        X_i_b, prediction = self.forward(inputs)
         
        B = len(inputs)  # Get batch size, sequence length, and feature dim

        # Initialize gradients for weights and bias
        self.weights_grad = np.zeros_like(self.weights)
        self.threshold_grad = np.zeros_like(self.threshold)
        sequence_lengths = [len(t) for t in targets]

        # Calculate gradients for output layer (using element-wise multiplication)
        for b in range(B):
            Tx = sequence_lengths[b]
            for t in range(Tx):
                delta_t = (prediction[b][t] - targets[b][t]) * sigmoid_derivative(prediction[b][t])
                self.weights_grad += X_i_b[b][t]*delta_t
                self.threshold_grad += delta_t

            # Backpropagate through time (using chain rule)
            delta_prev = 0
            for t in reversed(range(Tx)):
                if t + 1 < Tx:
                    delta_t = delta_prev * sigmoid_derivative(prediction[b][t]) + np.dot(delta_t, self.weights)
                else:
                    delta_t = delta_prev * sigmoid_derivative(prediction[b][t])
                self.weights_grad -= X_i_b[b][t] * delta_t*0.3  # Exclude previous output
                delta_prev = delta_t
              

            # Normalize gradients by batch size
        self.weights_grad /= B
        self.threshold_grad /= B

            # Update weights and bias using learning rate
        self.weights -= self.lr * self.weights_grad
        self.threshold -= self.lr * self.threshold_grad

    def calculate_loss(self, inputs, targets):
        
        """
        This function calculates the total loss for a minibatch of sequences.

        Args:
        inputs: Batch of input sequences (B, Tx_max, vec_len).
        targets: Batch of ground truth sequences (B, Tx_max).

        Returns:
        The average loss over the minibatch.
        """
        B = len(inputs)  # Get batch size, max sequence length, and feature dim

        # Initialize loss to zero
        loss = 0
        accuracy = 0
        _, predictions = self.forward(inputs)
        # Forward pass for each example in the minibatch
        for b in range(B):
            # Calculate loss per example using cross-entropy
          
            loss += cross_entropy_loss(predictions[b], targets[b])
            accuracy += np.mean(targets[b]==(predictions[b]>0.5).astype(int))
            
        # Average loss over the minibatch
        return loss / B, accuracy/B


        
    def train(self, inputs, targets, epochs):

        """inputs-- (B, Tx, 10)
           target-- (B, Tx)
            """           
        
        for iter in range(epochs):
            kf = KFold(n_splits=5, shuffle=True, random_state=42)
            train_loss = 0
            val_loss = 0
            train_accuracy = 0
            val_accuracy = 0
            for train_index, val_index in kf.split(inputs):
                train_inputs, val_inputs = [inputs[i] for i in train_index], [inputs[i] for i in val_index]
                train_targets, val_targets = [targets[i] for i in train_index], [targets[i] for i in val_index]
                self.backward_gpt(inputs, targets)
                delta_loss, delta_accuracy = self.calculate_loss(train_inputs,train_targets)
                train_loss += delta_loss
                train_accuracy += delta_accuracy
                delta_loss, delta_accuracy = self.calculate_loss(val_inputs,val_targets )

                val_loss += delta_loss
                val_accuracy += delta_accuracy
            print(f"epoch: {iter}, training loss : {train_loss/5}, training accuracy: {train_accuracy/5}, validation loss: {val_loss/5}, validation accuracy: {val_accuracy/5}")
    

In [326]:
model = SingleRecurrentPerceptron(10, 0.01)

In [327]:
model.train(X_train_ho, y_train, 32)

epoch: 0, training loss : 161.4176014161432, training accuracy: 0.6300183546225444, validation loss: 161.4158874638575, validation accuracy: 0.6300179516967915
epoch: 1, training loss : 161.05012304941351, training accuracy: 0.6300183546225444, validation loss: 161.04841692003635, validation accuracy: 0.6300179516967915
epoch: 2, training loss : 160.68471258191852, training accuracy: 0.6300183546225444, validation loss: 160.68301572611125, validation accuracy: 0.6300179516967915
epoch: 3, training loss : 160.32148132526467, training accuracy: 0.6370529516355663, validation loss: 160.319795153705, validation accuracy: 0.6370814353943863
epoch: 4, training loss : 159.9605366330468, training accuracy: 0.6491132099754512, validation loss: 159.95886251272782, validation accuracy: 0.648883633835918
epoch: 5, training loss : 159.60198176103896, training accuracy: 0.655106117831634, validation loss: 159.60032101191203, validation accuracy: 0.6553394625449076
epoch: 6, training loss : 159.24591

In [328]:
model.weights

array([ 0.02747697,  0.27829624,  0.80810796, -0.3781824 ,  1.6563038 ,
       -0.59600257,  0.37237197,  1.10109364,  1.15016423, -1.11687127])

In [None]:
tag_mapping = {1: "NN", 2: "DT", 3: "JJ", 4: "OT"}

# Calculate the number of unique POS tags
num_unique_tags = len(tag_mapping)

# Preprocess training data
# Preprocess training data
train_sentences = []
train_labels = []
for entry in train_data:
    tokens = entry['tokens']
    pos_tags = entry['pos_tags']
    chunk_tags = entry['chunk_tags']
    
    # Convert POS tags to one-hot encoded representation
    pos_tags_one_hot = np.zeros((len(pos_tags), num_unique_tags))
    for i, tag in enumerate(pos_tags):
        pos_tags_one_hot[i, tag - 1] = 1  # Subtract 1 to account for 0-based indexing
    
    # Flatten one-hot encoded representation
    flattened_tags = pos_tags_one_hot.flatten()
    
    train_sentences.append(flattened_tags)
    train_labels.append(chunk_tags)


# Initialize and train the single recurrent perceptron
# Initialize and train the single recurrent perceptron
input_size = len(train_sentences[0])  # Get input size from the first sample
output_size = 2  # Binary classification (1 for chunk, 0 for not chunk)
perceptron = SingleRecurrentPerceptron(input_size, output_size)
perceptron.train(train_sentences, train_labels, epochs=10)



# Load test data
with open('test.jsonl', 'r') as f:
    test_data = [json.loads(line) for line in f]

# Preprocess test data
test_sentences = []
test_labels = []
for entry in test_data:
    tokens = entry['tokens']
    pos_tags = entry['pos_tags']
    chunk_tags = entry['chunk_tags']
    
    # Convert POS tags to one-hot encoded representation
    pos_tags_one_hot = np.zeros((len(pos_tags), len(tag_mapping)))
    for i, tag in enumerate(pos_tags):
        pos_tags_one_hot[i, tag - 1] = 1  # Subtract 1 to account for 0-based indexing
    
    test_sentences.append(pos_tags_one_hot)
    test_labels.append(chunk_tags)

# Evaluate the trained perceptron
predictions = perceptron.predict(test_sentences)

# Assuming we have some evaluation function to compute accuracy
# Let's assume a simple accuracy calculation for demonstration
def accuracy(predictions, targets):
    correct = 0
    total = len(predictions)
    for pred, target in zip(predictions, targets):
        pred_labels = [1 if p > 0 else 0 for p in pred]
        if pred_labels == target:
            correct += 1
    return correct / total

acc = accuracy(predictions, test_labels)
print("Accuracy:", acc)