In [1]:
from bitarray import bitarray
from collections import defaultdict
from heapq import heappop, heappush
from os import listdir
from sys import getsizeof
from tqdm import tqdm

# Statyczne kodowanie Huffmana

Zakodowany tekst posiada na początku informacje dotyczące kodowania:
- 1 bajt przechowujący ilość nieważnych bitów dodanych do końca zakodowanego tekstu (dopełnienie ilości bitów do wielokrotności 8)
- 2 bajty przechowujące wielkość zapisanego dalej kodowania
- kodowanie każdej z liter, każde w postaci:  
  - 1 bajt przechowujący literę (kodowanie utf-8)
  - 1 bajt przechowujący wielkość kodowania
  - kodowanie litery
  
  
Następnie znajduje się zakodowany tekst.  
Całość zapisywana jest w pliku binarnym.

In [2]:
def count_letters(text):
    count = defaultdict(lambda: 0)
    for l in text:
        count[l] += 1
    return count

class Node():
    def __init__(self, weight=None, children=None, label=None, code=None, parent=None, **kwargs):
        self.weight = weight
        self.children = children
        self.label = label
        self.code = code
        self.parent = parent
        if parent is not None:
            self.parent = parent
            if parent.children:
                parent.children.append(self)
            else:
                parent.children = [self]
        if children is not None:
            for node in children:
                node.parent = self
        
    def is_leaf(self):
        return self.children is None
    
    def find_child(self, code):
        if self.children is None:
            return None
        if self.children[0].code == int(code):
            return self.children[0]
        elif len(self.children) == 2:
            return self.children[1]
        return None
    
    def __ge__(self, other):
        return self.weight >= other.weight
    
    def __eq__(self, other):
        return self.weight == other.weight
    
    def __gt__(self, other):
        return self.weight > other.weight
        
def code(tree):
    tree.children[0].code = 0
    tree.children[1].code = 1
    if not tree.children[0].is_leaf():
        code(tree.children[0])
    if not tree.children[1].is_leaf():
        code(tree.children[1])

def huffman(text):
    count = count_letters(text)
    trees = []
    leafs = []
    for l, weight in count.items():
        node = Node(weight, label=l)
        heappush(trees, node)
        leafs.append(node)
        
    while len(trees) > 1:
        t1 = heappop(trees)
        t2 = heappop(trees)

        parent = Node(children=[t1, t2],weight=t1.weight + t2.weight)
        heappush(trees, parent)
    code(trees[0])
    return trees[0], leafs

def get_code(leaf):
    code = str(leaf.code)
    parent = leaf.parent
    while parent is not None and parent.code is not None:
        code += str(parent.code)
        parent = parent.parent
    return code[::-1]
    
    
def print_huffman(node, prefix=""):
    assert not node.is_leaf()
    if node.children[0].is_leaf() and node.children[1].is_leaf():
        leaf = node.children[0]
        print(f"{prefix}{leaf.code} -> #{leaf.weight} {leaf.label} {get_code(leaf)}")
        leaf = node.children[1]
        print(f"{prefix}{leaf.code} -> #{leaf.weight} {leaf.label} {get_code(leaf)}")
    elif node.children[0].is_leaf() or node.children[1].is_leaf():
        if node.children[0].is_leaf():
            leaf = node.children[0]
            internal = node.children[1]
        else:
            leaf = node.children[1]
            internal = node.children[0]
        print(f"{prefix}{leaf.code} -> #{leaf.weight} {leaf.label} {get_code(leaf)}")
        print(f"{prefix}{internal.code} -> #{internal.weight}")
        print_huffman(internal, prefix=prefix+" ")
    else:
        print(f"{prefix}{node.children[0].code} -> #{node.children[0].weight}")
        print_huffman(node.children[0], prefix=prefix+" ")
        print(f"{prefix}{node.children[0].code} -> #{node.children[1].weight}")
        print_huffman(node.children[1], prefix=prefix+" ")
        

In [3]:
node, leafs = huffman("abracadabra")
print_huffman(node)

0 -> #5 a 0
1 -> #6
 0 -> #2
  0 -> #1 c 100
  1 -> #1 d 101
 0 -> #4
  0 -> #2 r 110
  1 -> #2 b 111


In [3]:
def encode(text, encoder, with_encoding=False):
    encoded_text = bitarray()
    if with_encoding:
        for k, v in encoder.items():
            encoded_text.frombytes(bytes(k, encoding="utf_8"))
            encoded_text.frombytes(len(v).to_bytes(1, 'big'))
            encoded_text += bitarray(v)

    encoded_text = bitarray((len(encoded_text)).to_bytes(3, "big")) + encoded_text
    for l in text:
        encoded_text += bitarray(encoder[l])
    bits_added = encoded_text.fill()
    encoded_text = bitarray(bits_added.to_bytes(2, "big")) + encoded_text
    return encoded_text.tobytes()

In [4]:
texts = []
text_dir = "compression texts"
for filename in listdir(text_dir):
    print(filename)
    with open(text_dir + "/" + filename, "r") as file:
        text = file.read()
    texts.append((filename, text))

10kb.txt
1MB.txt
100kb.txt
1kb.txt


In [5]:
def compress(text, with_encoding=True):
    node, leafs = huffman(text)
    encoder = {leaf.label: get_code(leaf) for leaf in leafs}
    return encode(text, encoder, with_encoding=with_encoding)

compressed_texts = []
for filename, text in texts:
    print(filename)
    encoded_text = compress(text, with_encoding=True)
    compressed_texts.append((filename, encoded_text))
    filename = filename.split(".")[0]
    with open(filename + ".compressed", "wb") as file:
        file.write(encoded_text)
    print(f"compression coefficient = {1 - getsizeof(encoded_text) / getsizeof(text)}")

10kb.txt
compression coefficient = 0.4512085944494181
1MB.txt
compression coefficient = 0.4648634103072645
100kb.txt
compression coefficient = 0.464029495813433
1kb.txt
compression coefficient = 0.3638863428047663


In [6]:
def decode_tree(text):
    root = Node()
    start = 0
    end = 8
    while end <= len(text):
        letter = text[start:end].tobytes().decode(encoding='utf_8')
        start += 8
        end += 8
        code_size = int.from_bytes(text[start:end].tobytes(), 'big')
        start += 8
        end = start + code_size
        code = text[start:end]
        start = end
        end += 8
        node = root
       # print(letter, code_size, code)
        for i, c in enumerate(code):
            c = int(c)
            child = node.find_child(c)
            if child is None:
                if code_size==i+1:
                    node = Node(code=c, parent=node, label=letter)
                else:
                    node = Node(code=c, parent=node)
            else:
                node = child
    return root

def get_letter(text, tree, start):
    n = start
    node = tree
    while not node.is_leaf():
        node = node.find_child(int(text[n]))
        n += 1
    return node.label, n
        
            
def decode(text):
    encoded_text = bitarray()
    encoded_text.frombytes(text)
    fill = int.from_bytes(encoded_text[:8].tobytes(), 'big')
    tree_size = int.from_bytes(encoded_text[8:24].tobytes(), 'big')
    tree = decode_tree(encoded_text[24:24+tree_size])
    encoded_text[:24+tree_size] = bitarray() # this part will not be needed again
    encoded_text[-fill:] = bitarray()
    decoded_text = []
    i = 0
    while len(encoded_text) > i:
        letter, i = get_letter(encoded_text, tree, i)
        decoded_text.append(letter)
    return "".join(decoded_text)

## Sprawdzenie poprawności odkodowanego tekstu

In [7]:
for filename, text in texts:
    print(filename)
    filename = filename.split(".")[0]
    with open(filename + ".compressed", "rb") as file:
        encoded_text = file.read()
    decoded_text = decode(encoded_text)
    assert decoded_text == text

10kb.txt
1MB.txt
100kb.txt
1kb.txt


# Porównanie czasu kodowania i dekodowania

In [9]:
%%timeit
for filename, text in texts:
    compress(text)

637 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%%timeit
for filename, text in compressed_texts:
    decode(text)

5.21 s ± 211 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# Dynamiczne kodowanie Huffmana

Zakodowany tekst posiada na początku informacje dotyczące kodowania:
- 1 bajt przechowujący ilość nieważnych bitów dodanych do końca zakodowanego tekstu (dopełnienie ilości bitów do wielokrotności 8)
- 1 bajt przechowujący wielkość alfabetu
- alfabet (wszystkie litery występujące w tekście w kolejnośći ich pierwszego wystąpienia)
  
Następnie znajduje się zakodowany tekst.  
Całość zapisywana jest w pliku binarnym.

In [8]:
class AdaptiveNode(Node):
    def __init__(self, idx, **kwargs):
        super().__init__(**kwargs)
        self.idx = idx
        
        
class AdaptiveTree():
    def __init__(self, first_letter, first_code=0):
        self.root = AdaptiveNode(idx=0, weight=1)
        node = AdaptiveNode(idx=1, label=first_letter, code=first_code, weight=1, parent=self.root)
        zero_node = AdaptiveNode(idx=2, weight=0, code=(first_code+1)%2, parent=self.root)
        self.nodes = [self.root, node, zero_node]
        
    def find_highest_sibling(self, node):
        highest_idx = node.idx - 1
        while highest_idx >= 0 and self.nodes[highest_idx].weight == node.weight:
            highest_idx -= 1
        return self.nodes[highest_idx+1]
    
    def swap(self, idx1, idx2):
        node1 = self.nodes[idx1]
        node2 = self.nodes[idx2]
        parent1 = self.nodes[idx1].parent
        parent2 = self.nodes[idx2].parent

        if parent1:
            if parent1.children[0] is node1:
                parent1.children[0] = node2
            else:
                parent1.children[1] = node2
        if parent2:
            if parent2.children[0] is node2:
                parent2.children[0] = node1
            else:
                parent2.children[1] = node1
        node1.parent = parent2
        node2.parent = parent1
        node1.code, node2.code = node2.code, node1.code 
        self.nodes[idx1], self.nodes[idx2] = self.nodes[idx2], self.nodes[idx1]
        node1.idx, node2.idx = node2.idx, node1.idx 
        
        
    def increment(self, node):
        sibling = self.find_highest_sibling(node)
        if sibling is not self.root and sibling is not node and sibling is not node.parent:
            self.swap(sibling.idx, node.idx)
        node.weight += 1
        if node.parent is not None:
            self.increment(node.parent)
    
    @staticmethod
    def other_code(node):
        return (node.code + 1) % 2
    
    def add_node(self, label):
        zero_node = self.nodes[-1]
        parent = zero_node.parent
        assert parent is not None
        
        new_inner = AdaptiveNode(weight=0, code=zero_node.code, 
                                 idx=zero_node.idx)
        new_inner.parent = zero_node.parent
        leaf = AdaptiveNode(weight=1, label=label, code=self.other_code(zero_node), 
                    parent=new_inner, idx=zero_node.idx+1)
        zero_node.idx += 2
        zero_node.parent = new_inner
        new_inner.children.append(zero_node)

        if parent.children[0] == zero_node:
            parent.children[0] = new_inner
        else:
            parent.children[1] = new_inner
        self.nodes[new_inner.idx] = new_inner
        self.nodes.append(leaf)
        self.nodes.append(zero_node)
        self.increment(new_inner)
        return leaf


In [9]:
def find_leaf(leafs, label):
    for leaf in leafs:
        if leaf.label == label:
            return leaf
    return None

def check_tree(node):
    print(f"check {node.label} | w={node.weight}")
    if node.is_leaf():
        pass
    else:
        assert len(node.children) == 2
        check_tree(node.children[0])
        check_tree(node.children[1])

def adaptive_huffman_encode(text):
    alphabet = [text[0]]
    tree = AdaptiveTree(text[0])
    leafs = [tree.nodes[1]]
    encoded = bitarray(get_code(leafs[0]))
    for letter in text[1:]:
        leaf = find_leaf(leafs, letter)

        if leaf:
            encoded += bitarray(get_code(leaf))
         #   print("inc", get_code(leaf)+' '+leaf.label)
            tree.increment(leaf)
        else:
            alphabet.append(letter)
            zero_node = tree.nodes[-1]
            encoded += bitarray(get_code(zero_node))
            
            leaf = tree.add_node(letter)
            leafs.append(leaf)
            
    bit_alphabet = bitarray()
    bit_alphabet.frombytes(bytes("".join(alphabet), encoding="utf_8"))
    alphabet_size = len(alphabet)
    bit_size = bitarray((alphabet_size).to_bytes(2, "big"))
    fill = encoded.fill()
    bit_fill = bitarray((fill).to_bytes(2, "big"))
    return (bit_fill + bit_size + bit_alphabet + encoded).tobytes()
            
abracadabra = adaptive_huffman_encode("abracadabra")

In [10]:
compressed_texts = []
for filename, text in texts:
    print(filename)
    encoded_text = adaptive_huffman_encode(text)
    compressed_texts.append((filename, encoded_text))
    filename = filename.split(".")[0]
    with open(filename + ".compressed", "wb") as file:
        file.write(encoded_text)
    print(f"compression coefficient = {1 - getsizeof(encoded_text) / getsizeof(text)}")

10kb.txt
compression coefficient = 0.45797274445439173
1MB.txt
compression coefficient = 0.4649122457318835
100kb.txt
compression coefficient = 0.46464898782997943
1kb.txt
compression coefficient = 0.4207149404216315


In [11]:
def get_node(text, node, start):
    n = start
    while not node.is_leaf():
        node = node.find_child(int(text[n]))
        n += 1
    return node, n

def adaptive_huffman_decode(encoded_text):
    encoded = bitarray()
    encoded.frombytes(encoded_text)
    fill = int.from_bytes(encoded[:8].tobytes(), 'big')
    alphabet_size = int.from_bytes(encoded[8:16].tobytes(), 'big')
    alphabet = encoded[16:16+8*alphabet_size].tobytes().decode("utf_8")
    tree = AdaptiveTree(alphabet[0])
    leafs = [tree.nodes[1]]
    decoded = [alphabet[0]]
    unused = 1
    
    start = 16 + 8 * alphabet_size + 1
    while start < len(encoded) - fill:
        leaf, start = get_node(encoded, tree.root, start)
        if leaf.label:
            decoded.append(leaf.label)
            tree.increment(leaf)
        else:
            letter = alphabet[unused]
            decoded.append(letter)
            unused += 1
            
            zero_node = tree.nodes[-1]
            leaf = tree.add_node(letter)
            leafs.append(leaf)
            
    return "".join(decoded)

adaptive_huffman_decode(abracadabra)

'abracadabra'

## Sprawdzenie poprawności odkodowanego tekstu

In [12]:
for filename, text in texts:
    print(filename)
    filename = filename.split(".")[0]
    with open(filename + ".compressed", "rb") as file:
        encoded_text = file.read()
    decoded_text = adaptive_huffman_decode(encoded_text)
    assert decoded_text == text

10kb.txt
1MB.txt


KeyboardInterrupt: 

# Porównanie czasu kodowania i dekodowania

In [16]:
%%timeit
for filename, text in texts:
    adaptive_huffman_encode(text)

9.31 s ± 497 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [20]:
%%timeit
for filename, text in [texts[1]]:
    adaptive_huffman_encode(text)

7.53 s ± 558 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [19]:
%%timeit
for filename, text in [texts[2]]:
    adaptive_huffman_encode(text)

710 ms ± 223 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [17]:
%%timeit
for filename, text in compressed_texts:
    adaptive_huffman_decode(text)

11.3 s ± 156 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [21]:
%%timeit
for filename, text in [compressed_texts[2]]:
    adaptive_huffman_decode(text)

948 ms ± 283 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
%%timeit
for filename, text in [compressed_texts[1]]:
    adaptive_huffman_decode(text)

# Wnioski
Współczynnik kompresji osiąga bardzo zbliżone wartości niezależnie od sposobu kopresji - widoczna różnica pojawia się tylko przy kompresji małych plików. Róznica ta jest głównie spowodowana dodatkowymi informacjami przechowanymi razem z tekstem (alfabet zajmuje mniej miejsca niż zapis całego kodowania, więc kodowanie dynamiczne osiąga lepszy współczynnik), których wielkość traci znaczenie przy długich tekstach.  
Czas kompresji jest w algorytmie dynamicznym jest około 20x dłuższy niż przy algorytmie statycznym, więc biorąc pod uwagę niewielkie zmiany w wielkości plików wynikowych kodowanie algorytmem statycznym okazuje się ogólnie wydajniejsze.