# Dual Tomographic Compression (DTC): Generate Data

## Gibbs randomness-compression proposition: An eﬃcient deep learning

    Mehmet Suezen  
    (c) 2025   
    License  Apache 2.0

## Versions 

```
# 
# Packages used Python 2.10.13
#
torch==2.6.0
torchvision==0.21.0
matplotlib==3.10.1
numpy==1.26.2
cvxpy==1.6.4
scipy==1.15.2
dill==0.4.0
statsmodels==0.14.4
```

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import numpy as np

# Import packages.
import cvxpy as cp
import numpy as np
import matplotlib.pyplot as plt
import scipy
import matplotlib
import numpy
import cvxpy
import itertools

In [None]:
# ('2.6.0', '0.21.0', '3.10.1', '1.26.2', '1.6.4', '1.15.2')
torch.__version__, torchvision.__version__, matplotlib.__version__, numpy.__version__, cvxpy.__version__, scipy.__version__

In [None]:
import sys

sys.version_info

In [None]:
batch_size = 512
learning_rate = 0.001

transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]  #  mu/sigma
)

In [None]:
train_dataset = datasets.MNIST(root=".", train=True, download=True, transform=transform)
test_dataset = datasets.MNIST(root=".", train=False, download=True, transform=transform)

g_cpu = torch.Generator()
g_cpu.manual_seed(42424242)
        
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=False, generator=g_cpu
)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, generator=g_cpu
)

In [None]:
i = 0
for x, y in train_loader:
    i = i + 1
i
print(y)

In [None]:
class BaseM(nn.Module): # Fixed seed
    def __init__(self, nsize, initial_type="xavier", seed=42424242): # CPU
        super(BaseM, self).__init__()
        self.input = nn.Flatten()
        self.bnorm = nn.BatchNorm1d(28 * 28)
        self.linear1 = nn.Linear(28 * 28, nsize, bias=False)
        self.linear2 = nn.Linear(nsize, 10, bias=False)
        self.softmax_out = nn.LogSoftmax(dim=0)

        g_cpu = torch.Generator()
        g_cpu.manual_seed(seed)
        if initial_type == "xavier":
            _ = torch.nn.init.xavier_normal_(self.linear1.weight, generator=g_cpu)
            _ = torch.nn.init.xavier_normal_(self.linear2.weight, generator=g_cpu)

        if initial_type == "he":
            _ = torch.nn.init.kaiming_normal_(
                self.linear1.weight, mode="fan_in", nonlinearity="relu", generator=g_cpu
            )
            _ = torch.nn.init.kaiming_normal_(
                self.linear1.weight, mode="fan_in", nonlinearity="relu", generator=g_cpu
            )

    def forward(self, image_mnist):
        input_flat = self.input(image_mnist)
        linear1 = self.linear1(input_flat)
        linear1_relu = F.relu(linear1)
        linear2 = self.linear2(linear1_relu)
        output = self.softmax_out(linear2)
        return output

In [None]:
def accuracy(outputs, labels):
    _, preds = torch.max(outputs, 1)
    return torch.sum(preds == labels).item() / len(labels)

In [None]:
def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0.0
    test_acc = 0.0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            test_acc += accuracy(outputs, labels)
    # print(f'Test Loss: {test_loss / len(test_loader):.4f}, Test Accuracy: {test_acc / len(test_loader):.4f}')
    return test_acc / len(test_loader)

In [None]:
def test_batch(model, device, test_loader, criterion, nbatch=100):
    model.eval()
    test_loss = 0.0
    test_acc = 0.0
    nb = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()
            test_acc += accuracy(outputs, labels)
            nb = nb + 1
            if nb > nbatch:
                break
    # print(f'Test Loss: {test_loss / len(test_loader):.4f}, Test Accuracy: {test_acc / len(test_loader):.4f}')
    return test_acc / len(test_loader)

In [None]:
def model_train(model_dyn, number_of_batches=100):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model_dyn.parameters(), lr=learning_rate)
    test_acc = test(model_dyn, device, test_loader, criterion)
    i = 0
    for _, (inputs, labels) in itertools.cycle(enumerate(train_loader)):
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model_dyn(inputs)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        i = i + 1
        if i == number_of_batches:
            test_acc = test(model_dyn, device, test_loader, criterion)
            return model_dyn, test_acc
    return model_dyn, test_acc

In [None]:
# Now inverse
def relative_error(x, y):
    """
    Calculate the relative error between two vectors.

    Parameters:
    x (numpy.ndarray): The first vector.
    y (numpy.ndarray): The second vector.

    Returns/:
    float: The relative error between the two vectors.
    """
    return np.linalg.norm(x - y) / np.linalg.norm(y)


def mape_vectorized_v2(a, b):
    mask = a != 0
    return (np.fabs(a - b) / a)[mask].mean()


def generate_dct_matrix(n, dct_type=1):
    return scipy.fftpack.dct(np.eye(n), norm="ortho", axis=0, type=dct_type)


def cs_inverse(x_org, sparsity_level=0.05, Lambda=1.5, seed=42424242):
    np.random.seed(seed=seed)
    m = len(x_org)
    # Up to Measurements  M > K LOG (N/K) , K sparse components
    n = int(m * (1.0 - sparsity_level))  #
    #
    Phi = np.random.normal(size=[n, m])  # Gaussian random (measurement matrix) ;
    # Theta = Phi  * Psi  (Psi   sparsification matrix)
    # Recall https://users.soe.ucsc.edu/~afletcher/EE293/Week1Readings/Papers_Week1_and_Week2/Baraniuk_SPMag2007.pdf
    Psi = generate_dct_matrix(m)
    Theta = np.matmul(Phi, Psi)
    # Define and solve the CVXPY problem.
    weights_sparse = cp.Variable(m)
    y_measurement = np.matmul(Phi, x_org)  # make a measurement
    cost = cp.sum_squares(
        Phi @ Psi @ weights_sparse - y_measurement
    ) + Lambda * cp.norm1(weights_sparse)
    prob = cp.Problem(cp.Minimize(cost))
    _ = prob.solve(solver="SCS") #  SCS,  
    weights_sparse_value = weights_sparse.value
    x_reconstructed = np.matmul(Psi, weights_sparse_value)
    mape = mape_vectorized_v2(x_reconstructed, x_org)
    return weights_sparse_value, x_reconstructed, mape, Theta, y_measurement

In [None]:
def prune_model(
    model_org, sparsity_level=0.05, cs=False, random_prune=False, Lambda=1.5
):  #  defaults to magnitute prune
    l1_weights = model_org.state_dict()["linear1.weight"]
    l2_weights = model_org.state_dict()["linear2.weight"]
    A1 = np.array(np.array(l1_weights).sum(axis=1))
    A2 = np.array(np.array(l2_weights).sum(axis=0))
    w12 = A1 + A2
    mape1 = 0.0
    mape2 = 0.0
    Theta1 = -1  # if Not CS
    Theta2 = -1
    y1 = -1 
    y2 = -1
    if cs:
        weights_sparse_value1, _, mape1, Theta1, y1 = cs_inverse(
            A1, sparsity_level=sparsity_level, Lambda=Lambda
        )
        weights_sparse_value2, _, mape2, Theta2, y2 = cs_inverse(
            A2, sparsity_level=sparsity_level, Lambda=Lambda
        )
        w12 = weights_sparse_value1 + weights_sparse_value2
    # find units to  magnitute clip (on CS weights or learned weights)
    keep_indices = np.where(np.abs(w12) < np.quantile(np.abs(w12), sparsity_level))[0]
    keep_id_length = len(keep_indices) 
    if random_prune:
       np.random.seed(42424242)
       keep_indices = np.random.choice(len(w12), keep_id_length, replace=False)
    sparse_model = BaseM(nsize=len(keep_indices))
    model_weights = sparse_model.state_dict()
    model_weights["linear1.weight"] = l1_weights[keep_indices, :]
    model_weights["linear2.weight"] = l2_weights[:, keep_indices]
    sparse_model.load_state_dict(model_weights)
    return sparse_model, mape1, mape2, w12, Theta1, Theta2, y1, y2

In [None]:
#
# CS Inverse: 90% sparsity 
# Iterative Sparsification and training 
#
def iterative_train_prune(
    model,
    is_cs=False,
    random_prune=False,
    number_of_batches=100,
    stop_size=10,
    sparsity_level=0.99,
    Lambda=1.5,
):
    nsize_reduced = 512
    nsize = 512
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    criterion = nn.CrossEntropyLoss()
    test_errors = []
    sparsity_levels = []
    total_weights = []
    theta1 = []
    theta2 = []
    ymeasure1 = []
    ymeasure2 = []
    i = 0
    while nsize_reduced > stop_size:
        model, acc = model_train(model, number_of_batches=number_of_batches)
        model, mape1, mape2, w12, t1, t2, y1, y2 = prune_model(
            model,
            sparsity_level=sparsity_level,
            cs=is_cs,
            random_prune=random_prune,
            Lambda=Lambda,
        )
        nsize_reduced = model.state_dict()["linear1.weight"].shape[0]
        actual_sparsity_level = 1.0 - nsize_reduced / nsize
        # print(f"iter-batch-group {i} nsize_reduced {nsize_reduced}")
        # print(
        #     f"      mape1={mape1} mape2={mape2} acc={acc} actual_sparsity_level={actual_sparsity_level}"
        # )

        # print("  ")
        # print("  Test on sparse")
        # print("  ")
        test_error = test(model, device, test_loader, criterion)
        test_errors.append(test_error)
        sparsity_levels.append(actual_sparsity_level)
        total_weights.append(w12)
        theta1.append(t1)
        theta2.append(t2)
        ymeasure1.append(y1)
        ymeasure2.append(y2)
        print(i, test_error, model.state_dict()["linear1.weight"].shape[0], actual_sparsity_level)
        i = i + 1
    return sparsity_levels, test_errors, total_weights, theta1, theta2, ymeasure1, ymeasure2

In [None]:
# Basic model
nsize = 512
batch_size = batch_size
learning_rate = 0.001
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
criterion = nn.CrossEntropyLoss()
model0 = BaseM(nsize=nsize).to("cpu")
model, acc = model_train(model0, number_of_batches=1)  #  
test_error_full = test(model, device, test_loader, criterion)
print("full model test error=", test_error_full)

In [None]:
# Seeds are set for initialisation and batches
# So results are deterministic on CPU

In [None]:
# Magnitute prune (default) : 
nrepeat = 1 
magnitute_prune_data = []
model = BaseM(nsize=nsize).to("cpu") # fixed seed
for _ in range(nrepeat):
    sparsity_levels_m, test_errors_m, _, _, _, _, _ = iterative_train_prune(
        model, sparsity_level=0.9, number_of_batches=200
    )
    magnitute_prune_data.append(
        (
            sparsity_levels_m,
            test_errors_m,
        )
    )

In [None]:
# random prune
nrepeat = 1
random_prune_data = []
model = BaseM(nsize=nsize).to("cpu") # fixed seed
for _ in range(nrepeat):
    sparsity_levels_m, test_errors_m, _, _, _, _, _ = iterative_train_prune(
        model, random_prune=True, sparsity_level=0.9, number_of_batches=200
    )
    random_prune_data.append(
        (
            sparsity_levels_m,
            test_errors_m,
        )
    )

In [None]:
# CS prune
nrepeat = 1 
cs_prune_data = []
model = BaseM(nsize=nsize).to("cpu") # fixed seed
for _ in range(nrepeat):
    sparsity_levels_m, test_errors_m, total_weights_cs, theta1, theta2, y1, y2 = iterative_train_prune(
            model, 
            is_cs=True,
            Lambda=0.3,
            sparsity_level=0.9,
            number_of_batches=200,
        )
    cs_prune_data.append(
        (sparsity_levels_m, test_errors_m, total_weights_cs, theta1, theta2, y1, y2)
    )

In [None]:
d2c_data = {
    "test_error_full": test_error_full,
    "magnitute_prune_data": magnitute_prune_data,
    "random_prune_data": random_prune_data,
    "cs_prune_data":cs_prune_data
}

import dill
dill.dump(d2c_data, open("d2c_data.dill", "wb"))

Q.E.D.