<h1 style="text-align: center;">Описание подходов для решения задачи сегментации фасадов жилых домов</h1>

Для решения задачи будем использовать архитектуру Unet, хорошо зарекомендовавшуюся себя в задачах семантической сегментации изображений.

1. Характеристики входного изображения (С x W x H) - 1 x 128 x 128 (grayscale)
2. Размеченные данные - (С x W x H) - 2 x 128 x 128 (два класса [0,1], 0 - background, 1 - windows)
3. При достаточной хорошей обучаемости Unet, для определения контуров окон можно использовать The marching squares algorithm. (The marching squares algorithm is a special case of the marching cubes algorithm (Lorensen, William and Harvey E. Cline. Marching Cubes: A High Resolution 3D Surface Construction Algorithm. Computer Graphics SIGGRAPH 87 Proceedings) 21(4) July 1987, p. 163-170).)

The marching squares algorithm возвращает требуемое количество контуров, соответствующих окнам, а также индексы пикселей, соответствующие центрам контуров, чтобы можно было посчитать сетку колонн/рядов окон.

Работа алгоритма:

<image src="https://scikit-image.org/docs/stable/_images/sphx_glr_plot_contours_001.png" alt="Описание картинки">





In [1]:
import torch 
import torch.nn as nn
import os
import numpy as np
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from torchvision.io import read_image, ImageReadMode
from torchvision import datasets
#from torchsummary import summary
import torchmetrics
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime
import cv2 
import albumentations as A
from skimage import measure # measure.find_contours() - The marching squares algorithm
from PIL import Image, ImageOps
from sklearn.utils import compute_class_weight
%matplotlib inline



# Dataset: CMP_facade_DB_base
https://cmp.felk.cvut.cz/~tylecr1/facade/



In [2]:
TARGET_IMAGE_WIDTH = 128
TARGET_IMAGE_HEIGHT = 128
NUM_CHANNELS = 2
BATCH_SIZE = 16
EXECUTION_IMAGE_DATA = True
EXECUTION_MODEL_TRAIN = True

mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]

mean_gray=[0.485]
std_gray=[0.229]


## Data Augmentation

In [3]:
def data_augmentation(img_folder, mask_folder, num_of_augmentaions):

    """Additional image transformations"""
    
    transform = A.Compose([A.VerticalFlip(),
                            A.Rotate(),
                            A.HorizontalFlip(),
                            A.GridDistortion(p=0.2),
                            ])

    tree = os.walk(img_folder)
    cwd = os.getcwd()
    for root, dirs, files in tree:
        for file in files:
            image_base_path = os.path.join(root, file)
            mask_base_path = os.path.join(mask_folder, file) 
            mask_base_path = mask_base_path.replace('.jpg', '.png')
            image_base = np.array(Image.open(image_base_path).convert('RGB'))
            mask_base = np.array(Image.open(mask_base_path).convert('RGB'))
            for i in range(num_of_augmentaions):
                transformed = transform(image=image_base, mask=mask_base)
                transform_image_save_path = image_base_path.replace('.jpg',f'__{i}.jpg')
                transform_mask_save_path = mask_base_path.replace('.png',f'__{i}.png')

                transformed_image = Image.fromarray(transformed['image'])
                transformed_image.save(transform_image_save_path)
                transformed_mask = Image.fromarray(transformed['mask'])
                transformed_mask.save(transform_mask_save_path)


num_of_augs = 4            

#data_augmentation(r'D:\Coding\test_case_cv\Unet\dataset\trainA', r'D:\Coding\test_case_cv\Unet\dataset\trainB', num_of_augs)
#data_augmentation(r'D:\Coding\test_case_cv\Unet\dataset\testA', r'D:\Coding\test_case_cv\Unet\dataset\testB', num_of_augs)

# Encode segmentation masks to classes 

In [4]:
"""resizing pictures without antialiasing"""

transform_mask = transforms.Resize((TARGET_IMAGE_WIDTH, TARGET_IMAGE_HEIGHT), interpolation=transforms.InterpolationMode.NEAREST_EXACT, antialias=False)

"""Diferent colormaps for testing (RGB, Grayscale)"""
colormap12 = {(0,0,170) : 0, #background
            (0,0,255) : 1, #facade
            (0,85,255): 2, #window
            (0,170,255): 3, #door
            (255,85,0): 4, #cornice
            (85,255,170): 5, #sill
            (170,255,85): 6, #balcony
            (255,255,0): 7, #blind
            (255,170,0): 8, #deco
            (0,255,255): 9, #molding
            (255,0,0): 10, #pillar
            (170,0,0): 11 #shop 
                            }


colormap12_gray = {19 : 0, #background #PIL
            29 : 1, #facade
            79: 2, #window
            129: 3, #door
            126: 4, #cornice
            194: 5, #sill
            210: 6, #balcony
            226: 7, #blind
            176: 8, #deco
            179: 9, #molding
            76: 10, #pillar
            51: 11 #shop 
                            }

colormap2 = {   (0,0,170) : 0, #background
                (0,85,255): 1 #window    
                           }  

colormap2_gray = {   19 : 0, #background
                    79: 1 #window    
                                }
                                
  
cwd = os.getcwd()
img_trainA_path = os.path.join(cwd, r'dataset\trainA')
img_testA_path = os.path.join(cwd, r'dataset\testA')
img_trainB_path = os.path.join(cwd, r'dataset\trainB')
img_testB_path = os.path.join(cwd, r'dataset\testB')
npy_path = os.path.join(cwd, r'dataset\np.array_targets')
img_test_trans = cwd


def to_categorical(y: np.array, num_classes: int) -> np.array:
    """one-hot encoding"""
    return np.eye(num_classes, dtype='uint8')[y.astype('uint8')]



def encode_to_classes2(img_array: np.array, colormap):
    """2 class segmentation encoding"""
    class_array = np.zeros((img_array.shape[0], img_array.shape[1]))
    image_trans_array = np.zeros((img_array.shape[0], img_array.shape[1]))

    for W in range(img_array.shape[0]):
        for H in range(img_array.shape[1]):
            #pixel_color = tuple(img_array[W,H])
            class_array[W,H] = colormap.get(img_array[W,H], 0)
            image_trans_array[W,H] = img_array[W,H]
    class_array = to_categorical(class_array,2)
    return class_array.astype(np.uint8), image_trans_array.astype(np.uint8)


def encode_to_classes12(img_array: np.array, colormap: dict):
    """2 class segmentation encoding (for testing purposes)"""

    class_array = np.zeros((img_array.shape[0], img_array.shape[1]))
    image_trans_array = np.zeros((img_array.shape[0], img_array.shape[1], 3))
    for W in range(img_array.shape[0]):
        for H in range(img_array.shape[1]):
            class_array[W,H] = colormap[tuple(img_array[W,H])]
            image_trans_array[W,H] = img_array[W,H]
    class_array = to_categorical(class_array, 12)
    return class_array.astype(np.uint8), image_trans_array.astype(np.uint8)
    

def create_image_data(execute) -> np.array:
    """save encoded class arrays as .npy on disk for faster work of pytorch dataloaer"""
    if execute:
        tree = os.walk(os.path.join(cwd, r'dataset\trainB'))
        for root, dirs, files in tree:
            for filename in files:
                mask_filename = os.path.join(root, filename)

                mask = Image.open(mask_filename)
                mask = ImageOps.grayscale(mask)
                mask = transform_mask(mask)
                mask = np.array(mask)
                # mask = Image.open(mask_filename).convert('RGB')
                # mask = transform_mask(mask)
                # mask = np.array(mask)

                if NUM_CHANNELS == 2:
                    class_array, img_trans_array = encode_to_classes2(mask, colormap2_gray)
                else:
                    class_array, img_trans_array = encode_to_classes12(mask, colormap12)
                    img_trans_array = Image.fromarray(img_trans_array).save(os.path.join(r'D:\Coding\test_case_cv\Unet\dataset\check_masks', filename))
                np.save(os.path.join(npy_path, r'npy_trainB', filename.replace('.png', '.npy')), class_array)

        tree = os.walk(os.path.join(cwd, r'dataset\testB'))
        for root, dirs, files in tree:
            for filename in files:
                mask_filename = os.path.join(root, filename)
                mask = Image.open(mask_filename)
                mask = ImageOps.grayscale(mask)
                mask = transform_mask(mask)
                mask = np.array(mask)

                # mask = Image.open(mask_filename).convert('RGB')
                # mask = transform_mask(mask)
                # mask = np.array(mask)
                if NUM_CHANNELS == 2:
                    class_array, img_trans_array = encode_to_classes2(mask, colormap2_gray)
                else:
                    class_array,_ = encode_to_classes12(mask, colormap12)
                np.save(os.path.join(npy_path, r'npy_testB', filename.replace('.png', '.npy')), class_array)
                
#create_image_data(EXECUTION_IMAGE_DATA)

In [5]:
"""Balancing weights for loss function"""

tree = os.walk(os.path.join(cwd, r'dataset\np.array_targets\npy_trainB'))
mask_list = []
for root, dirs, files in tree:
            for filename in files:
                mask_filename = os.path.join(root, filename)
                mask_list.append(np.load(mask_filename))

tree = os.walk(os.path.join(cwd, r'dataset\np.array_targets\npy_testB'))
mask_list = []
for root, dirs, files in tree:
            for filename in files:
                mask_filename = os.path.join(root, filename)
                mask_list.append(np.load(mask_filename))

masks_encoded = np.argmax(np.array(mask_list), axis=3)
masks_reshaped_encoded = masks_encoded.reshape(-1,1).flatten()

class_weights = compute_class_weight(class_weight='balanced', 
                                    classes = np.unique(masks_reshaped_encoded), 
                                    y=masks_reshaped_encoded)
# class_weights = [0.56486709,  0.17596369,  0.66243338,  7.8157498,   1.16416553,  5.06901417,
#   1.56538023,  1.88223472, 27.06131078,  4.66247519, 11.34772731,  2.91352604]

In [6]:
class_weights = torch.Tensor(class_weights).to('cuda')

# Custom Dataset class and instances #


In [20]:

transform_image = transforms.Compose(
    [   transforms.Resize((TARGET_IMAGE_WIDTH, TARGET_IMAGE_HEIGHT)),
        transforms.ToTensor(),
    transforms.Normalize(mean=mean_gray, std=std_gray)
    ]
)

transform_label = transforms.ToTensor()


image_train_ids = [] # pass to Pytorch Dataset class
image_train_path_list = [] # pass to Pytorch Dataset class
mask_train_path_list = [] # pass to Pytorch Dataset class

image_test_ids = [] # pass to Pytorch Dataset class
image_test_path_list = [] # pass to Pytorch Dataset class
mask_test_path_list = [] # pass to Pytorch Dataset class

tree_train_img = os.walk(os.path.join(cwd, r'dataset\trainA')) 

for root, dirs, files in tree_train_img:
            for filename in files:
                img_id = filename.replace('.jpg','')
                image_path = os.path.join(root, filename)
                image_train_ids.append(img_id)
                image_train_path_list.append(os.path.join(root, filename))


tree_test_img = os.walk(os.path.join(cwd, r'dataset\testA')) 
for root, dirs, files in tree_test_img:
            for filename in files:
                img_id = filename.replace('.jpg','')
                image_path = os.path.join(root, filename)
                image_test_ids.append(img_id)
                image_test_path_list.append(os.path.join(root, filename))


tree_train_npy = os.walk(os.path.join(npy_path, r'npy_trainB'))
for root, dirs, files in tree_train_npy:
            for filename in files:
                mask_train_path_list.append(os.path.join(root, filename))

tree_test_npy = os.walk(os.path.join(npy_path, r'npy_testB'))
for root, dirs, files in tree_test_npy:
            for filename in files:
                mask_test_path_list.append(os.path.join(root, filename))

class CustomImageDataset(Dataset):
    """Custom dataset class for pytorch dataloader"""
    def __init__(self, ids_list, image_path_list, mask_path_list, transform_image, transform_label):
        self.image_ids = ids_list
        self.image_path_list = image_path_list
        self.mask_path_list = mask_path_list
        self.transform_image = transform_image
        self.transform_label = transform_label

    def __len__(self):
        return len(self.image_path_list)

    def __getitem__(self, idx):
        img_path = self.image_path_list[idx]
        img_label_path = self.mask_path_list[idx]
        image = Image.open(self.image_path_list[idx])
        image= ImageOps.grayscale(image)
        image_label = np.load(self.mask_path_list[idx])
        text_label = self.image_ids[idx]
        if self.transform_image:
            image = self.transform_image(image)
        if self.transform_label:
            image_label = self.transform_label(image_label)
        return image, image_label, text_label


training_dataset = CustomImageDataset(ids_list=image_train_ids,
                                        image_path_list=image_train_path_list,
                                        mask_path_list=mask_train_path_list,
                                   transform_image=transform_image,
                                   transform_label=transform_label,
                                   )

validation_dataset = CustomImageDataset(ids_list=image_test_ids,
                                        image_path_list=image_test_path_list,
                                        mask_path_list=mask_test_path_list,
                                   transform_image=transform_image,
                                   transform_label=transform_label,
                                   )

# DataLoader instances #

In [8]:
from torch.utils.data import DataLoader

train_dataloader = DataLoader(training_dataset, batch_size=BATCH_SIZE, shuffle=False)
validation_dataloader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=False)


# Unet Model implementation #

In [9]:
class conv_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.conv1 = nn.Conv2d(in_c, out_c, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(out_c)         
        self.conv2 = nn.Conv2d(out_c, out_c, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(out_c)         
        self.relu = nn.ReLU()  

        
    def forward(self, inputs):
        x = self.conv1(inputs)
        x = self.bn1(x)
        x = self.relu(x)        
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)    
        return x

class encoder_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.conv = conv_block(in_c, out_c)
        self.pool = nn.MaxPool2d((2, 2))    

    def forward(self, inputs):
        x = self.conv(inputs)
        p = self.pool(x)
        return x, p

class decoder_block(nn.Module):
    def __init__(self, in_c, out_c):
        super().__init__()
        self.up = nn.ConvTranspose2d(in_c, out_c, kernel_size=2, stride=2, padding=0)
        self.conv = conv_block(out_c+out_c, out_c)    

    def forward(self, inputs, skip):
        x = self.up(inputs)
        x = torch.cat([x, skip], axis=1)
        x = self.conv(x)
        return x

class Unet(nn.Module):
    def __init__(self, num_channels):
        super().__init__() 
        """ Encoder """
        self.e1 = encoder_block(1, 64)
        self.e2 = encoder_block(64, 128)
        self.e3 = encoder_block(128, 256)
        self.e4 = encoder_block(256, 512)        
        """ Bottleneck """
        self.b = conv_block(512, 1024)         
        """ Decoder """
        self.d1 = decoder_block(1024, 512)
        self.d2 = decoder_block(512, 256)
        self.d3 = decoder_block(256, 128)
        self.d4 = decoder_block(128, 64)        
        """ Classifier """
        self.outputs = nn.Conv2d(64, num_channels, kernel_size=1, padding=0)     
        
    def forward(self, inputs):
        """ Encoder """
        s1, p1 = self.e1(inputs)
        s2, p2 = self.e2(p1)
        s3, p3 = self.e3(p2)
        s4, p4 = self.e4(p3)         
        """ Bottleneck """
        b = self.b(p4)         
        """ Decoder """
        d1 = self.d1(b, s4)
        d2 = self.d2(d1, s3)
        d3 = self.d3(d2, s2)
        d4 = self.d4(d3, s1)         
        """ Classifier """
        outputs = self.outputs(d4) 
        return outputs

# Prepare and run model #

In [10]:
device = 'cuda'


LEARNING_RATE = 0.01
EPOCHS =120
writer_step = 2


def main(execute):
    
    if execute:
        model = Unet(num_channels=NUM_CHANNELS).to(device)
        # metric = torchmetrics.Accuracy(task="multiclass", num_classes=NUM_CHANNELS)
        # metric.to(device)
        #summary(model, (3,256,256))
        loss_fn = torch.nn.CrossEntropyLoss()
        #loss_fn = torch.nn.Sigmoid()
        optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
        softmax = nn.Softmax2d()

        def train_one_epoch(epoch_index, tb_writer):

            running_loss = 0
            last_loss = 0

            for i, batch in enumerate(train_dataloader):
                inputs, labels, _ = batch
                inputs, labels = inputs.to(device), labels.to(device)
                optimizer.zero_grad()
                preds = model(inputs)
                loss = loss_fn(preds, labels)
                # acc = metric(preds, labels)
                # print(f'batch {i} accuracy = {acc}')
                loss.backward()
                optimizer.step()
                running_loss += loss.item()

                last_loss = running_loss / writer_step # loss per batch
                print('  batch {} loss: {}'.format(i + 1, last_loss))
                tb_x = epoch_index * len(train_dataloader) + i + 1
                tb_writer.add_scalar('Loss/train', last_loss, tb_x)
                running_loss = 0.
            return last_loss



        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        writer = SummaryWriter('runs/fashion_trainer_{}'.format(timestamp))
        epoch_number = 0


        for epoch in range(EPOCHS):
            print('EPOCH {}:'.format(epoch_number + 1))

            model.train(True)
            avg_loss = train_one_epoch(epoch_number, writer)

            model.train(False)
            with torch.no_grad():
                running_vloss = 0.0
                for i, vdata in enumerate(validation_dataloader):
                    vinputs, vlabels, _ = vdata
                    vinputs, vlabels = vinputs.to(device), vlabels.to(device)
                    voutputs = model(vinputs)
                    vloss = loss_fn(voutputs, vlabels)
                    running_vloss += vloss
                    avg_vloss = running_vloss / (i + 1)
                    print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

                    # Log the running loss averaged per batch
                    # for both training and validation
                    writer.add_scalars('Training vs. Validation Loss',
                                    { 'Training' : avg_loss, 'Validation' : avg_vloss },
                                    epoch_number + 1)
                    writer.flush()
            epoch_number += 1


        writer.close()

        tree = os.walk(cwd, topdown=True)

        for root, dirs, files in tree:
            for name in files:
                if '.pth' in os.path.join(cwd, name):
                    os.remove(os.path.join(cwd, name))
                    break
            break

        model_path = 'model_{}_{}'.format(timestamp, epoch_number)
        torch.save(model.state_dict(), model_path + '.pth')

main(EXECUTION_MODEL_TRAIN)

EPOCH 1:
  batch 1 loss: 0.0014214315451681614
  batch 2 loss: 0.0012292088940739632
  batch 3 loss: 0.0009771047625690699
  batch 4 loss: 0.0007376284338533878
  batch 5 loss: 0.0005988504271954298
  batch 6 loss: 0.0004370987298898399
  batch 7 loss: 0.00041860202327370644
  batch 8 loss: 0.00047175196232274175
  batch 9 loss: 0.0008948147296905518
  batch 10 loss: 0.000676861556712538
  batch 11 loss: 0.001178947277367115
  batch 12 loss: 0.0010429138783365488
  batch 13 loss: 0.0008196195121854544
  batch 14 loss: 0.0013547197449952364
  batch 15 loss: 0.001025184988975525
  batch 16 loss: 0.0007728490163572133
  batch 17 loss: 0.0009595765150152147
  batch 18 loss: 0.0007523289532400668
  batch 19 loss: 0.0013438686728477478
  batch 20 loss: 0.0012332148617133498
  batch 21 loss: 0.0011160783469676971
  batch 22 loss: 0.0010784427868202329
  batch 23 loss: 0.0011122656287625432
  batch 24 loss: 0.0010697003453969955
  batch 25 loss: 0.0011476895306259394
  batch 26 loss: 0.0008980

# Make Predictions on single Image #

In [16]:
cwd = os.getcwd()


tree = os.walk(cwd, topdown=True)
for root, dirs, files in tree:
    for name in files:
        if '.pth' in os.path.join(cwd, name):
            model_path = os.path.join(cwd, name)
            break
    break

model = Unet(num_channels=NUM_CHANNELS).to('cuda')
model.load_state_dict(torch.load(model_path))


def decode_to_RGB2(model, image_path, colormap): #gray
    """segmentation predicton for 2 class segmentation"""
    image_test_path = os.path.join(cwd, image_path) 
    image_test = Image.open(image_test_path)
    image_test = ImageOps.grayscale(image_test)
    image_test = transform_image(image_test)
    image_test = image_test.clone().detach().to(device)
    image_test .to(device)

    output = model(image_test .unsqueeze(0))

    sm = nn.Softmax2d()
    output = sm(output).squeeze(0)
    output = torch.permute(output, (1,2,0)) #swap axes of tensor
    output = torch.argmax(output, dim=2)
    image_classes = output.cpu().detach().numpy()
    image_output = np.zeros(shape=(TARGET_IMAGE_WIDTH,TARGET_IMAGE_HEIGHT)).astype(np.uint8)
    for W in range(image_classes.shape[0]):
        for H in range(image_classes.shape[1]):
            image_output[W,H] = [k for k, v in colormap.items() if v == image_classes[W,H]][0]
    image_output = Image.fromarray(image_output.astype(np.uint8))
    image_output.save('test_mask_predicted.png')



def decode_to_RGB12(model, image_path, colormap):
    """segmentation predicton for 2 class segmentation (for testing purposes)"""
    image_test_path = os.path.join(cwd, image_path) 

    image_test = Image.open(image_test_path)
    image_test = transform_image(image_test)
    image_test = image_test.clone().detach().to(device)

    output = model(image_test .unsqueeze(0)).squeeze(0)

    sm = nn.Softmax2d()
    output = sm(output).squeeze(0)
    output = torch.permute(output, dims=(1,2,0))
    output = torch.argmax(output, axis=2)
    image_classes = output.cpu().detach().numpy().astype(np.uint8)

    image_output = np.zeros((TARGET_IMAGE_WIDTH, TARGET_IMAGE_HEIGHT,3))
    for W in range(image_classes.shape[0]):
        for H in range(image_classes.shape[1]):
            image_output[W,H] = [k for k, v in colormap.items() if v == image_classes[W,H]][0]
    image_output = Image.fromarray(image_output.astype(np.uint8)).save('test_mask_predicted.png')

if NUM_CHANNELS == 2:
    decode_to_RGB2(model, 'test_image.jpg', colormap2_gray)
else:
    decode_to_RGB12(model, 'test_image.jpg', colormap12)


# Count number of windows #

In [21]:
"""The marching squares algorithm for counting windows"""

test_image_path = os.path.join(cwd, 'test_mask_predicted.png')

def count_windows(test_img_path):
    if os.path.exists(test_image_path):
        imgray = cv2.imread(test_image_path, cv2.IMREAD_GRAYSCALE)
        imgrgb = cv2.imread(test_image_path)

        # get contours
        contours = measure.find_contours(imgray)

        # get contours length
        contour_length_list = []
        for contour in contours:
            contour_length = 0
            for i in range(len(contour)-1):
                contour_length += cv2.norm(contour[i], contour[i+1], cv2.NORM_L2)
            contour_length_list.append(contour_length)

        max_countour_length = max(contour_length_list)
        # remove countours with length less than 20% of mean length of all countours
        windows_count = len([length for length in contour_length_list if length > max_countour_length/3])
        return windows_count

windows = count_windows(test_image_path)
print(f"Количество окон - {windows}")


Количество окон - 44
