In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.nn.utils.parametrizations import weight_norm
import scipy as sp
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


In [2]:
# 1. Load and preprocess Boston Housing data
boston = fetch_openml(name='boston', version=1, as_frame=True)
X = boston.data.values
y = boston.target.values

# Train/test split FIRST: the scalers below must only ever see training rows,
# otherwise the test-set mean/variance leaks into the normalization.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Normalize features and target with statistics fitted on the training split only
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_train = scaler_X.fit_transform(X_train)
X_test = scaler_X.transform(X_test)
y_train = scaler_y.fit_transform(y_train.reshape(-1, 1))
y_test = scaler_y.transform(y_test.reshape(-1, 1))

# Convert to PyTorch tensors (targets are kept as column vectors)
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.float32).view(-1, 1)


In [5]:
class MaskedLinear(nn.Linear):
    """Linear layer that can zero out statistically insignificant weights.

    Every weight receives a significance score from ``wval``; when
    ``use_mask`` is true, weights whose score falls below ``self.threshold``
    are multiplied by zero before the affine map is applied. Each forward
    pass also records the entropy of the squared (masked) weights.

    Args:
        in_features: Size of each input sample.
        out_features: Size of each output sample.
        bias: Whether the layer learns an additive bias.
        threshold: Value whose ``-log10`` becomes the significance cut-off.
        use_mask: If false, the layer behaves like a plain ``nn.Linear``
            (the entropy is still recorded).
    """

    def __init__(self, in_features, out_features, bias=True, threshold=2, use_mask=True):
        super().__init__(in_features, out_features, bias)
        # Cut-off on the -log10 scale, stored as a 0-dim tensor.
        # NOTE(review): for threshold > 1 the cut-off is negative
        # (threshold=2 -> about -0.30) while significance scores are never
        # negative, so every weight passes. Confirm whether `threshold` is
        # meant to be a p-value (< 1) or already a -log10 value.
        self.threshold = -torch.log10(torch.tensor(float(threshold)))
        self.use_mask = use_mask
        # Entropy of the squared (masked) weights from the latest forward pass.
        self.entropy = None

    def wval(self, M):
        """Return ``-log10`` of a Beta survival probability for every weight.

        The weights feeding one output unit (all trailing dimensions of
        ``M``) are flattened into a vector of length ``L`` and scaled to unit
        L2 norm. Each squared entry is then compared against a
        Beta(1/2, (L-1)/2) distribution, which is the law of a squared
        coordinate of a uniformly random unit vector in ``L`` dimensions.

        Args:
            M: Weight tensor whose first dimension indexes output units.

        Returns:
            Tensor shaped like ``M`` (same dtype and device) holding the
            scores. It is computed outside autograd and carries no gradient.
        """
        # Local import: the file only imports the top-level ``scipy`` package.
        from scipy.special import betainc

        fan_in = M.shape[1:].numel()
        a = 0.5
        b = (fan_in - 1) / 2

        # Work in float64 so the clamp bounds below are representable.
        rows = M.detach().reshape(M.shape[0], -1).double()
        unit_sq = F.normalize(rows, p=2, dim=1).pow(2).clamp(min=1e-8, max=1 - 1e-8)

        # Survival function 1 - I_x(a, b), evaluated as I_{1-x}(b, a) so that
        # very small tail probabilities are not lost to cancellation.
        surv = betainc(b, a, 1.0 - unit_sq.cpu().numpy())
        surv = torch.from_numpy(surv).clamp_min(torch.finfo(torch.float64).tiny)

        w_val = (-torch.log10(surv)).reshape(M.shape).to(device=M.device, dtype=M.dtype)
        assert w_val.shape == M.shape

        return w_val

    def forward(self, input):
        """Apply the (optionally masked) affine map and record the weight entropy."""
        if self.use_mask:
            # Compute significance
            significance = self.wval(self.weight)

            # Binary mask: 1 keeps a weight, 0 removes it
            mask = (significance >= self.threshold).to(self.weight.dtype)

            # Apply the mask (zero out insignificant weights)
            masked_weight = self.weight * mask
        else:
            masked_weight = self.weight

        squared = masked_weight.detach().pow(2).flatten()
        # .cpu() keeps the NumPy conversion working when the layer lives on a GPU.
        self.entropy = sp.stats.entropy(squared.cpu().numpy())

        return F.linear(input, masked_weight, self.bias)



class FlexibleMLP(nn.Module):
    """Two-hidden-layer perceptron with a single output, built from MaskedLinear layers.

    Args:
        input_size: Number of input features.
        layer1: Width of the first hidden layer.
        layer2: Width of the second hidden layer.
        activation_fn: Activation class (not instance); it is instantiated
            once and the same module is reused after both hidden layers.
        use_mask: Forwarded to every ``MaskedLinear``.
        threshold: Forwarded to every ``MaskedLinear``.
    """

    def __init__(self, input_size=13, layer1=64, layer2=32, activation_fn=nn.ReLU, use_mask=True, threshold=2):
        super().__init__()

        # One shared activation instance
        self.activation = activation_fn()

        # Settings common to all masked layers
        masked_kwargs = dict(threshold=threshold, use_mask=use_mask)
        widths = (input_size, layer1, layer2, 1)

        # Linear -> activation pairs for consecutive widths
        stack = []
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack.append(MaskedLinear(fan_in, fan_out, **masked_kwargs))
            stack.append(self.activation)

        # The output layer is not followed by an activation
        self.model = nn.Sequential(*stack[:-1])

    def forward(self, x):
        """Run ``x`` through the layer stack."""
        return self.model(x)



In [7]:
t = 2
activations = [nn.Tanh, nn.Sigmoid, nn.ReLU]
test_losses = []   # one list of test losses per activation
train_losses = []  # one list of train losses per activation
models = []        # trained model per activation, same order as `activations`
the_lambda = 0     # weight of the squared-L2 penalty; 0 disables it

for act in activations:
    test_loss = []
    train_loss = []
    model = FlexibleMLP(threshold=t, activation_fn=act, use_mask=False)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)

    # 4. Train (full-batch gradient steps)
    for epoch in range(100):
        model.train()
        optimizer.zero_grad()
        output = model(X_train)
        loss = criterion(output, y_train)

        if the_lambda > 0:
            # Sum of squared parameters (weights and biases)
            l2_penalty = sum(p.pow(2.0).sum() for p in model.parameters())
            loss = loss + the_lambda * l2_penalty

        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            model.eval()
            # Evaluation needs no autograd graph
            with torch.no_grad():
                val_loss = criterion(model(X_test), y_test).item()
            print(f"Epoch {epoch}, Train Loss: {loss.item():.4f}, Test Loss: {val_loss:.4f}")
            # Store plain floats in both histories so they line up one-to-one
            test_loss.append(val_loss)
            train_loss.append(loss.item())

    test_losses.append(test_loss)
    train_losses.append(train_loss)
    models.append(model)



Epoch 0, Train Loss: 1.2901, Test Loss: 0.4396
Epoch 10, Train Loss: 0.5453, Test Loss: 0.2876
Epoch 20, Train Loss: 0.4358, Test Loss: 0.2613
Epoch 30, Train Loss: 0.3516, Test Loss: 0.2419
Epoch 40, Train Loss: 0.3040, Test Loss: 0.2247
Epoch 50, Train Loss: 0.2716, Test Loss: 0.2136
Epoch 60, Train Loss: 0.2482, Test Loss: 0.1942
Epoch 70, Train Loss: 0.2324, Test Loss: 0.1729
Epoch 80, Train Loss: 0.2224, Test Loss: 0.1585
Epoch 90, Train Loss: 0.2160, Test Loss: 0.1507
Epoch 0, Train Loss: 1.4419, Test Loss: 0.9240
Epoch 10, Train Loss: 0.9959, Test Loss: 0.7086
Epoch 20, Train Loss: 0.7073, Test Loss: 0.4369
Epoch 30, Train Loss: 0.5504, Test Loss: 0.3199
Epoch 40, Train Loss: 0.4839, Test Loss: 0.2755
Epoch 50, Train Loss: 0.4457, Test Loss: 0.2702
Epoch 60, Train Loss: 0.4280, Test Loss: 0.2668
Epoch 70, Train Loss: 0.4174, Test Loss: 0.2660
Epoch 80, Train Loss: 0.4111, Test Loss: 0.2625
Epoch 90, Train Loss: 0.4072, Test Loss: 0.2622
Epoch 0, Train Loss: 1.4699, Test Loss: 0.

In [5]:
# Show the repr of every model collected in `models`.
models

[FlexibleMLP(
   (activation): Tanh()
   (model): Sequential(
     (0): MaskedLinear(in_features=13, out_features=64, bias=True)
     (1): Tanh()
     (2): MaskedLinear(in_features=64, out_features=32, bias=True)
     (3): Tanh()
     (4): MaskedLinear(in_features=32, out_features=1, bias=True)
   )
 ),
 FlexibleMLP(
   (activation): Sigmoid()
   (model): Sequential(
     (0): MaskedLinear(in_features=13, out_features=64, bias=True)
     (1): Sigmoid()
     (2): MaskedLinear(in_features=64, out_features=32, bias=True)
     (3): Sigmoid()
     (4): MaskedLinear(in_features=32, out_features=1, bias=True)
   )
 ),
 FlexibleMLP(
   (activation): ReLU()
   (model): Sequential(
     (0): MaskedLinear(in_features=13, out_features=64, bias=True)
     (1): ReLU()
     (2): MaskedLinear(in_features=64, out_features=32, bias=True)
     (3): ReLU()
     (4): MaskedLinear(in_features=32, out_features=1, bias=True)
   )
 )]

In [6]:
# Layer classes whose recorded weight entropy is reported
tracked_types = (nn.Conv2d, MaskedLinear, nn.Linear)

# Matching sub-modules of the third model, in traversal order
tracked_layers = [
    module for _, module in models[2].named_modules()
    if isinstance(module, tracked_types)
]

for module in tracked_layers:
    print(module, module.entropy)

entropies = [module.entropy for module in tracked_layers]
layer_count = len(tracked_layers)

print(entropies)

MaskedLinear(in_features=13, out_features=64, bias=True) 2.5678496
MaskedLinear(in_features=64, out_features=32, bias=True) 2.7266998
MaskedLinear(in_features=32, out_features=1, bias=True) 1.3361826
[np.float32(2.5678496), np.float32(2.7266998), np.float32(1.3361826)]


In [9]:
class FlexibleMLPC(nn.Module):
    """Two-hidden-layer MaskedLinear classifier that flattens its input first.

    Args:
        input_size: Number of features per sample after flattening.
        layer1: Width of the first hidden layer.
        layer2: Width of the second hidden layer.
        num_classes: Number of output logits.
        activation_fn: Activation class; instantiated once and reused.
        use_mask: Forwarded to every ``MaskedLinear``.
        threshold: Forwarded to every ``MaskedLinear``.
    """

    def __init__(self, input_size=28*28, layer1=256, layer2=128, num_classes=10 ,activation_fn=nn.ReLU, use_mask=True, threshold=2):
        super().__init__()

        # Single activation module shared by both hidden layers
        self.activation = activation_fn()

        def masked(fan_in, fan_out):
            # Every layer gets the same masking configuration
            return MaskedLinear(fan_in, fan_out, threshold=threshold, use_mask=use_mask)

        hidden_1 = masked(input_size, layer1)
        hidden_2 = masked(layer1, layer2)
        head = masked(layer2, num_classes)

        self.model = nn.Sequential(hidden_1, self.activation, hidden_2, self.activation, head)

    def forward(self, x):
        """Flatten each sample to a vector and return the class logits."""
        batch = x.size(0)
        flat = x.view(batch, -1)  # e.g. [B, 1, 28, 28] -> [B, 784]
        return self.model(flat)

In [10]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
# 2. Load MNIST
# Tensor conversion followed by normalization with the constants below
mnist_steps = [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]
transform = transforms.Compose(mnist_steps)

# Only the training split is asked to download the data
train_dataset = datasets.MNIST(root="./data", train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root="./data", train=False, transform=transform)

train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)
test_loader = DataLoader(test_dataset, batch_size=1000)

# Take one test batch to derive the flattened size of a single image
images, labels = next(iter(test_loader))
input_dim1 = images[0].numel()


In [29]:
# 3. Initialize model, loss, optimizer
t = 6
activations = [nn.ReLU]
test_losses = []
train_losses = []
models = []
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
the_lambda = 1e-6  # weight of the L1 penalty; 0 disables it

for act in activations:
    # Any activation class (nn.ReLU, nn.Tanh, ...) can be listed in `activations`
    model = FlexibleMLPC(threshold=t, input_size=input_dim1, activation_fn=act, use_mask=False)
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.01)

    # 4. Training loop
    for epoch in range(5):
        model.train()
        total_loss = 0
        for images, labels in train_loader:
            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(images), labels)

            if the_lambda > 0:
                # Sum of absolute values over every parameter
                loss += the_lambda * sum(p.abs().sum() for p in model.parameters())

            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        # Mean per-batch loss for this epoch
        print(f"Epoch {epoch+1}, Loss: {total_loss / len(train_loader):.4f}")
    models.append(model)

    # 5. Evaluation
    model.eval()
    correct = total = 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            hits = model(images).argmax(dim=1) == labels
            correct += hits.sum().item()
            total += labels.size(0)

    print(f"Test Accuracy: {correct / total:.4f}")

Epoch 1, Loss: 0.3113
Epoch 2, Loss: 0.2279
Epoch 3, Loss: 0.2221
Epoch 4, Loss: 0.1991
Epoch 5, Loss: 0.2053
Test Accuracy: 0.9584


In [30]:
# Layer classes whose recorded weight entropy is reported
tracked_types = (nn.Conv2d, MaskedLinear, nn.Linear)

# Matching sub-modules of the first model, in traversal order
tracked_layers = [
    module for _, module in models[0].named_modules()
    if isinstance(module, tracked_types)
]

for module in tracked_layers:
    print(module, module.entropy)

entropies = [module.entropy for module in tracked_layers]
layer_count = len(tracked_layers)

print(entropies)

MaskedLinear(in_features=784, out_features=256, bias=True) 10.796702
MaskedLinear(in_features=256, out_features=128, bias=True) 8.649537
MaskedLinear(in_features=128, out_features=10, bias=True) 5.943685
[np.float32(10.796702), np.float32(8.649537), np.float32(5.943685)]


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.utils.weight_norm as weight_norm
import torch.nn.functional as F
import numpy as np
import scipy as sp
from scipy.stats import norm
from scipy.stats import shapiro
import matplotlib.pyplot as plt

In [2]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Preprocessing: tensor conversion, then per-channel normalization
# with mean 0.5 and std 0.5 (normalizes to [-1, 1])
channel_mean = (0.5, 0.5, 0.5)
channel_std = (0.5, 0.5, 0.5)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(channel_mean, channel_std),
])

# CIFAR-10 training split with a shuffled loader
train_dataset = datasets.CIFAR10(
    root='./data', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# CIFAR-10 test split, iterated in a fixed order
test_dataset = datasets.CIFAR10(
    root='./data', train=False, download=True, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Class labels
classes = (
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

In [6]:
# Define CNN model
class CNN(nn.Module):
    """Small convolutional classifier built from masked layers.

    Layout: two (MaskedConv2d -> activation -> 2x2 max-pool) stages with 64
    and 128 channels, then Flatten and two MaskedLinear layers (512 hidden
    units, 10 outputs). The first linear layer expects ``128 * 8 * 8``
    features, i.e. 3-channel 32x32 inputs such as CIFAR-10.

    Args:
        activation_fn: Activation class (not instance); it is instantiated
            once and the same module is reused throughout the network.
        use_mask: Forwarded to every masked layer.
        threshold: Forwarded to every masked layer (their default is 2).
    """

    def __init__(self, activation_fn=nn.ReLU, use_mask=True, threshold=2):
        super().__init__()
        self.activation = activation_fn()
        self.use_mask = use_mask

        # Settings shared by every masked layer
        masked_kwargs = dict(threshold=threshold, use_mask=self.use_mask)

        # CIFAR-10
        self.model = nn.Sequential(
            MaskedConv2d(3, 64, 3, padding=1, **masked_kwargs),
            self.activation,
            nn.MaxPool2d(2, 2),      # 32x32 -> 16x16

            MaskedConv2d(64, 128, 3, padding=1, **masked_kwargs),
            self.activation,
            nn.MaxPool2d(2, 2),      # 16x16 -> 8x8

            nn.Flatten(),

            MaskedLinear(128 * 8 * 8, 512, **masked_kwargs),
            self.activation,
            MaskedLinear(512, 10, **masked_kwargs),
        )

    def forward(self, x):
        """Return the class logits for a batch of images."""
        return self.model(x)


class MaskedConv2d(nn.Conv2d):
    """2-D convolution that can zero out statistically insignificant weights.

    Every kernel weight receives a significance score from ``wval``; when
    ``use_mask`` is true, weights whose score falls below ``self.threshold``
    are multiplied by zero before the convolution. Each forward pass also
    records the entropy of the squared (masked) weights.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels (filters).
        kernel_size: Kernel size, int or tuple.
        bias: Whether the layer learns an additive bias.
        stride: Convolution stride.
        padding: Padding applied to the input.
        threshold: Value whose ``-log10`` becomes the significance cut-off.
        use_mask: If false, the layer behaves like a plain ``nn.Conv2d``
            (the entropy is still recorded).
    """

    def __init__(self, in_channels, out_channels, kernel_size, bias=True, stride=1, padding=0, threshold=2, use_mask=True):
        # Hand stride/padding/bias to nn.Conv2d itself so that `bias=False`
        # is honoured and the module's attributes and repr stay consistent.
        super().__init__(in_channels, out_channels, kernel_size,
                         stride=stride, padding=padding, bias=bias)
        # Cut-off on the -log10 scale, stored as a 0-dim tensor.
        # NOTE(review): for threshold > 1 the cut-off is negative
        # (threshold=2 -> about -0.30) while significance scores are never
        # negative, so every weight passes. Confirm whether `threshold` is
        # meant to be a p-value (< 1) or already a -log10 value.
        self.threshold = -torch.log10(torch.tensor(float(threshold)))
        self.use_mask = use_mask
        # Entropy of the squared (masked) weights from the latest forward pass.
        self.entropy = None

    def wval(self, M):
        """Return ``-log10`` of a Beta survival probability for every weight.

        All weights belonging to one output channel (in_channels x kH x kW,
        or the row of a 2-D matrix) are flattened into a vector of length
        ``L`` and scaled to unit L2 norm. Each squared entry is then compared
        against a Beta(1/2, (L-1)/2) distribution, which is the law of a
        squared coordinate of a uniformly random unit vector in ``L``
        dimensions.

        Args:
            M: Weight tensor whose first dimension indexes output channels.

        Returns:
            Tensor shaped like ``M`` (same dtype and device) holding the
            scores. It is computed outside autograd and carries no gradient.
        """
        # Local import: the file only imports the top-level ``scipy`` package.
        from scipy.special import betainc

        fan_in = M.shape[1:].numel()
        a = 0.5
        b = (fan_in - 1) / 2

        # Normalize over the whole fan-in (not only the channel axis) so the
        # vector length matches L; float64 keeps the clamp bounds exact.
        rows = M.detach().reshape(M.shape[0], -1).double()
        unit_sq = F.normalize(rows, p=2, dim=1).pow(2).clamp(min=1e-8, max=1 - 1e-8)

        # Survival function 1 - I_x(a, b), evaluated as I_{1-x}(b, a) so that
        # very small tail probabilities are not lost to cancellation.
        surv = betainc(b, a, 1.0 - unit_sq.cpu().numpy())
        surv = torch.from_numpy(surv).clamp_min(torch.finfo(torch.float64).tiny)

        w_val = (-torch.log10(surv)).reshape(M.shape).to(device=M.device, dtype=M.dtype)
        assert w_val.shape == M.shape

        return w_val

    def forward(self, input):
        """Convolve with the (optionally masked) kernel and record the weight entropy."""
        if self.use_mask:
            # Compute significance
            significance = self.wval(self.weight)

            # Binary mask: 1 keeps a weight, 0 removes it
            mask = (significance >= self.threshold).to(self.weight.dtype)

            # Apply the mask (zero out insignificant weights)
            masked_weight = self.weight * mask
        else:
            masked_weight = self.weight

        squared = masked_weight.detach().pow(2).flatten()
        # .cpu() keeps the NumPy conversion working when the layer lives on a GPU.
        self.entropy = sp.stats.entropy(squared.cpu().numpy())

        return F.conv2d(input, masked_weight, self.bias, stride=self.stride,
                        padding=self.padding, dilation=self.dilation, groups=self.groups)



In [18]:
# Initialize model, loss, optimizer
model = CNN(nn.ReLU, use_mask=False).to(device)
the_lambda = 1e-6  # weight of the L1 penalty; 0 disables it
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
epochs = 3
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for i, (images, labels) in enumerate(train_loader, 0):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        output = model(images)
        loss = criterion(output, labels)
        if the_lambda > 0:
            # Named `l1_penalty` rather than `norm` so the `norm` imported
            # from scipy.stats is not overwritten by a tensor.
            l1_penalty = sum(p.abs().sum() for p in model.parameters())
            loss = loss + the_lambda * l1_penalty
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if i % 100 == 99:  # print every 100 mini-batches
            print(f'Epoch {epoch + 1}, Batch {i + 1}, Loss: {running_loss / 100:.3f}')
            running_loss = 0.0

# Evaluation
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")


Epoch 1, Batch 100, Loss: 1.798
Epoch 1, Batch 200, Loss: 1.441
Epoch 1, Batch 300, Loss: 1.302
Epoch 1, Batch 400, Loss: 1.213
Epoch 1, Batch 500, Loss: 1.152
Epoch 1, Batch 600, Loss: 1.114
Epoch 1, Batch 700, Loss: 1.055
Epoch 2, Batch 100, Loss: 0.914
Epoch 2, Batch 200, Loss: 0.898
Epoch 2, Batch 300, Loss: 0.885
Epoch 2, Batch 400, Loss: 0.888
Epoch 2, Batch 500, Loss: 0.878
Epoch 2, Batch 600, Loss: 0.872
Epoch 2, Batch 700, Loss: 0.825
Epoch 3, Batch 100, Loss: 0.706
Epoch 3, Batch 200, Loss: 0.677
Epoch 3, Batch 300, Loss: 0.677
Epoch 3, Batch 400, Loss: 0.706
Epoch 3, Batch 500, Loss: 0.720
Epoch 3, Batch 600, Loss: 0.687
Epoch 3, Batch 700, Loss: 0.681
Test Accuracy: 73.28%


In [19]:
# Layer classes whose recorded weight entropy is collected
tracked_types = (nn.Conv2d, MaskedLinear, nn.Linear, MaskedConv2d)

# Entropy of every matching sub-module of `model`, in traversal order
entropies = [
    module.entropy for _, module in model.named_modules()
    if isinstance(module, tracked_types)
]
layer_count = len(entropies)

print(entropies)

[np.float32(6.9278994), np.float32(9.817288), np.float32(13.1984215), np.float32(7.493392)]


In [None]:
# Show the class of the current `model` object.
type(model)

In [None]:
import matplotlib.pyplot as plt

def visualize_conv_weights(layer, title="Conv Filters", max_filters=64):
    """Draw the filters of a convolutional layer as a grid of small images.

    Args:
        layer: Module whose ``weight`` has shape
            (out_channels, in_channels, H, W).
        title: Figure title.
        max_filters: Upper bound on the number of filters drawn.

    Filters with 3 or 4 input channels are shown as RGB(A) images; any other
    channel count is collapsed to its channel mean and shown in grayscale.
    """
    weights = layer.weight.detach().cpu().numpy()  # Shape: (out_channels, in_channels, H, W)

    # Min-max normalize to [0, 1] for display; constant weights would give
    # 0/0, so they are mapped to all zeros instead.
    lowest = weights.min()
    span = weights.max() - lowest
    weights = (weights - lowest) / span if span > 0 else weights - lowest

    n_filters = min(weights.shape[0], max_filters)
    ncols = 8
    nrows = (n_filters + ncols - 1) // ncols

    # squeeze=False keeps `axes` two-dimensional even when there is one row,
    # so the [row, col] indexing below always works.
    fig, axes = plt.subplots(nrows, ncols, figsize=(ncols * 2, nrows * 2), squeeze=False)
    fig.suptitle(title)

    for i in range(n_filters):
        ax = axes[i // ncols, i % ncols]
        kernel = weights[i]
        if kernel.shape[0] in (3, 4):
            # Channels-last layout (H, W, C) for an RGB(A) image
            ax.imshow(kernel.transpose(1, 2, 0))
        else:
            # imshow cannot render other channel counts as colour
            ax.imshow(kernel.mean(axis=0), cmap="gray", vmin=0.0, vmax=1.0)
        ax.axis('off')

    # Hide the unused cells of the grid
    for i in range(n_filters, nrows * ncols):
        axes[i // ncols, i % ncols].axis('off')

    plt.tight_layout()
    plt.show()


In [None]:
# Plot the filters of the first module in the model's Sequential stack.
visualize_conv_weights(model.model[0])

In [None]:
def plot_weight_distribution(layer, title="Weight Magnitude Histogram"):
    """Show a 100-bin histogram of every value in ``layer.weight``.

    Args:
        layer: Module exposing a ``weight`` tensor.
        title: Title placed above the histogram.
    """
    values = layer.weight.data.cpu().numpy().reshape(-1)

    # Draw on the current axes, exactly as the pyplot shortcuts would
    ax = plt.gca()
    ax.hist(values, bins=100)
    ax.set_title(title)
    ax.set_xlabel("Weight value")
    ax.set_ylabel("Frequency")
    ax.grid(True)
    plt.show()

# Histogram the weights of the first and the last module of the stack
for plotted_layer, plot_title in (
    (model.model[0], "First Conv Layer"),
    (model.model[-1], "Final Linear Layer"),
):
    plot_weight_distribution(plotted_layer, plot_title)
