In [1]:
import json
import re
from pathlib import Path
from typing import List, Dict
from sentence_transformers import SentenceTransformer
import numpy as np


# Initialize embedding model (good for Turkish)
model = SentenceTransformer('emrecan/bert-base-turkish-cased-mean-nli-stsb-tr')


def split_into_sentences(text: str) -> List[str]:
    """
    Split text into sentences.
    
    Args:
        text: Input text
        
    Returns:
        List of sentences
    """
    # Split by sentence-ending punctuation
    sentences = re.split(r'(?<=[.!?])\s+', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    return sentences


def calculate_sentence_similarities(sentences: List[str]) -> List[float]:
    """
    Calculate semantic similarity between consecutive sentences.
    
    Args:
        sentences: List of sentences
        
    Returns:
        List of similarity scores between consecutive sentences
    """
    if len(sentences) < 2:
        return []
    
    # Create embeddings for all sentences
    embeddings = model.encode(sentences)
    
    # Calculate cosine similarity between consecutive sentences
    similarities = []
    for i in range(len(embeddings) - 1):
        sim = np.dot(embeddings[i], embeddings[i + 1]) / (
            np.linalg.norm(embeddings[i]) * np.linalg.norm(embeddings[i + 1])
        )
        similarities.append(sim)
    
    return similarities


def find_split_points(similarities: List[float], threshold: float = 0.7) -> List[int]:
    """
    Find indices where similarity drops below threshold (topic boundaries).
    
    Args:
        similarities: List of similarity scores
        threshold: Minimum similarity to keep sentences together
        
    Returns:
        List of sentence indices where splits should occur
    """
    split_points = []
    
    for i, sim in enumerate(similarities):
        if sim < threshold:
            split_points.append(i + 1)  # Split after sentence i
    
    return split_points


def create_chunks_with_overlap(
    sentences: List[str],
    split_points: List[int],
    max_chunk_size: int = 1000,
    overlap_percent: float = 0.1
) -> List[str]:
    """
    Create chunks at split points with overlap.
    
    Args:
        sentences: List of sentences
        split_points: Indices where splits should occur
        max_chunk_size: Maximum characters per chunk
        overlap_percent: Percentage of overlap between chunks
        
    Returns:
        List of text chunks
    """
    chunks = []
    split_points = [0] + split_points + [len(sentences)]
    
    for i in range(len(split_points) - 1):
        start_idx = split_points[i]
        end_idx = split_points[i + 1]
        
        # Get sentences for this chunk
        chunk_sentences = sentences[start_idx:end_idx]
        chunk_text = ' '.join(chunk_sentences)
        
        # If chunk is too large, split it further by max_chunk_size
        if len(chunk_text) > max_chunk_size:
            sub_chunks = split_large_chunk(chunk_sentences, max_chunk_size)
            chunks.extend(sub_chunks)
        else:
            chunks.append(chunk_text)
    
    # Add overlap between chunks
    chunks_with_overlap = []
    for i, chunk in enumerate(chunks):
        if i == 0:
            chunks_with_overlap.append(chunk)
        else:
            # Calculate overlap size
            overlap_size = int(len(chunks[i - 1]) * overlap_percent)
            overlap_text = chunks[i - 1][-overlap_size:] if overlap_size > 0 else ""
            
            # Add overlap from previous chunk
            chunks_with_overlap.append(overlap_text + " " + chunk)
    
    return chunks_with_overlap


def split_large_chunk(sentences: List[str], max_size: int) -> List[str]:
    """
    Split a large chunk into smaller chunks by size.
    
    Args:
        sentences: List of sentences
        max_size: Maximum chunk size
        
    Returns:
        List of chunks
    """
    chunks = []
    current_chunk = []
    current_size = 0
    
    for sentence in sentences:
        sentence_size = len(sentence)
        
        if current_size + sentence_size > max_size and current_chunk:
            chunks.append(' '.join(current_chunk))
            current_chunk = [sentence]
            current_size = sentence_size
        else:
            current_chunk.append(sentence)
            current_size += sentence_size
    
    if current_chunk:
        chunks.append(' '.join(current_chunk))
    
    return chunks


def semantic_chunk(
    text: str,
    max_chunk_size: int = 1000,
    similarity_threshold: float = 0.7,
    overlap_percent: float = 0.1
) -> List[str]:
    """
    Main semantic chunking function.
    
    Args:
        text: Input text
        max_chunk_size: Maximum characters per chunk
        similarity_threshold: Similarity threshold for splitting (lower = more splits)
        overlap_percent: Percentage of overlap between chunks
        
    Returns:
        List of text chunks
    """
    # Split into sentences
    sentences = split_into_sentences(text)
    
    if len(sentences) == 0:
        return []
    
    if len(sentences) == 1:
        return [sentences[0]]
    
    # Calculate semantic similarities
    print("  Calculating sentence similarities...", end=" ")
    similarities = calculate_sentence_similarities(sentences)
    print("✓")
    
    # Find split points based on similarity drops
    split_points = find_split_points(similarities, similarity_threshold)
    
    # Create chunks with overlap
    chunks = create_chunks_with_overlap(
        sentences, split_points, max_chunk_size, overlap_percent
    )
    
    return chunks


def process_text_file(
    txt_path: Path,
    max_chunk_size: int = 1000,
    similarity_threshold: float = 0.7,
    overlap_percent: float = 0.1
) -> List[Dict]:
    """
    Process a single TXT file and create semantic chunks.
    
    Args:
        txt_path: Path to the TXT file
        max_chunk_size: Maximum characters per chunk
        similarity_threshold: Similarity threshold for splitting
        overlap_percent: Percentage of overlap between chunks
        
    Returns:
        List of chunk dictionaries
    """
    with open(txt_path, 'r', encoding='utf-8') as f:
        text = f.read()
    
    # Create semantic chunks
    chunks = semantic_chunk(text, max_chunk_size, similarity_threshold, overlap_percent)
    
    # Synthesize minimal metadata from filename
    metadata = {
        'source_file': txt_path.name,
        'source_path': str(txt_path.resolve()),
    }
    
    # Add metadata to each chunk
    result = []
    for i, chunk_text in enumerate(chunks):
        result.append({
            "chunk_id": i,
            "text": chunk_text,
            "char_count": len(chunk_text),
            "word_count": len(chunk_text.split()),
            "source_file": metadata['source_file'],
            "source_path": metadata['source_path']
        })
    
    return result


def save_chunks_jsonl(chunks: List[Dict], output_path: Path) -> None:
    """
    Save chunks to JSONL file (one JSON object per line).
    
    Args:
        chunks: List of chunk dictionaries
        output_path: Path for output JSONL file
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        for chunk in chunks:
            json.dump(chunk, f, ensure_ascii=False)
            f.write('\n')


def process_all_text_files(
    input_dir: str = r"C:\Users\yigit\Desktop\Enterprises\polcon\text-mistral",
    output_dir: str = r"C:\Users\yigit\Desktop\Enterprises\polcon\chunks",
    max_chunk_size: int = 1000,
    similarity_threshold: float = 0.7,
    overlap_percent: float = 0.1
) -> None:
    """
    Process all TXT files and create semantic chunks with overlap.
    
    Args:
        input_dir: Directory containing TXT files
        output_dir: Directory to save chunked JSONL files
        max_chunk_size: Maximum characters per chunk
        similarity_threshold: Lower = more splits (0.6-0.8 recommended)
        overlap_percent: Overlap percentage (0.1 = 10%)
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    
    # Create output directory if it doesn't exist
    output_path.mkdir(parents=True, exist_ok=True)
    
    # Find all TXT files
    txt_files = list(input_path.glob("*.txt"))
    
    if not txt_files:
        print(f"No TXT files found in {input_dir}")
        return
    
    print(f"Found {len(txt_files)} TXT files to process")
    print(f"Settings: max_chunk_size={max_chunk_size}, threshold={similarity_threshold}, overlap={overlap_percent*100}%\n")
    
    total_chunks = 0
    
    # Process each TXT file
    for txt_file in txt_files:
        try:
            print(f"Processing: {txt_file.name}")
            
            # Create semantic chunks
            chunks = process_text_file(
                txt_file, max_chunk_size, similarity_threshold, overlap_percent
            )
            
            # Save as JSONL
            output_file = output_path / f"{txt_file.stem}_chunks.jsonl"
            save_chunks_jsonl(chunks, output_file)
            
            total_chunks += len(chunks)
            print(f"  ✓ Created {len(chunks)} chunks → {output_file.name}\n")
            
        except Exception as e:
            print(f"  ✗ Error: {e}\n")
    
    print(f"Processing complete!")
    print(f"Total chunks created: {total_chunks}")
    print(f"Files saved to: {output_dir}")


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Run the chunker
if __name__ == "__main__":
    # Process all TXT files with semantic chunking and 10% overlap
    process_all_text_files(
        max_chunk_size=1000,
        similarity_threshold=0.7,  # Adjust between 0.6-0.8
        overlap_percent=0.1
    )

Found 89 TXT files to process
Settings: max_chunk_size=1000, threshold=0.7, overlap=10.0%

Processing: 1) Temel kavramlar önyargı, kalıpyargı ve ayrımcılık.txt
  Calculating sentence similarities... ✓
  ✓ Created 183 chunks → 1) Temel kavramlar önyargı, kalıpyargı ve ayrımcılık_chunks.jsonl

Processing: 10) TÜRKİYE’DE ÖRGÜTLENME ÖZGÜRLÜĞÜNÜN GENEL GÖRÜNÜMÜ-II .txt
  Calculating sentence similarities... ✓
  ✓ Created 183 chunks → 1) Temel kavramlar önyargı, kalıpyargı ve ayrımcılık_chunks.jsonl

Processing: 10) TÜRKİYE’DE ÖRGÜTLENME ÖZGÜRLÜĞÜNÜN GENEL GÖRÜNÜMÜ-II .txt
  Calculating sentence similarities... ✓
  ✓ Created 1162 chunks → 10) TÜRKİYE’DE ÖRGÜTLENME ÖZGÜRLÜĞÜNÜN GENEL GÖRÜNÜMÜ-II _chunks.jsonl

Processing: 11) Yurttaslik_Alani_Bilgi_Notu_1.txt
  Calculating sentence similarities... ✓
  ✓ Created 1162 chunks → 10) TÜRKİYE’DE ÖRGÜTLENME ÖZGÜRLÜĞÜNÜN GENEL GÖRÜNÜMÜ-II _chunks.jsonl

Processing: 11) Yurttaslik_Alani_Bilgi_Notu_1.txt
  Calculating sentence similarities... ✓
  ✓ Cre

In [None]:
# Utilities to compute chunk size statistics over a folder of JSONL chunk files
from pathlib import Path
import json
from typing import Dict, List, Tuple, Any
import math


def read_jsonl(path: Path) -> List[Dict[str, Any]]:
    """Read a JSONL file and return a list of dicts."""
    rows: List[Dict[str, Any]] = []
    with path.open('r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rows.append(json.loads(line))
            except json.JSONDecodeError:
                # Skip malformed lines but continue
                continue
    return rows


def summarize(values: List[float]) -> Dict[str, float]:
    """Return summary stats for a numeric list."""
    if not values:
        return {
            'count': 0,
            'min': 0,
            'max': 0,
            'mean': 0,
            'median': 0,
            'p95': 0,
            'std': 0,
        }
    n = len(values)
    values_sorted = sorted(values)
    total = sum(values)
    mean = total / n
    # Median
    if n % 2 == 1:
        median = values_sorted[n // 2]
    else:
        median = (values_sorted[n // 2 - 1] + values_sorted[n // 2]) / 2
    # p95
    p95_index = min(n - 1, max(0, int(math.ceil(0.95 * n) - 1)))
    p95 = values_sorted[p95_index]
    # std (population std)
    var = sum((x - mean) ** 2 for x in values) / n
    std = math.sqrt(var)
    return {
        'count': n,
        'min': values_sorted[0],
        'max': values_sorted[-1],
        'mean': mean,
        'median': median,
        'p95': p95,
        'std': std,
    }


def analyze_chunks_folder(folder: Path) -> Dict[str, Any]:
    """Compute per-file and global stats for chunk JSONL files in a folder."""
    folder = folder.resolve()
    files = sorted(folder.glob('*.jsonl'))
    per_file: Dict[str, Any] = {}
    all_chars: List[int] = []
    all_words: List[int] = []

    for fp in files:
        rows = read_jsonl(fp)
        char_counts = []
        word_counts = []
        for r in rows:
            # prefer explicit fields; fallback to computing from text if missing
            if 'char_count' in r and isinstance(r['char_count'], (int, float)):
                char_counts.append(int(r['char_count']))
            elif 'text' in r:
                char_counts.append(len(r['text']))
            if 'word_count' in r and isinstance(r['word_count'], (int, float)):
                word_counts.append(int(r['word_count']))
            elif 'text' in r:
                word_counts.append(len(r['text'].split()))
        per_file[fp.name] = {
            'files_counted': len(rows),
            'char_stats': summarize(char_counts),
            'word_stats': summarize(word_counts),
        }
        all_chars.extend(char_counts)
        all_words.extend(word_counts)

    global_stats = {
        'total_files': len(files),
        'total_chunks': len(all_chars),
        'char_stats': summarize(all_chars),
        'word_stats': summarize(all_words),
    }
    return {
        'folder': str(folder),
        'global': global_stats,
        'per_file': per_file,
    }


def print_stats_report(report: Dict[str, Any]) -> None:
    """Pretty-print a compact stats report."""
    g = report['global']
    print(f"Folder: {report['folder']}")
    print(f"Total files: {g['total_files']} | Total chunks: {g['total_chunks']}")
    cs = g['char_stats']
    ws = g['word_stats']
    print("\nGlobal character counts:")
    print(f"  count={cs['count']} min={cs['min']} max={cs['max']} mean={cs['mean']:.1f} median={cs['median']:.1f} p95={cs['p95']:.1f} std={cs['std']:.1f}")
    print("Global word counts:")
    print(f"  count={ws['count']} min={ws['min']} max={ws['max']} mean={ws['mean']:.1f} median={ws['median']:.1f} p95={ws['p95']:.1f} std={ws['std']:.1f}")

    # Show top-5 largest files by mean char count
    print("\nTop-5 files by mean char_count:")
    rows = []
    for fn, stats in report['per_file'].items():
        rows.append((fn, stats['char_stats']['mean'], stats['files_counted']))
    for fn, mean_val, cnt in sorted(rows, key=lambda x: x[1], reverse=True)[:5]:
        print(f"  {fn:60s} mean={mean_val:.1f} (chunks={cnt})")


def save_stats(report: Dict[str, Any], out_json: Path = None) -> None:
    if out_json is None:
        return
    with out_json.open('w', encoding='utf-8') as f:
        json.dump(report, f, ensure_ascii=False, indent=2)


# Example: analyze the default 'chunks' folder in the repo root
# You can change this path if needed.
if __name__ == '__main__':
    chunks_dir = Path(r"C:\Users\yigit\Desktop\Enterprises\polcon\chunks")
    report = analyze_chunks_folder(chunks_dir)
    print_stats_report(report)
    # Optionally save to JSON next to the folder
    # save_stats(report, chunks_dir / 'chunks_stats.json')

Folder: C:\Users\yigit\Desktop\Enterprises\arayuz-9\chunks
Total files: 89 | Total chunks: 89073

Global character counts:
  count=89073 min=2 max=926151 mean=185.1 median=122.0 p95=430.0 std=4007.4
Global word counts:
  count=89073 min=1 max=4202 mean=22.4 median=16.0 p95=57.0 std=39.5

Top-5 files by mean char_count:
  53) Gençlerin Güçlendirilmesine Yönelik Harcamaları İzleme Kılavuzu_chunks.jsonl mean=500441.0 (chunks=4)
  89) UNFPA İstatisliklerle Gençlik_chunks.jsonl             mean=3269.0 (chunks=1)
  64) TGSP Türkiye_nin Gençleri Yükseköğrenim Algısı_chunks.jsonl mean=547.0 (chunks=265)
  63) TGSP Türkiye_nin Gençleri Dindarlık Algısı_chunks.jsonl mean=522.0 (chunks=196)
  48) TGSP Gençlerin Gönüllülük Algısı_chunks.jsonl            mean=468.1 (chunks=184)
