## Lab 2 - algorytmy kompresji tekstu

Biblioteki:

In [2]:
from heapq import heappop, heappush, heapify

from bitarray import bitarray
from bitarray.util import ba2int
import pandas as pd
import numpy as np

Algorytm statyczny

In [3]:
class Node:
    def __init__(self, value, char=None, left=None, right=None):
        self.value = value
        self.char = char
        self.left = left
        self.right = right

    def __gt__(self, other):
        return self.value > other.value


class StaticHuff:
    def __init__(self, text):
        self.root = self.make_tree(text)
        self.code_chars = dict()
        self.make_codes(self.root, self.code_chars, bitarray())

    def make_tree(self, text):
        chars = dict()

        for c in text:
            chars[c] = chars.get(c, 0) + 1

        leaves = [Node(v, c) for c, v in chars.items()]

        while len(leaves) > 1:
            f, s = heappop(leaves), heappop(leaves)
            heappush(leaves, Node(f.value + s.value, left=f, right=s))
        return leaves[0]

    def make_codes(self, node, code_chars, code_bit):
        if node.char is not None:
            code_chars[node.char] = code_bit

        code_copy = code_bit.copy()

        if node.right is not None:
            code_bit.append(1)
            self.make_codes(node.right, code_chars, code_bit)

        if node.left is not None:
            code_bit = code_copy
            code_bit.append(0)
            self.make_codes(node.left, code_chars, code_bit)

    def encode(self, text):
        bit_encoded = bitarray()
        for c in text:
            bit_encoded.extend(self.code_chars[c])
        return bit_encoded

    def decode(self, encoded_bit):
        node = self.root
        decoded_text = ""
        for bit in encoded_bit:
            if bit:
                node = node.right
            else:
                node = node.left
            if not node.left and not node.right:
                decoded_text += node.char
                node = self.root
        return decoded_text

In [4]:
text = "Text example"
static = StaticHuff(text)
result = static.encode(text)
decoded = static.decode(result)
pd.DataFrame(data=[text, decoded], columns=["text"], index=["Before", "After"])
# decoded

Unnamed: 0,text
Before,Text example
After,Text example


Algorytm dynamiczny


In [5]:
class Node2:
    def __init__(self, value=0, index=0, char=None, left=None, right=None, parent=None):
        self.value = value
        self.char = char
        self.left = left
        self.right = right
        self.index = index
        self.parent = parent


class DynamicHuff:
    def __init__(self):
        self.index = 999999
        NYT = Node2(value=0, index=self.index + 1, char='NYT')
        self.NYT = NYT
        self.root = NYT
        self.leaves = {"NYT": self.root}
        self.values = {0: {self.root}, 1: set()}

    def add_node(self, char):
        node = self.NYT
        node.char = None
        node_right = Node2(value=1, index=self.index, parent=node, char=char)
        node.right = node_right
        node_left = Node2(value=0, index=self.index - 1, parent=node, char="NYT")
        node.left = node_left
        self.NYT = node_left
        self.index -= 2
        self.values[0].add(node_left)
        self.values[1].add(node_right)
        self.leaves[char] = node_right
        self.leaves["NYT"] = node_left
        self.slide_increment(node)

    def slide_increment(self, node):
        while node != self.root:
            node = node.parent
            max_i_n = max(self.values[node.value], key=lambda x: x.index)
            if node != max_i_n:
                node.index, max_i_n.index = max_i_n.index, node.index
                if node.parent != max_i_n.parent:
                    if node.parent.left != node:
                        node.parent.right = max_i_n

                    else:
                        node.parent.left = max_i_n
                    if max_i_n.parent.left != max_i_n:
                        max_i_n.parent.right = node

                    else:
                        max_i_n.parent.left = node
                    if max_i_n.parent != node.parent:
                        max_i_n.parent, node.parent = node.parent, max_i_n.parent
                else:
                    if node.parent.left != node:
                        node.parent.right = max_i_n
                        node.parent.left = node
                    else:
                        node.parent.right = node
                        node.parent.left = max_i_n

            self.values[node.value].remove(node)
            node.value += 1
            if node.value not in self.values:
                self.values[node.value] = set()
            self.values[node.value].add(node)

    def get(self, char):
        node = self.leaves[char]
        code_bit = bitarray()

        while node != self.root:
            if node.parent.left != node:
                code_bit.append(1)

            else:
                code_bit.append(0)

            node = node.parent

        code_bit.reverse()

        return code_bit

    def encode_dynamic(self, text):
        coded_bit = bitarray()
        for char in text:
            if char not in self.leaves:
                coded_char = self.get('NYT')
                # coded_char.frombytes(char.encode('iso-8859-1'))
                coded_bit += coded_char + bitarray(f"{ord(char):08b}")
                self.add_node(char)
            else:
                coded_bit += self.get(char)
                self.slide_increment(self.leaves[char])
        end_of_bits = 8 - len(coded_bit) % 8
        coded_bit = bitarray(f"{end_of_bits:08b}") + coded_bit + bitarray(end_of_bits)
        return coded_bit


def decode_dynamic(encoded_bit):
    dynamic = DynamicHuff()
    node = dynamic.root
    index = 0
    encoded_bit = encoded_bit[8:-ba2int(encoded_bit[:8])]
    decoded_text = ""
    while index < len(encoded_bit):
        while not (node.left is None and node.right is None):
            if not encoded_bit[index]:
                node = node.left
            else:
                node = node.right
            index += 1
        if node.char != "NYT":
            c_decoded = node.char
            dynamic.slide_increment(dynamic.leaves[c_decoded])
        else:
            c_coded = encoded_bit[index:index + 8]
            c_decoded = c_coded.tobytes().decode('latin1')
            dynamic.add_node(c_decoded)
            index += 8
        node = dynamic.root
        decoded_text += c_decoded
    return decoded_text

In [6]:
text = "Text example for decoding and encoding !"
dynamic = DynamicHuff()
encoded = dynamic.encode_dynamic(text)
decoded = decode_dynamic(encoded)
pd.DataFrame(data=[text, decoded], columns=["text"], index=["Before", "After"])

Unnamed: 0,text
Before,Text example for decoding and encoding !
After,Text example for decoding and encoding !


Generowanie plików dla znaków generowanych z rozkładu jednostajnego

In [7]:
def generate_random():
    array_of_n = [400,4000,40000,400000]
    array_of_files = ["in/Random1kB.txt","in/Random10kB.txt","in/Random100kB.txt","in/Random1MB.txt"]
    for index in range(len(array_of_n)):
        numpy_random_array = np.random.randint(0,256,size = array_of_n[index])
        array_ascii = [chr(i) for i in numpy_random_array]
        string = ' '.join(array_ascii)
        f = open(array_of_files[index], "w")
        f.write(string)
        f.close()

In [8]:
generate_random()

Testy

In [9]:
import os

In [19]:
def test():
    path = "./in"
    dir_list = os.listdir(path)
    dir_list.sort()
    if '.DS_Store' in dir_list:
        dir_list = dir_list[1:]
    array_compress = []
    for file_name in dir_list:
        array_both = []
        save_file = f"out/static_{file_name}.txt"
        # print(file_name)
        f = open(f'{path}/{file_name}','r')
        text = f.read()
        static = StaticHuff(text)
        encoded = static.encode(text)
        with open(save_file, "wb+") as f:
            encoded.tofile(f)
        static_compress = f'{(1 - os.path.getsize(save_file) / os.path.getsize(f"{path}/{file_name}"))*100}%'
        array_both.append(static_compress)

        dynamic = DynamicHuff()
        encoded = dynamic.encode_dynamic(text)
        save_file = f"out/dynamic_{file_name}.txt"
        with open(save_file, "wb+") as f:
            encoded.tofile(f)
        dynamic_compress = f'{(1 - os.path.getsize(save_file) / os.path.getsize(f"{path}/{file_name}"))*100}%'
        array_both.append(dynamic_compress)
        array_compress.append(array_both)
    return pd.DataFrame(data = array_compress, columns = ["Static","Dynamic"], index = dir_list)


In [20]:
test()

Unnamed: 0,Static,Dynamic
Gutenberg100kB.txt,41.24218750000001%,29.3232421875%
Gutenberg10kB.txt,40.5859375%,29.19921875%
Gutenberg1MB.txt,41.898345947265625%,29.991722106933594%
Gutenberg1kB.txt,39.84375%,30.56640625%
Linux100kB.txt,34.437244541920045%,26.18881467450902%
Linux10kB.txt,26.09078458227353%,26.55057718646058%
Linux1MB.txt,26.178189721518365%,24.87690483978794%
Linux1kB.txt,34.97757847533632%,25.291479820627806%
Random100kB.txt,32.29657650084562%,44.75667237083071%
Random10kB.txt,32.85728552881254%,40.667132727454316%


Test czasu

In [12]:
from timeit import default_timer as timer

In [13]:
def time_comparison():
    path = "./in"
    dir_list = os.listdir(path)
    dir_list.sort()
    if '.DS_Store' in dir_list:
        dir_list = dir_list[1:]
    array_times = []
    for file_name in dir_list:
        array_both = []
        # save_file = f"out/static_{file_name}.txt"
        f = open(f'{path}/{file_name}','r')
        text = f.read()

        static = StaticHuff(text)
        start = timer()
        encoded = static.encode(text)
        # print(encoded)
        static.decode(encoded)
        end = timer()
        array_both.append(f'{end-start}s')

        dynamic = DynamicHuff()
        start = timer()
        encoded = dynamic.encode_dynamic(text)
        decode_dynamic(encoded)
        end = timer()
        array_both.append(f'{end-start}s')
        array_times.append(array_both)
    return pd.DataFrame(data = array_times, columns = ["Static","Dynamic"], index = dir_list)



In [14]:
time_comparison()

Unnamed: 0,Static,Dynamic
Gutenberg100kB.txt,0.040652416995726526s,1.5099155829520896s
Gutenberg10kB.txt,0.0034628340508788824s,0.14055541693232954s
Gutenberg1MB.txt,0.38552312506362796s,28.956765167065896s
Gutenberg1kB.txt,0.000358375022187829s,0.00802366598509252s
Linux100kB.txt,0.03409291594289243s,1.0639557089889422s
Linux10kB.txt,0.0037334590451791883s,0.09052545798476785s
Linux1MB.txt,0.37988454091828316s,11.134941417025402s
Linux1kB.txt,0.000414749956689775s,0.008897082996554673s
Random100kB.txt,0.029194167000241578s,0.6921473329421133s
Random10kB.txt,0.0029049169970676303s,0.0784751670435071s


Przykładowe algorytmy kompresji:
https://dzone.com/articles/crunch-time-10-best-compression-algorithms

In [22]:
from compress import Compressor

Testy dla innego algorytmu kompresji (gzip)

In [43]:
def test2():
    path = "./in"
    dir_list = os.listdir(path)
    dir_list.sort()
    if '.DS_Store' in dir_list:
        dir_list = dir_list[1:]
    array_compress = []
    for file_name in dir_list:
        array_both = []
        save_file = f"out/additional_{file_name}.txt"
        # print(file_name)
        f = open(f'{path}/{file_name}','r')
        text = f.read()
        c = Compressor()
        c.use_gzip()
        binary = c.compress(text)
        encoded = bitarray()
        encoded.frombytes(binary)
        with open(save_file, "wb+") as f:
            encoded.tofile(f)
        compress = f'{(1 - os.path.getsize(save_file) / os.path.getsize(f"{path}/{file_name}"))*100}%'
        array_both.append(compress)
        array_compress.append(array_both)
    return pd.DataFrame(data = array_compress, columns = ["Additional"], index = dir_list)

In [44]:
test2()

Unnamed: 0,Additional
Gutenberg100kB.txt,60.7451171875%
Gutenberg10kB.txt,51.67968750000001%
Gutenberg1MB.txt,63.63382339477539%
Gutenberg1kB.txt,42.7734375%
Linux100kB.txt,71.45548798723955%
Linux10kB.txt,71.32655057718647%
Linux1MB.txt,78.04380976540632%
Linux1kB.txt,55.336322869955154%
Random100kB.txt,43.75994475967456%
Random10kB.txt,41.28632777389394%


W tym przypadku algorytm dla większości działa dużo lepiej (uzyskał lepszy współczynnik kompresji)

In [68]:
def test3():
    path = "./in"
    dir_list = os.listdir(path)
    dir_list.sort()
    if '.DS_Store' in dir_list:
        dir_list = dir_list[1:]
    array_compress = []
    for file_name in dir_list:
        array_both = []
        save_file = f"out/additional_{file_name}.txt"
        # print(file_name)
        f = open(f'{path}/{file_name}','r')
        text = f.read()
        c = Compressor()
        c.use_bz2()
        binary = c.compress(text)
        encoded = bitarray()
        encoded.frombytes(binary)
        with open(save_file, "wb+") as f:
            encoded.tofile(f)
        compress = f'{(1 - os.path.getsize(save_file) / os.path.getsize(f"{path}/{file_name}"))*100}%'
        array_both.append(compress)
        array_compress.append(array_both)
    return pd.DataFrame(data = array_compress, columns = ["Additional"], index = dir_list)

In [69]:
test3()

Unnamed: 0,Additional
Gutenberg100kB.txt,66.4306640625%
Gutenberg10kB.txt,55.0390625%
Gutenberg1MB.txt,72.84202575683594%
Gutenberg1kB.txt,38.37890625%
Linux100kB.txt,76.19479613199083%
Linux10kB.txt,69.7906476227744%
Linux1MB.txt,85.45431483941601%
Linux1kB.txt,51.300448430493276%
Random100kB.txt,58.48269236542676%
Random10kB.txt,54.91860581244382%


Użycie algorytmu bzip2 też poprawiło współczynnik konwersji
