Notebook che definisce la rete neurale per lo Stochastic Quantum Walk.

Gli unici pacchetti "non standard" sono torchdiffeq e opt_einsum. Il primo dei due è indispensabile. Il secondo introduce una versione ottimizzata della funzione torch.einsum. Se quindi per qualche ragione non si vuol installare opt_einsum nell'environment conda, è sufficiente sostituire "contract" con "torch.einsum" nelle due occasioni in cui questa viene chiamata all'interno della classe LindbladFunc.

In [1]:
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt
import math
from torchdiffeq import odeint_adjoint as odeint
from opt_einsum import contract

In [None]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "cpu"
)
print(f"Using {device} device")

I dataset sono caricati nei dataloader. MNIST è già encodato a 64 feature e paddato con gli ultimi 10 pixel pari a 0.

In [None]:
class StochasticMNIST(Dataset):
    def __init__(self, data, targets):
        self.data = data
        self.targets = targets

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.targets[idx]

# Carico le immagini encodate
train_data, train_targets = torch.load("qsw_train_data_encoded64.pt", weights_only=False)
test_data, test_targets = torch.load("qsw_test_data_encoded64.pt", weights_only=False)

# Creo i dataset
qsw_train_data = StochasticMNIST(train_data, train_targets)
qsw_test_data = StochasticMNIST(test_data, test_targets)

batch_size = 128

qsw_train_dataloader = DataLoader(qsw_train_data, batch_size=batch_size, shuffle=True, num_workers=32)
qsw_test_dataloader = DataLoader(qsw_test_data, batch_size=batch_size, shuffle=True, num_workers=32)

for X, y in qsw_train_dataloader:
    print(f'Shape dataset: {X.shape}, {y.shape}')
    print(f'Data type: {X.dtype}, {y.dtype}')
    print(f'Padding: {X[0,-10:]}')
    break

Definisco la classe SQWalker del modello e la classe LindbladFunc. Quest'ultima è necessaria per implementare correttamente la funzione odeint_adjoint, la quale richiede che la rhs dell'ODE sia un nn.Module.

In [None]:
class LindbladFunc(nn.Module):
    def __init__(self, N, s, l, p, device):
        factory_kwargs = {"device": device, "dtype": torch.complex64}
        super().__init__()
        # Inizializzo i parametri del modello.
        self.n = math.isqrt(N)
        self.N = N
        self.s = s
        self.p = p

        # Costruisco la matrice di transizione e la popolo con i pesi da trainare.
        # Per semplicità scelgo i primi 10 nodi come quelli collegati ai sink.
        self.sinkrates = nn.parameter.Parameter(
            torch.empty((1, self.s), requires_grad=True, **factory_kwargs)
        )
        self.mask_sinks = torch.zeros((self.s, self.N), **factory_kwargs)
        self.mask_sinks.fill_diagonal_(1)

        # Costruisco dei tensori ausiliari per l'integrazione.
        self.basis = torch.eye(self.N + self.s, **factory_kwargs)
        self.B = torch.zeros(self.N + self.s, self.N + self.s, **factory_kwargs)
        self.B[:self.N, :self.N] = 0.5 * self.p * torch.eye(self.N, **factory_kwargs)

        # Definisco la matrice di adiacenza, di transizione, laplaciana, hamiltoniana e un'altra di transizione ma estesa.
        self.A = torch.zeros((self.N, self.N), **factory_kwargs)
        self.gamma = torch.zeros((self.s, self.N), **factory_kwargs)
        self.laplacian = torch.zeros(self.N + self.s, self.N + self.s, **factory_kwargs)
        self.H = torch.eye(self.N + self.s, **factory_kwargs)
        self.R = torch.zeros(self.N + self.s, self.N + self.s, **factory_kwargs)

        # Costruisco la matrice del lattice e la popolo con i pesi da trainare.
        self.mask = self._create_mask(l, device)
        self.expressivity = torch.count_nonzero(self.mask)
        self.weights = nn.parameter.Parameter(
            torch.empty((1, self.expressivity), requires_grad=True, **factory_kwargs)
        )
        self.reset_parameters()
        
    def _create_mask(self, l, device):
        '''Funzione che restituisce una maschera booleana per le entrate non nulle di una matrice di adiacenza di un lattice.
        Non saprei dire quanto sia efficiente, ma è il modo più carino che mi è venuto in mente.'''
        moves = torch.ones(2*l+1, 2*l+1, device=device)
        moves[l,l] = 0
        M = nn.functional.pad(moves, (self.n-l-1, self.n-l-1, self.n-l-1, self.n-l-1), 'constant', 0)

        m = torch.zeros((self.N, self.N), device=device)
        for i in range(self.n):
            for j in range(self.n):
                m[i*self.n+j] = M[self.n-1-i:2*self.n-1-i, self.n-1-j:2*self.n-1-j].flatten()
        return m == 1
    
    def reset_parameters(self):
        nn.init.kaiming_uniform_(self.weights, a=math.sqrt(5))
        nn.init.kaiming_uniform_(self.sinkrates, a=math.sqrt(5))

    def update_walker(self):
        '''Funzione che aggiorna i valori dei tensori della rhs nell'eq. di Lindblad'''
        # Aggiorno le matrici A e gamma. Elimino la parte reale di entrambe e simmetrizzo A.
        self.A[self.mask] = self.weights
        self.A = self.A - 1j * self.A.imag
        self.A = 0.5*(self.A + self.A.T)**2

        self.gamma[self.mask_sinks == 1] = self.sinkrates
        self.gamma = self.gamma - 1j * self.gamma.imag
        self.gamma = (self.gamma)**2

        # Calcolo le connettività della matrice A.
        degA = torch.sum(self.A, dim=0) 
            
        # Aggiorno la laplaciana.
        self.laplacian[:self.N, :self.N] = self.p * self.A / degA
        self.laplacian[self.N:, :self.N]= self.gamma

        # Calcolo la matrice a del foglio LaTeX.
        self.R[self.N:, :self.N] = self.gamma
        self.a = 0.5 * contract('ji,il,im->lm', self.R, self.basis, self.basis) + self.B

        # Aggiorno l'hamiltoniana.
        self.H[:self.N, :self.N] = (1 - self.p) * self.A
    
    def forward(self, t, rho):
        # Calcolo drho/dt.
        drho = -1j * (torch.matmul(self.H, rho) - torch.matmul(rho, self.H))
        drho = drho + contract('ij,il,im,bjj->blm', self.laplacian, self.basis, self.basis, rho)
        drho = drho - (torch.matmul(self.a, rho) + torch.matmul(rho, self.a))
        return drho
    
    def extra_repr(self) -> str:
        return "in_features: {a}, out_features: {b}\n Expressivity: {c:.2f}%".format(a = self.N, b = self.s, c = self.expressivity*100/self.N**2)

class SQWalker(nn.Module):
    def __init__(self, in_features, out_features, dt=1e-3, steps=1000, l = 1, noise=1., device = None):
        super().__init__()
        self.s = out_features
        self.lindblad = LindbladFunc(in_features, out_features, l, noise, device)
        self.t = torch.arange(0, dt*steps, dt, device=device)

    def forward(self, input, history = False):
        self.lindblad.update_walker()

        # Costruisco la matrice densità iniziale popolando la diagonale con l'immagine.
        rho0 = torch.diag_embed(input)

        # Integro con Runge-Kutta al quart'ordine. Se non specificassi il metodo odeint userebbe 
        # Dormand-Price, che è un metodo adattivo più lento.
        result = odeint(self.lindblad, y0=rho0, t=self.t, method = 'rk4')

        if history == True:
            return result
        output = result[-1].diagonal(dim1=-2, dim2=-1)[:, -self.s:]
        
        return output.real

In [None]:
model = SQWalker(64, 10, dt = 0.001, steps = 1000, l = 2, noise = 1., device = device).to(device)
print(model)

In [31]:
loss_fn = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [32]:
def train(dataloader, model, loss_fn, optimizer, voice = True):
    size = len(dataloader.dataset)
    model.train()
    current = 0
    i = 0
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        current += len(X)
        if current >= i*10000 and voice:
            i += 1
            loss = loss.item()
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [33]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return 100*correct, test_loss

In [None]:
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(qsw_train_dataloader, model, loss_fn, optimizer)
    test(qsw_test_dataloader, model, loss_fn)
print("Done!")

In [None]:
torch.save(model.state_dict(), "model.pth")
print("Saved PyTorch Model State to model.pth")

In [None]:
model = SQWalker(64, 10, dt = 0.001, steps = 1000, l = 2, noise = 1., device=device).to(device)
model.load_state_dict(torch.load("model.pth", weights_only=True))

In [None]:
classes = [
    "0",
    "1",
    "2",
    "3",
    "4",
    "5",
    "6",
    "7",
    "8",
    "9",
]

model.eval()
n_datas = len(test_data)

c = torch.randint(n_datas,(1,)).item()
x, y = qsw_test_data[c]
with torch.no_grad():
    x = x.to(device)
    pred = model(x.unsqueeze(0), history=True).squeeze(1)
    sinks = pred[-1].diagonal(dim1=-2, dim2=-1)[-10:].real
    predicted, actual = classes[sinks.argmax()], classes[y]

print(f'Convergenza: {(100*torch.sum(sinks)/torch.sum(x.real)):.2f}%')
print(f'Stato finale: {sinks}')