# Logan Kinajil-Moran CSC259 Final Project

# Video Compression

In [3]:
import os
import numpy as np

# Get initial size (using Mac Byte Division)
file_size = os.path.getsize("akiyo_cif.y4m") / (1000 * 1000) 

print(f"Original File Size: {file_size:.2f}MB")

Original File Size: 45.62MB


Getting Frames into an Array

In [4]:
import imageio.v3 as iio
import matplotlib.pyplot as plt
import numpy as np

# get frames into an array
def get_vid_info(yuv_filename):
    frame_marker = b'FRAME\n' 

    # Open and read the header
    with open(yuv_filename, "rb") as file:
        # Read the entire file to find where the frame data starts
        content = file.read()

    # Convert the content to a string to parse it
    content_str = content.decode("utf-8", errors="ignore")

    # Find the end of the main header
    header_end = content_str.find("\n") + 1  # Include the newline character

    # Calculate YUV frame size
    # Using YUV 4:4:4
    width = int(content_str.split()[1].strip("W"))
    height = int(content_str.split()[2].strip("H"))
    y_plane_size = width * height
    uv_plane_size = width * height
    frame_size = y_plane_size + 2 * uv_plane_size

    # Read YUV frames
    frames = []
    with open(yuv_filename, "rb") as yuv_file:
        # Skip the header
        header = yuv_file.read(int(header_end))
        while True:
            # Read the FRAME marker
            marker = yuv_file.read(len(frame_marker))

            # Read the raw YUV frame data
            frame_data = yuv_file.read(frame_size)
            if len(frame_data) < frame_size:
                # All frames have been read
                break

            frames.append(frame_data)

    return frames, header, width, height



In [5]:
frames, header, width, height = get_vid_info("akiyo_cif.y4m")

print(header)

def load_frame(frame, width=width, height=height):
    y_size = width * height
    uv_size = (width) * (height)
    
    y = np.frombuffer(frame[0:y_size], dtype=np.uint8).reshape((height, width))
    u = np.frombuffer(frame[y_size:y_size + uv_size], dtype=np.uint8).reshape((height, width))
    v = np.frombuffer(frame[y_size + uv_size:], dtype=np.uint8).reshape((height, width))
    
    return y, u, v

b'YUV4MPEG2 W352 H288 F30000:1001 Ip A128:117\n'


In [6]:
import os

def frames_into_bytes(frames):
    byte_frames = []
    for frame in frames:
        y, u, v = frame
        byte_frames.append(y.astype(np.uint8).tobytes() +
                                u.astype(np.uint8).tobytes() +
                                v.astype(np.uint8).tobytes())
    return byte_frames


def get_output_video(frames, output_name, width=width, height=height, framerate="30000:1001", chroma="C420"):
    frame_marker = b'FRAME\n'
    output_path = f"{output_name}.y4m"
    header = f"YUV4MPEG2 W{width} H{height} F{framerate} Ip A128:117\n".encode('utf-8')

    with open(f"{output_name}.y4m", "wb") as output_video:
        output_video.write(header)
        for frame in frames:
            output_video.write(frame_marker)
            output_video.write(frame)

    print(f"Size of {output_path}: {os.path.getsize(output_path) / (1000 * 1000):.2f}")


In [None]:
# chroma subsampling
print(height)
print(width)

def chroma_subsample_411(plane):
    return plane[:, ::4]  # Take every 4th pixel horizontally

# Chroma subsampling for 4:2:0 (downsample both horizontally and vertically by 2)
def chroma_subsample_420(plane):
    return plane[::2, ::2]  # Take every 2nd pixel horizontally and vertically

def chroma_upsample_420(plane):
    upsampled = np.zeros((height, width), dtype=plane.dtype)
    
    # Copy values to every 2x2 block
    # Safe upsampling
    for i in range(height // 2):
        for j in range(width // 2):
            y_idx = i * 2
            x_idx = j * 2
            upsampled[y_idx:y_idx+2, x_idx:x_idx+2] = plane[i,j]
    
    return upsampled

def apply_chroma_subsampling(frames, chroma_subsampling_type):
    subsampled_frames = []

    if chroma_subsampling_type == "C420":
        for frame in frames:
            y, u, v = load_frame(frame)
            u_sub = chroma_subsample_420(u)
            v_sub = chroma_subsample_420(v)
            subsampled_frames.append(y.astype(np.uint8).tobytes() +
                                     u_sub.astype(np.uint8).tobytes() +
                                     v_sub.astype(np.uint8).tobytes())
    
    if chroma_subsampling_type == "C411":
            for frame in frames:
                y, u, v = load_frame(frame)
                u_sub = chroma_subsample_411(u)
                v_sub = chroma_subsample_411(v)
                subsampled_frames.append(y.astype(np.uint8).tobytes() +
                                        u_sub.astype(np.uint8).tobytes() +
                                        v_sub.astype(np.uint8).tobytes())
    
    return subsampled_frames

subsampled_frames = apply_chroma_subsampling(frames, "C411") 
get_output_video(frames, "subsampled_test")

288
352
Size of subsampled_test.y4m: 45.62


In [7]:
# reduce the frame rate

def reduce_frame_rate(frames, rate):
    return frames[::rate]

In [8]:
import numpy as np

# Helper function to apply DFT to a block
def apply_dft_to_block(block):
    # Apply 2D DFT and shift the zero-frequency component to the center
    dft = np.fft.fft2(block)
    dft_shift = np.fft.fftshift(dft)

    # Apply a threshold to filter out small frequencies
    threshold = 0.01 * np.max(np.abs(dft_shift))
    compressed_dft = np.where(np.abs(dft_shift) > threshold, dft_shift, 0)

    return compressed_dft

# Helper function to reconstruct a block from compressed DFT
def reconstruct_block_from_dft(compressed_dft):
    # Inverse DFT to reconstruct the image from compressed DFT
    dft_ishift = np.fft.ifftshift(compressed_dft)
    reconstructed_block = np.abs(np.fft.ifft2(dft_ishift))

    # Normalize to the range [0, 255] and convert to uint8
    reconstructed_block = np.clip(reconstructed_block, 0, 255)
    return reconstructed_block

# Quantize the DFT coefficients for each block
def quantize_dft(dft_block, quantization_level=10):
    # Quantize the DFT coefficients
    return np.round(dft_block / quantization_level) * quantization_level

# Apply compression (DFT + quantization) to each block in the frame
def apply_compression_to_frame(frame, block_size=16, quantization_level=10):
    # Prepare an array to store the compressed blocks
    compressed_frame = np.zeros_like(frame)

    # Iterate over the frame in blocks
    for y in range(0, height, block_size):
        for x in range(0, width, block_size):
            # Extract the block from the frame
            block = frame[y:y+block_size, x:x+block_size]
            
            # Apply DFT and quantization
            compressed_dft = apply_dft_to_block(block)
            quantized_dft = quantize_dft(compressed_dft, quantization_level)
            
            # Reconstruct the block and store it in the compressed frame
            compressed_block = reconstruct_block_from_dft(quantized_dft)
            compressed_frame[y:y+block_size, x:x+block_size] = compressed_block

    return compressed_frame

# Apply the compression (DFT + quantization) to Y, U, and V channels of the video frames
def apply_compression_to_video(frames, width, height, block_size=16, quantization_level=20):
    compressed_frames = []
    for frame in frames:
        y, u, v = load_frame(frame, width, height)
        
        # Apply compression to Y, U, and V planes independently
        y_compressed = apply_compression_to_block(y, block_size, quantization_level)
        u_compressed = apply_compression_to_block(u, block_size, quantization_level)
        v_compressed = apply_compression_to_block(v, block_size, quantization_level)
        
        # Ensure all values are clamped and in uint8 range before storing
        y_compressed = np.clip(y_compressed, 0, 255).astype(np.uint8)
        u_compressed = np.clip(u_compressed, 0, 255).astype(np.uint8)
        v_compressed = np.clip(v_compressed, 0, 255).astype(np.uint8)
        
        # Append the compressed Y, U, V planes to the list of frames
        compressed_frames.append(
            y_compressed.tobytes() +
            u_compressed.tobytes() +
            v_compressed.tobytes()
        )
    
    return compressed_frames

# compressed_frames = apply_compression_to_video(frames, width, height, block_size=16, quantization_level=10)
# get_output_video(compressed_frames, "dft_block_compressed_video", width, height)

In [None]:
# writes encoded frames to an output file
def write_encoded_frames(encoded_blocks, u, v, width, height, block_size=16):
    for value in encoded_blocks:


In [43]:
import numpy as np
from scipy.fftpack import dct, idct
import matplotlib.pyplot as plt
from tqdm import tqdm

# Parameters
BLOCK_SIZE = 16
SEARCH_RANGE = 8
DCT_BLOCK_SIZE = 8

# Quantization Matrix (for DCT)
QUANTIZATION_MATRIX = np.array([
    [16, 11, 10, 16, 24, 40, 51, 61],
    [12, 12, 14, 19, 26, 58, 60, 55],
    [14, 13, 16, 24, 40, 57, 69, 56],
    [14, 17, 22, 29, 51, 87, 80, 62],
    [18, 22, 37, 56, 68, 109, 103, 77],
    [24, 35, 55, 64, 81, 104, 113, 92],
    [49, 64, 78, 87, 103, 121, 120, 101],
    [72, 92, 95, 98, 112, 100, 103, 99]
])

### MOTION ESTIMATION AND COMPENSATION ###
def motion_estimation(frame1, frame2, block_size=BLOCK_SIZE, search_range=SEARCH_RANGE):
    motion_vectors = np.zeros((height // block_size, width // block_size, 2), dtype=int)

    for i in range(0, height, block_size):
        for j in range(0, width, block_size):
            current_block = frame1[i:i+block_size, j:j+block_size]
            best_match = (0, 0)
            min_sad = float('inf')

            # Search in the given range
            for dx in range(-search_range, search_range + 1):
                for dy in range(-search_range, search_range + 1):
                    ref_x, ref_y = i + dx, j + dy
                    if ref_x < 0 or ref_y < 0 or ref_x + block_size > height or ref_y + block_size > width:
                        continue
                    ref_block = frame2[ref_x:ref_x+block_size, ref_y:ref_y+block_size]
                    sad = np.sum(np.abs(current_block - ref_block))
                    if sad < min_sad:
                        min_sad = sad
                        best_match = (dx, dy)

            motion_vectors[i // block_size, j // block_size] = best_match
    return motion_vectors

def motion_compensation(frame, motion_vectors, block_size=BLOCK_SIZE):
    predicted_frame = np.zeros_like(frame)

    for i in range(0, height, block_size):
        for j in range(0, width, block_size):
            dx, dy = motion_vectors[i // block_size, j // block_size]
            ref_x, ref_y = i + dx, j + dy
            predicted_frame[i:i+block_size, j:j+block_size] = frame[ref_x:ref_x+block_size, ref_y:ref_y+block_size]

    return predicted_frame

### RESIDUAL CALCULATIONS ###
def calculate_residual(actual_frame, predicted_frame):
    return actual_frame.astype(np.int16) - predicted_frame.astype(np.int16)

def reconstruct_frame(predicted_frame, residual):
    return predicted_frame + residual

### DCT AND QUANTIZATION ###
def apply_dct(block):
    return dct(dct(block.T, norm='ortho').T, norm='ortho')

def apply_idct(dct_block):
    return idct(idct(dct_block.T, norm='ortho').T, norm='ortho')

def quantize_dct(dct_block, quantization_matrix):
    return np.round(dct_block / quantization_matrix).astype(np.int16)

def dequantize_dct(quantized_block, quantization_matrix):
    return (quantized_block * quantization_matrix).astype(np.int16)

def run_length_encode(block):
    flat_block = np.array(block.flatten(), dtype=int)
    encoded = []
    prev_value = flat_block[0]
    count = 1

    for value in flat_block[1:]:
        if value == prev_value:
            count += 1
        else:
            encoded.append((int(prev_value), count))
            prev_value = value
            count = 1
    encoded.append((int(prev_value), count))
    return encoded

def run_length_decode(encoded_block):
    decoded_block = []
    for value, run_length in encoded_block:
        decoded_block.extend([value] * run_length)
    return np.array(decoded_block, dtype=int)

### FRAME PROCESSING ###
def encode_frame(frame):
    residual_dct_quantized = np.zeros_like(frame, dtype=float)
    encoded_blocks = []

    for i in range(0, height, DCT_BLOCK_SIZE):
        for j in range(0, width, DCT_BLOCK_SIZE):
            block = frame[i:i+DCT_BLOCK_SIZE, j:j+DCT_BLOCK_SIZE]
            dct_block = apply_dct(block)
            quantized_block = quantize_dct(dct_block, QUANTIZATION_MATRIX)
            residual_dct_quantized[i:i+DCT_BLOCK_SIZE, j:j+DCT_BLOCK_SIZE] = quantized_block.astype(int)
            encoded_blocks.append(run_length_encode(quantized_block))

    return encoded_blocks

def decode_frame(encoded_blocks, height, width):
    reconstructed_frame = np.zeros((height, width), dtype=float)
    block_idx = 0

    for i in range(0, height, DCT_BLOCK_SIZE):
        for j in range(0, width, DCT_BLOCK_SIZE):
            decoded_block_flat = run_length_decode(encoded_blocks[block_idx])
            decoded_block = decoded_block_flat.reshape(DCT_BLOCK_SIZE, DCT_BLOCK_SIZE)
            dequantize = dequantize_dct(decoded_block, QUANTIZATION_MATRIX)
            idct = apply_idct(dequantize)
            reconstructed_frame[i:i+DCT_BLOCK_SIZE, j:j+DCT_BLOCK_SIZE] = idct
            block_idx += 1

    return reconstructed_frame.astype(int)

def pipeline(frames, width, height):
    # Initialize the compressed frames with the key frame (Y, U, V)
    key_frame_y, key_frame_u, key_frame_v = load_frame(frames[0], width, height)
    stored_to_write = [(key_frame_y, key_frame_u, key_frame_v)]
    compressed_frames = [(key_frame_y, key_frame_u, key_frame_v)]  # Store Y, U, V as tuples
    num_to_process = 10


    for i in tqdm(range(1, num_to_process)):
        current_frame_y, current_frame_u, current_frame_v = load_frame(frames[i], width, height)
        
        prev_y, prev_u, prev_v = load_frame(frames[-1], width, height)
        motion_vectors = motion_estimation(prev_y, current_frame_y)

        predicted_frame_y = motion_compensation(prev_y, motion_vectors)

        residual_y = calculate_residual(current_frame_y, predicted_frame_y)

        encoded_blocks_y = encode_frame(residual_y)

        subsampled_u = chroma_subsample_420(current_frame_u)
        subsampled_v = chroma_subsample_420(current_frame_v)

        stored_to_write.append((motion_vectors, current_frame_u, current_frame_v, encoded_blocks_y))

        reconstructed_frame_u = chroma_upsample_420(subsampled_u)
        reconstructed_frame_v = chroma_upsample_420(subsampled_v)

        reconstructed_residual_y = decode_frame(encoded_blocks_y, height, width)
        reconstructed_residual_y = reconstructed_residual_y.astype(residual_y.dtype)
        reconstructed_frame_y = reconstruct_frame(predicted_frame_y, residual_y)

        reconstructed_frame_y = np.clip(reconstructed_frame_y, 0, 255)
        reconstructed_frame_u = np.clip(reconstructed_frame_u, 0, 255)
        reconstructed_frame_v = np.clip(reconstructed_frame_v, 0, 255)

        ## TODO ADD TRANSFORMATION TO U AND V COMPONENTS

        # Combine the reconstructed Y, U, and V channels to form the full frame
        reconstructed_frame = (
            reconstructed_frame_y,
            current_frame_u,
            current_frame_v
        )

        # Append the reconstructed Y, U, and V components to the compressed_frames list
        compressed_frames.append(reconstructed_frame)
    
    return compressed_frames, stored_to_write

frames_to_process, store = pipeline(frames, width, height)

100%|██████████| 9/9 [00:15<00:00,  1.73s/it]


In [44]:
compressed_to_bytes = frames_into_bytes(frames_to_process)

get_output_video(compressed_to_bytes, f'inter_test_1', width=width, height=height, framerate="30000:1001")


Size of inter_test_1.y4m: 3.04


In [None]:
import struct

def write_compressed_file(store, filename):
    with open(filename, "wb") as f:  # Use "wb" to write in binary mode
        # Keyframe (assuming `store[0]` is the keyframe)
        for keyframe in store[0]:
            for row in keyframe:
                for value in row:
                    f.write(struct.pack("B", value))  # Write as a single unsigned byte
        
        # Encoded frames
        for frame in range(1, len(store)):
            encoded_blocks = store[frame][3]
            motion_vectors = store[frame][0]
            u_frame = store[frame][1]
            v_frame = store[frame][2]

            # Write motion vectors
            for row in motion_vectors:
                for mv in row:
                    f.write(struct.pack("bb", mv[0], mv[1]))  # Write as two signed bytes

            # Write U and V frames
            for row in u_frame:
                for value in row:
                    f.write(struct.pack("B", value))  # Write as a single unsigned byte

            for row in v_frame:
                for value in row:
                    f.write(struct.pack("B", value))  # Write as a single unsigned byte

            # Write encoded blocks
            for block in encoded_blocks:
                for combo in block:
                    f.write(struct.pack("bb", combo[0], combo[1]))

def get_video_from_compressed_file(file, height, width):
    with open(file, "rb") as f:
        for byte in range(height * width):
            pass

write_compressed_file(store, "test.compressed")

In [None]:
# helper function

def show_frames(frame1, frame2, frame3, title1="Frame 1", title2="Frame 2", title3="Reconstructed Frame 2"):
    plt.figure(figsize=(15, 5))
    
    # Frame 1
    plt.subplot(1, 3, 1)
    plt.imshow(frame1, cmap='gray')
    plt.title(title1)
    plt.axis('off')
    
    # Frame 2
    plt.subplot(1, 3, 2)
    plt.imshow(frame2, cmap='gray')
    plt.title(title2)
    plt.axis('off')
    
    # Frame 3
    plt.subplot(1, 3, 3)
    plt.imshow(frame3, cmap='gray')
    plt.title(title3)
    plt.axis('off')
    
    plt.tight_layout()
    plt.show()