In [339]:
from tqdm.notebook import tqdm
from collections import defaultdict
from queue import PriorityQueue
import bitstring
import json
import matplotlib.pyplot as plt
import math
import random

In [342]:
class HuffmanNode:

   def __init__(self, name, value, bit=None, parent=None):
      self.name = name
      self.value = value
      self.parent = parent
      self.bit = bit

   def __lt__(self, obj):
      return self.value < obj.value

   def __str__(self):
      parent_val = "None" if self.parent is None else self.parent.name
      return f"Name: {self.name}, Value: {self.value}, Bit: {self.bit}, Parent: {parent_val}"

class Huffman:

   def __init__(self):
      pass

   def encode(self, message):
      assert type(message)==bytes, "Message should be Byte-String"
      pq = PriorityQueue()
      counts = defaultdict(int)
      base_nodes = {}
      for byte in message:
         counts[byte] += 1
      for name, value in counts.items():
         huffman_node = HuffmanNode(name, value)
         pq.put(huffman_node)
         base_nodes[name] = huffman_node
      while pq.qsize() > 1:
         left_node = pq.get()
         right_node = pq.get()
         left_node.bit, right_node.bit = "0", "1"
         internal_node = HuffmanNode(f"( {left_node.name}, {right_node.name} )",
                                    left_node.value+right_node.value)
         left_node.parent, right_node.parent = internal_node, internal_node
         pq.put(internal_node)
      translation, reverse_translation = dict(), dict()
      for name, node in base_nodes.items():
         translate = ""
         curr_node = node
         while curr_node.bit is not None:
            translate = curr_node.bit + translate
            curr_node = curr_node.parent
         translation[name] = translate
         reverse_translation[translate] = name
      print("Encoding Message...")
      encoded = "".join(translation[byte] for byte in tqdm(message))
      extra = len(encoded)%8
      encoded += "0"*extra
      reverse_translation_encode = bytes(json.dumps(reverse_translation), 'utf-8')
      preamble = "{0:03b}".format(extra) + "{0:013b}".format(len(reverse_translation_encode))
      preamble = bitstring.BitArray(bin=preamble).tobytes()
      encoded = bitstring.BitArray(bin=encoded).tobytes()
      encoded = preamble + reverse_translation_encode + encoded
      return encoded

   def decode(self, encoded):
      try:
         binary = ""
         for byte in encoded:
            binary += "{0:08b}".format(byte)
         extra = int(binary[:3], 2)
         reverse_len = int(binary[3:16], 2)
         reverse_translation = json.loads(encoded[2:2+reverse_len].decode('utf-8'))
         message = binary[(2+reverse_len)*8:len(binary)-extra]

         i = 0
         decoded = ""
         max_len = max(len(key) for key in reverse_translation)
         print("Decoding the message...")
         pb = tqdm(total=len(message))
         while i < len(message):
            collect = ""
            while i < len(message) and collect not in reverse_translation:
               collect += message[i]
               assert len(collect)<=max_len
               pb.update(1)
               i += 1
            if collect in reverse_translation:
               byte = reverse_translation[collect]
               byte = "{0:08b}".format(byte)
               decoded += byte
         assert len(decoded)%8 == 0
         decoded = bitstring.BitArray(bin=decoded).tobytes()
         return decoded
      except:
         print("Encoding Scheme Does not Match!!")
         return b""

In [347]:
filename = "ProfileSm.tin"
with open(filename, "rb") as f:
   raw = f.read()

huffman = Huffman()
encoded = huffman.encode(raw)
with open(filename+".hf", "wb") as f:
   f.write(encoded)

assert raw == huffman.decode(encoded)
compression_ratio = 100*(1-len(encoded)/len(raw))
compression_ratio

Encoding Message...


  0%|          | 0/125805 [00:00<?, ?it/s]

Decoding the message...


  0%|          | 0/581525 [00:00<?, ?it/s]

40.992806327252495