In [0]:
"""
Script with all imported "helper"-functions used in the Training Session.
"""

import os
import json
import skimage
import tensorflow as tf
import numpy as np
import matplotlib.image as mpimg
from skimage.io import imread
from pathlib import Path
from collections import Counter
from math import radians, cos, sin, asin, sqrt

In [0]:
# model output shape transformation

In [0]:
def convert_to_one_labelmatrix(X, labels):
    """
    Collapses a stack of per-label masks into a single label-index matrix.

    Channel ``i`` of ``X`` is treated as the mask of ``labels[i + 1]``
    (``labels[0]`` is skipped as background). Every pixel that is non-zero in
    channel ``i`` receives the value ``i + 1``. Channels are applied in
    ascending order, so where several channels are non-zero at the same pixel
    the highest channel index wins. Pixels that are zero in every considered
    channel stay 0.

    Args:
        X (nparray): label masks of shape (height, width, channels) with at
            least ``len(labels) - 1`` channels
        labels (list): list of labels ordered by expected index; the first
            entry is the background label
    Returns:
        Y (nparray): float matrix of shape (height, width) containing the
            label index of each pixel
    Raises:
        IndexError: if X has fewer than ``len(labels) - 1`` channels
    """
    X = np.asarray(X)
    Y = np.zeros([X.shape[0], X.shape[1]])
    # Walk the channels by position instead of looking each label up with
    # list.index(): this stays correct when a label name occurs twice, and one
    # boolean mask per channel replaces the per-pixel Python loops.
    for channel in range(len(labels) - 1):
        Y[X[:, :, channel] != 0] = channel + 1

    return Y

In [0]:
# Loading functions for u-net

In [0]:
def load_images(paths_to_image, image_size):
    """
    Reads a collection of image files into one uint8 batch array.

    Args:
        paths_to_image: sized collection of image paths accepted by ``imread``
        image_size (int): height and width of one slot in the batch; every
            loaded image has to fit that (image_size, image_size, 3) slot
    Returns:
        nparray of shape (len(paths_to_image), image_size, image_size, 3)
        and dtype uint8
    """
    batch_shape = (len(paths_to_image), image_size, image_size, 3)
    batch = np.zeros(batch_shape, dtype=np.uint8)

    for slot, path in enumerate(paths_to_image):
        pixels = imread(path)
        # keep only the first three channels (drops e.g. an alpha channel)
        batch[slot] = pixels[:, :, :3]

    return batch

  
def rgb_to_labelmask(rgb_msk, colors):
    """
    Converts a colour-coded mask into a matrix of label indices.

    Each value of ``colors`` is a colour; its position in the dict's iteration
    order is the label index written to every pixel whose channels all equal
    that colour. Pixels matching no colour keep the value 0, which is also the
    index of the first colour.

    Args:
        rgb_msk (nparray): mask of shape (height, width, 3)
        colors (dict): mapping whose values are the colours to look for
    Returns:
        uint8 nparray of shape (height, width) holding one label index per pixel
    """
    # The buffer has the same shape as the input; a matched pixel gets its
    # label index written to all channels, and one channel is returned.
    label_volume = np.zeros(rgb_msk.shape, dtype="uint8")
    label_index = 0
    for color in colors.values():
        pixel_matches = np.all(rgb_msk == color, axis=-1)
        label_volume[pixel_matches] = label_index
        label_index += 1
    return label_volume[:, :, 2]


def load_masks(paths_to_masks, colors, num_classes=17):
    """
    Loads colour-coded mask images and one-hot encodes them.

    Every mask file is read, reduced to its first three channels, converted to
    a label-index matrix with ``rgb_to_labelmask`` and then one-hot encoded
    with ``tf.keras.utils.to_categorical``.

    Args:
        paths_to_masks: iterable of mask image paths accepted by ``imread``
        colors (dict): mapping whose values are the mask colours; forwarded to
            ``rgb_to_labelmask``
        num_classes (int): number of one-hot channels per pixel. Defaults to
            17, the value that was previously hard-coded.
    Returns:
        float32 nparray with one one-hot encoded mask per input path
    """
    label_masks = np.array([
        rgb_to_labelmask(imread(mask_path)[:, :, 0:3], colors)
        for mask_path in paths_to_masks
    ])
    one_hot_masks = np.array([
        tf.keras.utils.to_categorical(label_mask, num_classes=num_classes, dtype='float32')
        for label_mask in label_masks
    ])
    return one_hot_masks


In [0]:
# PV/Biodiversity/CO2 Score Calculation Functions for predicted Image

In [0]:
def image_location(image_path):
    """
    Parses the coordinates encoded in an image file name.

    The file name without directory and extension is split on '_' and every
    part is converted to float, e.g. 'dir/48.1_11.5.jpg' -> [48.1, 11.5].
    NOTE(review): distance_in_km reads index 0 as latitude and index 1 as
    longitude - confirm the file names follow that order.
    """
    name_without_suffix = Path(image_path).stem
    return list(map(float, name_without_suffix.split('_')))

def distance_in_km(first_location, second_location):
    """
    Great-circle distance in km between two locations (haversine formula).

    Both arguments are indexable; index 0 is read as latitude and index 1 as
    longitude, both in degrees. Further entries are ignored. The sphere radius
    used is 6371 km.
    """
    lat_a, lon_a = radians(first_location[0]), radians(first_location[1])
    lat_b, lon_b = radians(second_location[0]), radians(second_location[1])

    half_dlat = (lat_b - lat_a) / 2
    half_dlon = (lon_b - lon_a) / 2

    haversine = sin(half_dlat)**2 + cos(lat_a) * cos(lat_b) * sin(half_dlon)**2
    central_angle = 2 * asin(sqrt(haversine))
    return 6371 * central_angle


def sun_hours_nearest_station(image_location,weather_data_list):
    """
    Returns entry [4] of the weather station closest to the image.

    Args:
        image_location: coordinates of the image, as consumed by distance_in_km.
            The parameter shadows the module-level function of the same name
            inside this body.
        weather_data_list: non-empty iterable of station records. Indices 0 and
            1 of a record are passed to distance_in_km as its coordinates;
            index 4 is presumably the average yearly sun hours - TODO confirm
            against the weather data source.
    """
    def distance_to_image(weather_station):
        return distance_in_km(image_location, weather_station)

    closest_station = min(weather_data_list, key=distance_to_image)
    return closest_station[4]


def count_distinct_value_occurances(one_label_matrix, num_labels=19):
    """
    Counts how many pixels carry each non-zero label.

    A label value ``k`` (k != 0) is counted at index ``k - 1`` of the result;
    zeros are ignored. Values are truncated to integers before counting.

    Args:
        one_label_matrix (nparray): matrix holding one label index per pixel
        num_labels (int): length of the returned array. Defaults to 19, the
            value that was previously hard-coded.
    Returns:
        float nparray of length ``num_labels`` with the pixel count per label
    Raises:
        IndexError: if a label value is larger than ``num_labels``
    """
    occurances = np.zeros(num_labels)

    label_values = np.asarray(one_label_matrix)
    foreground = label_values[label_values != 0].astype(np.int64)
    # np.add.at accumulates repeated indices, which `occurances[idx] += 1`
    # would not; it replaces the per-pixel Python loop.
    np.add.at(occurances, foreground - 1, 1)

    return occurances

  
def apply_scores(label_pixel_count,score_dict):
    """
    Sums pixel count times label score over all labels of ``score_dict``.

    The n-th key of ``score_dict`` (in iteration order) is paired with
    ``label_pixel_count[n]``, so the order of the dict has to match the order
    of the counts. Returns 0 for an empty ``score_dict``.
    """
    return sum(
        label_pixel_count[position] * score_dict[label]
        for position, label in enumerate(score_dict)
    )

  
def score_calculation(one_image_result,weather_data_list,label_scores):
    '''
    Adds the PV, biodiversity and CO2 scores to the result dict of one image.
        one_image_result: dict with the image path under 'Img' and the predicted
            label matrix under 'predicted_mask'; it is updated in place
        weather_data_list: station records passed on to sun_hours_nearest_station
        label_scores: table offering set_index('labels') and to_dict()
            (presumably a pandas DataFrame - TODO confirm) with the columns
            'CO2', 'usable_area' and 'biodiversity'
    Returns: the input dict, extended by the keys 'emmission_score',
        'solar_score' and 'biodiversity_score' needed for submission
    '''
    # one {label: score} dict per score column
    scores_by_column = label_scores.set_index('labels').to_dict()

    # sun hours of the weather station nearest to the image location
    location = image_location(one_image_result['Img'])
    hours_of_sun = sun_hours_nearest_station(location, weather_data_list)

    # number of pixels per label in the predicted mask
    pixels_per_label = count_distinct_value_occurances(one_image_result['predicted_mask'])

    emission = apply_scores(pixels_per_label, scores_by_column['CO2'])
    usable_area = apply_scores(pixels_per_label, scores_by_column['usable_area'])
    biodiversity = apply_scores(pixels_per_label, scores_by_column['biodiversity'])

    one_image_result.update({
        'emmission_score': emission,
        # 1 pixel ~ 0.2m*0.2m, hence the division by 25
        'solar_score': usable_area / 25 * hours_of_sun,
        'biodiversity_score': biodiversity,
    })

    return one_image_result

In [0]:
def orthogonal_rot(image):
    """Rotates ``image`` by a randomly drawn -1, 0 or 1 quarter turns (np.rot90)."""
    quarter_turns = np.random.choice([-1, 0, 1])
    return np.rot90(image, quarter_turns)

In [0]:
def augment_pictures(img_dir, mask_dir, image_size=512, aug_per_img=5, seed=0):
    """
    Augments images and masks with the Keras ImageDataGenerator and saves the
    results back to the same directories.

    Args:
        img_dir: directory of the images (*.jpg)
        mask_dir: directory of the png masks (after labelme transformation)
        image_size (int): height and width of the pictures
        aug_per_img (int): how many augmentations per picture to generate
        seed (int): seed handed to both iterators so that an image and its
            mask are meant to receive the same random transformation
    Raises:
        ValueError: if img_dir and mask_dir hold a different number of files
    """
    # Images and masks are paired by position and glob order is not guaranteed,
    # so both listings are sorted.
    # NOTE(review): assumes image and mask file names sort into the same
    # order (e.g. identical stems) - confirm.
    images = sorted(Path(img_dir).glob('*.jpg'))
    masks = sorted(Path(mask_dir).glob('*.png'))

    if len(images) != len(masks):
        raise ValueError(
            'Found {} images in {} but {} masks in {}; every image needs exactly one mask.'.format(
                len(images), img_dir, len(masks), mask_dir))

    # create list of images/ masks as matrices (an array)
    x = np.zeros((len(images), image_size, image_size, 3), dtype=np.uint8)
    y = np.zeros((len(masks), image_size, image_size, 3), dtype=np.uint8)

    for n, img in enumerate(images):
        x[n] = imread(img)[:, :, :3]

    for n, msk in enumerate(masks):
        y[n] = imread(msk)[:, :, :3]

    # defining the types of augmentations we want to perform
    args = dict(
        vertical_flip=True,  # flip inputs vertically
        horizontal_flip=True,  # flip inputs horizontally
        preprocessing_function=orthogonal_rot,
        #width_shift_range=[-0.1, 0.1],  # array of floats of total width
        #height_shift_range=[-0.1, 0.1],  # array of floats of total width
        #rotation_range=15,  # Int., Degree range for random rotations
        fill_mode='constant', # one of constant, nearest, reflect, wrap
        cval=0, # color to fill if fill_mode = constant
        #zoom_range=[0.7, 1.5],  # float range for random zoom
    )

    # ImageDataGenerator is not imported by name in this file, so it is reached
    # through the already imported tf package.
    image_data_generator = tf.keras.preprocessing.image.ImageDataGenerator
    data_gen_img = image_data_generator(**args, brightness_range=[0.8, 1.2])
    # NOTE(review): the masks keep a brightness_range of [1, 1], presumably so
    # both generators draw the same amount of random numbers and stay in sync
    # with the images - verify before removing it.
    data_gen_mask = image_data_generator(**args, brightness_range=[1, 1]) # keep brightness in masks

    # the images have to use data_gen_img, otherwise the brightness
    # augmentation is never applied
    data_gen_img_it = data_gen_img.flow(x, batch_size=1, save_to_dir= img_dir, save_format='jpg', seed=seed)
    data_gen_msk_it = data_gen_mask.flow(y, batch_size=1, save_to_dir= mask_dir, save_format='png', seed=seed)

    # every step writes one augmented image and the matching augmented mask
    for _ in range(len(masks) * aug_per_img):
        next(data_gen_img_it)
        next(data_gen_msk_it)