# Huffman Coding

In [2]:
import os
import heapq
class BTNode:
    """Binary-tree node holding a symbol and its frequency.

    Nodes are ordered by ``freq`` alone so they can be stored in a ``heapq``
    min-heap. ``left`` and ``right`` start as ``None`` and are assigned by the
    code that builds the tree.
    """

    def __init__(self, value, freq):
        """Create a childless node for ``value`` occurring ``freq`` times."""
        self.value = value
        self.freq = freq
        self.left = None
        self.right = None

    # For comparison of nodes in the heap: only the frequency matters.
    def __lt__(self, other):
        """Return True when this node's frequency is lower than ``other``'s."""
        if not isinstance(other, BTNode):
            # Let Python try the reflected operation / raise TypeError itself
            # instead of failing with AttributeError on ``other.freq``.
            return NotImplemented
        return self.freq < other.freq

    # NOTE: defining __eq__ without __hash__ leaves instances unhashable.
    def __eq__(self, other):
        """Return True when both nodes have the same frequency."""
        if not isinstance(other, BTNode):
            return NotImplemented
        return self.freq == other.freq

class HuffmanCoding:
    """Compress a text file with Huffman coding and decompress it again.

    The code table is kept only on the instance; it is not written to the
    compressed file. ``decompress`` therefore only produces the original text
    when called on the same instance that ran ``compress``.
    """

    def __init__(self, path):
        """Remember ``path``, the text file that ``compress`` will read."""
        self.path = path
        self.__heap = []
        self.__codes = {}
        self.__rev_codes = {}

    def __buildFreqDict(self, text):
        """Return a dict mapping each character of ``text`` to its count."""
        freq_dict = {}
        for ele in text:
            freq_dict[ele] = freq_dict.get(ele, 0) + 1
        return freq_dict

    def __buildHeap(self, freq_dict):
        """Push one leaf node per symbol of ``freq_dict`` onto the min-heap."""
        for ele, freq in freq_dict.items():
            heapq.heappush(self.__heap, BTNode(ele, freq))

    def __buildTree(self):
        """Merge the two lowest-frequency nodes until one root remains."""
        while len(self.__heap) > 1:
            node1 = heapq.heappop(self.__heap)
            node2 = heapq.heappop(self.__heap)
            # Internal nodes carry no symbol, only the combined frequency.
            parent = BTNode(None, node1.freq + node2.freq)
            parent.left = node1
            parent.right = node2
            heapq.heappush(self.__heap, parent)

    def __buildCodesHelper(self, root, curr_bits):
        """Walk the tree and record the bit string leading to every leaf."""
        if root is None:
            return
        # Internal nodes are created with value None; test for that explicitly
        # rather than relying on the truthiness of the symbol.
        if root.value is not None:
            self.__codes[root.value] = curr_bits
            self.__rev_codes[curr_bits] = root.value
            return
        self.__buildCodesHelper(root.left, curr_bits + '0')
        self.__buildCodesHelper(root.right, curr_bits + '1')

    def __buildCodes(self):
        """Pop the tree root off the heap and fill both code tables."""
        if not self.__heap:
            # Empty input: there are no symbols, hence no codes to build.
            return
        root = heapq.heappop(self.__heap)
        if root.left is None and root.right is None:
            # Only one distinct symbol: the root is itself a leaf. An empty
            # code would encode every character to nothing, so use one bit.
            self.__codes[root.value] = '0'
            self.__rev_codes['0'] = root.value
            return
        self.__buildCodesHelper(root, "")

    def __getEncodedText(self, text):
        """Return ``text`` as one string of '0'/'1' characters."""
        return ''.join(self.__codes[char] for char in text)

    def __getPaddedEnText(self, encoded_text):
        """Pad to a whole number of bytes and prefix the padding count.

        The first 8 bits of the result hold the number of '0' bits appended
        at the end. That number is between 1 and 8: an already aligned input
        still gets a full byte of padding.
        """
        padded_amt = 8 - (len(encoded_text) % 8)
        padding_info = "{0:08b}".format(padded_amt)
        return padding_info + encoded_text + '0' * padded_amt

    def __getBytesArray(self, pEnText):
        """Convert a bit string into a list of byte values (ints 0-255)."""
        return [int(pEnText[i:i + 8], 2) for i in range(0, len(pEnText), 8)]

    def compress(self):
        """Compress the file at ``self.path`` into a ``.bin`` file.

        The output is written next to the input, with the extension replaced
        by ``.bin``.

        Returns:
            The path of the compressed file.
        """
        file_name, _ = os.path.splitext(self.path)
        output_path = file_name + '.bin'

        # The input is only read, so it is not opened for writing.
        with open(self.path, 'r') as file, open(output_path, 'wb') as output:
            # NOTE: trailing whitespace is stripped before encoding, so it is
            # not restored by decompress().
            text = file.read().rstrip()

            # Start from a clean state so codes from an earlier call cannot
            # leak into this one.
            self.__heap = []
            self.__codes = {}
            self.__rev_codes = {}

            # frequencies -> heap -> tree -> codes
            self.__buildHeap(self.__buildFreqDict(text))
            self.__buildTree()
            self.__buildCodes()

            # text -> bit string -> padded bit string -> bytes
            encoded_text = self.__getEncodedText(text)
            padded_en_text = self.__getPaddedEnText(encoded_text)
            output.write(bytes(self.__getBytesArray(padded_en_text)))

        print('COMPRESSED!')
        return output_path

    def __removePadding(self, text):
        """Drop the 8-bit padding header and the padding bits it announces."""
        padding = int(text[:8], 2)
        body = text[8:]
        # ``body[:-0]`` would be empty, so a zero padding count is handled
        # separately even though compress() always writes 1-8 padding bits.
        return body[:-padding] if padding else body

    def __decode(self, text):
        """Translate a bit string back into text using the reverse codes."""
        chars = []
        curr_bits = ''
        for bit in text:
            curr_bits += bit
            if curr_bits in self.__rev_codes:
                chars.append(self.__rev_codes[curr_bits])
                curr_bits = ''
        return ''.join(chars)

    def decompress(self, input_path):
        """Decompress ``input_path`` using the code table held in memory.

        The result is written to ``<self.path without extension>`` +
        ``_decompressed.txt``. ``compress`` must have been called on this
        instance first, because the code table is not stored in the file.

        Returns:
            The path of the decompressed text file.
        """
        file_name, _ = os.path.splitext(self.path)
        output_path = file_name + '_decompressed.txt'
        with open(input_path, 'rb') as file, open(output_path, 'w') as output:
            # Iterating over bytes yields ints; render each as 8 bits.
            bit_string = ''.join(format(byte, '08b') for byte in file.read())

            actual_text = self.__removePadding(bit_string)
            output.write(self.__decode(actual_text))
        print('DECOMPRESSED!')
        return output_path

In [3]:
# Round-trip demo: compress sample.txt, then decompress the result with the
# same HuffmanCoding instance and print the path of the decompressed file.
path = 'sample.txt'
h = HuffmanCoding(path)
output_path = h.compress()  # returns the path of the .bin file it wrote
# decompress() relies on the code table that compress() left on `h`.
print(h.decompress(output_path))

COMPRESSED!
DECOMPRESSED!
sample_decompressed.txt
