Создание системы сжатия и распаковки данных

Описание:

Ваша задача — разработать программу на Python, которая выполняет сжатие и распаковку текстовых данных с использованием алгоритма сжатия Хаффмана. Задача разбивается на следующие этапы:

Часть 1: Создание алгоритма сжатия Хаффмана

1. Разработайте алгоритм, который будет строить дерево Хаффмана на основе частоты символов в тексте.

2. Создайте таблицу символов и их кодовых представлений на основе дерева Хаффмана.

3. Напишите функцию, которая сжимает текстовые данные, заменяя символы исходного текста их кодами Хаффмана.

Часть 2: Создание алгоритма распаковки

1. Разработайте алгоритм, который будет распаковывать сжатые данные, восстанавливая исходный текст.

2. Напишите функцию, которая будет принимать сжатые данные и распаковывать их в исходный текст, используя таблицу символов и кодовых представлений.

Часть 3: Тестирование системы сжатия и распаковки

1. Протестируйте вашу программу, сжимая и распаковывая разные текстовые данные.

2. Оцените степень сжатия, сравнивая размер сжатых данных с размером исходных данных.

In [1]:
from collections import Counter
import heapq as hq
import numpy as np
import sys

MAX_CODE_LEN = 16
# Максимально допустимая длина кода Хаффмана, поддерживаемая реализацией; влияет на размер таблиц для распаковки.

In [2]:
def compress(string_to_compress):
    
    '''
    Возвращает сжатую строку и словарь "символ-код"
    '''
    
    # класс для представления узла дерева Хаффмана
    class node:
        def __init__(self, char, freq, left=None, right=None):
            self.char = char
            self.freq = freq
            self.left = left
            self.right = right        
        
        def __str__(self):
            return str(self.char)
    
        def __eq__(self, other):
            return self.freq == other.freq
         
        def __ne__(self, other):
            return self.freq != other.freq

        def __lt__(self, other):
            return self.freq < other.freq

        def __le__(self, other):
            return self.freq <= other.freq

        def __gt__(self, other):
            return self.freq > other.freq

        def __ge__(self, other):
            return self.freq >= other.freq
    
    if string_to_compress == "":
        print("Получена пустая строка")
        return "", {}
    
    ch_counts = Counter(string_to_compress)
    # подсчет количества символов

    p_queue = [node(ch, ch_counts[ch]) for ch in ch_counts]
    # создание списка узлов-листьев дерева Хаффмана

    hq.heapify(p_queue)
    # преобразование списка в очередь с приоритетами (in-place)

    # построение дерева Хаффмана
    while len(p_queue) >= 2:
        left = hq.heappop(p_queue)
        right = hq.heappop(p_queue)
        new = node('#', left.freq + right.freq, left, right)
        hq.heappush(p_queue, new)
    
    code_dict = {}
    # словарь кодов

    # обход дерева, заполнение словаря
    def get_codes(tree, code):
    
        if tree is None: return
    
        if tree.left is None:
            code_dict[tree.char] = code
            return
    
        get_codes(tree.left, code + '0')
        get_codes(tree.right, code + '1')

    get_codes(p_queue[0], "")
    
    # обработать случай одного символа в строке
    if len(code_dict) == 1:
        code_dict[list(code_dict.keys())[0]]='0'
    
    # сжатие
    compressed_string = ""
    for ch in string_to_compress:
        compressed_string += code_dict[ch]
    
    len_before = len(string_to_compress)
    len_after = len(compressed_string)
    print("Строка до сжатия: {} символов, после сжатия: {:.1f} байт, {:.2f} бита на символ. ".format(
        len_before, len_after/8.0, len_after/len_before))
    
    return compressed_string, code_dict

In [3]:
def decompress(compressed_string, code_dict):
    
    '''
    Принимает на вход сжатую строку и словарь "символ-код", и возвращает распакованную строку
    '''

    # проверка выполнения условия на максимальную длину кодов
    try:
        m_code_len = max([len(value) for value in list(code_dict.values())])
    except:
        raise ValueError("Неверный формат словаря кодов.")
    if m_code_len > MAX_CODE_LEN:
        raise ValueError("Присутствует код Хаффмана длины {}, которая больше допустимой ({}).".format(m_code_len, MAX_CODE_LEN))

    # создание таблиц для распаковки
    lut_ch = np.zeros(2**MAX_CODE_LEN, dtype='U1')
    lut_len = np.zeros(2**MAX_CODE_LEN, dtype='uint8')
    #print(sys.getsizeof(lut_ch), sys.getsizeof(lut_len))
    
    for (ch, code) in code_dict.items():
    
        code_len = len(code)
        pad_len = MAX_CODE_LEN - code_len
        
        try:
            begin = int(code, 2) << pad_len
        except:
            raise ValueError("Неверный формат словаря кодов.")
        
        end = begin + 2**pad_len
    
        #print("{} {:016b} {:016b}".format(code, begin, end-1))
    
        for i in range(begin, end):
            lut_ch[i] = ch
            lut_len[i] = code_len
            
    # распаковка
    cursor = 0
    decompressed_string = ""

    while True:
    
        chunk = compressed_string[cursor:cursor + MAX_CODE_LEN]
        chunk_len = len(chunk)
        if chunk_len == 0: break
    
        pad_len = MAX_CODE_LEN - chunk_len
        if pad_len > 0: chunk += '0'* pad_len
    
        try:
            index = int(chunk, 2)
        except:
            raise ValueError("Неверный формат сжатой строки.")
            break
    
        ch = lut_ch[index]
        to_skip = lut_len[index]
        
        if to_skip == 0: raise ValueError("Неверный формат словаря кодов.")
    
        decompressed_string += ch
        cursor += to_skip
        
    return decompressed_string

In [10]:
string_to_compress = '''Data Engineer -- это оооооочень круто!'''

compressed_string, code_dict = compress(string_to_compress)

decompressed_string = decompress(compressed_string, code_dict)

print()
print(decompressed_string)


Строка до сжатия: 38 символов, после сжатия: 19.5 байт, 4.11 бита на символ. 

Data Engineer -- это оооооочень круто!
