<a href="https://colab.research.google.com/github/pjbenard/MPEG/blob/main/JPEG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Format de compression MPEG

Notre projet consiste en l'implémentation du format compression MPEG. 
Le projet est divisé en deux parties : la compression JPEG et le "flot optique".

## I. Compression JPEG

La compression JPEG (Joint Photographic Experts Group) est un processus qui permet de réduire la taille d'une image 

Le processus de compression comporte six étapes principales :

1. Transformation de couleurs
2. Sous échantillonnage
3. Découpage en blocs de pixels
4. DCT
5. Quantification
6. Codage RLE et Huffman 


### Importation des librairies

In [None]:
import numpy as np
import scipy as sp
import scipy.fftpack as fft

#from bokeh.plotting import figure, show
#from bokeh.io import output_notebook
#import holoviews as hv
#hv.config.enable_colab_support = True
#hv.extension('bokeh')

import matplotlib.pyplot as plt

from PIL import Image
import requests
from io import BytesIO

Chargement de l'image. On a le choix entre l'image 'RGB_illumination.jpg' ou 'montagne.jpg'.

In [None]:

#img_url = "https://upload.wikimedia.org/wikipedia/commons/2/28/RGB_illumination.jpg"
#response = requests.get(img_url)
#img = np.array(Image.open(BytesIO(response.content))).astype(int)

img = np.array(Image.open('images_notebook/montagne.jpg'))

In [None]:
img.shape

Affichage de l'image et des channels

In [None]:
def plot_img_channels(img, cmaps=['Reds', 'Greens', 'Blues']):
    fig, axs = plt.subplots(1, 4, figsize=(16, 3), sharey=True, sharex=True)
    axs[0].imshow(img.astype(int))

    for col, cmap in enumerate(cmaps):
        axs[col + 1].imshow(img[...,col], cmap=cmap)

    plt.show()

plot_img_channels(img)

### 1. Transformation des couleurs

D'abort on va transformer les couleurs de l'image.
Les codages de couleur type luminance/chrominance donnent les meilleurs taux de compression car oeil humain assez sensible à la luminosité (luminance) mais peu à la teinte (chrominance) d'une image. (On fera donc un sous échantillonnage de couleurs sur ces couleurs là plutôt que sur RGB)

![Perception](images_notebook/perception.png)

La figure montre que la sensibilité de l'oeil humain est bien différente pour les couleurs rouge, vert et bleue constitutives de nos images. Ainsi le vert est-il le mieux perçu, puis vient le rouge, et enfin le bleu de maniere minoritaire. C'est donc "moins grave" de perdre l'information avec les couleurs type luminance/chrominacne. On passera donc d'une image codée en RGB à une image codée en fonction de sa luminance (Y), et de sa chrominance (Cb, Cr) (format YUV) 



Changer de couleurs RGB à YUV consiste à faire un changement de base orthogonale (Rappel : la base de RGB est orthogonale)

En principe, on a en quelque sorte :

  $$  Y ≃ R + G + B \\
    U ≃ B – Y \\
    V ≃ R – Y$$
    
La matrice de changement de base est plus particulièrement définie ainsi : (Wikipédia + autres sources)

![changement](changement_base.png)

Ici, on utilise simplement une fonction du module Image de la librairie PIL.

Implémentation de RGB_to_YCbCr

In [None]:
def RGB_to_YCbCr(img_rgb):
    # 1.3 s ± 10.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    conv = np.array([[ 65.481, 128.553,  24.966], 
                     [-37.797, -74.203, 112.   ], 
                     [112.   , -93.786, -18.214]])
    
    img_ycbcr = np.dot(img_rgb.astype(float)/255, conv.T)
    img_ycbcr[:,:,0] += 16
    img_ycbcr[:,:,[1,2]] += 128
    return img_ycbcr.astype(int)

def RGB_to_YCbCr_v2(img_rgb):
    # 123 ms ± 1.31 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
    return np.array(Image.fromarray(img_rgb).convert('YCbCr'))
    
def YCbCr_to_RGB(img_ycbcr):
    # 1.34 s ± 15.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    conv = np.array([[1,  0      , 1.402  ], 
                     [1, -0.34414, -.71414], 
                     [1,  1.772  , 0      ]])

    img_rgb = img_ycbcr.astype(float)
    img_rgb[:,:,[1,2]] -= 128
    img_rgb = np.dot(img_rgb, conv.T)
    
    return np.clip(img_rgb, 0, 255).astype(int)

def YCbCr_to_RGB_v2(img_ycbcr):
    # 93.3 ms ± 618 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
    pre_img = np.clip(img_ycbcr, 0, 255).astype(np.uint8)
    return np.array(Image.fromarray(pre_img, mode='YCbCr').convert('RGB'))

In [None]:
img_ycbcr = RGB_to_YCbCr(img)
print(img_ycbcr.shape)
plot_img_channels(img_ycbcr, ['gray'] * 3)

In [None]:
plot_img_channels(YCbCr_to_RGB(img_ycbcr))

Comparaison avec la fonction du module Image de PIL

In [None]:
img_yuv = RGB_to_YCbCr_v2(img)

plot_img_channels(np.array(img_yuv), ['gray'] * 3)

In [None]:
img_rgb = YCbCr_to_RGB_v2(img_ycbcr)

plot_img_channels(np.array(img_rgb))

### 2. Sous échantillonnage des couleurs

La deuxième étape de la compression est le sous échantillonnage des couleurs (Cb et Cr). On sépare les channels. On ne touche pas au channel Y mais on va rétrécir les image de U et V. Il y a plusieurs réglages possibles que l'on décrit avec la « notation J:a:b », définie ainsi, par bloc de 8x8 :
- J est le nombre de pixels de Y conservés pour 4 pixels affichés, sur chaque ligne ;
- a est le nombre de pixels de U conservés pour 4 pixels affichés, sur les lignes paires ;
- b est le nombre de pixels de V conservés pour 4 pixels affichés, sur les lignes impaires.


![subsampling](images_notebook/subsampling.png)

Ainsi le sous échantillonnage de couleur le plus utilisé est le 4:2:0 (c'est à dire qu'on découpe l'image en bloc de 8x8). (Mais ce n'est pas le plus important, on peut prendre du 4:4:4)

In [None]:
def subsampling_YCbCr(img_ycbcr):
    img_sub = img_ycbcr.copy()
    # Verticalement : tous les deuxièmes coefficients sont égaux au coefficient qu'il y a au dessus
    img_sub[1::2,:,1] = img_sub[::2, :, 1] #channel Cb
    img_sub[1::2,:,2] = img_sub[::2, :, 2] #channel Cr
    #Horizontalement : tous les deuxièmes coeff sont égaux au coeff à leur gauche
    img_sub[:, 1::2,1] = img_sub[:, ::2,1] 
    img_sub[:, 1::2,2] = img_sub[:, ::2,2] 
    return img_sub
    

In [None]:
img_sub = subsampling_YCbCr(img_ycbcr)
plot_img_channels(np.array(img_sub))
plot_img_channels(np.array(img_ycbcr))



In [None]:
np.unique(img_sub[:,:,1] - img_ycbcr[:,:,1],return_counts= True)

In [None]:
img_ycbcr = np.copy(img_sub)

### 3. Découpage de l'image en blocks


En JPEG, on ne travaille pas sur une image entière : on travaille sur des blocs de 8x8 pixels (séparément en ce qui concerne l’intensité, le bleu et le rouge, donc).
Si la taille d’une image n’est pas exactement un multiple de 8 dans un axe donné, et que
la compression est forte, de légers défauts de compression pourraient apparaître. C’est un des soucis de JPEG.

Chaque bloc de 8x8 est en suite envoyé pour être transformé par DCT.



Translation des coefficients de [0;255] à [-128;127]


In [None]:
def shift_array(arr, shift=-128):
    # 67.9 ms ± 834 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
    return arr + shift

In [None]:
img_shifted = shift_array(img_ycbcr)

Implémentation de transform_into_blocks

In [None]:
def transform_into_blocks(img, block_size=8):
    # 696 ms ± 14.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    """
    Return a array of size (img.shape[0] // block_size, img.shape[1] // block_size, 3, block_size, block_size) or
                         (3, img.shape[0] // block_size, img.shape[1] // block_size, block_size, block_size) (TBD)
    First shape reads block from top to bottom, from left to right.
    """
    nb_blocks_height = img.shape[0] // block_size
    nb_blocks_width  = img.shape[1] // block_size

    blocks = np.empty((nb_blocks_height, nb_blocks_width, 3, block_size, block_size), dtype=img.dtype)

    for y in range(nb_blocks_height):
        for x in range(nb_blocks_width):
            for color in range(3):
                blocks[y, x, color] = img[y * block_size:(y + 1) * block_size, 
                                          x * block_size:(x + 1) * block_size, 
                                          color]

    return blocks


def transform_into_blocks_v2(img, block_size=8):
    # 6.45 µs ± 113 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    shape = img.shape
    pre_blocks = np.reshape(img, (shape[0] // block_size, block_size, 
                                  shape[1] // block_size, block_size, 
                                  3))
    
    blocks = np.moveaxis(pre_blocks, [0, 1, 2, 3, 4], [0, 3, 1, 4, 2])
    
    return blocks

In [None]:
blocks = transform_into_blocks_v2(img_shifted)

In [None]:
blocks.shape

In [None]:
b1 = shift_array(blocks[0, 0, 1])
b1

Implémentation de transform_into_image (opération inverse)

In [None]:
def transform_into_image(blocks, block_size=8):
    # 768 ms ± 39.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    """
    Return a array of size (img.shape[0] // block_size, img.shape[1] // block_size, 3, block_size, block_size) or
                         (3, img.shape[0] // block_size, img.shape[1] // block_size, block_size, block_size) (TBD)
    First shape reads block from top to bottom, from left to right.
    """
    img_height = blocks.shape[0] * block_size
    img_width  = blocks.shape[1] * block_size

    img = np.empty((img_height, img_width, 3), dtype=blocks.dtype)

    for i in range(blocks.shape[0]):
        for j in range(blocks.shape[1]):
            for color in range(3):
                img[
                    i * block_size : (i + 1) * block_size, 
                    j * block_size : (j + 1) * block_size, 
                    color,
                ] = blocks[i, j, color]

    return img

def transform_into_image_v2(blocks, block_size=8):
    # 6.53 µs ± 122 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
    pre_img = np.moveaxis(blocks, [0, 3, 1, 4, 2], [0, 1, 2, 3, 4])
    
    shape = pre_img.shape
    img = np.reshape(pre_img, (shape[0] * shape[1], shape[2] * shape[3], 3))
    
    return img

In [None]:
img_deblocked = transform_into_image_v2(blocks)

In [None]:
img_deblocked.shape

In [None]:
plot_img_channels(shift_array(img_deblocked, 128), ['gray'] * 3)

### 4. Transformée en cosinus discret 

On fait une transformée DCT soit Discrete Cosine Transform. On applique cette transfo numérique à chaque bloc (variante de la transfo de fourier). Cette transfo décompose un bloc (considéré comme une fc num à deux variables) en une somme de fc cosinus oscillant à des freq différentes. Chaque bloc est ainsi décrit en une carte de freq et en amplitude plutôt qu'en pixels et coeff de couleur. (formule de la DCT dispo sur wiki) 

Le calcul d'une DCT est l'étape qui coûte le plus de temps et de ressources dans la compression JPEG. Mais elle peremt de séparer les basses et hautes freq de l'image. 

In [None]:
dct1 = fft.dctn(b1)
dct1.shape, dct1.astype(int)

In [None]:
dct1 = fft.dctn(b1, norm='ortho')
dct1.shape, (dct1).astype(int)

Implémentation DCT

In [None]:
def apply_dct(blocks):
    # 650 ms ± 18.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    return fft.dctn(blocks, axes=[-2, -1], norm ='ortho')

In [None]:
blocks_dct = apply_dct(blocks)

In [None]:
blocks_dct.shape

Implémentation DCT inverse

In [None]:
def apply_idct(blocks):
    # 631 ms ± 9.37 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    return np.round(fft.idctn(blocks, axes=[-2,-1], norm='ortho')).astype(np.float64)

In [None]:
blocks_idct = apply_idct(blocks_dct)

In [None]:
print(blocks_idct.shape)
print(blocks_idct[0, 0, 1])

In [None]:
#blocks = np.random.randn(16, 16)
np.allclose(blocks, apply_idct(apply_dct(blocks)))

### 5. Quantification

C'est à cette étape que l'on perd l'information

In [None]:
quantization_matrix = np.array([[16, 11, 10, 16,  24,  40,  51,  61],
                                [12, 12, 14, 19,  26,  58,  60,  55],
                                [14, 13, 16, 24,  40,  57,  69,  56],
                                [14, 17, 22, 29,  51,  87,  80,  62], 
                                [18, 22, 37, 56,  68, 109, 103,  77], 
                                [24, 35, 55, 64,  81, 104, 113,  92], 
                                [49, 64, 78, 87, 103, 121, 120, 101], 
                                [72, 92, 95, 98, 112, 100, 103,  99]], dtype=int)

In [None]:
def get_quant_matrix(Q=50):
    if Q == 50:
        return quantization_matrix
    
    elif Q > 50:
        return np.round(50 * quantization_matrix / Q)
    
    else:
        return np.round((100 - Q) * quantization_matrix / 50)

In [None]:
def quantize(arr, quant_mat=get_quant_matrix(Q=50)):
    # 311 ms ± 13.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    return np.round(np.divide(arr, quant_mat)).astype(int)

def dequantize(arr, quant_mat=get_quant_matrix(Q=50)):
    # 141 ms ± 5.59 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
    return np.multiply(arr, quant_mat)

In [None]:
blocks_quant = quantize(blocks_dct)

In [None]:
blocks_quant[0, 0, 1]

In [None]:
blocks_dequant = dequantize(blocks_quant)

In [None]:
img_deblocked = transform_into_image(blocks_idct)
img_deblocked.shape

In [None]:
plot_img_channels(YCbCr_to_RGB(shift_array(img_deblocked, 128)))

### 6. Codage RLE et Huffman 


On va calculer les entropies des DCT.
L'entropie permet de mesurer le nombre de bit nécessaire pour coder l'information de la DCT.
D'abord on doit aplatir les blocks 8x8 de l'image en vecteurs ligne, en prenant les coefficient en zigzag selon l'image suivante :

![zigzag](images_notebook/zigzag.png)

On va calculer l'entropie pour la DCT de chaque block aplatis selon le zigzag. 
- Il nous faut une fonction bloc_to_zigzag renvoyant une ligne de 64 ;
- Les lignes zigzag s'arrêtent avec un signal "end of block" à définir, qui détermine à partir de quel coefficient le vecteur n'est rempli plus que de zéros
- Une fonction qui prend toutes les lignes zigzag correspondant à tous les blocs de l'image, qui sélectionne le premier coeff de chaque ligne pour les mettre dans un vecteur "premiers coeffs" et qui sélectionne tous les autres vecteurs pour les mettre dans un vecteurs "autres coeffs".

L'entropie (de Shannon) est définie par :

$$E(\pi) = - \sum_i \pi(\alpha_i) log_2(\pi(\alpha_i)) $$

Où $\pi$ est la loi de probabilité d'apparition de chaque coefficient.


In [None]:
def RLE(block):
#     block_rot = np.rot90(block, axes=(-2, -1))
    block_rot = np.rot90(block)
    flat_array = []
    
    n = block.shape[0]
    for i in range(-n + 1, n):
        flat_array += list(np.diagonal(block_rot, offset=i))[::int((-1)**i)]
    
    arr = np.array(flat_array, dtype=int)    
    return arr

In [None]:
rle = RLE(blocks_quant[0, 0, 1])

In [None]:
def put_EOB(ligne): #coupe le vecteur là où les 0 commencent et ajoute le EOB
    size = ligne.size
    count = 0
    i = size-1
    while (ligne[i]==0) and i > 0:
            count +=1
            i -= 1
    new_ligne = ligne[:i+1].tolist()
    new_ligne += ['EOB']
    return new_ligne

In [None]:
#test
ligne_1 = np.array([1,2,3,0,0,4,1,2,0,0,0,0,0])
ligne_2 = np.array([2,0,0,1,4,0,0,0,0,0,0,0,0])
new_ligne_1 = put_EOB(ligne_1)
new_ligne_2 = put_EOB(ligne_2)
print(new_ligne_1)
print(new_ligne_2)
put_EOB(rle)

In [None]:
put_EOB(np.zeros(64, dtype=int))

In [None]:
all_ligne = [new_ligne_1,new_ligne_2]
all_ligne[1][1:]

In [None]:
def alphabet(all_ligne): #prend la liste des vecteurs lignes correspondant à tous les blocks de l'image
    premiers_coeffs = []
    autres_coeff = []
    for i in range(len(all_ligne)):
        premiers_coeffs += [all_ligne[i][0]]
        autres_coeff += all_ligne[i][1:]
    return premiers_coeffs, autres_coeff

In [None]:
#test
#print(all_ligne)
#print(np.shape(all_ligne))
premiers_coeffs, autres_coeffs = alphabet(all_ligne)
print(premiers_coeffs)
print(autres_coeffs)

In [None]:
#calcule la proba d'apparition des coeffs
#changer pour mettre des lignes à la place de img
def distribution_coeff(img):  
#     nb_pixel = img.size
    nb_pixel = len(img)
    coeffs,distribution = np.unique(img,return_counts= True)
    distribution = distribution/nb_pixel
    return coeffs, distribution, nb_pixel

#calcule l'entropie
def entropie(img):
    E = 0
    coeffs, distribution, nb_pixel = distribution_coeff(img)
    #print(distribution)
#     print(np.sum(distribution))
    E = -np.sum(distribution*np.log2(distribution))
    return E

In [None]:
def entropy_dct(blocks):
    shape = blocks.shape
    rles = np.empty(shape[:-2] + (np.prod(shape[-2:]),))
    after_EOB = []
    for i in range(shape[0]):
        for j in range(shape[1]):
            for c in range(shape[2]):
                after_EOB.append(put_EOB(RLE(blocks[i, j, c])))
    
    first_coeffs, other_coeffs = alphabet(after_EOB)
    entropy_1 = entropie(first_coeffs)
    entropy_2 = entropie(other_coeffs)
    
    return entropy_1, entropy_2, len(first_coeffs), len(other_coeffs)

In [None]:
entropy_dct(blocks_quant)

### 7. From image to jpeg

In [None]:
def to_JPEG(img, Q=50):
    # 5.91 s ± 21.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    """Compress and return a JPEG image"""
    T = get_quant_matrix(Q=Q)
    
    img = RGB_to_YCbCr(img)
    img = shift_array(img, shift=-128)
    
    blocks = transform_into_blocks(img)
    blocks = apply_dct(blocks)
    blocks = quantize(blocks, T)
    
    blocks = dequantize(blocks, T)
    blocks = apply_idct(blocks)
    
    img = transform_into_image(blocks)
    img = shift_array(img, shift=128)
    img = YCbCr_to_RGB(img)
    
    return img

def to_JPEG_v2(img, Q=50):
    # 2.21 s ± 11.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    """Compress and return a JPEG image"""
    T = get_quant_matrix(Q=Q)
    
    img = RGB_to_YCbCr_v2(img)
    img = shift_array(img, shift=-128)

    blocks = transform_into_blocks_v2(img)
    blocks = apply_dct(blocks)
    blocks = quantize(blocks, T)
    
    blocks = dequantize(blocks, T)
    blocks = apply_idct(blocks)
    
    img = transform_into_image_v2(blocks)
    img = shift_array(img, shift=128)
    img = YCbCr_to_RGB_v2(img)
    
    return img

In [None]:
def MSE(img_ref, img):
    size = img_ref.size
    return np.sum((img_ref - img)**2) / size

def PSNR(img_ref, img):
    return 20 * np.log10(255) - 10 * np.log10(MSE(img_ref, img))

In [None]:
img_RAW = np.copy(img)
plt.imshow(img_RAW)

In [None]:
img_JPEG = to_JPEG_v2(img_RAW)

In [None]:
plt.imshow(img_JPEG)

In [None]:
PSNR(img_RAW, img_JPEG)

In [None]:
for Q in range(0, 101, 10):
    img_JPEG = to_JPEG_v2(img_RAW, Q)
    psnr = PSNR(img_RAW, img_JPEG)
    print(f"Facteur de qualite {Q:>3d}, PSNR = {psnr:.2f}")