# Compression

In [53]:
# import
import numpy as np
import cv2 as cv
import os

In [54]:
# Read the image to be compressed
filename = "scenery.bmp"
filename_without_ext = os.path.splitext(filename)[0]
imgBGR = cv.imread(filename)
imgRGB = cv.cvtColor(imgBGR, cv.COLOR_BGR2RGB) # OpenCV stores images by default in BGR format, and we need to convert it to RGB
img_height, img_width = imgRGB.shape[:2]

In [55]:
# Transform RGB to YCbCr
def rgb2ycbcr(rgb):
    # Define the transformation matrix
    xform_matrix = np.array([[0.299, 0.587, 0.114],
                           [-0.168736, -0.331264, 0.5],
                           [0.5, -0.418688, -0.081312]])
    ycbcr = np.dot(rgb, xform_matrix.T)
    ycbcr[:, :, [1, 2]] += 128.0
    return ycbcr

imgYCbCr = rgb2ycbcr(imgRGB)

In [56]:
# Separate the three channels and perform 4:2:0 color subsampling
def downSample(ycbcr):
    # Separate the three channels
    y = ycbcr[:, :, 0]
    cb = ycbcr[:, :, 1]
    cr = ycbcr[:, :, 2]
    # Perform 4:2:0 color subsampling, take the upper-left corner of the 2x2 square
    cb = cb[::2, ::2]
    cr = cr[::2, ::2]
    return y, cb, cr

imgY, imgCb, imgCr = downSample(imgYCbCr)

In [57]:
# Pad the height and width of Y, Cb, Cr components to a multiple of 8
def pad(img): # (heignt, width)
    h, w = img.shape[:2]
    h_padding = 0
    w_padding = 0
    if h % 8 != 0:
        h_padding = 8 - (h % 8)
    if w % 8 != 0:
        w_padding = 8 - (w % 8)
    img_padded = cv.copyMakeBorder(img, 0, h_padding, 0, w_padding, cv.BORDER_REPLICATE)
    return img_padded # (height_padded, width_padded)
    
imgY_padded = pad(imgY)
imgCb_padded = pad(imgCb)
imgCr_padded = pad(imgCr)

In [58]:
# Divide the image into small blocks of 8x8
def split2blocks(img): # (height_padded, width_padded)
    h, w = img.shape[:2]
    blocks = np.zeros(((h//8) * (w//8), 8, 8), dtype=np.float32)
    for i in range(h//8):
        for j in range(w//8):
            yoff, xoff = i*8, j*8
            blocks[i*(w//8)+j, :, :] = img[yoff:yoff+8, xoff:xoff+8]
    return blocks # (h//8 * w//8, 8, 8)

blocksY = split2blocks(imgY_padded)
blocksCb = split2blocks(imgCb_padded)
blocksCr = split2blocks(imgCr_padded)

In [59]:
# Perform DCT on image blocks
def getDctMatrix():
    dct_matrix = np.zeros((8, 8))
    for i in range(8):
        for j in range(8):
            if i == 0:
                dct_matrix[i][j] = 1 / (2*np.sqrt(2))
            else:
                dct_matrix[i][j] = (1.0/2) * np.cos((2*j+1)*i*np.pi/16)
    return dct_matrix # T

def dct2D(blocks, dct_matrix): # (num_block, 8, 8)
    dct_blocks = np.zeros_like(blocks) # (num_block, 8, 8)
    for block_index in range(dct_blocks.shape[0]):
        for u in range(8):
            for v in range(8):
                # F = T · f · T.T
                dct_blocks[block_index] = dct_matrix @ blocks[block_index] @ dct_matrix.T
    return dct_blocks # (num_block, 8, 8)

dct_matrix = getDctMatrix()
dctY = dct2D(blocksY, dct_matrix)
dctCb = dct2D(blocksCb, dct_matrix)
dctCr = dct2D(blocksCr, dct_matrix)

In [60]:
# Apply Quantization
Luminance_Quantization_Table = np.array([[16, 11, 10, 16, 24, 40, 51, 61],
                                        [12, 12, 14, 19, 26, 58, 60, 55],
                                        [14, 13, 16, 24, 40, 57, 69, 56],
                                        [14, 17, 22, 29, 51, 87, 80, 62],
                                        [18, 22, 37, 56, 68, 109, 103, 77],
                                        [24, 35, 55, 64, 81, 104, 113, 92],
                                        [49, 64, 78, 87, 103, 121, 120, 101],
                                        [72, 92, 95, 98, 112, 100, 103, 99]])

Chrominance_Quantization_Table = np.array([[17, 18, 24, 47, 99, 99, 99, 99],
                                        [18, 21, 26, 66, 99, 99, 99, 99],
                                        [24, 26, 56, 99, 99, 99, 99, 99],
                                        [47, 66, 99, 99, 99, 99, 99, 99],
                                        [99, 99, 99, 99, 99, 99, 99, 99],
                                        [99, 99, 99, 99, 99, 99, 99, 99],
                                        [99, 99, 99, 99, 99, 99, 99, 99],
                                        [99, 99, 99, 99, 99, 99, 99, 99]])

def quantization(blocks, quantization_table): # (num_block, 8, 8)
    return np.round(blocks / quantization_table) # (num_block, 8, 8)

qY = quantization(dctY, Luminance_Quantization_Table)
qCb = quantization(dctCb, Chrominance_Quantization_Table)
qCr = quantization(dctCr, Chrominance_Quantization_Table)

In [61]:
# Zigzag Ordering
def zigzag(blocks): # (num_block, 8, 8)
    zigzag_table = np.array([[0, 1, 5, 6, 14, 15, 27, 28],
                            [2, 4, 7, 13, 16, 26, 29, 42],
                            [3, 8, 12, 17, 25, 30, 41, 43],
                            [9, 11, 18, 24, 31, 40, 44, 53],
                            [10, 19, 23, 32, 39, 45, 52, 54],
                            [20, 22, 33, 38, 46, 51, 55, 60],
                            [21, 34, 37, 47, 50, 56, 59, 61],
                            [35, 36, 48, 49, 57, 58, 62, 63]])
    
    linearized_block = np.zeros(64)
    blocks = blocks.reshape((-1, 64))
    for x in range(blocks.shape[0]):
        for i in range(8):
            for j in range(8):
                linearized_block[zigzag_table[i][j]] = blocks[x][i*8+j]
        blocks[x] = linearized_block
        
    return blocks # (num_block, 64)

zigY = zigzag(qY) # (num_block, 64)
zigCb = zigzag(qCb)
zigCr = zigzag(qCr)

In [62]:
# DPCM on DC coefficients, e(0) = DC(0), e(i) = DC(i) - DC(i-1)   
def dpcm(blocks): # (num_block, 64)
    dc = blocks[:, 0]
    dpcm_list = np.zeros(dc.shape[0])

    for i in range(dc.shape[0]): # num_block
        if (i == 0):
            dpcm_list[i] = dc[i]
        else:
            dpcm_list[i] = dc[i] - dc[i-1]
    
    return dpcm_list # (num_block,)

dpcmY = dpcm(zigY) # numpy NDArray[float64]
dpcmCb = dpcm(zigCb)
dpcmCr = dpcm(zigCr)

In [63]:
# RLC on AC coefficients
def rlc(blocks): # (num_block, 64)
    ac = blocks[:, 1:]
    rlc_list = []

    for i in range(ac.shape[0]):
        zero_cnt = 0
        for j in range(ac.shape[1]):
            if (ac[i][j] == 0):
                zero_cnt = zero_cnt + 1
            else:
                rlc_list.append((zero_cnt, ac[i][j])) # (skip, value)
                zero_cnt = 0
        rlc_list.append((0, 0)) # the end of a block

    return rlc_list # (num_block * ?), element: (skip, value)

rlcY = rlc(zigY) # list
rlcCb = rlc(zigCb)
rlcCr = rlc(zigCr)

In [64]:
# Perform Entropy Coding(VLC, Huffman Coding) 
import heapq
from collections import Counter

# Define the node class of the Huffman tree
class HuffmanNode():
    def __init__(self, char, freq, left=None, right=None):
        self.char = char
        self.freq = freq
        self.left = left
        self.right = right

    def __lt__(self, other):
        return self.freq < other.freq

def build_huffman_tree(data):
    counter = Counter(data)
    heap = [HuffmanNode(char, freq) for char, freq in counter.items()]
    heapq.heapify(heap)

    while len(heap) > 1:
        left = heapq.heappop(heap)
        right = heapq.heappop(heap)
        parent = HuffmanNode(None, left.freq + right.freq, left, right)
        heapq.heappush(heap, parent)

    return heap[0]

def create_encoding_table(root, encoding_table=None, code=''):
    if encoding_table is None:
        encoding_table = {}

    if root.char is not None:
        encoding_table[root.char] = code
        return encoding_table

    create_encoding_table(root.left, encoding_table, code + '0')
    create_encoding_table(root.right, encoding_table, code + '1')

    return encoding_table

def huffman_encode(data):
    root = build_huffman_tree(data)
    encoding_table = create_encoding_table(root)
    encoded_data = ''.join(encoding_table[char] for char in data)

    return encoding_table, encoded_data

dcY_encoding_table, dcY_encoded_data = huffman_encode(dpcmY)
dcCb_encoding_table, dcCb_encoded_data = huffman_encode(dpcmCb)
dcCr_encoding_table, dcCr_encoded_data = huffman_encode(dpcmCr)
acY_encoding_table, acY_encoded_data = huffman_encode(rlcY)
acCb_encoding_table, acCb_encoded_data = huffman_encode(rlcCb)
acCr_encoding_table, acCr_encoded_data = huffman_encode(rlcCr)

In [65]:
# Save the compressed data to a binary file
import struct
import pickle
import bitarray

def write_huffman_tables(huffman_tables, filename):
    with open(filename, 'wb') as f:
        # Write the number of Huffman coding tables
        num_tables = len(huffman_tables)
        f.write(struct.pack('B', num_tables))

        for i in range(num_tables):
            # Serialize coding table
            serialized_table = pickle.dumps(huffman_tables[i])
            
            # Write the length of the coding table
            f.write(struct.pack('I', len(serialized_table)))

            # Write the serialized coding table
            f.write(serialized_table)

def write_encoded_datas(encoded_datas, filename):
    with open(filename, 'wb') as f:
        # Write the number of Encoded Datas
        num_datas = len(encoded_datas)
        f.write(struct.pack('B', num_datas))

        for i in range(num_datas):
            # Convert the encoded data to a bitarray object
            data_bits = bitarray.bitarray(encoded_datas[i])

            # Write the length of the encoded data in bits
            f.write(struct.pack('I', len(data_bits)))

            # Write the encoded data
            data_bits.tofile(f)

huffman_tables = [dcY_encoding_table, dcCb_encoding_table, dcCr_encoding_table, acY_encoding_table, acCb_encoding_table, acCr_encoding_table]
encoded_datas = [dcY_encoded_data, dcCb_encoded_data, dcCr_encoded_data, acY_encoded_data, acCb_encoded_data, acCr_encoded_data]
write_huffman_tables(huffman_tables, filename_without_ext+".table")
write_encoded_datas(encoded_datas, filename_without_ext+".data")