# Manifold Sculpting

In [5]:
import matplotlib.pyplot as plt
import torch

import torchvision
import torchvision.transforms as transforms

### Compute distances

### Test cells

In [9]:
k = 2
data = torch.Tensor([[1,1],[7,7],[9,9],[12,12], [5,5], [2,2]])
p = data[2]
sigma = 0.1
n_dim = 1
scale_factor = 1 # at first

data_size = torch.tensor(data.size())
m = data_size[0] # number of data points
p_size = data_size[1:] # size of the single datapoint
p_size0 = p_size - torch.ones_like(p_size) # to be used for indexing
print(m)
print(p_size)

tensor(6)
tensor([2])


In [50]:
## Test cell
a = torch.Tensor([1,2])
b = torch.Tensor([2,3])
c = a@b
d = c.item()
a.tolist()
a[0]

tensor(1.)

In [65]:
## Test cell
from math import acos
a = torch.Tensor([0,1])
b = torch.Tensor([1,1])
a /= torch.norm(a)
b /= torch.norm(b)
print(b)
c = torch.acos(a@b)
c

tensor([0.7071, 0.7071])


tensor(0.7854)

In [75]:
## Test cell
a = torch.Tensor([[1,1,0],
                  [1,4,0],
                  [0,0,3]])
eigenval, eigenvec = torch.linalg.eigh(-a)
eigenval = -eigenval
print(eigenval)
print(eigenvec)

tensor([4.3028, 3.0000, 0.6972])
tensor([[ 0.2898,  0.0000, -0.9571],
        [ 0.9571,  0.0000,  0.2898],
        [ 0.0000,  1.0000,  0.0000]])


In [110]:
## Test cell
a = [1,2]
if [0]:
    print("che tavanata")
while a:
    a.pop(0)
    print("ush")

che tavanata
ush
ush


In [121]:
## Test cell
a = torch.Tensor([[1,2,3,4],[5,6,7,8]])
indx = torch.multinomial(a, num_samples=1) # replacement?
print(indx)
print(a[0,indx[0]])
print(a[1,indx[1]])

tensor([[2],
        [3]])
tensor([3.])
tensor([8.])


# Class

In [6]:
class ManifoldSculpting():
    ''' 
    Dependencies: import torch
    '''
    def __init__(self, k=5, n_dim=2, niter=100, sigma=0.99, patience=30): # rotate = True
        # Implementation of the Manifold Sculpting algorithm in PyTorch

        # Hyperparameters of the algorithm
        # Only torch.tensor()
        self.k = k                    # -- number of neighbors considered
        self.n_dim = n_dim            # -- dimension of the searched manifold
        self.niter = niter            # -- 1st stopping criterion for the iterative cicle 
        self.sigma = sigma            # -- scaling factor at each iteration (for extra dimensions)
        self.scale_factor = 1         # -- cumulative scale factor
        self.patience = patience      # -- 2nd stopping criterion

    def transform(self, data):
        ####### MAIN #######

        ## ---- Import
        # In case of images or weirdly shaped input points
        if len(data.size()) > 2:
            flatten   = torch.nn.Flatten()
            self.data = flatten(data) 
            # self.original_p_size = data.size[1:]
        else:
            self.data = data
            
        self.p_size = self.data.size()[1]   # single point flattenend dimension 
        self.n_datapoints = data.size()[0]  # int

        ## ---- Compute neighbors relations
        self.dist, self.neighb    = self.neighb_distance()
        self.colinear, self.theta = self.colinear_neighb()
        
        self.avg_dist = torch.mean(self.dist)

        self.nudge = self.avg_dist

        ## ---- PCA transform
        self.data = self.pca_transform(self.data)
        
        # Distinguish dimensions to be scaled/preserved
        self.preserv_dim = torch.tensor(list(range(self.n_dim)))
        self.scaled_dim  = torch.tensor(list(range(self.n_dim, self.p_size)))

        ## ---- Iterative transformation
        epoch = 1
        
        # Adjust a bunch of times without comparing errors
        while self.scale_factor > 0.01: # can be tuned
            mean_error = self.step()
            epoch += 1

        epochs_since_improvement = 0
        best_error = torch.Tensor(float('inf'))

        # Continue adjusting, start comparing errors
        while (epoch < self.niter) and (epochs_since_improvement < self.patience):
            mean_error = self.step()

            if mean_error < best_error:
                best_error = mean_error
                self.best_data  = torch.clone(self.data)
                sefl.best_error = best_error
                epochs_since_improvement = 0
            else:
                epochs_since_improvement += 1
                
            epochs += 1

        ## DEBUG and monitoring
        self.elapsed_epochs = epoch
        self.last_error = mean_error



    def neighb_distance(self):
        '''
        Returns:
        - tensor.size() = (n_datapoints, k)
          (i,j) hosts the DISTANCE of point i to its j-th neighbor 
          with distances decreasing in j
        - tensor.size() = (n_datapoints, k) 
          (i,j) hosts the INDEX (relative to self.data) of the j-th neighbor of point i
        '''
        x2 = self.data * self.data
        x2 = x2.sum(axis=1)
        data_t = torch.transpose(self.data, 0, 1) # pb in dim >i (es foto...), credo funzioni cmq
        xx = self.data@data_t
        
        all_distances = torch.sqrt( x2 - 2*xx + x2.unsqueeze(dim=1) ) # they have different dimensions, we leverage pytorch default for the operations
        
        _, indices = torch.sort(dist)

        kneighb = indices[:, 1:self.k+1]
        all_distances   = torch.zeros_like(kneighb)

        # Cherry-pick neighbors distances (sorted)
        d = []  # For some reason it doesn't allow directly with tensors
        for i in range(self.n_datapoints):
            d.append( all_distances[kneighb[i][:], i] )
        
        kdist = torch.reshape( torch.cat( d, 0 ), (-1, self.k) )
        
        return kdist, kneighb

    
    def avg_neighb_distance(self):
        dist = neighbor_distance(self.data)
        avg_dist = torch.mean(dist)
        
        return avg_dist

    def colinear_neighb(self):
        '''
        For clarity:
        Variables with _idx  have values in (0, m-1) (selecting points from self.data)
        Variables with _kidx have values in (0, k-1)

        Returns:
        - tensor.size() = (n_datapoints, k)
        (i,j) hosts the ANGLE theta (i-j-l) of the most colinear point l to the couple i-j
        - tensor.size() = (n_datapoints, k)
        (i,j) hosts the NEIGHBOR INDEX of l with respect to the neighbourhood of j
        '''
        theta    = torch.ones((self.n_datapoints,self.k))
        colinear = torch.ones((self.n_datapoints,self.k))
        
        # Loop over data points
        for i_idx in range(self.n_datapoints):
            # Loop over neighbors of i
            for j_kidx, j_idx in enumerate(self.neighb[i_idx]):
                
                p2j = self.data[i_idx] - self.data[j_idx]
                p2j /= torch.norm(p2j)
        
                colinear_kidx =  torch.ones(self.k)
                cos_i2l = torch.ones(self.k)
                # Loop over neighbor points of j
                for l_kidx, l_idx in enumerate(self.neighb[j_idx]):
                    
                    p2l = self.data[l_idx] - self.data[j_idx]
                    p2l /= torch.norm(p2l)
                    cos_i2l[l_kidx] = p2l@c2j
                    
                # Extract the colinear angle and neighbor
                cos_max, colinear[i_idx, j_kidx] = torch.max(cos_i2l, dim=0)
                theta[i_idx, j_kidx] = torch.acos(cos_max)

        return colinear, theta
    
            
    def pca_transform():
        '''
        Returns
        - tensor.size() = (n_datapoints, p_size)
          the data linearly transformed to the principal components basis
        '''
        cov = torch.cov(self.data)
        eigenval, eigenvec = torch.linalg.eigh(-cov) # -cov because it outputs sorted descending eigenval
        eigenval = -eigenval
        pca_data = eigenvec@self.data
        
        return pca_data
        
    
    def compute_error(self, p, visited):
        '''
        Parameters:
        - p : (int)         index of the point
        - visited: (list)   list of already adjusted points
        
        Returs:
        - (float) error relative to the neighbourhood of p
        '''
        
        w = torch.ones(self.k)
        for j in range(self.k):
            w[j] = 10 if self.neighb[p,j] in visited else 1
    
        total_err = 0
        for i in range(self.k):
            # Extract indices
            n = self.neighb[p,i].item()
            c = self.colinear[p,i].item()

            # Compute theta_p2c, the angle in p-n-c
            p2n = self.data[p] - self.data[n]
            c2n = self.data[c] - self.data[n]
            p2n /= torch.norm(p2n)
            c2n /= torch.norm(c2n)
            theta_p2c = torch.acos(p2n@c2n)

            # Compute error
            err_dist = .5*(p2n - self.dist[p,i]) / self.avg_dist
            err_theta = (theta_p2c - self.theta[p,i])/3.1415926535
            
            total_err += w[i] * (err_dist*err_dist + err_theta*err_theta)
            
        return total_err
    
    
    def adjust_point(self, p, visited):
        '''
        Parameters:
        - p: (int) index of the point to be adjusted
        - visited: (list) list of already adjusted points in current epoch

        Returns:
        - number of hill descent steps
        - error for the adjusted point
        '''
        # Slightly randomize the entity of the update
        nudge = self.nudge * ( .6 + .4*torch.rand(1).item() ) # float
        
        s = -1 
        improved = True
        err = self.compute_error(p, visited)
        
        while (s<30) and improved:
            s += 1
            improved = False

            ## --- Downhill update
            # Loop over dimensions (try the same nudge for all of them)
            for d in self.preserv_dim:
                
                self.data[p,d] += nudge  # Try one direction
                new_err = self.compute_error(p, visited)
                
                if new_err >= err:
                    self.data[p,d] -= 2*nudge  # Try in the opposite
                    new_err = self.compute_error(p, visited)
                    
                if new_err >= err:
                    self.data[p,d] += nudge # Stay put
                    
                else:
                    err = newerr
                    improved = True
                    
        return s, err


    def step(self):
        '''
        Returns:
        - (float) mean error after adjustment
        '''
        # ---- a)
        # (a,b refer to pseudo-code (Fig 2.2) in original paper)
        
        self.scale_factor *= self.sigma
        
        # Downscale component along scaled dimensions
        self.data[:, self.scaled_dim] *= self.sigma
    
        # Upscale the component along preserved dimensions
        while (self.avg_neighb_distance(self.data) < self.avg_dist): # mi sfugge li senso di qsto criterio
            self.data[:, self.preserv_dim] /= self.sigma
            
        # ---- b)
        pr_idx = torch.multinomial(torch.ones(self.n_datapoints), num_samples=1)
        # pr = data[rand_indx,:]
        
        queue_idx = []
        queue_idx.append(pr_idx)
        visited = []
    
        step = 0
        mean_error = 0
        counter = 0
        
        # while the queue is not empty
        while queue_idx:
            p = queue_idx.pop(0)
            if p in visited:
                continue
    
            # Add p's neighbors in the queue
            for n in self.neighb[p]:
                queue_indx.append(n)
                
            s, err = self.adjust_point(p,visited) 
    
            step += s
            mean_error += err
            counter += 1
            visited.append(p)
    
        mean_error /= counter
    
        # numbers from author's implementation (weight decay-like)
        if step < self.n_datapoints:
            self.nudge *= 0.87
        else:
            self.nudge /= 0.91
            
        return mean_error

### Other test cells

In [60]:
1+2*torch.rand(1).item()

1.3728734254837036

In [46]:
a = torch.Tensor([[7,4],
                  [4,1]])
from math import sqrt
eigenval, eigenvec = torch.linalg.eigh(a)
# me li restituisce già in base ON
# verdetto finale: NN ci vuole ness1 trasposizione di sorta
print(eigenval)
print(eigenvec)
print(eigenvec*sqrt(5))
pca_basis = eigenvec@a
print(pca_basis)

#       cov = torch.cov(self.data)
#       eigenval, eigenvec = torch.linalg.eigh(-cov) # Li ordina già in vista dla pca!
#       eigenval = -eigenval  # Pké li ordina decrescenti
#       pca_data = eigenvec@self.data # È qla giusta o è qla trasposta?

tensor([-1.,  9.])
tensor([[ 0.4472, -0.8944],
        [-0.8944, -0.4472]])
tensor([[ 1., -2.],
        [-2., -1.]])
tensor([[-0.4472,  0.8944],
        [-8.0498, -4.0249]])


In [38]:
a = [0,4,2,7,1,6]
list(enumerate(a))

[(0, 0), (1, 4), (2, 2), (3, 7), (4, 1), (5, 6)]

In [22]:
a = torch.Tensor([[1,2],[1,2]])
len(a.size())
b = torch.clone(a)
a[1,0] = 17
print(a)
print(b)

tensor([[ 1.,  2.],
        [17.,  2.]])
tensor([[1., 2.],
        [1., 2.]])


In [32]:
a = torch.ones(2)
b = torch.tensor([0,1])
c = torch.tensor([0,1])
print(a)
a[0] = torch.acos(b@c)
print(a)
print(torch.acos(b@c))

tensor([1., 1.])
tensor([0., 1.])
tensor(0.)


In [37]:
a = torch.Tensor([1,5,20,3])
b, index = torch.max(a, dim=0)
print(b)
print(index)
b /= b
torch.acos(b)

tensor(20.)
tensor(2)


tensor(0.)

# Programma dla mattina:
- [x] Parti leggendo il pezzo che manca dl paper
- [x] CONTROLLA che qlo che hai già scritto funzioni 
    (cfr con il bro per il senso generale, che le funzioni facciano qlo che ti aspetti con dle test cells)
- [x] bisogna finire fit
- [x] scrivere adjust point
- [x] organizzare tutto in 1 classe

## Poi incrociare forte le dita pké funzioni su MNIST

- [ ] Genera la manifold con il vae (ridotto a 1 o 2 cifre) e già puoi vedere se le organizzano in modi molto #i
- [ ] Prova con 2 cifre (ma qua stiamo già azzardando)


# Attenti al lupo

In [7]:
## Import Dataset (MNIST)
transform = transforms.Compose([
    transforms.ToTensor(),                             
])

# train_test_data = torchvision.datasets.MNIST(root='./Data', train=True, download=True, transform=transform)
train_data = torchvision.datasets.MNIST(root='./Data', train=True, download=True, transform=transform)
val_data  = torchvision.datasets.MNIST(root='./Data', train=False, download=True, transform=transform)

#frac = 0.98
#train_data, test_data = torch.utils.data.random_split(train_test_data, [frac, 1-frac], generator=torch.Generator().manual_seed(42))
print(len(train_data))
print(len(  val_data))
#print(len( test_data))

60000
10000


In [8]:
# Filter out images with labels 1 and 7 from the training dataset
indices = (train_data.targets == 1) + (train_data.targets == 2) # this has the same length as train_data.targets, filled with 1 and 0s
indices[8000:] = False  # Limit the number of images with labels 1 and 7
train_data.data, train_data.targets = train_data.data[indices], train_data.targets[indices]

# Same thing for the Validation dataset
indices = (val_data.targets == 1) + (val_data.targets == 2)
indices[4000:] = False
val_data.data, val_data.targets = val_data.data[indices], val_data.targets[indices]

# Same thing for the Test dataset
#indices = (test_data.targets == 1) + (test_data.targets == 7)
#indices[4000:] = False
#test_data.data, test_data.targets = test_data.data[indices], test_data.targets[indices]

print("Train set: ", len(train_data.targets), "samples \nVal   set: ", len(val_data.targets), "samples")
#print("Train set: ", len(train_data.targets), "samples \nVal set: ", len(val_data.targets), "samples \nTest set: ", len(test_data.targets), "samples")

Train set:  1694 samples 
Val   set:  868 samples


In [1]:
# NN mi interessano i loaders
# Validation qua ha poco senso

In [11]:
sculptor = ManifoldSculpting(k=10) # defaults: k=5, n_dim=2, niter=100, sigma=0.99, patience=30

In [None]:
sculptor.transform(train_data)
print(f'Epochs: {sculptor.elapsed_epochs}')
print(f'Final mean error: {scupltor.last_error}')
print(f'Best mean error: {sculptor.best_error}')