In [1]:
import os
def compareFile(path1 = "midsummer.txt",path2 = "recon.txt"):
    """Compare two text files line by line and print the first difference.

    Parameters:
        path1: path of the first (original) file.
        path2: path of the second (reconstructed) file.

    Returns:
        None. The result is reported on stdout:
        * "\tFile is empty" if either file has size 0 (no comparison is done);
        * "Differs at line N :" followed by the offending line of each file
          for the first mismatch (N is 1-based);
        * "The input file and reconstructed file have no difference" when
          every line matches and both files have the same number of lines.

    Raises:
        OSError: if either path cannot be stat'ed or opened.
    """
    # Local import keeps this cell self-contained (the cell only imports os).
    from itertools import zip_longest

    if os.stat(path1).st_size == 0 or os.stat(path2).st_size == 0:
        print("\tFile is empty")
        return

    # Shown in place of a line when one file ends before the other.
    missing = "<no line: end of file>\n"

    # `with` guarantees both handles are closed on every exit path,
    # including the early return taken when a difference is found.
    with open(path1, "r") as f1, open(path2, "r") as f2:
        # zip_longest pads the shorter file with None, so a file that is a
        # strict prefix of the other is reported as different instead of
        # being silently accepted.
        for i, (line1, line2) in enumerate(zip_longest(f1, f2), start=1):
            if line1 != line2:
                print("Differs at line", i, ":")
                print("\tFile 1:", missing if line1 is None else line1, end='')
                print("\tFile 2:", missing if line2 is None else line2, end='')
                return

    print("The input file and reconstructed file have no difference")

In [11]:
import heapq
import os

class HuffmanCoding:
	"""Huffman compressor/decompressor for one text file.

	`compress()` reads `self.path`, builds a Huffman code from the character
	frequencies of its contents and writes the packed bits to
	`<stem>_bin.txt`. `decompress()` reverses this using the code table that
	`compress()` stored on the instance, so it must be called on the same
	object that did the compression (the table is not written to disk).

	Compressed layout: one header byte holding the number of padding bits,
	followed by the code bits, followed by that many '0' padding bits so the
	total is a whole number of bytes.
	"""

	def __init__(self, path):
		"""Remember the input path and start with empty coder state.

		path: path of the text file that `compress()` will read.
		"""
		self.path = path
		self.heap = []             # min-heap of HeapNode, ordered by freq
		self.codes = {}            # character -> bit string
		self.reverse_mapping = {}  # bit string -> character

	class HeapNode:
		"""Node of the Huffman tree; leaves carry a character, internal nodes None."""

		def __init__(self, char, freq):
			self.char = char
			self.freq = freq
			self.left = None
			self.right = None

		# Ordering used by heapq: nodes compare by frequency only.
		def __lt__(self, other):
			return self.freq < other.freq

		def __eq__(self, other):
			# type(self) instead of the bare name HeapNode: a nested class
			# name is not visible from inside its own methods, so the bare
			# name raised NameError whenever two nodes were compared.
			if not isinstance(other, type(self)):
				return False
			return self.freq == other.freq

	# functions for compression:

	def make_frequency_dict(self, text):
		"""Return a dict mapping each character of `text` to its occurrence count."""
		frequency = {}
		for character in text:
			frequency[character] = frequency.get(character, 0) + 1
		return frequency

	def make_heap(self, frequency):
		"""Push one leaf node per (character, count) entry onto `self.heap`."""
		for key in frequency:
			node = self.HeapNode(key, frequency[key])
			heapq.heappush(self.heap, node)

	def merge_nodes(self):
		"""Combine the two lowest-frequency nodes until one tree root remains."""
		while len(self.heap) > 1:
			node1 = heapq.heappop(self.heap)
			node2 = heapq.heappop(self.heap)

			# Internal node: no character, frequency is the sum of its children.
			merged = self.HeapNode(None, node1.freq + node2.freq)
			merged.left = node1
			merged.right = node2

			heapq.heappush(self.heap, merged)

	def make_codes_helper(self, root, current_code):
		"""Walk the tree from `root`, recording a code for every leaf.

		`current_code` is the bit string for the path taken so far
		('0' for a left edge, '1' for a right edge).
		"""
		if root is None:
			return

		if root.char is not None:
			# A tree with a single leaf has an empty path; give that lone
			# symbol the 1-bit code "0", otherwise the text would encode to
			# zero bits and could not be decoded.
			code = current_code or "0"
			self.codes[root.char] = code
			self.reverse_mapping[code] = root.char
			return

		self.make_codes_helper(root.left, current_code + "0")
		self.make_codes_helper(root.right, current_code + "1")

	def make_codes(self):
		"""Pop the tree root off the heap and fill `codes` / `reverse_mapping`."""
		if not self.heap:
			# Empty input text: there is no tree and nothing to encode.
			return
		root = heapq.heappop(self.heap)
		self.make_codes_helper(root, "")

	def get_encoded_text(self, text):
		"""Return `text` as a string of '0'/'1' using `self.codes`.

		Raises KeyError if `text` contains a character with no code.
		"""
		# join avoids rebuilding an ever-growing string on every character.
		return "".join(self.codes[character] for character in text)

	def pad_encoded_text(self, encoded_text):
		"""Pad the bit string to a multiple of 8 and prepend the 8-bit pad count.

		The pad count is always 1..8: an input that is already byte-aligned
		still receives a full byte of padding. This is kept as-is so the
		file format does not change.
		"""
		extra_padding = 8 - len(encoded_text) % 8
		padded_info = "{0:08b}".format(extra_padding)
		return padded_info + encoded_text + "0" * extra_padding

	def get_byte_array(self, padded_encoded_text):
		"""Pack a bit string whose length is a multiple of 8 into a bytearray.

		Raises:
			ValueError: if the length is not a multiple of 8. (Previously
			this printed a message and called exit(0), which reported
			success and terminated the interpreter/kernel.)
		"""
		if len(padded_encoded_text) % 8 != 0:
			raise ValueError("Encoded text not padded properly")

		byte_array = bytearray()
		for i in range(0, len(padded_encoded_text), 8):
			byte = padded_encoded_text[i:i+8]
			byte_array.append(int(byte, 2))
		return byte_array

	def compress(self):
		"""Compress `self.path` and return the path of the compressed file.

		The output is written next to the input as `<stem>_bin.txt`. The code
		table is kept on the instance for a later `decompress()` call.
		Prints "Compressed" when done.
		"""
		filename, file_extension = os.path.splitext(self.path)
		output_path = filename + "_bin.txt"

		# Start from a clean state so calling compress() again on the same
		# instance cannot mix in nodes or codes from an earlier run.
		self.heap = []
		self.codes = {}
		self.reverse_mapping = {}

		# The source is only read, so plain 'r' is enough ('r+' needlessly
		# required write permission on the input file).
		with open(self.path, 'r') as file, open(output_path, 'wb') as output:
			text = file.read()

			frequency = self.make_frequency_dict(text)
			self.make_heap(frequency)
			self.merge_nodes()
			self.make_codes()

			encoded_text = self.get_encoded_text(text)
			padded_encoded_text = self.pad_encoded_text(encoded_text)

			b = self.get_byte_array(padded_encoded_text)
			output.write(bytes(b))

		print("Compressed")
		return output_path

	# functions for decompression:

	def remove_padding(self, padded_encoded_text):
		"""Strip the 8-bit pad-count header and the trailing padding bits."""
		padded_info = padded_encoded_text[:8]
		extra_padding = int(padded_info, 2)

		body = padded_encoded_text[8:]
		# Slice by explicit end index: body[:-0] would return "" rather than
		# the whole string if a header ever declares zero padding bits.
		return body[:len(body) - extra_padding]

	def decode_text(self, encoded_text):
		"""Translate a bit string back to text using `self.reverse_mapping`."""
		current_code = ""
		decoded_chars = []

		for bit in encoded_text:
			current_code += bit
			# Huffman codes are prefix-free, so the first match is the symbol.
			if current_code in self.reverse_mapping:
				decoded_chars.append(self.reverse_mapping[current_code])
				current_code = ""

		return "".join(decoded_chars)

	def decompress(self, input_path):
		"""Decompress `input_path` and return the path of the restored text file.

		The output is written as `<stem of self.path>_decompressed.txt`.
		Requires that `compress()` has already run on this instance, because
		decoding uses the in-memory code table. Prints "Decompressed" when done.
		"""
		filename, file_extension = os.path.splitext(self.path)
		output_path = filename + "_decompressed" + ".txt"

		with open(input_path, 'rb') as file, open(output_path, 'w') as output:
			# Expand every byte to its zero-padded 8-bit binary form.
			bit_string = "".join("{0:08b}".format(byte) for byte in file.read())

			encoded_text = self.remove_padding(bit_string)
			decompressed_text = self.decode_text(encoded_text)

			output.write(decompressed_text)

		print("Decompressed")
		return output_path

In [12]:
%%time
# Build a Huffman coder for the input text and compress it.
# compress() returns the path of the binary file it wrote; the code table
# stays on `h`, which the decompression cell below reuses.
original_path = "midsummer.txt"
h = HuffmanCoding(original_path)
compressed_path = h.compress()

Compressed
Wall time: 83 ms


In [13]:
%%time
# Decode with the same instance that compressed: decompress() looks codes up
# in the table compress() stored on `h`. Returns the restored file's path.
recon_path = h.decompress(compressed_path)

Decompressed
Wall time: 176 ms


In [14]:
# Summarise the run: where the outputs went, whether the round trip was
# lossless, and how much smaller the compressed file is.
print("Huffman bitwise compression: ")
print(f"Compressed file path: {compressed_path}")
print(f"Decompressed file path: {recon_path}")

# Sizes on disk, in bytes.
length_original = os.path.getsize(original_path)
length_compressed = os.path.getsize(compressed_path)

# Line-by-line check of the original against the reconstructed text.
compareFile(original_path, recon_path)

print(f"size of original text is: {length_original}")
print(f"size of compressed text is: {length_compressed}")
# Ratio above 1 means the compressed file is smaller than the original.
print(f"rate of compression is: {length_original/length_compressed}")


Huffman bitwise compression: 
Compressed file path: midsummer_bin.txt
Decompressed file path: midsummer_decompressed.txt
The input file and reconstructed file have no difference
size of original text is: 92603
size of compressed text is: 55349
rate of compression is: 1.6730744909573796
