📝 **Author:** Amirhossein Heydari - 📧 **Email:** <amirhosseinheydari78@gmail.com> - 📍 **Origin:** [mr-pylin/media-processing-workshop](https://github.com/mr-pylin/media-processing-workshop)

---


**Table of contents**<a id='toc0_'></a>    
- [Dependencies](#toc1_)    
- [Load Images](#toc2_)    
- [Entropy](#toc3_)    
- [Redundancy](#toc4_)    
  - [Coding Redundancy](#toc4_1_)    
    - [Huffman Coding](#toc4_1_1_)    
    - [Arithmetic Coding](#toc4_1_2_)    
    - [Lempel-Ziv-Welch (LZW) Coding](#toc4_1_3_)    
  - [Interpixel (Spatial) Redundancy](#toc4_2_)    
    - [Run-Length Encoding (RLE)](#toc4_2_1_)    
    - [Differential Encoding](#toc4_2_2_)    
    - [Predictive Coding](#toc4_2_3_)    
    - [Transform Coding](#toc4_2_4_)    
    - [Wavelet Transform](#toc4_2_5_)    
    - [Block-based Compression](#toc4_2_6_)    
  - [Temporal Redundancy](#toc4_3_)    
    - [Inter-frame Compression (Predictive Compression)](#toc4_3_1_)    
    - [Motion Compensation](#toc4_3_2_)    
    - [Differential Encoding](#toc4_3_3_)    
    - [Keyframe Extraction](#toc4_3_4_)    
    - [Temporal Filtering](#toc4_3_5_)    
    - [Block-based Motion Estimation](#toc4_3_6_)    
    - [Long-Term Prediction](#toc4_3_7_)    
  - [Psychovisual Redundancy](#toc4_4_)    
    - [Chrominance Subsampling](#toc4_4_1_)    
    - [Quantization](#toc4_4_2_)    
    - [Perceptual Coding](#toc4_4_3_)    
    - [Rate-Distortion Optimization](#toc4_4_4_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->

# <a id='toc1_'></a>[Dependencies](#toc0_)

In [None]:
import heapq
from collections import Counter
from decimal import Decimal, getcontext

import cv2
import matplotlib.pyplot as plt
import numpy as np
import scipy as sp

In [None]:
rng = np.random.default_rng(seed=42)

# <a id='toc2_'></a>[Load Images](#toc0_)


In [None]:
im_1 = cv2.imread("../../assets/images/dip_3rd/CH02_Fig0222(b)(cameraman).tif", cv2.IMREAD_GRAYSCALE)
im_2 = cv2.cvtColor(cv2.imread("../../assets/images/dip_3rd/CH06_Fig0638(a)(lenna_RGB).tif"), cv2.COLOR_BGR2RGB)

# plot
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 4), layout="compressed")
axs[0].imshow(im_1, vmin=0, vmax=255, cmap="gray")
axs[0].set_title("CH02_Fig0222(b)(cameraman).tif")
axs[1].imshow(im_2, vmin=0, vmax=255)
axs[1].set_title("CH06_Fig0638(a)(lenna_RGB).tif")
plt.show()

In [None]:
im_3 = cv2.imread("../../assets/images/original/raster/entropy/entropy_4c.png", flags=cv2.IMREAD_GRAYSCALE)
im_4 = cv2.imread("../../assets/images/original/raster/entropy/entropy_32c_1.png", flags=cv2.IMREAD_GRAYSCALE)
im_5 = cv2.imread("../../assets/images/original/raster/entropy/entropy_32c_2.png", flags=cv2.IMREAD_GRAYSCALE)
im_6 = cv2.imread("../../assets/images/original/raster/entropy/entropy_32c_3.png", flags=cv2.IMREAD_GRAYSCALE)
im_7 = cv2.imread("../../assets/images/original/raster/entropy/entropy_256c.png", flags=cv2.IMREAD_GRAYSCALE)

# plot
images = [im_3, im_4, im_5, im_6, im_7]
titles = [
    f"{len(np.unique(im_3))} colors",
    f"{len(np.unique(im_4))} colors",
    f"{len(np.unique(im_5))} colors",
    f"{len(np.unique(im_6))} colors",
    f"{len(np.unique(im_7))} colors",
]
fig, axs = plt.subplots(nrows=2, ncols=len(images), figsize=(4 * len(images), 8), layout="compressed")
for i in range(len(images)):
    axs[0, i].imshow(images[i], cmap="gray", vmin=0, vmax=255)
    axs[0, i].set(title=titles[i], xticks=[], yticks=[])
    axs[1, i].hist(images[i].flatten(), bins=256, range=(0, 256))
plt.show()

# <a id='toc3_'></a>[Entropy](#toc0_)
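- Entropy is a fundamental concept in information theory that **measures the average amount of information per symbol** (here, per pixel) in a signal.
- It is the **minimum average number of bits per symbol** that any lossless symbol-by-symbol code can achieve when pixels are treated as independent draws from the image's intensity histogram (first-order entropy). Coders that exploit correlation between neighboring pixels can go below it.
- The entropy $H$ of an image is given by Shannon's formula, where $N$ is the number of distinct intensity levels and $P(x_i)$ is the relative frequency of level $x_i$:
  $$H = - \sum_{i=1}^{N} P(x_i) \log_2 P(x_i)$$
- **Interpretation:**
  - **Low entropy (<1 bpp)** → The image has redundant information (e.g., large uniform areas).
  - **High entropy (~8 bpp for grayscale images)** → The image is highly detailed with little redundancy.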
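As a quick sanity check of the formula, consider a toy 16-pixel image (illustrative data, not one of the images above) whose four gray levels occur 8, 4, 2 and 2 times. The probabilities are 1/2, 1/4, 1/8 and 1/8, so $H = \tfrac{1}{2}\cdot 1 + \tfrac{1}{4}\cdot 2 + 2\cdot\tfrac{1}{8}\cdot 3 = 1.75$ bits per pixel, far below the 8 bits per pixel used to store it:

```python
import numpy as np

# toy 4x4 "image": level 0 appears 8 times, level 85 four times, levels 170 and 255 twice each
toy = np.array([0] * 8 + [85] * 4 + [170] * 2 + [255] * 2, dtype=np.uint8).reshape(4, 4)

_, counts = np.unique(toy, return_counts=True)
p = counts / counts.sum()  # P(x_i) for each distinct level
H = float(-np.sum(p * np.log2(p)))  # Shannon entropy in bits per pixel
print(H)  # 1.75
```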

In [None]:
def calculate_entropy(image: np.ndarray) -> float:
    image_flat = image.flatten()
    _, counts = np.unique(image_flat, return_counts=True)
    probabilities = counts / counts.sum()
    entropy = -np.sum(probabilities * np.log2(probabilities))
    return entropy.item()

In [None]:
# first-order entropy: lower bound on the average bits/pixel of any symbol-by-symbol lossless code
print(f"H(im_3): {calculate_entropy(im_3)}")
print(f"H(im_4): {calculate_entropy(im_4)}")
print(f"H(im_5): {calculate_entropy(im_5)}")
print(f"H(im_6): {calculate_entropy(im_6)}")
print(f"H(im_7): {calculate_entropy(im_7)}")

# <a id='toc4_'></a>[Redundancy](#toc0_)


## <a id='toc4_1_'></a>[Coding Redundancy](#toc0_)

- This occurs when an **image’s pixel values** are encoded using **more bits than necessary**.
- For example, if a grayscale image is stored with **8 bits per pixel (bpp)** but its intensities only range from **0 to 100**, bits are wasted: 101 levels fit in fewer than 8 bits even with a fixed-length code, and a variable-length code can do better still when some values are much more frequent than others.
- Removing this redundancy reduces image size without losing any information (**lossless compression**).
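For the 0-to-100 example, the smallest fixed-length code is $\lceil \log_2 101 \rceil = 7$ bits, which already saves one bit in eight compared with 8 bpp storage. A minimal check of that arithmetic:

```python
import math

levels = 101  # intensities 0..100, as in the example above
fixed_bits = math.ceil(math.log2(levels))  # smallest fixed-length code covering all levels
saving = 1 - fixed_bits / 8  # relative to 8 bpp storage
print(fixed_bits, f"{saving:.1%}")  # 7 12.5%
```

Variable-length codes such as Huffman coding (next) go further by exploiting the histogram rather than just the number of levels.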


In [None]:
im_6_flatten = im_6.flatten().tolist()

### <a id='toc4_1_1_'></a>[Huffman Coding](#toc0_)

- It is an **entropy-based, variable-length** coding scheme that minimizes the average number of bits used per symbol by exploiting the **non-uniform** distribution of pixel values.
- The objective is to assign **shorter codewords to more frequent symbols** and **longer codewords to less frequent ones**.

📝 **Paper**:

- [**A Method for the Construction of Minimum-Redundancy Codes**](https://ieeexplore.ieee.org/abstract/document/4051119/) by [David A. Huffman](https://ieeexplore.ieee.org/author/37338941400) in *1952*.
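To see the "shorter codewords for more frequent symbols" rule at work before the full class below, the following sketch computes only the Huffman **code lengths** with `heapq`, using hypothetical symbol counts. Each merge of the two least frequent subtrees pushes every symbol in them one level deeper. For counts 8, 4, 2, 2 the lengths are 1, 2, 3, 3 bits and the average is 1.75 bits, equal to the entropy here because every probability is a power of 1/2 (in general the Huffman average lies in $[H, H+1)$):

```python
import heapq
from itertools import count


def huffman_code_lengths(freqs: dict) -> dict:
    """Return the Huffman codeword length of every symbol (assumes >= 2 symbols)."""
    tiebreak = count()  # unique counter keeps heap entries comparable when frequencies tie
    heap = [(f, next(tiebreak), {sym: 0}) for sym, f in freqs.items()]
    heapq.heapify(heap)
    while len(heap) > 1:
        f1, _, depths1 = heapq.heappop(heap)
        f2, _, depths2 = heapq.heappop(heap)
        # merging two subtrees pushes every symbol in them one level deeper
        merged = {sym: d + 1 for sym, d in {**depths1, **depths2}.items()}
        heapq.heappush(heap, (f1 + f2, next(tiebreak), merged))
    return heap[0][2]


freqs = {"a": 8, "b": 4, "c": 2, "d": 2}  # hypothetical symbol counts
lengths = huffman_code_lengths(freqs)
avg_len = sum(freqs[s] * lengths[s] for s in freqs) / sum(freqs.values())
print(lengths, avg_len)  # {'a': 1, 'b': 2, 'c': 3, 'd': 3} 1.75
```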


In [None]:
class _HuffmanNode:
    def __init__(self, value=None, freq=0):
        self.value = value
        self.freq = freq
        self.left = None
        self.right = None

    def __lt__(self, other):
        return self.freq < other.freq

In [None]:
class HuffmanCoding:
    def __init__(self):
        self._codes = {}
        self._reverse_codes = {}
        self._tree_built = False

    def encode(self, data: list) -> str:
        # reset any codebook left over from a previous call
        self._codes = {}
        self._reverse_codes = {}
        root = self._build_tree(data)
        self._build_codes(root)
        self._tree_built = True
        return "".join(self._codes[item] for item in data)

    def decode(self, encoded_data: str) -> list:
        if not self._tree_built:
            raise ValueError("Huffman tree has not been built. Call `encode` first.")
        current_code = ""
        decoded_data = []

        for bit in encoded_data:
            current_code += bit
            if current_code in self._reverse_codes:
                decoded_data.append(self._reverse_codes[current_code])
                current_code = ""

        return decoded_data

    def get_codebook(self) -> dict:
        if not self._tree_built:
            raise ValueError("Codebook is not available. Call `encode` first.")
        return self._codes.copy()

    def _build_tree(self, data: list) -> _HuffmanNode:
        frequency = Counter(data)
        heap = [_HuffmanNode(value, freq) for value, freq in frequency.items()]
        heapq.heapify(heap)

        while len(heap) > 1:
            node1 = heapq.heappop(heap)
            node2 = heapq.heappop(heap)
            merged = _HuffmanNode(freq=node1.freq + node2.freq)
            merged.left = node1
            merged.right = node2
            heapq.heappush(heap, merged)

        return heap[0] if heap else None

    def _build_codes(self, node: _HuffmanNode, current_code: str = ""):
        if node is None:
            return

        if node.value is not None:
            # a tree with a single leaf would produce an empty codeword, so fall back to "0"
            code = current_code or "0"
            self._codes[node.value] = code
            self._reverse_codes[code] = node.value
            return

        self._build_codes(node.left, current_code + "0")
        self._build_codes(node.right, current_code + "1")

In [None]:
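def calculate_codebook_size(codebook: dict[int, str], bits_per_symbol: int = 8) -> int:
    # rough size of the side information: each symbol stored with `bits_per_symbol` bits
    # plus the bits of its codeword (the length fields needed to parse the table are ignored)
    keys = len(codebook) * bits_per_symbol
    values = sum(len(code) for code in codebook.values())
    return keys + values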

In [None]:
huffman_coding = HuffmanCoding()
huffman_enc = huffman_coding.encode(im_6_flatten)
huffman_dec = huffman_coding.decode(huffman_enc)
codebook = huffman_coding.get_codebook()
im_6_dec = np.array(huffman_dec, dtype=np.uint8).reshape(im_6.shape)
is_equal = np.array_equal(im_6, im_6_dec)

# log
print(f"im_6        (bitstream length)      : {int(np.prod(im_6.shape) * np.log2(np.iinfo(im_6.dtype).max + 1))} bits")
print(f"huffman_enc (bitstream length)      : {len(huffman_enc)} + {calculate_codebook_size(codebook)} bits")
print(f"entropy     (avg. bitstream length) : {calculate_entropy(im_6) * np.prod(im_6.shape)} bits")
print(f"is_equal    (lossless)              : {is_equal}")

In [None]:
# huffman codebook
print("\nhuffman codebook:")
for symbol, code in huffman_coding.get_codebook().items():
    print(f"symbol: {symbol:3} -> code: {code}")

### <a id='toc4_1_2_'></a>[Arithmetic Coding](#toc0_)

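- Arithmetic coding is a more **advanced** form of **entropy coding** than **Huffman coding**: it does not have to spend a whole number of bits on each symbol, so its average code length can get closer to the entropy.
- It encodes the entire message as a **single number** inside a sub-interval of **[0, 1)**; each symbol narrows the current interval in proportion to its probability, using the **cumulative probabilities** of the symbols in the message.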

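📝 **Paper**:

- [**A universal algorithm for sequential data compression**](https://ieeexplore.ieee.org/abstract/document/1055714) by [Jacob Ziv](https://ieeexplore.ieee.org/author/37267355900) and Abraham Lempel in *1977*.
- This paper introduced LZ77, the first Lempel-Ziv dictionary coder (LZW, covered below, descends from its 1978 successor, LZ78). It is not where arithmetic coding originated; its core ideas are usually credited to Jorma Rissanen and Richard Pasco (both *1976*).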
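The interval-narrowing idea can be sketched with Python's exact `fractions.Fraction` instead of the `Decimal` arithmetic used by the class below. This is a simplified illustration under stated assumptions (symbol probabilities taken from the message itself, no bit-level output, no renormalization), not a practical coder; the helper names `build_intervals`, `encode` and `decode` are hypothetical:

```python
from collections import Counter
from fractions import Fraction


def build_intervals(msg):
    """Split [0, 1) into one sub-interval per symbol, with width equal to its probability."""
    counts = Counter(msg)
    total = sum(counts.values())
    intervals, low = {}, Fraction(0)
    for symbol, c in counts.items():
        intervals[symbol] = (low, low + Fraction(c, total))
        low += Fraction(c, total)
    return intervals


def encode(msg, intervals):
    """Narrow [low, high) once per symbol; the final width is the product of the probabilities."""
    low, high = Fraction(0), Fraction(1)
    for symbol in msg:
        width = high - low
        s_low, s_high = intervals[symbol]
        low, high = low + width * s_low, low + width * s_high
    return low, high


def decode(value, length, intervals):
    """Find the sub-interval containing `value`, emit its symbol, then rescale to [0, 1)."""
    out = []
    for _ in range(length):
        for symbol, (s_low, s_high) in intervals.items():
            if s_low <= value < s_high:
                out.append(symbol)
                value = (value - s_low) / (s_high - s_low)
                break
    return out


msg = list("ABAAC")
intervals = build_intervals(msg)  # A: [0, 3/5), B: [3/5, 4/5), C: [4/5, 1)
low, high = encode(msg, intervals)  # [1233/3125, 1260/3125), width 27/3125
tag = (low + high) / 2  # any number inside the final interval identifies the message
print(decode(tag, len(msg), intervals))  # ['A', 'B', 'A', 'A', 'C']
```

The final width $27/3125$ means about $-\log_2(27/3125) \approx 6.85$ bits are needed to pin down a number inside it, matching $5 \times H \approx 6.85$ bits for this message.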


In [None]:
# base code: https://github.com/ahmedfgad/ArithmeticEncodingPython/blob/main/pyae.py

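# number of significant decimal digits used by every `Decimal` operation; the interval width shrinks
# with each symbol, so long messages need proportionally more digits or decoding will fail
getcontext().prec = 288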


class ArithmeticEncoding:
    """
    ArithmeticEncoding is a class for building the arithmetic encoding.
    """

    def __init__(self, save_stages=False):
        """
        save_stages: If True, then the intervals of each stage are saved in a list.
                      Note that setting save_stages=True may cause memory overflow if the message is large.
        """
        self.save_stages = save_stages
        if save_stages:
            print("WARNING: Setting save_stages=True may cause memory overflow if the message is large.")

    def get_probability_table(self, frequency_table):
        """
        Calculates the probability table from the frequency table.
        """
        total_frequency = sum(frequency_table.values())
        return {key: Decimal(value) / total_frequency for key, value in frequency_table.items()}

    def get_encoded_value(self, last_stage_probs):
        """
        After encoding the entire message, this method returns the single value that represents the entire message.
        """
        last_stage_values = [value for sublist in last_stage_probs.values() for value in sublist]
        encoded_value = (min(last_stage_values) + max(last_stage_values)) / 2
        return min(last_stage_values), max(last_stage_values), encoded_value

    def process_stage(self, probability_table, stage_min, stage_max):
        """
        Processing a stage in the encoding/decoding process.
        """
        stage_domain = stage_max - stage_min
        stage_probs = {}
        cumulative_min = stage_min
        for term, prob in probability_table.items():
            cumulative_max = cumulative_min + prob * stage_domain
            stage_probs[term] = [cumulative_min, cumulative_max]
            cumulative_min = cumulative_max
        return stage_probs

    def encode(self, msg):
        """
        Encodes a message using arithmetic encoding, calculating the frequency table internally.
        """
        # calculate frequency table from message
        frequency_table = dict(Counter(msg))

        # get the probability table
        probability_table = self.get_probability_table(frequency_table)

        encoder = []
        stage_min, stage_max = Decimal(0.0), Decimal(1.0)

        for msg_term in msg:
            stage_probs = self.process_stage(probability_table, stage_min, stage_max)
            stage_min, stage_max = stage_probs[msg_term]
            if self.save_stages:
                encoder.append(stage_probs)

        last_stage_probs = self.process_stage(probability_table, stage_min, stage_max)
        if self.save_stages:
            encoder.append(last_stage_probs)

        interval_min_value, interval_max_value, encoded_msg = self.get_encoded_value(last_stage_probs)
        return encoded_msg, encoder, interval_min_value, interval_max_value

    def decode(self, encoded_msg, msg_length, probability_table):
        """
        Decodes a message from a single `Decimal` value, returning a list of symbols.
        `probability_table` must be the same table (with the same symbol order) used by `encode`,
        e.g. `self.get_probability_table(dict(Counter(msg)))`.
        """
        decoded_msg = []
        stage_min, stage_max = Decimal(0.0), Decimal(1.0)

        for _ in range(msg_length):
            stage_probs = self.process_stage(probability_table, stage_min, stage_max)

            for msg_term, (min_val, max_val) in stage_probs.items():
                if min_val <= encoded_msg < max_val:
                    decoded_msg.append(msg_term)
                    stage_min, stage_max = min_val, max_val
                    break
            else:
                # no interval contains the value: the Decimal precision is too low for this message length
                raise ValueError("Decoding failed; increase `getcontext().prec` for longer messages.")

        return decoded_msg

### <a id='toc4_1_3_'></a>[Lempel-Ziv-Welch (LZW) Coding](#toc0_)

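- LZW is a **dictionary-based** lossless method used in formats such as **GIF** and **TIFF**. It builds a **dynamic dictionary** on the fly and replaces each repeated sequence of input symbols with a single code; in GIF and TIFF, the code width grows (up to 12 bits) as the dictionary fills.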

📝 **Paper**:

- [**A Technique for High-Performance Data Compression**](https://www.computer.org/csdl/magazine/co/1984/06/01659158/13rRUwIF63T) by [Terry A. Welch](https://www.computer.org/csdl/search/default?type=author&givenName=T.A.&surname=Welch) in *1984*.
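A short trace makes the dictionary growth visible. The sketch below is a minimal character-based LZW encoder (a hypothetical helper, separate from the `LZW` class below) that prints each entry it learns. On `"ABABABA"` it emits `[65, 66, 256, 258]`: the last code, 258, refers to the entry the encoder created one step earlier, which the decoder has not built yet. This is exactly the `elif code == self.next_code` branch in the class's `decode` method.

```python
def lzw_encode_trace(text: str) -> list[int]:
    """Minimal LZW encoder over characters that prints each new dictionary entry."""
    dictionary = {chr(i): i for i in range(256)}  # single characters get codes 0..255
    next_code, w, out = 256, "", []
    for c in text:
        if w + c in dictionary:
            w += c  # keep extending the current match
        else:
            out.append(dictionary[w])  # emit the code of the longest match
            dictionary[w + c] = next_code  # learn the new sequence
            print(f"emit {dictionary[w]:3}  add {w + c!r} -> {next_code}")
            next_code += 1
            w = c
    if w:
        out.append(dictionary[w])
    return out


print(lzw_encode_trace("ABABABA"))  # [65, 66, 256, 258]
```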


In [None]:
class LZW:
    def __init__(self):
        self.reset_dictionary()

    def reset_dictionary(self):
        """Reset the dictionary to initial state"""
        # Initial dictionary contains all possible single-byte values
        self.dictionary = {}
        self.reverse_dictionary = {}
        self.next_code = 0

        # Initialize with single character patterns
        for i in range(256):
            self._add_to_dictionary(bytes([i]))

    def _add_to_dictionary(self, pattern):
        """Add a new pattern to the dictionary"""
        if pattern not in self.dictionary:
            self.dictionary[pattern] = self.next_code
            self.reverse_dictionary[self.next_code] = pattern
            self.next_code += 1

    def encode(self, data):
        """
        Compress a list of integers (0-255) using LZW algorithm
        """
        # Convert input data to bytes
        input_bytes = bytes(data)

        self.reset_dictionary()
        compressed = []
        w = bytes()

        for c in input_bytes:
            c_bytes = bytes([c])
            wc = w + c_bytes
            if wc in self.dictionary:
                w = wc
            else:
                compressed.append(self.dictionary[w])
                self._add_to_dictionary(wc)
                w = c_bytes

        if w:
            compressed.append(self.dictionary[w])

        return compressed

    def decode(self, compressed):
        """
        Decompress a list of code words using LZW algorithm
        """
        self.reset_dictionary()
        decompressed = []

        if not compressed:
            return decompressed

        # Convert first code
        old_code = compressed[0]
        decompressed.extend(self.reverse_dictionary[old_code])
        s = self.reverse_dictionary[old_code]

        for code in compressed[1:]:
            if code in self.reverse_dictionary:
                entry = self.reverse_dictionary[code]
            elif code == self.next_code:
                entry = s + bytes([s[0]])
            else:
                raise ValueError(f"Invalid compressed code: {code}")

            decompressed.extend(entry)

            # Add to dictionary
            new_pattern = s + bytes([entry[0]])
            self._add_to_dictionary(new_pattern)

            s = entry

        return list(decompressed)

    def print_dictionary(self, reverse=False):
        """Print the current dictionary contents"""
        print("\nCurrent Dictionary:")
        if reverse:
            for code, pattern in sorted(self.reverse_dictionary.items()):
                print(f"{code:4}: {list(pattern)}")
        else:
            for pattern, code in sorted(self.dictionary.items(), key=lambda x: x[1]):
                print(f"{list(pattern)}: {code:4}")

In [None]:
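def lzw_total_bits(lzw_enc, initial_dict=256, header_bits=32, min_bits=9):
    """Estimate total bits (header + payload) for an LZW stream with growing code width."""
    # 1. header bits (assumes a 4-byte header for metadata)
    # 2. payload bits: each code must be readable by the decoder, so its width depends on the
    #    decoder's dictionary size at that point, not on the code value itself.
    #    The largest code the decoder can receive as the i-th code (0-based) is initial_dict - 1 + i:
    #    one entry is learned per code after the first, plus the not-yet-built "next code" case.
    #    The `LZW` class never caps its dictionary, so the width is not limited to 12 bits here.
    payload_bits = 0
    for i in range(len(lzw_enc)):
        largest_code = initial_dict - 1 + i
        payload_bits += max(min_bits, largest_code.bit_length())

    return header_bits + payload_bits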

In [None]:
lzw = LZW()
lzw_enc = lzw.encode(im_6_flatten)
lzw_dec = lzw.decode(lzw_enc)
im_6_dec = np.array(lzw_dec, dtype=np.uint8).reshape(im_6.shape)
is_equal = np.array_equal(im_6, im_6_dec)

# log
print(f"im_6     (bitstream length)      : {int(np.prod(im_6.shape) * np.log2(np.iinfo(im_6.dtype).max + 1))} bits")
print(f"lzw_enc  (bitstream length)      : {lzw_total_bits(lzw_enc)} bits")
print(f"entropy  (avg. bitstream length) : {calculate_entropy(im_6) * np.prod(im_6.shape)} bits")
print(f"is_equal (lossless)              : {is_equal}")

In [None]:
# show dictionary
lzw.print_dictionary(reverse=True)

## <a id='toc4_2_'></a>[Interpixel (Spatial) Redundancy](#toc0_)

- It refers to the redundancy found in the spatial domain of an image.
- It occurs when **neighboring pixels** in an image **share similar values**, meaning there is a **pattern** or **correlation** between them.
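A minimal sketch of this correlation, using a synthetic 64x64 gradient (chosen to make the effect extreme): its pixel values are spread over many levels, so the first-order entropy is high, yet every pixel equals its left neighbor plus one, so the horizontal differences carry no information at all. `entropy_bits` is a small helper defined here for illustration.

```python
import numpy as np


def entropy_bits(a: np.ndarray) -> float:
    """First-order entropy (bits/symbol) of the values in `a`."""
    _, counts = np.unique(a, return_counts=True)
    p = counts / counts.sum()
    return float(np.sum(p * np.log2(1 / p)))


# synthetic 64x64 smooth gradient: pixel (r, c) has intensity r + c
img = np.add.outer(np.arange(64), np.arange(64))
# horizontal differences between each pixel and its left neighbor
diff = np.diff(img, axis=1)

print(round(entropy_bits(img), 2), entropy_bits(diff))
```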


### <a id='toc4_2_1_'></a>[Run-Length Encoding (RLE)](#toc0_)
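Run-length encoding replaces every run of identical consecutive values with a (value, count) pair. It pays off only when runs are long (large flat regions, binary images); on noisy data the pairs can take more space than the raw pixels. A minimal sketch with `itertools.groupby`, an alternative to the class below, on a hypothetical image row:

```python
from itertools import groupby


def rle_encode(values):
    """Collapse each run of equal consecutive values into a (value, run_length) pair."""
    return [(value, len(list(run))) for value, run in groupby(values)]


def rle_decode(pairs):
    """Expand every (value, run_length) pair back into a run."""
    return [value for value, length in pairs for _ in range(length)]


row = [255, 255, 255, 255, 0, 0, 255, 255]  # one row of a binary-like image
pairs = rle_encode(row)
print(pairs)  # [(255, 4), (0, 2), (255, 2)]
```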

In [None]:
class RunLengthEncoder:
    def __init__(self):
        self.encoded = []

    def encode(self, flat_list: list):
        if not flat_list:
            self.encoded = []
            return self.encoded

        self.encoded = []
        prev = flat_list[0]
        count = 1

        for val in flat_list[1:]:
            if val == prev:
                count += 1
            else:
                self.encoded.append((prev, count))
                prev = val
                count = 1
        self.encoded.append((prev, count))
        return self.encoded

    def decode(self):
        return [val for val, count in self.encoded for _ in range(count)]

    def get_table(self):
        return self.encoded

In [None]:
def calculate_rle_size(encoded, value_dtype=np.uint8, count_dtype=np.uint16):
    val_bits = np.iinfo(value_dtype).bits
    count_bits = np.iinfo(count_dtype).bits
    return len(encoded) * (val_bits + count_bits)

In [None]:
rle = RunLengthEncoder()
rle_enc = rle.encode(im_6_flatten)
rle_dec = rle.decode()
im_6_dec = np.array(rle_dec, dtype=np.uint8).reshape(im_6.shape)
is_equal = np.array_equal(im_6, im_6_dec)

# log
print(f"im_6     (bitstream length)      : {int(np.prod(im_6.shape) * np.log2(np.iinfo(im_6.dtype).max + 1))} bits")
print(
    f"rle_enc  (bitstream length)      : {calculate_rle_size(rle_enc, value_dtype=im_6.dtype, count_dtype=np.uint16)} bits"
)
print(f"entropy  (avg. bitstream length) : {calculate_entropy(im_6) * np.prod(im_6.shape)} bits")
print(f"is_equal (lossless)              : {is_equal}")

In [None]:
# run-length table
print("\nrun-length table:")
for symbol, length in rle.get_table():
    print(f"symbol: {symbol:3} -> length: {length}")

### <a id='toc4_2_2_'></a>[Differential Encoding](#toc0_)

### <a id='toc4_2_3_'></a>[Predictive Coding](#toc0_)

### <a id='toc4_2_4_'></a>[Transform Coding](#toc0_)

### <a id='toc4_2_5_'></a>[Wavelet Transform](#toc0_)

### <a id='toc4_2_6_'></a>[Block-based Compression](#toc0_)

## <a id='toc4_3_'></a>[Temporal Redundancy](#toc0_)

- It refers to the redundancy that exists between **successive frames** in a **sequence** of images or video.
- This type of redundancy arises because **adjacent frames** (in a video) or successive images (in a time-series context) often contain very **similar information**, making them **predictable** or **compressible**.
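A minimal sketch of why consecutive frames compress well, using two synthetic frames rather than real video: the second frame differs from the first only in a 4x4 patch, so the frame difference that a predictive coder would transmit instead of the full frame is almost entirely zeros. `entropy_bits` is a small helper defined here for illustration.

```python
import numpy as np


def entropy_bits(a: np.ndarray) -> float:
    """First-order entropy (bits/symbol) of the values in `a`."""
    _, counts = np.unique(a, return_counts=True)
    p = counts / counts.sum()
    return float(np.sum(p * np.log2(1 / p)))


gen = np.random.default_rng(0)
# synthetic "video": frame_1 equals frame_0 except for a 4x4 patch that brightens by 1
frame_0 = gen.integers(0, 256, size=(64, 64)).astype(np.int16)
frame_1 = frame_0.copy()
frame_1[10:14, 10:14] += 1

residual = frame_1 - frame_0  # what a predictive coder would send instead of frame_1
print(entropy_bits(frame_1), entropy_bits(residual), np.count_nonzero(residual))
```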


### <a id='toc4_3_1_'></a>[Inter-frame Compression (Predictive Compression)](#toc0_)

### <a id='toc4_3_2_'></a>[Motion Compensation](#toc0_)

### <a id='toc4_3_3_'></a>[Differential Encoding](#toc0_)

### <a id='toc4_3_4_'></a>[Keyframe Extraction](#toc0_)

### <a id='toc4_3_5_'></a>[Temporal Filtering](#toc0_)

### <a id='toc4_3_6_'></a>[Block-based Motion Estimation](#toc0_)

### <a id='toc4_3_7_'></a>[Long-Term Prediction](#toc0_)

## <a id='toc4_4_'></a>[Psychovisual Redundancy](#toc0_)

- It refers to the redundancy in image or video data that arises due to the **limitations of the human visual system (HVS)**.
- In essence, certain **details** in images or video frames may be **less perceptible** to the **human eye**.
- They can be **removed or simplified without significantly affecting the perceived quality** of the image or video.

### <a id='toc4_4_1_'></a>[Chrominance Subsampling](#toc0_)

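- It **reduces** the spatial resolution of the color (**Cb** and **Cr**) channels while keeping the luma (**Y**) channel at full resolution, because the human eye is less sensitive to fine color detail than to fine brightness detail.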
- This technique is commonly used in **JPEG**, **MPEG**, and other **lossy compression** standards.

🧠 **Conceptual Basis**

Subsampling ratios are expressed as **J:a:b** (typically 4:a:b) and describe a region **J** pixels wide and **2** rows tall, where:

- `J` is the number of **luma (Y)** samples in each row of the region (usually 4).
- `a` is the number of **chroma samples (Cb and Cr)** in the **first row** of the region.
- `b` is the number of **chroma samples** in the **second row** that differ from the first row; `b = 0` means the second row reuses the first row's chroma, i.e., chroma is also subsampled vertically.
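Counting samples in the J-wide, two-row region turns a ratio into a storage figure: luma contributes $2J$ samples and each of the two chroma channels contributes $a + b$, compared with $3 \cdot 2J$ samples for 4:4:4. A small sketch (`relative_size` is a hypothetical helper):

```python
from fractions import Fraction


def relative_size(j: int, a: int, b: int) -> Fraction:
    """Raw data size of J:a:b relative to 4:4:4, counted over a J-wide, 2-row region."""
    luma = 2 * j  # full-resolution Y in both rows
    chroma = 2 * (a + b)  # Cb and Cr, each with a + b samples
    return Fraction(luma + chroma, 3 * 2 * j)


for ratio in [(4, 4, 4), (4, 2, 2), (4, 2, 0), (4, 1, 1)]:
    print("{}:{}:{}".format(*ratio), relative_size(*ratio))
# 4:4:4 1
# 4:2:2 2/3
# 4:2:0 1/2
# 4:1:1 1/2
```

4:2:0 and 4:1:1 keep the same amount of chroma; they differ only in arrangement (halved in both directions versus quartered horizontally), as in the slicing used in the cells below.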

📊 **Common Subsampling Ratios**

<figure style="text-align:center; margin:0;">
  <img src="../../assets/images/original/vector/compression/chroma-subsampling.svg" alt="chroma-subsampling.svg" style="max-width:80%; height:auto;">
  <figcaption>Common Chrominance Subsampling Ratios</figcaption>
</figure>


In [None]:
im_2_yuv = cv2.cvtColor(im_2, cv2.COLOR_RGB2YUV)
im_2_y = im_2_yuv[:, :, 0]
im_2_u = im_2_yuv[:, :, 1]
im_2_v = im_2_yuv[:, :, 2]

# log
print(f"im_2_yuv.shape: {im_2_yuv.shape}\n")
print(f"Y[:2, :4]:\n{im_2_y[:2, :4]}\n")
print(f"U[:2, :4]:\n{im_2_u[:2, :4]}\n")
print(f"V[:2, :4]:\n{im_2_v[:2, :4]}")

# plot
fig, axs = plt.subplots(nrows=1, ncols=4, figsize=(16, 4), layout="compressed")
axs[0].imshow(im_2)
axs[0].set_title("RGB image")
axs[1].imshow(im_2_y, cmap="gray")
axs[1].set_title("Y channel")
axs[2].imshow(im_2_u, cmap="seismic")
axs[2].set_title("U channel")
axs[3].imshow(im_2_v, cmap="seismic")
axs[3].set_title("V channel")
plt.show()

In [None]:
# 4:2:2
im_2_u_422 = im_2_u[:, ::2]
im_2_v_422 = im_2_v[:, ::2]

# reconstruct
U_422_rec = cv2.resize(im_2_u_422, im_2_yuv.shape[:-1][::-1], interpolation=cv2.INTER_LINEAR)
V_422_rec = cv2.resize(im_2_v_422, im_2_yuv.shape[:-1][::-1], interpolation=cv2.INTER_LINEAR)
im_2_yuv_422 = np.dstack([im_2_y, U_422_rec, V_422_rec])

# log
print(f"U_422.shape: {im_2_u_422.shape}")
print(f"V_422.shape: {im_2_v_422.shape}\n")
print(f"U_422[:2, :4]:\n{im_2_u_422[:2, :4]}\n")
print(f"V_422[:2, :4]:\n{im_2_v_422[:2, :4]}")
print("-" * 50)
print(f"U_422_rec.shape: {U_422_rec.shape}")
print(f"V_422_rec.shape: {V_422_rec.shape}\n")
print(f"U_422_rec[:2, :4]:\n{U_422_rec[:2, :4]}\n")
print(f"V_422_rec[:2, :4]:\n{V_422_rec[:2, :4]}")

In [None]:
# 4:2:0
im_2_u_420 = im_2_u[::2, ::2]
im_2_v_420 = im_2_v[::2, ::2]

# reconstruct
U_420_rec = cv2.resize(im_2_u_420, im_2_yuv.shape[:-1][::-1], interpolation=cv2.INTER_LINEAR)
V_420_rec = cv2.resize(im_2_v_420, im_2_yuv.shape[:-1][::-1], interpolation=cv2.INTER_LINEAR)
im_2_yuv_420 = np.dstack([im_2_y, U_420_rec, V_420_rec])

# log
print(f"U_420.shape: {im_2_u_420.shape}")
print(f"V_420.shape: {im_2_v_420.shape}\n")
print(f"U_420[:2, :4]:\n{im_2_u_420[:2, :4]}\n")
print(f"V_420[:2, :4]:\n{im_2_v_420[:2, :4]}")
print("-" * 50)
print(f"U_420_rec.shape: {U_420_rec.shape}")
print(f"V_420_rec.shape: {V_420_rec.shape}\n")
print(f"U_420_rec[:2, :4]:\n{U_420_rec[:2, :4]}\n")
print(f"V_420_rec[:2, :4]:\n{V_420_rec[:2, :4]}")

In [None]:
# 4:1:1
im_2_u_411 = im_2_u[:, ::4]
im_2_v_411 = im_2_v[:, ::4]

# reconstruct
U_411_rec = cv2.resize(im_2_u_411, im_2_yuv.shape[:-1][::-1], interpolation=cv2.INTER_LINEAR)
V_411_rec = cv2.resize(im_2_v_411, im_2_yuv.shape[:-1][::-1], interpolation=cv2.INTER_LINEAR)
im_2_yuv_411 = np.dstack([im_2_y, U_411_rec, V_411_rec])

# log
print(f"U_411.shape: {im_2_u_411.shape}")
print(f"V_411.shape: {im_2_v_411.shape}\n")
print(f"U_411[:2, :4]:\n{im_2_u_411[:2, :4]}\n")
print(f"V_411[:2, :4]:\n{im_2_v_411[:2, :4]}")
print("-" * 50)
print(f"U_411_rec.shape: {U_411_rec.shape}")
print(f"V_411_rec.shape: {V_411_rec.shape}\n")
print(f"U_411_rec[:2, :4]:\n{U_411_rec[:2, :4]}\n")
print(f"V_411_rec[:2, :4]:\n{V_411_rec[:2, :4]}")

In [None]:
im_2_rgb_422 = cv2.cvtColor(im_2_yuv_422, code=cv2.COLOR_YUV2RGB)
im_2_rgb_420 = cv2.cvtColor(im_2_yuv_420, code=cv2.COLOR_YUV2RGB)
im_2_rgb_411 = cv2.cvtColor(im_2_yuv_411, code=cv2.COLOR_YUV2RGB)

# plot
images = [im_2, im_2_rgb_422, im_2_rgb_420, im_2_rgb_411]
titles = ["Original RGB Image", "RGB 4:2:2", "RGB 4:2:0", "RGB 4:1:1"]
zoom_images = [
    im_2[220:300, 220:300],
    im_2_rgb_422[220:300, 220:300],
    im_2_rgb_420[220:300, 220:300],
    im_2_rgb_411[220:300, 220:300],
]
zoom_titles = ["Zoomed Original", "Zoomed RGB 4:2:2", "Zoomed RGB 4:2:0", "Zoomed RGB 4:1:1"]
fig, axs = plt.subplots(nrows=2, ncols=4, figsize=(16, 8), layout="compressed")
for i in range(4):
    axs[0, i].imshow(images[i])
    axs[0, i].set_title(titles[i])
    axs[0, i].axis("off")
for i in range(4):
    axs[1, i].imshow(zoom_images[i])
    axs[1, i].set_title(zoom_titles[i])
    axs[1, i].axis("off")
plt.show()

### <a id='toc4_4_2_'></a>[Quantization](#toc0_)

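- In JPEG, each 8x8 block is transformed with the 2-D DCT, so the image data is represented as frequency coefficients.
- Since the human eye is less sensitive to high-frequency components, quantization divides each coefficient by the matching entry of a quantization matrix (larger entries for higher frequencies) and rounds the result, so high-frequency coefficients lose more precision and often become zero.
- This enables better compression with minimal perceived quality loss.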
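Written out, the quantizer divides each DCT coefficient $F(u,v)$ by the matching quantization-matrix entry $Q(u,v)$ and rounds; the decoder can only multiply back, so the rounding error is the information that is lost:

$$F_Q(u,v) = \operatorname{round}\!\left(\frac{F(u,v)}{Q(u,v)}\right), \qquad \hat{F}(u,v) = F_Q(u,v)\, Q(u,v)$$

With illustrative values, a DC coefficient $F(0,0) = -415.38$ and $Q(0,0) = 16$ give $\operatorname{round}(-25.96) = -26$, which dequantizes to $-416$. A high-frequency coefficient of $4.2$ with $Q(7,7) = 99$ gives $\operatorname{round}(0.04) = 0$ and is discarded entirely.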

📝 **Docs**:

- [**Digital Compression and Coding of Continuous-Tone Still Images (JPEG Standard, ITU-T T.81)**](https://www.w3.org/Graphics/JPEG/itu-t81.pdf) by [Joint Photographic Experts Group (JPEG)](https://jpeg.org/about.html) in *1992*.


In [None]:
qm_luma = np.array(
    [
        [16, 11, 10, 16, 24, 40, 51, 61],
        [12, 12, 14, 19, 26, 58, 60, 55],
        [14, 13, 16, 24, 40, 57, 69, 56],
        [14, 17, 22, 29, 51, 87, 80, 62],
        [18, 22, 37, 56, 68, 109, 103, 77],
        [24, 35, 55, 64, 81, 104, 113, 92],
        [49, 64, 78, 87, 103, 121, 120, 101],
        [72, 92, 95, 98, 112, 100, 103, 99],
    ],
    dtype=np.float32,
)

In [None]:
qm_chroma = np.array(
    [
        [17, 18, 24, 47, 99, 99, 99, 99],
        [18, 21, 26, 66, 99, 99, 99, 99],
        [24, 26, 56, 99, 99, 99, 99, 99],
        [47, 66, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99],
        [99, 99, 99, 99, 99, 99, 99, 99],
    ],
    dtype=np.float32,
)

In [None]:
# this formula comes from libjpeg, the de facto standard JPEG library, and is used by most open-source image software, such as GIMP, Pillow (PIL), ImageMagick, and OpenCV
def scale_quant_matrix(Q, quality):
    assert 1 <= quality <= 100, "Quality must be between 1 and 100"

    # libjpeg computes the scale with integer arithmetic, so 5000 / quality is truncated
    if quality < 50:
        scale = 5000 // quality
    else:
        scale = 200 - 2 * quality

    scaled_matrix = np.floor((Q * scale + 50) / 100)

    # Clip values to 1-255 range as per JPEG spec
    scaled_matrix = np.clip(scaled_matrix, 1, 255)

    return scaled_matrix


# scale quantization matrix
qm_luma_qf80 = scale_quant_matrix(qm_luma, quality=80)
qm_chroma_qf80 = scale_quant_matrix(qm_chroma, quality=80)

# log
print(f"qm_luma_qf80:\n{qm_luma_qf80}\n")
print(f"qm_chroma_qf80:\n{qm_chroma_qf80}")
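Following libjpeg's integer arithmetic, the quality factor $q$ rescales each base-table entry $Q(u,v)$ as:

$$S(q) = \begin{cases} \lfloor 5000 / q \rfloor, & q < 50 \\ 200 - 2q, & q \ge 50 \end{cases} \qquad Q_q(u,v) = \operatorname{clip}\!\left(\left\lfloor \frac{Q(u,v)\, S(q) + 50}{100} \right\rfloor,\ 1,\ 255\right)$$

For $q = 80$, $S = 40$: the luma entry $Q(0,0) = 16$ becomes $\lfloor 690/100 \rfloor = 6$ and $Q(7,7) = 99$ becomes $\lfloor 4010/100 \rfloor = 40$, so higher quality means smaller quantization steps. At $q = 50$, $S = 100$ and the base table is unchanged; at $q = 10$, $S = 500$ and $Q(0,0) = 16$ grows to $80$.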

In [None]:
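# convert RGB to YCrCb (OpenCV's channel order) and keep the luma (Y) channel;
# for 8-bit images OpenCV uses the full-range JPEG (JFIF) conversion, so Y already spans [0, 255]
# and must not be rescaled with (Y - 16) * 255 / 219, which would push dark pixels below 0
im_2_y = cv2.cvtColor(im_2, code=cv2.COLOR_RGB2YCrCb)[:, :, 0].astype(np.float32)

# extract the first 8x8 block (already float32, so the level shift below cannot underflow)
im_2_patch00 = im_2_y[:8, :8]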

# DCT
im_2_patch00_dct2 = sp.fftpack.dctn(im_2_patch00 - 128, norm="ortho")

# quantization (JPEG-style: floor(x + 0.5))
im_2_patch00_dct2_quantize = np.floor(im_2_patch00_dct2 / qm_luma_qf80 + 0.5)

# dequantization
im_2_patch00_dct2_dequantize = im_2_patch00_dct2_quantize * qm_luma_qf80

# IDCT
im_2_patch00_idct2 = sp.fftpack.idctn(im_2_patch00_dct2_dequantize, norm="ortho")

# shift back to [0, 255], round to the nearest integer, and clip
im_2_patch00_idct2 += 128
im_2_patch00_idct2 = np.clip(np.round(im_2_patch00_idct2), 0, 255).astype(np.uint8)

# log
print(f"im_2_patch00:\n{im_2_patch00.astype(np.uint8)}\n")
print(f"im_2_patch00_idct2:\n{im_2_patch00_idct2}")

# plot
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(8, 4), layout="compressed")
axs[0].imshow(im_2_patch00, cmap="gray", vmin=0, vmax=255)
axs[0].set_title("Original 8x8 patch")
axs[0].axis("off")
axs[1].imshow(im_2_patch00_idct2, cmap="gray", vmin=0, vmax=255)
axs[1].set_title("Reconstructed 8x8 patch")
axs[1].axis("off")
plt.show()
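Coarse quantization of high frequencies costs little on natural images because of energy compaction: for smooth content, the DCT concentrates almost all of the signal energy in a few low-frequency coefficients. The sketch below uses a synthetic 8x8 ramp block and `scipy.fft.dctn`, the newer counterpart of the `scipy.fftpack.dctn` call above (both compute the orthonormal DCT-II with `norm="ortho"`):

```python
import numpy as np
from scipy.fft import dctn

# synthetic smooth 8x8 block: every row is the ramp 100, 105, ..., 135
block = np.tile(np.arange(100, 140, 5, dtype=np.float64), (8, 1))

# level shift and orthonormal 2-D DCT-II, as in the quantization cell above
coeffs = dctn(block - 128, norm="ortho")
energy = coeffs**2

# share of the total energy held by the DC term and the first horizontal frequency
share = (energy[0, 0] + energy[0, 1]) / energy.sum()
print(f"{share:.4f}")
print(np.round(coeffs[0], 2))  # only row 0 is non-zero because the block is constant vertically
```

Two coefficients out of 64 hold over 99% of the energy here, so zeroing most of the rest changes the reconstructed block very little.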

### <a id='toc4_4_3_'></a>[Perceptual Coding](#toc0_)

### <a id='toc4_4_4_'></a>[Rate-Distortion Optimization](#toc0_)