In [None]:
import gzip
import tiktoken
from pathlib import Path

import zstd
import pickle
import pandas as pd

enc = tiktoken.get_encoding("o200k_base")

tinyshakespeare = Path("../data/tinyshakespeare_input.txt").read_text()


splits = {
    "test": "all/test-00000-of-00001.parquet",
    "validation": "all/validation-00000-of-00001.parquet",
    "dev": "all/dev-00000-of-00001.parquet",
    "auxiliary_train": "all/auxiliary_train-00000-of-00001.parquet",
}
df = pd.read_parquet("hf://datasets/cais/mmlu/" + splits["test"])

  from .autonotebook import tqdm as notebook_tqdm


In [67]:
import numpy as np


def make_text_string_for_mmlu(question, choices):
    text_template = f"""
    Question: {question}
    A. {choices[0]}
    B. {choices[1]}
    C. {choices[2]}
    D. {choices[3]}
    """
    return text_template


sample_df = df.sample(frac=1, random_state=42)[:5000]
train_df = sample_df.iloc[:4900]
test_df = sample_df.iloc[4900:]

test_set = [
    (make_text_string_for_mmlu(row["question"], row["choices"]), row["answer"])
    for _, row in test_df.iterrows()
]
training_set = [
    (make_text_string_for_mmlu(row["question"], row["choices"]), row["answer"])
    for _, row in train_df.iterrows()
]

In [117]:
digit_recognizer = pd.read_csv("../data/digit-recognizer-train.csv")
len(digit_recognizer)

42000

In [104]:
from PIL import Image
import io


def make_digit_pil_image(row):
    # Convert pixel values from 0 to 255 (grayscale)
    image = row.iloc[1:].values.reshape(28, 28).astype(np.uint8)
    # Create PIL image in "L" mode (8-bit grayscale, 0-255)
    image = Image.fromarray(image, mode="L")
    return image


images = digit_recognizer.apply(make_digit_pil_image, axis=1)
class_labels = digit_recognizer["label"]
image_bytes = images.apply(lambda x: io.BytesIO(x.tobytes()).getvalue())

In [113]:
print(f"length of image_bytes[0]: {len(image_bytes[0])}")
##gzip
gzip_image_bytes = gzip.compress(image_bytes[0])
print(f"length of gzip_image_bytes: {len(gzip_image_bytes)}")
##zstd
zstd_image_bytes = zstd.compress(image_bytes[0])
print(f"length of zstd_image_bytes: {len(zstd_image_bytes)}")

##tokenizer
tokenizer = enc
tokenized_image_bytes = tokenizer._encode_bytes(image_bytes[0])
print(f"length of tokenized_image_bytes: {len(tokenized_image_bytes)}")

entire_dataset = list(zip(image_bytes, class_labels))[:5000]
training_set = entire_dataset[:4900]
test_set = entire_dataset[4900:]

length of image_bytes[0]: 784
length of gzip_image_bytes: 170
length of zstd_image_bytes: 174
length of tokenized_image_bytes: 444


In [118]:
import gzip


def compress_with_gzip(text):
    if isinstance(text, bytes):
        return len(gzip.compress(text))
    else:
        return len(gzip.compress(text.encode()))


def compress_with_zstd(text):
    if isinstance(text, bytes):
        return len(zstd.compress(text))
    else:
        return len(zstd.compress(text.encode()))


def compress_with_tokenizer(text, tokenizer=enc):
    if isinstance(text, bytes):
        return len(tokenizer._encode_bytes(text))
    else:
        return len(tokenizer.encode(text))


def concatenate_data(x1, x2):
    if isinstance(x1, bytes):
        return x1 + x2
    else:
        return " ".join([x1, x2])


def calculate_ncd(x1, x2, compression_method, tokenizer=None):
    compression_functions = {
        "gzip": compress_with_gzip,
        "zstd": compress_with_zstd,
        "tokenizer": lambda x: compress_with_tokenizer(x, tokenizer or enc),
    }

    if compression_method not in compression_functions:
        raise ValueError(f"Unknown compression method: {compression_method}")

    compress_func = compression_functions[compression_method]

    Cx1 = compress_func(x1)
    Cx2 = compress_func(x2)
    x1x2 = concatenate_data(x1, x2)
    Cx1x2 = compress_func(x1x2)

    return (Cx1x2 - min(Cx1, Cx2)) / max(Cx1, Cx2)


def evaluate_compression_method(method, test_samples=10):
    """Evaluate a compression method on the test set and return accuracy."""
    track_answers = []
    for x1, x1_gt in test_set[:test_samples]:
        distance_from_x1 = []

        for x2, _ in training_set:
            ncd = calculate_ncd(x1, x2, method)
            distance_from_x1.append(ncd)

        # Use argpartition to efficiently find the smallest element (k=1)
        nearest_idx = np.argpartition(np.array(distance_from_x1), 0)[0]
        # Get the class of the nearest neighbor
        predict_class = training_set[nearest_idx][1]
        track_answers.append((x1_gt, predict_class))

    # Calculate accuracy
    accuracy = sum(
        [1 for (x1_gt, predict_class) in track_answers if x1_gt == predict_class]
    ) / len(track_answers)
    return accuracy


# Evaluate all three compression methods
compression_methods = ["gzip", "zstd", "tokenizer"]
results = {}

for method in compression_methods:
    print(f"Evaluating {method}...")
    accuracy = evaluate_compression_method(method)
    results[method] = accuracy
    print(f"{method} accuracy: {accuracy:.4f}")

# Print summary of results
print("\nSummary of results:")
for method, accuracy in results.items():
    print(f"{method}: {accuracy:.4f}")

# Find the best method
best_method = max(results, key=results.get)
print(
    f"\nBest compression method: {best_method} with accuracy {results[best_method]:.4f}"
)


## baseline for random guessing
print(f"Baseline for random guessing: {1/4:.4f}")

Evaluating gzip...
gzip accuracy: 0.2000
Evaluating zstd...
zstd accuracy: 0.0000
Evaluating tokenizer...
tokenizer accuracy: 0.1000

Summary of results:
gzip: 0.2000
zstd: 0.0000
tokenizer: 0.1000

Best compression method: gzip with accuracy 0.2000
Baseline for random guessing: 0.2500


In [39]:
text_string = tinyshakespeare[:50000]
print(f"Original text length: {len(text_string)}")
text_bytes = text_string.encode()
print(f"Text bytes length: {len(text_bytes)}")

text_bytes_gzip = gzip.compress(text_bytes)
print(f"Text bytes gzip length: {len(text_bytes_gzip)}")

text_bytes_zstd = zstd.compress(text_bytes)
print(f"Text bytes zstd length: {len(text_bytes_zstd)}")

text_tokens = enc.encode(text_string)
print(f"Text tokens length: {len(text_tokens)}")
text_tokens_bytes = pickle.dumps(text_tokens)
print(f"Text tokens bytes length: {len(text_tokens_bytes)}")

tokenized_gzip = enc._encode_bytes(text_bytes_gzip)
print(f"Tokenized gzip length: {len(tokenized_gzip)}")

tokenized_zstd = enc._encode_bytes(text_bytes_zstd)
print(f"Tokenized zstd length: {len(tokenized_zstd)}")

Original text length: 50000
Text bytes length: 50000
Text bytes gzip length: 21097
Text bytes zstd length: 21660
Text tokens length: 13411
Text tokens bytes length: 40308
Tokenized gzip length: 19715
Tokenized zstd length: 20193


In [40]:
from bpe import BPETokenizer

bpe_tokenizer = BPETokenizer(2000)

bpe_tokenizer.train(list(text_bytes_zstd))
encoded_bpe = bpe_tokenizer.encode_bytes(text_bytes_zstd)
decoded_bpe = bpe_tokenizer.decode_bytes(encoded_bpe)
assert decoded_bpe == text_bytes_zstd

bpe_tokenizer = BPETokenizer(2000)
bpe_tokenizer.train(list(text_bytes_gzip))
encoded_bpe_gzip = bpe_tokenizer.encode_bytes(text_bytes_gzip)
decoded_bpe_gzip = bpe_tokenizer.decode_bytes(encoded_bpe_gzip)
assert decoded_bpe_gzip == text_bytes_gzip

bpe_tokenizer = BPETokenizer(2000)
bpe_tokenizer.train(list(text_bytes))
encoded_bpe_bytes = bpe_tokenizer.encode_bytes(text_bytes)
decoded_bpe_bytes = bpe_tokenizer.decode_bytes(encoded_bpe_bytes)
assert decoded_bpe_bytes == text_bytes

Initial sequence length: 21660 | Final sequence length: 17765 | Vocab size: 2000 | Compressed ratio: 1.2192513368983957
Initial sequence length: 21097 | Final sequence length: 17352 | Vocab size: 2000 | Compressed ratio: 1.215825265099124
Initial sequence length: 50000 | Final sequence length: 13668 | Vocab size: 2000 | Compressed ratio: 3.6581796897863623
