In [4]:
pip install --upgrade datasets

ERROR! Session/line number was not unique in database. History logging moved to new session 254
Defaulting to user installation because normal site-packages is not writeable
Collecting datasets
  Using cached datasets-3.1.0-py3-none-any.whl.metadata (20 kB)
Collecting tqdm>=4.66.3 (from datasets)
  Downloading tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting huggingface-hub>=0.23.0 (from datasets)
  Using cached huggingface_hub-0.26.3-py3-none-any.whl.metadata (13 kB)
Using cached datasets-3.1.0-py3-none-any.whl (480 kB)
Using cached huggingface_hub-0.26.3-py3-none-any.whl (447 kB)
Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
Installing collected packages: tqdm, huggingface-hub, datasets
  Attempting uninstall: tqdm
    Found existing installation: tqdm 4.49.0
    Uninstalling tqdm-4.49.0:
      Successfully uninstalled tqdm-4.49.0
  Attempting uninstall: huggingface-hub
    Found existing installation: huggingface-hub 0.0.12
    Uninstalling huggingface-hub-0.0.12:
      S

In [3]:
pip install --upgrade transformers

Defaulting to user installation because normal site-packages is not writeable
Collecting transformers
  Using cached transformers-4.46.3-py3-none-any.whl.metadata (44 kB)
Collecting tokenizers<0.21,>=0.20 (from transformers)
  Downloading tokenizers-0.20.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.7 kB)
Using cached transformers-4.46.3-py3-none-any.whl (10.0 MB)
Downloading tokenizers-0.20.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m64.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tokenizers, transformers
  Attempting uninstall: tokenizers
    Found existing installation: tokenizers 0.10.3
    Uninstalling tokenizers-0.10.3:
      Successfully uninstalled tokenizers-0.10.3
  You can safely remove it manually.[0m[33m
[0m  Attempting uninstall: transformers
    Found existing installation: transformers 4.8.1
    Uninstalling transf

In [5]:
import pandas as pd
import pyarrow.parquet as pq
import json
import re
from datetime import datetime
from typing import List, Dict, Any
from tqdm import tqdm

class EmailParquetPreprocessor:
    def __init__(self):
        """Initialize the email preprocessor for parquet files"""
        self.df = None
        self.processed_threads = []

    def load_parquet(self, file_path: str) -> None:
        """Load email threads from a parquet file."""
        try:
            self.df = pd.read_parquet(file_path)
            print(f"Loaded {len(self.df)} email threads from parquet file")
        except Exception as e:
            raise ValueError(f"Error loading parquet file: {e}")

    @staticmethod
    def clean_body(body: str) -> str:
        """Clean and format the email body text."""
        if not isinstance(body, str):
            return ""

        # Remove quoted messages and header details
        body = re.sub(r'-{3,}Original Message-{3,}.*?(?=\n\n|\Z)', '', body, flags=re.DOTALL)
        body = re.sub(r'(From|Sent|To|Subject):.*?\n', '', body)

        # Handle encoding artifacts and whitespace
        body = re.sub(r'=\s*\n', '', body)   # Remove soft line breaks
        body = re.sub(r'=\d{2}', '', body)  
        return re.sub(r'\s+', ' ', body).strip()

    @staticmethod
    def extract_sender_name(from_field: str) -> str:
        """Extract sender's name, reformatting 'Last, First' to 'First Last'."""
        if not isinstance(from_field, str):
            return ""

        match = re.match(r'([^<]+)', from_field)
        if match:
            name = match.group(1).strip()
            parts = name.split(',')
            return f"{parts[1].strip()} {parts[0].strip()}" if len(parts) == 2 else name
        return from_field

    def process_single_thread(self, thread_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process a single email thread into a structured format."""
        structured_thread = {
            'subject': thread_data.get('subject', '').strip(),
            'messages': []
        }

        for msg in thread_data.get('messages', []):
            # Default to an empty string if the timestamp is missing or not a string
            timestamp = msg.get('timestamp', '')
            if isinstance(timestamp, datetime):
                try:
                    formatted_timestamp = timestamp.strftime('%Y-%m-%d %H:%M:%S')
                except ValueError:
                    # If timestamp is not in ISO format, skip this message
                    formatted_timestamp = 'Invalid Timestamp'
            else:
                formatted_timestamp = 'Invalid Timestamp'

            cleaned_msg = {
                'timestamp': formatted_timestamp,
                'sender': self.extract_sender_name(msg.get('from', '')),
                'recipients': [self.extract_sender_name(recipient) for recipient in msg.get('to', [])],
                'body': self.clean_body(msg.get('body', ''))
            }
            structured_thread['messages'].append(cleaned_msg)

        # Sort messages by timestamp (excluding any with 'Invalid Timestamp' if desired)
        structured_thread['messages'].sort(key=lambda x: x['timestamp'] if x['timestamp'] != 'Invalid Timestamp' else '9999-12-31 23:59:59')

        # Create summary input format
        structured_thread['summary_input'] = self.format_for_summary(structured_thread)     
        return structured_thread


    @staticmethod
    def format_for_summary(thread: Dict[str, Any]) -> str:
        """Format structured thread data into a summarization-ready format."""
        summary_input = f"Subject: {thread['subject']}\n\n"

        for msg in thread['messages']:
            summary_input += f"[{msg['timestamp']}] {msg['sender']}:\n{msg['body']}\n\n"

        return summary_input.strip()

    def process_all_threads(self) -> List[Dict[str, Any]]:
        """Process all email threads loaded from the parquet file."""
        if self.df is None:
            raise ValueError("No parquet file loaded. Call load_parquet() first.")

        self.processed_threads = []

        for idx, row in tqdm(self.df.iterrows(), total=len(self.df), desc="Processing threads"):
            try:
                thread_data = json.loads(row['thread']) if isinstance(row['thread'], str) else row['thread']
                
                processed_thread = self.process_single_thread(thread_data)
                processed_thread['summary'] = row.get('summary', '') 
                
                self.processed_threads.append(processed_thread)

            except Exception as e:
                print(f"Error processing thread at index {idx}: {e}")
                continue

        return self.processed_threads

    def save_processed_threads(self, output_path: str) -> None:
        """Save processed threads to a new parquet file."""
        if not self.processed_threads:
            raise ValueError("No processed threads to save. Run process_all_threads() first.")

        processed_df = pd.DataFrame([{
            'subject': thread['subject'],
            'summary_input': thread['summary_input'],
            'processed_messages': json.dumps(thread['messages']),
            'summary' : thread['summary']
        } for thread in self.processed_threads])

        processed_df.to_parquet(output_path, index=False)
        print(f"Saved {len(processed_df)} processed threads to {output_path}")


In [6]:
preprocessor = EmailParquetPreprocessor()

# Load parquet file
preprocessor.load_parquet('train-00000-of-00001-41f2ca6bce8b68f8.parquet')

# Process all threads
processed_threads = preprocessor.process_all_threads()

# Save processed results
preprocessor.save_processed_threads('processed_threads.parquet')

# Print sample results
print("\nSample processed thread:")
sample_thread = processed_threads[0]
print(f"Subject: {sample_thread['subject']}")
print(f"Number of messages: {len(sample_thread['messages'])}")
print("\nSample summary input:")
print(sample_thread['summary_input'][:500] + "...")
print(sample_thread)

Loaded 3750 email threads from parquet file


Processing threads: 100%|██████████| 3750/3750 [00:02<00:00, 1867.20it/s]


Saved 3750 processed threads to processed_threads.parquet

Sample processed thread:
Subject: FW: Master Termination Log
Number of messages: 5

Sample summary input:
Subject: FW: Master Termination Log

[2002-01-29 11:23:42] Jeffrey C. Gossett:
Attached is the Daily Termination List for January 25 as well as the Master Termination Log, which incorporates all terminations received through January 25. The following were previously on the Master Termination Log and have now been marked as "Y" for a valid termination: Atlantic Coast Fibers, Inc.ENApulp/paper transactions CNC-Containers CorporationEPMImaster power agreement Public Utility District No. 1 of Chelan...
{'subject': 'FW: Master Termination Log', 'messages': [{'timestamp': '2002-01-29 11:23:42', 'sender': 'Jeffrey C. Gossett', 'recipients': ['Giron', 'Darron C.', 'Love', 'Phillip M.'], 'body': 'Attached is the Daily Termination List for January 25 as well as the Master Termination Log, which incorporates all terminations received 

In [1]:
class Config:
    RANDOM_SEED = 42
    MAX_INPUT_LENGTH = 512
    OUTPUT_DIR = './pegasus_results'
    SAVED_DIR = './saved_pegasus_model'

In [7]:
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import PegasusTokenizer, PegasusForConditionalGeneration, Trainer, TrainingArguments, Adafactor, EarlyStoppingCallback
from datasets import Dataset
import torch
import optuna
import os


def load_and_preprocess_data(file_path):
    peg_df = pd.read_parquet(file_path)
    peg_df = peg_df.dropna(subset=['summary_input', 'summary'])
    train_df, val_df = train_test_split(
        peg_df,
        test_size=0.2,
        random_state=Config.RANDOM_SEED
    )
    return train_df, val_df

def preprocess_function(tokenizer, threads):
    model_inputs = tokenizer(
        threads['summary_input'],
        max_length=Config.MAX_INPUT_LENGTH,
        truncation=True,
        padding='max_length'
    )
    labels = tokenizer(
        text_target=examples['summary'],
        max_length=Config.MAX_INPUT_LENGTH,
        truncation=True,
        padding='max_length'
    )
    model_inputs['labels'] = labels['input_ids']
    return model_inputs


# Load data
train_df, val_df = load_and_preprocess_data('processed_threads.parquet')

# Convert to datasets
train_dataset = Dataset.from_pandas(train_df[['summary_input', 'summary']])
val_dataset = Dataset.from_pandas(val_df[['summary_input', 'summary']])

# Load tokenizer and model
model_name = "google/pegasus-xsum"
tokenizer = PegasusTokenizer.from_pretrained(model_name)
model = PegasusForConditionalGeneration.from_pretrained(model_name)

# Tokenize datasets
train_tokenized_dataset = train_dataset.map(lambda x: preprocess_function(tokenizer, x), batched=True, num_proc=max(2, os.cpu_count()-1))
val_tokenized_dataset = val_dataset.map(lambda x: preprocess_function(tokenizer, x), batched=True, num_proc=max(2, os.cpu_count()-1))


def objective(trial):
    # Hyperparameters
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 5e-4, log=True)
    num_train_epochs = trial.suggest_int('num_train_epochs', 3, 10)
    per_device_train_batch_size = trial.suggest_categorical('per_device_train_batch_size', [2, 4])
    gradient_accumulation_steps = trial.suggest_int('gradient_accumulation_steps', 4, 8)

    # Define training arguments
    training_args = TrainingArguments(
        output_dir=Config.OUTPUT_DIR,
        eval_strategy='epoch',
        logging_strategy='steps',
        logging_steps=10,
        save_strategy='epoch',
        learning_rate=learning_rate,
        per_device_train_batch_size=per_device_train_batch_size,
        per_device_eval_batch_size=2,
        num_train_epochs=num_train_epochs,
        weight_decay=0.01,
        report_to='tensorboard',
        fp16=torch.cuda.is_available() and torch.cuda.get_device_capability()[0] < 8,
        bf16=torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8,
        save_total_limit=3,
        load_best_model_at_end=True,
        metric_for_best_model='eval_loss',
        greater_is_better=False,
        gradient_accumulation_steps=gradient_accumulation_steps,
        dataloader_num_workers=10,
        gradient_checkpointing=True,
        max_grad_norm=1.0,
        remove_unused_columns=True,
        dataloader_pin_memory=True,
        skip_memory_metrics=True,
        lr_scheduler_type='linear'
    )

    optimizer = Adafactor(model.parameters(), lr=learning_rate, scale_parameter=False, relative_step=False)

    # Initialize trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        optimizers=(optimizer, None),
        train_dataset=train_tokenized_dataset,
        eval_dataset=val_tokenized_dataset,
        data_collator=None,
        callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
    )

    # Start training
    trainer.train()

    # Evaluate the model
    eval_result = trainer.evaluate()
    return eval_result['eval_loss']


study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

# Save the best model and tokenizer
best_trial = study.best_trial
print(f"Best trial: {best_trial.params}")
model.save_pretrained(Config.SAVED_DIR)
tokenizer.save_pretrained(Config.SAVED_DIR)

Some weights of PegasusForConditionalGeneration were not initialized from the model checkpoint at google/pegasus-xsum and are newly initialized: ['model.decoder.embed_positions.weight', 'model.encoder.embed_positions.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Map (num_proc=63):   0%|          | 0/3000 [00:00<?, ? examples/s]

Map (num_proc=63):   0%|          | 0/750 [00:00<?, ? examples/s]

[I 2024-11-28 14:14:00,032] A new study created in memory with name: no-name-834d681b-3919-4455-9bea-96cc9aac78b7
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
0,0.4814,0.401836
2,0.374,0.359848
4,0.3393,0.350874
6,0.3153,0.348124
8,0.3068,0.347474


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 15:44:07,321] Trial 0 finished with value: 0.3474743366241455 and parameters: {'learning_rate': 7.293807020845046e-05, 'num_train_epochs': 9, 'per_device_train_batch_size': 4, 'gradient_accumulation_steps': 4}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
0,0.3211,0.350169
1,0.2713,0.347738
2,0.267,0.352805
4,0.2484,0.359742


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 16:32:26,289] Trial 1 finished with value: 0.3477378785610199 and parameters: {'learning_rate': 0.0003574032313977492, 'num_train_epochs': 5, 'per_device_train_batch_size': 4, 'gradient_accumulation_steps': 8}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
1,0.2506,0.349033
2,0.2525,0.35043
3,0.2374,0.352051
4,0.2426,0.352687


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 17:18:14,700] Trial 2 finished with value: 0.3490328788757324 and parameters: {'learning_rate': 3.554967218658174e-05, 'num_train_epochs': 6, 'per_device_train_batch_size': 2, 'gradient_accumulation_steps': 6}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
0,0.2314,0.370013
2,0.2118,0.376208
4,0.1972,0.390606


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 17:58:30,555] Trial 3 finished with value: 0.37001264095306396 and parameters: {'learning_rate': 0.00026902299270575375, 'num_train_epochs': 8, 'per_device_train_batch_size': 4, 'gradient_accumulation_steps': 4}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
0,0.1425,0.427181
1,0.1701,0.390509
2,0.2151,0.383052
3,0.1881,0.389141
4,0.1994,0.398907
5,0.1657,0.405254


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 18:56:54,153] Trial 4 finished with value: 0.38305187225341797 and parameters: {'learning_rate': 0.00019634610560278755, 'num_train_epochs': 8, 'per_device_train_batch_size': 4, 'gradient_accumulation_steps': 7}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
1,0.2064,0.381291
2,0.1873,0.399279
3,0.1543,0.417668


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 19:32:01,592] Trial 5 finished with value: 0.3812909424304962 and parameters: {'learning_rate': 0.00024100187605160443, 'num_train_epochs': 3, 'per_device_train_batch_size': 2, 'gradient_accumulation_steps': 5}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
1,0.1165,0.480528
2,0.1712,0.411397
3,0.1585,0.417872
4,0.1623,0.418314
5,0.156,0.421493


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 20:29:18,921] Trial 6 finished with value: 0.4113965332508087 and parameters: {'learning_rate': 4.776761237143552e-05, 'num_train_epochs': 5, 'per_device_train_batch_size': 2, 'gradient_accumulation_steps': 6}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
1,0.103,0.55744
2,0.1553,0.446429
3,0.1472,0.442688
4,0.1408,0.446483
5,0.1304,0.472627
6,0.1225,0.482697


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 21:38:16,286] Trial 7 finished with value: 0.4426884353160858 and parameters: {'learning_rate': 0.00014126347053151992, 'num_train_epochs': 10, 'per_device_train_batch_size': 2, 'gradient_accumulation_steps': 5}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
1,0.0713,0.55804
2,0.0849,0.559635
3,0.1081,0.534447
4,0.1228,0.487157
5,0.1166,0.495355
6,0.1091,0.502011
7,0.1048,0.504904


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 22:46:21,917] Trial 8 finished with value: 0.4871566593647003 and parameters: {'learning_rate': 8.625851358918672e-05, 'num_train_epochs': 8, 'per_device_train_batch_size': 4, 'gradient_accumulation_steps': 6}. Best is trial 0 with value: 0.3474743366241455.
Detected kernel version 4.18.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss
1,0.0521,0.633065
2,0.0622,0.69576
3,0.0777,0.634952
4,0.0893,0.586006
5,0.0942,0.547559
6,0.0838,0.561485


There were missing keys in the checkpoint model loaded: ['model.encoder.embed_tokens.weight', 'model.decoder.embed_tokens.weight', 'lm_head.weight'].


[I 2024-11-28 23:44:54,962] Trial 9 finished with value: 0.5475591421127319 and parameters: {'learning_rate': 0.00027850828310803936, 'num_train_epochs': 6, 'per_device_train_batch_size': 4, 'gradient_accumulation_steps': 6}. Best is trial 0 with value: 0.3474743366241455.


Best trial: {'learning_rate': 7.293807020845046e-05, 'num_train_epochs': 9, 'per_device_train_batch_size': 4, 'gradient_accumulation_steps': 4}


('./saved_pegasus_model/tokenizer_config.json',
 './saved_pegasus_model/special_tokens_map.json',
 './saved_pegasus_model/spiece.model',
 './saved_pegasus_model/added_tokens.json')

In [7]:
# Load test parquet file
preprocessor.load_parquet('test-00000-of-00001-cd15b40aacd3c33e.parquet')

# Process all threads
processed_test_threads = preprocessor.process_all_threads()

# Save processed results
preprocessor.save_processed_threads('processed_test_threads.parquet')

Loaded 417 email threads from parquet file


Processing threads: 100%|██████████| 417/417 [00:00<00:00, 1914.50it/s]

Saved 417 processed threads to processed_test_threads.parquet





In [30]:
import os
import pandas as pd
from tqdm import tqdm
import torch
from transformers import PegasusTokenizer, PegasusForConditionalGeneration

# Load the processed data
test_df = pd.read_parquet('processed_test_threads.parquet')

# Check for NaN values and drop them
test_df = test_df.dropna(subset=['summary_input'])

# Load the saved model and tokenizer
model = PegasusForConditionalGeneration.from_pretrained('./saved_pegasus_model')
tokenizer = PegasusTokenizer.from_pretrained('./saved_pegasus_model')

# Generate summaries for the test data
def generate_summary(threads):
    try:
        inputs = tokenizer(
            threads['summary_input'],
            return_tensors='pt',
            max_length=512,
            truncation=True,
            padding='max_length'
        ).to(device)
        input_ids = inputs['input_ids']
        if input_ids.max() >= tokenizer.vocab_size:
            print(f"Out-of-range token ID detected. Max ID: {input_ids.max()}, Vocab size: {tokenizer.vocab_size}")
            return ""
        summary_ids = model.generate(
            inputs['input_ids'],
            max_length=512,
            min_length=40,
            length_penalty=2.0,
            num_beams=2,
            early_stopping=True
        )
        summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
        return summary
    except Exception as e:
        print(f"Error processing input: {example['subject']}, Error: {e}")
        return ""

tqdm.pandas()
test_df['generated_summary'] = test_df.progress_apply(lambda x: generate_summary(x), axis=1)

# Save only the summary_input and generated_summary columns
test_df[['summary', 'generated_summary']].to_csv('pegasus_generated_test_summaries.csv', index=False)

100%|██████████| 417/417 [1:00:54<00:00,  8.76s/it]
