In [None]:
from pathlib import Path
from collections import defaultdict
import math
import random
import keras
from keras.models import Model
import numpy as np
from tqdm import tqdm
from PIL import Image

# import module
#import shutil

# All datasets are expected in a `data` folder that sits next to the
# notebook's working directory.
datasets_path = Path.cwd().parent / 'data'
# Alternative evaluation sets, currently disabled:
#test_set = datasets_path / 'test_set'
#val_set = datasets_path / 'validation_set_original'
test_set = datasets_path.joinpath('big_test_set')

In [None]:
# Upper bound on the number of samples kept per (tree density, pose) subset;
# passed to DataGenerator as `only_use_n`.
val_set_size = 32

# Pose class names. A name's position in this list is its integer label
# (DataGenerator converts names to labels with `.index()` and back by indexing).
map_label_to_name = [
    'no_person',
    'idle',
    'sitting',
    'laying',
]

def predict_images(model, dataset, folder: Path):
    """Run `model` on every sample of `dataset` and dump the results as PNGs.

    For each sample a sub-folder ``folder / <sample id>`` is created holding:

    * ``prediction.png`` - the model output,
    * ``gt.png``         - the ground-truth target,
    * ``plane<i>.png``   - one image per slice along the last axis of the input.

    Args:
        model: object exposing ``predict_on_batch(x)``.
        dataset: a ``DataGenerator``-like object with ``batch_size``,
            ``filenames`` and ``__getitem__``.
        folder: output directory; created on demand.

    The dataset's ``batch_size`` is temporarily set to ``None`` so that
    ``dataset[0]`` yields all samples at once, and is restored afterwards.
    An empty dataset is a no-op (stacking zero samples would raise).
    """
    if len(dataset.filenames) == 0:
        return

    # Load everything in a single batch without leaving the caller's
    # generator permanently reconfigured.
    previous_batch_size = dataset.batch_size
    dataset.batch_size = None
    try:
        x, y = dataset[0]
    finally:
        dataset.batch_size = previous_batch_size

    preds = model.predict_on_batch(x)

    for stack, gt, pred, filename in zip(x, y, preds, dataset.filenames):
        # Sample id = text between the first underscore and the following dot.
        # Assumes file names look like '<prefix>_<id>.<ext>' - TODO confirm.
        sample_num = filename.split('_')[1].split('.')[0]
        sample_folder: Path = folder / sample_num
        sample_folder.mkdir(exist_ok=True, parents=True)

        to_pil(pred).save(sample_folder / 'prediction.png')
        to_pil(gt.squeeze()).save(sample_folder / 'gt.png')

        # Save each slice along the last axis of the input as its own image.
        for i in range(stack.shape[-1]):
            plane = stack[:, :, i]
            to_pil(plane).save(sample_folder / f'plane{i}.png')

def to_pil(img):
    """Convert a numeric array to an 8-bit PIL image using min-max scaling.

    The array's minimum is mapped to 0 and its maximum to 255; singleton
    dimensions are squeezed away before the image is built.

    Args:
        img: array-like of numbers without NaNs.

    Returns:
        A ``PIL.Image.Image`` built from the scaled ``uint8`` array.

    Raises:
        AssertionError: if ``img`` contains NaN.
    """
    img = np.asarray(img)

    assert not np.isnan(img).any(), 'NAN'

    lowest = img.min()
    value_range = img.max() - lowest

    if value_range == 0:
        # Constant image (e.g. an all-zero target): min-max scaling would
        # divide by zero and produce NaNs, so render it as uniformly black.
        grayscale_img = np.zeros(img.shape, dtype='uint8')
    else:
        grayscale_img = ((img - lowest) * (1 / value_range * 255)).astype('uint8')

    return Image.fromarray(grayscale_img.squeeze())

class DataGenerator(keras.utils.Sequence):
    """Keras sequence over a folder of NumPy archives.

    Every file in ``basedir`` is opened with ``np.load`` and must provide the
    entries ``'pose'`` and ``'trees'`` (single values, used for filtering) as
    well as ``'x'`` and ``'y'`` (input and target, both divided by 255 on load).

    Args:
        basedir: folder holding the sample files.
        batch_size: samples per batch. ``None`` makes ``__getitem__`` return
            *all* selected samples regardless of the index.
        included_poses: pose names (entries of ``map_label_to_name``) to keep;
            ``None`` keeps every pose.
        included_trees: ``trees`` values to keep; ``None`` keeps every value.
        shuffle: shuffle the directory listing (global ``random`` state)
            before filtering.
        only_use_n: stop once this many samples have been selected.

    Attributes:
        filenames: names of the selected files, relative to ``basedir``.
        pose_distribution / trees_distribution: counts over every file that
            was scanned (scanning stops early when ``only_use_n`` is reached).
        pose_distribution_filtered / trees_distribution_filtered: counts over
            the selected files only.

    Raises:
        FileNotFoundError: if ``basedir`` does not exist.
        ValueError: if ``included_poses`` contains an unknown pose name.
    """

    def __init__(
        self,
        basedir: Path,
        batch_size: int = None,
        included_poses: list = None,
        included_trees: list = None,
        shuffle=False,
        only_use_n: int = None
    ):
        # Initialise the Keras base class (Keras 3 warns when this is skipped).
        super().__init__()

        if not basedir.exists():
            # The original built this exception without raising it. The same
            # exception type that listing a missing directory produces is used
            # so existing `except` clauses keep working.
            raise FileNotFoundError('Datafolder does not exist. Add it to your drive and try again. Maybe restart the runtime.')

        self.basedir = basedir
        self.batch_size = batch_size
        # Pose names are translated to their integer labels once, up front.
        self.included_poses = [map_label_to_name.index(pose) for pose in included_poses] if included_poses is not None else None
        self.included_trees  = included_trees
        self.filenames = self.__filter(shuffle, only_use_n)

    def __filter(self, shuffle, only_use_n):
        """Scan ``basedir`` and return the names of the files passing the filters.

        Also (re)builds the four distribution counters as a side effect.
        """
        files = []
        self.pose_distribution = defaultdict(int)
        self.trees_distribution = defaultdict(int)
        self.pose_distribution_filtered = defaultdict(int)
        self.trees_distribution_filtered = defaultdict(int)

        # Ignore sub-directories and hidden entries such as `.DS_Store`,
        # which np.load cannot read.
        unfiltered = [
            entry for entry in self.basedir.iterdir()
            if entry.is_file() and not entry.name.startswith('.')
        ]

        if shuffle:
            random.shuffle(unfiltered)

        # With `only_use_n` the bar counts scanned files against the requested
        # sample count, so it is only a rough progress estimate.
        total = len(unfiltered) if only_use_n is None else only_use_n

        for path in tqdm(unfiltered, total=total):

            # Checked before loading so that `only_use_n=0` selects nothing.
            if only_use_n is not None and len(files) >= only_use_n:
                break

            # Close the archive right away; otherwise every scanned file
            # keeps a file handle open.
            with np.load(path) as loaded:
                pose = loaded['pose'].item()
                trees = loaded['trees'].item()

            self.pose_distribution[pose] += 1
            self.trees_distribution[trees] += 1

            if self.included_poses is not None and pose not in self.included_poses:
                continue

            if self.included_trees is not None and trees not in self.included_trees:
                continue

            files.append(path.name)
            self.pose_distribution_filtered[pose] += 1
            self.trees_distribution_filtered[trees] += 1

        return files

    def load(self, path):
        """Load one sample and return ``(x, y)``, each divided by 255."""
        with np.load(path) as loaded:
            x = loaded['x'] / 255
            y = loaded['y'] / 255
        return x, y

    def __len__(self):
        """Number of batches (number of samples when ``batch_size`` is None).

        NOTE(review): with ``batch_size=None`` every index returns the whole
        dataset, so iterating ``len(self)`` times yields it repeatedly.
        Kept as is for backward compatibility.
        """
        if self.batch_size is None:
            return len(self.filenames)

        return math.ceil(len(self.filenames) / self.batch_size)

    def __getitem__(self, idx):
        """Return batch ``idx`` as ``(X, Y)`` arrays stacked along a new first axis.

        With ``batch_size=None`` the index is ignored and all samples are
        returned.
        """
        if self.batch_size is None:
            batch = self.filenames
        else:
            low = idx * self.batch_size
            high = min(low + self.batch_size, len(self.filenames))
            batch = self.filenames[low:high]

        X, Y = [],[]
        for fname in batch:
            x,y = self.load(self.basedir / fname)
            X.append(x)
            Y.append(y)

        return np.stack(X), np.stack(Y)

    @staticmethod
    def _print_table(title, key_header, rows):
        """Print a title, a two-column header and one line per (key, count) pair."""
        print(title)
        print("{:<15} {:<15}".format(key_header, 'number of samples'))
        for key, value in rows:
            print("{:<15} {:<15}".format(key, value))

    def print_info(self, include_unfiltered=False):
        """Print the sample count/shape and the pose and tree distributions.

        Args:
            include_unfiltered: also print the distributions over all scanned
                files, not only the selected ones.
        """
        print()
        if self.filenames:
            shape = self.load(self.basedir / self.filenames[0])[0].shape
            print(f'{len(self.filenames)} samples with shape : {shape}')
        else:
            # Nothing selected: there is no sample to read a shape from.
            print('0 samples')

        if include_unfiltered:
            self._print_table(
                'Pose distribution total',
                'pose',
                ((map_label_to_name[key], value) for key, value in self.pose_distribution.items()),
            )
        print()
        self._print_table(
            'Pose distribution filtered',
            'pose',
            ((map_label_to_name[key], value) for key, value in self.pose_distribution_filtered.items()),
        )

        if include_unfiltered:
            print()
            self._print_table(
                'Trees distribution total',
                'num trees per ha',
                self.trees_distribution.items(),
            )

        print()
        self._print_table(
            'Trees distribution filtered',
            'num trees per ha',
            self.trees_distribution_filtered.items(),
        )


In [None]:
# Build one evaluation subset per (tree density, pose) combination, keyed as
# '<trees>_trees_<pose>'. Each subset holds at most `val_set_size` samples.
trees = [0, 100, 200]
poses = ['no_person', 'idle', 'sitting', 'laying']
datasets = {}
for tree in trees:
    for pose in poses:
        dataset = DataGenerator(
            basedir=test_set,
            included_poses=[pose],
            included_trees=[tree],
            only_use_n=val_set_size,
        )
        # Show how many samples were actually found for this combination.
        print(len(dataset.filenames))
        datasets[f'{tree}_trees_{pose}'] = dataset
        # Disabled: copy the selected files from the validation set.
        #for filename in dataset.filenames:
            #shutil.copyfile(val_set / filename, test_set / filename)

In [None]:
# Checkpoints evaluated with this notebook. Each assignment overrides the
# previous one, so only the LAST line takes effect; reorder or comment out
# lines to switch models.
MODEL = 'with_retrain.model.keras'     # trained on full sized val set
MODEL = 'MAE_ep108_loss0.0043.keras'   # trained with mean absolute error
MODEL = 'ep60_loss0.0051.keras'        # after 60 epochs
MODEL = '20240114-195043.model.keras'  # original run
model: Model = keras.saving.load_model(Path.cwd().parent.joinpath('models', MODEL))

# Output root for this model's predictions; adjust together with MODEL.
predictions_folder = Path.cwd().joinpath('test_set_pred', 'oldmodel')

# Write the predictions of every subset into its own sub-folder.
for name, dataset in tqdm(datasets.items(), total=len(datasets)):
    predict_images(model, dataset, predictions_folder / name)

In [None]:
# Assemble one overview image per sample:
#   [ ground truth | plane1 over plane4 (256x256 each) | prediction ]
combo_path = Path.cwd().joinpath('test_set_pred', 'mae_combo_images')
combo_path.mkdir(exist_ok=True)
for name, dataset in tqdm(datasets.items(), total=len(datasets)):
    path: Path = predictions_folder / name
    if not path.is_dir():
        continue
    for sample_path in path.iterdir():
        # Only sample folders are of interest; skip stray files / .DS_Store.
        if not sample_path.is_dir() or '.DS_Store' in sample_path.as_posix():
            continue

        gt = Image.open(sample_path / 'gt.png')
        pred = Image.open(sample_path / 'prediction.png')
        plane1 = Image.open(sample_path / 'plane1.png').resize((256,256))
        plane4 = Image.open(sample_path / 'plane4.png').resize((256,256))

        # Canvas: two 512 px wide panels plus one 256 px wide column.
        blank_image = Image.new("L", (512*2 + 256, 512))
        layout = (
            (gt, (0,0)),
            (plane1, (512,0)),
            (plane4, (512,256)),
            (pred, (512+256,0)),
        )
        for tile, corner in layout:
            blank_image.paste(tile, corner)

        #blank_image.save(path / (sample_path.name + 'combo.png'))
        blank_image.save(combo_path / f'{name}_{sample_path.name}.png')

# COMPARE different models

In [None]:
# Side-by-side comparison of two models' predictions:
#   [ prediction A | ground truth | prediction B ]
predictions_folder_A =  Path.cwd() / 'test_set_pred' / 'extra_mae'
predictions_folder_B =  Path.cwd() / 'test_set_pred' / 'oldmodel'
compare_path = Path.cwd() / 'test_set_pred' / 'a_mae_b_oldmodel'
# parents=True lets this cell run even if 'test_set_pred' does not exist yet.
compare_path.mkdir(exist_ok=True, parents=True)

for name in tqdm(datasets, total=len(datasets)):

    samples_dir_A = predictions_folder_A / name
    samples_dir_B = predictions_folder_B / name
    # Only subsets that both models have predictions for can be compared.
    if not samples_dir_A.is_dir(): continue
    if not samples_dir_B.is_dir(): continue

    # loop over all samples
    for sample_path in samples_dir_A.iterdir():
        if not sample_path.is_dir(): continue
        if '.DS_Store' in sample_path.as_posix(): continue

        path_a = samples_dir_A / sample_path.name
        path_b = samples_dir_B / sample_path.name

        # A sample may exist for model A only (e.g. the runs used different
        # subsets). Skip it instead of failing half-way through the loop,
        # consistent with the subset-level checks above.
        if not (path_b / 'prediction.png').is_file(): continue

        blank_image = Image.new("L", (512*3, 512))
        # The ground truth is taken from model A's folder.
        gt = Image.open(path_a / 'gt.png')
        pred_a = Image.open(path_a / 'prediction.png')
        pred_b = Image.open(path_b / 'prediction.png')
        #plane1 = Image.open(sample_path / 'plane1.png').resize((256,256))
        #plane4 = Image.open(sample_path / 'plane4.png').resize((256,256))

        blank_image.paste(pred_a, (0,0))
        blank_image.paste(gt, (512,0))
        blank_image.paste(pred_b, (1024,0))
        #blank_image.paste(plane1, (512,0))
        #blank_image.paste(plane4, (512,256))
        #blank_image.save(path / (sample_path.name + 'combo.png'))
        blank_image.save(compare_path / (name + '_' + sample_path.name + '.png'))
