# U-net segmentering

## Setter opp initielle parametere

In [None]:
# importerer generelle moduler

import os
import torch
import numpy as np
import utils

In [None]:
# Global experiment configuration shared by the cells below.

sensor = "s2"  # the tile preview cell handles "s1", "s2" and "s1s2"
classes = ['background', 'built-up', 'roads']
n_classes = len(classes)
batch_size = 16
learning_rate = 1e-3
# Backward-compatible alias: the misspelled name is still the one read by the
# optimizer set-up further down in this notebook.
learing_rate = learning_rate
chip_size = 512
trainingset = '512_rnd'
name = '3cls_rnd'

# Name of this experiment; it is used as the output folder under ../models/.
experiment_name = f"{sensor}_{chip_size}_{name}_0"
# Set to the name of an earlier experiment to start from its best checkpoint.
load_experiment = None
epochs_start = 0
epochs_stop = 2

train_path = os.path.join(f"/data/user/imagery/{trainingset}/train", sensor)
test_path = os.path.join(f"/data/user/imagery/{trainingset}/test", sensor)

In [None]:
# Compute channel count, mean and std over the image folders.
# NOTE(review): the "*" path component presumably matches both the train and
# the test sub-folder -- confirm how utils.img_stats expands the pattern.
img_pattern = f"/data/user/imagery/{trainingset}/*/{sensor}/img/"
n_channels, raw_mean, raw_std = utils.img_stats(img_pattern)

# Wrap the statistics as tensors; they are passed on to the dataset builders.
mean, std = torch.tensor(raw_mean), torch.tensor(raw_std)

In [None]:
# Compute class weights from the masks of the training split and display them.

import utils
train_mask_dir = os.path.join(train_path, "mask")
class_weights = torch.FloatTensor(utils.weights(train_mask_dir))
class_weights

In [None]:
# Same weight computation, this time on the masks of the test split.
# Running this cell overwrites the weights computed from the training masks.
test_mask_dir = os.path.join(test_path, "mask")
class_weights = torch.FloatTensor(utils.weights(test_mask_dir))
class_weights

In [None]:
# Manually chosen class weights, one entry per class in the order of
# `classes`. Running this cell replaces any weights computed from the masks.
# The tensor is 1-D with one value per class, which is the shape documented
# for the `weight` argument of nn.CrossEntropyLoss (it used to be (3, 1)).
class_weights = torch.FloatTensor([0.1,    # background
                                   0.9,    # built-up
                                   0.9])   # roads
class_weights

### Dataloader

In [None]:
import dataloader

# Build the train and test sets (in that order) with the same normalisation
# statistics; both are indexed and wrapped in DataLoaders further down.
train, test = (
    dataloader.make_trainloaders(split_path, mean=mean, std=std)
    for split_path in (train_path, test_path)
)

In [None]:
# Show the image and the mask of a single tile.
import utils
import matplotlib.pyplot as plt

tile = train[200]
img_arr = tile["image"]

# Select three single-channel planes to display as red/green/blue. Which
# channels are used (and whether they are percentile-clipped) depends on the
# sensor. Indexing with a list keeps a leading axis of length one.
if sensor == "s1":
    r = utils.pct_clip(img_arr[[1],...])
    g = utils.pct_clip(img_arr[[0],...])
    # third plane: ratio between the two clipped channels
    b = utils.pct_clip(img_arr[[1],...]) / utils.pct_clip(img_arr[[0],...])
elif sensor == "s2":
    r = img_arr[[3],...]
    g = img_arr[[2],...]
    b = img_arr[[1],...]
elif sensor == "s1s2":
    r = img_arr[[3],...]
    g = img_arr[[2],...]
    b = utils.pct_clip(img_arr[[-1],...])
else:
    # Fail with a clear message instead of a NameError on `rgb` below.
    raise ValueError(
        f"unsupported sensor {sensor!r}; expected 's1', 's2' or 's1s2'"
    )

# Stack the planes, drop the length-one axes and move the colour axis last.
rgb = np.moveaxis(np.array([r,g,b]).squeeze(), 0, -1)
print(rgb.shape)
plt.imshow(rgb)
plt.show()

mask = tile["mask"]
print(mask.shape)
plt.imshow(mask)
plt.show()

In [None]:
# Wrap both datasets in DataLoaders. Shuffling and the worker count are
# shared; only the batch size differs (configured for train, fixed 6 for test).
shared_loader_args = {"shuffle": True, "num_workers": 4}

trainloader = torch.utils.data.DataLoader(train, batch_size=batch_size, **shared_loader_args)
testloader = torch.utils.data.DataLoader(test, batch_size=6, **shared_loader_args)

## Bygger U-net og inspiserer

Bygging skjer i unet.py

Der er også hjelpefunksjoner for å se på prediksjoner og teste ytelsen

## Treningsloop

### Setter variabler, tapsfunksjon og optimeringsalgoritme

In [None]:
# Run this cell before training to reset the network to fresh weights.
import unet
from torchinfo import summary
import torch.optim as optim
import torch.nn  as nn

# One device decision shared by the network, the checkpoint loading and the
# loss weights, so the cell also runs on machines without a GPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

net = unet.UNet(
    encChannels=[n_channels,64,128,256,512],
    decChannels=[512,256,128,64],
    nbClasses=n_classes,
    outSize=(chip_size,chip_size))

display(summary(net,(batch_size,n_channels,chip_size,chip_size)))

net.train()
net = net.to(device)

# Optionally continue from the best checkpoint of an earlier experiment.
if load_experiment is not None:
    print(f"loading model: {load_experiment}")
    net.load_state_dict(torch.load(f"../models/{load_experiment}/models/unet_best.pt",map_location=device))

# Loss function and optimizer. The class weights must live on the same device
# as the network output, hence .to(device) rather than an unconditional .cuda().
lossfunc = nn.CrossEntropyLoss(weight=class_weights.to(device))
optimizer = optim.AdamW(net.parameters(), lr=learing_rate)

### TensorBoard

In [None]:
from torch.utils.tensorboard import SummaryWriter

# åpne tensorboard med kommandoen "tensorboard --logdir=models" fra anaconda prompt fra prosjektmappen

In [None]:
# Training loop: trains `net`, logs to TensorBoard, evaluates on the test
# loader at a fixed batch interval and stores the best / latest weights.
os.makedirs(f"../models/{experiment_name}/models",exist_ok=True)
os.makedirs(f"../models/{experiment_name}/inference",exist_ok=True)

writer = SummaryWriter(log_dir=f"../models/{experiment_name}")

accuracy_log = []   # not written to in this cell; kept in case other cells read it
current_best = 0.1  # a checkpoint is only stored as "best" once mAP exceeds this
num_batches = len(trainloader)
test_interval = 200     # evaluate on the first batch and then every 200th batch
num_test_batches = 50   # passed to unet.test_accuracy as batch_lim

try:
    for e in range(epochs_start,epochs_stop):
        # loss accumulated since the last log entry, and how many batches it covers
        running_loss = 0
        batches_since_log = 0
        for i,batch in enumerate(trainloader):

            optimizer.zero_grad()
            images = batch["image"]
            labels = batch["mask"]

            if torch.cuda.is_available():
                images = images.to("cuda")
                labels = labels.to("cuda")

            # log net graph to writer
            if i == 0 and e == 0:
                writer.add_graph(net,images)

            out = net(images)
            loss = lossfunc(out,labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            batches_since_log += 1

            # only log / evaluate on the first batch and at the end of each interval
            if i % test_interval != test_interval-1 and i != 0:
                continue

            global_step = e*num_batches + i

            # Divide by the number of batches actually accumulated: the first
            # full interval of an epoch holds one batch less than the later
            # ones because the accumulator is reset after batch 0.
            avg_loss = running_loss/batches_since_log

            print(f"ep: {e+1}: batch: {(i+1)} Avg batch loss {avg_loss}")
            writer.add_scalar("Loss/loss",avg_loss,global_step)
            running_loss = 0
            batches_since_log = 0

            # NOTE(review): if unet.test_accuracy / unet.view_prediction switch
            # the network to eval mode they need to restore train mode -- confirm.
            recall, precision, mean_ap = unet.test_accuracy(net,testloader,batch_lim=num_test_batches)
            # treat undefined (NaN) metrics as 0 so they can be logged and compared
            if np.isnan(precision):
                precision = 0
            if np.isnan(recall):
                recall = 0
            if np.isnan(mean_ap):
                mean_ap = 0

            accuracy = (recall+precision)/2
            if precision+recall < 1e-8:
                f1 = 0
            else:
                f1 = 2*(precision*recall)/(precision+recall)

            print(f"Precision {precision}   Recall {recall}     mAP {mean_ap}    F1 {f1}")
            writer.add_scalar("Accuracy/Val/Recall",recall,global_step)
            writer.add_scalar("Accuracy/Val/Precision",precision,global_step)
            writer.add_scalar("Accuracy/Val/F1",f1,global_step)
            writer.add_scalar("Accuracy/Val/mAP",mean_ap,global_step)
            if mean_ap > current_best:
                torch.save(net.state_dict(),f"../models/{experiment_name}/models/unet_best.pt")
                current_best = mean_ap
                print("new model saved")
            torch.save(net.state_dict(),f"../models/{experiment_name}/models/unet_latest.pt")

            # save an example prediction at every evaluation step
            step = str(global_step).zfill(6)
            it = iter(testloader)
            fig = unet.view_prediction(sensor, net, next(it), save_as=f"../models/{experiment_name}/inference/{step}_1.png", show=False)
            writer.add_figure("Sample Prediction/1", fig, global_step, close=True)
finally:
    # close the writer even if training is interrupted or raises
    writer.close()

## Inspisere resultater fra den trente modellen

In [None]:
# Build a fresh network and load the best checkpoint of the experiment.

import unet

# To inspect another run, override the experiment name here, e.g.:
# experiment_name = "s2_test_unet_03"

net = unet.UNet(
    nbClasses=n_classes,
    outSize=(chip_size,chip_size),
    encChannels=[n_channels,64,128,256,512],
    decChannels=[512,256,128,64])

best_weights = f"../models/{experiment_name}/models/unet_best.pt"
use_gpu = torch.cuda.is_available()

# Map the stored tensors to the device that is available, then load them.
state = torch.load(best_weights, map_location="cuda" if use_gpu else "cpu")
net.load_state_dict(state)
if use_gpu:
    net = net.to("cuda")

In [None]:
# Iterator over the test loader, so that the next cell can step through the
# batches one at a time.
loop = iter(testloader)

In [None]:
# Fetch the next test batch and pass it to unet.view_prediction together with
# the loaded network; re-run this cell to advance to the following batch.
batch = next(loop)
unet.view_prediction(sensor, net,batch)