# Lecture 2: LLM Data Cleaning Tutorial: From Raw Data to Cleaned Corpus

This notebook demonstrates a complete pipeline for preparing LLM training data, covering both pretraining and fine-tuning datasets.

## Key Workflows:
### Environment Setup:
- Configures a local or remote (Google Colab) run, with or without a GPU

### Data Sourcing:
- Pretraining Data: Loads `upstage/Pretraining_Dataset` (RedPajama subset)
- Fine-tuning Data: Uses Alpaca-GPT4 instruction dataset
- Custom Data: Scrapes Python scripts from GitHub repositories

### Dataset Operations:
- Combines pretraining data with custom code examples
- Converts to Hugging Face `Dataset` format

### Data Cleaning Pipeline:
- Short text filtering (<3 lines)
- Intra-text repetition removal:
  - Paragraph-level duplicates (>30%)
  - Character-level duplicates (>20%)
- Exact document deduplication
- Language filtering (English-only using Lingua)

### Output:
- Saves cleaned dataset as JSON
- Preserves text structure while removing low-quality content


In [None]:
import os
import subprocess
from typing import Tuple

def setup_environment(curr_proj_folder: str = "pretraining-llms", google_drive_base_folder: str = "Colab Notebooks",
                      run_remote: bool = True, use_gpu: bool = True) -> Tuple[str, bool]:
    """
    Sets up the environment for running code, handling local and remote execution.

    Args:
        curr_proj_folder (str, optional): Folder name of the current project. Defaults to "pretraining-llms".
        google_drive_base_folder (str, optional): Folder name of the Google Drive base folder. Defaults to "Colab Notebooks".
        run_remote (bool, optional): Whether to run on Google Colab and mount Google Drive. Defaults to True.
        use_gpu (bool, optional): Whether to use GPU if available. Defaults to True.

    Returns:
        Tuple[str, bool]: (computed path_to_scripts, mount_success status)
    """
    # Initialize mount status for Colab
    mount_success = False
    # Remote run code
    if run_remote:
      from google.colab import drive
      # Mount Google Drive
      drive.mount('/content/drive')
      # Check if the mount was successful
      if os.path.ismount('/content/drive'):
        print("Google Drive mounted successfully!")
        mount_success = True
      else:
        print("Drive mount failed.")
      # By Default, this is complete mount path
      mount_path = '/content/drive/MyDrive'

      # complete path to current files
      path_to_scripts = os.path.join(mount_path, google_drive_base_folder,curr_proj_folder)
      # Create the directory if it doesn't exist
      os.makedirs(path_to_scripts, exist_ok=True)
      # Change the working directory to the project path
      os.chdir(path_to_scripts)
      print(f'Running code in path {os.getcwd()}')
    # Local Run
    else:
      path_to_scripts  = os.getcwd()
      # folder name provided as argument should match the one existing
      assert os.path.basename(path_to_scripts ) == curr_proj_folder, \
          f"Folder Name Mismatch: {os.path.basename(path_to_scripts )} != {curr_proj_folder}"
      print(f'Running code in path {path_to_scripts }')
    # check GPU usage
    if use_gpu:
      try:
        gpu_info = subprocess.check_output("nvidia-smi", shell=True).decode('utf-8')
        print("******GPU is available and will be used:**********")
        print(gpu_info)
      except subprocess.SubprocessError:
        print("GPU check failed (nvidia-smi not found or no GPU available). Falling back to CPU.")
        use_gpu = False  # Force CPU usage if GPU check fails
    else:
        print("******use_gpu is set to False. Using CPU******")
    return  path_to_scripts,mount_success


Always set the following parameters before each run.

In [None]:
# Project-specific configuration parameters
# Specifies the current project folder name
curr_proj_folder = "pretraining-llms"
# Base folder name in Google Drive where notebooks are stored
google_drive_base_folder = "Colab Notebooks"
# Flag to determine whether to use GPU for computations
use_gpu = True
# Flag to indicate remote execution environment
run_remote = False
# Flag to control model loading from a specific folder or through URL
load_model_from_folder = False


if run_remote:
  run_local = False
  run_local_usingColab = False
else:
  run_local = False
  run_local_usingColab = not run_local

# call method to setup environment
path_to_scripts, mount_success = setup_environment(curr_proj_folder=curr_proj_folder,
                                                   google_drive_base_folder=google_drive_base_folder,
                                                   run_remote=run_remote, use_gpu=use_gpu)

## 2.1. Overview
- **Pre-training vs Fine-tuning Data**:
  - 🧠 **Pre-training:** Massive unstructured text (books, web content, code) for general language understanding
  - 🎯 **Fine-tuning:** High-quality Q&A pairs for specific tasks (traditionally human-made, now LLM-generated)

- **Critical data quality improvement methods**:
  - *Remove duplicates* (**duplicates bias models and waste resources**)
  - *Language consistency* (**filter non-target languages**)
  - *Toxicity/PII removal* (**critical for safety**)
  - *Maintain structural quality* (**proper casing, punctuation**)
  - *Domain relevance* (high-quality domains such as research papers, news sites, and educational blogs are **prioritized**, while low-quality sources such as forums and SEO spam are **downweighted** or excluded)
  - *Heuristics & rule-based filtering* (**remove spam and irrelevant content**, such as autogenerated or gibberish text)
  - *Perplexity-based filtering* (use a small language model to remove incoherent or low-quality text)

An Upstage tool called **Dataverse** can be used for data cleaning; see [the Dataverse repository](https://github.com/UpstageAI/dataverse).
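
As a rough illustration of the heuristic, rule-based filtering listed above, the sketch below flags text that is mostly non-alphabetic or dominated by a single repeated word. The function name `looks_like_gibberish` and both thresholds are assumptions chosen for this example; they are not values from this lecture or from Dataverse.

```python
from collections import Counter


def looks_like_gibberish(text: str,
                         min_alpha_ratio: float = 0.6,
                         max_top_word_ratio: float = 0.5) -> bool:
    """Return True when the text fails simple quality heuristics (illustrative thresholds)."""
    stripped = text.strip()
    if not stripped:
        return True  # empty text carries no signal

    # Heuristic 1: share of alphabetic characters among non-whitespace characters
    chars = [c for c in stripped if not c.isspace()]
    alpha_ratio = sum(c.isalpha() for c in chars) / len(chars)
    if alpha_ratio < min_alpha_ratio:
        return True

    # Heuristic 2: share of the single most frequent word (only for texts of 4+ words)
    words = stripped.lower().split()
    top_count = Counter(words).most_common(1)[0][1]
    if len(words) >= 4 and top_count / len(words) > max_top_word_ratio:
        return True

    return False


samples = [
    "Common Crawl provides raw web text that needs cleaning.",
    "$$$ 1234 #### 5678 !!!! ____ ****",
    "buy buy buy buy buy now",
]
flags = [looks_like_gibberish(s) for s in samples]
print(flags)
```

Real pipelines usually combine many such rules and tune the thresholds on a labeled sample, so treat these two checks as a starting point only.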

## 2.2. Sourcing datasets for pretraining

In this section, we will see two ways to source data for training:
1. **Download an existing dataset from Hugging Face**
2. **Create a dataset of Python scripts sourced from GitHub**

* In both cases the result is a Hugging Face **`Dataset` object**, part of the **Datasets library** ([documentation](https://huggingface.co/docs/datasets/en/index)).

### 2.2.1 Download data from Hugging Face

The dataset downloaded here is ***upstage/Pretraining_Dataset***, a subset of a much larger dataset called **RedPajama** ([RedPajama-Data-1T](https://huggingface.co/datasets/togethercomputer/RedPajama-Data-1T)).

**Notes about dataset:**
* **RedPajama-Data-1T** is an open dataset **modeled after Meta's LLaMA pretraining dataset**, aiming to provide a diverse and **high-quality corpus for training foundation models**
* It consists of 1 trillion tokens collected from various sources, including books, Wikipedia, online discussions, and Common Crawl.
* **Common Crawl** is a **publicly available web archive that regularly crawls the internet**, capturing vast amounts of text from websites. It provides raw web text, which requires filtering and cleaning to remove low-quality, irrelevant, or duplicate content.
* The dataset aims to include diverse, high-quality sources for improved generalization in language models.



In [None]:
# Install this package if running locally
#!conda install -c huggingface -c conda-forge datasets
# to avoid conflicts: do this
# pip install datasets pyarrow==14.0.0
import warnings
warnings.filterwarnings("ignore")
import datasets

In [None]:

# load PretrainingDataset : use Training split portion of it
pretraining_dataset = datasets.load_dataset(
    "upstage/Pretraining_Dataset",
    split="train"
)
# print details about  dataset
features = pretraining_dataset.features.keys()
print(f"This is a Dataset with Number of Rows: {pretraining_dataset.num_rows} and Features: {list(features)}")

# Note, the dataset can be converted to Tabular format for
# pandas  style manipulations
#pretraining_dataset.set_format(type="pandas")

Keep only the `text` column of the dataset, ignoring the metadata.

In [None]:
pretraining_dataset = pretraining_dataset.select_columns(
    ["text"]
)

Print a sample:

In [None]:
print(pretraining_dataset[0]["text"][:500])

### 2.2.2 Compare with fine-tuning datasets

Summary of the Alpaca fine-tuned model:

1. **Overview and Purpose**:
   - Alpaca is a **fine-tuned, instruction-following language model based on Meta's LLaMA 7B model.**
   - It was trained on 52,000 instruction-following demonstrations generated with OpenAI's text-davinci-003 via the self-instruct method.
   - The model serves as an *academic alternative* to closed-source models like ChatGPT and text-davinci-003.
   - By being open and accessible, **it allows researchers to study how these models work, what they are good at, and where they fall short.**
   - In the **Alpaca dataset**, about 40% of examples include an input field, while the remaining 60% have only an instruction and an output.  
      These are **instruction-only tasks: tasks that are self-contained within the instruction.**

2. **Training Process**:
   - The dataset was generated by prompting text-davinci-003 to create instructions and outputs, starting from a small seed set of 175 human-written examples.
   - Fine-tuning was performed using Hugging Face’s training framework with techniques like Fully Sharded Data Parallel and mixed precision training.
   
3. **Performance and Limitations**:

   - Alpaca is small and cheap to train, but it still exhibits common language-model issues such as hallucination, toxicity, and stereotypes,  
     **making it unsuitable for general deployment without further safety measures.**

Source: [Stanford CRFM Alpaca announcement](https://crfm.stanford.edu/2023/03/13/alpaca.html).

In [None]:
# load the dataset
instruction_dataset = datasets.load_dataset(
    "c-s-ale/alpaca-gpt4-data",
    split='train'
)
print(instruction_dataset)
# print first pair
i=0
print("Instruction: " + instruction_dataset[i]["instruction"]
      + "\nInput: " + instruction_dataset[i]["input"]
      + "\nOutput: " + instruction_dataset[i]["output"])

Notice how, in contrast to the pretraining data, which is just raw text, **fine-tuning datasets are structured into question-answer pairs or instruction-response sets that can include additional input context if required**.
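
To make the structural difference concrete, the sketch below turns one instruction record into a single training string. The template wording is modeled on the prompt format commonly associated with Alpaca; treat the exact wording as an assumption. The helper `format_example` and both sample records are hypothetical and not taken from the dataset.

```python
PROMPT_WITH_INPUT = (
    "Below is an instruction that describes a task, paired with an input that "
    "provides further context. Write a response that appropriately completes the request.\n\n"
    "### Instruction:\n{instruction}\n\n### Input:\n{input}\n\n### Response:\n"
)
PROMPT_NO_INPUT = (
    "Below is an instruction that describes a task. "
    "Write a response that appropriately completes the request.\n\n"
    "### Instruction:\n{instruction}\n\n### Response:\n"
)


def format_example(record: dict) -> str:
    """Turn one instruction record into a single prompt-plus-answer string."""
    # Records without extra context carry an empty "input" field
    if record.get("input", "").strip():
        prompt = PROMPT_WITH_INPUT.format(
            instruction=record["instruction"], input=record["input"]
        )
    else:
        prompt = PROMPT_NO_INPUT.format(instruction=record["instruction"])
    return prompt + record["output"]


# Hypothetical records for illustration only
with_input = {"instruction": "Translate to French.", "input": "Good morning", "output": "Bonjour"}
no_input = {"instruction": "Name a primary color.", "input": "", "output": "Red"}

print(format_example(with_input))
print("-" * 40)
print(format_example(no_input))
```

Pretraining text needs no such template: each document is used as-is, which is why only the `text` column is kept in this notebook.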


### 2.2.3  Scraping Python Code and Preparing a Hugging Face Dataset

* Download Python scripts from GitHub.

* Process the scripts into a format suitable for training machine learning models.

* Convert the processed data into a Hugging Face Dataset object.

* Combine this Dataset with an existing pretraining dataset.


*  Step 1: Import Required Libraries

In [None]:
# Import some required packages
import os
import requests

*  Step 2: Define the Directory and GitHub URLs

In [None]:
# Path to directory to store python scripts
code_dir = "./codeSavedForCreatingdSet"
# List of GitHub raw URLs for Python scripts
urls = [
    "https://raw.githubusercontent.com/TheAlgorithms/Python/master/searches/double_linear_search_recursion.py",
    "https://raw.githubusercontent.com/KosingZhu/tensorflow/master/tensorflow/python/tools/module_util.py",
    "https://raw.githubusercontent.com/EricRemmerswaal/tensorflow/master/tensorflow/python/distribute/distribute_coordinator_context.py",
    "https://raw.githubusercontent.com/computationalartist/tensorflow/master/tensorflow/python/ops/numpy_ops/integration_test/benchmarks/numpy_mlp.py",
    "https://raw.githubusercontent.com/Van-an/tensorflow/master/tensorflow/python/distribute/coordinator/values.py",
    "https://raw.githubusercontent.com/nkgwer/tensorflow/master/tensorflow/lite/tools/visualize.py",
    "https://raw.githubusercontent.com/gitblazer/youtube-dl/master/youtube_dl/version.py",
    "https://raw.githubusercontent.com/Joshua-Barawa/My-Photos/master/venv/lib/python3.8/site-packages/django/contrib/messages/__init__.py",
    "https://raw.githubusercontent.com/PaliC/pytorch/master/test/fx/test_subgraph_rewriter.py"
]

*  Step 3: Download Python Scripts from GitHub

We loop through each URL, download its content, and save it in the specified directory.

In [None]:
# Ensure the code directory exists
os.makedirs(code_dir, exist_ok=True)

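for url in urls:
    print(f"Working on url: {url}")
    # Fetch content from URL (with a timeout so a stalled request cannot hang the cell)
    response = requests.get(url, timeout=30)
    # Skip URLs that no longer resolve instead of saving an error page as a script
    if response.status_code != 200:
        print(f"Skipping {url}: HTTP {response.status_code}")
        continue
    # Extract file name from URL
    file_name = os.path.basename(url)
    # Full path to save file
    file_path = os.path.join(code_dir, file_name)
    # Save the content to a local file
    with open(file_path, "wb") as file:
        file.write(response.content)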
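
One caveat of naming files with `os.path.basename(url)`: two URLs that end in the same file name would overwrite each other in `code_dir`. The sketch below shows one possible workaround, which builds the file name from the full URL path. The helper `url_to_filename`, the `__` separator, and the example URLs are all hypothetical.

```python
from urllib.parse import urlparse


def url_to_filename(url: str) -> str:
    """Build a flat file name from the URL path so equal base names do not collide."""
    # "/user/repo/master/pkg/utils.py" -> "user__repo__master__pkg__utils.py"
    path = urlparse(url).path.strip("/")
    return path.replace("/", "__")


# Placeholder URLs that share the same base name
example_urls = [
    "https://raw.githubusercontent.com/example-user/repo-a/master/pkg/utils.py",
    "https://raw.githubusercontent.com/example-user/repo-b/master/pkg/utils.py",
]
names = [url_to_filename(u) for u in example_urls]
print(names)
```

With the URL list used in this notebook every base name is already distinct, so this only matters once the list grows.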

*  Step 4: Read and Concatenate Scripts into a List. Once all scripts are downloaded, we read their contents and store them in a list.
*  **Each entry in the list represents one script. Storing each Python script as plain `text` ensures that it can be tokenized and fed into models for training.**



In [None]:
files = os.listdir(code_dir)
for file in files:
    print(file)

# Read all scripts into a list of dictionaries
code_dataset = []  # Initialize an empty list

# Iterate through all files in the directory
for file_name in files:
    # Full path of each file
    file_path = os.path.join(code_dir, file_name)

    # Read the content of each script and append it to the list as a dictionary
    # with a single "text" field
    with open(file_path, 'r', encoding='utf-8') as file:
        code_dataset.append({"text": file.read()})

In [None]:
# print content of one of the scripts
code_dataset[1]

### Step 5: Convert List to Hugging Face `Dataset` Object

In [None]:
# Convert list of dictionaries into a Hugging Face Dataset object
code_dataset = datasets.Dataset.from_list(code_dataset)

# Print dataset information for verification
print(code_dataset)

### Step 6: Combine with Pretraining Dataset
Finally, we combine our scraped code dataset with an existing pretraining dataset (from RedPajama-Data-1T that we loaded earlier).



In [None]:
# verify before combining that datasets have same feature names
print(pretraining_dataset.features)
print(code_dataset.features)
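# Check that both datasets expose the same feature names
# (no parentheses around the assert arguments: a tuple is always truthy)
assert pretraining_dataset.features.keys() == code_dataset.features.keys(), "Features do not match"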
# Combine code dataset with pretraining dataset
dataset_combined = datasets.concatenate_datasets([pretraining_dataset, code_dataset])
# Note: if this raises "concat_tables() got an unexpected keyword argument 'promote_options'",
# the installed pyarrow predates the promote_options argument
# (the author could not update pyarrow beyond 11.0.0).
# Solution: uninstall pyarrow and install version 14.0.0 with pip. This also removes datasets,
# so install datasets again afterwards.


print(pretraining_dataset)
# Print combined dataset information for verification
print(dataset_combined)
nRows = dataset_combined.num_rows
print(f"concatenated dataset has #rows: {nRows}")

In [None]:
print(dataset_combined[0]["text"][:3000])

## 2.4. Data cleaning

In the cells below, we'll carry out the following cleaning steps:
1. Filter out samples that are too short
2. Remove repetitions within a single text example
3. Remove duplicated documents
4. Quality filter to remove non-English texts

### Remove examples that are too short
Very short examples carry little context, so they are not useful for pre-training.

In [None]:
import heapq  # only needed if the optional line-length check below is enabled

def paragraph_length_filter(x):
    """
    Filters out pages that have too few lines.

    Args:
        x (dict): A dictionary containing a 'text' key, where the value is a string of text.

    Returns:
        bool: False if the page has fewer than 3 lines, otherwise True.
              An optional stricter check (commented out below) also rejects pages
              whose 3 longest lines include a line shorter than 3 characters.
    """
    # Split the text into lines
    lines = x['text'].split('\n')

    # Reject pages with fewer than 3 lines
    # (optional: also reject if the shortest of the 3 longest lines has fewer than 3 characters)
    if (
        len(lines) < 3
        #or min(heapq.nlargest(3, [len(line) for line in lines])) < 3
    ):
        # Filter out this page (does not meet criteria)
        return False
    return True  # Keep this page (meets criteria)
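
The sketch below tries the stricter variant of this filter, with the longest-line check enabled, on a few toy pages. The function name `paragraph_length_filter_strict`, its `min_lines` and `min_chars` parameters, and the toy pages are assumptions made for this example.

```python
import heapq


def paragraph_length_filter_strict(x: dict, min_lines: int = 3, min_chars: int = 3) -> bool:
    """Keep a page only if it has enough lines and its longest lines are not trivially short."""
    lines = x["text"].split("\n")
    if len(lines) < min_lines:
        return False
    # The shortest of the `min_lines` longest lines must reach `min_chars` characters
    longest = heapq.nlargest(min_lines, (len(line) for line in lines))
    return min(longest) >= min_chars


toy_pages = [
    {"text": "Only one line"},
    {"text": "a\nb\nc"},
    {"text": "First line of text\nSecond line of text\nThird line of text"},
]
kept = [paragraph_length_filter_strict(p) for p in toy_pages]
print(kept)
```

The second toy page has three lines, so a filter that only counts lines would keep it; the stricter check rejects it because every line is a single character.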


In [None]:
# Apply this filter now and check that some text has been removed.
# Setting load_from_cache_file=False forces the filter operation to
# recompute results instead of using potentially outdated cached data.
dataset_combined = dataset_combined.filter(paragraph_length_filter, load_from_cache_file=False)

print(f"concatenated dataset now has #rows: {dataset_combined.num_rows} with #short examples removed :{nRows - dataset_combined.num_rows}")
nRows  = dataset_combined.num_rows

### Remove repeated text within training examples
* `find_duplicates()`: **identifies repeated paragraphs** and **counts the duplicate paragraphs and the characters they contain.**

* `paragraph_repetition_filter()`: **determines whether a given text contains too many repeated paragraphs or characters. If the repetition exceeds certain thresholds, the text is excluded.**

* Applying the filter to the dataset: the dataset is filtered with `paragraph_repetition_filter`, so that only non-repetitive text remains.

In [None]:
# Function to find duplicate paragraphs and calculate repetition metrics
def find_duplicates(paragraphs):
    """
    Use this function to find the number of repetitions in the paragraphs.
    """
    # Set to store unique paragraphs
    unique_x = set()

    # Variables to count duplicate characters and elements
    duplicate_chars = 0
    duplicate_elements = 0

    # Iterate through each paragraph
    for element in paragraphs:
        # If paragraph is already in unique set, count it as duplicate
        if element in unique_x:
            duplicate_chars += len(element)  # Count characters in duplicate paragraph
            duplicate_elements += 1         # Increment duplicate paragraph count
        else:
            # Add new paragraph to the set
            unique_x.add(element)

    # Return counts of duplicate elements and characters
    return duplicate_elements, duplicate_chars


# Importing regular expressions library for text processing
import re

# Function to filter out pages with excessive repetitions
def paragraph_repetition_filter(x):
    """
    Returns False if a page has too many repetitions.
    """
    # Extract text from input dictionary
    text = x['text']

    # Split text into paragraphs using double newlines as delimiter
    paragraphs = re.compile(r"\n{2,}").split(text.strip())

    # Find duplicates in the paragraphs
    paragraphs_duplicates, char_duplicates = find_duplicates(paragraphs)

    # Filter out text if too many duplicate paragraphs (threshold: >30%)
    if paragraphs_duplicates / len(paragraphs) > 0.3:
        return False

    # Filter out text if too many duplicate characters (threshold: >20%)
    if char_duplicates / len(text) > 0.2:
        return False

    # Return True if text passes both filters
    return True
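
A small worked example may help show how the two ratios behave. The sketch below recomputes them for a toy text in a compact helper; `repetition_ratios` and the toy text are hypothetical, and the logic mirrors the paragraph split and duplicate counting shown above.

```python
import re


def repetition_ratios(text: str) -> tuple:
    """Return (duplicate paragraph ratio, duplicate character ratio) for one non-empty text."""
    paragraphs = re.split(r"\n{2,}", text.strip())
    seen = set()
    dup_paragraphs = 0
    dup_chars = 0
    for paragraph in paragraphs:
        if paragraph in seen:
            dup_paragraphs += 1
            dup_chars += len(paragraph)
        else:
            seen.add(paragraph)
    return dup_paragraphs / len(paragraphs), dup_chars / len(text)


# 4 paragraphs, 2 of which repeat an earlier one; 22 of 53 characters are duplicates
text = "Hello world\n\nHello world\n\nSomething else\n\nHello world"
para_ratio, char_ratio = repetition_ratios(text)
print(f"duplicate paragraphs: {para_ratio:.2f}, duplicate characters: {char_ratio:.2f}")
```

Both ratios exceed the thresholds used in this notebook (0.3 and 0.2), so a text like this would be filtered out.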


In [None]:
# Apply the filter function to a dataset
dataset_combined = dataset_combined.filter(
    paragraph_repetition_filter,  # Filtering function for dataset
    load_from_cache_file=False,  # Recompute instead of reusing cached results
)



In [None]:
print(f"concatenated dataset now has #rows: {dataset_combined.num_rows} with #repeated text rows removed: {nRows - dataset_combined.num_rows}")
# Update the running row count so the next step reports only its own removals
nRows = dataset_combined.num_rows

### Deduplication
**Remove duplicate examples from the entire dataset** (in contrast to the previous step, where we only looked for repeated text within each example).

In [None]:
# Function to remove exact duplicates from the dataset
def deduplication(dataset):
    """
    Removes exact duplicates from the dataset.
    """
    seen_texts = set()  # Tracks unique texts

    def is_unique(example):
        """
        Checks if the example's text is unique.
        """
        text = example['text']
        if text in seen_texts:  # If text is already seen, it's a duplicate
            return False
        seen_texts.add(text)  # Add new unique text to the set
        return True

    return dataset.filter(is_unique, load_from_cache_file=False)


# Apply deduplication to the dataset
dataset_combined = deduplication(dataset_combined)
print(f"concatenated dataset now has #rows: {dataset_combined.num_rows} with #duplicate documents removed: {nRows - dataset_combined.num_rows}")
nRows  = dataset_combined.num_rows
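
The `seen_texts` set above keeps every full document in memory. One possible variation, sketched below on a plain Python list, stores a fixed-size SHA-256 digest per document instead. It is still exact-match deduplication (ignoring the negligible chance of a hash collision), and the helper name `deduplicate_by_hash` and the sample documents are hypothetical.

```python
import hashlib


def deduplicate_by_hash(texts):
    """Keep the first occurrence of each text, comparing SHA-256 digests instead of full strings."""
    seen_digests = set()
    unique_texts = []
    for text in texts:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        if digest not in seen_digests:
            seen_digests.add(digest)
            unique_texts.append(text)
    return unique_texts


docs = ["alpha", "beta", "alpha", "gamma", "beta"]
unique_docs = deduplicate_by_hash(docs)
print(unique_docs)
```

Near-duplicates (for example, the same page with a different footer) are not caught by either version; that would require a fuzzy technique, which is outside the scope of this notebook.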

In [None]:
# Extract a random sample of the dataset (sample_size is the fraction to keep)
sample_size = 0.1
dataset_combined_toSave = dataset_combined.train_test_split(test_size=sample_size)["test"]
dataset_combined_toSave.num_rows

In [None]:
print(dataset_combined_toSave[0]["text"][:100])

In [None]:
from lingua import Language,LanguageDetectorBuilder

### Quality filter - Language

Remove any text examples that are in a language other than English. 

In [None]:
from tqdm.auto import tqdm
import time

def english_language_filter(ds):
    """
    Filters the dataset to keep only English text examples using Lingua.
    """
    print(f"Starting language filtering on dataset with {len(ds)} examples")
    start_time = time.time()

    # Create a detector that distinguishes English from French (add more languages here as needed)
    print("Building language detector...")
    detector = LanguageDetectorBuilder.from_languages(
        Language.ENGLISH,
        Language.FRENCH
    ).build()
    print("Language detector built successfully")

    # Track filtered examples
    total_processed = 0
    english_count = 0
    non_english_count = 0
    error_count = 0

    def is_english(x):
        """
        Checks if the text in the dataset entry is in English.
        """
        nonlocal total_processed, english_count, non_english_count, error_count

        # Print progress every 100 examples
        total_processed += 1
        if total_processed % 100 == 0:
            elapsed = time.time() - start_time
            print(f"Processed: {total_processed}, English: {english_count}, Non-English: {non_english_count}, Errors: {error_count}, Time elapsed: {elapsed:.2f}s")

        try:

            # Detect if the language is English
            detected_language = detector.detect_language_of(x['text'])
            is_eng = detected_language == Language.ENGLISH

            if is_eng:
                english_count += 1
            else:
                non_english_count += 1
            return is_eng
        except Exception as e:
            error_count += 1
            # Show first few errors
            if error_count <= 5:
                print(f"Error processing text: {str(e)}")
            return False

    print("Starting filtering process...")
    # Filter the dataset to retain only English examples
    ds_filtered = ds.filter(is_english, load_from_cache_file=False, num_proc=1)

    # Final report
    end_time = time.time()
    total_time = end_time - start_time
    print(f"\nFiltering complete:")
    print(f"Original dataset size: {len(ds)}")
    print(f"Filtered dataset size: {len(ds_filtered)}")
    print(f"English texts found: {english_count}")
    print(f"Non-English texts filtered out: {non_english_count}")
    print(f"Errors encountered: {error_count}")
    print(f"Total processing time: {total_time:.2f} seconds")

    return ds_filtered

# Apply the filter to your dataset
print("Starting English language filtering...")
dataset_combined_toSave = english_language_filter(dataset_combined_toSave)
print(f"Final dataset size: {dataset_combined_toSave.num_rows}")

## 3. Save the dataset to disk

Save the data in JSON format.

In [None]:
save_dir = "./saved_pretrain_cleaned_data"
# Ensure the output directory exists before writing
os.makedirs(save_dir, exist_ok=True)
file_path = os.path.join(save_dir,"preprocessed_dataset.json")
dataset_combined_toSave.to_json(file_path)
if os.path.exists(file_path):
    print(f"File '{file_path}' created successfully.")
else:
    print(f"File '{file_path}' creation failed.")
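
To inspect the saved file without the `datasets` library, it can be read back with the standard library. By default, `Dataset.to_json` is understood to write JSON Lines (one JSON object per line); treat that as an assumption and check it against your installed version. The sketch below writes and reads a small file in that layout, using hypothetical records and a temporary directory so that it runs on its own.

```python
import json
import os
import tempfile

# Hypothetical records standing in for the cleaned dataset
records = [{"text": "first document"}, {"text": "second document"}]

with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "preprocessed_dataset.json")

    # Write one JSON object per line (JSON Lines layout)
    with open(demo_path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")

    # Read the file back line by line, skipping blank lines
    with open(demo_path, "r", encoding="utf-8") as f:
        loaded = [json.loads(line) for line in f if line.strip()]

print(len(loaded), loaded[0]["text"])
```

Reading line by line keeps memory use low for large corpora, since only one record is parsed at a time.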

In [None]:
if run_remote:
  from google.colab import drive
  drive.flush_and_unmount()
  print('All changes made in this colab session should now be visible in Drive.')