# Data Processing - Intellihack Scope 03

This notebook focuses on processing the technical documentation:
- Chunking documents into manageable segments
- Cleaning and formatting text
- Preparing for QA pair generation

In [None]:
# Import necessary libraries
import os
import re
import json
import pandas as pd
from pathlib import Path
import nltk
from tqdm.auto import tqdm

# Download the sentence-tokenizer data that sent_tokenize needs.
# Older NLTK releases load the pickled 'punkt' model, while recent releases
# load 'punkt_tab' instead and raise LookupError if only 'punkt' is present.
# nltk.download() reports failures by returning False rather than raising, so
# requesting both resources is safe whichever NLTK version is installed.
for nltk_resource in ('punkt', 'punkt_tab'):
    nltk.download(nltk_resource)
from nltk.tokenize import sent_tokenize

## Document Chunking Strategies

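We'll implement the following chunking strategies:
1. Fixed-size chunks (sized in words, with sentence-level overlap)
2. Semantic chunking (by section/heading)
3. Hybrid approach (use the section chunks when a file yields at least three of them, otherwise fall back to fixed-size chunks)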

In [None]:
def clean_markdown(text):
    """
    Clean markdown formatting while preserving important content.

    Removes code-fence delimiters (keeping the code itself), unwraps images
    and links to their alt/link text, strips bold and italic markers, and
    collapses every run of whitespace (including newlines) to a single space.
    Heading markers ('#'), list bullets and single-backtick inline code are
    left untouched.

    Args:
        text: Raw markdown text.

    Returns:
        The cleaned text as a single whitespace-normalized string.
    """
    # Drop whole fence delimiter lines (``` or ```lang) but keep the code
    # between them. Anchoring to the start of a line and forbidding backticks
    # in the info string keeps inline ```code``` spans from losing the rest of
    # their line.
    text = re.sub(r'^[ \t]*```[^`\n]*$\n?', '', text, flags=re.MULTILINE)
    # Any remaining triple backticks are inline spans: strip only the markers
    text = re.sub(r'```', '', text)

    # Images: keep the alt text. Must run before the link rule, which would
    # otherwise leave the leading '!' behind.
    text = re.sub(r'!\[([^\]]*)\]\([^)]+\)', r'\1', text)

    # Keep content of links but remove markdown formatting
    text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', text)

    # Remove other markdown artifacts
    text = re.sub(r'\*\*([^*]+)\*\*', r'\1', text)  # Bold
    # Italic: the markers must hug non-space text on a single line, so list
    # bullets ("* item") and arithmetic ("2 * 3 * 4") are not mistaken for
    # emphasis and stripped.
    text = re.sub(r'\*(?!\s)([^*\n]+)(?<!\s)\*', r'\1', text)

    # Clean extra whitespaces
    return re.sub(r'\s+', ' ', text).strip()

In [None]:
def chunk_by_fixed_size(text, chunk_size=500, overlap=100):
    """
    Split text into word-bounded chunks made of whole sentences, with overlap.

    The text is cleaned with clean_markdown and split into sentences. Sentences
    are accumulated until adding the next one would push the chunk past
    `chunk_size` words; the chunk is then emitted and the next chunk starts
    with the trailing sentences of the previous one, as many as fit within
    `overlap` words. A single sentence longer than `chunk_size` still becomes
    its own (oversized) chunk.

    Args:
        text: Raw markdown text.
        chunk_size: Word budget for a chunk.
        overlap: Maximum number of words carried over between chunks.

    Returns:
        List of chunk strings (sentences joined by single spaces).
    """
    cleaned_text = clean_markdown(text)

    chunks = []
    window = []        # sentences of the chunk currently being built
    window_words = 0   # word count of `window`

    for sentence in sent_tokenize(cleaned_text):
        n_words = len(sentence.split())

        if window and window_words + n_words > chunk_size:
            # The chunk is full: emit it before taking the new sentence
            chunks.append(' '.join(window))

            # Walk backwards over the emitted chunk to find how many trailing
            # sentences fit into the overlap budget
            carried_words = 0
            keep_from = len(window)
            while keep_from > 0:
                tail_words = len(window[keep_from - 1].split())
                if carried_words + tail_words > overlap:
                    break
                carried_words += tail_words
                keep_from -= 1

            # Seed the next chunk with those trailing sentences
            window = window[keep_from:]
            window_words = carried_words

        window.append(sentence)
        window_words += n_words

    # Flush whatever is left after the last sentence
    if window:
        chunks.append(' '.join(window))

    return chunks

In [None]:
def chunk_by_section(text):
    """
    Split text by markdown headings to maintain semantic coherence.

    Each chunk is a heading plus the content up to the next heading, cleaned
    with clean_markdown. Text that precedes the first heading is kept as a
    chunk of its own instead of being dropped. Lines starting with '#' inside
    a closed fenced code block are treated as code comments, not as headings.

    Args:
        text: Raw markdown text.

    Returns:
        List of cleaned, non-empty chunk strings in document order. Text
        without any heading yields a single chunk; blank text yields [].
    """
    heading_pattern = re.compile(r'^#{1,6}\s+.*$', re.MULTILINE)

    # Pair up fence delimiters (open, close) so headings inside code blocks
    # can be ignored. An unpaired trailing fence is deliberately not treated
    # as an open block, so one stray ``` cannot swallow the rest of the file.
    fence_starts = [m.start() for m in re.finditer(r'^[ \t]*```', text, re.MULTILINE)]
    fenced_spans = list(zip(fence_starts[0::2], fence_starts[1::2]))

    def inside_code_fence(position):
        return any(opening < position < closing for opening, closing in fenced_spans)

    # Find all real headings
    heading_positions = [
        (m.start(), m.end())
        for m in heading_pattern.finditer(text)
        if not inside_code_fence(m.start())
    ]

    if not heading_positions:
        # If no headings found, return the entire text as one chunk
        # (or nothing at all when the text is blank)
        cleaned = clean_markdown(text)
        return [cleaned] if cleaned else []

    chunks = []

    # Keep any introduction that appears before the first heading
    preamble = clean_markdown(text[:heading_positions[0][0]])
    if preamble:
        chunks.append(preamble)

    # Each section runs from its heading to the start of the next heading;
    # the last one runs to the end of the text
    section_ends = [start for start, _ in heading_positions[1:]] + [len(text)]
    for (start, end), section_end in zip(heading_positions, section_ends):
        heading = text[start:end]
        section_content = text[end:section_end]

        # Combine heading and content, clean, and add to chunks
        cleaned = clean_markdown(heading + "\n" + section_content)

        # Only add non-empty chunks
        if cleaned:
            chunks.append(cleaned)

    return chunks

In [None]:
from collections import Counter

# Define paths
data_dir = Path('../data/raw')
output_dir = Path('../data/processed')
output_dir.mkdir(parents=True, exist_ok=True)

# List all files. Sorted so that chunk order (and therefore the saved CSV)
# is the same on every run, whatever order the filesystem returns entries in.
files = sorted(list(data_dir.glob('**/*.md')) + list(data_dir.glob('**/*.txt')))

# Files that share a stem (e.g. guide.md and guide.txt, or same-named files in
# different sub-folders) would otherwise produce identical chunk_ids.
stem_counts = Counter(f.stem for f in files)

# Fixed column order; also keeps the DataFrame well-formed when no chunk exists
chunk_columns = ["chunk_id", "source", "topic", "chunking_method", "content", "word_count"]

# Process each file
all_chunks = []

for file in tqdm(files, desc="Processing files"):
    content = file.read_text(encoding='utf-8')

    # Get file topic from filename
    topic = file.stem

    # chunk_id prefix: the bare stem when it is unique (unchanged ids for the
    # common case), otherwise the path relative to data_dir to disambiguate
    if stem_counts[topic] == 1:
        id_prefix = topic
    else:
        id_prefix = file.relative_to(data_dir).as_posix()

    # Prefer section-based chunks if they produced enough chunks; only fall
    # back to (and only compute) fixed-size chunks otherwise
    section_chunks = chunk_by_section(content)
    if len(section_chunks) >= 3:
        chunks = section_chunks
        chunking_method = "section"
    else:
        chunks = chunk_by_fixed_size(content)
        chunking_method = "fixed"

    # Store metadata with each chunk
    for i, chunk in enumerate(chunks):
        all_chunks.append({
            "chunk_id": f"{id_prefix}_{i}",
            "source": str(file),
            "topic": topic,
            "chunking_method": chunking_method,
            "content": chunk,
            "word_count": len(chunk.split())
        })

# Convert to DataFrame (explicit columns so an empty run still has a schema)
chunks_df = pd.DataFrame(all_chunks, columns=chunk_columns)
chunks_df.to_csv(output_dir / "chunks.csv", index=False)

# Display statistics
print(f"Total chunks created: {len(chunks_df)}")
if chunks_df.empty:
    # Without this guard the average below would be meaningless (NaN)
    print(f"No chunks were produced - check that {data_dir} contains .md or .txt files")
else:
    print(f"Average chunk word count: {chunks_df['word_count'].mean():.1f}")
chunks_df.head()

## Save the processed chunks for the next stage

The processed chunks will be used for QA pair generation.

In [None]:
# Persist the chunk records as a JSON array (one object per chunk) so the
# next stage can load them without going through pandas
json_path = output_dir / "chunks.json"
chunks_json = chunks_df.to_dict(orient='records')

with json_path.open('w', encoding='utf-8') as json_file:
    json.dump(chunks_json, json_file, indent=2)

print(f"Saved {len(chunks_json)} processed chunks to {output_dir / 'chunks.json'}")