In [None]:
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import LinearLR
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import numpy as np
from tqdm import tqdm
import os
import json
from datetime import datetime
from datasets import load_dataset
from huggingface_hub import login
from transformers import T5ForConditionalGeneration


In [None]:
# Prefer the GPU when PyTorch can see one; otherwise run everything on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Hyperparameters, data limits and output location shared by the cells below.
# NOTE(review): no visible cell reads 'fp16' — confirm whether mixed precision
# is actually meant to be enabled.
config = dict(
    model_name='Salesforce/codet5-small',   # checkpoint to fine-tune
    num_epochs=5,
    train_batch_size=8,
    eval_batch_size=16,
    learning_rate=3e-5,
    warmup_steps=500,                       # upper bound; see the scheduler cell
    max_grad_norm=1.0,                      # gradient clipping threshold
    save_every=1000,                        # periodic checkpoint interval (steps)
    eval_every=500,                         # mid-epoch evaluation interval (steps)
    max_input_length=256,                   # prompt length after padding/truncation
    max_target_length=256,                  # target length after padding/truncation
    output_dir='./t5_bigcode_results',
    max_samples=15000,                      # samples kept after filtering
    eval_samples=3000,                      # held out from the end for validation
    fp16=True,
    # gradient_accumulation_steps=2
)

# Checkpoints, history and plots are all written below this directory.
os.makedirs(config['output_dir'], exist_ok=True)

In [None]:
class BigCodeDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing a short generated prompt with a code sample.

    Each item is turned into a prompt (see ``create_code_prompt``) used as the
    encoder input, while the code text itself becomes the target sequence.
    Both are padded/truncated to fixed lengths so batches can be stacked.

    Args:
        data: Indexable collection of samples; each sample is either a dict
            with a ``'content'`` key or the code string itself.
        tokenizer: Callable tokenizer accepting ``max_length``, ``truncation``,
            ``padding`` and ``return_tensors`` keyword arguments.
        max_input_length: Token length of the encoded prompt.
        max_target_length: Token length of the encoded target code.
    """

    def __init__(self, data, tokenizer, max_input_length, max_target_length):
        self.data = data
        self.tokenizer = tokenizer
        self.max_input_length = max_input_length
        self.max_target_length = max_target_length

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def _encode(self, text, max_length):
        """Tokenize ``text`` to exactly ``max_length`` tokens as PyTorch tensors."""
        return self.tokenizer(
            text,
            max_length=max_length,
            truncation=True,
            padding='max_length',
            return_tensors='pt'
        )

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for sample ``idx``.

        ``labels`` still contain the tokenizer's padding ids; masking them out
        of the loss is left to the caller.
        """
        item = self.data[idx]

        # Samples may be dicts carrying the code under 'content', or raw strings.
        code_content = item['content'] if isinstance(item, dict) else item

        # The encoder sees only a short task prompt; the decoder target is the code.
        source_text = self.create_code_prompt(code_content)
        target_text = code_content

        source_tokens = self._encode(source_text, self.max_input_length)
        target_tokens = self._encode(target_text, self.max_target_length)

        # The tokenizer returns a leading batch axis of size 1. Drop only that
        # axis: a bare squeeze() would also remove the sequence axis whenever
        # the configured length is 1.
        return {
            'input_ids': source_tokens['input_ids'].squeeze(0),
            'attention_mask': source_tokens['attention_mask'].squeeze(0),
            'labels': target_tokens['input_ids'].squeeze(0)
        }

    def create_code_prompt(self, code):
        """Build a prompt string from the first line / content of ``code``.

        Checks are tried in order: function definition, class definition,
        import statement, presence of a triple-quoted string, then a generic
        fallback. A string is always returned; a first line that merely
        contains ``'def '`` or ``'class '`` without a following identifier
        falls through to the later checks instead of yielding ``None``.

        Args:
            code: Source text of the sample.

        Returns:
            str: Prompt ending in ``': '``.
        """
        # Local import: the notebook's import cell does not import `re`.
        import re

        lines = code.strip().split('\n')
        first_line = lines[0] if lines else ""

        if 'def ' in first_line:
            func_match = re.search(r'def\s+(\w+)', first_line)
            if func_match:
                return f"generate python function {func_match.group(1)}: "

        if 'class ' in first_line:
            class_match = re.search(r'class\s+(\w+)', first_line)
            if class_match:
                return f"generate python class {class_match.group(1)}: "

        if first_line.startswith('import ') or first_line.startswith('from '):
            return "generate python import: "

        if '"""' in code or "'''" in code:
            return "generate python code with docstring: "

        return "generate python code: "

In [None]:
def load_bigcode_python_data(max_samples=None, scan_limit=20000):
    """Stream Python files from ``bigcode/the-stack`` and keep the usable ones.

    A file is kept when its stripped content is 50-5000 characters long,
    contains at least one common Python keyword, and is at least 80% ASCII.

    Args:
        max_samples: Stop once this many samples were kept. Defaults to
            ``config['max_samples']``.
        scan_limit: Number of streamed records to inspect at most.

    Returns:
        list[dict]: One dict per kept file with keys ``'content'``, ``'repo'``
        and ``'file'``. May hold fewer than ``max_samples`` entries when the
        scanned records run out first.
    """
    if max_samples is None:
        max_samples = config['max_samples']

    # Credentials must not live in the notebook. Read the token from the
    # environment; when it is unset, rely on whatever credentials the
    # Hugging Face client already has cached.
    hf_token = os.environ.get("HF_TOKEN")
    if hf_token:
        login(token=hf_token)

    dataset = load_dataset("bigcode/the-stack", data_dir="data/python", split="train", streaming=True)
    dataset = dataset.take(scan_limit)
    processed_data = []
    required_keywords = ('def ', 'class ', 'import ', 'from ', 'if ', 'for ')

    print("Processing and filtering Python code samples...")

    for item in dataset:
        content = (item.get('content') or '').strip()

        # Skip empty or very short files (the original `if` had no body,
        # which made the whole cell a syntax error).
        if not content or len(content) < 50:
            continue

        # Skip very long files.
        if len(content) > 5000:
            continue

        # Require at least one keyword that suggests real Python code.
        if not any(keyword in content for keyword in required_keywords):
            continue

        # Drop files where more than 20% of the characters are non-ASCII.
        # Encoding with errors='ignore' cannot raise, so no try/except is needed.
        ascii_length = len(content.encode('ascii', errors='ignore'))
        if ascii_length < len(content) * 0.8:
            continue

        processed_data.append({
            'content': content,
            'repo': item.get('max_stars_repo_name', 'unknown'),
            'file': item.get('max_stars_repo_path', 'unknown')
        })

        count = len(processed_data)
        if count % 1000 == 0:
            print(f"Processed {count} samples...")

        if count >= max_samples:
            break

    print(f"Loaded {len(processed_data)} Python code samples")
    return processed_data
        


all_data = load_bigcode_python_data()

# Filtering can leave fewer samples than requested. With no more than
# `eval_samples` items the split size below would be zero or negative and the
# slices would silently produce an empty or mislabelled split, so fail early
# with an actionable message instead.
if len(all_data) <= config['eval_samples']:
    raise ValueError(
        f"Only {len(all_data)} samples passed filtering, which is not more than "
        f"eval_samples={config['eval_samples']}; lower 'eval_samples' or load more data."
    )

# The last `eval_samples` items form the validation split; the rest is training data.
train_size = len(all_data) - config['eval_samples']
train_data = all_data[:train_size]
eval_data = all_data[train_size:]

print(f"Training samples: {len(train_data)}")
print(f"Validation samples: {len(eval_data)}")

# Quick look at the first training sample (train_data is non-empty after the guard).
print(f"\nSample training data:")
print(f"Content preview: {train_data[0]['content'][:1000]}...")
print(f"Repo: {train_data[0].get('repo', 'N/A')}")

print("Loading Salesforce T5-small model and tokenizer...")

tokenizer = AutoTokenizer.from_pretrained(config['model_name'])
model = AutoModelForSeq2SeqLM.from_pretrained(config['model_name'])

print(f"Tokenizer pad token: {tokenizer.pad_token}")
print(f"Tokenizer pad token ID: {tokenizer.pad_token_id}")

# Register extra marker tokens and grow the embedding matrix to match the
# enlarged vocabulary.
# NOTE(review): nothing visible in this notebook inserts these markers into
# prompts or targets — confirm they are still needed.
code_tokens = ['<python>', '</python>', '<function>', '</function>', '<class>', '</class>']
num_new_tokens = tokenizer.add_tokens(code_tokens)
if num_new_tokens > 0:
    model.resize_token_embeddings(len(tokenizer))
    print(f"Added {num_new_tokens} new code tokens")

model = model.to(device)
print(f"Model loaded and moved to {device}")
print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")


In [None]:

# Wrap both splits with identical tokenization limits (train first, then eval).
train_dataset, eval_dataset = (
    BigCodeDataset(
        split,
        tokenizer,
        config['max_input_length'],
        config['max_target_length'],
    )
    for split in (train_data, eval_data)
)

# Only the training loader shuffles; pinned host memory is requested only
# when the target device is CUDA.
train_loader = DataLoader(
    train_dataset,
    shuffle=True,
    batch_size=config['train_batch_size'],
    num_workers=2,
    pin_memory=(device.type == 'cuda'),
)
eval_loader = DataLoader(
    eval_dataset,
    shuffle=False,
    batch_size=config['eval_batch_size'],
    num_workers=2,
    pin_memory=(device.type == 'cuda'),
)

print(f"Train batches: {len(train_loader)}")
print(f"Eval batches: {len(eval_loader)}")

# Sanity check: pull a single batch and inspect its tensor shapes.
print("\nTesting dataset:")
sample_batch = next(iter(train_loader))
print(f"Input shape: {sample_batch['input_ids'].shape}")
print(f"Labels shape: {sample_batch['labels'].shape}")
print(f"Attention mask shape: {sample_batch['attention_mask'].shape}")

# Decode the first prompt/target pair of that batch back to text.
sample_input, sample_label = (
    tokenizer.decode(sample_batch[field][0], skip_special_tokens=True)
    for field in ('input_ids', 'labels')
)
print(f"\nSample input: {sample_input}")
print(f"Sample target: {sample_label[:100]}...")

In [None]:

# Optimize only the parameters that require gradients.
trainable_params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = AdamW(trainable_params, weight_decay=0.01, lr=config['learning_rate'])

# Warm up for at most 10% of all optimizer steps, capped by the configured value.
total_steps = config['num_epochs'] * len(train_loader)
warmup_steps = min(total_steps // 10, config['warmup_steps'])

# Linear ramp of the learning-rate factor from 0.1 to 1.0 over the warmup steps.
scheduler = LinearLR(optimizer, start_factor=0.1, end_factor=1.0, total_iters=warmup_steps)

print(f"Total training steps: {total_steps}")
print(f"Warmup steps: {warmup_steps}")

In [None]:
def compute_loss(model, batch):
    """Run a forward pass on one batch and return the model's loss.

    Args:
        model: Model called with ``input_ids``, ``attention_mask`` and
            ``labels`` keyword arguments; its output must expose ``.loss``.
        batch: Mapping with tensor entries ``'input_ids'``,
            ``'attention_mask'`` and ``'labels'``. It is not modified.

    Returns:
        The ``loss`` attribute of the model output.
    """
    input_ids = batch['input_ids'].to(device)
    attention_mask = batch['attention_mask'].to(device)
    labels = batch['labels'].to(device)

    # Replace padding ids with -100 so padded positions are excluded from the
    # loss. masked_fill builds a new tensor: `.to(device)` hands back the very
    # same tensor when no transfer is needed (CPU runs), so an in-place write
    # would corrupt the caller's batch.
    labels = labels.masked_fill(labels == tokenizer.pad_token_id, -100)

    outputs = model(
        input_ids=input_ids,
        attention_mask=attention_mask,
        labels=labels
    )

    return outputs.loss

def evaluate_model(model, eval_loader):
    """Compute the mean loss and perplexity of ``model`` over ``eval_loader``.

    The model is put in eval mode for the duration of the pass and its previous
    train/eval mode is restored afterwards, so calling this in the middle of a
    training epoch does not leave dropout disabled for the following steps.

    Args:
        model: Model to evaluate.
        eval_loader: Iterable of batches accepted by ``compute_loss``.

    Returns:
        dict: ``{'eval_loss': float, 'eval_perplexity': float}``. Both values
        are NaN when ``eval_loader`` yields no batches.
    """
    was_training = model.training
    model.eval()
    total_loss = 0.0
    num_batches = 0

    try:
        with torch.no_grad():
            for batch in tqdm(eval_loader, desc="Evaluating", leave=False):
                total_loss += compute_loss(model, batch).item()
                num_batches += 1
    finally:
        # Hand the model back in the mode the caller had it in.
        model.train(was_training)

    # An empty loader used to raise ZeroDivisionError; report NaN instead.
    avg_loss = total_loss / num_batches if num_batches else float('nan')
    perplexity = torch.exp(torch.tensor(avg_loss)).item()

    return {
        'eval_loss': avg_loss,
        'eval_perplexity': perplexity
    }

def save_checkpoint(model, tokenizer, optimizer, epoch, step, loss, save_dir):
    """Write model, tokenizer and training state into a per-step folder.

    The folder is ``<save_dir>/checkpoint-epoch-<epoch>-step-<step>``. Model and
    tokenizer are stored through their ``save_pretrained`` methods; epoch, step,
    loss and the model/optimizer state dicts go into ``training_state.pt``.

    Args:
        model: Object providing ``save_pretrained`` and ``state_dict``.
        tokenizer: Object providing ``save_pretrained``.
        optimizer: Object providing ``state_dict``.
        epoch: Epoch value recorded in the folder name and state file.
        step: Step value recorded in the folder name and state file.
        loss: Loss value recorded in the state file.
        save_dir: Parent directory for checkpoint folders.
    """
    folder_name = f"checkpoint-epoch-{epoch}-step-{step}"
    checkpoint_dir = os.path.join(save_dir, folder_name)
    os.makedirs(checkpoint_dir, exist_ok=True)

    # Model first, then tokenizer, both into the same folder.
    for artifact in (model, tokenizer):
        artifact.save_pretrained(checkpoint_dir)

    training_state = dict(
        epoch=epoch,
        step=step,
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        loss=loss,
    )
    state_path = os.path.join(checkpoint_dir, 'training_state.pt')
    torch.save(training_state, state_path)

    print(f"Checkpoint saved to {checkpoint_dir}")


In [None]:
def train_model():
    """Fine-tune the module-level ``model`` and return the training history.

    Uses the notebook globals ``config``, ``model``, ``tokenizer``,
    ``optimizer``, ``scheduler``, ``warmup_steps``, ``train_loader`` and
    ``eval_loader``. Per epoch it trains on every batch, evaluates every
    ``config['eval_every']`` steps and once more at the end of the epoch,
    checkpoints whenever the evaluation loss improves and every
    ``config['save_every']`` steps, and rewrites ``training_history.json``.
    After the last epoch the model and tokenizer are saved to
    ``<output_dir>/final_model``.

    NOTE(review): ``config['fp16']`` is not consulted here; no autocast or
    gradient scaling is applied.

    Returns:
        dict: Lists under ``'train_losses'`` and ``'epochs'`` (one entry per
        epoch) and ``'eval_losses'``, ``'eval_perplexities'`` and ``'steps'``
        (one entry per evaluation).
    """
    print("Starting manual training loop...")

    training_history = {
        'train_losses': [],
        'eval_losses': [],
        'eval_perplexities': [],
        'epochs': [],
        'steps': []
    }

    global_step = 0
    best_eval_loss = float('inf')

    for epoch in range(config['num_epochs']):
        print(f"\n{'='*50}")
        print(f"Epoch {epoch + 1}/{config['num_epochs']}")
        print(f"{'='*50}")

        model.train()
        epoch_train_loss = 0
        num_train_batches = 0

        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}", leave=True)

        for batch_idx, batch in enumerate(progress_bar):
            loss = compute_loss(model, batch)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), config['max_grad_norm'])
            optimizer.step()

            # The scheduler only implements the warmup ramp; afterwards the
            # learning rate stays at its base value.
            if global_step < warmup_steps:
                scheduler.step()

            batch_loss = loss.item()
            epoch_train_loss += batch_loss
            num_train_batches += 1
            global_step += 1

            # Refresh the progress-bar postfix on the first batch, every 20th
            # batch and the final batch only.
            if (batch_idx + 1) % 20 == 0 or batch_idx == 0 or (batch_idx + 1) == len(train_loader):
                progress_bar.set_postfix({
                    'loss': f"{batch_loss:.4f}",
                    'lr': f"{optimizer.param_groups[0]['lr']:.2e}"
                })

            if global_step % config['eval_every'] == 0:
                eval_results = evaluate_model(model, eval_loader)
                # Evaluation switches the model to eval mode; switch back so
                # the rest of the epoch still trains with dropout enabled.
                model.train()

                training_history['eval_losses'].append(eval_results['eval_loss'])
                training_history['eval_perplexities'].append(eval_results['eval_perplexity'])
                training_history['steps'].append(global_step)

                if eval_results['eval_loss'] < best_eval_loss:
                    best_eval_loss = eval_results['eval_loss']
                    save_checkpoint(model, tokenizer, optimizer, epoch, global_step,
                                    eval_results['eval_loss'], config['output_dir'])

            if global_step % config['save_every'] == 0:
                save_checkpoint(model, tokenizer, optimizer, epoch, global_step,
                                batch_loss, config['output_dir'])

        avg_train_loss = epoch_train_loss / num_train_batches
        training_history['train_losses'].append(avg_train_loss)
        training_history['epochs'].append(epoch + 1)

        print(f"\nEpoch {epoch+1} Summary:")
        print(f"Train Loss: {avg_train_loss:.4f}")

        eval_results = evaluate_model(model, eval_loader)
        print(f"Eval Loss: {eval_results['eval_loss']:.4f}")
        print(f"Eval Perplexity: {eval_results['eval_perplexity']:.4f}")

        training_history['eval_losses'].append(eval_results['eval_loss'])
        training_history['eval_perplexities'].append(eval_results['eval_perplexity'])
        training_history['steps'].append(global_step)

        # End-of-epoch results count towards the best score as well; without
        # this the reported "Best Eval Loss" could miss the best evaluation.
        if eval_results['eval_loss'] < best_eval_loss:
            best_eval_loss = eval_results['eval_loss']
            save_checkpoint(model, tokenizer, optimizer, epoch, global_step,
                            eval_results['eval_loss'], config['output_dir'])

        # Rewrite the history after every epoch so partial runs leave a record.
        with open(os.path.join(config['output_dir'], 'training_history.json'), 'w') as f:
            json.dump(training_history, f, indent=2)

    final_model_dir = os.path.join(config['output_dir'], 'final_model')
    model.save_pretrained(final_model_dir)
    tokenizer.save_pretrained(final_model_dir)

    print(f"\n{'='*50}")
    print("Training complete!")
    print(f"Best Eval Loss: {best_eval_loss:.4f}")
    print(f"Final model saved to: {final_model_dir}")
    print(f"{'='*50}")

    return training_history


In [None]:
# Run the training loop; the returned history dict is kept for later inspection.
training_history = train_model()

In [None]:
def plot_training_results(history):
    """Plot training and evaluation metrics on a 2x2 grid and save the figure.

    Panels: training loss per epoch, evaluation loss per step, evaluation
    perplexity per step, and training vs. evaluation loss on a shared epoch
    axis. The figure is written to ``<config['output_dir']>/training_plots.png``
    and then shown.

    Args:
        history: Dict with list entries ``'epochs'``, ``'train_losses'``,
            ``'steps'``, ``'eval_losses'`` and ``'eval_perplexities'``.
    """
    # The notebook's import cell never imports matplotlib, so the original
    # raised NameError on `plt`. Import it here to keep the function
    # self-contained.
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))

    # Training loss by epoch
    axes[0, 0].plot(history['epochs'], history['train_losses'], 'b-', marker='o')
    axes[0, 0].set_title('Training Loss by Epoch')
    axes[0, 0].set_xlabel('Epoch')
    axes[0, 0].set_ylabel('Loss')
    axes[0, 0].grid(True)

    # Evaluation loss by step
    axes[0, 1].plot(history['steps'], history['eval_losses'], 'r-', marker='s')
    axes[0, 1].set_title('Evaluation Loss by Step')
    axes[0, 1].set_xlabel('Step')
    axes[0, 1].set_ylabel('Loss')
    axes[0, 1].grid(True)

    # Evaluation perplexity by step
    axes[1, 0].plot(history['steps'], history['eval_perplexities'], 'g-', marker='^')
    axes[1, 0].set_title('Evaluation Perplexity by Step')
    axes[1, 0].set_xlabel('Step')
    axes[1, 0].set_ylabel('Perplexity')
    axes[1, 0].grid(True)

    # Combined losses; the panel is left empty when either series is missing.
    if len(history['train_losses']) > 0 and len(history['eval_losses']) > 0:
        axes[1, 1].plot(history['epochs'], history['train_losses'], 'b-',
                        marker='o', label='Train Loss')

        # Rescale step counts linearly so the largest step lands on the final
        # epoch number, putting both curves on the same x-axis.
        last_step = max(history['steps'])
        eval_epochs = [s * config['num_epochs'] / last_step
                       for s in history['steps']]
        axes[1, 1].plot(eval_epochs, history['eval_losses'], 'r-',
                        marker='s', label='Eval Loss')

        axes[1, 1].set_title('Training vs Evaluation Loss')
        axes[1, 1].set_xlabel('Epoch')
        axes[1, 1].set_ylabel('Loss')
        axes[1, 1].legend()
        axes[1, 1].grid(True)

    fig.tight_layout()
    fig.savefig(os.path.join(config['output_dir'], 'training_plots.png'), dpi=300)
    plt.show()

# Plot the metrics collected in `training_history`
plot_training_results(training_history)


In [None]:
def test_trained_t5_model(model_path=None):
    """Generate code for a fixed list of prompts and report whether it compiles.

    Loads the tokenizer and model from ``model_path``, generates one sequence
    per built-in prompt, prints the decoded text, and passes it to ``compile``
    to report whether it is syntactically valid Python.

    Args:
        model_path: Directory of a saved model. Defaults to
            ``<config['output_dir']>/final_model``.
    """
    if model_path is None:
        model_path = os.path.join(config['output_dir'], 'final_model')

    print(f"Loading T5 model from: {model_path}")
    test_tokenizer = AutoTokenizer.from_pretrained(model_path)
    test_model = T5ForConditionalGeneration.from_pretrained(model_path)
    test_model.to(device)
    test_model.eval()

    test_prompts = [
        "generate python function factorial: ",
        "generate python function binary_search: ",
        "generate python class LinkedList: ",
        "generate python code merge two sorted lists: ",
        "generate python function quicksort: ",
        "generate python class Stack: ",
        "generate python function fibonacci: ",
        "generate python import requests: "
    ]

    # Generation settings shared by every prompt.
    generation_kwargs = dict(
        max_length=200,
        num_beams=5,
        temperature=0.8,
        do_sample=True,
        top_k=50,
        top_p=0.9,
        early_stopping=True,
        pad_token_id=test_tokenizer.pad_token_id,
        eos_token_id=test_tokenizer.eos_token_id,
        no_repeat_ngram_size=2,
    )

    banner = "=" * 60
    print("\n" + banner)
    print("TESTING FINE-TUNED T5 MODEL ON PYTHON CODE GENERATION")
    print(banner)

    for test_no, prompt in enumerate(test_prompts, start=1):
        print(f"\n[Test {test_no}] Prompt: '{prompt}'")
        print("-" * 50)

        prompt_ids = test_tokenizer.encode(prompt, return_tensors="pt").to(device)

        with torch.no_grad():
            sequences = test_model.generate(prompt_ids, **generation_kwargs)

        generated_code = test_tokenizer.decode(sequences[0], skip_special_tokens=True)

        # Remove the prompt text if the decoded output happens to contain it.
        if prompt in generated_code:
            generated_code = generated_code.replace(prompt, "").strip()

        print(f"Generated:\n{generated_code}")

        # Syntax check only: the generated code is compiled, never executed.
        try:
            compile(generated_code, '<string>', 'exec')
            print("✓ Valid Python syntax")
        except SyntaxError as err:
            print(f"✗ Syntax error: {err}")
        except Exception as err:
            print(f"? Other error: {err}")

    print("\n" + banner)
def _normalize_generation_prompt(raw_prompt):
    """Return ``raw_prompt`` in the form ``'generate python <text>: '``."""
    # Drop surrounding whitespace and any trailing colon first, so a prompt the
    # user already typed in the final format does not end up with '::'.
    core = raw_prompt.strip().rstrip(':').rstrip()
    if not core.startswith('generate python'):
        core = f"generate python {core}"
    return f"{core}: "


def interactive_code_generation():
    """Read prompts from stdin in a loop and print code generated by the saved model.

    Loads the model from ``<config['output_dir']>/final_model`` and returns
    early with a message when that directory does not exist. Each non-empty
    input is normalized to ``'generate python <text>: '`` before generation.
    The loop ends on ``quit``/``exit``/``q`` or Ctrl-C; any other error is
    printed and the loop continues.
    """
    model_path = os.path.join(config['output_dir'], 'final_model')

    if not os.path.exists(model_path):
        print("Model not found. Please train the model first.")
        return

    print("Loading model for interactive generation...")
    # Local names: these do not touch the notebook-level tokenizer/model.
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    model = T5ForConditionalGeneration.from_pretrained(model_path)
    model.to(device)
    model.eval()

    print("\n🤖 Interactive Python Code Generation")
    print("Enter prompts like: 'generate python function sort_list: '")
    print("Type 'quit' to exit\n")

    while True:
        try:
            user_input = input("Enter prompt: ").strip()

            if user_input.lower() in ['quit', 'exit', 'q']:
                break

            if not user_input:
                continue

            # The original appended ': ' after stripping, turning
            # '...sort_list: ' into '...sort_list:: '.
            user_input = _normalize_generation_prompt(user_input)

            inputs = tokenizer.encode(user_input, return_tensors="pt").to(device)

            with torch.no_grad():
                outputs = model.generate(
                    inputs,
                    max_length=300,
                    num_beams=4,
                    temperature=0.7,
                    do_sample=True,
                    early_stopping=True,
                    pad_token_id=tokenizer.pad_token_id
                )

            generated = tokenizer.decode(outputs[0], skip_special_tokens=True)
            # Remove the prompt text if the decoded output happens to contain it.
            generated = generated.replace(user_input, "").strip()

            print(f"\nGenerated code:\n{generated}\n")
            print("-" * 50)

        except KeyboardInterrupt:
            break
        except Exception as e:
            # Keep the session alive after a failed generation.
            print(f"Error: {e}")

    print("Goodbye!")


# Run the prompt-based smoke test with the default model path.
test_trained_t5_model()