In [216]:
import csv
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import spacy
import time
import pickle
nlp = spacy.load("en_core_web_lg")

def samplestring_to_num(sampleoutput):
    """Takes a list (sample, output) where sample is "Positive", "Negative", "Neutral",
    or "Irrelevant", and returns the same list, but the aforementioned terms are
    replaced with 1, -1, 0, or None
    """
    match sampleoutput[0]:
        case "Positive":
            return [0, sampleoutput[1]]
        case "Negative":
            return [1, sampleoutput[1]]
        case "Neutral":
            return [2, sampleoutput[1]]
        case "Irrelevant":
            return None

def format_data(input_data):
    """Takes unformated data from training/validation, and formats it into a list

    Input: input_data -- a string, represeting one line from twitter_training/validation
    """
    row = next(csv.reader([input_data]))[2:]
    if len(row) != 2:
        return None
    return samplestring_to_num(row)
    
def create_data(data_csv, max_samples=None):
    """Create data formatted for training and validation
    
    Inputs:
    data_csv -- a string, directory of a csv of training/validation data
    max_samples -- int, how many samples to include, default is all (useful for debugging)
    
    Output: (samples, outputs) samples -- a list of vectorized data, outputs -- the model output given
    the vectorized data
    """
    with open(data_csv, 'r') as infile:
        training = infile.read().split("\n")[:max_samples]
        formatted = [format_data(data) for data in training]
        formatted = [line for line in formatted if line is not None]
        (outputs, samples) = ([], [])
        for data in formatted:
            if data == None:
                continue
            tokens = [token.vector for token in nlp(data[1]) if not token.is_space]
            if len(tokens) == 0:
                continue    
            outputs.append(data[0])
            samples.append(tokens)

    return (samples, outputs)

def create_model(training_data, output_location, input_size = 300, hidden_size = 48, num_layers = 1, num_outputs = 3,\
                 epochs = 64, max_samples = None):
    """Creates a model and saves it to a file

    Inputs:
    training_data -- a string, directory of a csv of training data
    output_location -- a string, directory of where model is saved
    max_samples -- int, how many samples to include, default is all (useful for debugging)
    """
    if training_data.split(".")[-1] not in ["csv", "csv/"]:
        raise TypeError("training_data should be a csv file.")
   
    (samples, outputs) = create_data(training_data, max_samples)
    model = BiRNN(input_size, hidden_size, num_layers, num_outputs)
    model = train(model, samples, outputs, epochs, "cpu")
    
    print(format(f"Writing to {output_location}..."))
    with open(output_location, 'wb') as outfile:
        pickle.dump(model, outfile)
    print("Model created successfully!")

create_model("archive/twitter_training.csv", "models/twittermodel.model")

ETA: 7747.9317086792 seconds
Epoch 0: Loss = 0.9697057217228959
Epoch 5: Loss = 0.896549934248851
Epoch 10: Loss = 0.8635651615234579


KeyboardInterrupt: 

In [None]:
samples, outputs = create_data("archive/twitter_training.csv", max_samples = 400)

In [214]:
class BiRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers = 1, num_outputs= 3 ):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_outputs = num_outputs
        self.bidirectional = True
        self.rnn = nn.RNN(input_size, hidden_size, num_layers = 1, bidirectional = True)
        self.fc = nn.Linear(hidden_size * 2, num_outputs)

    def forward(self, x):
        lengths = torch.tensor([len(seq) for seq in x]) #Get lengths of inputs
        lengths, permx = lengths.sort(descending = True) #Sort inputs
        padded_sequence = pad_sequence(x, batch_first = True)
        padded_sequence = padded_sequence[permx]
        packed_sequence = pack_padded_sequence(padded_sequence, lengths, batch_first = True)
        output, hidden = self.rnn(packed_sequence)
        output, dummy = pad_packed_sequence(output, batch_first = True)
        batch_size = output.size(0)
        last_timesteps = lengths - 1
        batch_indicies = torch.arange(batch_size)
        
        return self.fc(output[batch_indicies, last_timesteps, :])
        
model = BiRNN(input_size=300, hidden_size=32, num_layers=1, num_outputs=3)



def train(model, samples, outputs, epochs, device):
    loss_func = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters())
    
    for epoch in range(epochs):
        sampleslen = len(samples)
        overallloss = 0 
        for sampleid, (sample, actual) in enumerate(zip(samples, outputs)):
            if sampleid == 1 and epoch == 0:
                timea = time.time()
            optimizer.zero_grad() #Clear previous gradients
            model_output = model(torch.tensor([sample], dtype=torch.float32))
            loss = loss_func(model_output, torch.tensor(actual).unsqueeze(0))
            overallloss += loss.item()
            loss.backward()
            optimizer.step()
            if sampleid == 100 and epoch == 0:
                overalltime = time.time() - timea
                print(format(f"ETA: {(overalltime * (sampleslen/100)) * epochs} seconds"))
        if epoch % 5 == 0:
            print(format(f"Epoch {epoch}: Loss = {overallloss/sampleslen}"))

    return model

#model = train(model, samples, outputs, 100, "cpu")



In [190]:
with open("models/twittermodel.model", 'rb') as infile:
    model = pickle.load(infile)


def preprocess_text(text):
    doc = nlp(text)
    vectors = [token.vector for token in doc if token.has_vector]
    return torch.tensor(vectors, dtype=torch.float32).unsqueeze(0)  # shape: (1, seq_len, embedding_dim)

input_text = """"""
input_tensor = preprocess_text(input_text)
model.eval()
with torch.no_grad():
    logits = model(input_tensor)
    probs = torch.softmax(logits, dim=1)
    predicted_class = torch.argmax(probs, dim=1).item()

print(f"Logits: {logits}")
print(f"Probabilities: {probs}")
print(f"Predicted class index: {predicted_class}")

Logits: tensor([[0.7277, 0.1725, 0.5145]])
Probabilities: tensor([[0.4198, 0.2410, 0.3392]])
Predicted class index: 0
