# In [1]:
import os
import bz2
import lzma
import gzip
import csv
from collections import Counter
from scipy.spatial.distance import jensenshannon
import numpy as np
import subprocess

# Directory containing this script; every path below is resolved relative to it.
# NOTE(review): __file__ is not defined in an interactive notebook session --
# confirm this is always executed as a script.
script_dir = os.path.dirname(os.path.abspath(__file__))

# Root directory that process_aeropyrum_files() walks looking for tokenized .txt files.
base_dir = os.path.join(script_dir, "cds_ncrna_processing_and_tokenization/tokenized_sequences")
# Destination of the CSV report written by process_aeropyrum_files().
output_csv = os.path.join(script_dir, "aeropyrum_comp_test_output.csv")
# Executable invoked through subprocess.run() in compute_geco3_metrics().
geco3_binary = os.path.join(script_dir, "../geco3/src/GeCo3") 

def remove_delimiters(sequence):
    """Return ``sequence`` with every non-alphabetic character dropped.

    Whitespace, digits, punctuation and token-boundary markers are all
    discarded; only characters for which ``str.isalpha`` is true survive,
    in their original order.
    """
    letters_only = filter(str.isalpha, sequence)
    return ''.join(letters_only)

def compute_geco3_metrics(file_path, clean_sequence, file_type):
    """Compress the cleaned sequence with GeCo3 and return the compressed size.

    The sequence is written to a scratch file next to ``file_path``
    (``<file_path>_cleaned.txt``), the GeCo3 binary is run on it, and the size
    of the resulting ``<scratch>.co`` file is measured.

    Args:
        file_path: Path of the tokenized source file; only used to derive the
            scratch file name and for log messages.
        clean_sequence: Sequence text with spaces/delimiters already removed.
        file_type: ``"cds"``, ``"ncrna"`` or ``"pep"``. Protein (``"pep"``)
            files are skipped because they are not ACGT sequences.

    Returns:
        The compressed size in bytes, or ``None`` when the file type is
        skipped, GeCo3 produced no output, or any error occurred.
    """
    if file_type == "pep":
        return None  # skip protein sequences, non ACGT

    temp_clean_path = file_path + "_cleaned.txt"
    compressed_file = f"{temp_clean_path}.co"

    try:
        with open(temp_clean_path, "w") as temp_file:
            temp_file.write(clean_sequence)

        # NOTE(review): the flag values are taken as-is from the original
        # experiment; confirm their meaning against the GeCo3 documentation.
        subprocess.run(
            [geco3_binary, "-l", "1", "-lr", "0.06", "-hs", "8", temp_clean_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )

        if os.path.exists(compressed_file):
            return os.path.getsize(compressed_file)

        print(f"didn't produce a compressed file: {file_path}")
        return None
    except Exception as e:
        # Best effort: a GeCo3 failure must not prevent the other metrics
        # from being reported, so log and signal "no result".
        print(f" error running geco3: {e}")
        return None
    finally:
        # Always remove the scratch files, including on failure. A leftover
        # "*_cleaned.txt" still ends in ".txt" and carries the organism name,
        # so it would be mistaken for an input file on the next run.
        for leftover in (compressed_file, temp_clean_path):
            try:
                os.remove(leftover)
            except FileNotFoundError:
                pass  # never created, nothing to clean up
            except OSError as e:
                print(f" error running geco3: {e}")


def compute_jsd(freq1, freq2):
    """Compute the Jensen-Shannon divergence (JSD) between two frequency vectors.

    JSD measures how different or similar two probability distributions are.
    It is a **symmetric** and **smoothed** version of the Kullback-Leibler
    divergence.

    Intended use here: compare
    - **the observed token frequency distribution** (actual token counts), with
    - **a reference distribution** (a uniform distribution).

    Interpretation (base 2, so the value lies in [0, 1]):
    - **close to 0**: the two distributions are very similar (tokens are
      roughly uniformly distributed);
    - **closer to 1**: the distributions are very different (some tokens are
      much more frequent than others).

    Args:
        freq1: Array-like of non-negative counts/weights.
        freq2: Array-like of non-negative counts/weights, same length as
            ``freq1`` (the caller passes ``np.ones(...)`` for a uniform
            reference).

    Returns:
        The divergence in bits as a float.
    """
    # Normalise the raw counts into probability distributions.
    prob1 = np.array(freq1) / np.sum(freq1)
    prob2 = np.array(freq2) / np.sum(freq2)

    # scipy's jensenshannon() returns the Jensen-Shannon *distance*, i.e. the
    # square root of the divergence. Square it so the reported value really is
    # the divergence this function is named after.
    js_distance = jensenshannon(prob1, prob2, base=2)
    return float(js_distance ** 2)

def compute_compression_metrics(file_path, file_type):
    """Compute size and compression statistics for one tokenized sequence file.

    Args:
        file_path: Path to a tokenized text file (whitespace-separated tokens,
            possibly containing the '▁' and '.' marker characters).
        file_type: ``"cds"``, ``"ncrna"`` or ``"pep"``; forwarded to
            ``compute_geco3_metrics``.

    Returns:
        dict with the keys ``og_size_chars``, ``og_size_bytes``,
        ``token_count``, ``unique_token_count``, ``gzip_ratio``,
        ``bzip2_ratio``, ``lzma_ratio``, ``tk_compression_ratio``,
        ``geco3_ratio``, ``jsd``, ``gzip_size``, ``bzip2_size``,
        ``lzma_size``, ``tk_compressed_size`` and ``geco3_compressed_size``.
        All ratios are "compressed size / number of cleaned characters".
        The two GeCo3 entries are ``None`` when GeCo3 produced no result.

    Raises:
        ValueError: If the file contains no alphabetic sequence characters,
            since every ratio would be a division by zero.
    """
    # The token files contain the non-ASCII marker '▁', so decode explicitly
    # as UTF-8 instead of relying on the platform default encoding.
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read().strip()

    tokenized_content = content.replace('▁', '').replace('.', '').split()
    clean_content = remove_delimiters(content)

    # total num of chars in cleaned seq with spaces and delimiters removed
    og_size_chars = len(clean_content)
    if og_size_chars == 0:
        raise ValueError(f"no sequence characters found in {file_path}")

    # dict mapping each token to its freq
    token_counts = Counter(tokenized_content)
    token_freq = np.array(list(token_counts.values()))

    # total num of tokens in seq, including duplicates
    token_count = len(tokenized_content)

    # distinct tokens in seq only
    unique_token_count = len(token_counts)

    # size of the raw tokenized text before any compression
    og_size_bytes = len(content.encode('utf-8'))

    # Divergence of the observed token distribution from a uniform one;
    # trivially 0 when there are fewer than two distinct tokens.
    jsd_value = compute_jsd(token_freq, np.ones(len(token_freq))) if len(token_freq) > 1 else 0

    # Encode once and reuse the bytes for all general-purpose compressors.
    clean_bytes = clean_content.encode('utf-8')
    gzip_size = len(gzip.compress(clean_bytes))
    bzip2_size = len(bz2.compress(clean_bytes))
    lzma_size = len(lzma.compress(clean_bytes))

    # Token compression factor: model the sequence as one symbol per token
    # occurrence plus a vocabulary table costing len(token) + 1 per distinct
    # token. Only the lengths matter, so no symbol mapping is materialised.
    len_encoded = token_count
    len_vocab = sum(len(tok) + 1 for tok in token_counts)
    tk_compressed_size = len_encoded + len_vocab
    tk_compression_ratio = tk_compressed_size / og_size_chars

    gzip_ratio = gzip_size / og_size_chars
    bzip2_ratio = bzip2_size / og_size_chars
    lzma_ratio = lzma_size / og_size_chars

    # make sure geco only runs on og cleaned files w no spaces or delimiters
    geco3_compressed_size = compute_geco3_metrics(file_path, clean_content, file_type)
    geco3_ratio = (geco3_compressed_size / og_size_chars) if geco3_compressed_size else None

    return {
        "og_size_chars": og_size_chars,
        "og_size_bytes": og_size_bytes,
        "token_count": token_count,
        "unique_token_count": unique_token_count,
        "gzip_ratio": gzip_ratio,
        "bzip2_ratio": bzip2_ratio,
        "lzma_ratio": lzma_ratio,
        "tk_compression_ratio": tk_compression_ratio,
        "geco3_ratio": geco3_ratio,
        "jsd": jsd_value,
        "gzip_size": gzip_size,
        "bzip2_size": bzip2_size,
        "lzma_size": lzma_size,
        # Report the exact sizes rather than re-deriving them from the ratios,
        # which would introduce floating-point round-off.
        "tk_compressed_size": tk_compressed_size,
        "geco3_compressed_size": geco3_compressed_size if geco3_compressed_size else None,
    }

def extract_file_details(file_path):
    """Derive organism, file type and vocabulary size from a file name.

    The base name is split on ``"_"``. Everything except the last three
    fields is joined back together as the organism, and the second-to-last
    field is read as ``vocab<N>``.

    Args:
        file_path: Path (or bare name) of a tokenized sequence file.

    Returns:
        ``(organism, file_type, vocab_size)``. ``file_type`` is the first of
        ``"cds"``, ``"ncrna"``, ``"pep"`` that occurs anywhere in the file
        name; if none does, ``(None, None, None)`` is returned.
        ``vocab_size`` is ``None`` when the second-to-last field is missing,
        does not contain ``"vocab"``, or is not followed by a valid integer.
    """
    file_name = os.path.basename(file_path)
    parts = file_name.split("_")

    organism = "_".join(parts[:-3])

    # Same precedence as before: cds, then ncrna, then pep.
    for candidate in ("cds", "ncrna", "pep"):
        if candidate in file_name:
            file_type = candidate
            break
    else:
        return None, None, None

    vocab_size = None
    # Guard against names with fewer than two "_"-separated fields, where
    # parts[-2] would raise IndexError.
    if len(parts) >= 2 and 'vocab' in parts[-2]:
        try:
            vocab_size = int(parts[-2].replace("vocab", ""))
        except ValueError:
            # e.g. "vocabXL": field mentions vocab but carries no number
            vocab_size = None

    return organism, file_type, vocab_size

def process_aeropyrum_files(base_dir, output_csv):
    """Compute compression metrics for every Aeropyrum_pernix file and write a CSV.

    Walks ``base_dir`` recursively, processes each ``.txt`` file whose name
    contains ``"Aeropyrum_pernix"`` and whose file type can be determined,
    and writes one CSV row per file to ``output_csv``. A file that fails to
    process is reported on stdout and skipped; it does not abort the run.

    Args:
        base_dir: Root directory to search for tokenized sequence files.
        output_csv: Path of the CSV report to (over)write.
    """

    def gain_percent(ratio):
        # Compression gain: percentage of the original size that was saved.
        return (1 - ratio) * 100

    results = []

    for root, dirs, files in os.walk(base_dir):
        # os.walk yields entries in arbitrary order; sort both levels so the
        # row order of the report is reproducible between runs.
        dirs.sort()
        for file in sorted(files):
            if not (file.endswith('.txt') and "Aeropyrum_pernix" in file):
                continue
            # "<input>.txt_cleaned.txt" is the GeCo3 scratch-file naming; a
            # stale one left by an interrupted run is not a real input.
            if file.endswith('.txt_cleaned.txt'):
                continue

            file_path = os.path.join(root, file)

            try:
                organism, file_type, vocab_size = extract_file_details(file_path)
                if file_type is None:
                    continue

                metrics = compute_compression_metrics(file_path, file_type)

                geco3_size = metrics["geco3_compressed_size"]
                geco3_ratio = metrics["geco3_ratio"]

                results.append([
                    organism,
                    file_type,
                    vocab_size,
                    metrics["token_count"],
                    metrics["unique_token_count"],
                    metrics["og_size_chars"],
                    metrics["og_size_bytes"],
                    metrics["gzip_ratio"],
                    metrics["bzip2_ratio"],
                    metrics["lzma_ratio"],
                    metrics["tk_compression_ratio"],
                    geco3_ratio,
                    metrics["jsd"],
                    metrics["gzip_size"],
                    gain_percent(metrics["gzip_ratio"]),
                    metrics["bzip2_size"],
                    gain_percent(metrics["bzip2_ratio"]),
                    metrics["lzma_size"],
                    gain_percent(metrics["lzma_ratio"]),
                    metrics["tk_compressed_size"],
                    gain_percent(metrics["tk_compression_ratio"]),
                    # GeCo3 columns stay empty when it was skipped or failed.
                    geco3_size if geco3_size else None,
                    gain_percent(geco3_ratio) if geco3_ratio else None,
                ])
            except Exception as e:
                # Top-level boundary: report the failing file and carry on.
                print(f"error processing file: {file_path}: {e}")

    with open(output_csv, mode='w', newline='', encoding='utf-8') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow([
            "Organism", "File Type", "Vocab Size", "Token Count", "Unique Token Count",
            "Processed Seq Length", "File Size(Bytes)",
            "GZIP Ratio", "BZIP2 Ratio", "LZMA Ratio", "Tk Compression Factor", "GeCo3 Ratio", "JSD",
            "GZIP Ratio Compressed Size (Bytes)", "GZIP Ratio Compression Gain (%)",
            "BZIP2 Ratio Compressed Size (Bytes)", "BZIP2 Ratio Compression Gain (%)",
            "LZMA Ratio Compressed Size (Bytes)", "LZMA Ratio Compression Gain (%)",
            "Tk Compression Factor Compressed Size (Bytes)", "Tk Compression Factor Compression Gain (%)",
            "GeCo3 Ratio Compressed Size (Bytes)", "GeCo3 Ratio Compression Gain (%)"
        ])
        writer.writerows(results)

    print(f"results in {output_csv}")

# Run the full pipeline only when executed as a script, so that importing
# this module (e.g. to reuse the metric helpers) has no side effects.
if __name__ == "__main__":
    process_aeropyrum_files(base_dir, output_csv)

# Results saved in /Users/tiananoll-walker/Documents/biotokens/used_code/aeropyrum_test_output.csv
