# Init

In [1]:
import lovely_tensors as lt
lt.monkey_patch()

In [2]:
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import os

from PIL import Image
from torchvision.transforms import Resize, Compose, ToTensor, Normalize
import numpy as np
import skimage
import matplotlib.pyplot as plt

import time

def get_mgrid(sidelen, dim=2):
    '''Generates a flattened grid of (x,y,...) coordinates in a range of -1 to 1.
    sidelen: int
    dim: int'''
    tensors = tuple(dim * [torch.linspace(-1, 1, steps=sidelen)])
    mgrid = torch.stack(torch.meshgrid(*tensors), dim=-1)
    mgrid = mgrid.reshape(-1, dim)
    return mgrid

In [3]:
class SineLayer(nn.Module):
    # See paper sec. 3.2, final paragraph, and supplement Sec. 1.5 for discussion of omega_0.
    
    # If is_first=True, omega_0 is a frequency factor which simply multiplies the activations before the 
    # nonlinearity. Different signals may require different omega_0 in the first layer - this is a 
    # hyperparameter.
    
    # If is_first=False, then the weights will be divided by omega_0 so as to keep the magnitude of 
    # activations constant, but boost gradients to the weight matrix (see supplement Sec. 1.5)
    
    def __init__(self, in_features, out_features, bias=True,
                 is_first=False, omega_0=30):
        super().__init__()
        self.omega_0 = omega_0
        self.is_first = is_first
        
        self.in_features = in_features
        self.linear = nn.Linear(in_features, out_features, bias=bias)
        
        self.init_weights()
    
    def init_weights(self):
        with torch.no_grad():
            if self.is_first:
                self.linear.weight.uniform_(-1 / self.in_features, 
                                             1 / self.in_features)      
            else:
                self.linear.weight.uniform_(-np.sqrt(6 / self.in_features) / self.omega_0, 
                                             np.sqrt(6 / self.in_features) / self.omega_0)
        
    def forward(self, input):
        return torch.sin(self.omega_0 * self.linear(input))
    
    def forward_with_intermediate(self, input): 
        # For visualization of activation distributions
        intermediate = self.omega_0 * self.linear(input)
        return torch.sin(intermediate), intermediate
    
    
class Siren(nn.Module):
    def __init__(self, in_features, hidden_features, hidden_layers, out_features, outermost_linear=False, 
                 first_omega_0=30, hidden_omega_0=30.):
        super().__init__()
        
        self.net = []
        self.net.append(SineLayer(in_features, hidden_features, 
                                  is_first=True, omega_0=first_omega_0))

        for i in range(hidden_layers):
            self.net.append(SineLayer(hidden_features, hidden_features, 
                                      is_first=False, omega_0=hidden_omega_0))

        if outermost_linear:
            final_linear = nn.Linear(hidden_features, out_features)
            
            with torch.no_grad():
                final_linear.weight.uniform_(-np.sqrt(6 / hidden_features) / hidden_omega_0, 
                                              np.sqrt(6 / hidden_features) / hidden_omega_0)
                
            self.net.append(final_linear)
        else:
            self.net.append(SineLayer(hidden_features, out_features, 
                                      is_first=False, omega_0=hidden_omega_0))
        
        self.net = nn.Sequential(*self.net)
    
    def forward(self, coords):
        coords = coords.clone().detach().requires_grad_(True) # allows to take derivative w.r.t. input
        output = self.net(coords)
        return output, coords        

    def forward_with_activations(self, coords, retain_grad=False):
        '''Returns not only model output, but also intermediate activations.
        Only used for visualizing activations later!'''
        activations = OrderedDict()

        activation_count = 0
        x = coords.clone().detach().requires_grad_(True)
        activations['input'] = x
        for i, layer in enumerate(self.net):
            if isinstance(layer, SineLayer):
                x, intermed = layer.forward_with_intermediate(x)
                
                if retain_grad:
                    x.retain_grad()
                    intermed.retain_grad()
                    
                activations['_'.join((str(layer.__class__), "%d" % activation_count))] = intermed
                activation_count += 1
            else: 
                x = layer(x)
                
                if retain_grad:
                    x.retain_grad()
                    
            activations['_'.join((str(layer.__class__), "%d" % activation_count))] = x
            activation_count += 1

        return activations

In [4]:
def laplace(y, x):
    grad = gradient(y, x)
    return divergence(grad, x)


def divergence(y, x):
    div = 0.
    for i in range(y.shape[-1]):
        div += torch.autograd.grad(y[..., i], x, torch.ones_like(y[..., i]), create_graph=True)[0][..., i:i+1]
    return div


def gradient(y, x, grad_outputs=None):
    if grad_outputs is None:
        grad_outputs = torch.ones_like(y)
    grad = torch.autograd.grad(y, [x], grad_outputs=grad_outputs, create_graph=True)[0]
    return grad

In [5]:
try:
    from prettytable import PrettyTable
except:
    ! pip install -q prettytable
    from prettytable import PrettyTable

def count_parameters(model):
    table = PrettyTable(["Modules", "Parameters"])
    total_params = 0
    for name, parameter in model.named_parameters():
        if not parameter.requires_grad: continue
        params = parameter.numel()
        table.add_row([name, params])
        total_params+=params
    print(table)
    print(f"Total Trainable Params: {total_params} type {type(model)}")
    return total_params

In [6]:
def get_cameraman_tensor(sidelength):
    pil_img = Image.fromarray(skimage.data.camera())        
    transform = Compose([
        Resize(sidelength),
        ToTensor(),
        Normalize(torch.Tensor([0.5]), torch.Tensor([0.5]))
    ])
    img = transform(pil_img)
    return img, pil_img

In [7]:
class ImageFitting(Dataset):
    def __init__(self, sidelength):
        super().__init__()
        img, pil_img = get_cameraman_tensor(sidelength)
        self.pixels = img.permute(1, 2, 0).view(-1, 1)
        self.coords = get_mgrid(sidelength, 2)
        self.pil_img = pil_img

    def __len__(self):
        return 1

    def __getitem__(self, idx):    
        if idx > 0: raise IndexError
            
        return self.coords, self.pixels

In [8]:
import os
import random
import numpy as np
import torch

def seed_all(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

In [9]:
def mse_and_psnr(im_a, im_b, data_range=1): 
    data_range = 1.
    mse = F.mse_loss(im_a, im_b)
    psnr = 10 * torch.log10((data_range ** 2) / mse)
    return mse, psnr

In [10]:
cameraman = ImageFitting(256)
dataloader = DataLoader(cameraman, batch_size=1, pin_memory=True, num_workers=0)

  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


In [11]:
def to01(t):
    t = t - t.min()
    t = t / t.max()
    return t

# Train pipeline

In [12]:
def tensor2pil(t):
    return Image.fromarray((t.detach().cpu().clip(0, 1).numpy()*255).astype(np.uint8))

def pil2tensor(pil): return torch.tensor(
    np.asarray(pil).astype(np.float32)/255).permute(2, 0, 1)


def imagify_tensor(t): 
    return to01(t).cpu().view(256,256).detach()

In [13]:
def train(model, cfg):
    device = cfg['device']
    model.to(device)
    
    losses = []

    total_steps = cfg['total_steps']
    steps_til_summary = cfg['steps_til_summary']

    if cfg.get('use_wandb'):
        import wandb
        wandb.init(project=cfg['project'], name=cfg['experiment_name'], config=cfg)
        
    total_params = count_parameters(model)

    if cfg.get('use_wandb'):
        wandb.log({'total_params': total_params})

    optim = torch.optim.Adam(lr=cfg['lr'], params=model.parameters())

    model_input, ground_truth = next(iter(dataloader))
    model_input, ground_truth = model_input.to(device), ground_truth.to(device)

    for step in range(total_steps):
        model_output, coords = model(model_input)
        mse, psnr = mse_and_psnr(model_output, ground_truth)
        loss = mse

        if cfg.get('use_wandb'):
            wandb.log({'step': step, 'mse': mse.item(), 'psnr': psnr.item()})

        if not step % steps_til_summary:
            print(f"Step {step}, Total loss {loss:0.6f}")
            img_grad_tensor = gradient(model_output, coords)
            img_laplacian_tensor = laplace(model_output, coords)

            img = imagify_tensor(model_output)
            img_grad = imagify_tensor(img_grad_tensor.norm(dim=-1))
            img_laplacian = imagify_tensor(img_laplacian_tensor)

            colage = tensor2pil(torch.cat([img, img_grad, img_laplacian], dim=1))
            plt.imshow(colage)
            plt.show()

            if cfg.get('use_wandb'):
                wandb.log({'step': step, 'image': wandb.Image(colage)})

        optim.zero_grad()
        loss.backward()
        optim.step()
        losses.append(loss.item())

    wandb.finish()

In [14]:
class UberLayer(nn.Module):
    def __init__(self, input_dim, h=32):
        super().__init__()
        self.linear = nn.Linear(input_dim, h*4)
        self.h = h
        
    def forward(self, x):
        h = self.h
        o = self.linear(x)
        o = o[..., :h].sigmoid() * o[..., h:2*h].tanh() * o[..., 2*h:3*h].sin() * o[..., 3*h:4*h].cos()
        return o


class UberNet(nn.Module):
    def __init__(self, in_features, hidden_features, hidden_layers, out_features, outermost_linear=False, 
                 first_omega_0=30, hidden_omega_0=30.):
        super().__init__()
        
        self.net = []
  
        self.net += [UberLayer(in_features, hidden_features)]
        self.net += [UberLayer(hidden_features, hidden_features) for _ in range(hidden_layers)]
        self.net += [UberLayer(hidden_features, out_features)]
        
        self.net = nn.Sequential(*self.net)
    
    def forward(self, coords):
        coords = coords.clone().detach().requires_grad_(True) # allows to take derivative w.r.t. input
        output = self.net(coords)
        return output, coords
    
    
# ! pip install random-fourier-features-pytorch
import rff
class UberNetWithEncoding(nn.Module):
    def __init__(self, in_features, hidden_features, hidden_layers, out_features, encoded_size, outermost_linear=False, 
                 first_omega_0=30, hidden_omega_0=30.):
        super().__init__()
        
        self.net = []
  
        self.net += [UberLayer(encoded_size*2, hidden_features)]
        self.net += [UberLayer(hidden_features, hidden_features) for _ in range(hidden_layers)]
        self.net += [UberLayer(hidden_features, out_features)]
        
        self.net = nn.Sequential(*self.net)
        
        self.encoding = rff.layers.GaussianEncoding(sigma=10.0, input_size=2, encoded_size=encoded_size)
    
    def forward(self, coords):
        coords = coords.clone().detach().requires_grad_(True) # allows to take derivative w.r.t. input
        i = self.encoding(coords)
        output = self.net(i)
        return output, coords    

In [15]:
break

SyntaxError: 'break' outside loop (668683560.py, line 4)

# SIREN

In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4

cfg['total_steps'] = 500
cfg['steps_til_summary'] = 100
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'siren_baseline'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 256,
    'hidden_layers': 3,
    'outermost_linear': True,
}


seed_all(cfg.get('seed', 0))
model = Siren(**cfg['model_kwargs'])
train(model, cfg)

In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'siren_baseline'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 256,
    'hidden_layers': 3,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = Siren(**cfg['model_kwargs'])
train(model, cfg)

In [None]:
class SineLayer(nn.Module):
    # See paper sec. 3.2, final paragraph, and supplement Sec. 1.5 for discussion of omega_0.
    
    # If is_first=True, omega_0 is a frequency factor which simply multiplies the activations before the 
    # nonlinearity. Different signals may require different omega_0 in the first layer - this is a 
    # hyperparameter.
    
    # If is_first=False, then the weights will be divided by omega_0 so as to keep the magnitude of 
    # activations constant, but boost gradients to the weight matrix (see supplement Sec. 1.5)
    
    def __init__(self, in_features, out_features, bias=True,
                 is_first=False, omega_0=30):
        super().__init__()
        self.omega_0 = omega_0
        self.is_first = is_first
        
        self.in_features = in_features
        self.linear = nn.Linear(in_features, out_features, bias=bias)
        
        # self.init_weights()
    
    def init_weights(self):
        with torch.no_grad():
            if self.is_first:
                self.linear.weight.uniform_(-1 / self.in_features, 
                                             1 / self.in_features)      
            else:
                self.linear.weight.uniform_(-np.sqrt(6 / self.in_features) / self.omega_0, 
                                             np.sqrt(6 / self.in_features) / self.omega_0)
        
    def forward(self, input):
        return torch.sin(self.omega_0 * self.linear(input))
    
    def forward_with_intermediate(self, input): 
        # For visualization of activation distributions
        intermediate = self.omega_0 * self.linear(input)
        return torch.sin(intermediate), intermediate
    
    
class Siren(nn.Module):
    def __init__(self, in_features, hidden_features, hidden_layers, out_features, outermost_linear=False, 
                 first_omega_0=30, hidden_omega_0=30.):
        super().__init__()
        
        self.net = []
        self.net.append(SineLayer(in_features, hidden_features, 
                                  is_first=True, omega_0=first_omega_0))

        for i in range(hidden_layers):
            self.net.append(SineLayer(hidden_features, hidden_features, 
                                      is_first=False, omega_0=hidden_omega_0))

        if outermost_linear:
            final_linear = nn.Linear(hidden_features, out_features)
            
            with torch.no_grad():
                final_linear.weight.uniform_(-np.sqrt(6 / hidden_features) / hidden_omega_0, 
                                              np.sqrt(6 / hidden_features) / hidden_omega_0)
                
            self.net.append(final_linear)
        else:
            self.net.append(SineLayer(hidden_features, out_features, 
                                      is_first=False, omega_0=hidden_omega_0))
        
        self.net = nn.Sequential(*self.net)
    
    def forward(self, coords):
        coords = coords.clone().detach().requires_grad_(True) # allows to take derivative w.r.t. input
        output = self.net(coords)
        return output, coords        

    def forward_with_activations(self, coords, retain_grad=False):
        '''Returns not only model output, but also intermediate activations.
        Only used for visualizing activations later!'''
        activations = OrderedDict()

        activation_count = 0
        x = coords.clone().detach().requires_grad_(True)
        activations['input'] = x
        for i, layer in enumerate(self.net):
            if isinstance(layer, SineLayer):
                x, intermed = layer.forward_with_intermediate(x)
                
                if retain_grad:
                    x.retain_grad()
                    intermed.retain_grad()
                    
                activations['_'.join((str(layer.__class__), "%d" % activation_count))] = intermed
                activation_count += 1
            else: 
                x = layer(x)
                
                if retain_grad:
                    x.retain_grad()
                    
            activations['_'.join((str(layer.__class__), "%d" % activation_count))] = x
            activation_count += 1

        return activations

In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'siren_no_init'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 256,
    'hidden_layers': 3,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = Siren(**cfg['model_kwargs'])
train(model, cfg)

# uber inr

In [19]:
from splitnet import SplitNet, SplitNetManager, U

ImportError: cannot import name 'U' from 'splitnet' (/home/jupyter/src/my-neural-fields/notebooks/uber/splitnet.py)

In [20]:
def U(*shape):
    x = torch.rand(*shape)
    x = (x-0.5)*2.
    return x

In [22]:
cfg = {}
cfg['device'] = 'cuda:0'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'splitnet_man'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 32,
    'hidden_layers': 6,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = SplitNet(**cfg['model_kwargs'])


In [24]:
man = SplitNetManager(model)
man.init_01(U(5_000, 2));

In [25]:
train(model, cfg)

Failed to detect the name of this notebook, you can set it manually with the WANDB_NOTEBOOK_NAME environment variable to enable code saving.
[34m[1mwandb[0m: Currently logged in as: [33mkilianovski[0m. Use [1m`wandb login --relogin`[0m to force relogin


+---------------------+------------+
|       Modules       | Parameters |
+---------------------+------------+
| net.0.linear.weight |    256     |
|  net.0.linear.bias  |    128     |
| net.1.linear.weight |    4096    |
|  net.1.linear.bias  |    128     |
| net.2.linear.weight |    4096    |
|  net.2.linear.bias  |    128     |
| net.3.linear.weight |    4096    |
|  net.3.linear.bias  |    128     |
| net.4.linear.weight |    4096    |
|  net.4.linear.bias  |    128     |
| net.5.linear.weight |    4096    |
|  net.5.linear.bias  |    128     |
| net.6.linear.weight |    4096    |
|  net.6.linear.bias  |    128     |
|     net.7.weight    |     32     |
|      net.7.bias     |     1      |
+---------------------+------------+
Total Trainable Params: 25761 type <class 'splitnet.SplitNet'>


ValueError: not enough values to unpack (expected 2, got 1)

wandb: Waiting for W&B process to finish... (success).
wandb: - 0.004 MB of 0.020 MB uploaded (0.000 MB deduped)
wandb: Run history:
wandb: total_params ▁
wandb: 
wandb: Run summary:
wandb: total_params 25761
wandb: 
wandb: 🚀 View run splitnet_man at: https://wandb.ai/kilianovski/uber_inr/runs/eytu5g4w
wandb: Synced 6 W&B file(s), 0 media file(s), 0 artifact file(s) and 0 other file(s)
wandb: Find logs at: ./wandb/run-20230424_171048-eytu5g4w/logs


In [None]:


# X = torch.randn((256, 256, 2))


In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'ubernet_posenc'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 32,
    'hidden_layers': 6,
    'encoded_size': 64,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = UberNetWithEncoding(**cfg['model_kwargs'])
train(model, cfg)

In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'ubernet_posenc'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 64,
    'hidden_layers': 3,
    'encoded_size': 64,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = UberNetWithEncoding(**cfg['model_kwargs'])
train(model, cfg)

In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'ubernet_posenc_micro'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 64,
    'hidden_layers': 1,
    'encoded_size': 64,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = UberNetWithEncoding(**cfg['model_kwargs'])
train(model, cfg)

In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'ubernet_posenc_micro_128'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 128,
    'hidden_layers': 1,
    'encoded_size': 64,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = UberNetWithEncoding(**cfg['model_kwargs'])
count_parameters(model)
breakk()
train(model, cfg)

In [None]:
cfg = {}
cfg['device'] = 'cuda:3'
cfg['lr'] = 1e-4
cfg['seed'] = 0

cfg['total_steps'] = 10_000
cfg['steps_til_summary'] = 1000
cfg['project'] = 'uber_inr'
cfg['experiment_name'] = 'ubernet_posenc_2_layers'
cfg['use_wandb'] = True

cfg['model_kwargs'] = {
    'in_features': 2,
    'out_features': 1,
    'hidden_features': 128,
    'hidden_layers': 0,
    'encoded_size': 64,
    'outermost_linear': True,
}

seed_all(cfg.get('seed', 0))
model = UberNetWithEncoding(**cfg['model_kwargs'])
count_parameters(model)
breakk()
train(model, cfg)

In [None]:
import torch
import torch.nn as nn

def print_model_shape(model, input_shape):
    with torch.no_grad():
        model.eval()
        x = torch.rand(input_shape)
        print(f"{'Layer Name':<30} {'Input Shape':<20} {'Output Shape':<20}")
        print("-" * 70)
        for name, layer in model.named_modules():
            if isinstance(layer, nn.Conv2d) or isinstance(layer, nn.Linear):
                x = layer(x)
                print(f"{name:<30} {str(x.shape):<20} {str(x.shape):<20}")

# Example usage
model = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=2, stride=2),
    nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(kernel_size=2, stride=2),
    nn.Flatten(),
    nn.Linear(512 * 7 * 7, 1024),
    nn.ReLU(),
    nn.Linear(1024, 10)
)

print_model_shape(model, (1, 3, 32, 32))