# Rozwiązanie laboratorium 2

In [616]:
from queue import PriorityQueue
from time import perf_counter

Funkcje konwertujące

In [617]:
def string_to_int(string):
    """Return the integer whose binary digits are spelled out by *string*.

    The first character is the most significant bit. Any character other
    than ``'0'`` counts as a set bit; an empty string yields 0.
    """
    result = 0
    for symbol in string:
        result <<= 1
        if symbol != '0':
            result |= 1
    return result


def int_to_string(x, no_of_bits):
    """Return the lowest *no_of_bits* bits of *x* as a '0'/'1' string.

    The most significant of the requested bits comes first; higher bits of
    *x* are ignored.
    """
    top_bit = 1 << (no_of_bits - 1)
    return "".join(
        "1" if x & (top_bit >> offset) else "0"
        for offset in range(no_of_bits)
    )

Wczytywanie i zapisywanie plików binarnych/tekstowych

In [618]:
def read_file_to_string(filename):
    """Return the whole content of the UTF-8 encoded text file *filename*."""
    with open(filename, mode="r", encoding="UTF-8") as handle:
        return handle.read()


def read_binary_file_to_string(filename):
    """Read a packed bit file and return its payload as a '0'/'1' string.

    Expected layout: payload bytes followed by one trailer byte. The trailer
    holds the number of meaningful bits in the last payload byte, with 0
    meaning that all eight bits are meaningful.

    A file shorter than two bytes has no payload byte in front of the
    trailer, so an empty string is returned instead of failing with an
    IndexError. Trailer values above 8 are treated as 8.
    """
    with open(filename, 'rb') as f:
        raw = f.read()

    if len(raw) < 2:
        return ""

    # A zero trailer means the last payload byte is completely used.
    valid_bits = raw[-1] or 8

    pieces = [format(byte, "08b") for byte in raw[:-2]]
    # Only the leading `valid_bits` bits of the final payload byte are data;
    # the remaining ones are zero padding.
    pieces.append(format(raw[-2], "08b")[:valid_bits])
    return "".join(pieces)


def write_string_to_binary_file(filename, text):
    """Pack the '0'/'1' string *text* into bytes and write them to *filename*.

    The text is cut into consecutive groups of up to eight characters and
    each group is converted to one byte with ``string_to_int``.
    """
    payload = bytearray(
        string_to_int(text[start:start + 8])
        for start in range(0, len(text), 8)
    )

    with open(filename, 'wb') as out:
        out.write(payload)

In [619]:
def add_last_bits(text):
    """Pad a bit string to whole bytes and append an 8-bit length trailer.

    Zeros are appended until the length is a multiple of eight. The trailer
    is ``len(text) % 8`` written as eight bits, i.e. how many bits of the
    last byte belong to the original text (0 when it was already aligned).
    """
    used_bits = len(text) % 8
    zero_fill = "0" * (-used_bits % 8)
    return text + zero_fill + int_to_string(used_bits, 8)

### Statyczne drzewo Huffmana

In [620]:
class StaticNode:
    """Node of the static Huffman tree.

    A node created with a symbol is a leaf; a node created without one keeps
    ``character`` as ``None`` and gets its children assigned by the builder.
    """

    def __init__(self, character=None):
        self.character = character
        self.left = None
        self.right = None


class StaticHuffman:
    """Static Huffman coder that works on strings of '0' and '1' characters.

    The frequency table, the tree and the code table are kept on the
    instance, so the object that encoded a text can decode it again.
    """

    def __init__(self):
        self.tree_root = None      # root StaticNode, set by build_tree()
        self.frequency_dict = {}   # symbol -> number of occurrences
        self.code_dict = {}        # symbol -> code as a '0'/'1' string

    def build_frequency_dict(self, data):
        """Add the occurrence count of every symbol in *data* to the table."""
        counts = self.frequency_dict
        for c in data:
            counts[c] = counts.get(c, 0) + 1

    def build_tree(self):
        """Build the Huffman tree from ``frequency_dict``.

        With an empty frequency table ``tree_root`` is set to ``None``.
        """
        pq = PriorityQueue()
        for c, freq in self.frequency_dict.items():
            # The symbol string is the tie-breaker for equal frequencies, so
            # tuple comparison does not fall through to the node objects.
            pq.put((freq, c, StaticNode(c)))

        # PriorityQueue.get() blocks on an empty queue, so an empty table
        # must be handled before entering the merge loop.
        if pq.empty():
            self.tree_root = None
            return

        while True:
            freq1, str1, node1 = pq.get()
            if pq.empty():
                # Last remaining entry is the root of the finished tree.
                self.tree_root = node1
                return
            freq2, str2, node2 = pq.get()
            new_node = StaticNode()
            new_node.left = node1
            new_node.right = node2
            pq.put((freq1 + freq2, str1 + str2, new_node))

    def code_characters(self):
        """Fill ``code_dict`` by walking the tree (left = '0', right = '1').

        Does nothing when no tree has been built.
        """
        def traverse_tree(node, code=""):
            if node.character is not None:
                # Only a root that is itself a leaf can arrive here with an
                # empty path; it still needs a one-bit code so that every
                # occurrence of the symbol is represented in the output.
                self.code_dict[node.character] = code or "0"
            else:
                traverse_tree(node.left, code + '0')
                traverse_tree(node.right, code + '1')

        if self.tree_root is not None:
            traverse_tree(self.tree_root, code="")

    def encode_text(self, text):
        """Encode *text* and return the padded bit string with its trailer.

        Builds the frequency table, tree and code table first, then passes
        the concatenated codes through ``add_last_bits``.
        """
        self.build_frequency_dict(text)
        self.build_tree()
        self.code_characters()
        encoded = [self.code_dict[c] for c in text]
        return add_last_bits("".join(encoded))

    def decode_text(self, text):
        """Decode the bit string *text* using the tree built by this object.

        Raises:
            ValueError: if *text* is non-empty but no tree has been built.
        """
        root = self.tree_root
        if root is None:
            if text:
                raise ValueError("cannot decode: the Huffman tree is empty")
            return ""
        if root.character is not None:
            # Single-symbol alphabet: every bit stands for one occurrence.
            return root.character * len(text)

        ind = 0
        end = len(text)
        decoded = []
        while ind < end:
            ptr = root
            while ptr.character is None:
                ptr = ptr.left if text[ind] == '0' else ptr.right
                ind += 1
            decoded.append(ptr.character)
        return "".join(decoded)

In [621]:
# Input text for the static Huffman round-trip cell below (currently commented out).
file = read_file_to_string("pan-tadeusz.txt")

In [622]:
# static_tree = StaticHuffman()
# encoded_text = static_tree.encode_text(file)
# write_string_to_binary_file("tak1", encoded_text)
# decoded_text = static_tree.decode_text(read_binary_file_to_string("tak1"))
# print(decoded_text == file)

### Dynamiczne drzewo Huffmana

In [623]:
class AdaptiveNode:
    """Node of the adaptive Huffman tree.

    Stores an ordering ``index``, a ``weight``, the ``character`` it stands
    for and an ``external`` flag (True for leaves), plus links to its
    children and parent, which all start as ``None``.
    """

    def __init__(self, index, weight, character, external):
        self.index, self.weight = index, weight
        self.character, self.external = character, external
        self.left = self.right = self.parent = None


def interchange(node, change):
    """Swap the tree positions and the indices of *node* and *change*.

    Each node takes over the other's slot under the other's parent and the
    two ``index`` values are exchanged. Nothing happens when both arguments
    are the same node.
    """
    if change == node:
        return

    change.index, node.index = node.index, change.index

    parent_change, parent_node = change.parent, node.parent
    # Work out both child slots before rewiring anything, so the siblings
    # case (one shared parent) is handled correctly.
    slot_of_change = "left" if parent_change.left == change else "right"
    slot_of_node = "left" if parent_node.left == node else "right"

    setattr(parent_change, slot_of_change, node)
    setattr(parent_node, slot_of_node, change)
    node.parent, change.parent = parent_change, parent_node


def update_weight(node, node_dict):
    """Move *node* from its current weight bucket to the next one.

    *node_dict* maps a weight to the set of nodes having that weight. The
    node's own ``weight`` attribute is not modified here.
    """
    current = node.weight
    node_dict[current].remove(node)
    node_dict.setdefault(current + 1, set()).add(node)


class AdaptiveHuffman:
    """Adaptive Huffman coder that updates its tree after every symbol.

    Encoder and decoder each start from a fresh instance and apply the same
    updates, so no code table has to be stored with the data. A symbol seen
    for the first time is written as the code of the ``NYT`` escape leaf
    followed by its code point in ``char_bits`` bits.

    Args:
        char_bits: width in bits of a raw code point in the output. The
            default of 16 keeps the original format; use 21 to cover every
            Unicode code point. Encoder and decoder must use the same value.

    Raises:
        ValueError: if *char_bits* is smaller than 1.
    """

    def __init__(self, char_bits=16):
        if char_bits < 1:
            raise ValueError("char_bits must be a positive integer")
        self.char_bits = char_bits
        # The tree starts as a single escape leaf that is also the root.
        self.root = AdaptiveNode(1000, 0, "NYT", True)
        self.NYT = self.root
        self.free_index = 999          # next index handed to a new node
        self.leaves = {}               # symbol -> its leaf node
        self.node_weights = {1: set()}  # weight -> nodes (the root is never added)
        self.leaf_weights = {1: set()}  # weight -> symbol leaves only

    def get_leaf_code(self, node):
        """Return the root-to-*node* path as a bit string (left = '0')."""
        code = []
        while node != self.root:
            code.append("0" if node == node.parent.left else "1")
            node = node.parent
        # Bits were collected from the leaf upwards.
        return "".join(reversed(code))

    def add_new_node(self, char):
        """Split the current NYT leaf to make room for the new symbol *char*.

        The old NYT node becomes an internal node whose left child is the
        new NYT leaf and whose right child is the leaf for *char*; the tree
        is then updated starting from that internal node.
        """
        right_child = AdaptiveNode(self.free_index, 1, char, True)
        self.free_index -= 1
        left_child = AdaptiveNode(self.free_index, 0, "NYT", True)
        self.free_index -= 1

        internal, self.NYT = self.NYT, left_child

        internal.weight = 1
        internal.character = ""
        internal.external = False
        internal.left = left_child
        internal.right = right_child

        right_child.parent = internal
        left_child.parent = internal

        self.leaves[char] = right_child
        self.node_weights[1].add(right_child)
        if internal != self.root:
            self.node_weights[1].add(internal)

        self.leaf_weights[1].add(right_child)
        self.update(internal)

    def update(self, node):
        """Increment weights from *node* up to the root, swapping on the way.

        At each step the node is first exchanged with the highest-index node
        of the same weight, then moved to the next weight bucket.
        """
        while node != self.root:
            # For the sibling of the NYT leaf only symbol leaves are swap
            # candidates; otherwise any tracked node of equal weight is.
            if node.parent.left == self.NYT:
                candidates = self.leaf_weights
            else:
                candidates = self.node_weights
            change = max(candidates[node.weight], key=lambda item: item.index)
            interchange(node, change)

            if node.external:
                update_weight(node, self.leaf_weights)
            update_weight(node, self.node_weights)
            node.weight += 1
            # After a swap this is the parent at the node's new position.
            node = node.parent

        self.root.weight += 1

    def encode_text(self, text):
        """Encode *text* and return the padded bit string with its trailer.

        Raises:
            ValueError: if a character's code point does not fit into
                ``char_bits`` bits. Previously such a code point was silently
                truncated and decoded to a different character. The tree
                keeps the updates made for the characters before it.
        """
        encoded = []
        for c in text:
            if c not in self.leaves:
                code_point = ord(c)
                if code_point >> self.char_bits:
                    raise ValueError(
                        f"character {c!r} (U+{code_point:04X}) does not fit "
                        f"into {self.char_bits} bits"
                    )
                encoded.append(self.get_leaf_code(self.NYT))
                encoded.append(int_to_string(code_point, self.char_bits))
                self.add_new_node(c)
            else:
                encoded.append(self.get_leaf_code(self.leaves[c]))
                self.update(self.leaves[c])

        return add_last_bits("".join(encoded))

    def decode_text(self, text):
        """Decode the bit string *text*, rebuilding the tree along the way.

        Must be called on an instance in the same state (normally fresh) and
        with the same ``char_bits`` as the encoder had when it started.
        """
        decoded = []
        ind = 0
        width = self.char_bits
        while ind < len(text):
            ptr = self.root
            while not ptr.external:
                ptr = ptr.left if text[ind] == '0' else ptr.right
                ind += 1

            if ptr == self.NYT:
                # Escape: the next `width` bits are the raw code point.
                new_char = chr(string_to_int(text[ind:ind + width]))
                decoded.append(new_char)
                self.add_new_node(new_char)
                ind += width
            else:
                decoded.append(ptr.character)
                self.update(ptr)
        return "".join(decoded)

In [673]:
# Input text for the adaptive Huffman timing/round-trip cell below (currently commented out).
file_content = read_file_to_string("random_files/file_size_1000kb.txt")

In [None]:
# tree = AdaptiveHuffman()
# t = perf_counter()
# output = tree.encode_text(file_content)
# print(perf_counter() - t)
# write_string_to_binary_file("tak2", output)
# to_decode = read_binary_file_to_string("tak2")
# tree2 = AdaptiveHuffman()
# t = perf_counter()
# output2 = tree2.decode_text(to_decode)
# print(perf_counter() - t)
# print(output2==file_content)

### Algorytm o zmiennym bloku kompresji LZW (Lempel–Ziv–Welch)

In [None]:
def encoding():
    """Placeholder for the LZW encoder; not implemented yet."""
    return None


def decoding():
    """Placeholder for the LZW decoder; not implemented yet."""
    return None