In [79]:
import torch

from torch import nn
from torch import optim
from torch.nn.utils.rnn import pad_sequence

## Thoughts and assumptions
In this task I was to create a network which computes the $L1$ norm of a variable-length input sequence of real-valued numbers without using the $L1$ norm explicitly. I could use Dense Layers, ReLU activations, the negation operation, and the sum and multiplication operations. Manually initializing a specific weight or set of weights was not allowed.

I needed to split this problem into multiple 'sub-problems'
1. Handle the variable-length input sequence.
2. Handle values of different signs.
3. Build a trainable model which would predict/compute the $L1$ norm of the input.  

=====================================================================================================================================

1. The first point was relatively easy and hinted in the task description - use an RNN architecture.
2. Point number two needed a bit more thinking, not because it was hard to implement, but in order to better understand why such an operation is required.  
The output of the RNN cell can be described using the following equation:  
<center>$h' = tanh(W_{ih}*x + b_{ih} + W_{hh}*h + b_{hh})$</center>  
If the input $x$ changes its sign, the value fed into the activation function changes as well, so two inputs of the same magnitude but opposite sign would contribute differently. Since I am allowed to use negation, I flip the sign wherever $x < 0$, so that every input is non-negative. The other thing which requires attention is the activation function itself. The range of $tanh$ is $(-1, 1)$, which squashes our output, and this behaviour is not what we want. We therefore replace it with ReLU, which returns a linear output for a non-negative input, $ReLU(x) = (x)^{+}$. Our equation then changes into
<center>$h' = ReLU(W_{ih}*x + b_{ih} + W_{hh}*h + b_{hh})$</center> 
3. Here I needed to establish what I want to achieve. I have an RNN cell which outputs the scaled previous output added to the scaled current input, plus a bias. That led me to the conclusion that a feasible solution is an RNN cell whose weights are identity matrices (or just 1 in our scalar case: $W_{ih} = 1$ and $W_{hh} = 1$) and whose biases are either 0 (both $b_{ih} = 0$ and $b_{hh} = 0$) or cancel each other ($b_{ih} = -b_{hh}$). I cannot set these values manually, but at least I know what I am trying to achieve.

In [261]:
class CustomRNNCell(nn.Module):
    """Single-step recurrent cell that accumulates input magnitudes.

    Every call first flips the sign of the negative entries of the input and
    then passes the result, together with the previous hidden state, through
    a bias-free ``nn.RNNCell`` with a ReLU non-linearity.
    """

    def __init__(self, input_size: int = 1, hidden_size: int = 1):
        """Build the wrapped ``nn.RNNCell``.

        :param input_size: number of values in a single input step
        :type input_size: int
        :param hidden_size: number of features in the hidden state
        :type hidden_size: int
        """
        super().__init__()

        # No bias terms are created, so the only trainable parameters are
        # the input-to-hidden and hidden-to-hidden weight matrices.
        self._rnn_cell = nn.RNNCell(
            input_size=input_size,
            hidden_size=hidden_size,
            bias=False,
            nonlinearity="relu",
        )

    def forward(self, x, hidden):
        """Advance the cell by one step.

        :param x: input for the current step
        :param hidden: hidden state produced by the previous step
        :return: the new hidden state returned by the wrapped ``nn.RNNCell``
        """
        # Negate only the negative entries so the cell sees non-negative input.
        non_negative = torch.where(x >= 0, x, -x)
        return self._rnn_cell(non_negative, hidden)

In [262]:
# RNN network that sums the outputs of the CustomRNNCell
class CustomRNN(nn.Module):
    """Unrolls a ``CustomRNNCell`` over the time axis and returns the last hidden state.

    :param input_size: number of values per time step
    :param hidden_size: number of features in the hidden state
    """

    def __init__(self, input_size, hidden_size):
        super(CustomRNN, self).__init__()
        self.hidden_size = hidden_size
        self.rnn_cell = CustomRNNCell(input_size, hidden_size)

    def forward(self, x, print_results=False):
        """Feed every time step of ``x`` through the cell.

        :param x: batch-first input, indexed as ``x[:, t, :]`` for step ``t``
        :param print_results: unused; kept only so that existing calls which
            pass a second argument keep working
        :return: hidden state after the last step, shape ``(batch, hidden_size)``.
            Every step along dim 1 is processed, including any padded ones.
        """
        batch_size = x.size(0)
        # Allocate the initial state from x so it shares x's dtype and device;
        # a plain torch.zeros(...) is always default-dtype on the CPU and
        # would mismatch float64 or GPU inputs.
        hidden = x.new_zeros((batch_size, self.hidden_size))

        # Iterate over time steps
        for t in range(x.size(1)):
            hidden = self.rnn_cell(x[:, t, :], hidden)

        return hidden

In [263]:
# Function to generate random sequences and their L1 norms
def generate_data(batch_size, max_length):
    """Draw random variable-length sequences together with their L1 norms.

    :param batch_size: number of sequences to generate
    :param max_length: inclusive upper bound for a sequence length (lower bound is 1)
    :return: ``(sequences, targets)`` - two lists of equal length; each sequence
        is a tensor of shape ``(length, 1)`` and each target is a float32
        tensor of shape ``(1,)`` holding the sum of absolute values
    """
    sequences, targets = [], []
    for _ in range(batch_size):
        # Length is drawn first, then the values, one sample at a time.
        n_steps = int(torch.randint(low=1, high=max_length + 1, size=(1,)))
        sample = torch.randn(n_steps, 1)
        magnitude_sum = sample.abs().sum()
        sequences.append(sample)
        targets.append(torch.tensor([magnitude_sum.item()], dtype=torch.float32))
    return sequences, targets

In [264]:
# Hyper-parameters of the experiment
input_size = 1    # every sequence element is a single scalar
hidden_size = 1   # the model output is a single scalar as well
batch_size = 8
max_length = 10   # upper bound on the length of a generated sequence
epochs = 10000

# Instantiate the model and show its freshly initialised parameters
model = CustomRNN(input_size=input_size, hidden_size=hidden_size)

for param in model.rnn_cell.parameters():
    print(param)

Parameter containing:
tensor([[0.4361]], requires_grad=True)
Parameter containing:
tensor([[-0.5121]], requires_grad=True)


In [265]:
# Train until both weights have converged to 1

In [268]:
optimizer = optim.Adam(model.parameters(), lr=.1)
criterion = nn.MSELoss()  # Mean Squared Error loss

# Generate one batch of random sequences and their L1 norms; the same batch
# is reused for every optimisation step below.
sequences, targets = generate_data(batch_size=batch_size, max_length=max_length)

# Zero-pad to a common length once: the batch never changes, so there is no
# need to redo the padding inside the loop.
padded_sequences = pad_sequence(sequences, batch_first=True)
lengths = torch.tensor([len(seq) for seq in sequences])
targets = torch.cat(targets)

# Both weights are expected to converge to 1. Compare with a tolerance rather
# than exact float equality, and cap the number of steps at `epochs`, so the
# loop always terminates even if the weights never hit 1.0 bit-for-bit.
weight_tol = 1e-6
inner_cell = model.rnn_cell._rnn_cell

for epoch in range(1, epochs + 1):
    optimizer.zero_grad()

    # Forward pass
    outputs = model(padded_sequences).squeeze(1)

    # Compute loss
    loss = criterion(outputs, targets)
    loss.backward()

    # Update weights
    optimizer.step()

    if epoch % 100 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item()}')

    weights = torch.cat([
        inner_cell.weight_hh.detach().flatten(),
        inner_cell.weight_ih.detach().flatten(),
    ])
    if torch.allclose(weights, torch.ones_like(weights), rtol=0.0, atol=weight_tol):
        print(f"Finished training after {epoch} epochs")
        break
else:
    # Only reached when the loop ran out of epochs without hitting `break`.
    print(f"Stopped after {epochs} epochs without reaching the weight tolerance")

Epoch 100, Loss: 0.0001708079653326422
Epoch 200, Loss: 4.289804067525438e-09
Finished training after 297 epochs


In [269]:
# Show the parameters of the cell after training
for param in model.rnn_cell.parameters():
    print(param)

Parameter containing:
tensor([[1.]], requires_grad=True)
Parameter containing:
tensor([[1.]], requires_grad=True)


In [270]:
# Inspect the weights and biases of the wrapped nn.RNNCell
# (the cell is built with bias=False, so both bias attributes are None).
(
    model.rnn_cell._rnn_cell.weight_hh,
    model.rnn_cell._rnn_cell.weight_ih,
    model.rnn_cell._rnn_cell.bias_hh,
    model.rnn_cell._rnn_cell.bias_ih,
)

(Parameter containing:
 tensor([[1.]], requires_grad=True),
 Parameter containing:
 tensor([[1.]], requires_grad=True),
 None,
 None)

In [253]:
# Training setup
def train_model(model, epochs, batch_size, max_length, learning_rate=0.001):
    """Fit ``model`` to predict the L1 norm of freshly sampled random batches.

    A new batch is drawn from ``generate_data`` on every epoch, zero-padded to
    a common length, and used for a single Adam step on the MSE between the
    model output and the target norms. The loss is printed every 100 epochs.

    :param model: module called as ``model(padded_batch)``; its output is
        squeezed along dim 1 before being compared with the targets
    :param epochs: number of optimisation steps
    :param batch_size: number of sequences drawn per step
    :param max_length: maximum sequence length passed to ``generate_data``
    :param learning_rate: Adam learning rate
    :return: None; ``model`` is updated in place
    """
    adam = optim.Adam(model.parameters(), lr=learning_rate)
    mse = nn.MSELoss()

    for epoch in range(epochs):
        # Fresh random batch for this step
        batch, norms = generate_data(batch_size=batch_size, max_length=max_length)
        inputs = pad_sequence(batch, batch_first=True)
        expected = torch.cat(norms)

        adam.zero_grad()

        predictions = model(inputs).squeeze(1)
        loss = mse(predictions, expected)
        loss.backward()

        adam.step()

        if not epoch % 100:
            print(f'Epoch {epoch}, Loss: {loss.item()}')

# Parameters
input_size = 1  # Each element is a scalar
hidden_size = 1  # Output is a scalar
batch_size = 8
max_length = 10  # Maximum length of any sequence
epochs = 10000

# Create the RNN model
model = CustomRNN(input_size=input_size, hidden_size=hidden_size)

# Train the model
train_model(model, epochs, batch_size, max_length)

# Test the model on a new sequence, shaped (batch=1, steps=4, features=1)
test_sequence = torch.tensor([3, -5, 7, -2], dtype=torch.float32).unsqueeze(1).unsqueeze(0)
test_length = torch.tensor([test_sequence.size(1)])  # not consumed by the model call below

# Get the model's prediction
model.eval()
with torch.no_grad():
    predicted_sum = model(test_sequence).item()
    # Reduce with the tensor API: builtin sum() over a squeeze()d tensor is a
    # Python-level loop and raises on a length-1 sequence, where squeeze()
    # yields a 0-d tensor that cannot be iterated.
    actual_sum = torch.abs(test_sequence).sum().item()

print(f"Predicted sum: {predicted_sum}, Actual sum: {actual_sum}")

Epoch 0, Loss: 18.528465270996094
Epoch 100, Loss: 23.143762588500977
Epoch 200, Loss: 10.109903335571289
Epoch 300, Loss: 0.43800294399261475
Epoch 400, Loss: 0.08178436011075974
Epoch 500, Loss: 0.05002187192440033
Epoch 600, Loss: 0.03228967636823654
Epoch 700, Loss: 0.019860919564962387
Epoch 800, Loss: 0.06131543964147568
Epoch 900, Loss: 0.019320055842399597
Epoch 1000, Loss: 0.03905189037322998
Epoch 1100, Loss: 0.04713689908385277
Epoch 1200, Loss: 0.013239098712801933
Epoch 1300, Loss: 0.02433709427714348
Epoch 1400, Loss: 0.01014829333871603
Epoch 1500, Loss: 0.012641885317862034
Epoch 1600, Loss: 0.03029327280819416
Epoch 1700, Loss: 0.031277067959308624
Epoch 1800, Loss: 0.011759214103221893
Epoch 1900, Loss: 0.01458704099059105
Epoch 2000, Loss: 0.009540936909615993
Epoch 2100, Loss: 0.0055660465732216835
Epoch 2200, Loss: 0.003678351640701294
Epoch 2300, Loss: 0.004250539466738701
Epoch 2400, Loss: 0.007224759552627802
Epoch 2500, Loss: 0.0014588885242119431
Epoch 2600, L

In [254]:
# Inspect the bias attributes of the wrapped nn.RNNCell
(
    model.rnn_cell._rnn_cell.bias_hh,
    model.rnn_cell._rnn_cell.bias_ih,
)

(None, None)

In [255]:
# Inspect the learned hidden-to-hidden and input-to-hidden weights
(
    model.rnn_cell._rnn_cell.weight_hh,
    model.rnn_cell._rnn_cell.weight_ih,
)

(Parameter containing:
 tensor([[1.]], requires_grad=True),
 Parameter containing:
 tensor([[1.]], requires_grad=True))

In [271]:
# One long random sequence, shaped (batch=1, steps=10000, features=1),
# together with its reference L1 norm
seq = torch.randn(1, 10000, 1)
l1_norm = seq.abs().sum()
seq, l1_norm

(tensor([[[-0.0524],
          [-0.2335],
          [ 0.3654],
          ...,
          [ 0.1490],
          [ 1.3713],
          [ 0.5624]]]),
 tensor(7953.1104))

In [272]:
# Number of time steps in `seq` (size of dim 1), wrapped in a 1-element tensor
seq_len = torch.tensor([seq.shape[1]])
seq_len

tensor([10000])

In [273]:
# Get the model's prediction
with torch.no_grad():
    # CustomRNN.forward only consumes the sequence; its second positional
    # parameter is the unused `print_results` flag, so seq_len is not passed.
    predicted_sum = model(seq).item()
    # Builtin sum() adds the elements one by one in sequence order, the same
    # order in which the model processes the time steps. flatten() (instead
    # of squeeze()) keeps the tensor iterable even for a length-1 sequence.
    actual_sum = sum(torch.abs(seq).flatten()).item()

print(f"Predicted sum: {predicted_sum}, Actual sum: {actual_sum}")

Predicted sum: 7953.1142578125, Actual sum: 7953.1142578125
