In [None]:
!pip install transformers datasets



In [None]:
import re
import random
from datasets import load_dataset, Dataset, DatasetDict
from transformers import T5Tokenizer
import pandas as pd
from typing import List, Dict, Optional
import nltk
from nltk.tokenize import sent_tokenize
import requests
import json
import numpy as np
import torch
from datasets import load_from_disk
from transformers import (
    T5Tokenizer,
    T5Config,
    T5ForConditionalGeneration,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    DataCollatorForSeq2Seq
)
import torch.nn as nn
from transformers.modeling_outputs import BaseModelOutput

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
def create_unpunctuated_text(text: str) -> str:
    """Remove punctuation and capitalization from text"""
    # Remove punctuation
    text = re.sub(r'[^\w\s]', '', text)
    # Convert to lowercase
    text = text.lower()
    # Normalize whitespace
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def add_extra_spaces(text: str) -> str:
    """Add extra spaces randomly"""
    words = text.split()
    result = []
    for word in words:
        result.append(word)
        if random.random() < 0.3:  # 30% chance to add extra space
            result.append(' ')
    return ' '.join(result)

def remove_some_spaces( text: str) -> str:
    """Remove some spaces randomly"""
    words = text.split()
    result = []
    for i, word in enumerate(words):
        result.append(word)
        if i < len(words) - 1 and random.random() < 0.2:  # 20% chance to remove space
            continue
        else:
            result.append(' ')
    return ''.join(result).strip()

def mix_case_randomly(text: str) -> str:
    """Randomly mix uppercase and lowercase"""
    return ''.join(c.upper() if random.random() < 0.1 else c.lower() for c in text)


def load_wikipedia_dataset(language: str = "en", max_samples: Optional[int] = 10000):
    nltk.download('punkt')
    nltk.download('punkt_tab')
    try:
        wiki_dataset = load_dataset("wikimedia/wikipedia", f"20231101.{language}", split="train", streaming=True)

        samples = []
        count = 0

        for example in wiki_dataset:
            if max_samples and count >= max_samples:
                break

            text = example['text']
            sentences = sent_tokenize(text)

            for sentence in sentences:
                if len(sentence.split()) > 5:
                    # Apply transformations sequentially
                    input_text = create_unpunctuated_text(sentence)
                    input_text = add_extra_spaces(input_text)
                    input_text = remove_some_spaces(input_text)
                    input_text = mix_case_randomly(input_text)

                    if input_text.strip() and input_text != sentence:
                        samples.append({
                            "input_text": f"normalize: {input_text}",
                            "target_text": sentence
                        })
                        count += 1

            if count % 1000 == 0:
                print(f"Processed {count} samples...")

        dataset = Dataset.from_list(samples)
        print(f" Created Wikipedia dataset with {len(dataset)} samples")
        return dataset

    except Exception as e:
        print(f"Error loading Wikipedia dataset: {e}")
        return None


wiki_dataset = load_wikipedia_dataset(max_samples=100000)
train_test_split = wiki_dataset.train_test_split(test_size=0.2, seed=42)

print(f"\nFinal dataset statistics:")
print(f"Training samples: {len(train_test_split['train'])}")
print(f"Validation samples: {len(train_test_split['test'])}")

# Save datasets
train_test_split['train'].save_to_disk("./large_text_normalization_train")
train_test_split['test'].save_to_disk("./large_text_normalization_val")

# Show sample
print("\nSample from dataset:")
for i in range(3):
    sample = train_test_split['train'][i]
    print(f"Input:  {sample['input_text']}")
    print(f"Output: {sample['target_text']}")
    print()


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


README.md: 0.00B [00:00, ?B/s]

Resolving data files:   0%|          | 0/41 [00:00<?, ?it/s]

Processed 39000 samples...
 Created Wikipedia dataset with 100034 samples

Final dataset statistics:
Training samples: 80027
Validation samples: 20007


Saving the dataset (0/1 shards):   0%|          | 0/80027 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/20007 [00:00<?, ? examples/s]


Sample from dataset:
Input:  normalize: most Of the islands remainedLargely unconTRolled and uNdefended making them apoteNtiAl oppoRtuNiTy FoR northWestern euroPeancountRies THat WAntEd to Break spAins mOnopolY on colonizing the new World
Output: Most of the islands remained largely uncontrolled and undefended, making them a potential opportunity for northwestern European countries that wanted to break Spain's monopoly on colonizing the New World.

Input:  normalize: he worKs hardto make anantiplague Serumbutas the ePidemic contInues heshows increasIng signS of wear andtear
Output: He works hard to make an antiplague serum, but as the epidemic continues, he shows increasing signs of wear and tear.

Input:  normalize: ancestry and immigration at tHe 2021 censuS the most commonlynominatedancestrIes were overseasborn adelaideans composed313 of the total population at tHe 2021 census
Output: Ancestry and immigration 

At the 2021 census, the most commonly nominated ancestries were: 

Over

In [None]:

train_dataset = load_from_disk("/content/large_text_normalization_train")
test_dataset  = load_from_disk("/content/large_text_normalization_val")


model_name = "t5-small"
tokenizer = T5Tokenizer.from_pretrained(model_name)
model = T5ForConditionalGeneration.from_pretrained(model_name)


def preprocess(examples):
    inputs = examples["input_text"]
    targets = examples["target_text"]

    model_inputs = tokenizer(inputs,
                             truncation=True, padding="max_length",max_length=256)

    with tokenizer.as_target_tokenizer():
        labels = tokenizer(targets,
                           truncation=True, padding="max_length",max_length=256)

    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

train_dataset = train_dataset.map(preprocess, batched=True)
test_dataset  = test_dataset.map(preprocess, batched=True)


data_collator = DataCollatorForSeq2Seq(tokenizer, model=model)


def compute_metrics(eval_pred):
    predictions, labels = eval_pred

    # Decode
    preds = tokenizer.batch_decode(predictions, skip_special_tokens=True)
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    refs = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # Exact string match accuracy
    correct = sum(p.strip() == r.strip() for p, r in zip(preds, refs))
    acc = correct / len(preds)

    return {"accuracy": acc}


training_args = Seq2SeqTrainingArguments(
    output_dir="/content/drive/MyDrive/t5-text-normalization/",
    #evaluation_strategy="epoch",
    save_strategy="epoch",
    learning_rate=5e-5,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    weight_decay=0.01,
    save_total_limit=2,
    num_train_epochs=3,
    predict_with_generate=True,
    logging_dir="./logs",
    logging_steps=100,
    report_to="none"
)


trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics
)

trainer.train()

print("Evaluating on test set...")
results = trainer.evaluate()
print(results)


tokenizer_config.json:   0%|          | 0.00/2.32k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

Map:   0%|          | 0/80027 [00:00<?, ? examples/s]



Map:   0%|          | 0/20007 [00:00<?, ? examples/s]

  trainer = Seq2SeqTrainer(


Step,Training Loss
100,2.7753
200,0.423
300,0.3452
400,0.3074
500,0.3044
600,0.2764
700,0.2675
800,0.2778
900,0.2785
1000,0.2549


Evaluating on test set...


{'eval_loss': 0.11599044501781464, 'eval_accuracy': 0.031189083820662766, 'eval_runtime': 1172.9672, 'eval_samples_per_second': 17.057, 'eval_steps_per_second': 2.132, 'epoch': 3.0}
