In [None]:
from fcn import FCN
from utils import get_session
from config import *

import tensorflow as tf
import cv2
import numpy as np

import json
import os
import os.path as osp
from glob import glob
import urllib

In [None]:
# Load images into memory.
imgps = glob("./original_images/*.jpg")
print(len(imgps))

images = []
filenames = []
for imgp in imgps[:]:
    print(imgp)
    img = cv2.imread(imgp)
    # reverse gamma correction for sRGB
    img = (img / 255.0) ** 2.2 * 65536
    images.append(img)
    
    filenames.append(osp.basename(imgp))

print(len(images))

In [None]:
# Test inference on multiple images.
tf.reset_default_graph()  # allows multiple executions of this block

ckpt = "pretrained/colorchecker_fold1and2.ckpt"
with get_session() as sess:
    fcn = FCN(sess=sess, name=ckpt)
    fcn.load_absolute(ckpt)
    fcn.test_external(images=images, fns=filenames, show=False, write_compare=True)

## Load images from URLs

In [3]:
manifest = json.load(open("match_dataset_combined.json"))
len(manifest)

2311

In [None]:
manifest[0]

In [None]:
target = "match_images_internal"
list_name = "url.list"

!mkdir {target}

In [None]:
urls = [url for d in manifest for url in d[target]]
print(len(urls))
urls = list(set(urls))
print(len(urls))

with open(osp.join(target, list_name), "w") as f:
    for url in urls:
        f.write(url+"\n")

urls[0]

In [86]:
# Will take 10 minutes.
cmd = 'cat ./{}/{} | parallel --gnu -j 20 "wget {} --directory-prefix ./{}/ -T 5 -t 1"'.format(target, list_name, "{}", target)
print(cmd)

cat ./match_images_internal/url.list | parallel --gnu "wget {} --directory-prefix ./match_images_internal/"


In [None]:
# Load images from urls.
for url in urls[100:102]:
    print(url)
    url_response = urllib.urlopen(url)
    img = cv2.imdecode(np.array(bytearray(url_response.read()), dtype=np.uint8), -1)
    # reverse gamma correction for sRGB
    img = (img / 255.0) ** 2.2 * 65536
    images.append(img)
    
    filenames.append(osp.basename(url))

## Inner loop

In [None]:
def test_external(self, imgpaths, scale=1.0, show=True, save_prefix="cc_outputs", write=False, write_compare=False):
    illums = []
    confidence_maps = []
    errors = []
    for i, imgp in enumerate(imgpaths):
        filename = osp.basename(imgp)+".jpg"  # force save as jpg
        print ""
        print i, filename
        
        try:
            img = cv2.imread(imgp)
        except Exception as e:
            print("ERROR can't read image {}\n{}".format(filename, e))
            errors.append(imgp)
            continue
        
        if img is None:
            print("ERROR read None for {}".format(filename))
            errors.append(imgp)
            continue
        
        # reverse gamma correction for sRGB
        img_linear = (img / 255.0) ** 2.2 * 65536
        
        if scale != 1.0:
            img_linear = cv2.resize(img_linear, (0, 0), fx=scale, fy=scale)
        
        shape = img_linear.shape[:2]    
        if shape not in self.test_nets:
            #print("shape {}".format(shape))
            
            aspect_ratio = 1.0 * shape[1] / shape[0]
            if aspect_ratio < 1:
                target_shape = (MERGED_IMAGE_SIZE, MERGED_IMAGE_SIZE * aspect_ratio)
            else:
                target_shape = (MERGED_IMAGE_SIZE / aspect_ratio, MERGED_IMAGE_SIZE)
            
            target_shape = tuple(map(int, target_shape))

            test_net = {}
            test_net['illums'] = tf.placeholder(
                tf.float32, shape=(None, 3), name='test_illums')
            test_net['images'] = tf.placeholder(
                tf.float32, shape=(None, shape[0], shape[1], 3), name='test_images')
            with tf.variable_scope("FCN", reuse=True):
                try:
                    test_net['pixels'] = FCN.build_branches(test_net['images'], 1.0)
                except ValueError as e:
                    print("ERROR {}".format(e))
                    errors.append(imgp)
                    continue

                test_net['est'] = tf.reduce_sum(test_net['pixels'], axis=(1, 2))
            test_net['merged'] = get_visualization(
                test_net['images'], test_net['pixels'], test_net['est'],
                test_net['illums'], target_shape)
            self.test_nets[shape] = test_net
            
        test_net = self.test_nets[shape]

        pixels, est, merged = self.sess.run(
            [test_net['pixels'], test_net['est'], test_net['merged']],
            feed_dict={
                test_net['images']: img_linear[None, :, :, :],
                test_net['illums']: [[1, 1, 1]]
            }
        )
        est = est[0]
        est /= np.linalg.norm(est)

        pixels = pixels[0]
        confidences = np.linalg.norm(pixels, axis=2)
        confidence_maps.append(confidences)
        ind = int(confidences.flatten().shape[0] * 0.95)
        print("Confidence: {:0.1f} mean, {:0.1f} max, {:0.1f} 95%".format(
            confidences.mean(),
            confidences.max(),
            sorted(confidences.flatten())[ind]))
        merged = merged[0]
        illums.append(est)

        if show:
            cv2.imshow('Ret', merged[:, :, ::-1])
            k = cv2.waitKey(0) % (2**20)

        try:
            os.makedirs(save_prefix)
        except:
            pass

        corrected = np.power(img_linear[:,:,::-1] / 65535 / est[None, None, :] * np.mean(est), 1/2.2)[:,:,::-1]
        if show:
            cv2.imshow("corrected", corrected)
            cv2.waitKey(0)

        corrected = corrected * 255.0
        output_img = corrected
        if write_compare:
            orig_img = img #np.power(img / 65535, 1/2.2) * 255
            output_img = np.concatenate((orig_img, corrected), axis=1)

        cv2.imwrite(osp.join(save_prefix, 'corrected_%s' % filename), output_img, )

    return illums, confidence_maps

In [16]:
start_ind = 0
print(start_ind)
imgpaths = glob("./match_images/*")[start_ind:]
len(imgpaths)

0


6414

In [None]:
# Test inference on multiple images.
from summary_utils import get_visualization
tf.reset_default_graph()  # allows multiple executions of this block

ckpt = "pretrained/colorchecker_fold1and2.ckpt"
with get_session() as sess:
    fcn = FCN(sess=sess, name=ckpt)
    fcn.load_absolute(ckpt)
    illums, confidence_maps = test_external(fcn, imgpaths=imgpaths, save_prefix="match_colorcorrected", show=False, write_compare=False)
    

In [15]:
# Example of image which will not load.
img = cv2.imread(imgpaths[1])
img is None

True

In [14]:
# Example of an image which does not error on loading, but also does not load.
img = cv2.imread(imgpaths[33])
print type(img), img.shape

<type 'numpy.ndarray'> (1, 1, 3)
