In [6]:
from copy import deepcopy
from scipy.io import wavfile
import numpy as np
from json import dumps

In [2]:
class Heap:
    def __init__(self, key, value):
        self.left = None
        self.right = None
        self.key = key
        self.value = value
        self.flag = None

    def __gt__(self, other):
        if other == None or (not isinstance(other, Heap)):
            return -1
        return self.value > other.value

    def __lt__(self, other):
        if other == None or (not isinstance(other, Heap)):
            return -1
        return self.value < other.value

In [3]:

class TreePath:
    def __init__(self):
        self.temp_dict_var = {}

    def paths(self, root):
        path = []
        self.paths_rec(root, path, 0)

    def paths_rec(self, root, path, path_len):
        if root is None:
            return
        if(len(path) > path_len):
            path[path_len] = root.flag
        else:
            path.append(root.flag)
        path_len += 1
        if root.left is None and root.right is None:
            self.temp_dict_var[root.key] = deepcopy(
                ''.join(str(char) for char in path[1:]))
        else:
            self.paths_rec(root.left, path, path_len)
            self.paths_rec(root.right, path, path_len)


In [4]:
from collections import Counter
from math import log, ceil
import heapq

class HuffmanCoding:
    def __init__(self, orignal_signal):
        self.orignal_signal = orignal_signal
        self.frequencies = self.frequencies_counter()
        self.codes = dict()
        self.keys = dict()
        self.compressed_signal = list()
        self.decompressed_signal = []

    def frequencies_counter(self):
        frequencies = {}
        for value in self.orignal_signal:
            if value not in frequencies:
                frequencies[value] = 0
            frequencies[value] += 1
        return frequencies

    
    def create_codes(self, heap):
        treePath = TreePath()
        treePath.paths(heap[0])
        self.keys = treePath.temp_dict_var
        self.compressed_signal = [self.keys[i] for i in self.orignal_signal]
        for key in self.keys :
            self.codes[self.keys[key]] = key
        return self.compressed_signal, self.codes
    
    
    def compress(self):
        freq_dict = {key: value for key, value in sorted(
            self.frequencies.items(), key = lambda item: item[1])}
        heap = []
        for key in freq_dict:
            node = Heap(key, freq_dict[key])
            heapq.heappush(heap, node)
        counter = 0
        while(len(heap) > 1):
            node1 = heapq.heappop(heap)
            node2 = heapq.heappop(heap)
            node1.flag = 0
            node2.flag = 1
            merged = Heap(None, node1.value + node2.value)
            merged.flag = (counter % 2)
            counter += 1
            merged.left, merged.right = node1, node2
            heapq.heappush(heap, merged)
        return create_codes(heap[0])
 

    
    def decompress(self):
        self.decompressed_signal = np.array([self.codes[i] for i in self.compressed_signal])
        return self.decompressed_signal
    
    
    def save_codes(self):
        with open("compressed.json", "wb") as file:
            file.write(dumps(self.keys).encode("utf-8"))
        print('--->Compressed codes of signal were saved to compressed.json!')
        

In [5]:
fs, orignal_signal = wavfile.read('../task3/song.wav')
orignal_signal = list(orignal_signal)
HuffmanCoding = HuffmanCoding(orignal_signal)
# compressed_signal, code_dict = CompressorObject.fixed_length_helper()


print('\n---Fixed Length Huffman Coding---')
print('Orignal:', orignal_signal[:20])
# print('Compressed:', compressed_signal)
# print('Decompressed:', decompressed_signal)

compressed_signal, code_dict_var = HuffmanCoding.variable_huffman_coding()
# DecompressorObject = HuffmanCoding(compressed_signal, code_dict_var)
decompressed_signal = HuffmanCoding.decompress()
print('\n---Variable Length Huffman Coding---')
print('Compressed:', compressed_signal[:20])
# print('Code Dict:', code_dict_var[:20])
print('Decompressed:', decompressed_signal[:20])


---Fixed Length Huffman Coding---
Orignal: [0, 120, 240, 358, 475, 589, 701, 809, 913, 1013, 1108, 1200, 1286, 1367, 1444, 1516, 1584, 1646, 1705, 1760]

---Variable Length Huffman Coding---
Compressed: ['10101100111111111111', '11111111110111111111', '11000011111111111111', '0111010100111111111', '11111001011011111111', '11000011000011111111', '10100011010011111111', '11111011011001111111', '11001001100111111111', '11000001110101111111', '10100011111111111111', '10011110100101111111', '0000001001001', '11010111101110111111', '11110111011001111111', '11110000111101111111', '1000001000011111111', '0110010001111111111', '0110111111110011111', '11110001110001111111']
Decompressed: [   0  120  240  358  475  589  701  809  913 1013 1108 1200 1286 1367
 1444 1516 1584 1646 1705 1760]
