# Solutions to Problem Set 7 -- Coding Part

**This problem set is part of the course "Data Compression With Deep Probabilistic Models" by Prof. Robert Bamler at University of Tuebingen, Germany. You can find more course materials (lecture notes, video recordings, and solutions) at the course website, https://robamler.github.io/teaching/compress21/**

**Problem Set Published:** 9 June 2021<br>
**Problem Set Discussed:** 14 June 2021

Please see accompanying PDF document for instructions.

## Naive ANS Coder from the lecture

Below is a copy of the implementation of the ANS coder that we implemented in the lecture.
While it is a logically correct entropy coder, it is not yet quite useful because, as discussed at the end of the lecture, its runtime scales quadratically in the number of encoded symbols.

In [1]:
class AnsCoder:
    def __init__(self, precision):
        self.precision = precision
        self.mask = (1 << precision) - 1
        self.compressed = 1

    def encode(self, symbol, scaled_probabilities):
        z = self.compressed % scaled_probabilities[symbol]
        self.compressed //= scaled_probabilities[symbol]
        for prob in scaled_probabilities[:symbol]:
            z += prob
        self.compressed = (self.compressed << self.precision) | z

    def decode(self, scaled_probabilities):
        z = self.compressed & self.mask
        self.compressed >>= self.precision

        for i, prob in enumerate(scaled_probabilities):
            if prob > z:
                symbol = i
                break
            else:
                z -= prob

        self.compressed = self.compressed * scaled_probabilities[symbol] + z
        return symbol

### Our tests from the lecture

In [2]:
# n = 2**precision
precision = 4 # ==> n = 16
coder = AnsCoder(precision)

scaled_probabilities1 = [3, 7, 2, 4]
scaled_probabilities2 = [8, 2, 2, 4]
scaled_probabilities3 = [1, 5, 3, 3, 4]

coder.encode(1, scaled_probabilities1)
coder.encode(0, scaled_probabilities2)
coder.encode(4, scaled_probabilities3)

print(f'{coder.compressed:b}')

print(coder.decode(scaled_probabilities3))
print(coder.decode(scaled_probabilities2))
print(coder.decode(scaled_probabilities1))
# Should print encoded symbols in reverse order.

11100
4
0
1


## Problem 7.1

Below is a skeleton implementation of a `StreamingAnsCoder` class.
Follow the instructions in the accompanying PDF document to fill in the missing parts.

In [3]:
class StreamingAnsCoder:
    def __init__(self, precision, compressed=None):
        # YOUR TASK (Problem 7.1 (e)): add an (optional) parameter `compressed` that
        # accepts an initial compressed representation from which we can then decode.
        self.precision = precision
        self.mask = (1 << precision) - 1
        
        if compressed is None:
            self.bulk = []
            self.head = 1 # We could technically initialize this with zero too.
        else:
            self.bulk = compressed.copy()
            self.head = 0
            if len(self.bulk) != 0:
                self.head = self.bulk.pop()
            if len(self.bulk) != 0:
                self.head = (self.head << precision) | self.bulk.pop()

    def encode(self, symbol, scaled_probabilities):
        # SOLUTION (Problem 7.1 (c)): uphold invariants for `self.head`
        if (self.head >> self.precision) >= scaled_probabilities[symbol]:
            self.bulk.append(self.head & self.mask)
            self.head >>= self.precision

        z = self.head % scaled_probabilities[symbol]
        self.head //= scaled_probabilities[symbol]
        for prob in scaled_probabilities[:symbol]:
            z += prob
        self.head = (self.head << self.precision) | z

    def decode(self, scaled_probabilities):
        # SOLUTION (Problem 7.1 (d)): uphold invariants for `self.head` and make sure
        # `self.decode` exactly inverts `self.encode`
        z = self.head & self.mask
        self.head >>= self.precision

        for i, prob in enumerate(scaled_probabilities):
            if prob > z:
                symbol = i
                break
            else:
                z -= prob

        self.head = self.head * scaled_probabilities[symbol] + z
        
        if (self.head >> self.precision) == 0 and len(self.bulk) != 0:
            self.head = (self.head << self.precision) | self.bulk.pop()
        
        return symbol
    
    def get_compressed(self):
        # SOLUTION (Problem 7.1 (e)): return the compressed representation as a list
        # of integers with `self.precision` bits each.
        compressed = self.bulk.copy()
        head = self.head
        while head != 0:
            compressed.append(head & self.mask)
            head >>= self.precision
        return compressed

### Test for Problems 7.1 (c) and (d)

Use this test (and possibly some additional tests of your own) to debug your implementations of the `encode` and `decode` methods.

In [4]:
import numpy as np

In [5]:
def random_model_and_symbol(seed, precision, min_alphabet_size=2, max_alphabet_size=11):
    """Creates a reproducible pseudorandom model and draws a reproducible pseudorandom symbol from it."""
    rng = np.random.RandomState(seed)
    alphabet_size = rng.choice(range(min_alphabet_size, max_alphabet_size + 1))

    # Ensure that all scaled_probabilities are nonzero and that they add up to `(1 << precision)`
    assert alphabet_size <= (1 << precision)
    scaled_probabilities = ((1 << precision) * rng.dirichlet([1] * alphabet_size)).astype(np.int64) + 1
    for _ in range(scaled_probabilities.sum() - (1 << precision)):
        scaled_probabilities[scaled_probabilities.argmax()] -= 1

    # Draw a random symbol and calculate its information content
    symbol = rng.choice(alphabet_size, p=(1/(1 << precision)) * scaled_probabilities)
    inf_content = precision - np.log2(scaled_probabilities[symbol])
    return scaled_probabilities, symbol, inf_content

In [6]:
precision = 12
num_symbols = 1000
master_seed = 123

coder = StreamingAnsCoder(precision)
total_inf_content = 0
for i in range(num_symbols):
    scaled_probabilities, symbol, inf_content = random_model_and_symbol(
        master_seed * num_symbols + i, precision)
    coder.encode(symbol, scaled_probabilities)
    total_inf_content += inf_content

bitrate = (len(coder.bulk) + 2) * precision
print(f'Encoded {num_symbols} random symbols with a total information content of ' +
      f'{total_inf_content:.2f} bits into {bitrate} bits.')
print(f'- absolute overhead: {bitrate - total_inf_content:.2f} bits')
print(f'- relative overhead: {100 * (bitrate - total_inf_content) / total_inf_content:.2g}% (expect about 1%)')

for i in reversed(range(num_symbols)):
    scaled_probabilities, expected_symbol, _ = random_model_and_symbol(
        master_seed * num_symbols + i, precision)
    symbol = coder.decode(scaled_probabilities)
    assert symbol == expected_symbol

print('Successfully reconstructed original message.')

Encoded 1000 random symbols with a total information content of 2001.55 bits into 2016 bits.
- absolute overhead: 14.45 bits
- relative overhead: 0.72% (expect about 1%)
Successfully reconstructed original message.


### Test for Problem 7.1 (e)

Use this test (and possibly some additional tests of your own) to debug your implementations of the `get_compressed` method and the constructor.

In [7]:
precision = 12
num_symbols = 1000
master_seed = 456

encoder = StreamingAnsCoder(precision)
total_inf_content = 0
for i in range(num_symbols):
    scaled_probabilities, symbol, inf_content = random_model_and_symbol(
        master_seed * num_symbols + i, precision)
    encoder.encode(symbol, scaled_probabilities)
    total_inf_content += inf_content

compressed = encoder.get_compressed()
bitrate = (len(encoder.bulk) + 2) * precision
print(f'Encoded {num_symbols} random symbols with a total information content of ' +
      f'{total_inf_content:.2f} bits into {bitrate} bits.')
print(f'- absolute overhead: {bitrate - total_inf_content:.2f} bits')
print(f'- relative overhead: {100 * (bitrate - total_inf_content) / total_inf_content:.2g}% (expect about 1%)')

decoder = StreamingAnsCoder(precision, compressed)
for i in reversed(range(num_symbols)):
    scaled_probabilities, expected_symbol, _ = random_model_and_symbol(
        master_seed * num_symbols + i, precision)
    symbol = decoder.decode(scaled_probabilities)
    assert symbol == expected_symbol

print('Successfully reconstructed original message.')

Encoded 1000 random symbols with a total information content of 2101.30 bits into 2124 bits.
- absolute overhead: 22.70 bits
- relative overhead: 1.1% (expect about 1%)
Successfully reconstructed original message.


## Comparison to Symbol Codes

The following additional tests compare the compression performance of our ANS implementation to Shannon and Huffman Coding, using the Huffman Coder we implemented on Problem Set 2.

In [8]:
import heapq

In [9]:
# Copied from solutions to Problem Set 2

class HuffmanEncoder:
    """A code book for encoding with a Huffman code"""
    
    def __init__(self, probabilities):
        """Create an optimal prefix code using the Huffman algorithm.
        
        The alphabet is assumed to be of the form
        {0, 1, 2, ..., probabilities.len() - 1}.

        Args:
            probabilities (list or numpy array): The probabilities of each symbol
                in the alphabet. The first entry is the probability of symbol `0`,
                the second entry is the probability of symbol `1`, and so on.
                Must be nonnegative and sum to 1 (up to rounding errors) for the
                Huffman algorithm to be correct.
        """
        
        parent_nodes = [None] * len(probabilities)
        
        roots = []
        # DONE: turn `roots` into a binary heap of pairs `(probability, symbol)`,
        # representing the set `R` from Algorithm 2 on the problem set.
        # (You may want to refer back to the code examples for Problem 1.3 from
        # last week's problem set.)
        for symbol, p in enumerate(probabilities):
            heapq.heappush(roots, (p, symbol))
        
        while len(roots) > 1:
            # DONE: implement the rest of the algorithm, as described on the problem set:
            # - update the `roots` heap by popping of two items and pushing back one;
            # - you may find it easiest to initially append `None` elements to
            #   `parent_nodes` and to mutate them once their parent is inserted;
            # - feel free to use a different approach if you find it easier to do so.
            weight1, index1 = heapq.heappop(roots)
            weight2, index2 = heapq.heappop(roots)
            new_index = len(parent_nodes)
            new_node = (weight1 + weight2, new_index)
            heapq.heappush(roots, new_node)
            parent_nodes.append(None)
            parent_nodes[index1] = (new_index, False)
            parent_nodes[index2] = (new_index, True)

        self.parent_nodes = parent_nodes
        
    def __getitem__(self, symbol):
        """Encode a single symbol.

        Args:
            symbol (int): A symbol from the alphabet {0, 1, ..., len-1}.
                
        Returns:
            The code word for `symbol`.
        """
        assert 0 <= symbol and symbol < len(self)
        
        codeword_reverse = []
        index = symbol
        while True:
            parent = self.parent_nodes[index]
            if parent is None:
                # Found root node.
                break
            else:
                index, bit = parent
                codeword_reverse.append(bit)
        
        return list(reversed(codeword_reverse))
    
    def __len__(self):
        """Returns the size of the alphabet."""
        # A binary tree with `N` leaves has `M = 2*N - 1` nodes, regardless of
        # its shape. Therefore, `N = (M + 1) / 2`.
        return (len(self.parent_nodes) + 1) // 2

In [10]:
def compare(num_symbols=1000, precision=12, master_seed=123, min_alphabet_size=2, max_alphabet_size=11):
    encoder = StreamingAnsCoder(precision)
    total_inf_content = 0
    shannon_bitrate = 0
    huffman_bitrate = 0
    for i in range(num_symbols):
        scaled_probabilities, symbol, inf_content = random_model_and_symbol(
            master_seed * num_symbols + i, precision, min_alphabet_size, max_alphabet_size)
        encoder.encode(symbol, scaled_probabilities)
        total_inf_content += inf_content
        shannon_bitrate += int(np.ceil(inf_content))
        huffman_codebook = HuffmanEncoder(scaled_probabilities)
        huffman_bitrate += len(huffman_codebook[symbol])

    compressed = encoder.get_compressed()
    ans_bitrate = (len(encoder.bulk) + 2) * precision
    print(f'Encoded {num_symbols} random symbols with a total information content of {total_inf_content:.2f} bits.')
    print(f'- ANS (precision={precision}): {ans_bitrate} bits ({ans_bitrate - total_inf_content:.2f} bits or {100 * (ans_bitrate - total_inf_content) / total_inf_content:.2g}% overhead)')
    print(f'- Huffman Coding: {huffman_bitrate} bits ({huffman_bitrate - total_inf_content:.2f} bits or {100 * (huffman_bitrate - total_inf_content) / total_inf_content:.2g}% overhead)')
    print(f'- Shannon Coding: {shannon_bitrate} bits ({shannon_bitrate - total_inf_content:.2f} bits or {100 * (shannon_bitrate - total_inf_content) / total_inf_content:.2g}% overhead)')

    decoder = StreamingAnsCoder(precision, compressed)
    for i in reversed(range(num_symbols)):
        scaled_probabilities, expected_symbol, _ = random_model_and_symbol(
            master_seed * num_symbols + i, precision, min_alphabet_size, max_alphabet_size)
        symbol = decoder.decode(scaled_probabilities)
        assert symbol == expected_symbol

    print('Successfully reconstructed original message.')
    print()

In [11]:
compare(num_symbols=1000, precision=12, min_alphabet_size=2, max_alphabet_size=11)
compare(num_symbols=10000, precision=12, min_alphabet_size=2, max_alphabet_size=11)

Encoded 1000 random symbols with a total information content of 2001.55 bits.
- ANS (precision=12): 2016 bits (14.45 bits or 0.72% overhead)
- Huffman Coding: 2112 bits (110.45 bits or 5.5% overhead)
- Shannon Coding: 2519 bits (517.45 bits or 26% overhead)
Successfully reconstructed original message.

Encoded 10000 random symbols with a total information content of 20705.43 bits.
- ANS (precision=12): 20748 bits (42.57 bits or 0.21% overhead)
- Huffman Coding: 21756 bits (1050.57 bits or 5.1% overhead)
- Shannon Coding: 25868 bits (5162.57 bits or 25% overhead)
Successfully reconstructed original message.



In [12]:
compare(num_symbols=1000, precision=12, min_alphabet_size=2, max_alphabet_size=4)
compare(num_symbols=10000, precision=12, min_alphabet_size=2, max_alphabet_size=4)

Encoded 1000 random symbols with a total information content of 1135.89 bits.
- ANS (precision=12): 1164 bits (28.11 bits or 2.5% overhead)
- Huffman Coding: 1350 bits (214.11 bits or 19% overhead)
- Shannon Coding: 1677 bits (541.11 bits or 48% overhead)
Successfully reconstructed original message.

Encoded 10000 random symbols with a total information content of 11796.06 bits.
- ANS (precision=12): 11844 bits (47.94 bits or 0.41% overhead)
- Huffman Coding: 13735 bits (1938.94 bits or 16% overhead)
- Shannon Coding: 17223 bits (5426.94 bits or 46% overhead)
Successfully reconstructed original message.

