# Laboratorium 3

Konrad Dębiec

### Static Huffman compression

In [2]:
from bitarray import bitarray


class StaticHuffmanAlgorithm:
    """Static Huffman coder that compresses or decompresses a file on construction.

    The input is treated as a sequence of 8-bit symbols: files are opened as
    Latin-1 text with newline translation disabled, so every byte of the file
    maps to exactly one one-character symbol and the round trip is byte-exact.

    Layout of a compressed file (bit-packed, in this order):
      * offset          - 8 bits, number of zero padding bits that follow (1..8)
      * padding         - ``offset`` zero bits; they make the whole file a
                          multiple of 8 bits long
      * number_of_codes - 8 bits, number of distinct symbols; the value 0
                          followed by more data stands for 256 symbols
      * encoded_tree    - the Huffman tree (see ``_encode_tree``)
      * encoded_text    - the Huffman-coded content

    Args:
        decoded_filename: path of the plain file (input when compressing,
            output when decompressing).
        encoded_filename: path of the compressed file (output when
            compressing, input when decompressing).
        encode: ``True`` to compress, ``False`` to decompress.
    """

    # One character <-> one byte; Latin-1 maps all 256 byte values.
    _SYMBOL_ENCODING = 'latin-1'

    class Node:
        """Huffman tree node; internal nodes are marked by an empty ``letter``."""

        def __init__(self, letter='', freq=0, left=None, right=None):
            self.freq = freq
            self.letter = letter
            self.left = left
            self.right = right

        def __repr__(self):
            return "(" + str(self.freq) + " " + str(self.letter) + ")"

        def __lt__(self, other):
            # Nodes are ordered by weight only.
            return self.freq < other.freq

    def __init__(self, decoded_filename='', encoded_filename='', encode=True):
        self.encoded_filename = encoded_filename
        self.decoded_filename = decoded_filename
        self.encode = encode
        self.codes = {}
        self.tree = None
        self.letters_freq = {}
        self.codes_counter = 0
        self.encoded_data = bitarray()
        # Read position (in bits) inside ``encoded_data`` while parsing.
        self._cursor = 0

        if encode:
            self.compress_to_file()
        else:
            self.decompress_from_file()

    def build_huffman_tree(self):
        """Build the Huffman tree for ``self.letters_freq`` and store it in ``self.tree``.

        Uses the two-queue construction: leaves sorted by weight in one queue,
        freshly created internal nodes (produced in non-decreasing weight
        order) in the other. On equal weights a leaf is taken before an
        internal node.

        Degenerate inputs do not fail: with no symbols the tree is a bare
        root, with a single symbol the root has only a ``right`` child so the
        symbol still receives a one-bit code.
        """
        leaf_queue = sorted((self.Node(letter, weight)
                             for letter, weight in self.letters_freq.items()),
                            key=lambda n: n.freq)

        if len(leaf_queue) < 2:
            root = self.Node()
            if leaf_queue:
                root.freq = leaf_queue[0].freq
                root.right = leaf_queue[0]
            self.tree = root
            return

        internal_queue = []

        def pop_lightest():
            # Both queues are sorted, so the global minimum is at one of the fronts.
            if leaf_queue and (not internal_queue
                               or leaf_queue[0].freq <= internal_queue[0].freq):
                return leaf_queue.pop(0)
            return internal_queue.pop(0)

        while len(leaf_queue) + len(internal_queue) > 1:
            first = pop_lightest()
            second = pop_lightest()
            internal_queue.append(self.Node(freq=first.freq + second.freq,
                                            left=first,
                                            right=second))

        self.tree = internal_queue[0]

    def build_freq_dict(self):
        """Count symbol occurrences of the plain file into ``self.letters_freq``.

        Counts are added to whatever ``self.letters_freq`` already holds.

        Returns:
            The file content as a string of one-character symbols, so the
            caller does not have to read the file a second time.
        """
        with open(self.decoded_filename, "r",
                  encoding=self._SYMBOL_ENCODING, newline='') as f:
            data = f.read()

        for symbol in data:
            self.letters_freq[symbol] = self.letters_freq.get(symbol, 0) + 1
        return data

    def generate_codes(self, node, code):
        """Fill ``self.codes`` with the bit code of every leaf below ``node``.

        A step to the left child appends bit 1, a step to the right child
        appends bit 0. Missing children (degenerate trees) are skipped.
        """
        if node is None:
            return
        if node.letter == '':
            self.generate_codes(node.left, code + bitarray('1'))
            self.generate_codes(node.right, code + bitarray('0'))
        else:
            self.codes[node.letter] = code

    def _encode_tree(self, node, encoded_tree):
        """Append the serialised children of ``node`` to ``encoded_tree``.

        Children are written right first, then left. An internal child is
        written as bit 0 followed by its own children; a leaf is written as
        bit 1 followed by the 8 bits of its symbol. A symbol that does not fit
        in one byte raises ``UnicodeEncodeError`` instead of silently
        producing a tree the decoder cannot read.
        """
        for child in (node.right, node.left):
            if child is None:
                continue
            if child.letter == '':
                encoded_tree.append(False)
                self._encode_tree(child, encoded_tree)
            else:
                encoded_tree.append(True)
                encoded_tree.frombytes(child.letter.encode(self._SYMBOL_ENCODING))

    def encode_tree(self):
        """Return ``self.tree`` serialised as a bitarray (see ``_encode_tree``)."""
        encoded_tree = bitarray()
        self._encode_tree(self.tree, encoded_tree)
        return encoded_tree

    def _take_bits(self, count):
        """Return the next ``count`` bits of ``encoded_data`` and advance the cursor.

        Raises:
            ValueError: if fewer than ``count`` bits remain.
        """
        end = self._cursor + count
        if end > len(self.encoded_data):
            raise ValueError("compressed data is truncated")
        bits = self.encoded_data[self._cursor:end]
        self._cursor = end
        return bits

    def _read_child(self, number_of_codes):
        """Parse one child (a leaf or a whole subtree) at the cursor and return it."""
        if self._take_bits(1)[0]:
            symbol = self._take_bits(8).tobytes().decode(self._SYMBOL_ENCODING)
            self.codes_counter += 1
            return self.Node(letter=symbol)

        child = self.Node()
        self._decode_tree(child, number_of_codes)
        return child

    def _decode_tree(self, node, number_of_codes):
        """Attach the children of ``node`` read from ``encoded_data`` at the cursor.

        Mirrors ``_encode_tree``: right child first, then left. Parsing stops
        as soon as ``number_of_codes`` leaves have been read.
        """
        if number_of_codes == self.codes_counter:
            return
        node.right = self._read_child(number_of_codes)

        if number_of_codes == self.codes_counter:
            return
        node.left = self._read_child(number_of_codes)

    def decode_tree(self, number_of_codes):
        """Rebuild ``self.tree`` from the front of ``self.encoded_data``.

        On entry ``self.encoded_data`` must start with the serialised tree; on
        exit the tree bits have been removed from it. The data is sliced once
        at the end instead of after every field, which avoids copying the
        whole payload for each tree node.

        Raises:
            ValueError: if the data ends before the tree is complete.
        """
        self.tree = self.Node()
        self.codes_counter = 0
        self._cursor = 0
        self._decode_tree(self.tree, number_of_codes)
        self.encoded_data = self.encoded_data[self._cursor:]
        self._cursor = 0

    def compress_to_file(self):
        """Compress ``decoded_filename`` into ``encoded_filename``.

        Raises:
            ValueError: if more than 256 distinct symbols are present.
        """
        data = self.build_freq_dict()
        self.build_huffman_tree()
        self.generate_codes(self.tree, bitarray())
        encoded_tree = self.encode_tree()

        encoded_text = bitarray()
        if data:
            # bitarray refuses an empty code table, so skip empty input.
            encoded_text.encode(self.codes, data)

        number_of_codes = len(self.codes)
        if number_of_codes > 256:
            raise ValueError("cannot store more than 256 distinct symbols")

        count_bits = bitarray()
        # The count field is one byte wide; 256 wraps to 0 (see class docstring).
        count_bits.frombytes((number_of_codes % 256).to_bytes(1, 'big'))

        body = count_bits + encoded_tree + encoded_text
        offset = 8 - len(body) % 8
        offset_bits = bitarray()
        offset_bits.frombytes(offset.to_bytes(1, 'big'))

        with open(self.encoded_filename, "wb") as f:
            (offset_bits + bitarray([False] * offset) + body).tofile(f)

    def decompress_from_file(self):
        """Decompress ``encoded_filename`` into ``decoded_filename``.

        Raises:
            ValueError: if the compressed file is truncated.
        """
        self.encoded_data = bitarray()
        with open(self.encoded_filename, 'rb') as f:
            self.encoded_data.fromfile(f)

        # Walk the header with a cursor and strip it with a single slice.
        self._cursor = 0
        offset = int.from_bytes(self._take_bits(8).tobytes(), 'big')
        self._take_bits(offset)
        number_of_codes = int.from_bytes(self._take_bits(8).tobytes(), 'big')
        self.encoded_data = self.encoded_data[self._cursor:]

        if number_of_codes == 0 and len(self.encoded_data) > 0:
            # A zero count followed by data is the wrapped value for 256 symbols.
            number_of_codes = 256

        self.decode_tree(number_of_codes)
        self.generate_codes(self.tree, bitarray())

        with open(self.decoded_filename, "w",
                  encoding=self._SYMBOL_ENCODING, newline='') as f:
            if self.codes:
                f.write(''.join(self.encoded_data.decode(self.codes)))

### Format pliku przechowującego dane

Plik składa się z następujących danych:
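 - offset - 8 bitów - liczba bitów dopełnienia (od 1 do 8), które następują bezpośrednio po tym polu
 - następnie `offset` bitów o wartości 0, dopełniających długość całego pliku do wielokrotności 8 bitów
 - number_of_codes - 8 bitów - liczba różnych znaków (liści drzewa Huffmana) występujących w zakodowanym tekście
 - encoded_tree - zakodowane drzewo: węzły zapisywane są rekurencyjnie, najpierw prawe, potem lewe dziecko; bit 0 oznacza węzeł wewnętrzny, a bit 1 liść, po którym następuje 8 bitów znaku
 - encoded_text - zakodowany tekst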

### Sprawdzenie poprawności, mierzenie współczynnika kompresji oraz czasu kompresji i dekompresji

In [37]:
import time
import os
from pathlib import Path
import filecmp

def testCompression(filename):
    """Compress and decompress ``filename + ".txt"`` and print a short report.

    Writes ``filename + "_compressed"`` and ``filename + "_decompressed.txt"``
    next to the input, then prints the compression time, the size of the
    compressed file as a percentage of the original, the decompression time,
    and whether the decompressed file is identical to the input.

    Args:
        filename: path of the input file without its ``.txt`` extension.
    """
    source_path = filename + ".txt"
    compressed_path = filename + "_compressed"
    restored_path = filename + "_decompressed.txt"

    # perf_counter is monotonic and high-resolution, unlike wall-clock time.time().
    start = time.perf_counter()
    StaticHuffmanAlgorithm(source_path, compressed_path)
    end = time.perf_counter()
    print("Czas kompresji: " + str(end - start))

    print("Współczynnik kompresji: ")
    print(str(100*Path(compressed_path).stat().st_size/Path(source_path).stat().st_size) + "%")

    start = time.perf_counter()
    StaticHuffmanAlgorithm(restored_path, compressed_path, False)
    end = time.perf_counter()
    print("Czas dekompresji: " + str(end - start))

    # shallow=False forces a content comparison; the default may accept two
    # files on matching os.stat() signatures alone.
    if filecmp.cmp(source_path, restored_path, shallow=False):
        print("Plik wejściowy i wynikowy jest taki sam")
    else:
        print("Plik wejściowy i wynikowy różnią się")

### Test pliku 1kb

In [38]:
# Round-trip tests/1kb.txt through compression and decompression and print the report.
testCompression("tests/1kb")

Czas kompresji: 0.005689859390258789
Współczynnik kompresji: 
82.06627680311891%
Czas dekompresji: 0.014542102813720703
Plik wejściowy i wynikowy jest taki sam


### Test pliku 10kb

In [39]:
# Round-trip tests/10kb.txt through compression and decompression and print the report.
testCompression("tests/10kb")

Czas kompresji: 0.0064868927001953125
Współczynnik kompresji: 
75.48330404217926%
Czas dekompresji: 0.09153366088867188
Plik wejściowy i wynikowy jest taki sam


### Test pliku 100kb

In [40]:
# Round-trip tests/100kb.txt through compression and decompression and print the report.
testCompression("tests/100kb")

Czas kompresji: 0.036667823791503906
Współczynnik kompresji: 
74.6474609375%
Czas dekompresji: 0.846895694732666
Plik wejściowy i wynikowy jest taki sam


### Test pliku 1000kb

In [41]:
# Round-trip tests/1000kb.txt through compression and decompression and print the report.
testCompression("tests/1000kb")

Czas kompresji: 0.285891056060791
Współczynnik kompresji: 
74.77822265625%
Czas dekompresji: 8.639173746109009
Plik wejściowy i wynikowy jest taki sam
