Description
Part of: #274
Note: Story content truncated due to GitHub issue size limit. See file for full details.
Story: Git History Indexing with Blob Deduplication and Branch Metadata
Story Description
As an AI coding agent
I want to index a repository's git history with storage deduplication and branch awareness
So that I can search across historical code cost-effectively while preserving branch context
Conversation Context:
- User specified need for semantic search across git history to find removed code
- Emphasized storage efficiency via git blob deduplication (92%+ savings achieved)
- Required to handle 40K+ commit repositories efficiently
- Analysis of Evolution repo (1,135 branches, 89K commits) showed single-branch indexing covers 91.6% of commits
- Default to current branch only, opt-in for all branches to avoid 85% storage increase
⚠️ CRITICAL IMPLEMENTATION INSTRUCTION
STOP AFTER COMPLETION: When asked to "start implementing" the Temporal Epic, implement ONLY Story 1 (this story) and then STOP. Do not proceed to Story 2 or subsequent stories without explicit approval.
Implementation Checkpoint Workflow:
- Implement Story 1 completely (TDD workflow with all tests passing)
- Run code review and manual testing
- Commit changes
- STOP and wait for user review/approval
- Only proceed to Story 2 after user explicitly approves
Rationale: This is the foundational story establishing temporal indexing architecture. User must review and validate the implementation approach before building dependent features on top of it.
Acceptance Criteria
Core Functionality (Both Modes)
- Running `cidx index --index-commits` indexes the current branch only (default behavior)
- Running `cidx index --index-commits --all-branches` indexes all branches
- Creates a SQLite database at `.code-indexer/index/temporal/commits.db` with the commit graph (schema sketch after this list)
- Creates a `commit_branches` table tracking which branches each commit appears in
- Builds a blob registry at `.code-indexer/index/temporal/blob_registry.db` (SQLite) mapping blob_hash → point_ids
- Reuses existing vectors for blobs already in HEAD (deduplication)
- Only embeds new blobs not present in current HEAD
- Stores temporal metadata, including branch information, in `.code-indexer/index/temporal/temporal_meta.json`
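The stores above can be pinned down with a small schema sketch. Only the `commit_branches` table name comes from this story; the remaining table and column names are illustrative assumptions.

# Hypothetical schema sketch for the temporal SQLite stores; only the
# commit_branches table is named in the acceptance criteria, the other
# tables/columns are illustrative assumptions.
import sqlite3
from pathlib import Path

temporal_dir = Path(".code-indexer/index/temporal")
temporal_dir.mkdir(parents=True, exist_ok=True)

commits_db = sqlite3.connect(temporal_dir / "commits.db")
commits_db.executescript("""
    CREATE TABLE IF NOT EXISTS commits (
        commit_hash   TEXT PRIMARY KEY,
        parent_hashes TEXT,        -- space-separated parents (commit graph)
        author        TEXT,
        committed_at  TEXT
    );
    CREATE TABLE IF NOT EXISTS commit_blobs (
        commit_hash TEXT,
        blob_hash   TEXT,
        file_path   TEXT
    );
    CREATE TABLE IF NOT EXISTS commit_branches (
        commit_hash TEXT,
        branch      TEXT
    );
""")

blob_registry_db = sqlite3.connect(temporal_dir / "blob_registry.db")
blob_registry_db.executescript("""
    CREATE TABLE IF NOT EXISTS blob_points (
        blob_hash TEXT,            -- blob_hash → point_ids mapping
        point_id  TEXT
    );
    CREATE INDEX IF NOT EXISTS idx_blob_hash ON blob_points(blob_hash);
""")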
Daemon Mode Functionality
- Temporal indexing works when `daemon.enabled: true` is set in config
- CLI automatically delegates to the daemon via `_index_via_daemon(index_commits=True)`
- Daemon's `exposed_index_blocking()` handles temporal indexing via TemporalIndexer
- Progress callbacks stream from daemon to CLI in real time
- Cache is invalidated before temporal indexing starts (daemon mode)
- Graceful fallback to standalone mode if the daemon is unavailable (see the delegation sketch after this list)
- All flags (`--all-branches`, `--max-commits`, `--since-date`) are passed through the delegation
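A rough sketch of that delegation path with graceful fallback. `_index_via_daemon` and `exposed_index_blocking()` are the names used in this story; the connection helper, standalone fallback function, and daemon-handle methods are placeholder assumptions.

# Hypothetical sketch of daemon delegation with graceful fallback.
# connect_to_daemon(), _index_standalone() and the daemon-handle methods
# other than exposed_index_blocking() are assumptions for illustration.
def _index_via_daemon(index_commits=False, all_branches=False,
                      max_commits=None, since_date=None, progress_callback=None):
    try:
        daemon = connect_to_daemon()  # assumed helper; raises if daemon is down
    except ConnectionError:
        # Graceful fallback: run temporal indexing in standalone mode
        return _index_standalone(index_commits=index_commits,
                                 all_branches=all_branches,
                                 max_commits=max_commits,
                                 since_date=since_date,
                                 progress_callback=progress_callback)

    # Cache is invalidated before temporal indexing starts (daemon mode),
    # then the daemon's exposed_index_blocking() drives TemporalIndexer and
    # streams progress callbacks back to the CLI.
    daemon.invalidate_cache()  # assumed method name
    return daemon.exposed_index_blocking(
        index_commits=index_commits,
        all_branches=all_branches,
        max_commits=max_commits,
        since_date=since_date,
        progress_callback=progress_callback,
    )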
User Experience
- Shows progress during indexing: "Indexing commits: 500/5000 (10%) [development branch]"
- Displays a cost warning before indexing all branches: "⚠️ Indexing 715 branches will use ~514MB storage and cost ~$4.74" (estimation sketch after this list)
- Requires user confirmation (y/N) for `--all-branches` in large repos (>50 branches)
- Shows final statistics: branches indexed, commits per branch, deduplication ratio
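A minimal sketch of how the numbers behind that warning could be estimated, matching the cost-estimate fields consumed by the CLI code later in this story; the per-token price, average tokens per blob, and vector size are placeholder assumptions, not measurements.

# Hypothetical cost estimation sketch for the --all-branches warning.
# The pricing and size constants are placeholders, not real figures.
from dataclasses import dataclass

@dataclass
class CostEstimate:
    total_branches: int
    additional_commits: int
    additional_blobs: int
    storage_mb: float
    api_cost: float

def estimate_all_branches_cost(total_branches, additional_commits, additional_blobs,
                               avg_tokens_per_blob=800,           # assumed average
                               price_per_million_tokens=0.12,     # placeholder price
                               avg_vector_kb_per_blob=3.5):       # assumed vector size
    # Storage: only new vectors count (existing HEAD vectors are reused)
    storage_mb = additional_blobs * avg_vector_kb_per_blob / 1024
    # API cost: embedding tokens for new blobs only
    api_cost = additional_blobs * avg_tokens_per_blob / 1_000_000 * price_per_million_tokens
    return CostEstimate(total_branches, additional_commits,
                        additional_blobs, storage_mb, api_cost)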
Performance
- Achieves >92% storage savings through blob deduplication
- Handles large repositories (40K+ commits, 1000+ branches) without running out of memory
- Single-branch indexing is fast (similar to current indexing performance)
Technical Architecture (CRITICAL - Read First)
What We Index: Full Blob Versions, NOT Diffs
CRITICAL DECISION: We index complete file versions (blobs) at each point in git history, NOT diffs.
✅ CORRECT: Index full blob content
Commit abc123: user.py (blob def456)
→ Index complete file with full class User, all methods, imports, context
❌ WRONG: Index diffs
Commit abc123: user.py diff
→ Would only index: "+ def greet(): ..." (no context, can't do semantic search)
Rationale:
- Semantic search requires full context - a query like "authentication with JWT" needs the complete class implementation
- Users want complete implementations - "Find removed auth code" must return full working code, not fragments
- Git stores blobs efficiently - Git handles compression/deduplication internally
- Better deduplication - Same file content = same blob hash = reuse existing vectors
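That deduplication property follows from git's content addressing: a blob ID is a hash of the file's bytes, so identical content in any commit maps to the same blob hash. A minimal illustration (standard git behavior for SHA-1 repositories, not project code):

# Git blob IDs are content-addressed: sha1("blob <size>\0" + content).
# Identical content in any commit yields the same hash, so its existing
# vectors can be reused instead of re-embedded.
import hashlib

def git_blob_hash(content: bytes) -> str:
    header = f"blob {len(content)}\0".encode()
    return hashlib.sha1(header + content).hexdigest()

# Same content, different commits -> same blob hash -> vectors reused
assert git_blob_hash(b"print('hello')\n") == git_blob_hash(b"print('hello')\n")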
Example Query: "function that handles JWT authentication"
- Full blob approach: Finds the complete `AuthManager` class with all context ✅
- Diff approach: Only finds the "+ jwt.decode()" line without context ❌
Component Architecture and Reuse Strategy
Pipeline Reuse (85% of workspace indexing code):
Workspace Indexing (Current HEAD):
Disk Files → FileIdentifier → FixedSizeChunker.chunk_file()
→ VectorCalculationManager → FilesystemVectorStore
Git History Indexing (This Story):
Git Blobs → GitBlobReader → FixedSizeChunker.chunk_text()
→ VectorCalculationManager → FilesystemVectorStore
↑ ↑
SAME COMPONENTS REUSED (85%)
✅ Reused Components (No Changes):
- `VectorCalculationManager` - Parallel embedding generation (VoyageAI API)
- `FilesystemVectorStore` - JSON vector storage (already has blob_hash field)
- Threading patterns (`ThreadPoolExecutor`, `CleanSlotTracker`)
- Progress callback mechanism
⚙️ Modified Components (Minor Changes):
- `FixedSizeChunker` - Add `chunk_text(text, source)` method for pre-loaded text (sketch below)
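A hedged sketch of what the new `chunk_text(text, source)` method could look like; the returned dict fields and the chunk-size default are assumptions standing in for the existing chunker's conventions.

# Sketch of the proposed FixedSizeChunker.chunk_text() addition.
# Field names and the chunk-size default are assumptions; the real
# chunker's conventions should be reused.
def chunk_text(self, text: str, source: str, chunk_size: int = 1000):
    """Chunk pre-loaded text (e.g., a git blob) instead of reading from disk."""
    chunks = []
    for i in range(0, len(text), chunk_size):
        chunks.append({
            "text": text[i:i + chunk_size],
            "source": source,            # e.g. "<blob_hash>:<file_path>"
            "chunk_index": i // chunk_size,
        })
    return chunks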
🆕 New Git-Specific Components:
- `TemporalBlobScanner` - Discovers blobs in git history

      def get_blobs_for_commit(commit_hash: str) -> List[BlobInfo]:
          """Uses: git ls-tree -r <commit_hash>"""
          # Returns: [(file_path, blob_hash, size), ...]

- `GitBlobReader` - Reads blob content from git object store

      def read_blob_content(blob_hash: str) -> str:
          """Uses: git cat-file blob <blob_hash>"""
          # Returns: Full file content as string

- `HistoricalBlobProcessor` - Parallel blob processing (analogous to HighThroughputProcessor)

      def process_blobs_high_throughput(blobs: List[BlobInfo]) -> Stats:
          """
          Orchestrates: blob → read → chunk → vector → store
          Reuses: VectorCalculationManager + FilesystemVectorStore
          """
Deduplication Flow (92% Vector Reuse)
Key Insight: Most blobs across history already have vectors from HEAD indexing.
# For each commit (e.g., 150 blobs per commit)
for commit in commits:
    # Step 1: Get all blobs in commit
    all_blobs = scanner.get_blobs_for_commit(commit.hash)  # 150 blobs

    # Step 2: Check blob registry (SQLite lookup, microseconds)
    new_blobs = []
    for blob in all_blobs:
        if not blob_registry.has_blob(blob.blob_hash):  # Fast SQLite query
            new_blobs.append(blob)  # Only ~12 blobs are new (92% dedup)

    # Step 3: Process ONLY new blobs (skip 138 existing)
    if new_blobs:
        # Use HistoricalBlobProcessor (reuses VectorCalculationManager)
        blob_processor.process_blobs_high_throughput(
            new_blobs,  # Only 12 blobs instead of 150
            vector_thread_count=8
        )

    # Step 4: Link commit → all blobs (new + reused) in SQLite
    store_commit_metadata(commit, all_blobs)

Result: 150 blobs → 12 embeddings → 92% savings
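A minimal sketch of the blob registry behind those fast lookups, assuming a single SQLite table indexed on blob_hash. Only `has_blob()` and `build_from_vector_store()` are named in this story; the schema, `add_blob()`, and the vector-store iterator are illustrative assumptions.

# Illustrative BlobRegistry sketch backed by SQLite; only has_blob() and
# build_from_vector_store() are named in this story, the rest is assumed.
import sqlite3
from pathlib import Path

class BlobRegistry:
    def __init__(self, db_path: Path):
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS blob_points (blob_hash TEXT, point_id TEXT)"
        )
        self.conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_blob_hash ON blob_points(blob_hash)"
        )

    def has_blob(self, blob_hash: str) -> bool:
        row = self.conn.execute(
            "SELECT 1 FROM blob_points WHERE blob_hash = ? LIMIT 1", (blob_hash,)
        ).fetchone()
        return row is not None

    def add_blob(self, blob_hash: str, point_ids):
        self.conn.executemany(
            "INSERT INTO blob_points (blob_hash, point_id) VALUES (?, ?)",
            [(blob_hash, pid) for pid in point_ids],
        )
        self.conn.commit()

    def build_from_vector_store(self, vector_store):
        # Assumes stored vectors already carry a blob_hash field (see reused
        # components) and that the store exposes some point iterator.
        for point in vector_store.iter_points():  # assumed iterator
            blob_hash = point.get("blob_hash")
            if blob_hash:
                self.add_blob(blob_hash, [point["id"]])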
Performance Expectations (42K files, 10GB repo)
First Run:
- Estimated 150,000 unique blobs across history
- 92% deduplication (most files unchanged across commits)
- Only ~12,000 new blobs need embedding
- 8 parallel threads with VoyageAI batch processing
- Time: 4-7 minutes (similar to workspace indexing)
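As a rough sanity check on that estimate: ~12,000 new blobs at roughly 3 chunks per file is ~36,000 embeddings; assuming batched VoyageAI calls across 8 threads sustain on the order of 100-150 embeddings per second (an assumption, not a measurement), 36,000 / 120 ≈ 300 seconds, i.e. about 5 minutes, consistent with the 4-7 minute range.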
Incremental (New Commits):
- Only process blobs from new commits
- High deduplication (most files unchanged)
- Time: <1 minute
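A sketch of how an incremental run could restrict itself to new commits, assuming `temporal_meta.json` records the last indexed commit (the TemporalIndexer code below saves it as `last_commit`); the JSON key name is an assumption about the file's layout.

# Incremental sketch: only walk commits added since the last indexed commit.
# The "last_commit" key is an assumption about temporal_meta.json's layout.
import json
import subprocess
from pathlib import Path
from typing import List

def get_new_commits(codebase_dir: Path) -> List[str]:
    meta_path = Path(".code-indexer/index/temporal/temporal_meta.json")
    meta = json.loads(meta_path.read_text())
    last_commit = meta["last_commit"]
    result = subprocess.run(
        ["git", "rev-list", f"{last_commit}..HEAD"],
        cwd=codebase_dir, capture_output=True, text=True, check=True,
    )
    return [commit_hash for commit_hash in result.stdout.split() if commit_hash]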
Bottlenecks:
- Git blob extraction (`git cat-file`) - slower than disk reads but parallelized
- VoyageAI API calls - same as workspace (token-aware batching, 120K limit; batching sketch after this list)
- SQLite blob registry lookups - microseconds (indexed)
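A rough sketch of the token-aware batching mentioned above; the characters/4 token estimate is a crude assumption standing in for the provider's real tokenizer.

# Token-aware batching sketch for VoyageAI calls (120K tokens per request).
# The chars/4 token estimate is a rough assumption, not the real tokenizer.
def batch_chunks_by_tokens(chunks, max_tokens: int = 120_000):
    batches, current, current_tokens = [], [], 0
    for chunk in chunks:
        tokens = max(1, len(chunk["text"]) // 4)  # crude token estimate
        if current and current_tokens + tokens > max_tokens:
            batches.append(current)
            current, current_tokens = [], 0
        current.append(chunk)
        current_tokens += tokens
    if current:
        batches.append(current)
    return batches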
Technical Implementation
Entry Point (CLI)
# In cli.py index command
@click.option("--index-commits", is_flag=True,
              help="Index git commit history for temporal search (current branch only)")
@click.option("--all-branches", is_flag=True,
              help="Index all branches (requires --index-commits, may increase storage significantly)")
@click.option("--max-commits", type=int,
              help="Maximum number of commits to index per branch (default: all)")
@click.option("--since-date",
              help="Index commits since date (YYYY-MM-DD)")
def index(..., index_commits, all_branches, max_commits, since_date):
    if index_commits:
        # Lazy import for performance
        from src.code_indexer.services.temporal_indexer import TemporalIndexer

        temporal_indexer = TemporalIndexer(config_manager, vector_store)

        # Cost estimation and warning for all-branches
        if all_branches:
            cost_estimate = temporal_indexer.estimate_all_branches_cost()
            console.print(Panel(
                f"⚠️ [yellow]Indexing all branches will:[/yellow]\n"
                f"  • Process {cost_estimate.additional_commits:,} additional commits\n"
                f"  • Create {cost_estimate.additional_blobs:,} new embeddings\n"
                f"  • Use {cost_estimate.storage_mb:.1f} MB additional storage\n"
                f"  • Cost ~${cost_estimate.api_cost:.2f} in VoyageAI API calls",
                title="Cost Warning",
                border_style="yellow"
            ))
            if cost_estimate.total_branches > 50:
                if not click.confirm("Continue with all-branches indexing?", default=False):
                    console.print("[yellow]Cancelled. Using single-branch mode.[/yellow]")
                    all_branches = False

        result = temporal_indexer.index_commits(
            all_branches=all_branches,
            max_commits=max_commits,
            since_date=since_date
        )

Core Implementation (TemporalIndexer Orchestration)
CRITICAL: TemporalIndexer orchestrates the flow but delegates actual blob processing to HistoricalBlobProcessor.
class TemporalIndexer:
    def __init__(self, config_manager, vector_store):
        self.config = config_manager.get_config()
        self.vector_store = vector_store
        self.db_path = Path(".code-indexer/index/temporal/commits.db")

        # Initialize git-specific components
        self.blob_scanner = TemporalBlobScanner(self.config.codebase_dir)
        self.blob_reader = GitBlobReader(self.config.codebase_dir)

        # Initialize blob registry
        self.blob_registry = BlobRegistry(
            Path(".code-indexer/index/temporal/blob_registry.db")
        )

        # Initialize blob processor (reuses VectorCalculationManager)
        from .embedding_factory import EmbeddingProviderFactory
        embedding_provider = EmbeddingProviderFactory.create(config=self.config)
        self.blob_processor = HistoricalBlobProcessor(
            config=self.config,
            embedding_provider=embedding_provider,
            vector_store=vector_store,
            blob_registry=self.blob_registry
        )

    def index_commits(self, all_branches: bool = False,
                      max_commits: Optional[int] = None,
                      since_date: Optional[str] = None,
                      progress_callback: Optional[Callable] = None) -> IndexingResult:
        """Index git history with blob deduplication and branch tracking.

        This method orchestrates but DELEGATES blob processing to
        HistoricalBlobProcessor (which reuses VectorCalculationManager).
        """
        # Step 1: Build blob registry from existing vectors (SQLite)
        self.blob_registry.build_from_vector_store(self.vector_store)

        # Step 2: Get commit history from git (with branch info)
        commits = self._get_commit_history(all_branches, max_commits, since_date)
        current_branch = self._get_current_branch()

        # Step 3: Process each commit
        total_blobs_processed = 0
        total_vectors_created = 0

        for i, commit in enumerate(commits):
            # 3a. Discover all blobs in this commit (git ls-tree)
            all_blobs = self.blob_scanner.get_blobs_for_commit(commit.hash)

            # 3b. Filter to ONLY new blobs (deduplication check)
            new_blobs = []
            for blob_info in all_blobs:
                if not self.blob_registry.has_blob(blob_info.blob_hash):
                    new_blobs.append(blob_info)

            # 3c. Process new blobs using HistoricalBlobProcessor
            #     (This reuses VectorCalculationManager + FilesystemVectorStore)
            if new_blobs:
                stats = self.blob_processor.process_blobs_high_throughput(
                    new_blobs,
                    vector_thread_count=8,
                    progress_callback=progress_callback
                )
                total_vectors_created += stats.vectors_created

            total_blobs_processed += len(all_blobs)

            # 3d. Store commit metadata in SQLite (links commit → blobs)
            self._store_commit_tree(commit, all_blobs)

            # 3e. Store branch metadata for THIS COMMIT (CRITICAL: During processing)
            #     DO NOT defer this to after the loop - branch metadata must be stored
            #     as we process each commit for accuracy and to avoid expensive lookups later
            self._store_commit_branch_metadata(
                commit_hash=commit.hash,
                all_branches_mode=all_branches,
                current_branch=current_branch
            )

            # Progress with branch info
            if progress_callback:
                branch_info = f" [{current_branch}]" if not all_branches else ""
                progress_callback(
                    i + 1,
                    len(commits),
                    Path(f"commit {commit.hash[:8]}"),
                    info=f"{i+1}/{len(commits)} commits{branch_info}"
                )

        # Step 5: Save temporal metadata with branch info
        branch_stats = self._calculate_branch_statistics(commits, all_branches)
        self._save_temporal_metadata(
            last_commit=commits[-1].hash,
            total_commits=len(commits),
            total_blobs=total_blobs_processed,
            new_blobs=total_vectors_created // 3,  # Approx (3 chunks/file avg)
            branch_stats=branch_stats,
            indexing_mode='all-branches' if all_branches else 'single-branch'
        )

        return IndexingResult(
            total_commits=len(commits),
            unique_blobs=total_blobs_processed,
            new_blobs_indexed=total_vectors_created // 3,
            deduplication_ratio=1 - (total_vectors_created / (total_blobs_processed * 3)),
            branches_indexed=branch_stats.branches,
            commits_per_branch=branch_stats.per_branch_counts
        )

New Component: TemporalBlobScanner
@dataclass
class BlobInfo:
    """Information about a blob in git history."""
    blob_hash: str    # Git's blob hash (for deduplication)
    file_path: str    # Relative path in repo
    commit_hash: str  # Which commit this blob appears in
    size: int         # Blob size in bytes


class TemporalBlobScanner:
    """Discovers blobs in git history."""

    def __init__(self, codebase_dir: Path):
        self.codebase_dir = codebase_dir

    def get_blobs_for_commit(self, commit_hash: str) -> List[BlobInfo]:
        """Get all blobs in a commit's tree.

        Uses: git ls-tree -r -l <commit_hash>
        """
        cmd = ["git", "ls-tree", "-r", "-l", commit_hash]
        result = subprocess.run(
            cmd,
            cwd=self.codebase_dir,
            capture_output=True,
            text=True,
            check=True
        )

        blobs = []
        for line in result.stdout.strip().split("\n"):
            if not line:
                continue
            # Format: <mode> <type> <hash> <size>\t<path>
            # Example: 100644 blob abc123def456 1234\tsrc/module.py
            parts = line.split()
            if len(parts) >= 4 and parts[1] == "blob":
                blob_hash = parts[2]
                size = int(parts[3])
                file_path = line.split("\t", 1)[1]
                blobs.append(BlobInfo(
                    blob_hash=blob_hash,
                    file_path=file_path,
                    commit_hash=commit_hash,
                    size=size
                ))
        return blobs

New Component: GitBlobReader
class GitBlobReader:
    """Reads blob content from git object store."""

    def __init__(self, codebase_dir: Path):
        self.codebase_dir = codebase_dir

    def read_blob_content(self, blob_hash: str) -> str:
        """Extract blob content as text.

        Uses: git cat-file blob <blob_hash>
        """
        cmd = ["git", "cat-file", "blob", blob_hash]
        result = subprocess.run(
            cmd,
            cwd=self.codebase_dir,
            capture_output=True,
            text=True
        )
        if result.returncode != 0:
            raise ValueError(f"Failed to read blob {blob_hash}: {result.stderr}")
        return result.stdout

New Component: HistoricalBlobProcessor
CRITICAL: This component reuses VectorCalculationManager and FilesystemVectorStore.
class HistoricalBlobProcessor:
    """Processes historical git blobs with parallel vectorization.

    Similar to HighThroughputProcessor but for git blobs instead of disk files.
    Reuses: VectorCalculationManager + FilesystemVectorStore
    """

    def __init__(self, config, embedding_provider, vector_store, blob_registry):
        self.config = config
        self.embedding_provider = embedding_provider
        self.vector_store = vector_store
        self.blob_registry = blob_registry
        self.blob_reader = GitBlobReader(config.codebase_dir)
        self.chunker = FixedSizeChunker()  # Will use chunk_text() method

    def process_blobs_high_throughput(
        self,
        blobs: List[BlobInfo],
        vector_thread_count: int,
        progress_callback: Optional[Callable] = None
    ) -> BlobProcessingStats:
        """Process blobs with parallel vectorization.

        Uses SAME architecture as HighThroughputProcessor:
        - VectorCalculationManager for parallel embeddings
        - FilesystemVectorStore for vector storage
        - ThreadPoolExecutor for parallel blob processing
        """
        stats = BlobProcessingStats()

        # ✅ REUSE VectorCalculationManager (unchanged)
        with VectorCalculationManager(
            self.embedding_provider, vector_thread_count
        ) as vector_manager:
            # Parallel blob processing
            with ThreadPoolExecutor(max_workers=vector_thread_count) as executor:
                futures = []
                for blob_info in blobs:
                    future = executor.submit(
                        self._process_single_blob,
                        blob_info,
                        vector_manager
                    )
                    futures.append((future, blob_info))

                # Collect results as they complete
                for i, (future, blob_info) in enumerate(futures):
                    try:
                        result = future.result()
                        stats.blobs_processed += 1
                        stats.vectors_created += result.chunks_processed

                        # Progress callback
                        if progress_callback:
                            progress_callback(
                                i + 1,
                                len(blobs),
                                Path(blob_info.file_path),
                                info=f"{i+1}/{len(blobs)} blobs"
                            )
                    except Exception as e: