### 1. Download data
#### 1.1 CC WET files
- Since I do not have access, I will just download 5k `.warc.wet.gz` files to local disk.

In [None]:
! wget https://data.commoncrawl.org/crawl-data/CC-MAIN-2025-18/wet.paths.gz

In [None]:
import os
import requests
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from xopen import xopen

# Prefix joined with each relative path from wet.paths.gz to build a download URL.
base_url = "https://data.commoncrawl.org/"
# Local root directory; downloads are stored in its "CC" sub-directory.
MOUNT_DIR = Path("/home/azureuser/mount/")
# Number of CPUs this process may run on (os.sched_getaffinity is not available
# on every platform); used as the download thread-pool size.
N_CPU = len(os.sched_getaffinity(0))
# Number of new (non-skipped) successful downloads after which the loop stops.
N_WET = 100

def download_file(url, output_dir, timeout=60):
    """Download ``url`` into ``output_dir`` unless the file already exists.

    The payload is streamed into a temporary ``<name>.part`` file that is
    renamed to its final name only after the transfer completed, so an
    interrupted or failed download never leaves a truncated file behind that
    a later run would report as "Skipped".

    Args:
        url: Full URL of the file; its last path component is the file name.
        output_dir: ``pathlib.Path`` of an existing directory.
        timeout: Seconds passed to ``requests.get`` as connect/read timeout.

    Returns:
        ``(ok, message)``; ``ok`` is False only when the download failed
        (network error, HTTP error status, or I/O error).
    """
    filename = Path(url).name
    output_path = output_dir / filename

    if output_path.exists():
        return True, f"Skipped: {filename}"

    partial_path = output_path.with_name(filename + ".part")
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Without this a 4xx/5xx error page would be saved as if it were data.
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        partial_path.replace(output_path)
        return True, f"Downloaded: {filename}"
    except Exception as e:
        # Best-effort boundary: report the failure to the caller instead of
        # killing the worker thread, and drop whatever was written so far.
        partial_path.unlink(missing_ok=True)
        return False, f"Error {filename}: {e}"

# Read all paths (one relative path per line). Blank lines are dropped so we
# never build a request for the bare base URL.
with xopen('wet.paths.gz', 'rt') as f:
    all_paths = [line.strip() for line in f if line.strip()]

output_dir = Path(MOUNT_DIR/"CC")
# parents=True: do not fail when the mount root itself does not exist yet.
output_dir.mkdir(parents=True, exist_ok=True)

# Download until we have N_WET successful downloads. Files that already exist
# ("Skipped") do not count towards the target.
successful_downloads = 0
path_idx = 0
futures = {}  # in-flight future -> index into all_paths

with ThreadPoolExecutor(max_workers=N_CPU) as executor:
    # Submit initial batch: at most one job per worker thread.
    while path_idx < len(all_paths) and len(futures) < N_CPU:
        url = base_url + all_paths[path_idx]
        future = executor.submit(download_file, url, output_dir)
        futures[future] = path_idx
        path_idx += 1

    # Process results and submit new jobs as needed
    while successful_downloads < N_WET and futures:
        # Block until any in-flight download finishes. Polling future.done()
        # in a loop would spin a CPU core while the threads wait on the network.
        future = next(as_completed(futures))
        success, message = future.result()
        print(message)

        if success and not message.startswith("Skipped"):
            successful_downloads += 1

        del futures[future]

        # Submit new job if we need more downloads
        if successful_downloads < N_WET and path_idx < len(all_paths):
            url = base_url + all_paths[path_idx]
            new_future = executor.submit(download_file, url, output_dir)
            futures[new_future] = path_idx
            path_idx += 1
    # Leaving the `with` block waits for downloads that are still running;
    # they finish on disk but are not included in the count printed below.

print(f"\nCompleted: {successful_downloads} successful downloads")

#### 1.2 validation data - paloma c4_100_domains - val

In [None]:
# from huggingface_hub import login
# login(token="")

from datasets import load_dataset
# Load the "val" split of the "c4_100_domains" configuration of allenai/paloma
# and print how many examples it contains.
paloma_c4_100_domains_val = load_dataset("allenai/paloma", "c4_100_domains", split="val")
print(len(paloma_c4_100_domains_val))

In [None]:
## tokenize validation dataset
# import multiprocessing
# import numpy as np
# from tqdm import tqdm
# from transformers import AutoTokenizer

# tokenizer = AutoTokenizer.from_pretrained("gpt2")

# def tokenize_line_and_add_eos(line):
#     return tokenizer.encode(line) + [tokenizer.eos_token_id]


# lines = paloma_c4_100_domains_val["text"]

# pool = multiprocessing.Pool(multiprocessing.cpu_count())
# chunksize = 100
# results = []

# for result in tqdm(
#     pool.imap(tokenize_line_and_add_eos, lines, chunksize=chunksize),
#     total=len(lines),
#     desc="Tokenizing lines"
# ):
#     results.append(result)
#     pool.close()
#     pool.join()

# # Flatten the list of ids and convert to numpy array
# all_ids = [token_id for sublist in results for token_id in sublist]
# # print(f"Tokenized and encoded {input_path} into {len(all_ids)} tokens")
# ids_array = np.array(all_ids, dtype=np.uint16)
# ids_array.tofile(output_dir/"paloma_tokens.bin")                    

### 2. Processing
- TLD (top-level domain) filtering
    - checked TLD from `paloma` ds and they are quite normal
- Quality rules (number of words, lengths of words, etc.)
    - range of number of words also from `paloma`.
- The validation dataset looks all English, so I will keep only English data
    - To determine the threshold, I ran the model on `paloma` and got an average score of `0.95`. I will therefore use `0.9` for filtering to be on the safe side, and remove more later if needed.
- Harmful removal. 
    - To determine the threshold, I ran the model on `paloma` and got an average score of `0.99`. I will therefore use `0.9` for filtering to be on the safe side, and remove more later if needed.
- Deduplication
    - The paper uses `r=20, b=450` (`9000` hashes in total), which is too much for my budget. I chose `r=16, b=150`, for a total of `2400` hashes.

#### Performance Optimizations Applied

To speed up WET file processing, the following improvements were implemented in `leaderboard_process_wet.py`:

**1. Batch Processing for ML Models** (Batch size: 64)
- Language identification, NSFW, and toxicity models now process texts in batches via `filter_batch()`
- Reduces model invocation overhead by ~64x compared to individual calls
- Uses `model.predict(batch)` to leverage vectorized operations

**2. Incremental File Writing**
- Write filtered content to JSONL as each batch completes
- Reduces memory footprint (no buffering all content in memory)
- Starts I/O earlier for better throughput
- Opens file once and writes progressively

**3. Optimized Filter Ordering**
- **Cheap filters first:** record type → URL (fast, rule-based)
- **Expensive ML models last:** language → quality → NSFW → toxic (batched)
    - The quality filter has to run on English text, so it goes after language identification
- Early filtering reduces number of texts sent to expensive ML models
- Cascading batch filters: lang batch → nsfw batch → toxic batch → kept

**4. Parallel Processing with ProcessPoolExecutor**
- 16 workers (`concurrent.futures`) process WET files independently
- Each worker runs complete pipeline on one file
- Single progress bar tracks overall completion
- Automatic work distribution as workers finish tasks

In [None]:
from cs336_data.leaderboard_process_wet import process_single_wet_file
# Run the pipeline on one WET file from the working directory.
# NOTE(review): the second argument looks like the output JSONL path — confirm
# against process_single_wet_file's signature.
wet_file = "CC-MAIN-20250417135010-20250417165010-00065.warc.wet.gz"
process_single_wet_file(wet_file, "CC-MAIN-20250417135010-20250417165010-00065.jsonl")

In [None]:
from cs336_data.minhash_dedpulication import normalize_text, minhashing, get_signatures
from pathlib import Path

# Collect the filtered JSONL files in a stable (sorted) order so that file
# indices are reproducible between runs; the last line displays the count.
input_dir = Path("/home/azureuser/mount/CC-filtered-50")
input_files = sorted(input_dir.glob("*.jsonl"))
len(input_files)

In [None]:
from cs336_data.minhash_dedpulication import normalize_text
from concurrent.futures import ProcessPoolExecutor, as_completed
import os
from os import PathLike
import random
import json
import mmh3
from tqdm import tqdm

def get_signatures_single_file(file_path, file_idx, seeds, ngrams):
    """Compute one MinHash signature per line of a JSONL file.

    Each line must be a JSON object with a ``text`` field. The text is passed
    through ``normalize_text`` (assumed to return a list of word tokens —
    verify against its definition), split into word n-grams, and for every
    seed the minimum ``mmh3.hash`` over all n-grams is kept.

    Args:
        file_path: Path of the JSONL file to read.
        file_idx: Caller-side index of the file; returned unchanged so results
            from a process pool can be put back in order.
        seeds: One hash seed per signature component.
        ngrams: Number of words per n-gram (shingle).

    Returns:
        ``(file_idx, signatures)`` where ``signatures[k]`` is the signature
        (a list of ``len(seeds)`` ints) of line ``k``.
    """
    signatures = []
    with open(file_path) as f:
        for line in f:  # stream instead of materialising every line first
            doc = json.loads(line)['text']
            doc_words = normalize_text(doc)

            # A document of k words has k - ngrams + 1 n-grams (the previous
            # `k - ngrams` dropped the last one). Documents with fewer than
            # `ngrams` words are hashed as one single shingle so that every
            # signature consists of real hash values rather than `inf`.
            n_shingles = max(len(doc_words) - ngrams + 1, 1)
            # A set skips re-hashing repeated n-grams; the minimum is unaffected.
            shingles = {" ".join(doc_words[i:i+ngrams]) for i in range(n_shingles)}

            signature = [min(mmh3.hash(shingle, seed) for shingle in shingles)
                         for seed in seeds]
            signatures.append(signature)
    return file_idx, signatures

def get_signatures_parallel(input_files: list[str | PathLike], num_hashes: int, ngrams: int) -> list[list[int]]:
    """Compute MinHash signatures for every document in ``input_files``.

    One task per file is submitted to a process pool sized to the CPUs this
    process may run on. Seeds are drawn afresh on every call, so signatures
    from two separate calls are not comparable with each other.

    Args:
        input_files: JSONL files to process.
        num_hashes: Number of hash seeds, i.e. the length of each signature.
        ngrams: Number of words per n-gram.

    Returns:
        A flat list of signatures: all documents of the first file in line
        order, then those of the second file, and so on.
    """
    seeds = [random.randint(0, 2**32-1) for _ in range(num_hashes)]
    n_workers = len(os.sched_getaffinity(0))

    signatures = []
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        # The worker takes (file_path, file_idx, seeds, ngrams); the index
        # argument was previously missing, which made every task fail.
        futures = [executor.submit(get_signatures_single_file, fp, file_idx, seeds, ngrams)
                   for file_idx, fp in enumerate(input_files)]

        # Iterating in submission order keeps the output aligned with input_files.
        for future in tqdm(futures, total=len(futures)):
            # The worker returns (file_idx, signatures); only the signatures
            # belong in the flat result list.
            _, file_signatures = future.result()
            signatures.extend(file_signatures)

    return signatures

import pickle
from pathlib import Path

def get_signatures_parallel_incremental(
    input_files: list[str | PathLike], 
    num_hashes: int, 
    ngrams: int,
    output_file: str,
    checkpoint_interval: int = 100
) -> None:
    """Parallel processing with incremental saving"""
    seeds = [random.randint(0, 2**32-1) for _ in range(num_hashes)]
    n_workers = len(os.sched_getaffinity(0))
    
    output_path = Path(output_file)
    checkpoint_file = output_path.with_suffix('.checkpoint')
    
    # Resume from checkpoint if exists
    start_idx = 0
    if checkpoint_file.exists():
        with open(checkpoint_file, 'r') as f:
            start_idx = int(f.read().strip())
        print(f"Resuming from file {start_idx}")
    
    # Open output file in append mode
    mode = 'ab' if start_idx > 0 else 'wb'
    
    with open(output_path, mode) as out_f:
        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            # Process in batches
            for batch_start in range(start_idx, len(input_files), checkpoint_interval):
                batch_end = min(batch_start + checkpoint_interval, len(input_files))
                batch = input_files[batch_start:batch_end]
                
                futures = {executor.submit(get_signatures_single_file, fp, batch_start + i, seeds, ngrams): batch_start + i
                           for i, fp in enumerate(batch)}
                
                # Collect results in order
                results = {}
                for future in tqdm(as_completed(futures), desc=f"Batch {batch_start}-{batch_end}", total=len(futures)):
                    file_idx, signatures = future.result()
                    results[file_idx] = signatures

                # Save in order
                for idx in sorted(results.keys()):
                    pickle.dump((idx, results[idx]), out_f)
                    out_f.flush()
                
                # Update checkpoint
                with open(checkpoint_file, 'w') as cf:
                    cf.write(str(batch_end))
    
    # Clean up checkpoint on completion
    checkpoint_file.unlink()
    print(f"Saved signatures to {output_path}")


In [None]:
# Usage: small trial run on the first two input files only.
# 240 hash seeds per signature, 5-word n-grams, one checkpoint per 2 files;
# the pickle stream is written to signatures.pkl in the working directory.
get_signatures_parallel_incremental(
    input_files[:2], 
    num_hashes=240, 
    ngrams=5,
    output_file="signatures.pkl",
    checkpoint_interval=2
)

### Memory-efficient loading: Process files one at a time
**Problem**: Reading entire 7GB files into memory creates 3x overhead (raw data + unpickled objects + final array)

**Solution**: Process incrementally with streaming approach

Memory monitoring: `watch -n 2 'free -h && echo "---" && ps aux --sort=-%mem | head -n 10'`

In [None]:
# Memory-efficient: Load files incrementally without massive temp buffers
from pathlib import Path
import pickle
from tqdm import tqdm
import numpy as np
import gc

sig_dir = "/home/azureuser/mount/"
batch_files = sorted(Path(sig_dir).glob('signatures_batch_*.pkl'))[:1]

print(f"Found {len(batch_files)} files to load")


def _iter_pickled_records(path):
    """Yield each object pickled back-to-back in ``path`` until end of file."""
    with open(path, 'rb') as handle:
        while True:
            try:
                record = pickle.load(handle)
            except EOFError:
                return
            yield record


# Pass 1: count the records so the signature matrix can be allocated once.
print("Counting documents...")
total_docs = 0
for batch_file in tqdm(batch_files, desc="Counting"):
    total_docs += sum(1 for _ in _iter_pickled_records(batch_file))

print(f"Total documents: {total_docs}")

# One row per document; each record's 'signatures' entry must fit a row of 2400.
sigs = np.empty((total_docs, 2400), dtype=np.int32)
all_metadata = []

# Pass 2: copy every record into its row and remember where it came from.
print("Loading signatures...")
doc_idx = 0
for batch_file in tqdm(batch_files, desc="Loading files"):
    for doc_data in _iter_pickled_records(batch_file):
        sigs[doc_idx] = doc_data['signatures']
        all_metadata.append({
            'jsonl_file': doc_data.get('jsonl_file'),
            'line_id': doc_data.get('line_id')
        })
        doc_idx += 1

    # Release the unpickled temporaries of this file before reading the next
    gc.collect()
    print(f"  Loaded {batch_file.name}, progress: {doc_idx}/{total_docs}")

print(f"\nFinal shape: {sigs.shape}")
print(f"Memory usage: {sigs.nbytes / 1024**3:.2f} GB")

In [None]:
import sys
# `sigs` is the NumPy matrix built by the loading cell (one row per document).
print(f"List size: {sys.getsizeof(sigs) / 1024**3:.2f} GB")
print(f"Number of items: {len(sigs)}")
if len(sigs) > 0:
    # sigs[0] is a view into the matrix: sys.getsizeof would only report the
    # small array header, so use nbytes to get the size of the row's data.
    print(f"Size of first item: {sigs[0].nbytes} bytes")
    print(f"First signature length: {len(sigs[0])}")

# # Force garbage collection to reclaim memory
# import gc
# gc.collect()
# print("Garbage collection complete")

In [None]:
from collections import defaultdict
def get_candidates_single_band(sigs, band_idx, band_size=16):
    """Group documents whose signatures are identical on one LSH band.

    Args:
        sigs: 2-D NumPy array, one signature per row.
        band_idx: Zero-based index of the band to process.
        band_size: Number of signature columns per band.

    Returns:
        A list of sets of row indices, one set per distinct band value, in
        order of first occurrence. Singleton buckets are included; callers
        that only want candidate duplicates must drop them.
    """
    start = band_idx * band_size
    sig_band = sigs[:, start:start+band_size]

    buckets = defaultdict(set)
    for idx, row in enumerate(sig_band):
        # The raw bytes of the row are a cheap hashable key: for a fixed
        # integer dtype two rows have equal bytes exactly when they have
        # equal values, and no per-element Python objects are created.
        buckets[row.tobytes()].add(idx)

    return list(buckets.values())

# Smoke test on band 0 with the default band size; displays the bucket count.
res = get_candidates_single_band(sigs, 0)
len(res)

In [None]:
# Option 1: Use as_completed (best for progress tracking)
from concurrent.futures import ThreadPoolExecutor, as_completed
import os

band_size = 16
num_bands = sigs.shape[1] // band_size

# Parallel processing across bands
# NOTE(review): the per-band bucketing is a Python-level loop, so these threads
# probably gain little under the GIL — measure before relying on the speed-up.
n_workers = len(os.sched_getaffinity(0))
candidates = []

with ThreadPoolExecutor(max_workers=n_workers) as executor:
    # Pass band_size explicitly: otherwise the worker falls back to its own
    # default and silently disagrees with num_bands once band_size is changed.
    futures = [executor.submit(get_candidates_single_band, sigs, i, band_size) for i in range(num_bands)]
    
    for future in tqdm(as_completed(futures), desc="Processing bands", total=num_bands):
        band_candidates = future.result()
        candidates.extend(band_candidates)

print(f"Total candidate sets: {len(candidates)}")

In [None]:
# Display the first candidate bucket (a set of row indices into `sigs`).
candidates[0]

In [None]:
class UnionFind:
    """Disjoint-set forest over arbitrary hashable items, with path compression."""

    def __init__(self):
        # item -> parent item; a root is its own parent
        self.parent = {}

    def find(self, x):
        """Return the root of ``x``'s set, registering ``x`` if it is new.

        Implemented with loops rather than recursion so that long parent
        chains cannot hit the recursion limit.
        """
        links = self.parent
        if x not in links:
            links[x] = x
            return x

        # Walk up to the root, remembering every node passed on the way
        trail = []
        node = x
        while links[node] != node:
            trail.append(node)
            node = links[node]

        # Path compression: re-attach the visited nodes directly to the root
        for visited in trail:
            links[visited] = node

        return node

    def union(self, x, y):
        """Merge the sets of ``x`` and ``y``; ``x``'s root is put under ``y``'s."""
        root_x, root_y = self.find(x), self.find(y)
        if root_x == root_y:
            return
        self.parent[root_x] = root_y

    def get_clusters(self):
        """Return all current sets as a list of sets of members."""
        by_root = defaultdict(set)
        for member in self.parent:
            by_root[self.find(member)].add(member)
        return list(by_root.values())


def merge_overlapping_sets(sets: list[set[int]], keep_singletons: bool = False) -> list[set[int]]:
    """Merge every group of sets that share at least one element.

    All elements are registered in a Union-Find structure and each set's
    elements are linked to that set's first element; the resulting connected
    components are returned.

    Args:
        sets: Sets to merge.
        keep_singletons: When True, one-element components are returned too;
            when False (the default) they are dropped.

    Returns:
        The merged components as a list of sets.

    Time: O(n × α(n)) where n = total elements across all sets
    Space: O(n)
    """
    uf = UnionFind()

    for group in sets:
        members = list(group)
        # Register every member first, so singletons are known as well
        for member in members:
            uf.find(member)
        if not members:
            continue
        # Linking everything to the first member connects the whole group
        anchor = members[0]
        for other in members[1:]:
            uf.union(anchor, other)

    merged = uf.get_clusters()
    if keep_singletons:
        return merged
    return [component for component in merged if len(component) > 1]


In [None]:
# Test with example: {1,2,3}/{3,4} overlap on 3, {5,6}/{6,9} overlap on 6,
# {7,8} stands alone and {10} is a singleton.
test_sets = [
    {1, 2, 3},
    {3, 4},
    {5, 6},
    {7, 8},
    {6, 9},
    {10}
]

clusters_no_singletons = merge_overlapping_sets(test_sets, keep_singletons=False)
clusters_with_singletons = merge_overlapping_sets(test_sets, keep_singletons=True)

# Clusters are sorted before printing so the output is stable.
print(f"Input: {test_sets}")
print(f"Without singletons: {sorted([sorted(c) for c in clusters_no_singletons])}")
print(f"With singletons: {sorted([sorted(c) for c in clusters_with_singletons])}")
# Expected without: {1,2,3,4}, {5,6,9}, {7,8}
# Expected with: {1,2,3,4}, {5,6,9}, {7,8}, {10}

In [None]:
# Merge the per-band candidate buckets into final clusters. Singletons are
# kept, so every document seen in `candidates` ends up in exactly one cluster.
final_clusters = merge_overlapping_sets(candidates, keep_singletons=True)
print(f"Number of duplicate clusters: {len(final_clusters)}")
print(f"Total documents in clusters: {sum(map(len, final_clusters))}")

# Largest clusters first; show at most ten members of each of the top three
sorted_clusters = sorted(final_clusters, key=len, reverse=True)
print("\nTop 3 largest clusters:")
for rank, members in enumerate(sorted_clusters[:3], start=1):
    preview = sorted(members)[:10]
    ellipsis = '...' if len(members) > 10 else ''
    print(f"  {rank}. Size {len(members)}: {preview}{ellipsis}")

In [None]:
# Display the metadata record (source JSONL file and line id) of document 0.
all_metadata[0]

## Tokenization

In [None]:
import pickle
import random

cluster_file = "/home/azureuser/mount/duplicate_clusters.pkl"

# The pickle holds a dict with the keys "clusters" and "metadata".
with open(cluster_file, "rb") as f:
    all_clusters = pickle.load(f)

metadata = all_clusters["metadata"]
# Deduplicate: keep one randomly picked member of every cluster
files_2keep = [random.choice(tuple(members)) for members in all_clusters["clusters"]]
print(f"Total docs: {len(metadata)/1e6}M")
print(f"Kept docs: {len(files_2keep)/1e6}M")

In [None]:
import json
from pathlib import Path
import pandas as pd

# Metadata of the kept documents, in ascending document-id order
metadata_2keep = [metadata[i] for i in sorted(files_2keep)]
# jsonl_file -> list of line ids to keep from that file
lines_by_file = pd.DataFrame(metadata_2keep).groupby("jsonl_file")["line_id"].agg(list)
input_file_dict = lines_by_file.to_dict()
# Same mapping restricted to the first four files, for quick trial runs
test_dict = lines_by_file.iloc[:4].to_dict()

In [None]:
import multiprocessing
import numpy as np
from tqdm import tqdm
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("gpt2")


def tokenize_line_and_add_eos(line):
    """Encode ``line`` with the module-level tokenizer and append its EOS id."""
    token_ids = tokenizer.encode(line)
    return token_ids + [tokenizer.eos_token_id]


output_dir = Path("/home/azureuser/mount")
input_dir = Path("/home/azureuser/mount/CC-filtered")

# Collect the text of every kept line of the trial files
lines = []
# for file, line_ids in input_file_dict.items():
for file, line_ids in test_dict.items():
    with open(input_dir/file) as f:
        all_lines = f.readlines()
    lines.extend(json.loads(all_lines[i])["text"] for i in line_ids)

chunksize = 100
results = []

# The pool is shut down only after ALL results were consumed. Closing and
# joining it inside the result loop (as before) blocked on the first result
# until every task had finished, which also froze the progress bar.
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
    for result in tqdm(
        pool.imap(tokenize_line_and_add_eos, lines, chunksize=chunksize),
        total=len(lines),
        desc="Tokenizing lines"
    ):
        results.append(result)

# Flatten the list of ids and convert to numpy array
all_ids = [token_id for sublist in results for token_id in sublist]
# print(f"Tokenized and encoded {input_path} into {len(all_ids)} tokens")
# uint16 requires every token id to be below 65536
ids_array = np.array(all_ids, dtype=np.uint16)
ids_array.tofile(output_dir/"CC_token_1.bin")

In [None]:
def tokenize_incremental(input_file_dict, output_file, batch_size=500):
    """Tokenize the selected lines of many JSONL files, ``batch_size`` files at a time.

    For every batch the selected lines are read from ``input_dir/<file>``
    (``input_dir`` is a module-level variable), tokenized in a process pool
    with ``tokenize_line_and_add_eos``, flattened to a ``uint16`` array and
    appended to ``output_file``.

    Args:
        input_file_dict: Mapping ``file name -> list of line indices to keep``.
        output_file: Binary file to append the token ids to. It is opened in
            append mode, so an existing file is extended, not replaced.
        batch_size: Number of input files handled per batch.
    """
    files_list = list(input_file_dict.items())
    total_tokens = 0

    # The context manager shuts the pool down even if a batch raises.
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        for batch_start in range(0, len(files_list), batch_size):
            batch_end = min(batch_start + batch_size, len(files_list))
            batch_files = files_list[batch_start:batch_end]
            batch_no = batch_start//batch_size + 1

            print(f"Processing files {batch_start}-{batch_end}")

            # Load lines from all files in batch
            all_lines = []
            for file, line_ids in batch_files:
                with open(input_dir/file) as f:
                    file_lines = f.readlines()
                all_lines.extend(json.loads(file_lines[i])["text"] for i in line_ids)

            # Tokenize batch
            results = []
            for result in tqdm(
                pool.imap(tokenize_line_and_add_eos, all_lines, chunksize=100),
                total=len(all_lines),
                desc=f"Tokenizing batch {batch_no}"
            ):
                results.append(result)

            # Flatten and save incrementally
            all_ids = [token_id for sublist in results for token_id in sublist]
            # uint16 requires every token id to be below 65536
            ids_array = np.array(all_ids, dtype=np.uint16)

            # Append to file
            with open(output_file, 'ab') as f:
                ids_array.tofile(f)

            # Report the real running total, not an extrapolation from the
            # size of the current batch.
            total_tokens += len(all_ids)
            print(f"Saved batch {batch_no}, total tokens so far: {total_tokens}")

### Find best combination of `b` and `r` given pre-defined `num_hashes`.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from IPython.display import HTML

# Configuration: signature length and the similarity axis of the plot
total_hashes = 2400
s_values = np.linspace(0, 1, 1000)

# All (r, b) pairs with r * b == total_hashes among the candidate band counts
candidate_bands = [10, 12, 15, 16, 20, 24, 25, 30, 40, 48, 50, 60, 75, 80, 100,
                   120, 150, 200, 240, 300, 400, 600, 1200]
combinations = [
    (total_hashes // bands, bands)
    for bands in candidate_bands
    if total_hashes % bands == 0
]

# Static parts of the figure; the curve and the title are filled per frame
fig, ax = plt.subplots(figsize=(10, 6))
(line,) = ax.plot([], [], 'b-', linewidth=2)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_xticks(np.arange(0, 1.05, 0.05))
ax.set_xlabel('Similarity (s)', fontsize=12)
ax.set_ylabel('Probability of Collision', fontsize=12)
ax.grid(True, alpha=0.3)
title = ax.set_title('', fontsize=14)


def init():
    """Start every animation cycle from an empty curve."""
    line.set_data([], [])
    return line, title


def animate(frame):
    """Draw the collision-probability curve of the ``frame``-th (r, b) pair."""
    r, b = combinations[frame]
    # Probability that two documents of similarity s share at least one band:
    # 1 - (1 - s^r)^b
    prob = 1 - (1 - s_values**r)**b
    line.set_data(s_values, prob)
    title.set_text(f'MinHash LSH: r={r}, b={b} (total hashes = {r*b})')
    return line, title


anim = FuncAnimation(
    fig,
    animate,
    init_func=init,
    frames=len(combinations),
    interval=500,
    blit=True,
    repeat=True,
)

# Close the static figure so only the interactive HTML player is displayed
plt.close()
HTML(anim.to_jshtml())

### load and check duplicate clusters
- `metadata` has one entry for every line of every `.jsonl` file; the values in `clusters` are indices into `metadata`

In [None]:
import pickle
cluster_file = "/home/azureuser/mount/duplicate_clusters.pkl"

# The pickle holds a dict with the keys "clusters" and "metadata".
with open(cluster_file, "rb") as f:
    all_clusters = pickle.load(f)

# Report the number of documents and show the first ten clusters
metadata = all_clusters["metadata"]
print(f"Total docs: {len(metadata)/1e6}M")
print(all_clusters["clusters"][:10])


In [None]:
import pandas as pd
from pathlib import Path

dir_cc_filt = Path("/home/azureuser/mount/CC-filtered")
# Print the first 400 characters of two documents, looked up by metadata index
for tid in [2, 115837]:
    entry = metadata[tid]
    df = pd.read_json(dir_cc_filt / entry['jsonl_file'], lines=True)
    print(df.iloc[entry['line_id']].text[:400])
    print("")