# Labolatorium 2 - Kodowanie Huffmana

In [1]:
import numpy as np
import heapq
from collections import deque
from bitarray import bitarray
from bitarray.util import ba2int
from time import process_time

# https://neptune.ai/blog/lossless-data-compression-using-arithmetic-encoding-in-python-and-its-applications-in-deep-learning

## Zadanie 1 - Implementacja kodera Huffmana

### Statyczny algorytm Huffmana

In [2]:
class StaticHuffmanEncoder():
    class Node():
        def __init__(self, left=None, right=None, value=None, weight=0):
            self.left = left
            self.right = right
            self.value = value
            self.weight = weight

    def __init__(self, text):
        self.text = text
        self.freq = self.get_freq()
        self.tree = self.build_tree()
        self.codes = self.get_code()

    def get_freq(self):
        freq = {}
        for c in self.text:
            if c in freq:
                freq[c] += 1
            else:
                freq[c] = 1
        return freq
    
    def build_tree(self):
        nodes = deque()
        for c in self.freq:
            nodes.append(self.Node(weight=self.freq[c], value=c))
        internal_nodes = deque()
        leafs = sorted(nodes, key=lambda x: x.weight)

        while len(leafs) + len(internal_nodes) > 1:
            # Pop two nodes with the lowest weight
            if len(leafs) == 0:
                left = internal_nodes.pop()
            elif len(internal_nodes) == 0:
                left = leafs.pop()
            else:
                if leafs[-1].weight < internal_nodes[-1].weight:
                    left = leafs.pop()
                else:
                    left = internal_nodes.pop()
            if len(leafs) == 0:
                right = internal_nodes.pop()
            elif len(internal_nodes) == 0:
                right = leafs.pop()
            else:
                if leafs[-1].weight < internal_nodes[-1].weight:
                    right = leafs.pop()
                else:
                    right = internal_nodes.pop()
            internal_nodes.append(self.Node(left=left, right=right, weight=left.weight+right.weight))

        return internal_nodes.pop()
    
    def get_code(self):
        codes = {}
        def traverse(node, code):
            if node.value:
                codes[node.value] = code
            else:
                traverse(node.left, code + '0')
                traverse(node.right, code + '1')
        traverse(self.tree, '')
        return codes
    
    def encode(self, encode_tree=False):
        encoded = bitarray()
        if encode_tree:
            encoded.extend(f'{len(self.freq):032b}')

            for c in self.codes:
                encoded.extend(f'{ord(c):08b}')
                encoded.extend(f'{self.freq[c]:032b}')

        for c in self.text:
            encoded.extend(self.codes[c])

        return encoded
    
    def encode_to_file(self, filename):
        encoded = self.encode()
        with open(filename, 'wb') as f:
            encoded.tofile(f)
    
    def decode(self,encoded: bitarray, encoded_tree = False):
        decoded = ''
        if not encoded_tree:
            node = self.tree
            start_index = 0
        else:
            no_of_codes = int(encoded[:32].to01(), 2)

            for i in range(no_of_codes):
                c = chr(int(encoded[32+40*i:32+40*i+8].to01(), 2))
                code_len = int(encoded[32+40*i+8:32+40*i+40].to01(), 2)
                self.freq[c] = encoded[32+40*i+40:32+40*i+40+code_len].to01()

            start_index = 32+40*no_of_codes
            node = self.build_tree()

        for c in encoded[start_index:]:
            if not c:
                node = node.left
            else:
                node = node.right
            if node.value:
                decoded += node.value
                node = self.tree
        return decoded    
    
    def decode_from_file(self, filename):
        with open(filename, 'rb') as f:
            encoded = bitarray()
            encoded.fromfile(f)
        return self.decode(encoded)

In [3]:
text = "Hello World!"
encoder = StaticHuffmanEncoder(text)
encoded = encoder.encode(encode_tree=False)
decoded = encoder.decode(encoded, encoded_tree=False)

print("Original text: ", text)
print("Encoded text: ", encoded)
print("Decoded text: ", decoded)

Original text:  Hello World!
Encoded text:  bitarray('00000001110101100010001111001010011010')
Decoded text:  Hello World!


### Dynamiczny algorytm Huffmana

In [4]:
class AdaptiveHuffmanEncoder():
    class Node():
        def __init__(self, right=None, parent=None, weight=0, index=0, value=None, left=None):
            self.left = left
            self.right = right
            self.parent = parent
            self.value = value
            self.weight = weight
            self.index = index

    def __init__(self):
        self.index = 999999 # Should be a large number
        tmp = self.Node(weight=0, index=self.index, value="NYT")
        self.root = tmp
        self.nodes = {"NYT": self.root}
        self.weights = {0: set([self.root]), 1: set()}

    def add_char_to_tree(self, char):
        node = self.nodes["NYT"]
        node.left = self.Node(weight=0, index=self.index - 2, parent=node, value="NYT")
        self.weights[0].add(node.left)
        self.nodes["NYT"] = node.left

        node.right = self.Node(weight=1, index=self.index-1, parent=node, value=char)
        self.weights[1].add(node.right)
        self.nodes[char] = node.right
        
        node.value = None
        self.index -= 2

        self.increment(node)

    def increment(self, node):
        while node != self.root:
            node = node.parent
            block_leader = max(self.weights[node.weight], key=lambda x: x.index)

            if node != block_leader:
                # Swap nodes
                node.index, block_leader.index = block_leader.index, node.index

                # If nodes have same parent, swap subtrees
                if node.parent == block_leader.parent:
                    if node == node.parent.left:
                        node.parent.right = node
                        node.parent.left = block_leader
                    else:
                        node.parent.right = block_leader
                        node.parent.left = node
                else:
                    # Swap nodes
                    if node == node.parent.left:
                        node.parent.left = block_leader
                    else:
                        node.parent.right = block_leader

                    if block_leader.parent.left == block_leader:
                        block_leader.parent.left = node
                    else:
                        block_leader.parent.right = node
                    
                    block_leader.parent, node.parent = node.parent, block_leader.parent

            # Update weights
            self.weights[node.weight].remove(node)
            node.weight += 1

            if node.weight not in self.weights:
                self.weights[node.weight] = set([node])
            else:
                self.weights[node.weight].add(node)

    def get_code(self, char):
        node = self.nodes[char]
        code = ""
        while node != self.root:
            if node == node.parent.left:
                code += "0"
            else:
                code += "1"
            node = node.parent

        return bitarray(code[::-1])

    def encode(self, text):
        coded_text = bitarray()
        for char in text:
            if char in self.nodes:
                coded_text += self.get_code(char)
                self.increment(self.nodes[char])
            else:
                # Encode char as 8 bits (we add # code because we need to know when to decode first occurence of char)
                coded_text += self.get_code("NYT") + bitarray(f"{ord(char):08b}")
                self.add_char_to_tree(char)
        
        # Make coded text divisible by 8
        end_bits = 8 - len(coded_text) % 8
        coded_text = bitarray(f"{end_bits:08b}") + coded_text + bitarray(end_bits)
        return coded_text



def decode_adaptive_huffman_tree(encoded_text):
    tree = AdaptiveHuffmanEncoder()
    bit = 0
    encoded = encoded_text[8:-int(encoded_text[:8].to01(),2)]
    decoded = ""
    node = tree.root
    
    while bit < len(encoded):
        # Get char from tree
        while not (node.left is None and node.right is None):
            if not encoded[bit]:
                node = node.left
            else:
                node = node.right
            bit += 1
        if node.value == "NYT":
            # Read 8 bits and add new node to tree
            decoded_char = chr(int(encoded[bit:bit + 8].to01(), 2))
            tree.add_char_to_tree(decoded_char)
            bit += 8
        else:
            decoded_char = node.value
            tree.increment(tree.nodes[decoded_char])
            
        node = tree.root
        decoded += decoded_char

    return decoded

In [5]:
text = "Hello World!"
encoder = AdaptiveHuffmanEncoder()
encoded = encoder.encode(text)
decoded = decode_adaptive_huffman_tree(encoded)

print("Original text: ", text)
print("Encoded text: ", encoded)
print("Decoded text: ", decoded)

Original text:  Hello World!
Encoded text:  bitarray('000001000100100000110010100011011000011000110111100000100000100001010111011110000111001001100000110010001000001000011000')
Decoded text:  Hello World!


## Zadanie 2 - Analiza algorytmów - pomiar czasu oraz wyznaczenie współczynnika kompresji

In [6]:
def test_huffman(filename):
    text = open(filename, 'r').read()
    
    start_time = process_time()
    encoder = StaticHuffmanEncoder(text)
    encoded = encoder.encode()
    decoded = encoder.decode(encoded)
    if decoded != text:
        print("Static Huffman failed")
        return

    print(f"Static Huffman: {process_time() - start_time}s")
    print(f"Compression ratio: {1 - len(text) / len(encoded)}")

    start_time = process_time()
    encoder = AdaptiveHuffmanEncoder()
    encoded = encoder.encode(text)
    decoded = decode_adaptive_huffman_tree(encoded)
    if decoded != text:
        print("Adaptive Huffman failed")
        return

    print(f"Adaptive Huffman: {process_time() - start_time}s")
    print(f"Compression ratio: {1 - len(text) / len(encoded)}")


In [10]:
test_huffman("./files/uniform_1MB.txt")

Static Huffman: 2.7253819999999997s
Compression ratio: 0.9843790736941991
Adaptive Huffman: 10.553025000000002s
Compression ratio: 0.8749263465003861
