In [3]:
import cv2
from matplotlib import pyplot as plt
import numpy as np
import os
import pandas as pd
import random
from skimage import io
from shutil import copyfile
import sys
import time
import json
from tqdm import tqdm
from PIL import Image

import tensorflow as tf
import keras
from keras.preprocessing.image import ImageDataGenerator

from keras.models import Model, load_model
from keras.layers import Input
from keras.layers.core import Dropout, Lambda
from keras.layers.convolutional import Conv2D, Conv2DTranspose, UpSampling2D, Cropping2D
from keras.layers.pooling import MaxPooling2D
from keras.layers.merge import concatenate
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras import backend as K

In [5]:
tf.__version__

'2.0.0-alpha0'

In [None]:
input_path = 'data/'
output_path = 'outputs/'

In [None]:
def visualize_image(img=None, image_path=None):  # Define function to visualize an image
    if image_path is not None:
        img = io.imread(image_path)  # Read the image
        
    print(f"Image: {img.shape}")
    plt.figure(figsize=(15,10))
    plt.imshow(img)
    plt.axis("off")
    plt.show()

In [None]:
def dice_coeff(y_true, y_pred):
    smooth = 1.
    y_true_f = K.flatten(y_true)
    y_pred_f = K.flatten(y_pred)
    intersection = K.sum(y_true_f * y_pred_f)
    score = (2. * intersection + smooth) / (K.sum(y_true_f) + K.sum(y_pred_f) + smooth)
    return score

def dice_loss(y_true, y_pred):
    loss = 1 - dice_coeff(y_true, y_pred)
    return loss

In [None]:
# load and evaluate a saved model
from keras.models import load_model
import keras.losses
keras.losses.dice_loss = dice_loss

In [None]:
model = load_model('outputs/weights-008.h5')

# Test visually some results

In [None]:
test_images = os.listdir('data/test_images/test_hq')

In [None]:
img = io.imread('data/test_images/test_hq/' + test_images[178])  # Read the image
visualize_image(img=img)

In [None]:
a = np.array([tf.image.resize(img, [320, 480]).numpy()/255.])
b = model.predict(a)
visualize_image(img=b[0][:,:,0])

In [None]:
plt.figure(figsize=(15,10))
plt.imshow(img)
plt.imshow(b[0][:,:,0], cmap='jet', alpha=0.5) # interpolation='none'

In [None]:
test_datagen = ImageDataGenerator(rescale=1. / 255)
batch_size = 1

test_generator = test_datagen.flow_from_directory(
    directory='data/test_images/',
    target_size=(320, 480),
    batch_size=batch_size,
    class_mode=None,
    shuffle=False
)

In [None]:
def rle_encoding(x):
    '''
    x: numpy array of shape (height, width), 1 - mask, 0 - background
    Returns run length as list
    '''
    x = np.where(x < 0.5, 0, 1)
    dots = np.where(x.T.flatten()==1)[0] # .T sets Fortran order down-then-right
    run_lengths = []
    prev = -2
    for b in dots:
        if (b>prev+1): run_lengths.extend((b+1, 0))
        run_lengths[-1] += 1
        prev = b
    return run_lengths



In [None]:
def process_prediction(p):
    f = []
    for pred in p:
        f.append(rle_encoding(pred))
    
    return f

def write_rle(f, filepath):
    with open(filepath, 'a') as f_in:
        for l in f:
            l = list(map(str, l))
            l = json.dumps(l)
            f_in.write(l + '\n')

In [None]:
i = 0
maximumPredictions = 100064 // batch_size

for x in test_generator: #if the generator doesn't have y, use only "for x in..."
    predictions = model.predict(x)
    f = process_prediction(predictions)
    
    write_rle(f, 'data/model_predictions/prediction_batch_' + str(i) + '.csv')
    i+=1

    if i == maximumPredictions:
        break;