In [1]:
""" This file has operations for image processing """


import numpy as np
import gzip
from enums import Metrics
from io import BytesIO
from PIL import Image
from skimage import img_as_ubyte, img_as_float
from skimage.measure import compare_psnr, compare_ssim
from skimage.util import view_as_blocks
from bitstring import Bits
from pathlib import PosixPath
from msssim import compare_msssim
import pandas as pd
import os
from pathlib import Path
import warnings

from enums import ImgData


class ImgProc:
    """ Static class that has methods to make basic operations on images """
    @staticmethod
    def pad_img(img, patch_size):
        """ Method that receives an image and a size for the patches. The method
            pad the image so that they can be cropped later
        """
        orig_shape = np.array(img.shape[:2])
        new_shape = patch_size * np.ceil(orig_shape / patch_size).astype(int)
        points_to_pad = new_shape - orig_shape
        pad_img = np.pad(img, [(0, points_to_pad[0]), (0, points_to_pad[1]),
                         (0, 0)], 'edge')
        return pad_img

    @staticmethod
    def is_pillow_valid_img(pathname, return_ref=True):
        """ Function that verifies if the file is a valid image considering
            the pillow library, that's used in this code. If desired, it
            returns the opened ref. The retuned reference must be closed later.
        """
        try:
            img = Image.open(pathname)
            is_valid = True
        except Exception:
            img = []
            is_valid = False

        ret = is_valid
        if return_ref:
            ret = list([ret])
            ret.append(img)
        else:
            img.close()

        return ret

    @staticmethod
    def get_img_real_ext(pathname):
        """ Function that identifies the format of the image and return it as
            a lower case string. If it's not a valid image, returns None.
        """
        valid, img = ImgProc.is_pillow_valid_img(pathname)
        if not valid:
            return None

        format = img.format.lower()
        img.close()
        return format

    @staticmethod
    def reconstruct_image(patches, width, height):
        """ Method that receives a 1D array of patches and the dimensions of
            the image and reconstruct the image based on the patches array
        """
        patch_size = patches[0].shape[0]
        colors = 1
        if len(patches[0].shape) > 2:
            colors = patches.shape[3]
        p_rows = np.ceil(height / patch_size).astype(int)
        p_columns = np.ceil(width / patch_size).astype(int)
        round_height = p_rows * patch_size
        round_width = p_columns * patch_size

        # First index changing fastest (reshape a grid of patches)
        img = patches.reshape(p_rows, p_columns, patch_size, patch_size,
                              colors)
        # Rows and columns near one another (for numpy to order them correctly)
        img = img.swapaxes(1, 2).reshape(round_height, round_width, colors)
        img = img[:height, :width]
        return img

    @staticmethod
    # TODO: use view_as_windows to implement extraction of overlapping patches
    def extract_img_patch(orig_img, patch_size):
        """ Method that receives an image and the patch size and extract
            the patches of the image.
        """
        if np.all(np.equal(orig_img.shape, patch_size)):
            return orig_img

        img = ImgProc.pad_img(orig_img, patch_size)
        color = 1
        if len(img.shape) > 2:
            color = img.shape[2]
        patches = view_as_blocks(img, (patch_size, patch_size, color))
        patches = patches.reshape(-1, patch_size, patch_size, color)
        return patches

    @staticmethod
    def conv_data_format(img, data_format):
        """ Method that receives a valid image array and a desired format to
            convert into.
        """
        if not isinstance(data_format, ImgData):
            raise ValueError("Format argument must be an " + ImgData.__name__)

        with warnings.catch_warnings():
            warnings.simplefilter('ignore')

            if data_format == ImgData.UBYTE:
                out_img = img_as_ubyte(img)
                return out_img

            if data_format == ImgData.FLOAT:
                out_img = img_as_float(img)
                out_img = out_img.astype(np.float32)
                return out_img

        raise ValueError('Range parameter is not recognized!')

    @staticmethod
    def save_img_from_ref(img_ref, new_path, kwargs={}):
        """ Utils function to rename a image. It's useful for conversion of
            images. It's also useful as a wrapper to avoid importing the PIL in
            other files
        """
        if isinstance(img_ref, (PosixPath, str)):
            img_ref = Image.open(img_ref)
        if isinstance(img_ref, np.ndarray):
            img_ref = Image.fromarray(img_ref)

        img_ref.save(new_path, **kwargs)

    @staticmethod
    def load_image(path, data_format=None, color_mode='RGB'):
        """ This method receives an image pathname and the target colorspace.
            If the path points to a non valid image, it returns empty data.
        """
        valid, img = ImgProc.is_pillow_valid_img(path)
        if not valid:
            return []

        if color_mode:
            img = img.convert(color_mode)
        img_data = np.array(img)
        img.close()

        if data_format:
            img_data = ImgProc.conv_data_format(img_data, data_format)

        return img_data

    @staticmethod
    def calc_n_patches(info, patch_size):
        """ Calculates the number of patches images with this height and
            widht considering the padding """
        if isinstance(info, (PosixPath, str)):
            img = Image.open(info)
            width, height = img.size
        elif isinstance(info, list):
            width, height = info
        else:
            width, height = info

        line_patches = np.ceil(height / patch_size).astype(int)
        column_patches = np.ceil(width / patch_size).astype(int)
        num_of_patches = line_patches * column_patches
        return num_of_patches

    @staticmethod
    def calc_bpp_using_gzip(list_arrays, pixels_num, bpp_proxy, pos):
        """ Function that calculates the bpp considering the gzip. It receives
            a data_array representing an image.
        """
        try:
            # If the number is not already an integer, it's not been quantized
            # So it's not fair do estimate bpp using a round version
            if np.all(np.equal(np.mod(list_arrays[0], 1), 0)):
                list_arrays = list(map(lambda e: np.array(e).astype(np.int),
                                       list_arrays))

            compressed = list(map(gzip.compress, list_arrays))
            # Bits has many representations. length get the len in bits
            bpp = list(map(lambda c: Bits(c).length / pixels_num, compressed))
            bpp_proxy[pos] = list(np.cumsum(bpp))
        except Exception as e:
            print(str(e))

    @staticmethod
    def calc_metric(true_ref, test_ref, metric):
        """ Wrapper from skimage psnr and ssim comparison. """
        if isinstance(true_ref, (Path, str)):
            true_ref = Image.open(true_ref)
        if isinstance(test_ref, (Path, str)):
            test_ref = Image.open(test_ref)

        true_ref = np.array(true_ref)
        test_ref = np.array(test_ref)

        if metric == Metrics.PSNR:
            result = compare_psnr(true_ref, test_ref)
        elif metric == Metrics.SSIM:
            result = compare_ssim(true_ref, test_ref, multichannel=True)
        elif metric == Metrics.MSSSIM:
            # TODO: find official implementation that really works (skvideo
            # did not work and apparently it's not being supported anymore)
            # TODO: verify this implementation is correct. Compare with the
            # other implementations
            true_ref = np.expand_dims(
                np.expand_dims(true_ref, 0), 0).swapaxes(0, -1)
            test_ref = np.expand_dims(
                np.expand_dims(test_ref, 0), 0).swapaxes(0, -1)
            aux_result = list(map(compare_msssim, true_ref, test_ref))
            result = np.mean(aux_result)

        # Put a huge number in this case
        result = np.iinfo(np.uint8).max if result == float('inf') else result

        return result

    @staticmethod
    def save_img(img_ref, path, mode='RGB'):
        """ Method that receives a numpy array and a path. It saves an image
            through PIL
        """
        try:
            if isinstance(img_ref, (str, PosixPath)):
                img_ref = Image.open(img_ref)
            elif isinstance(img_ref, np.ndarray):
                img_ref = Image.fromarray(img_ref)

            img_ref.save(path, mode=mode)
            img_ref.close()
        except Exception as e:
            print(str(e))

    @staticmethod
    def get_size(path):
        """ Wrapper that receives a path of an image and returns its size """
        with Image.open(path) as img_ref:
            width, height = img_ref.size
        return width, height

    @staticmethod
    def save_jpeg_analysis(img_path, out_pathname):
        """ This method analysis an image with jpeg wrt to bpp and visual
            metrics. It saves a csv file with this info.
        """
        # PIL doesn't handle numpy types
        quality = list(map(int, np.arange(69, 73)))
        params = list(map(lambda q: dict(quality=q), quality))

        valid, img_ref = ImgProc.is_pillow_valid_img(img_path)
        if not valid:
            return

        pixel_num = np.prod(img_ref.size)
        img_data = np.array(img_ref)
        bpp, psnr, ssim, msssim = [], [], [], []
        for param in params:
            with BytesIO() as buffer:
                img_ref.save(buffer, 'jpeg', **param)
                codec_data = np.array(Image.open(buffer))
                codec_buffer = buffer.getvalue()
            num_bits = Bits(codec_buffer).length
            bpp.append(num_bits / pixel_num)
            psnr.append(ImgProc.calc_metric(img_data, codec_data, Metrics.PSNR))
            ssim.append(ImgProc.calc_metric(img_data, codec_data, Metrics.SSIM))
            msssim.append(ImgProc.calc_metric(img_data, codec_data,
                                              Metrics.MSSSIM))
        data = np.stack((quality, psnr, ssim, msssim, bpp), axis=1)
        df = pd.DataFrame(data, columns=['quality', 'psnr', 'ssim',
                                         'msssim', 'bpp'])
        df.set_index('quality', inplace=True)
        df.to_csv(out_pathname)

    @staticmethod
    def save_jpeg2000_kakadu(orig_pathname, output_pathname, bpp):
        """ Save a image in kakadu jpeg2000 format """
        cmd = 'kdu_compress -i \'{}\' -o \'{}\' -rate {} >/dev/null 2>&1'\
            .format(orig_pathname, output_pathname, bpp)
        if os.system(cmd):
            raise RuntimeError('Error when executing kakadu binary!')



In [None]:
my_analysis=ImgProc()

original='/home/marcelo/Documentos/rnn-conv-autoencoder/imagens_teste/kodim'

for j in range (1,25):
    if j<10:  
        original_img= original+'0'+str(j)+'.bmp'
    else:
        original_img= original+str(j)+'.bmp'
    output='./jpeg100kakdu'+str(j)                   
    my_analysis.save_jpeg2000_kakadu(original_img, output, 2)

    

In [72]:
my_analysis=ImgProc()

#print(original)
reconstruida='/home/marcelo/Documentos/rnn-conv-autoencoder/Resultados/resultados2/kodim'
original='/home/marcelo/Documentos/rnn-conv-autoencoder/imagens_teste/kodim'
msssim=[]
ssim=[]
psnr=[]
for j in range (1,25):
    if j<10:  
        original_img= original+'0'+str(j)+'.bmp'
    else:
        original_img= original+str(j)+'.bmp'
                       
  
    for i in range(15,16):
               
        if i<10:  
            reconstruida_img= reconstruida+str(j)+'0'+str(i)+'.bmp'
        else:
            reconstruida_img= reconstruida+str(j)+str(i)+'.bmp'
    
     
        psnr.append(my_analysis.calc_metric(original_img,reconstruida_img,Metrics.PSNR))
        msssim.append(my_analysis.calc_metric(original_img,reconstruida_img,Metrics.MSSSIM))        
        ssim.append(my_analysis.calc_metric(original_img,reconstruida_img,Metrics.SSIM))
print('PSNR')   
for p in psnr:
    print(str(p).replace('.',','))
print(str(np.mean(psnr)).replace('.',','))
print(str(np.std(psnr)).replace('.',','))

print('SSIM')
for s in ssim:
    print(str(s).replace('.',','))
print(str(np.mean(ssim)).replace('.',','))
print(str(np.std(ssim)).replace('.',','))

print('MSSSIM')
for m in msssim: 
    print(str(m).replace('.',','))
print(str(np.mean(msssim)).replace('.',','))
print(str(np.std(msssim)).replace('.',','))
    

PSNR
31,859192542116507
35,38074298322865
37,46877862724646
35,85708796221175
31,692314114244393
32,886550910326726
37,01903088815439
29,924055904789416
37,15843018578245
36,95572071151756
33,82382064197072
37,53163809604375
28,317512400383137
32,6865292411929
35,11857255101528
36,11171828726212
36,238522752814916
31,5916513978147
34,50767055420816
35,88070558940806
33,353595854063784
34,2183694727478
38,06149464959112
31,049865135758523
34,36223214391222
2,6006973331927807
SSIM
0,9358506831524426
0,9257850949096431
0,9543991295723385
0,9409055466991226
0,9543567909162984
0,9416826844170006
0,9667761004379408
0,9383894601122401
0,9489006823785319
0,9543358943015751
0,9413576026553448
0,9477612039095584
0,9176941911380156
0,9426614137831627
0,9329013683672261
0,9502919569810823
0,9595871369868777
0,9352284317443086
0,944643031569497
0,9474764915903142
0,9443493163367281
0,9344725165001734
0,9576821431558254
0,9485974609067561
0,9444202638550835
0,010829547098602144
MSSSIM
0,989561724152

In [67]:
reconstruida='/home/marcelo/Documentos/rnn-conv-autoencoder/Resultados/resultados2/kodim'
original='/home/marcelo/Documentos/rnn-conv-autoencoder/imagens_teste/kodim'

msssim=np.zeros((24,16), dtype=np.float64)
ssim=np.zeros((24,16), dtype=np.float64)
psnr=np.zeros((24,16), dtype=np.float64)

for j in range (1,25):
    if j<10:  
        original_img= original+'0'+str(j)+'.bmp'
    else:
        original_img= original+str(j)+'.bmp'
                       
  
    for i in range(16):
               
        if i<10:  
            reconstruida_img= reconstruida+str(j)+'0'+str(i)+'.bmp'
        else:
            reconstruida_img= reconstruida+str(j)+str(i)+'.bmp'
    
        psnr[j-1][i]=my_analysis.calc_metric(original_img,reconstruida_img,Metrics.PSNR) 
        ssim[j-1][i]=my_analysis.calc_metric(original_img,reconstruida_img,Metrics.MSSSIM)
        msssim[j-1][i]=my_analysis.calc_metric(original_img,reconstruida_img,Metrics.SSIM)
        
print('PSNR:')
for y in range(0,16):
    ls=[]
    for x in range(0,24):
        ls.append(psnr[x][y])
    print(str(np.mean(ls)).replace('.',','))

        
print('SSIM:')
for y in range(16):
    ls=[]
    for x in range(0,24):
        ls.append(ssim[x][y])
    print(str(np.mean(ls)).replace('.',','))
    
print('MSSSIM:')
for y in range(16):
    ls=[]
    for x in range(0,24):
        ls.append(msssim[x][y])
    print(str(np.mean(ls)).replace('.',','))
        




PSNR:
22,718950662301882
25,205245475318474
26,66340365395708
27,725786239141843
28,768124246553366
29,582612656061702
30,30493317198945
30,97979652013983
31,52702330111049
32,0784827783857
32,61213969681853
33,0835569948517
33,468481085661104
33,77708884223001
34,116449216247695
34,36223214391222
SSIM:
0,8082727744126178
0,8968974749538517
0,9311679258399211
0,9489706827270016
0,9600724857588155
0,968173913408317
0,9729593559541104
0,9765084820321249
0,9798133586592286
0,9825017692770747
0,9839007371458681
0,9855111238672484
0,986817696991816
0,9882619266646696
0,9891880631453542
0,9900074058131118
MSSSIM:
0,6126334362453333
0,6970287114506295
0,7554866263038645
0,7986313502418652
0,8338850377753738
0,857785216947887
0,8765311199830172
0,8920905876233499
0,9031897156206806
0,9129983244729732
0,9215199795757724
0,9285015433891997
0,9335303721866942
0,9379298763444804
0,9414664136674403
0,9444202638550835
