# 08 - MLPs and Residual propagation

So far, we have been using Logistic Regression for all our classification needs. Logistic regression is very similar to linear regression, except for that $\sigma(z)$ in the end - it is essentially a linear projection and a choice between the "positive" and the "negative" sides of the projection surface.

Also, we have seen that we can choose to project our data $X$ into an intermediate representation $z$ so that $z$ has more than one dimension. We can use that for multi-class classification.

Now, we are going to view the effects of mapping the intermediate projection $z$ to another intermediate projection (let's call it $z_2$). As we will see, increasing the dimensionality of each representation $z_i$ and the number of intermediate projections has the effect of creating intermediate regions in which we can apply linear transformations separately. If you want a theoretical reference for such, refer to: [Thiago Serra, Christian Tjandraatmadja, Srikumar Ramalingam Proceedings of the 35th International Conference on Machine Learning, PMLR 80:4558-4566, 2018.](https://proceedings.mlr.press/v80/serra18b/serra18b.pdf).

In the examples shown here, we will start with the use of Linear Regression to show some limitations of this approach. These animated examples work like this:

1. We define a random dataset $X$
1. We define a target $y$ by applying some function over $X$
1. We initialize a prediction model with an identity function (that is, our weights are such that )
1. We train the prediction model to predict $\hat{y} = f(X)$ using gradient descent, and store $\hat{y}_t$ for each iteration $t$
1. We make an animation of all $\hat{y}_t$ so we can see what happens to our predictions.



In [None]:
import torch
import torch.nn as nn
from tqdm import tqdm
import pandas as pd

In [None]:
def train_model(model, X, y, lr=0.01, epochs=100):
    
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Training loop
    outputs = [X.numpy()]
    for epoch in tqdm(range(epochs)):
        # Forward pass
        predictions = model(X)
        outputs.append(predictions.detach().numpy())
        loss = criterion(predictions, y)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return model, outputs

def animate_training(outputs, y, frame_duration=5):
    import pandas as pd
    import plotly.express as px

    # Convert outputs to a pandas DataFrame for easier plotting
    frames = []
    for i, output in enumerate(outputs):
        df = pd.DataFrame(output, columns=['Feature 1', 'Feature 2'])
        df['Frame'] = i  # Add a frame identifier
        frames.append(df)

    total_frames = len(frames)
    n = total_frames // 100
    frames = frames[::n]  # Get every nth frame
    
    # Concatenate all frames into a single DataFrame
    animated_df = pd.concat(frames, ignore_index=True)


    fig = px.scatter(

        animated_df,
        width=600,
        height=600,
        x='Feature 1',
        y='Feature 2',
        animation_frame='Frame',
        title='Outputs Animation',
        labels={'Feature 1': 'Feature 1', 'Feature 2': 'Feature 2'}
    )

    # Add a scatterplot of y
    scatter_y = pd.DataFrame(y.numpy(), columns=['Feature 1', 'Feature 2'])
    scatter_y['Frame'] = -1  # Use -1 to indicate the original data
    for _, row in scatter_y.iterrows():
        fig.add_trace(px.scatter(
            pd.DataFrame([row]),
            x='Feature 1',
            y='Feature 2',
            color='Frame',

        ).data[0])

    # Adjust animation speed
    fig.layout.updatemenus[0].buttons[0].args[1]["frame"]["duration"] = frame_duration  # Set duration in milliseconds
    fig.update_layout(coloraxis_showscale=False)
    return fig

## A simple dataset: a rotation + translation

A linear regression is capable to find the correct rotation and translation of a dataset. This is because rotations and translations can be immediately expressed by the linear prediction equation $y = xw^t + b$ - in this case, the weight matrix $w$ can be constructed from a rotation, and $b$ corresponnds to the translation:

In [None]:
# Create a mock dataset
X = torch.randn(100, 2) * 5  # 100 samples, 2 feature
theta = torch.tensor(30.0 * torch.pi / 180.0)  # Convert degrees to radians
rotation_matrix = torch.tensor([
    [torch.cos(theta), -torch.sin(theta)],
    [torch.sin(theta), torch.cos(theta)]
])
y = X @ rotation_matrix + torch.tensor([5,5]) + 0.01 * torch.randn(100, 2)  # y = 3x + 2 + noise

## 

In [None]:
# Initialize the model, loss function, and optimizer
input_size = 2
output_size = 2
linear_model = nn.Linear(
    in_features=2,
    out_features=2,
)
linear_model.weight.data = torch.eye(2)  # Initializing with identity
linear_model.bias.data = torch.zeros(2)  # Initializing bias to zero

model, outputs = train_model(
    model=linear_model,
    X=X,
    y=y,
    lr=0.01,
    epochs=1000
)

fig = animate_training(outputs, y)
fig.show()


## A more complicated dataset: linear by parts

Now, let's get a more complicated dataset. Now, $X$ (our input) will have three different clusters, and we will apply a different linear transform in each cluster.



In [None]:
# Create a mock dataset
X1 = torch.randn(100, 2) + torch.tensor([3,-3])   # 100 samples, 2 feature
theta = torch.tensor(30.0 * torch.pi / 180.0)  # Convert degrees to radians
y1 = 3*X1 + 0.01 * torch.randn(100, 2)  # y = 3x + 2 + noise

# Create a mock dataset
X2 = torch.randn(100, 2)  # 100 samples, 2 feature
theta = torch.tensor(150.0 * torch.pi / 180.0)  # Convert degrees to radians
rotation_matrix2 = torch.tensor([
    [torch.cos(theta), -torch.sin(theta)],
    [torch.sin(theta), torch.cos(theta)]
])
y2 = (X2 @ rotation_matrix2) + torch.tensor([-3,3]) + 0.01 * torch.randn(100, 2)  # y = 3x + 2 + noise

# Create a mock dataset
X3 = torch.randn(100, 2) + torch.tensor([3,3]) # 100 samples, 2 feature
theta = torch.tensor(150.0 * torch.pi / 180.0)  # Convert degrees to radians
y3 = -5*X3 - 0.01 * torch.randn(100, 2)  # y = 3x + 2 + noise


X = torch.cat((X1, X2, X3), dim=0)
y = torch.cat((y1, y2, y3), dim=0)

When we try to approximate this using a linear layer, we obviously can't. This is due to our data being more complicated than the model - or, in other words, our model is not *expressive* enough to model our data. In the animation, we clearly see that the linear layer can only apply the same transform to all points in the input vector space, hence they all "bend" in the same way.

Our model is unable, for example, to model the different cluster variances generated by the different multiplications applied when we generated each part of $y$.

In [None]:
# Initialize the model, loss function, and optimizer
input_size = 2
output_size = 2
linear_model = nn.Linear(
    in_features=2,
    out_features=2,
)
linear_model.weight.data = torch.eye(2)  # Initializing with identity
linear_model.bias.data = torch.zeros(2)  # Initializing bias to zero

model, outputs = train_model(
    model=linear_model,
    X=X,
    y=y,
    lr=0.01,
    epochs=500
)

fig = animate_training(outputs, y)
fig.show()


## MLP models

A possible upgrade to the linear model is the MLP model. The MLP model is:

$$
\hat{y} = f(xw_1^t+b_2)w_2^t+b_2,
$$
where $f$ is the Rectifying Linear Unit (ReLU) function given by $f(z)=0, z<0, f(z)=z, z>0$.

We can interpret this equation as two layers of linear projections, separated by a non-linear operation, as follows:

```mermaid

graph LR;
    A((X)) --> B["$$\times w_1^T$$"];
    subgraph Layer 1;
    B --> C[""$$+ b_1$$""];

    C --> D["""f"""];
        end;
    subgraph Layer 2;
    D --> E["$$\times w_2^T$$"];
    E --> F[""$$+ b_2$$""];
    end;
    F --> G((ŷ))


In the animation, we see that each part of the space is being folded and bended differenty. This is because of the non-linearity given by the ReLU function.

### A very small example

Let's suppose we have two 1-dimensional inputs: $x_1$ and $-x_2$, where $x_1$ and $x_2$ are real and positive. Our network has 1-d outputs as well.

For simplicity, let's assume $b_1=b_2=0$

$w_1$ will be equal to $\begin{bmatrix} 1 \\ -1 \end{bmatrix}$. Thus, $xw_1^T=\begin{bmatrix} x_1 & -x_2 \\ -x_1 & x_2 \end{bmatrix}$.

Now, after applying $f(.)$ to $xw_1^T$, we have:

$$
f(xw_1^T)=\begin{bmatrix} x_1 & 0 \\ 0 & x_2 \end{bmatrix}
$$

This is important: each one of our rows are independent of each input!

Now, note that $w_2$ must be 2x1 (let's say it is equal to $[c, d]$), and:

$$
y = \begin{bmatrix} x_1 & 0 \\ 0 & x_2 \end{bmatrix} \begin{bmatrix} c \\ d \end{bmatrix} = \begin{bmatrix} c x_1 \\ d x_2 \end{bmatrix} 
$$

Now, importantly: in our model, the first input received a scale of $c$, while the second input received a scale of $d$.

Thus, the model operates in two layers. In the first layer, it divides the inputs into groups; in the second layer, it applies a different linear transform for each group.

### Drawbacks

The values for the weight and bias matrices must be trained using gradient descent. However, $f(.)$ has zero gradient for all negative inputs. For this reason, it is common to see output values organizing in straight lines - located exactly where the point of inflection is.

In [None]:
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.nonlinearity = nn.ReLU()

        # Initialize weights to identity and biases to zero
        nn.init.eye_(self.fc1.weight)
        nn.init.zeros_(self.fc1.bias)
        nn.init.eye_(self.fc2.weight)
        nn.init.zeros_(self.fc2.bias)

    def forward(self, x):
        x = self.nonlinearity(self.fc1(x))
        x = self.fc2(x)
        return x

# Create an instance of the MLP
hidden_size = 2  # Example hidden layer size
mlp_model = MLP(input_size, hidden_size, output_size)
model, outputs = train_model(
    model=mlp_model,
    X=X,
    y=y,
    lr=0.01,
    epochs=5000
)

fig = animate_training(outputs, y, frame_duration=1)
fig.show()


### Residual blocks

The problem of zero-gradient has been tackled by many approaches. One of the most successfull was to create an alternate route for gradients to propagate. This route is called "residual", and involves adding the input to the output of the network, that is:

```mermaid

graph LR;
    A((X)) --> B["$$\times w_1^T$$"];
    subgraph Layer 1;
    B --> C[""$$+ b_1$$""];

    C --> D["""f"""];
        end;
    subgraph Layer 2;
    D --> E["$$\times w_2^T$$"];
    E --> F[""$$+ b_2$$""];
    end;
    F --> H("SUM");
    A -->|Residual Connection| H;
    H --> G((ŷ));
```
$$
\hat{y} = x + (f(xw_1^t+b_2)w_2^t+b_2),
$$

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.nonlinearity = nn.ReLU()

        # Initialize weights to identity and biases to zero
        nn.init.eye_(self.fc1.weight)
        nn.init.zeros_(self.fc1.bias)
        nn.init.eye_(self.fc2.weight)
        nn.init.zeros_(self.fc2.bias)

    def forward(self, x):
        residual = x
        x = self.nonlinearity(self.fc1(x))
        x = self.fc2(x)
        x += residual  # Add the residual connection
        return x
    
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.r1 = ResidualBlock(hidden_size)
        self.r2 = ResidualBlock(hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

        # Initialize weights to identity and biases to zero
        nn.init.eye_(self.fc1.weight)
        nn.init.zeros_(self.fc1.bias)
        nn.init.eye_(self.fc2.weight)
        nn.init.zeros_(self.fc2.bias)

    def forward(self, x):
        x = self.fc1(x)
        x = self.r1(x)
        x = self.r2(x)
        y = self.fc2(x)
        return y

# Create an instance of the MLP
hidden_size = 2  # Example hidden layer size
mlp_model = MLP(input_size, hidden_size, output_size)
model, outputs = train_model(
    model=mlp_model,
    X=X,
    y=y,
    lr=0.01,
    epochs=5000
)

fig = animate_training(outputs, y, frame_duration=1)
fig.show()


### Normalization

The non-linearities allow applying different transforms to each region of the input space. The residual connections avoids the vanishing gradient problem. Now, we add an extra layer of stability by normalizing data in each layer. Normalization helps maintaining all representations within reasonable values, which helps numerical stability and has ultimately been linked to faster convergence in neural networks.

Using normalization after each layer, we do observe a faster convergence:

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.nonlinearity = nn.ReLU()

        # Initialize weights to identity and biases to zero
        nn.init.eye_(self.fc1.weight)
        nn.init.zeros_(self.fc1.bias)
        nn.init.eye_(self.fc2.weight)
        nn.init.zeros_(self.fc2.bias)

    def forward(self, x):
        residual = x
        x = self.nonlinearity(self.fc1(x))
        x = self.fc2(x)
        x += residual  # Add the residual connection
        return x
    
class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(MLP, self).__init__()
        self.bn1 = nn.BatchNorm1d(hidden_size)
        self.bn2 = nn.BatchNorm1d(hidden_size)
        self.bn3 = nn.BatchNorm1d(hidden_size)
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.r1 = ResidualBlock(hidden_size)
        self.r2 = ResidualBlock(hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)

        # Initialize weights to identity and biases to zero
        nn.init.eye_(self.fc1.weight)
        nn.init.zeros_(self.fc1.bias)
        nn.init.eye_(self.fc2.weight)
        nn.init.zeros_(self.fc2.bias)

    def forward(self, x):
        x = self.fc1(x)
        x = self.bn1(x)
        x = self.r1(x)
        x = self.bn2(x)
        x = self.r2(x)
        x = self.bn3(x)
        y = self.fc2(x)
        return y

# Create an instance of the MLP
hidden_size = 2  # Example hidden layer size
mlp_model = MLP(input_size, hidden_size, output_size)
model, outputs = train_model(
    model=mlp_model,
    X=X,
    y=y,
    lr=0.01,
    epochs=5000
)

fig = animate_training(outputs, y, frame_duration=1)
fig.show()


## Conclusion

Althoug [our reference](https://proceedings.mlr.press/v80/serra18b/serra18b.pdf) states that there is a theoretical upper bound for the number of regions created by subsequent ReLU-separated projections, it is still challenging to find what are the optimal regions and corresponding projections for a particular dataset.

Our toolset for such is:

1. We can use simple linear regressions or logistic regressions to find a baseline for our system.
1. We can use the MLP topology to create potential regions in our dataset. More layers, and more neurons per layer, increase the *expressivity* of the network, that is, the number of linear regions it can model.
1. Adding a residual connection helps propagating gradients to the earlier layers of the MLP, which favors using the whole potential of the network.
1. Normalization layers help leading to a more numerically stable fit.


## Exercise

Make a neural network that maps $X$ to $y$ using the data below. Plot an animation of the convergence process, like the ones shown above. Try the different model variations - what happens in each case?

In [None]:
x1 = torch.linspace(-1, 1, 500)
x2 = torch.linspace(1, -1, 500)
X = torch.stack([x1, x2], dim=1)
X += 0.1 * torch.randn(500, 2)  # Adding noise to X

y = 2*X + X**2 +0.5

import matplotlib.pyplot as plt

plt.figure(figsize=(8, 6))
plt.scatter(X[:, 0].numpy(), X[:, 1].numpy(), label='X', alpha=0.5)
plt.scatter(y[:, 0].numpy(), y[:, 1].numpy(), label='y', alpha=0.5)
plt.xlabel('Feature 1')
plt.ylabel('Feature 2')
plt.title('Scatterplot of X and y')
plt.legend()
plt.show()
