In [1]:
import os, wave, struct
import numpy as np
import pandas as pd

# =============================
# 1) Helper: unary & bitstring
# =============================
def unary_encode(q: int) -> str:
    """Return the unary code for ``q``: ``q`` one-bits closed by one zero-bit.

    ``q == 0`` produces just the terminator, ``"0"``.
    """
    # Left-pad the "0" terminator with ones up to a total width of q + 1.
    return "0".rjust(q + 1, "1")

def unary_decode(bits: str, i: int):
    """Decode one unary-coded value from ``bits`` starting at index ``i``.

    The value is the number of characters between ``i`` and the next ``"0"``
    terminator. ``bits`` is expected to contain only ``"0"`` and ``"1"``.

    Args:
        bits: Bit string to read from.
        i: Index of the first bit of the unary code.

    Returns:
        ``(q, next_index)`` where ``next_index`` points just past the
        terminating ``"0"``.

    Raises:
        ValueError: If there is no ``"0"`` terminator at or after ``i``
            (truncated or corrupt stream).
    """
    # str.find scans at C speed; quotient runs can be thousands of bits long.
    end = bits.find("0", i)
    if end < 0:
        raise ValueError(f"unterminated unary code starting at bit {i}")
    return end - i, end + 1

def to_k_bits(x: int, k: int) -> str:
    """Return ``x`` as a fixed-width binary string of exactly ``k`` bits.

    Args:
        x: Non-negative integer that must fit in ``k`` bits.
        k: Field width in bits; ``0`` yields the empty string.

    Returns:
        A string of ``k`` characters, each ``"0"`` or ``"1"``, MSB first.

    Raises:
        ValueError: If ``k`` is negative or ``x`` is outside ``[0, 2**k)``;
            either would produce a field whose width is not ``k``.
    """
    if k < 0:
        raise ValueError(f"bit width must be non-negative, got {k}")
    if not 0 <= x < (1 << k):
        raise ValueError(f"value {x} does not fit in {k} bits")
    # format(0, "00b") would give "0", i.e. one stray bit, so special-case k == 0.
    return format(x, f"0{k}b") if k else ""

def from_k_bits(bits: str, i: int, k: int):
    """Read a fixed-width ``k``-bit unsigned integer from ``bits`` at index ``i``.

    Args:
        bits: Bit string to read from.
        i: Index of the first (most significant) bit of the field.
        k: Field width in bits; ``0`` reads nothing and yields ``0``.

    Returns:
        ``(value, next_index)`` with ``next_index == i + k``.

    Raises:
        ValueError: If ``k`` is negative, fewer than ``k`` bits remain at
            ``i``, or the field contains characters other than ``0``/``1``.
    """
    if k < 0:
        raise ValueError(f"bit width must be non-negative, got {k}")
    if k == 0:
        # int("", 2) would raise; a zero-width field is simply the value 0.
        return 0, i
    field = bits[i:i + k]
    if len(field) != k:
        # A short slice would otherwise decode silently as a smaller number.
        raise ValueError(f"truncated {k}-bit field at bit {i}")
    return int(field, 2), i + k

# =============================
# 2) Rice for a single integer S
# =============================
# Audio samples are signed. ZigZag maps small-magnitude values (positive or negative) to small non-negative integers.
def zigzag_encode_16(x):
    """ZigZag-map ``x``, interpreted as a signed 16-bit value, to ``[0, 65535]``.

    ``x`` may be any integer; it is first wrapped into ``[-32768, 32767]``
    using two's-complement arithmetic. The mapping interleaves signs so small
    magnitudes give small codes: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...

    Returns:
        A Python ``int`` in ``[0, 65535]``.
    """
    # Wrap with plain integer arithmetic instead of np.int16(x): NumPy >= 2
    # raises OverflowError when given an out-of-range Python int.
    s = ((int(x) + 0x8000) & 0xFFFF) - 0x8000
    # s >> 15 is 0 for non-negative s and -1 (all ones) for negative s.
    return ((s << 1) ^ (s >> 15)) & 0xFFFF   # mask to 16 bits

def zigzag_decode_16(u):
    """Invert the 16-bit ZigZag mapping: code in ``[0, 65535]`` -> ``np.int16``.

    Even codes decode to ``code // 2``; odd codes decode to the bitwise
    complement of ``code // 2`` (i.e. ``-(code // 2) - 1``).
    """
    code = int(u)
    half = code >> 1
    signed = ~half if code & 1 else half
    return np.int16(signed)

def rice_encode_number(S: int, K: int) -> str:
    """Rice-encode one signed sample ``S`` with parameter ``K``.

    The sample is ZigZag-mapped to a non-negative integer, split by
    ``2**K`` into quotient and remainder, and emitted as the unary-coded
    quotient followed by the remainder in ``K`` bits.
    """
    modulus = 1 << K                       # 2^K
    mapped = zigzag_encode_16(S)           # signed -> non-negative
    quotient, remainder = divmod(mapped, modulus)
    return unary_encode(quotient) + to_k_bits(remainder, K)

def rice_decode_number(bitstr: str, i: int, K: int):
    """Decode one Rice code from ``bitstr`` starting at bit index ``i``.

    Reads the unary quotient, then the ``K``-bit remainder, recombines them
    and undoes the ZigZag mapping.

    Returns:
        ``(decoded_signed_16, next_index)``.
    """
    quotient, pos = unary_decode(bitstr, i)
    remainder, pos = from_k_bits(bitstr, pos, K)
    mapped = (quotient << K) + remainder   # quotient * 2^K + remainder
    return zigzag_decode_16(mapped), pos

# ======================================
# 3) Block encode/decode (arrays of int16)
# ======================================
def rice_encode_block(arr: np.ndarray, K: int) -> str:
    """Rice-encode every value of ``arr`` and concatenate the codes.

    Args:
        arr: Samples to encode (int16; interleaved if stereo).
        K: Rice parameter shared by all values.

    Returns:
        One bit string holding the codes in input order.
    """
    return "".join(
        rice_encode_number(int(np.int16(sample)), K) for sample in arr
    )

def rice_decode_block(bitstr: str, K: int, n_values: int) -> np.ndarray:
    """Decode ``n_values`` consecutive Rice codes from the start of ``bitstr``.

    Bits left over after the last code (e.g. byte padding) are ignored.

    Returns:
        A 1-D ``int16`` array of length ``n_values``.
    """
    def _decoded_samples():
        # Thread the read position through successive single-value decodes.
        pos = 0
        for _ in range(n_values):
            sample, pos = rice_decode_number(bitstr, pos, K)
            yield sample

    return np.fromiter(_decoded_samples(), dtype=np.int16)

# =====================================
# 4) Pack bitstring <-> bytes, simple header
# =====================================
# .ex2 container layout (little-endian, no padding):
#   magic    4 bytes   b"EX2\0"
#   K        uint8     Rice parameter
#   n        uint32    number of encoded values
#   payload  bits packed MSB-first into bytes
_HDR = "<4sBI"  # magic, K, n
_HDR_SIZE = struct.Struct(_HDR).size

def pack_bits(bitstr: str) -> bytes:
    """Pack a bit string into bytes, most significant bit first.

    Every character equal to ``"1"`` becomes a set bit; any other character
    becomes a clear bit. A final partial byte is padded with zero bits on the
    low (right) side.
    """
    packed = bytearray()
    for start in range(0, len(bitstr), 8):
        group = bitstr[start:start + 8]
        byte = 0
        for bit in group:
            byte = (byte << 1) | (bit == "1")
        # Left-align a short final group; a full group shifts by zero.
        packed.append(byte << (8 - len(group)))
    return bytes(packed)

def unpack_bits(b: bytes) -> str:
    """Expand bytes into a bit string, 8 characters per byte, MSB first."""
    return "".join(map("{:08b}".format, b))

def write_ex2(path: str, K: int, n_values: int, bitstr: str):
    """Write an .ex2 file: packed header (magic, K, n_values) then the payload.

    Args:
        path: Destination file path (overwritten if it exists).
        K: Rice parameter stored in the header.
        n_values: Number of encoded values stored in the header.
        bitstr: Encoded bit string; packed into bytes for the payload.
    """
    blob = struct.pack(_HDR, b"EX2\0", K, n_values) + pack_bits(bitstr)
    with open(path, "wb") as fh:
        fh.write(blob)

def read_ex2(path: str):
    """Read an .ex2 file written by ``write_ex2``.

    Args:
        path: Path of the .ex2 file.

    Returns:
        ``(K, n_values, bitstr)`` where ``bitstr`` is the payload expanded to
        a string of bits, including any zero padding in the last byte.

    Raises:
        ValueError: If the file is shorter than the header or the magic
            bytes are not ``b"EX2\\0"``.
    """
    with open(path, "rb") as f:
        header = f.read(_HDR_SIZE)
        if len(header) != _HDR_SIZE:
            raise ValueError(f"{path}: file too short for an .ex2 header")
        magic, K, n_values = struct.unpack(_HDR, header)
        # Explicit check rather than assert, which is stripped under -O.
        if magic != b"EX2\0":
            raise ValueError(f"{path}: bad magic {magic!r}, not an .ex2 file")
        payload = f.read()
    return K, n_values, unpack_bits(payload)

# =============================
# 5) WAV I/O (16-bit PCM only)
# =============================
def read_wav(path: str):
    """Read a 16-bit PCM WAV file.

    Args:
        path: Path of the WAV file.

    Returns:
        ``(params, data)`` where ``params`` is the result of
        ``wave.Wave_read.getparams()`` and ``data`` is a 1-D ``int16`` array
        of all samples in file order (interleaved when multi-channel).

    Raises:
        ValueError: If the sample width is not 2 bytes or the compression
            type is not ``"NONE"``.
    """
    with wave.open(path, "rb") as wf:
        params = wf.getparams()
        sample_width, comp_type = params[1], params[4]
        # Explicit check rather than assert: under -O an assert is stripped
        # and other sample widths would be misread as int16.
        if sample_width != 2 or comp_type != "NONE":
            raise ValueError("Expect 16-bit PCM WAV")
        raw = wf.readframes(params[3])
    data = np.frombuffer(raw, dtype=np.int16)
    return params, data

def write_wav(path: str, params, data_int16: np.ndarray):
    """Write samples to ``path`` as a WAV file described by ``params``.

    Args:
        path: Destination file path.
        params: 6-item sequence ``(nchannels, sampwidth, framerate, nframes,
            comptype, compname)``; the ``nframes`` entry is not used.
        data_int16: Samples to write; converted to ``int16`` before writing.
    """
    channels, sample_width, frame_rate, _n_frames, comp_type, comp_name = params
    with wave.open(path, "wb") as out:
        out.setnchannels(channels)
        out.setsampwidth(sample_width)
        out.setframerate(frame_rate)
        out.setcomptype(comp_type, comp_name)
        pcm = np.asarray(data_int16, dtype=np.int16)
        out.writeframes(pcm.tobytes())

# ==========================================
# 6) High-level: encode/decode like the spec
# ==========================================
def to_residuals_interleaved(data_int16: np.ndarray, nch: int) -> np.ndarray:
    """Per-channel first-order delta of interleaved int16 audio.

    The first frame is kept as is; every later frame is replaced by its
    difference from the previous frame of the same channel. Differences are
    computed in int16 and wrap modulo 2**16, which is still exactly
    invertible by a wrapping cumulative sum.

    Args:
        data_int16: Interleaved samples; length must be a multiple of ``nch``.
            An empty input yields an empty result.
        nch: Number of interleaved channels.

    Returns:
        A new 1-D ``int16`` array of residuals, same length and interleaving
        as the input.
    """
    # reshape to [num_frames, nch]
    frames = np.asarray(data_int16).reshape(-1, nch).astype(np.int16, copy=False)
    resid = frames.copy()
    # In-place int16 subtraction; with zero or one frame the slices are
    # empty, so empty input no longer raises IndexError.
    resid[1:] -= frames[:-1]
    return resid.reshape(-1)

def from_residuals_interleaved(res_int16: np.ndarray, nch: int) -> np.ndarray:
    """Rebuild interleaved int16 audio from per-channel delta residuals.

    Residuals are summed cumulatively along the frame axis, independently
    for each channel, and the running sums are cast back to int16.
    """
    per_frame = res_int16.reshape(-1, nch).astype(np.int16, copy=False)
    # Use an int32 accumulator for the running sum, then narrow to int16.
    running = np.cumsum(per_frame, axis=0, dtype=np.int32)
    return running.astype(np.int16).reshape(-1)



def encode_wav_to_ex2(wav_path: str, K: int):
    """Compress a 16-bit PCM WAV into a ``<root>_K<K>_Enc.ex2`` file beside it.

    Pipeline: read the samples, take per-channel delta residuals, Rice-encode
    them with parameter ``K``, and write the result with the simple
    (K, n_values) header.

    Returns:
        ``(ex2_path, params, n_values)``: the output path, the WAV parameters
        of the input, and the number of encoded residuals.
    """
    params, samples = read_wav(wav_path)
    channels = params[0]

    # Decorrelate: delta per channel on the interleaved stream.
    residuals = to_residuals_interleaved(samples, channels)
    n_values = len(residuals)

    stem = os.path.splitext(wav_path)[0]
    ex2_path = f"{stem}_K{K}_Enc.ex2"
    write_ex2(ex2_path, K, n_values, rice_encode_block(residuals, K))
    return ex2_path, params, n_values

def decode_ex2_to_wav(ex2_path: str, params_hint=None):
    """Decode an .ex2 file into ``<root>_Dec.wav`` next to it.

    Args:
        ex2_path: Path of the .ex2 file to decode.
        params_hint: WAV parameters as a 6-item sequence ``(nchannels,
            sampwidth, framerate, nframes, comptype, compname)``. The .ex2
            header does not store them, so when omitted the output is written
            as mono, 16-bit, 44100 Hz, uncompressed.

    Returns:
        Path of the WAV file that was written.
    """
    K, n_values, bitstr = read_ex2(ex2_path)
    resid = rice_decode_block(bitstr, K, n_values)

    if params_hint is None:
        # A plain tuple is enough (write_wav only unpacks it) and avoids
        # relying on the private, undocumented wave._wave_params.
        params_hint = (1, 2, 44100, n_values, "NONE", "not compressed")

    nch = params_hint[0]

    # invert predictor to get PCM back
    data = from_residuals_interleaved(resid, nch)

    root, _ = os.path.splitext(ex2_path)
    wav_out = f"{root}_Dec.wav"
    write_wav(wav_out, params_hint, data)
    return wav_out

# ==========================================
# 7) Demo: matches the lecture’s style + table
# ==========================================
def run_one(wav_path: str, K_values=(4, 2), params_cache=None):
    """Encode/decode ``wav_path`` for each K and collect size statistics.

    For every ``K`` the WAV is compressed to .ex2, decompressed again, and
    the decoded samples are compared with the original.

    Args:
        wav_path: Path of the 16-bit PCM WAV file to process.
        K_values: Rice parameters to try.
        params_cache: Optional dict; the WAV parameters of ``wav_path`` are
            stored in it under the key ``wav_path``. A fresh dict is used
            when omitted.

    Returns:
        A dict with ``"File"``, ``"Original size"`` and, per ``K``,
        ``"Rice (K=<K>)"`` (encoded size in bytes) and
        ``"% Compression (K=<K>)"`` (negative when the output is larger).

    Raises:
        AssertionError: If a decoded file does not match the original.
    """
    # Avoid a mutable default argument, which would be shared across calls.
    if params_cache is None:
        params_cache = {}

    # Read the reference samples once instead of once per K.
    params, orig = read_wav(wav_path)
    params_cache[wav_path] = params

    original_size = os.path.getsize(wav_path)
    row = {"File": os.path.basename(wav_path),
           "Original size": original_size}

    for K in K_values:
        ex2_path, params_used, _ = encode_wav_to_ex2(wav_path, K)
        dec_wav = decode_ex2_to_wav(ex2_path, params_used)

        # Verify round-trip bit‑exact; raised explicitly so the check
        # still runs under -O, where assert statements are stripped.
        _, dec = read_wav(dec_wav)
        if not np.array_equal(orig, dec):
            raise AssertionError(f"Round‑trip mismatch for {wav_path} (K={K})")

        encoded_size = os.path.getsize(ex2_path)
        row[f"Rice (K={K})"] = encoded_size
        row[f"% Compression (K={K})"] = 100.0 * (original_size - encoded_size) / original_size

    return row

# Demo driver: process Sound1 & Sound2 (the WAVs must sit next to the notebook)
# and tabulate the encoded sizes for K=4 and K=2.
targets = ["Sound1.wav", "Sound2.wav"]
results = []
for t in targets:
    if not os.path.exists(t):
        print(f"⚠️ Missing {t} — skipping")
        continue
    results.append(run_one(t, (4,2)))

# Fix the column order explicitly so the table layout is stable.
df = pd.DataFrame(
    results,
    columns=[
        "File", "Original size",
        "Rice (K=4)", "Rice (K=2)",
        "% Compression (K=4)", "% Compression (K=2)",
    ],
)
df


Unnamed: 0,File,Original size,Rice (K=4),Rice (K=2),% Compression (K=4),% Compression (K=2)
0,Sound1.wav,1002088,857460,2429914,14.432665,-142.485091
1,Sound2.wav,1008044,56448272,224816513,-5499.782549,-22202.251985
