In [None]:
import tqdm
from datasets import concatenate_datasets, load_dataset
from nemo_curator.datasets.doc_dataset import DocumentDataset
from nemo_curator.utils.config_utils import build_filter_pipeline
from dask.distributed import Client, LocalCluster
import pandas as pd
from dask_cuda import LocalCUDACluster
import numpy as np
import os
import datetime
import time
from nemo_curator import Modify, ScoreFilter, Sequential
from nemo_curator.modifiers import UnicodeReformatter



In [None]:
# YAML configuration files for the three curation pipelines compared below.
basic_config, c4_config, full_config = (
    "../config/basic_curation.yaml",
    "../config/c4_curation.yaml",
    "../config/full_curation.yaml",
)

# Load the "CC-MAIN-2025-05-eng" configuration of the CommonCrawl-CreativeCommons dataset.
ds = load_dataset(
    path="BramVanroy/CommonCrawl-CreativeCommons",
    name="CC-MAIN-2025-05-eng",
)

In [None]:
# Convert the 'train' split to a pandas DataFrame. `Dataset.to_pandas()` is the
# dedicated conversion method of the `datasets` library; building the frame with
# `pd.DataFrame(dataset)` instead makes pandas consume the split as a generic
# iterable of rows.
df = ds['train'].to_pandas()
# Overwrite the 'language' column with the constant "en" for every row
# (a scalar assignment sets the whole column, creating it if it is missing).
df['language'] = "en"

In [None]:
# Sample 1,000,000 random rows from the dataset and split them into 10 subsets.
# The sample size is capped at the number of available rows, because
# `DataFrame.sample` raises when asked for more rows than exist (without replacement).
random_sample = df.sample(min(1_000_000, len(df)))

# Shuffle exactly once: re-shuffling and re-splitting inside a loop would append
# every row to `parts` several times and yield far more than 10 subsets.
df_shuffled = random_sample.sample(frac=1).reset_index(drop=True)

# Split row positions rather than the DataFrame itself. Passing a DataFrame
# straight to `np.array_split` goes through `DataFrame.swapaxes`, which recent
# pandas releases deprecate. The chunk boundaries are the same either way.
chunk_positions = np.array_split(np.arange(len(df_shuffled)), 10)
parts = [df_shuffled.iloc[positions] for positions in chunk_positions]

In [None]:
# Persist the sampled rows (the whole sample, not the individual parts) as one Parquet file.
random_sample.to_parquet(
    path="cc-main-2025-05-eng-part.parquet",
    index=False,
    engine="pyarrow",
)


In [None]:
# Reload the previously saved sample and split it into 10 subsets.
# NOTE: this rebinds `df`, `df_shuffled` and `parts`.
df = pd.read_parquet("cc-main-2025-05-eng-part.parquet", engine='pyarrow')
df_shuffled = df.sample(frac=1).reset_index(drop=True)

# Split row positions rather than the DataFrame itself. Passing a DataFrame
# straight to `np.array_split` goes through `DataFrame.swapaxes`, which recent
# pandas releases deprecate. The chunk boundaries are the same either way.
chunk_positions = np.array_split(np.arange(len(df_shuffled)), 10)
parts = [df_shuffled.iloc[positions] for positions in chunk_positions]

In [None]:
# Preprocessing stage: a Modify step wrapping UnicodeReformatter, applied to each part.
initial_curation = Modify(UnicodeReformatter())

# `ds_parts` collects one reformatted DocumentDataset per input part;
# `times` collects one human-readable timing message (a string) per part.
ds_parts = []
times = []

for i, part_df in enumerate(parts):
    print("Processing part ", i)
    raw_dataset = DocumentDataset.from_pandas(part_df)
    started = time.time()
    # Run the reformatter, compute the result to pandas, then wrap it back
    # into a DocumentDataset. The timer covers all three steps.
    reformatted_df = initial_curation(raw_dataset).df.compute()
    cleaned_part = DocumentDataset.from_pandas(reformatted_df)
    times.append(f"Unicode reformatting took {time.time() - started} s")
    ds_parts.append(cleaned_part)

In [None]:
def _run_filter_pipeline(config_path, dataset):
    """Build the filter pipeline described by ``config_path`` and apply it to ``dataset``.

    Args:
        config_path: Path of the YAML config handed to ``build_filter_pipeline``.
        dataset: The dataset object the built pipeline is called on.

    Returns:
        tuple: ``(filtered_df, elapsed_seconds)``. ``filtered_df`` is the computed
        ``.df`` of the pipeline output; ``elapsed_seconds`` covers building the
        pipeline and computing the result.
    """
    started = time.time()
    pipeline = build_filter_pipeline(config_path)
    filtered_df = pipeline(dataset).df.compute()
    return filtered_df, time.time() - started


# Number of rows kept by each pipeline, one entry per chunk.
basics = []
c4 = []
full = []

# Every pipeline is applied to the same input chunk (not chained), and the
# results are only held in memory. After the loop, `basic`, `c4_data` and
# `full_data` hold the outputs for the LAST chunk only.
for i, part in enumerate(ds_parts):
    print(f"Processing chunk {i+1} of {len(ds_parts)}")

    print("Running basic pipeline...")
    basic, elapsed = _run_filter_pipeline(basic_config, part)
    basics.append(len(basic))
    times.append(elapsed)

    print("Running C4 pipeline...")
    c4_data, elapsed = _run_filter_pipeline(c4_config, part)
    c4.append(len(c4_data))
    times.append(elapsed)

    print("Running full pipeline...")
    full_data, elapsed = _run_filter_pipeline(full_config, part)
    full.append(len(full_data))
    times.append(elapsed)



In [None]:
# Build a per-chunk summary of how many rows each pipeline kept.
pipeline_columns = ['Basic', 'C4', 'Full']
results_df = pd.DataFrame({
    'Basic': basics,
    'C4': c4,
    'Full': full
})

# Add percentages relative to the original chunk sizes (rounded to 2 decimals).
original_sizes = [len(part) for part in parts]
results_df['Original_Size'] = original_sizes
for name in pipeline_columns:
    results_df[f'{name}_Percentage'] = (
        results_df[name] / results_df['Original_Size'] * 100
    ).round(2)
results_df['Set'] = list(range(1, len(basics) + 1))

# Totals row for this experiment. Percentages are rounded like the per-set
# rows so the CSV is consistent; an empty experiment yields NaN instead of a
# ZeroDivisionError.
total_original = sum(original_sizes)
totals_row = {'Set': 'Total', 'Original_Size': total_original}
for name, counts in zip(pipeline_columns, (basics, c4, full)):
    kept = sum(counts)
    totals_row[name] = kept
    totals_row[f'{name}_Percentage'] = (
        round(kept / total_original * 100, 2) if total_original else float('nan')
    )
totals = pd.DataFrame([totals_row])

results_df = pd.concat([results_df, totals], ignore_index=True)

# Rearrange columns for better readability
results_df = results_df[['Set', 'Original_Size', 'Basic', 'Basic_Percentage',
                          'C4', 'C4_Percentage', 'Full', 'Full_Percentage']]

# Append to the results file if it already exists, otherwise create it.
results_path = './results.csv'
if os.path.exists(results_path):
    # Re-read and concatenate (rather than appending raw lines) so columns are
    # aligned by name with whatever the existing file contains.
    existing_results = pd.read_csv(results_path)
    combined_results = pd.concat([existing_results, results_df], ignore_index=True)
    combined_results.to_csv(results_path, index=False)
    print(f"Results appended to {os.path.abspath(results_path)}")
else:
    results_df.to_csv(results_path, index=False)
    print(f"New results file created at {os.path.abspath(results_path)}")

In [None]:
# Experimenting with the results
import random


def _sample_up_to(frame, n=10):
    """Return ``n`` random rows of ``frame``, or every row (shuffled) if it has fewer than ``n``."""
    return frame.sample(min(n, len(frame)))


# `basic`, `c4_data` and `full_data` hold the pipeline outputs for the LAST
# chunk only, so they are compared against that chunk's input rather than the
# whole sample. The input is taken from `ds_parts` (after Unicode reformatting)
# so that texts can be matched exactly.
# NOTE(review): assumes the filter pipelines leave the 'text' column unchanged.
last_input = ds_parts[-1].df.compute()

# Positional boolean masks over `last_input`: which rows each pipeline kept.
kept_basic = last_input['text'].isin(basic['text']).to_numpy()
kept_c4 = last_input['text'].isin(c4_data['text']).to_numpy()
kept_full = last_input['text'].isin(full_data['text']).to_numpy()

random_samples = _sample_up_to(basic)
print("Random samples from the basic dataset:")
print(random_samples['text'])

# Up to 10 rows removed by the basic pipeline
not_basic_set = _sample_up_to(last_input[~kept_basic])
print("Random samples from the original dataset that are not in the basic dataset:")
print(not_basic_set['text'])

# Up to 10 rows removed by the C4 pipeline that the basic pipeline kept
not_c4_samples = _sample_up_to(last_input[kept_basic & ~kept_c4])
print("Random samples from the original dataset that are not in the c4 dataset:")
print(not_c4_samples['text'])

# Up to 10 rows removed by the full pipeline that the C4 pipeline kept
not_full_samples = _sample_up_to(last_input[kept_c4 & ~kept_full])
print("Random samples from the original dataset that are not in the full dataset:")
print(not_full_samples['text'])

# Save each group of removed samples to its own CSV file
not_basic_set.to_csv("not_basic_set.csv", index=False)
not_c4_samples.to_csv("not_c4_samples.csv", index=False)
not_full_samples.to_csv("not_full_samples.csv", index=False)
# Save all samples (kept and removed) together in a single CSV file
all_samples = pd.concat([random_samples, not_basic_set, not_c4_samples, not_full_samples], ignore_index=True)
all_samples.to_csv("all_samples.csv", index=False)
