In [1]:
import os
import math
import numpy as np
from collections import Counter
import heapq
from queue import PriorityQueue
# Please put your input file in the same folder as this Jupyter notebook file.
# Please do not change any other code
# You only need to complete the functions with TODO
# You can simply run the Q1/Q2/Q3_test() to test your function

In [2]:
def Q1_input(file_path):
    """Read (Y, Co, Cg) integer triples from a comma-separated text file.

    Every line is stripped and split on commas. Lines that do not contain
    exactly three fields are skipped without a message; lines with three
    fields that cannot all be parsed as integers are reported with a printed
    "Invalid line" message and skipped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A list of (Y, Co, Cg) tuples of ints, in file order.
    """
    triples = []
    with open(file_path, 'r') as handle:
        for line in handle:
            fields = line.strip().split(',')
            if len(fields) != 3:
                continue
            try:
                # The tuple is built completely before it is appended, so a
                # parse failure on any field leaves the result list untouched.
                triples.append(tuple(int(field) for field in fields))
            except ValueError:
                print(f"Invalid line: {line}")
    return triples


def Q1_test():
    """Convert every triple listed in "q1_input.txt" with Q1 and print the result."""
    for Y, Co, Cg in Q1_input("q1_input.txt"):
        yuv = Q1(Y, Co, Cg)
        print(f"YCoCg: ({Y}, {Co}, {Cg}) -> YUV: (Y={yuv[0]}, U={yuv[1]}, V={yuv[2]})")


In [3]:
def Q1(Y, Co, Cg):
    """Convert one YCoCg colour triple to YUV by going through RGB.

    The conversion runs in three stages:
      1. (Cg, Y, Co) -> (G, B, R) with an integer matrix:
         G = Y + Cg,  B = Y - Co - Cg,  R = Y + Co - Cg.
      2. Reorder the rows to (R, G, B).
      3. (R, G, B) -> (Y, U, V) with Y = 0.299 R + 0.587 G + 0.114 B and the
         unscaled colour differences U = B - Y, V = R - Y.

    Args:
        Y: Luma component of the YCoCg triple.
        Co: Orange chroma component.
        Cg: Green chroma component.

    Returns:
        A tuple (Y, U, V) of NumPy floating-point scalars.
    """
    # Stage 1: the input column is ordered (Cg, Y, Co) to match the matrix.
    to_gbr = np.array([[1, 1, 0],
                       [-1, 1, -1],
                       [-1, 1, 1]])
    gbr = to_gbr @ np.array([[Cg], [Y], [Co]])

    # Stage 2: pick rows R, G, B while keeping the (3, 1) column shape.
    rgb = gbr[[2, 0, 1]]

    # Stage 3: rows produce Y, U = B - Y and V = R - Y respectively.
    to_yuv = np.array([[0.299, 0.587, 0.114],
                       [-0.299, -0.587, 0.886],
                       [0.701, -0.587, -0.114]])
    luma, chroma_u, chroma_v = (to_yuv @ rgb)[:, 0]

    return (luma, chroma_u, chroma_v)


In [4]:
# Run the Q1 conversion on every triple read from q1_input.txt and print the results.
Q1_test()

YCoCg: (100, 5, 5) -> YUV: (Y=101.795, U=-11.795000000000002, V=-1.7950000000000035)


In [5]:
# 2x2 threshold matrix that Q2_test passes to both dithering() and ordered_dithering().
dither_matrix = np.array([[0, 3],
                          [2, 1]])

def Q2_input(file_path):
    """Load a comma-separated grid of integers as a NumPy array.

    Each non-blank line becomes one row of the image. Blank lines (for
    example a trailing empty line at the end of the file) are skipped rather
    than crashing with ``ValueError`` from ``int('')``.

    Args:
        file_path: Path of the text file to read.

    Returns:
        ``np.array`` built from the parsed rows, in file order.

    Raises:
        ValueError: If a non-blank line contains a field that is not an integer.
    """
    image = []
    with open(file_path, 'r') as file:
        for line in file:
            stripped = line.strip()
            if not stripped:
                # Nothing to parse on an empty line; int('') would raise.
                continue
            image.append([int(item) for item in stripped.split(',')])
    return np.array(image)

def Q2_test():
    """Load "q2_input.txt" and print the image with both dithering results."""
    input_image = Q2_input("q2_input.txt")
    print("input image:\n",input_image)
    # Both results are computed before either is printed.
    results = (
        ("dithering:\n", dithering(input_image, dither_matrix)),
        ("ordered_dithering:\n", ordered_dithering(input_image, dither_matrix)),
    )
    for label, result in results:
        print(label, result)

In [6]:
def dithering(input_image, dither_matrix):
    """Dither an image by expanding every pixel into one dither-matrix-sized block.

    Pixel values are quantised to ``dither_matrix.size + 1`` intensity levels
    (0 .. ``dither_matrix.size``) by ``floor(value / 256 * levels)``. Each
    quantised pixel ``k`` is then replaced by the block ``k > dither_matrix``,
    so the output is ``d_rows`` times taller and ``d_cols`` times wider than
    the input.

    The upper clamp is derived from the dither matrix instead of being fixed
    at 4, so matrices of any size (not only 2x2) are quantised correctly.

    Args:
        input_image: 2-D array of pixel intensities. The scaling assumes a
            0-255 range; larger values are clamped to the top level.
        dither_matrix: 2-D array of thresholds.

    Returns:
        2-D int array of 0/1 values with shape
        ``(rows * d_rows, cols * d_cols)``.
    """
    n_rows, n_cols = input_image.shape
    d_rows, d_cols = dither_matrix.shape

    # Highest quantised level: one more level than there are matrix cells.
    top_level = dither_matrix.size
    intensity_level = top_level + 1

    normalized_image = np.floor((input_image / 256) * intensity_level).astype(int)
    # Inputs at or above 256 would otherwise exceed the top level.
    normalized_image = np.minimum(normalized_image, top_level)

    # Stretch every pixel over its block, and lay one copy of the dither
    # matrix under every block, so a single comparison fills the whole output.
    expanded = np.repeat(np.repeat(normalized_image, d_rows, axis=0), d_cols, axis=1)
    thresholds = np.tile(dither_matrix, (n_rows, n_cols))

    return (expanded > thresholds).astype(int)

def ordered_dithering(input_image,dither_matrix):
    """Ordered dithering: threshold each pixel against a tiled dither matrix.

    Pixel values are quantised to ``dither_matrix.size + 1`` levels by
    ``floor(value / 256 * levels)`` and clamped to the top level. The pixel at
    ``(i, j)`` is then compared with
    ``dither_matrix[i % d_rows, j % d_cols]``, so the output has the same
    shape as the input.

    Compared with a floor-division tiling, the modulo lookup also covers
    images whose dimensions are not multiples of the dither-matrix size, and
    the clamp follows the matrix size instead of being fixed at 4.

    Args:
        input_image: 2-D array of pixel intensities. The scaling assumes a
            0-255 range; larger values are clamped to the top level.
        dither_matrix: 2-D array of thresholds (need not be square).

    Returns:
        2-D int array of 0/1 values with the same shape as ``input_image``.
    """
    n_rows, n_cols = input_image.shape
    d_rows, d_cols = dither_matrix.shape

    # Highest quantised level: one more level than there are matrix cells.
    top_level = dither_matrix.size
    intensity_level = top_level + 1

    normalized_image = np.floor((input_image / 256) * intensity_level).astype(int)
    normalized_image = np.minimum(normalized_image, top_level)

    # A column of row indices and a row of column indices broadcast to the
    # full image shape, which repeats the dither matrix over the image.
    row_index = np.arange(n_rows)[:, np.newaxis] % d_rows
    col_index = np.arange(n_cols) % d_cols
    thresholds = dither_matrix[row_index, col_index]

    return (normalized_image > thresholds).astype(int)

In [7]:
# Print the image read from q2_input.txt together with both dithering results.
Q2_test()

input image:
 [[  5 266]
 [ 52   5]]
dithering:
 [[0 0 1 1]
 [0 0 1 1]
 [1 0 0 0]
 [0 0 0 0]]
ordered_dithering:
 [[0 1]
 [0 0]]


In [2]:
import heapq
def Q3_input(file_path):
    """Read the input strings for Q3, one per line.

    The line terminator is removed from every line so that the newline is not
    counted as a symbol by the entropy and Huffman functions. Lines that are
    empty after that are skipped.

    Args:
        file_path: Path of the text file to read.

    Returns:
        A list of the non-empty lines, in file order, without line endings.
    """
    strings = []
    with open(file_path, 'r') as file:
        for raw_line in file:
            # Strip only the line ending; any other character is kept as data.
            text = raw_line.rstrip("\r\n")
            if text:
                strings.append(text)
    return strings


def Q3_test():
    """Print both entropies and both Huffman lengths for each string in "q3_input.txt"."""
    metrics = (
        ("first order entropy:", first_order_entropy),
        ("second order entropy:", second_order_entropy),
        ("average codeword length:", huffman_length),
        ("joint average length:", joint_huffman_length),
    )
    for input_string in Q3_input("q3_input.txt"):
        print("input string: ",input_string)
        for label, metric in metrics:
            print(label, metric(input_string))



In [None]:
def first_order_entropy(string):
    """Return the first-order entropy of ``string`` in bits per symbol.

    Symbol probabilities are the relative frequencies of the individual
    characters of ``string``.

    Args:
        string: The sequence of symbols to analyse.

    Returns:
        The sum of ``-p * log2(p)`` over the observed symbols; 0 for an empty
        string.
    """
    n_symbols = len(string)
    bits = 0
    for occurrences in Counter(string).values():
        prob = occurrences / n_symbols
        if prob <= 0:
            continue
        bits -= prob * math.log2(prob)
    return bits

def second_order_entropy(string):
    """Return the second-order entropy of ``string`` in bits per symbol.

    The string is cut into non-overlapping 2-symbol pairs (a trailing unpaired
    symbol is ignored), the entropy of the empirical pair distribution is
    computed in bits per pair, and that value is halved to express it per
    symbol.

    The sum is accumulated by subtraction, as in ``first_order_entropy``, so
    a string whose pairs are all identical yields ``0.0`` rather than ``-0.0``.

    Args:
        string: The sequence of symbols to analyse.

    Returns:
        The pair entropy divided by 2. Strings shorter than 2 symbols print a
        notice and return 0.
    """
    if len(string) < 2:
        print("String too short for second-order entropy.")
        return 0  # Not enough data for second-order entropy.

    # Step by 2 so that the pairs do not overlap.
    pair_counts = Counter(string[i:i + 2] for i in range(0, len(string) - 1, 2))
    total_pairs = sum(pair_counts.values())

    pair_entropy = 0.0
    for count in pair_counts.values():
        probability = count / total_pairs
        pair_entropy -= probability * math.log2(probability)

    return pair_entropy / 2  # Each pair carries two symbols: bits per symbol.

class SymbolNode:
    """One node of a Huffman tree.

    Nodes order themselves by ``freq`` so that they can be stored directly in
    a ``heapq`` heap.
    """

    def __init__(self, symbol, freq):
        """Create a childless node for ``symbol`` with weight ``freq``."""
        self.symbol, self.freq = symbol, freq
        self.left = self.right = None

    def __lt__(self, other):
        """Return True when this node's frequency is smaller than ``other``'s."""
        return self.freq < other.freq

    def is_leaf(self):
        """Return True when the node has neither a left nor a right child."""
        return not (self.left is not None or self.right is not None)

def create_frequency_dict(text, order):
    """Count the non-overlapping blocks of ``order`` symbols in ``text``.

    Blocks start at positions 0, order, 2*order, ...; trailing symbols that do
    not fill a whole block are ignored.

    Args:
        text: The string to split into blocks.
        order: Number of symbols per block (1 for single symbols, 2 for pairs).

    Returns:
        A dict mapping each block to its count, in order of first appearance.
    """
    block_starts = range(0, len(text) - order + 1, order)
    return dict(Counter(text[start:start + order] for start in block_starts))

def build_priority_queue(freq_map):
    """Return a heap (list ordered by ``heapq``) with one SymbolNode per symbol.

    Symbols are pushed one at a time in the iteration order of ``freq_map``.
    """
    heap = []
    for symbol in freq_map:
        heapq.heappush(heap, SymbolNode(symbol, freq_map[symbol]))
    return heap

def merge_nodes(pq):
    """Collapse the heap ``pq`` into a single Huffman tree and return its root.

    The two lowest-frequency nodes are repeatedly popped and joined under a
    new symbol-less parent whose frequency is their sum, until one node is
    left. ``pq`` is modified in place. Returns None for an empty heap.
    """
    if not pq:
        return None
    while len(pq) >= 2:
        lighter = heapq.heappop(pq)   # smallest frequency
        heavier = heapq.heappop(pq)   # next smallest frequency
        parent = SymbolNode(None, lighter.freq + heavier.freq)
        parent.left, parent.right = lighter, heavier
        heapq.heappush(pq, parent)
    return pq[0]

def calculate_tree_depth_and_length(node, depth):
    """Return the frequency-weighted sum of leaf depths below ``node``.

    Args:
        node: Root of the (sub)tree, or None.
        depth: Depth assigned to ``node``; each child is one level deeper.

    Returns:
        The sum of ``leaf.freq * leaf_depth`` over all leaves; 0 when ``node``
        is None.
    """
    total = 0
    pending = [(node, depth)]  # explicit stack instead of recursion
    while pending:
        current, level = pending.pop()
        if current is None:
            continue
        if current.is_leaf():
            total += current.freq * level
        else:
            pending.append((current.left, level + 1))
            pending.append((current.right, level + 1))
    return total

def build_huffman_tree(text, order):
    """Return the average Huffman codeword length per symbol of ``text``.

    ``text`` is split into non-overlapping blocks of ``order`` symbols, a
    Huffman tree is built over the block frequencies, and the total coded
    length (sum of block frequency times leaf depth) is normalised to bits
    per original symbol.

    The normalisation works for any positive ``order``. Previously only 1 and
    2 were handled and any other value raised ``UnboundLocalError``. Results
    for orders 1 and 2 are unchanged.

    Note: when the text contains a single distinct block, the root of the
    tree is itself a leaf at depth 0, so the reported length is 0.

    Args:
        text: The string to encode.
        order: Number of symbols per coded block (1 = single symbols,
            2 = joint 2-symbol coding).

    Returns:
        Average number of bits per symbol, or 0 (after printing an error) when
        ``text`` contains no complete block.
    """
    # Step 1: frequency of each block of ``order`` symbols.
    freq_map = create_frequency_dict(text, order)

    # Step 2: heap of leaf nodes ordered by frequency.
    pq = build_priority_queue(freq_map)

    # Step 3: merge the nodes into a single tree.
    root = merge_nodes(pq)
    if root is None:
        print("Error: Empty tree, no Huffman code generated.")
        return 0

    # Step 4: total coded length = sum over leaves of frequency * depth.
    total_length = calculate_tree_depth_and_length(root, 0)

    # Bits per block, then bits per symbol (each block holds ``order`` symbols).
    # A non-None root means at least one complete block, so num_blocks >= 1.
    num_blocks = len(text) // order
    avg_codeword_length_per_block = total_length / num_blocks
    return avg_codeword_length_per_block / order

def huffman_length(string):
    """Average Huffman codeword length per symbol when coding single symbols."""
    return build_huffman_tree(string, 1)

def joint_huffman_length(string):
    """Average Huffman codeword length per symbol when coding 2-symbol pairs."""
    return build_huffman_tree(string, 2)

In [4]:
# Print both entropies and both Huffman average lengths for every string in q3_input.txt.
Q3_test()

input string:  AABB
first order entropy: 1.0
second order entropy: 0.5
average codeword length: 1.0
joint average length: 0.5


In [None]:
# (3) Discuss any issues/interesting observations in your implementation.

"""
1. Handling Non-overlapping Pairs:
In the implementation for second-order (joint) coding, the string is split into non-overlapping 2‑symbol pairs. An interesting point was ensuring that the string length is even so that every pair is complete. In cases where the string length is odd, a decision must be made (e.g., padding or ignoring the last symbol); this implementation ignores the trailing unpaired symbol. In our assignment, the string length is always an even number greater than 2, so this issue did not arise.

2. Entropy Calculations:

First-order entropy is computed by considering the probability of each individual letter.
Second-order entropy is calculated using non-overlapping pairs. As expected, if there are correlations between adjacent symbols, the second-order entropy tends to be lower than the first-order entropy. This indicates that joint coding is able to exploit redundancy better than coding each symbol independently.

3. Huffman Coding vs. Joint Huffman Coding:
The Huffman coding implementation for single symbols uses the full frequency distribution of individual letters, while the joint coding method treats each pair as a new symbol.

For single-symbol Huffman coding, the average codeword length per symbol is often slightly above the first-order entropy.
For 2-symbol joint Huffman coding, the average codeword length per symbol is derived from the average bits per pair (divided by 2). In many cases, this average is lower than that for single-symbol coding, demonstrating improved efficiency when the underlying data has patterns that the joint method can exploit.
"""

In [None]:
# (4) Experiment different randomly/non randomly generated inputs, and discuss your observations.
# You can implement your experiment here
# --- Experiment 1: Randomly Generated Inputs ---
import random
def experiment_random(n_experiments=5, length=32, seed=None):
    """Print entropy and Huffman statistics for randomly generated strings.

    Each string is built by drawing ``length`` symbols independently and
    uniformly from the alphabet A-E.

    Args:
        n_experiments: Number of random strings to generate.
        length: Number of symbols in each string.
        seed: Optional seed. When given, a private ``random.Random(seed)``
            generator is used so that the printed strings (and any discussion
            that refers to them) can be reproduced. When None, the module-level
            generator is used, exactly as before.
    """
    rng = random if seed is None else random.Random(seed)
    alphabet = ['A', 'B', 'C', 'D', 'E']
    print("Random Input Experiments:")
    for i in range(n_experiments):
        s = ''.join(rng.choice(alphabet) for _ in range(length))
        print(f"\nExperiment {i+1}:")
        print("Input string:", s)
        print("First-order entropy:", first_order_entropy(s))
        print("Second-order entropy:", second_order_entropy(s))
        print("Single-symbol Huffman average length:", huffman_length(s))
        print("2-symbol joint Huffman average length:", joint_huffman_length(s))

# --- Experiment 2: Non-Random (Patterned) Inputs ---
def experiment_patterned():
    """Print entropy and Huffman statistics for three fixed, patterned strings."""
    patterned_strings = [
        "AABBCC" * 5,        # repeated pattern
        "ABABABABABABABAB",  # alternating pattern
        "AAAAAAAABBBBBBBB",  # two blocks of repeated symbols
    ]
    # Label / function pairs, reported in this order for every string.
    reports = (
        ("First-order entropy:", first_order_entropy),
        ("Second-order entropy:", second_order_entropy),
        ("Single-symbol Huffman average length:", huffman_length),
        ("2-symbol joint Huffman average length:", joint_huffman_length),
    )
    print("\nNon-Random Input Experiments:")
    for number, s in enumerate(patterned_strings, start=1):
        print(f"\nPattern Experiment {number}:")
        print("Input string:", s)
        for label, measure in reports:
            print(label, measure(s))

# Run the random experiment first, then the patterned one.
for run_experiment in (experiment_random, experiment_patterned):
    run_experiment()

"""
Entropy Trends:
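Across all experiments, the second-order entropy (computed on non-overlapping 2‑symbol pairs) is significantly lower than the first-order entropy. For the patterned inputs, this reduction indicates that there is some predictability between pairs of symbols—i.e., the joint probability distribution captures redundancy not evident when symbols are considered independently. For the random inputs, however, the symbols are drawn independently, so most of the reduction is a small-sample effect: a 32-symbol string contains only 16 pairs, so the estimated pair entropy cannot exceed log2(16) = 4 bits per pair (2 bits per symbol), which is already below the log2(5) ≈ 2.32 bits per symbol of a uniform 5-letter source.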

Huffman Coding Performance:

The average codeword lengths for single-symbol Huffman coding are very close to the first-order entropy, which is expected since Huffman coding is optimal among prefix codes but still slightly exceeds the entropy bound.
The 2-symbol joint Huffman coding average lengths exactly match (or are extremely close to) the second-order entropy in some cases, which confirms that the joint Huffman coding efficiently exploits the redundancy between symbol pairs.
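Special Case (first-order entropy 1.0 bit, second-order entropy 0.5 bit):
In one case the observed first-order entropy is 1.0 bit and the second-order entropy is 0.5 bit. These are the values produced for "AABB" (the Q3 test input) and for "AAAAAAAABBBBBBBB" (Pattern Experiment 3); they are not the values of the random Experiment 4 in the output shown below, whose string changes on every unseeded run. These values (with corresponding Huffman lengths matching exactly) show that the string uses only two equally frequent symbols arranged in uniform pairs, i.e. a binary-like distribution. This case highlights that when the input is highly predictable, both entropy and average codeword lengths drop significantly.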

Implications:
The experiments reinforce the idea that when symbols exhibit dependency (or when certain pairs occur more frequently), 2-symbol joint Huffman coding can achieve a much lower average codeword length per symbol compared to coding symbols individually. The benefit is particularly clear when comparing the 2‑symbol joint average length to the single-symbol Huffman average length.
In practical applications, if the data has underlying structure or correlations, joint coding methods can lead to more efficient compression.
"""


Random Input Experiments:

Experiment 1:
Input string: DCDBECEAEBCBAECBBBEEECDABCDCDBAE
First-order entropy: 2.2730898459737485
Second-order entropy: 1.6875
Single-symbol Huffman average length: 2.28125
2-symbol joint Huffman average length: 1.6875

Experiment 2:
Input string: EDDCDDACDCACBCDBDEBACDBAECBDACCD
First-order entropy: 2.19616329959714
Second-order entropy: 1.6639097655573916
Single-symbol Huffman average length: 2.25
2-symbol joint Huffman average length: 1.6875

Experiment 3:
Input string: ACDADAAEEAAABACCECDCCDACBCDCCBAC
First-order entropy: 2.1127361177297614
Second-order entropy: 1.7264097655573916
Single-symbol Huffman average length: 2.1875
2-symbol joint Huffman average length: 1.75

Experiment 4:
Input string: BBCEBAECDABDCEEEADCCCDDBEBAAEBAA
First-order entropy: 2.31019159868833
Second-order entropy: 1.8125
Single-symbol Huffman average length: 2.34375
2-symbol joint Huffman average length: 1.8125

Experiment 5:
Input string: DECAEEEAEEDCADDDBCCBEECEAEDADABA
First-