In [186]:
import sys

sys.path.append('..')
sys.path.append('../py-irt')

import numpy as np
import torch
import jsonlines
import cli

In [188]:
class Model(torch.nn.Module):
    def __init__(self, input_size, hidden_dim, n_layers, n_problems):
        super(Model, self).__init__()

        # Defining some parameters
        # hidden_dim is the (arbitrary) dimensionality of the hidden layers.
        #
        # If all you wanted to do was to to encode the current answer into the hidden layer output of each step,
        # you could use a single hidden dimension, with 0 = wrong and 1 = right.  But that's not the only purpose of thie hidden layer.
        # You also need to retain information about not just this answer, but past right or wrong answers, to help predict the next one.
        #
        # I was using hidden layers for output (one of the layers is supposed to keep track of "will the student get the right answer?"
        # But I am not sure whether that will work.  The place I got this from uses a fully connected layer after the hidden layers,
        # which can take the hidden layer output and turn it into the actual answers.
        #
        self.hidden_dim = hidden_dim
        # n_layers is the (arbitrary) number of hidden layers
        self.n_layers = n_layers

        # Defining the layers
        # RNN Layer
        #
        # input_size = the number of features in the input.  This equals the number of problems times two, because
        # (1) the problems are one-hot encoded and so the number of features required to describe each problem equals
        # the number of problems, (2) you also need to know whether the student got the problem right.
        # The minimal encoding is actually log_2 N_features + 1; that is, if problem five is encoded [1,0,1], and the
        # student got it right (1), you would have [1, 0, 1, 1], which is (log_2 8) + 1.  But this apparently
        # will not result in a good outcome.
        #
        # hidden_dim = the dimensionality of each hidden layer, which is arbitrary (more dimensions = more latent variables)
        #
        # n_layers = the number of hidden layers, which is arbitrary (more layers = more processing of latent variables)
        #
        # batch_first = True: input and output tensors are provided as (batch_number, problem_index, feature)
        # Here, for input, feature corresponds to a one-hot encoding of the problems + answers, where (1, 0, 0, ...) means
        # the student answered the first problem (1) and got it wrong (0).  And (0, 0, 1, 1, 0, ...) means the student answered
        # the second problem (1) and got it right (1).
        # In other words, for each batch and each problem in each batch, we need to input 2N values to one-hot encode the problems.
        # Each batch is a student.
        #
        # For output, features corresponds to a hidden layer feature, I think.  What we need to do is to have a separate fully
        # connected layer for each problem (but it's the same for all batches) that outputs a 1 dimensional result, which says
        # whether the next problem is predicted to be gotten right or wrong.
        #
        # batch_number is the batch number (each batch is a student; we are running multiple students through the RNN at once)
        # and problem_index is the problem number.  (Called seq in the documentation).  Since the feature of an input problem _is_
        # the problem number, the input will look like:
        # [[1, ans1, 0, 0, 0, 0, ...],
        #  [0, 0, 1, ans2, 0, 0, ...],
        #  [0, 0, 0, 0, 1, ans3, ...]]
        # The output, in contrast, is not like this, because the output features use the hidden layer rather than one-hot.
        self.rnn = torch.nn.RNN(input_size, hidden_dim, n_layers, batch_first=True)
        # Fully connected layers.  We need one for each problem.
        self.fc_list = list()
        for n in range(n_problems):
            self.fc_list.append(torch.nn.Linear(hidden_dim, 1))
    
    def forward(self, input_data):
        # If batch_first = True, the input tensor is of the form (num_students, problem_index, one_hot_feature), see above
        # So input_data.size(0) must be the number of batches, i.e. students or batches.
        n_students = input_data.size(0)
        n_problems = input_data.size(1)

        # Initializing hidden state for first input using method defined below
        # We initialize one set of hidden layers for each batch (student),
        # so the dimensionality is (num_layers, batch_size, arbitrary_hidden_dim)
        # We don't need to pass n_layers or hidden_dim to init_hidden, because those are member variables of self from __init__.
        # Note: the hidden layer _always_ starts the forward pass in a particular (zeroed) state!  We do need to know the student's
        # answers, but these are not part of the initial state.  Rather, they are part of the input features.
        hidden_0 = self.init_hidden(n_students)

        # Passing in the input data and initial hidden state into the model and obtaining outputs
        # "hidden_0" is the initial hidden state in the form (num_layers, n_students, arbitrary_hidden_dim)
        # "hidden_out" is the output hidden state at the end, including all layers of hidden state.
        # "out" is of shape (n_students, n_problems, arbitrary_hidden_dim).  It shows the last hidden layer at each time step t.
        # We will use each fc layer to turn each problem's hidden layer into the guess as to correctness, which can be checked later.
        out, hidden_out = self.rnn(input_data, hidden_0)

        whole_list = list()
        # If I create whole_list piecemeal in this way, will the requires_grad allow propagation?
        # I don't see that I have any alternative -- it seems to me that I need separate fully-connected layers for each problem.
        # For example, what if different problems belong to different categories, and I must treat these categories differenlty?
        for ns in range(n_students):
            student_list = list() # list for this student
            student_list.append(torch.tensor(0.5, dtype=torch.float32, requires_grad=True)) # We cannot predict the first problem, because there is no prior information
            for np in range(1, n_problems): # We predict the 2nd through the last problem.
                prediction = self.fc_list[np](out[ns, np, :]) # we use the hidden layer to calculate the likelihood of correctness.
                # self.fc_list[0] is unused, because we cannot predict the first problem.
                # self.fc_list is a list, because each problem must use the hidden layer to calculate correctness in a different way.
                student_list.append(prediction[0]) # prediction is an array of length 1, so its 0th element is its only element.
            whole_list.append(torch.stack(student_list))
        
        return torch.stack(whole_list) # this array has shape (n_students, n_problems), just like make target produces
    
    def init_hidden(self, n_students):
        # This method generates the first hidden state of zeros which we'll use in the forward pass
        # We'll send the tensor holding the hidden state to the device we specified earlier as well
        hidden = torch.zeros(self.n_layers, n_students, self.hidden_dim)
        return hidden

In [189]:
def get_device():
    # torch.cuda.is_available() checks and returns a Boolean True if a GPU is available, else it'll return False
    is_cuda = torch.cuda.is_available()
    
    # If we have a GPU available, we'll set our device to GPU. We'll use this device variable later in our code.
    if is_cuda:
        device = torch.device("cuda")
        print("GPU is available")
    else:
        device = torch.device("cpu")
        print("GPU not available, CPU used")
    return device

In [193]:
def rand_val(i: int, j: int, student_skill: np.array, problem_difficulty: np.array):
    """i: index of student
       j: index of problem
       student_skill: 1D array of student skills
       problem_difficulty: 1D array of problem difficulties
    """
    prob_guess = 0.2
    prob = prob_guess + (1 - prob_guess) / (1 + np.exp(student_skill[int(i)] - problem_difficulty[int(j)]))
    return np.random.choice(2, 1, p=[1 - prob, prob])[0]  

def irt_func(m1: np.array, m2: np.array, student_skill: np.array, problem_difficulty: np.array):
    """m1: 2D matrix where each entry is a student index
       m2: 2D matrix where each entry is a problem index
       student_skill: 1D array of student skills
       problem_difficulty: 1D array of problem difficulties
    """
    func = np.vectorize(lambda i, j: rand_val(i, j, student_skill, problem_difficulty))
    return func(m1, m2)


class RNN_IRT:

    def __init__(self):
        self.n_students_train = 10000
        self.n_students_val = 1000
        self.n_problems = 10
        self.n_hidden = 2
        self.problem_difficulty = np.random.uniform(low = 0, high = 1, size=(self.n_problems,))

    # make_target produces a rank 2 tensor that has dimension (n_students, n_problems)
    # it's the students' actual responses (or probabilities thereof) to be compared with the predictions
    def make_target(self, n_students: int, n_problems: int, student_skill: np.array):
        output_np = np.fromfunction(irt_func, (n_students, n_problems), dtype=float, student_skill=student_skill, problem_difficulty=self.problem_difficulty)
        output = torch.from_numpy(output_np)
        output = torch.tensor(output, dtype=torch.float32)
        return output

    # This produces a rank 3 tensor which has dimensions (n_students, n_problems, 2 * n_problems)
    # That is: (students, problems, features of problems)
    # It is passed the target, which was created by make_target and has dimensions (n_students, n_problems)
    # The target has the same data, only in a different shape - we're converting it to a kind of one-hot encoding.
    def make_input(self, n_students: int, n_problems: int, student_skill, target):
        input = list()
        for ns in range(n_students):
            input_part = torch.zeros(n_problems, 2 * n_problems)
            for np in range(n_problems):
                input_part[np, 2 * np] = 1
                input_part[np, 2 * np + 1] = target[ns, np]
            input.append(input_part)
        return torch.stack(input)
    
    def make_data(self):
        self.student_skill_train = np.random.uniform(low = 0, high = 1, size=(self.n_students_train,))
        self.student_skill_val = np.random.uniform(low = 0, high = 1, size=(self.n_students_val,))
        self.train_target = self.make_target(self.n_students_train, self.n_problems, self.student_skill_train)
        self.train_input = self.make_input(self.n_students_train, self.n_problems, self.student_skill_train, self.train_target)
        self.val_target = self.make_target(self.n_students_val, self.n_problems, self.student_skill_val)
        self.val_input = self.make_input(self.n_students_val, self.n_problems, self.student_skill_val, self.val_target)

    def make_target_jsonlines_for_irt(self):
        print("Now making target jsonlines file.")
        target_data = self.make_target(self.n_students_train, self.n_problems, self.student_skill_train)
        with jsonlines.open('rnn_irt.jsonlines', mode='w') as writer:
            for n, obj in enumerate(target_data):
                writer.write({"subject_id": str(n), "responses": {k: int(obj[k]) for k in range(len(obj))}})

    def train_evaluate_compute_mse_irt(self):
        print("Now training and evaluating using IRT.")
        cli.train_and_evaluate("1pl", "rnn_irt.jsonlines", "../py-irt/out")
        loss_sum = list()
        print("Now computing MSE from model predictions jsonlines file.")
        with jsonlines.open('../py-irt/out/model_predictions.jsonlines', mode='r') as reader:
            for obj in reader:
                loss_sum.append((obj['response'] - obj['prediction'])**2)
        mse_for_irt = sum(loss_sum) / len(loss_sum)
        print("MSE for IRT = ", mse_for_irt)
        return mse_for_irt


    def training_loop(self, n_epochs: int, device, model: Model, loss_fn, optimizer: torch.optim.Optimizer):
        self.make_data()
        self.train_input = self.train_input.to(device)
        self.train_target = self.train_target.to(device)
        self.val_input = self.val_input.to(device)
        self.val_target = self.val_target.to(device)
        
        # x = input()
        for epoch in range(1, n_epochs + 1):
            model.train()
            train_predictions = model(self.train_input)
            train_loss = loss_fn(train_predictions, self.train_target)

            #print("train_predictions:")
            #print(train_predictions)
            # x = input()
            optimizer.zero_grad() # Clear gradients
            train_loss.backward() # Back propagation
            #for name, param in model.named_parameters():
            #    print(f"Parameter: {name}, Gradient: {param.grad}")
            optimizer.step() # Update weights
            
            if epoch%10 == 0 or epoch == 1:
                with torch.no_grad():
                    model.eval()
                    val_predictions = model(self.val_input)
                    val_loss = loss_fn(val_predictions, self.val_target)
                    print(f'Epoch {epoch} out of {n_epochs}')
                    print(f"Val loss: {val_loss.item():.4f}")
                    print(f"Train loss: {train_loss.item():.4f}")

    def train_rnn(self):
        device = get_device()
        ### self.make_data()
        # Instantiate the model with hyperparameters
        # input_size is the number of features; here, it is twice n_problems because 
        # the input is one-hot encoded, with one feature per problem and.
        # hidden_dim is the dimension of the hidden layer, which is an arbitrary number.
        # We want the 0th dimension of the hidden layer to be the output; the other dimensions keep track of latent variables.
        model = Model(input_size=self.n_problems * 2, hidden_dim=self.n_hidden, n_layers=1, n_problems=self.n_problems)
        # We'll also set the model to the device that we defined earlier (default is CPU)
        model.to(device)
        
        n_epochs = 1000
        lr=0.1
        
        loss_fn = torch.nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    
        self.training_loop(
            n_epochs = n_epochs,
            device = device,
            model = model,
            loss_fn = loss_fn,
            optimizer = optimizer)

In [None]:
# Some of the idea for the code is based on this website:
# https://web.archive.org/web/20231223192939/https://blog.floydhub.com/a-beginners-guide-on-recurrent-neural-networks-with-pytorch/

In [194]:
rnn_irt = RNN_IRT()

In [86]:
%%capture
rnn_irt.make_target_jsonlines_for_irt()
mse_for_irt = rnn_irt.train_evaluate_compute_mse_irt()

In [87]:
print(mse_for_irt)

0.2593336102663726


In [195]:
rnn_irt.train_rnn()

GPU not available, CPU used


  output = torch.tensor(output, dtype=torch.float32)


Epoch 1 out of 1000
Val loss: 0.7321
Train loss: 0.8866
Epoch 10 out of 1000
Val loss: 0.2602
Train loss: 0.2704
Epoch 20 out of 1000
Val loss: 0.2123
Train loss: 0.2150
Epoch 30 out of 1000
Val loss: 0.2017
Train loss: 0.2034
Epoch 40 out of 1000
Val loss: 0.1970
Train loss: 0.1988
Epoch 50 out of 1000
Val loss: 0.1939
Train loss: 0.1952
Epoch 60 out of 1000
Val loss: 0.1929
Train loss: 0.1942
Epoch 70 out of 1000
Val loss: 0.1915
Train loss: 0.1928
Epoch 80 out of 1000
Val loss: 0.1899
Train loss: 0.1912
Epoch 90 out of 1000
Val loss: 0.1883
Train loss: 0.1896
Epoch 100 out of 1000
Val loss: 0.1872
Train loss: 0.1884


KeyboardInterrupt: 