In [10]:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
# Turn on eager execution right after importing TensorFlow and before any other
# TensorFlow call: the cells below iterate tf.data datasets in plain Python loops
# and record gradients with tf.GradientTape.
tf.compat.v1.enable_eager_execution()

import sys
import os

import warnings

from tensorflow import keras
import numpy as np
from tensorflow.data import Dataset
from image import *
import tensorflow.image

import numpy as np
import json
import cv2
import time


def _read_path_list(list_file):
    '''
    objective: parse the first line of a comma-separated path list file
    param: list_file - name of the text file to read
    return: list of paths, each prefixed with '../' and with single quotes removed
    raises: IndexError when the file contains no line at all
    '''
    with open(list_file, 'r') as f:
        # only the first line is ever used, so there is no need to read further
        first_line = f.readline()
    if not first_line:
        # an empty file has always surfaced as IndexError; keep the type, add context
        raise IndexError('no paths found: %s is empty' % list_file)
    return [('../' + elt.strip()).replace('\'', '') for elt in first_line.split(',')]


def load_data_list():
    '''
    objective: load lists of data files
    return: train_list, test_list - lists of path strings read from the first line of
            'train_data_list.txt' and 'test_data_list.txt' in the working directory
    raises: IndexError when either list file is empty
    '''
    train_list = _read_path_list('train_data_list.txt')
    test_list = _read_path_list('test_data_list.txt')

    return train_list, test_list



def load_data(paths, train = True):
    '''
    objective: load image files
    param: paths - iterable of image paths; each item may be bytes (which is what
                   tf.data.Dataset.from_generator hands over) or str
           train - not used by this function
    return: generator yielding (img, target) pairs, where img is the decoded 3-channel
            image scaled to [-1, 1] and target is the 'density' array of the matching
            .h5 file resized to 1/8 of its height and width and multiplied by 64
    '''
    # NOTE(review): h5py is not imported explicitly in this cell; presumably it is
    # provided by `from image import *` - confirm.
    for img_path in paths:
        # accept both bytes and str so the generator can also be called directly
        path_str = img_path.decode("utf-8") if isinstance(img_path, bytes) else str(img_path)
        gt_path = path_str.replace('.jpg','.h5').replace('images','ground-truth')

        img = tf.io.read_file(path_str)
        img = tf.image.decode_jpeg(img, channels = 3)
        img = tf.cast(img, tf.float32)
        img = (img/127.5) - 1 # normalizing the images to [-1, 1]

        # read the array into memory, then close the file right away so that handles
        # do not pile up while the generator is being consumed
        with h5py.File(gt_path, 'r') as gt_file:
            target = np.asarray(gt_file['density'])

        # cv2.resize takes (width, height); each side shrinks by a factor of 8.
        # The *64 (= 8*8) presumably keeps the sum of the map roughly unchanged - confirm.
        target = cv2.resize(target,(int(target.shape[1]/8),int(target.shape[0]/8)),interpolation = cv2.INTER_CUBIC)*64

        yield (img, target)


def load_datasets():
    '''
    objective: build the train and test pipelines on top of the load_data generator
    return: train_dataset, test_dataset - tf.data.Dataset
    '''
    def _dataset_from(path_list):
        # every element is a float32 pair: (None,None,3) and (None,None)
        return tf.data.Dataset.from_generator(
            load_data,
            args = [path_list],
            output_types = (tf.float32, tf.float32),
            output_shapes = ((None,None,3), (None,None)))

    train_paths, test_paths = load_data_list()

    # only the training split goes through a shuffle buffer
    train_dataset = _dataset_from(train_paths).shuffle(100000)
    test_dataset = _dataset_from(test_paths)
    return train_dataset, test_dataset

def loss_fn(model, input_image, gt_image):
    '''
    objective: calculate loss from input image and ground-truth
    param: model - callable taking a batched image and a `training` keyword
           input_image - a single un-batched image; the batch axis is added here
           gt_image - ground-truth map, compared element-wise with the model output
    return: loss - mean squared error between model output and gt_image
    '''
    # add the batch axis with a TensorFlow op so the tensor is not round-tripped
    # through NumPy (which only works in eager mode)
    batched_input = tf.expand_dims(input_image, axis=0)
    output = model(batched_input, training=True)
    # drop axes 0 and 3 (batch and, presumably, channel); both must have size 1
    output = tf.squeeze(output, [0,3])
    # mean squared error
    loss = tf.reduce_mean(tf.square(output - gt_image))
    return loss

def grad(model, input_image, gt_image):
    '''
    objective: compute the gradients of the loss with respect to the model's
               trainable weights (the weights themselves are not updated here)
    return: gradients as produced by tf.GradientTape.gradient for
            model.trainable_weights
    '''
    with tf.GradientTape() as recorder:
        loss_value = loss_fn(model, input_image, gt_image)
    gradients = recorder.gradient(loss_value, model.trainable_weights)
    return gradients


In [11]:
# Build the train and test tf.data pipelines with load_datasets() defined above.
train_ds, test_ds = load_datasets()

In [12]:
# Sanity check: show the shapes of the first ten (i, v) pairs yielded by train_ds,
# one value per line followed by a separator.
for i, v in train_ds.take(10):
    print(i.shape, v.shape, "=======", sep="\n")

(768, 1024, 3)
(96, 128)
(688, 1024, 3)
(86, 128)
(768, 1024, 3)
(96, 128)
(768, 1024, 3)
(96, 128)
(681, 1024, 3)
(85, 128)
(800, 600, 3)
(100, 75)
(768, 1024, 3)
(96, 128)
(620, 620, 3)
(77, 77)
(768, 1024, 3)
(96, 128)
(768, 1024, 3)
(96, 128)


In [18]:
# Stand-alone dilated 3x3 convolution layer, used below to probe output shapes.
mod = keras.layers.Conv2D(
    filters=512,
    kernel_size=3,
    dilation_rate=2,
    padding='SAME',
    activation=tf.nn.relu,
)

In [19]:
# Push one training image through `mod` and print input, conv-output and target shapes.
# NOTE(review): the output recorded below is (1, 764, 1020, 512) for a (768, 1024, 3)
# input, i.e. 4 px smaller on each spatial axis although padding='SAME' was requested -
# worth confirming against the installed TensorFlow version.
for i, v in train_ds.take(1):
    print(i.shape)
    conv_out = mod(np.expand_dims(i,0))
    print(conv_out.shape)
    print(v.shape)
    print("=======")

(768, 1024, 3)
(1, 764, 1020, 512)
(96, 128)
