[Neuralink Compression Challenge](https://content.neuralink.com/compression-challenge/README.html)

content.neuralink.com/compression-challenge/data.zip is one hour of raw electrode recordings from a Neuralink implant.

This Neuralink is implanted in the motor cortex of a non-human primate, and recordings were made while playing a video game, like this.

Compression is essential: N1 implant generates ~200Mbps of electrode data (1024 electrodes @ 20kHz, 10b resolution) and can transmit ~1Mbps wirelessly.
So > 200x compression is needed.
Compression must run in real time (< 1ms) at low power (< 10mW, including radio).

Neuralink is looking for new approaches to this compression problem, and exceptional engineers to work on it.
If you have a solution, email compression@neuralink.com

Leaderboard

Name	Compression ratio	Compressed size	./encode size	./decode size
zip	2.2	63M	231K	480K

Task

Build executables ./encode and ./decode which pass eval.sh. This verifies compression is lossless and measures compression ratio.

Your submission will be scored on the compression ratio it achieves on a different set of electrode recordings.
Bonus points for optimizing latency and power efficiency

Submit with source code and build script. Should at least build on Linux.

Data

$ ls -lah data/
total 143M
193K 0052503c-2849-4f41-ab51-db382103690c.wav
193K 006c6dd6-d91e-419c-9836-c3f320da4f25.wav
...

Uncompressed monochannel WAV files.
5 seconds per file.

## Load Data

In [None]:
! wget https://content.neuralink.com/compression-challenge/data.zip

In [None]:
! wget https://content.neuralink.com/compression-challenge/eval.sh

In [None]:
! unzip data.zip

## Binary encoding of samples and cardinality

Documentation says that sample size is 10bit.
However, WAV files are encoded with sample size 16bit.
This suggests that 6 of the 16 bits are not utilized.

- WAV: 16bit integer PCM, int16, -32768, +32767
- real: 10bit, max uint10 = 2^10 - 1 = 1024 - 1 = 0b0011_1111_1111  0x3FF


Moreover, a basic approach is to construct a frequency-based coding scheme for the possible binary numbers.

In [None]:
import os
from scipy.io import wavfile

def get_sample_unique_counts():
  """Tally how often each sample value occurs across every file in ``data/``.

  Returns:
    A ``(counts, total)`` pair: ``counts`` maps each distinct sample value to
    its number of occurrences, ``total`` is the number of samples read.
  """
  histogram = {}
  total = 0

  for entry in os.listdir("data"):
    _, pcm = wavfile.read(os.path.join("data", entry))
    total += len(pcm)
    for value in pcm:
      histogram[value] = histogram.get(value, 0) + 1

  return histogram, total

sample_unique_counts, num_samples_total = get_sample_unique_counts()
print("num_samples_unique", len(sample_unique_counts), "num_samples_total", num_samples_total)

In [None]:
import numpy as np

In [None]:
def print_samples_binary(sample_unique_count: dict[int, int]):
  """Print each entry as ``<16-bit binary key> <value>``, in ascending key order."""
  for value in sorted(sample_unique_count):
    print(np.binary_repr(value, width=16), sample_unique_count[value])

print_samples_binary(sample_unique_counts)

In [None]:
def print_samples_binary_hist(sample_unique_count: dict[int, int]):
  """Print each entry as ``<16-bit binary key> <value>``, largest value first.

  Entries with equal values keep their dictionary order.
  """
  ranked = list(sample_unique_count.items())
  ranked.sort(key=lambda item: item[1], reverse=True)
  for value, occurrences in ranked:
    print(np.binary_repr(value, width=16), occurrences)

print_samples_binary_hist(sample_unique_counts)

In [None]:
def get_bit_set_frequency(sample_unique_counts: dict[int, int]) -> dict[int, int]:
    """Count, per bit position, how many samples have that bit set.

    Args:
        sample_unique_counts: maps a sample value to its number of occurrences.

    Returns:
        A dict keyed by the single-bit mask ``1 << i`` (``i`` in 0..15) whose
        value is the total occurrence count of samples with that bit set.
        Bits that are never set are absent from the result.
    """
    count = dict()
    for k, v in sample_unique_counts.items():
        # Work on the unsigned 16-bit pattern as a plain Python int: mixing a
        # NumPy int16 scalar with the mask 1 << 15 (32768) overflows int16
        # under NumPy 2 promotion rules.
        word = int(k) & 0xFFFF
        for i in range(16):
            b = 1 << i
            if word & b:
                count[b] = count.get(b, 0) + v
    return count

print_samples_binary(get_bit_set_frequency(sample_unique_counts))

In [None]:
def samples_to_bits(samples: np.array):
  """Expand 16-bit samples into a ``(len(samples), 16)`` matrix of 0/1 flags.

  Column ``j`` holds bit ``j`` of the sample (column 0 is the least
  significant bit). Negative samples contribute their two's-complement
  16-bit pattern.

  Returns:
    A ``np.uint8`` array with one row per sample.
  """
  # Widen first so that masking and shifting never hit int16 limits (the
  # mask for bit 15 does not fit into int16).
  words = np.asarray(samples).astype(np.int64) & 0xFFFF
  shifts = np.arange(16, dtype=np.int64)
  return ((words[:, None] >> shifts) & 1).astype(np.uint8)

fname = "a90f5eca-fdbe-4a21-94bd-c4c7f32fe365.wav"
sample_rate, samples = wavfile.read(os.path.join("data", fname))
samples_bits = samples_to_bits(samples)
for q in samples_bits[:100]:
  print(q)

In [None]:
def len_sequence_same_per_bit(samples: np.array):
  """Histogram the lengths of constant runs in each bit column.

  Args:
    samples: iterable of rows, each holding the 0/1 value of up to 16 bit
      positions (e.g. the output of ``samples_to_bits``).

  Returns:
    ``counts`` such that ``counts[v][i]`` maps a run length to the number of
    runs of value ``v`` (0 or 1) with that length seen in bit column ``i``.
    Every run is counted, including the first and the last one.
  """
  n_bits = 16
  counts = [[dict() for _ in range(n_bits)] for _ in range(2)]
  run_bit = [None] * n_bits  # value of the run currently open in each column
  run_len = [0] * n_bits

  def record(i):
    # File the finished run under the value it actually consisted of.
    hist = counts[run_bit[i]][i]
    hist[run_len[i]] = hist.get(run_len[i], 0) + 1

  for s in samples:
    for i, b in enumerate(s):
      b = int(b)
      if b == run_bit[i]:
        run_len[i] += 1
        continue
      if run_bit[i] is not None:
        record(i)
      run_bit[i] = b
      run_len[i] = 1

  # Close the runs that were still open when the input ended.
  for i in range(n_bits):
    if run_bit[i] is not None:
      record(i)

  return counts

cont_seq_lens = len_sequence_same_per_bit(samples_bits)

# One report per bit value: for every bit column, the (run length, count)
# pairs in ascending run-length order.
for bit_value, label in ((0, "zeroes"), (1, "ones")):
  print(f"cont seq of {label} lengths count:")
  for i in range(16):
    print(i, sorted(cont_seq_lens[bit_value][i].items()))

In [None]:
import plotly.graph_objects as go

# One figure per bit value; each trace is a bit column, plotting run length
# against the number of bits covered by runs of that length.
for b in (0, 1):
  fig = go.Figure()
  fig.update_layout(title=f'total number of bits in sequences of "{b}" by length', xaxis_title='len', yaxis_title='count')

  for i in range(16):
    runs = cont_seq_lens[b][i]
    fig.add_trace(go.Scatter(
      x=list(runs),
      y=[run_len * n_runs for run_len, n_runs in runs.items()],
      mode='markers',
      name=f"bit{i}",
    ))

  fig.show()

In [None]:
def num_bits_in_seq_longer_than(counts: list[dict[int, int]], min_len: int = 16) -> int:
  """Return the number of bits covered by runs of at least ``min_len`` bits.

  Args:
    counts: one ``{run length: number of runs}`` histogram per bit column.
    min_len: shortest run length to include (inclusive).
  """
  return sum(
    run_len * occurrences
    for per_bit in counts
    for run_len, occurrences in per_bit.items()
    if run_len >= min_len
  )

# Share of all bits in the loaded file that sit inside a run of at least
# `nb` identical bits (the threshold is inclusive), reported per bit value.
nb = 16
num_total_bits = 16 * len(samples)  # every sample contributes 16 bit columns
num_bits_in_seq_longer_than_nb_0 = num_bits_in_seq_longer_than(cont_seq_lens[0], min_len=nb)
num_bits_in_seq_longer_than_nb_1 = num_bits_in_seq_longer_than(cont_seq_lens[1], min_len=nb)

print("num_total_bits", num_total_bits)
print(f"num_bits_in_seq_longer_than({nb}) for zeroes:", num_bits_in_seq_longer_than_nb_0, num_bits_in_seq_longer_than_nb_0 / num_total_bits)
print(f"num_bits_in_seq_longer_than({nb}) for ones:", num_bits_in_seq_longer_than_nb_1, num_bits_in_seq_longer_than_nb_1 / num_total_bits)

In [None]:
def changed_bits(samples: np.array):
  """Count how many of the 16 bit columns differ between consecutive rows.

  Returns:
    An array whose first element is 0 and whose element ``i`` (for
    ``i >= 1``) is the number of positions where row ``i`` differs from row
    ``i - 1``. An empty input still yields the single leading 0.
  """
  diffs = [0]
  for before, after in zip(samples, samples[1:]):
    diffs.append(sum(1 for j in range(16) if before[j] != after[j]))
  return np.array(diffs)

samples_changed_bits = changed_bits(samples_bits)

import plotly.express as px
# Distribution of the number of bits that flip between consecutive samples.
# (The unused `px.data.tips()` demo-dataset load was removed.)
fig = px.histogram(samples_changed_bits, histnorm='probability density')
fig.show()

In [None]:
from collections import Counter
import itertools

def get_changed_masks(samples):
  """Yield one transition descriptor per pair of consecutive samples.

  Each item is ``(mask, before, after)``: ``mask`` is the XOR of the two
  samples (the bits that changed), ``before`` and ``after`` are the values of
  those bits in the earlier and the later sample.
  """
  for earlier, later in zip(samples[:-1], samples[1:]):
    flipped = earlier ^ later
    yield (flipped, earlier & flipped, later & flipped)

def count_all_changed_masks():
  """Yield the transition descriptors of every file in ``data/``, file by file."""
  for entry in os.listdir("data"):
    _, pcm = wavfile.read(os.path.join("data", entry))
    yield from get_changed_masks(pcm)

changed_masks = Counter(count_all_changed_masks())
print("num_diff1_masks", len(changed_masks))
# The 2000 most frequent transitions: changed bits, their old values, their
# new values, the occurrence count and its share of all samples.
for (flipped, before, after), hits in changed_masks.most_common(2000):
  print(np.binary_repr(flipped, width=16), np.binary_repr(before, width=16), np.binary_repr(after, width=16), hits, hits/num_samples_total)

In [None]:
# Keep the 1028 most frequent transitions and report how many there are, how
# many samples they account for, and which share of all samples that is.
ranked_masks = list(changed_masks.items())
ranked_masks.sort(key=lambda entry: entry[1], reverse=True)
selected_changed_masks = ranked_masks[:1028]
selected_hits = sum(hits for _, hits in selected_changed_masks)
len(selected_changed_masks), selected_hits, selected_hits / num_samples_total

In [None]:
# Prepend a sentinel entry with an impossible (-1, -1, -1) key.
# NOTE(review): the weight 144835626 is a hard-coded magic number —
# presumably a sample count from an earlier run; confirm before reuse.
# Re-running this cell prepends the sentinel again.
selected_changed_masks = [((-1,-1,-1),144835626)] + selected_changed_masks

In [None]:
from collections import Counter
from collections import defaultdict
import itertools

def get_changed_masks_4b(samples):
  """Yield per-nibble transition descriptors for consecutive samples.

  For every pair of consecutive samples, yields a 4-tuple ordered from the
  most significant nibble (``0xF000``) to the least significant one
  (``0x000F``). Each element is ``(changed, new)``: the bits of that nibble
  that changed, and the value those bits take in the later sample. All values
  are plain Python ints holding unsigned 16-bit patterns.
  """
  nibble_masks = (0xF000, 0x0F00, 0x00F0, 0x000F)
  prev = None
  for s in samples:
    # Normalise to an unsigned 16-bit Python int: combining a NumPy int16
    # scalar with 0xF000 (61440) overflows int16 under NumPy 2 promotion.
    cur = int(s) & 0xFFFF
    if prev is not None:
      m = prev ^ cur
      new_bits = cur & m
      yield tuple((m & nib, new_bits & nib) for nib in nibble_masks)
    prev = cur

def count_all_changed_masks_4b(fnames=("d40b3d0a-21fd-42a8-a0bd-a38a431e9401.wav",)):
    """Count per-nibble transition descriptors over a set of recordings.

    Args:
        fnames: file names inside ``data/`` to analyse. Defaults to the single
            recording this notebook has been using; pass
            ``os.listdir("data")`` to cover the whole dataset.

    Returns:
        ``(num_total, (count_m0, count_m1, count_m2, count_m3))`` where
        ``num_total`` is the number of transitions seen and each ``count_mK``
        maps a nibble descriptor to its number of occurrences.
    """
    num_total = 0
    counters = tuple(defaultdict(int) for _ in range(4))

    for fname in fnames:
      _, samples = wavfile.read(os.path.join("data", fname))
      for nibble_masks in get_changed_masks_4b(samples):
        num_total += 1
        for counter, key in zip(counters, nibble_masks):
          counter[key] += 1

    return num_total, counters

num_total, changed_masks_4b = count_all_changed_masks_4b()

# Report each nibble's descriptors, most frequent first. The loop uses its
# own variable so that the whole-sample `changed_masks` counter is left
# untouched.
for i, nibble_masks in enumerate(changed_masks_4b):
  print(f"num_diff1_4bmasks: {i}", len(nibble_masks))
  for k, v in sorted(nibble_masks.items(), key=lambda x: x[1], reverse=True):
    print(np.binary_repr(k[0], width=16), np.binary_repr(k[1], width=16), v, v/num_total)

In [None]:
"""Huffman encoding and decoding. Requires Python >= 3.7."""
from __future__ import annotations

from collections import Counter

from heapq import heapify
from heapq import heappush
from heapq import heappop

from itertools import chain
from itertools import islice

from typing import BinaryIO
from typing import Dict
from typing import Iterable
from typing import Optional
from typing import Tuple


LEFT_BIT = "0"
RIGHT_BIT = "1"
WORD_SIZE = 8  # Assumed to be a multiple of 8.
READ_SIZE = WORD_SIZE // 8
P_EOF = 1 << WORD_SIZE


class Node:
    """Huffman tree node."""

    def __init__(
        self,
        weight: int,
        symbol: Optional[int] = None,
        left: Optional[Node] = None,
        right: Optional[Node] = None,
    ):
        self.weight = weight
        self.symbol = symbol
        self.left = left
        self.right = right

    def is_leaf(self) -> bool:
        """Return `True` if this node is a leaf node, or `False` otherwise."""
        return self.left is None and self.right is None

    def __lt__(self, other: Node) -> bool:
        return self.weight < other.weight


def huffman_tree(weights: Dict[int, int]) -> Node:
    """Build a prefix tree from a map of symbol frequencies."""
    heap = [Node(v, k) for k, v in weights.items()]
    heapify(heap)

    # Pseudo end-of-file with a weight of 1.
    heappush(heap, Node(1, P_EOF))

    while len(heap) > 1:
        left, right = heappop(heap), heappop(heap)
        node = Node(weight=left.weight + right.weight, left=left, right=right)
        heappush(heap, node)

    return heappop(heap)


def huffman_table(tree: Node) -> Dict[int, str]:
    """Build a table of prefix codes by visiting every leaf node in `tree`."""
    codes: Dict[int, str] = {}

    def walk(node: Optional[Node], code: str = ""):
        if node is None:
            return

        if node.is_leaf():
            assert node.symbol
            codes[node.symbol] = code
            return

        walk(node.left, code + LEFT_BIT)
        walk(node.right, code + RIGHT_BIT)

    walk(tree)
    return codes

def _decode(bits: Iterable[str], tree: Node) -> Iterable[int]:
    node = tree

    for bit in bits:
        if bit == LEFT_BIT:
            assert node.left
            node = node.left
        else:
            assert node.right
            node = node.right

        if node.symbol == P_EOF:
            break

        if node.is_leaf():
            assert node.symbol
            yield node.symbol
            node = tree  # Back to the top of the tree.

In [None]:
# Huffman codes for the whole-dataset sample histogram, shortest code first.
tree = huffman_tree(sample_unique_counts)
table = huffman_table(tree)
print("len huffman table", len(table))
print("Symbol Code\n------ ----")
for symbol in sorted(table, key=lambda sym: len(table[sym])):
    print(np.binary_repr(symbol, width=16), table[symbol])

In [None]:
freq = [(2786, 34), (1504, 34), (1761, 30), (2529, 28), (2273, 27), (2978, 24), (2209, 24), (2401, 23), (2722, 22), (1248, 20), (2081, 19), (1889, 17), (2337, 17), (1184, 16), (2914, 16), (2465, 15), (736, 15), (1697, 14), (2593, 14), (3042, 14), (1953, 13), (2145, 12), (3106, 11), (1825, 11), (1440, 11), (2017, 10), (864, 10), (1056, 10), (1633, 10), (1569, 10), (1312, 10), (1376, 10), (3170, 9), (3234, 9), (992, 8), (2850, 7), (3490, 7), (928, 7), (2658, 7), (3298, 7), (800, 6), (3426, 6), (3554, 6), (1120, 5), (287, 4), (415, 4), (479, 4), (544, 3), (3362, 3), (3683, 3), (3618, 2), (672, 2), (351, 2), (-545, 2), (223, 2), (4003, 2), (3811, 2), (3747, 2), (608, 1), (4131, 1), (95, 0), (159, 0), (-160, 0), (3939, 0), (4259, 0), (4451, 0), (3875, 0)]

In [None]:
# Huffman codes for the hand-picked `freq` table, shortest code first.
tree = huffman_tree({symbol: weight for symbol, weight in freq})
table = huffman_table(tree)
print("len huffman table", len(table))
print("Symbol Code\n------ ----")
for symbol in sorted(table, key=lambda sym: len(table[sym])):
    print(symbol, table[symbol])

In [None]:
from collections import Counter
import itertools

def get_is_match_prev(samples, n_prev: int = 256):
  """Flag samples that repeat one of the ``n_prev`` samples just before them.

  Yields one value per sample: 1 when the sample equals any of the
  immediately preceding ``n_prev`` samples, otherwise 0. The first
  ``n_prev + 1`` samples always yield 0 (warm-up, the window is not yet
  full).

  Args:
    samples: indexable, sliceable sequence of samples.
    n_prev: size of the look-back window.
  """
  for i, s in enumerate(samples):
    if i < n_prev + 1:
      yield 0
      continue

    # Look at samples[i - n_prev : i], i.e. distances 1..n_prev back. The
    # indices never go negative here, so nothing wraps to the end of the
    # array.
    window = samples[i - n_prev:i]
    yield 1 if s in window else 0

def get_is_match_prev_one(fname: str, n_prev: int = 256):
  """Yield ``(sample, is_match)`` pairs for one recording in ``data/``."""
  _, pcm = wavfile.read(os.path.join("data", fname))
  yield from zip(pcm, get_is_match_prev(pcm, n_prev))

def count_is_match_prev_one(s):
  """Return ``(matched, total)`` for an iterable of ``(sample, is_match)`` pairs.

  ``matched`` is the sum of the ``is_match`` flags, ``total`` the number of
  pairs consumed.
  """
  matched = 0
  seen = 0
  for seen, (_, flag) in enumerate(s, start=1):
    matched += flag
  return matched, seen

def lengths_is_match_prev_one(s):
  """Return the run lengths of the ``is_match`` flag, split by flag value.

  Args:
    s: iterable of ``(sample, is_match)`` pairs.

  Returns:
    ``(lengths0, lengths1)``: arrays holding the lengths of the consecutive
    runs of non-matches and of matches, in order of appearance. Every run is
    included, the final one too.
  """
  lengths0 = []
  lengths1 = []

  flags = (is_match for _, is_match in s)
  for flag, run in itertools.groupby(flags):
    run_len = sum(1 for _ in run)
    # File the run under the value it consists of.
    if flag == 1:
      lengths1.append(run_len)
    else:
      lengths0.append(run_len)

  return np.array(lengths0), np.array(lengths1)

n_prev = 128
print("n_prev", n_prev)
df_is_match = list(get_is_match_prev_one("d40b3d0a-21fd-42a8-a0bd-a38a431e9401.wav", n_prev=n_prev))

# Use the total returned for this very sequence rather than a `num_total`
# left over from another cell.
num_matched, total = count_is_match_prev_one(df_is_match)
print("num_matched", num_matched, "num_total", total, "ratio_matched", num_matched / total)
for i,(s,v) in enumerate(df_is_match[:1000]):
  print(i, np.binary_repr(s, width=16), v)

In [None]:
import math
import plotly.express as px

# Sweep the look-back window over powers of two and estimate the ratio each
# would give.
for n_prev in [2 ** n for n in range(12)]:
  df_is_match = list(get_is_match_prev_one("d40b3d0a-21fd-42a8-a0bd-a38a431e9401.wav", n_prev=n_prev))
  # Use the total of this very sequence, not a `num_total` from another cell.
  num_matched, total = count_is_match_prev_one(df_is_match)
  ratio_matched = num_matched / total
  # Cost model: a miss costs 1 flag bit + 16 raw bits, a hit costs 1 flag bit
  # + log2(n_prev) index bits; the uncompressed cost is 16 bits per sample.
  compression_ratio = 16 / ((1 - ratio_matched) * (16 + 1) + ratio_matched * (math.log2(n_prev) + 1))

  lengths0, lengths1 = lengths_is_match_prev_one(df_is_match)

  print(0)
  fig = px.histogram(lengths0)
  fig.show()

  print(1)
  fig = px.histogram(lengths1)
  fig.show()

  print("n_prev", n_prev, "num_matched", num_matched, "num_total", total, "ratio_matched", ratio_matched, "compression_ratio", compression_ratio)

In [None]:
import os
from scipy.io import wavfile

# Load the recording that the encoder experiments in the following cells use.
fname = "b4a354ca-8194-4459-b711-0fd099b117e8.wav"
sample_rate, samples = wavfile.read(os.path.join("data", fname))

# NOTE: despite the name this is a size in *bits* (16 per sample), which is
# the unit the encoder cells use when they sum the lengths of "0"/"1" strings.
num_uncompressed_bytes = len(samples) * 16
print("num_uncompressed_bytes", num_uncompressed_bytes)

In [None]:
import numpy as np
import math

class Encoder():
  """Adaptive dictionary encoder producing strings of "0"/"1" characters.

  A sample not yet in the cache is emitted raw (16 bits) and added to the
  cache. Consecutive samples that are already cached are buffered and emitted
  as one run: a marker (``"1"`` followed by the run length minus one) and then
  one cache rank per sample. Runs shorter than ``min_dictionary_use_len`` are
  emitted raw instead. The cache holds at most ``cache_size`` entries and
  evicts the least frequently used one.

  Raw samples carry no flag bit here, so a raw sample and a run marker cannot
  be told apart in the output; this class is only suitable for size estimates.

  Args:
    cache_size: maximum number of cached sample values.
    max_dictionary_use_len: maximum number of samples in one run.
    min_dictionary_use_len: shortest run worth a marker.
    log_first_n_samples: number of log lines to print; 0 or negative disables
      logging.
  """

  def __init__(self, cache_size: int = 128, max_dictionary_use_len: int = 1024, min_dictionary_use_len: int = 8, log_first_n_samples: int = 1000):
    self.cache_size = cache_size
    self.max_dictionary_use_len = max_dictionary_use_len
    self.log_first_n_samples = log_first_n_samples
    self.min_dictionary_use_len = min_dictionary_use_len
    self.cache = dict()  # sample value -> number of dictionary uses
    self.buffer = []     # pending run of samples that are all in the cache

  @staticmethod
  def __width(num_values: int) -> int:
    # Bits needed to represent the values 0 .. num_values - 1 (at least one).
    return max(1, (num_values - 1).bit_length())

  def __flush(self):
    """Emit the buffered run, either raw or as marker plus cache ranks."""
    if not self.buffer:
      return

    pending, self.buffer = self.buffer, []

    if len(pending) < self.min_dictionary_use_len:
      # Too short to pay for a marker: emit raw and stop here.
      for s in pending:
        q = np.binary_repr(s, width=16)
        self.__log(f"{q}: <- {np.binary_repr(s, width=16)}: {len(q)/16:.2f} raw sample (buffer_len({len(pending)}))")
        yield q
      return

    # A run holds 1 .. max_dictionary_use_len samples; storing the length
    # minus one makes every possible length fit the fixed-width field.
    marker = "1" + np.binary_repr(len(pending) - 1, width=self.__width(self.max_dictionary_use_len))
    self.__log(f"{marker}: flush buffer, next n({len(pending)}) samples are encoded with dictionary")
    yield marker

    idx_width = self.__width(self.cache_size)
    for s in pending:
      q = np.binary_repr(self.__get_cache_idx(s), width=idx_width)
      self.cache[s] += 1
      self.__log(f"{q}: <- {np.binary_repr(s, width=16)}: {len(q)/16:.2f}")
      yield q

  def __evict_from_cache(self):
    # least frequently used cache eviction policy (first inserted wins ties)
    del self.cache[min(self.cache, key=self.cache.get)]

  def __add_to_cache(self, v: np.int16):
    # Evict *before* inserting so the cache never exceeds cache_size entries
    # and every rank fits the index width.
    if len(self.cache) >= self.cache_size:
      self.__evict_from_cache()
    self.cache[v] = 0

  def __get_cache_idx(self, s: np.int16) -> int:
    """Return the rank of `s` when cache entries are ordered by use count."""
    for i,(k,v) in enumerate(sorted(self.cache.items(), key=lambda x: x[1], reverse=True)):
      if s == k:
        return i
    return None

  def __log(self, s):
    if self.log_first_n_samples <= 0:
      return
    self.log_first_n_samples -= 1
    print(s)

  def encode(self, samples: np.array):
    """Yield the encoded pieces for `samples`, each a string of "0"/"1"."""
    # we do not transfer dictionary, decoder has same algorithm, decoder reconstructs dictionary on its own from same sequence
    for s in samples:
      if s in self.cache:
        if len(self.buffer) >= self.max_dictionary_use_len:
          yield from self.__flush()

        self.buffer.append(s)
        continue

      yield from self.__flush()

      self.__add_to_cache(s)

      raw_sample = np.binary_repr(s, width=16)
      self.__log(f"{raw_sample}: <- {np.binary_repr(s, width=16)}: {len(raw_sample)/16:.2f} raw sample")
      yield raw_sample

    # Emit whatever is still buffered once the input ends.
    yield from self.__flush()

# Encode the loaded recording and report the achieved ratio.
# NOTE: `compressed_bytes` sums the lengths of "0"/"1" strings, so it is a
# size in bits; `num_uncompressed_bytes` is in bits too (16 per sample).
encoded = list(Encoder(cache_size=2**6,max_dictionary_use_len=2**15, min_dictionary_use_len=4).encode(samples))
compressed_bytes = sum(len(s) for s in encoded)
print("compressed_bytes", compressed_bytes, "compression_ratio", num_uncompressed_bytes / compressed_bytes, "num_samples_encoded", len(encoded), "num_samples", len(samples))

In [None]:
import numpy as np
import math

class Encoder2():
  """Adaptive dictionary encoder whose raw samples carry a ``"0"`` flag bit.

  A sample not yet in the cache is emitted as ``"0"`` + 16 raw bits and added
  to the cache. Consecutive samples that are already cached are buffered and
  emitted as one run: a marker (``"1"`` followed by the run length minus one)
  and then one cache rank per sample. Runs shorter than
  ``min_dictionary_use_len`` are emitted as flagged raw samples instead. The
  cache holds at most ``cache_size`` entries and evicts the least frequently
  used one.

  Args:
    cache_size: maximum number of cached sample values.
    max_dictionary_use_len: maximum number of samples in one run.
    min_dictionary_use_len: shortest run worth a marker.
    log_first_n_samples: number of log lines to print; 0 or negative disables
      logging.
  """

  def __init__(self, cache_size: int = 128, max_dictionary_use_len: int = 1024, min_dictionary_use_len: int = 8, log_first_n_samples: int = 1000):
    self.cache_size = cache_size
    self.max_dictionary_use_len = max_dictionary_use_len
    self.log_first_n_samples = log_first_n_samples
    self.min_dictionary_use_len = min_dictionary_use_len
    self.cache = dict()  # sample value -> number of dictionary uses
    self.buffer = []     # pending run of samples that are all in the cache

  @staticmethod
  def __width(num_values: int) -> int:
    # Bits needed to represent the values 0 .. num_values - 1 (at least one).
    return max(1, (num_values - 1).bit_length())

  def __flush(self):
    """Emit the buffered run, either flagged raw or as marker plus ranks."""
    if not self.buffer:
      return

    pending, self.buffer = self.buffer, []

    if len(pending) < self.min_dictionary_use_len:
      # Too short to pay for a marker: emit flagged raw samples and stop.
      for s in pending:
        q = "0" + np.binary_repr(s, width=16)
        self.__log(f"{q}: <- {np.binary_repr(s, width=16)}: {len(q)/16:.2f} raw sample (buffer_len({len(pending)}))")
        yield q
      return

    # A run holds 1 .. max_dictionary_use_len samples; storing the length
    # minus one makes every possible length fit the fixed-width field.
    marker = "1" + np.binary_repr(len(pending) - 1, width=self.__width(self.max_dictionary_use_len))
    self.__log(f"{marker}: flush buffer, next n({len(pending)}) samples are encoded with dictionary")
    yield marker

    idx_width = self.__width(self.cache_size)
    for s in pending:
      q = np.binary_repr(self.__get_cache_idx(s), width=idx_width)
      self.cache[s] += 1
      self.__log(f"{q}: <- {np.binary_repr(s, width=16)}: {len(q)/16:.2f}")
      yield q

  def __evict_from_cache(self):
    # least frequently used cache eviction policy (first inserted wins ties)
    del self.cache[min(self.cache, key=self.cache.get)]

  def __add_to_cache(self, v: np.int16):
    # Evict *before* inserting so the cache never exceeds cache_size entries
    # and every rank fits the index width.
    if len(self.cache) >= self.cache_size:
      self.__evict_from_cache()
    self.cache[v] = 0

  def __get_cache_idx(self, s: np.int16) -> int:
    """Return the rank of `s` when cache entries are ordered by use count."""
    for i,(k,v) in enumerate(sorted(self.cache.items(), key=lambda x: x[1], reverse=True)):
      if s == k:
        return i
    return None

  def __log(self, s):
    if self.log_first_n_samples <= 0:
      return
    self.log_first_n_samples -= 1
    print(s)

  def encode(self, samples: np.array):
    """Yield the encoded pieces for `samples`, each a string of "0"/"1"."""
    # we do not transfer dictionary, decoder has same algorithm, decoder reconstructs dictionary on its own from same sequence
    for s in samples:
      if s in self.cache:
        if len(self.buffer) >= self.max_dictionary_use_len:
          yield from self.__flush()

        self.buffer.append(s)
        continue

      yield from self.__flush()

      # Only samples that are in the cache may enter the buffer; a new value
      # is cached here and sent raw behind a "0" flag.
      self.__add_to_cache(s)

      raw_sample = "0" + np.binary_repr(s, width=16)
      self.__log(f"{raw_sample}: <- {np.binary_repr(s, width=16)}: {len(raw_sample)/16:.2f} raw sample")
      yield raw_sample

    # Emit whatever is still buffered once the input ends.
    yield from self.__flush()

# Encode the loaded recording with Encoder2 and report the achieved ratio.
# NOTE: `compressed_bytes` sums the lengths of "0"/"1" strings, so it is a
# size in bits; `num_uncompressed_bytes` is in bits too (16 per sample).
encoded = list(Encoder2(cache_size=2**7,max_dictionary_use_len=2**14, min_dictionary_use_len=8).encode(samples))
compressed_bytes = sum(len(s) for s in encoded)
print("compressed_bytes", compressed_bytes, "compression_ratio", num_uncompressed_bytes / compressed_bytes)

## Observations Log
- all values do indeed fit into 10bits, there are only 1023 distinct values
- 1024 electrodes (set/not-set electrode data per sample) is already compressed into some 10bit number, which of 1024 electrodes is firing, we do not know based on 10bit number
- there are 1024 different values for samples
- 10bit values are not lower-bits, for some reason higher bits are also set. this can mean there is either: A) reserved full space of 16bits or B) there is some structure to bits arrangement already
- value of 0 is not used. looks like it is reserved
- ~most frequent samples have contigious series of zeroes~
- ~most frequent samples have many zeroes~
- ~there are no unused bits among 16bit samples~
- ~bits are set equally frequently, there is not "dead"-bits in samples~
- to achieve 200x compression, reducing single sample 16bit to 10bit or lower would not do it. we need to compress cross-sample information
- ~there is significant continuity in bits across samples in single sequence of samples. akin to columnar data types, column based compression may be useful. maybe even just transposing data and compressing that can be very significant reduction in size.~
- ~max bits sequence is ~2K (both, zeroes and ones)~
- ~by doing "column based same value count encoding for sequences of 16+" 30% of bits can be removed~
- ~consecutive samples are not very different, many samples have only 4 bits difference. (note: does not make sense to repeat whole almost-previous sample all over again)~
- in a single file, out of 90K samples, there are only 1.5K different transitions between samples. in the whole dataset there are only 25K different masks. out of 630M samples, that is 0.00003968253968 of total samples. it is unlikely that this is a coincidence. this simple heuristic highlights the fact that only some transitions are possible. there is strong fundamental causality between neuron spikes. certain neurons spike only before other neurons. this causality in its simplest form is encoded in the "possible" transitions.
- there is 1K transitions that are used more than 1K times, which is 98% of samples
- ~calculating differences from previous in words of 4 does not work at all for some of 16bits~
- bits in the number are not meaningful; this is again due to the compression of 1024 electrodes into a 10bit number already. what we can actually do is check whether this number is the same or not the same compared to others. information is likely encoded in the ordering of these numbers. likely they either: A) repeat patterns (oscillations?); B) transition in a certain graph of possible transitions (oscillations?). Some basic heuristic on most recently repeated values may work well.
- keeping cache of last N bit words, and passing either new word or index in cache, is best at N=128 and gives 1.54 compression ratio

## References

* in WAV each sample is bit value
* http://tiny.systems/software/soundProgrammer/WavFormatDocs.pdf
* https://docs.python.org/3/library/wave.html
* https://github.com/go-audio/wav
* provided files wav encode in 16bit per sample (even though doc says 10bit resolution)
* `scipy.io.wavfile.read` does not support 10bit resolution
* https://en.wikipedia.org/wiki/Variable-length_code
* https://en.wikipedia.org/wiki/Prefix_code
* https://rosettacode.org/wiki/Huffman_coding#Python
* https://golang.google.cn/src/compress/bzip2/huffman.go
* https://iopscience.iop.org/article/10.1088/1741-2552/acf5a4

In [None]:
import numpy as np

# Load the comma-separated values of compressed_ratios.csv (presumably one
# compression ratio per recording — confirm against the producing script) and
# show the shape plus the first and last ten entries.
crs = np.genfromtxt('compressed_ratios.csv', delimiter=',')
crs.shape, crs[:10], crs[-10:]

In [None]:
import pandas as pd
import plotly.express as px

# Probability-density histogram of `crs` using 250 bins.
df = pd.DataFrame({"compresseion_ratio": crs})
fig = px.histogram(df, histnorm='probability density', x="compresseion_ratio", nbins=250)
fig.show()

In [None]:
# 64th percentile of the negated ratios: its magnitude is the ratio that
# roughly 64% of the entries reach or exceed.
np.percentile(-crs, 64)

In [None]:
import plotly.graph_objects as go

# Cumulative, probability-normalised histogram of the negated ratios.
fig = go.Figure(data=[go.Histogram(x=-crs, cumulative_enabled=True, histnorm='probability', nbinsx=270)])
fig.show()

In [None]:
import numpy as np

# Load the values of compressed_ratios_6bit.csv and preview them. The preview
# shows `crs_6bit` itself rather than the previously loaded `crs`.
crs_6bit = np.genfromtxt('compressed_ratios_6bit.csv', delimiter=',')
crs_6bit.shape, crs_6bit[:10], crs_6bit[-10:]

In [None]:
import pandas as pd
import plotly.express as px

# Probability-density histogram of `crs_6bit` using 250 bins.
df = pd.DataFrame({"compresseion_ratio": crs_6bit})
fig = px.histogram(df, histnorm='probability density', x="compresseion_ratio", nbins=250)
fig.show()

In [None]:
# 64th percentile of the negated 6-bit ratios: its magnitude is the ratio
# that roughly 64% of the entries reach or exceed.
np.percentile(-crs_6bit, 64)

In [None]:
import plotly.graph_objects as go

# Cumulative, probability-normalised histogram of the negated 6-bit ratios.
fig = go.Figure(data=[go.Histogram(x=-crs_6bit, cumulative_enabled=True, histnorm='probability', nbinsx=270)])
fig.show()

In [None]:
import numpy as np

# Reload compressed_ratios.csv (rebinding `crs`) and preview it.
crs = np.genfromtxt('compressed_ratios.csv', delimiter=',')
crs.shape, crs[:10], crs[-10:]

In [None]:
import pandas as pd
import plotly.express as px

# Probability-density histogram of `crs` using 250 bins.
df = pd.DataFrame({"compresseion_ratio": crs})
fig = px.histogram(df, histnorm='probability density', x="compresseion_ratio", nbins=250)
fig.show()

In [None]:
# 65th percentile of the negated ratios: its magnitude is the ratio that
# roughly 65% of the entries reach or exceed.
np.percentile(-crs, 65)

In [None]:
import plotly.graph_objects as go

# Cumulative, probability-normalised histogram of the negated ratios.
fig = go.Figure(data=[go.Histogram(x=-crs, cumulative_enabled=True, histnorm='probability', nbinsx=270)])
fig.show()

In [None]:
## 4,6,8

In [None]:
import numpy as np

# Reload compressed_ratios.csv (rebinding `crs`) and preview it.
crs = np.genfromtxt('compressed_ratios.csv', delimiter=',')
crs.shape, crs[:10], crs[-10:]

In [None]:
# 65th percentile of the negated ratios: its magnitude is the ratio that
# roughly 65% of the entries reach or exceed.
np.percentile(-crs, 65)

In [None]:
import pandas as pd
import plotly.express as px

# Probability-density histogram of `crs` using 250 bins.
df = pd.DataFrame({"compression_ratio": crs})
fig = px.histogram(df, histnorm='probability density', x="compression_ratio", nbins=250)
fig.show()

In [None]:
import plotly.graph_objects as go

import plotly.express as px
# Complementary empirical CDF of `crs` with a marginal histogram; the figure
# title describes it as the probability of a higher compression ratio.
df = pd.DataFrame({"compression_ratio": crs})
fig = px.ecdf(
    df, x="compression_ratio",
    markers=True,
    ecdfmode="complementary",
    title="Probability of Compression Ratio Higher-Than (ecdfmode='complementary')",
    marginal="histogram",
)
fig.show()

In [None]:
import numpy as np

# Load compressed_ratios_zip.csv into `crs` (replacing the previous contents)
# and preview it.
crs = np.genfromtxt('compressed_ratios_zip.csv', delimiter=',')
crs.shape, crs[:10], crs[-10:]

In [None]:
# 65th percentile of the negated ratios: its magnitude is the ratio that
# roughly 65% of the entries reach or exceed.
np.percentile(-crs, 65)

In [None]:
import pandas as pd
import plotly.express as px

# Probability-density histogram of `crs` using 250 bins.
df = pd.DataFrame({"compression_ratio": crs})
fig = px.histogram(df, histnorm='probability density', x="compression_ratio", nbins=250)
fig.show()

In [None]:
import plotly.graph_objects as go

import plotly.express as px
# Complementary empirical CDF of `crs` with a marginal histogram; the figure
# title describes it as the probability of a higher compression ratio.
df = pd.DataFrame({"compression_ratio": crs})
fig = px.ecdf(
    df, x="compression_ratio",
    markers=True,
    ecdfmode="complementary",
    title="Probability of Compression Ratio Higher-Than (ecdfmode='complementary')",
    marginal="histogram",
)
fig.show()