In [None]:
!sudo apt-get install libmagickwand-dev
!pip install --no-cache-dir \
    opencv-python-headless==4.6.*\
    rawpy==0.17.* \
    pandas \
    Pillow==7.1.2 \
    scikit-image==0.16.2 \
    scipy \
    tqdm \
    Wand

In [None]:
!git clone https://github.com/andreacos/BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation

In [3]:
import sys

sys.path.insert(1, "BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation")

In [4]:
import io
import os
from glob import glob

import cv2
import numpy as np
import pandas as pd
import tensorflow as tf
from networks import (custom_mse_wrapper, custom_softmax_activation,
                      custom_two_terms_loss_wrapper)
from PIL import Image
from skimage.util import view_as_windows
from utils import label2coefficient, max_min_coefficient, string2Q

In [5]:
model_file = 'BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation/models/model_QF1_60-98_QF2_90-2-term-loss.h5'
image_file =  'BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation/resources/00000000_redaf7d93t.TIF_85_90.png'
_map = np.load('BoostingCNN-Jpeg-Primary-Quantization-Matrix-Estimation/resources/qf1_qf2_map_90.npy', allow_pickle=True)

In [6]:
def preprocess(image: np.array, dimension: tuple) -> np.array:
    """Function for preprocessing the input image."""
    if isinstance(image, str):
        image = cv2.imread(image)
        
    image = cv2.cvtColor(image, cv2.COLOR_BGR2YCrCb)[:,:,0]

    if image.shape != dimension:
        image = cv2.resize(image, dimension)

    return image.astype(np.float32) / 255.


def mse(x: np.array, y: np.array) -> float:
    """Function to calculate the mse."""
    return np.mean((x - y) ** 2, axis=1)


def make_predict(image: np.array, arr: np.array, maximum_coefficients: np.array) -> np.array:
    """Function to make predictions."""
    predicted_label = label2coefficient(model.predict(np.expand_dims(image, [0, -1]), verbose=0).flatten(), maximum_coefficients)
    return labels[np.argmin(mse(predicted_label, arr))]


def image_compression(image: np.array, *qfs: tuple) -> np.array:
    """Function to make image compress."""
    for qf in qfs:
        image = np.array(Image.open(io.BytesIO(cv2.imencode('.jpg', image, [int(cv2.IMWRITE_JPEG_QUALITY), qf])[1])))
    return image

In [7]:
maximum_coefficients, _ = max_min_coefficient(
    quality_range=(50, 100),
    n_coeffs=15
    )

In [8]:
model = tf.keras.models.load_model(
    model_file,
    custom_objects=({
        'custom_softmax': custom_softmax_activation(maximum_coefficients),
        'custom_two_terms_loss_wrapper': custom_two_terms_loss_wrapper(maximum_coefficients, 0.8),
        'custom_mse': custom_mse_wrapper(maximum_coefficients)
        })
    )



In [9]:
### TESTING
label2coefficient(
    model.predict(np.expand_dims(preprocess(image_file, (64, 64)), [0, -1])).flatten(), 
    max_coefficients=maximum_coefficients
    )



array([ 7,  5,  4,  6,  4,  3,  7,  6,  4,  6,  8,  7,  7,  8, 11])

In [10]:
labels = _map[:, :2]
arr = np.array([string2Q(i[2], size=(8, 8), flatten=True)[:15] for i in _map])

In [11]:
examples = [
    (75, ),
    (85, 90),
    (70, 90),
    (100, 90),
    (88, 90)
]

### A0: Программно сгенерировать 5 принципиально различных ситуаций: однократное сжатие, 𝑄𝐹1 < 𝑄𝐹2, 𝑄𝐹1 ≪ 𝑄𝐹2, 𝑄𝐹1 > 𝑄𝐹2, 𝑄𝐹1 ≈ 𝑄𝐹2. 

In [12]:
def a0_task(examples: list) -> pd.DataFrame:
    """Function for A0 task."""
    a0_data = []

    for example in examples:
        for path in glob('images/*.TIF'):
            image = image_compression(cv2.imread(path), *example)[:64, :64]
            prediction = make_predict(preprocess(image, (64, 64)), arr, maximum_coefficients)
            a0_data.append([prediction[0], example[0], np.abs(prediction[0] - example[0])])

    task_a0 = pd.DataFrame(a0_data, columns=['Prediction','QF1', 'Error'])
    return task_a0.groupby('QF1').mean().reset_index()

In [13]:
a0_task(examples)

Unnamed: 0,QF1,Prediction,Error
0,70,69.454545,2.727273
1,75,72.545455,2.818182
2,85,83.545455,1.454545
3,88,89.454545,4.909091
4,100,96.636364,3.363636


### A1: Сравнить ошибку при оценивании 𝑄𝐹1 по одному патчу и при оценивании по 𝑝 = 10 патчам, выбранным из одного изображения без пересечения.

In [14]:
def a1_task(examples: list) -> pd.DataFrame:
    """Function for A1 task."""
    a1_data = []

    for example in examples:
        for path in glob('images/*.TIF'):
            chunks = view_as_windows(image_compression(cv2.imread(path), *example), (64, 64, 3), 64).reshape(-1, 64, 64, 3)
            temp_data = []
            
            for chunk in chunks[:10]:
                predictions = make_predict(preprocess(chunk, (64, 64)), arr, maximum_coefficients)
                temp_data.append([predictions[0], np.abs(predictions[0] - example[0])])
        
            mean_result = np.mean(temp_data, axis=0)
            a1_data.append([mean_result[0], example[0], mean_result[1]])

    task_a1 = pd.DataFrame(a1_data, columns=['Prediction','QF1', 'Error'])
    return task_a1.groupby('QF1').mean().reset_index()

In [15]:
a1_task(examples)

Unnamed: 0,QF1,Prediction,Error
0,70,69.727273,1.763636
1,75,73.354545,1.754545
2,85,83.7,1.5
3,88,90.236364,4.2
4,100,96.127273,3.872727
