## Implementacje algorytmów

In [16]:
from queue import PriorityQueue
from bitarray import bitarray
from collections import defaultdict

### 1. Statyczny algorytm Huffmana:

Skompresowany tekst przechowywany jest w pliku w trzech częściach oddzielonych znakiem kowej linii:
- tekstowego nagłówka
- długości paddingu zakodowanego tekstu
- binarnego zakodowanego tekstu.

#### Kompresja:

In [17]:
def static_tree(text: str) -> tuple:
    """Tworzy z tekstu drzewo zapisane jako krotkę"""
    
    # Liczba wystąpień poszczególnych liter w tekście
    counts = defaultdict(lambda: 0)
    for letter in text:
        counts[letter] += 1

    # Kolejka priorytetowa według liczby wystąpień rosnąco
    # Bez id ma problem z porównywaniem przy tym samym count
    Q = PriorityQueue()
    for unique_id, (letter, count) in enumerate(counts.items()):
        Q.put((count, unique_id, letter))

    count_a, id_a, subtree_a = Q.get()

    while not Q.empty():
        count_b, _, subtree_b = Q.get()
        Q.put((count_a + count_b, id_a, (subtree_a, subtree_b)))
        # Tutaj kolejka nie będzie pusta
        count_a, id_a, subtree_a = Q.get()

    return subtree_a


def static_codes(tree: tuple) -> dict:
    """Wylicza kody na podstawie drzewa"""
    codes = {}

    def encode(node, code):
        if isinstance(node, str):
            codes[node] = bitarray(code)
            return

        a, b = node
        encode(a, code + "0")
        encode(b, code + "1")

    encode(tree, "")
    return codes


def static_encode(text: str, codes: dict) -> bitarray:
    """Zamienia otrzymany tekst w dane binarnie zakodowane"""

    compressed = bitarray()
    # To na tyle prosta operacja że pozwalam sobie użyć biblioteki
    compressed.encode(codes, text)
    return compressed


def static_compress(text: str, path: str = "file") -> None:
    """Kompresuje otrzymany tekst do pliku"""

    tree = static_tree(text)
    compressed = static_encode(text, codes=static_codes(tree))

    with open(path, "wb") as file:
        # drzewo Huffmana oraz długość paddingu
        file.write(bytes(repr(tree) + "\n" + str(compressed.fill()) + "\n", "utf"))
        file.write(compressed)


#### Dekompresja:

In [18]:
def static_decompress(path="file", verbose=False):
    with open(path, "rb") as file:
        tree = eval(file.readline())
        if verbose:
            print("Drzewo: ", tree)
        padding = int(file.readline())
        compressed = bitarray()
        compressed.fromfile(file)
        del compressed[-padding:]

    letters = []
    node = tree
    for i in range(len(compressed)):
        node = node[compressed[i]]

        if isinstance(node, str):
            letters.append(node)
            node = tree
    
    return "".join(letters)


#### Testy:

In [19]:
static_compress("dziala heheheheee")

In [20]:
static_decompress()

'dziala heheheheee'

In [21]:
with open("file", "rb") as file:
    print(file.read())

b"(((' ', ('d', 'z')), (('i', 'l'), 'a')), ('h', 'e'))\n3\n#F\xac]\xdd\xf8"


#### Pomiary:

In [22]:
from random import randbytes
with open("random", "wb") as file:
    file.write(randbytes(1_000_000))

In [23]:
import time

names = ["bfq-iosched.c", "pg34073.txt", "random"]
sizes = [1_000, 10_000, 100_000, 1_000_000]

for name in names:
    with open(name, "r") as file:
        for size in sizes:
            text = file.read(size)
            start = time.time()
            static_compress(text)
            compression_time = time.time()
            static_decompress()
            decompression_time = time.time()
            with open("file", "rb") as compressed:
                compressed = compressed.read()
            print("File: ", name)
            print("\tFile size: ", size, "B")
            print("\tRatio: ", len(compressed) / len(text))
            print("\tCompression time: ", compression_time - start, "s.")
            print("\tDecompression time: ", decompression_time - compression_time, "s.")
            print()
            

File:  bfq-iosched.c
	File size:  1000 B
	Ratio:  1.045
	Compression time:  0.0016186237335205078 s.
	Decompression time:  0.002304553985595703 s.

File:  bfq-iosched.c
	File size:  10000 B
	Ratio:  0.6975
	Compression time:  0.004084110260009766 s.
	Decompression time:  0.011548042297363281 s.

File:  bfq-iosched.c
	File size:  100000 B
	Ratio:  0.62351
	Compression time:  0.021536827087402344 s.
	Decompression time:  0.0936117172241211 s.

File:  bfq-iosched.c
	File size:  1000000 B
	Ratio:  0.627429961321222
	Compression time:  0.029646873474121094 s.
	Decompression time:  0.1361222267150879 s.

File:  pg34073.txt
	File size:  1000 B
	Ratio:  1.134
	Compression time:  0.0011341571807861328 s.
	Decompression time:  0.0014781951904296875 s.

File:  pg34073.txt
	File size:  10000 B
	Ratio:  0.6832
	Compression time:  0.004385709762573242 s.
	Decompression time:  0.009641647338867188 s.

File:  pg34073.txt
	File size:  100000 B
	Ratio:  0.60839
	Compression time:  0.01846766471862793 s.

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xb2 in position 1: invalid start byte

### 2. Dynamiczny algorytm Huffmana:

Spędziłem nad tym paręnaście godzin i niestety nie działa (choć coś tam się koduje w pliku).

Plik jest przechowywany jako ciąg zakodowanych binarnie danych. Gdy pojawia się kod którego dekoder nie zna następnym bajtem jest litera którą on oznacza. Dekoder na bieżąco modyfikuje drzewo w ten sam sposób co enkoder, zmieniając znaczenia kodów.

#### Kompresja:

In [None]:
class Node:
    def __init__(self, letter="", weight=1, parent=None, child=[None, None]):
        self.weight = weight
        self.letter = letter
        self.parent = parent
        self.child = child

    def get_code(self):
        code = ""
        while self.parent:
            if self.which_child() == 0:
                code = "0" + code
            else:
                code = "1" + code
            self = self.parent
        return code

    def update_weight(self, value=1):
        self.weight += value

        if self.parent:
            self.parent.update_weight(value)

        possible_swap = self.find_swap()
        if possible_swap:
            self.swap(possible_swap)

    def swap(self, other):
        s = self.which_child()
        o = other.which_child()

        # Podmiana wag
        self.parent.update_weight(other.weight - self.weight)
        other.parent.update_weight(self.weight - other.weight)

        # Podmiana wskażników rodziców na dzieci
        self.parent.children[s], other.parent.children[o] = (
            other.parent.children[o],
            self.parent.children[s],
        )
        # Podmiana wskaźników dzieci na rodziców
        self.parent, other.parent = other.parent, self.parent

    def which_child(self):
        if self.parent:
            return 0 if self is self.parent.child[0] else 1

    def find_swap(self):
        previous = self
        current = self.find_next()

        # W razie gdyby wiele węzłów miałą tę samą wagę
        while current and current.weight < self.weight:
            previous = current
            current = current.find_next()

        # Jeśli self nie jest największy na swoim poziomie
        if current and previous is not self:
            return previous

        current = self.find_next_above()
        while current and current.weight < self.weight:
            previous = current
            current = current.find_next()

    def find_next(self, level=0):
        """Zwraca następny węzeł na tym samym poziomie lub None jeśli self jest ostatnim"""

        current = self

        if not current.parent:
            return None
        
        # Wychodzimy po większych dzieciach dopóki nie
        # (jesteśmy mniejszym i istnieje większe)
        while current.which_child() == 1 or not current.parent.child[1]:
            current = current.parent
            level += 1
            
            # Self jest największy na swoim poziomie
            if not current.parent:
                return None

        # Zmieniamy na większe dziecko kiedy jesteśmy mniejszym
        current = current.parent.child[1]

        # Schodzimy na odpowiedni poziom po najmniejszych dzieciach
        while level > 0:
            if current.child[0]:
                current = current.child[0]
                level -= 1
            elif current.child[1]:
                current = current.child[1]
                level -= 1
            else:
                # Jeśli nie da się dojść na odpowiedni poziom to szukamy następnej ścieżki
                return current.find_next(level)

        return current

    def find_next_above(self):
        """Zwraca pierwszy węzeł na poziomie wyżej"""

        current = self
        level = 0

        # Wychodzimy do korzenia
        while current.parent:
            current = current.parent
            level += 1

        # Schodzimy po mniejszych dzieciach
        while level > 1:
            if current.child[0]:
                current = current.child[0]
                level -= 1
            elif current.child[1]:
                current = current.child[1]
                level -= 1
            else:
                current = current.find_next(level)

        return current
    
    def __repr__(self):
        child_0 = str(self.child[0]) if self.child[0] else ""
        child_1 = str(self.child[1]) if self.child[1] else ""
        letter = str(self.letter) + " " if self.letter else ""
        return f"[{letter}{str(self.weight)}]( {child_0} {child_1} )"


#### Dekompresja:

In [24]:
def dynamic_compress(text, path="dynamic"):
    nyt = Node("NYT", 0)
    nodes = {None: nyt}
    tree = nyt
    
    with open(path, "wb") as file:
        for letter in text:
            if letter in nodes:
                file.write(bytes(nodes[letter].get_code() + " ", "utf"))
                nodes[letter].update_weight(1)

            else:
                letter_node = Node(letter)
                nodes[letter] = letter_node
                # print(letter_node)

                new_node = Node(child=[nyt, letter_node])
                letter_node.parent = new_node
                
                if nyt.parent:
                    nyt.parent.child[0] = new_node
                else:
                    tree = new_node
                nyt.parent, new_node.parent = new_node, nyt.parent

                file.write(bytes(letter_node.get_code() + " ", "utf"))
                file.write(bytes(letter + " ", "utf"))

            print(tree)
        

Dekodowanie:

In [25]:
def dynamic_decompress(path="dynamic"):
    letters = []

    nyt = Node("NYT", 0)
    nodes = {None: nyt}
    tree = nyt
    node = tree
    
    with open(path, "rb") as file:
        compressed = bitarray()
        compressed.fromfile(file)

        for i in range(len(compressed)):
            print(node)
            if node.child[compressed[i]]:
                node = node.child[compressed[i]]
                if node.letter:
                    letters.append(node.letter)
                    node.update_weight(1)
                    nodes[letter].update_weight(1)
                    node = tree

            else:
                node.child[compressed[i]] = Node(str(compressed[i+1:i+9].tobytes()))
                i += 9
                node.child[0] = Node()
                node = tree




                # letter_node = Node(letter)
                # nodes[letter] = letter_node
                # # print(letter_node)

                # new_node = Node(child=[nyt, letter_node])
                # letter_node.parent = new_node
                
                # if nyt.parent:
                #     nyt.parent.child[0] = new_node
                # else:
                #     tree = new_node
                # nyt.parent, new_node.parent = new_node, nyt.parent

                # file.write(bytes(letter_node.get_code() + " ", "utf"))
                # file.write(bytes(letter + " ", "utf"))

            # print(tree)
    
    return "".join(letters)
        

#### Testy:

In [26]:
dynamic_compress("ccciekawe")

NameError: name 'Node' is not defined

In [27]:
dynamic_decompress()

NameError: name 'Node' is not defined

In [28]:
with open("dynamic", "rb") as file:
    print(file.read())

b'1 c 1 1 01 i 001 e 0001 k 00001 a 000001 w 001 '
