# Text compression using Huffman Coding

In [56]:
from collections import Counter
from queue import PriorityQueue
from typing import Union, Dict, List, Tuple
import math
import string
import random
import time
import os

### Standard (static) Huffman Coding

In [184]:
class Node:
    def __init__(self, char: str, count: int):
        self.char: str = char # will be None for internal nodes
        self.count: int = count
        self.left = None
        self.right = None
    
    # we need operator overload for priority queue
    def __lt__(self, other):
        return self.count < other.count
    
    def __eq__(self, other):
        if other == None:
            return False
        return self.count == other.count
    
    # for debugging purposes
    def __str__(self):
        return str(self.char) + ' ' + str(self.count)
        
    
def create_static_huffman_tree(text: str) -> Node:
    # counting letters in text
    freq: Dict[str, int] = Counter(text)
    leaves: Dict[str, Node] = {key: Node(key, val) for key, val in freq.items()} 
    # creating priority queue
    queue = PriorityQueue()
    for letter in leaves.keys():
        queue.put(leaves[letter])
        
    while queue.qsize() > 1:
        node1: Node = queue.get()
        node2: Node = queue.get()
        
        parent_node: Node = Node(None, node1.count + node2.count)
        parent_node.left = node1
        parent_node.right = node2
        
        queue.put(parent_node)
        
    return queue.get()

def traverse_tree(node: Node, encoding: Dict[str, str], code: str):
    if node.left != None:
        traverse_tree(node.left, encoding, code + '0')

    if node.right != None:
        traverse_tree(node.right, encoding, code + '1')
            
    if node.left == None and node.right == None:
        encoding[node.char] = code

def get_huffman_coding(root: Node) -> Dict[str, str]:
    encoding: Dict[str, str] = dict()
        
    traverse_tree(root, encoding, '')
    
    return encoding

def huffman_encode(text: str, encoding: Dict[str, str], filename: str):
    bin_string = ''
    for letter in text:
        bin_string += encoding[letter]

    with open(filename, 'wb') as file:
        file.write(b'\x42')
        for k, v in encoding.items():

            file.write((len(v)).to_bytes(1, byteorder='big'))
            file.write(int(v, 2).to_bytes(2, byteorder='big'))
            file.write(ord(k).to_bytes(1, byteorder='big'))
        
        file.write(b'\x00')
        file.write((len(bin_string) % 8 + 8).to_bytes(1, byteorder='big'))
        ## add some
        bin_string = '1' * ((len(bin_string) % 8 + 8) + bin_string
                            
        int_data = int(bin_string, 2)
        bin_data = int_data.to_bytes((int_data.bit_length() + 7) // 8, byteorder='big')
        file.write(bin_data) 
    
    print(bin_string[:100], encoding, end = '\n\n\n')
    

def huffman_decode(filename: str) -> str:
    bin_string: str
    reversed_encoding: Dict[str, str] = {}
    
    with open(filename, 'rb') as file:
        file_magic_number = file.read(1)
        if file_magic_number == b'\x42':
            len_of_code = int.from_bytes(file.read(1), "big")
            while len_of_code > 0:
                
                code = bin(int.from_bytes(file.read(2), "big"))[2:]
                letter = chr(int.from_bytes(file.read(1), "big"))
                
                while len(code) < len_of_code:
                    code = '0' + code

                reversed_encoding[code] = letter

                # this one will be used for next code
                len_of_code = int.from_bytes(file.read(1), "big")
                
            bin_string = bin(int.from_bytes(file.read(), "big"))[2:]
            
        else:
            raise Exception('Wrong file format!')
    
    print(bin_string[:100])
    text: str = ''
    
    prefix: str = ''
    for bit in bin_string:
        prefix += bit
        
        if prefix in reversed_encoding:
            print(prefix, reversed_encoding[prefix])
            text += reversed_encoding[prefix]
            prefix = ''
    
    return text

### File format
**All numbers in hex, two numbers makes a byte**<br>
42 (magic starting number)<br>
... <br>
(repeat those for all codes)<br>
XX (length of code one byte)<br>
AB CD (two bytes for code)<br>
EF (one byte for letter)<br>
...<br>
00 00 (mark that we ended giving codes)<br>
YYYY... (text encoded in binary)<br>


In [181]:
text = 'aabbccddeeffgghhiijjkk'
root = create_static_huffman_tree(text)
encoding = get_huffman_coding(root)

huffman_encode(text, encoding, 'test.huff')

101010101100110010111011100100110111010010011110111001101101001000000011111111 {'j': '000', 'f': '001', 'i': '010', 'h': '011', 'd': '100', 'a': '1010', 'c': '1011', 'b': '1100', 'e': '1101', 'g': '1110', 'k': '1111'}




In [182]:
decoded_text = huffman_decode('test.huff')
text, decoded_text, text == decoded_text

00101010101100110010111011100100110111010010011110111001101101001000000011111111
001 f
010 i
1010 a
1100 b
1100 b
1011 c
1011 c
100 d
100 d
1101 e
1101 e
001 f
001 f
1110 g
1110 g
011 h
011 h
010 i
010 i
000 j
000 j
1111 k
1111 k


('aabbccddeeffgghhiijjkk', 'fiabbccddeeffgghhiijjkk', False)

### Tests

In [168]:
# generate text files
set_of_chars = string.ascii_letters + ' '

datasets: List[Tuple[str, int, int]] = [('test_1kB', 32, 32), 
                                         ('test_10kB', 160, 64), 
                                         ('test_100kbB', 160, 640), 
                                         ('test_1MB', 1600, 640)]
for ds in datasets:
    with open(ds[0], "w") as f:
        result  = ""
        lines = ds[1]
        chars = ds[2]

        for i in range(lines):
            for j in range(chars):

                if j == 0:
                    result += 'X'
                else:
                    result += random.choice(set_of_chars)
            result += "\n"
        f.write(result)


#### Static huffman

In [183]:
static_compress_times = []
static_decompress_times = []
static_compression_rate = []
datasets: List[Tuple[str, int, int]] = [('test_1kB', 32, 32)]
for ds in datasets:
    text = ""
    with open(ds[0], "r") as f:
        text = f.read()
    
    start = time.time()
    
    root = create_static_huffman_tree(text)
    encoding = get_huffman_coding(root)
    huffman_encode(text, encoding, ds[0] + '.huff')
    
    static_compress_times.append(time.time() - start)
    
    start = time.time()
    
    decoded_text = huffman_decode(ds[0] + '.huff')
    print(text[:20], decoded_text[:20])
    assert text == decoded_text
    
    static_decompress_times.append(time.time() - start)
    
    static_compression_rate.append((1 - os.path.getsize(ds[0] + '.huff') / os.path.getsize(ds[0])))

0010100100011000000000110001111111101111111110011001010000001101100111101110100001101101100010011001 {'G': '000000', 'a': '000001', 'S': '00001', 's': '00010', 'n': '00011', 'X': '0010', 'c': '00110', 'b': '001110', 'o': '001111', 'T': '010000', 'v': '010001', 'e': '010010', 't': '010011', 'q': '01010', 'J': '010110', 'p': '010111', 'y': '011000', 'g': '011001', '\n': '01101', 'Y': '011100', 'f': '011101', 'Z': '011110', 'K': '011111', 'D': '100000', 'h': '100001', 'F': '100010', 'A': '100011', 'W': '100100', 'j': '100101', 'x': '100110', 'E': '100111', 'I': '101000', ' ': '101001', 'H': '101010', 'm': '101011', 'C': '10110', 'Q': '101110', 'i': '101111', 'N': '110000', 'B': '110001', 'M': '110010', 'u': '110011', 'O': '110100', 'R': '110101', 'd': '110110', 'P': '110111', 'r': '111000', 'l': '111001', 'V': '111010', 'L': '111011', 'U': '111100', 'k': '111101', 'z': '111110', 'w': '111111'}


0101001000110000000001100011111111011111111100110010100000011011001111011101000011011011000100

AssertionError: 

In [170]:
print(encoding)

{'G': '000000', 'a': '000001', 'S': '00001', 's': '00010', 'n': '00011', 'X': '0010', 'c': '00110', 'b': '001110', 'o': '001111', 'T': '010000', 'v': '010001', 'e': '010010', 't': '010011', 'q': '01010', 'J': '010110', 'p': '010111', 'y': '011000', 'g': '011001', '\n': '01101', 'Y': '011100', 'f': '011101', 'Z': '011110', 'K': '011111', 'D': '100000', 'h': '100001', 'F': '100010', 'A': '100011', 'W': '100100', 'j': '100101', 'x': '100110', 'E': '100111', 'I': '101000', ' ': '101001', 'H': '101010', 'm': '101011', 'C': '10110', 'Q': '101110', 'i': '101111', 'N': '110000', 'B': '110001', 'M': '110010', 'u': '110011', 'O': '110100', 'R': '110101', 'd': '110110', 'P': '110111', 'r': '111000', 'l': '111001', 'V': '111010', 'L': '111011', 'U': '111100', 'k': '111101', 'z': '111110', 'w': '111111'}
