Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/gmail_cli/db/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import sqlite3
from pathlib import Path
from sqlalchemy import text


# FTS5 full-text search setup
Expand Down Expand Up @@ -58,3 +59,43 @@ def setup_fts5(db_path: Path) -> None:
conn.commit()
finally:
conn.close()


def disable_fts_triggers(session) -> None:
    """Disable FTS5 triggers for bulk operations (performance optimization).

    Dropping the insert/delete/update sync triggers lets bulk writes to the
    messages table skip per-row FTS index maintenance; call
    enable_fts_triggers() afterwards to restore them.

    Args:
        session: SQLModel session
    """
    drop_statements = (
        "DROP TRIGGER IF EXISTS messages_ai;",
        "DROP TRIGGER IF EXISTS messages_ad;",
        "DROP TRIGGER IF EXISTS messages_au;",
    )
    for stmt in drop_statements:
        session.execute(text(stmt))
    session.commit()


def enable_fts_triggers(session) -> None:
    """Re-enable FTS5 triggers after bulk operations.

    Recreates the insert/delete/update triggers from the module-level DDL
    constants, then commits.

    Args:
        session: SQLModel session
    """
    trigger_ddls = (FTS5_TRIGGER_INSERT, FTS5_TRIGGER_DELETE, FTS5_TRIGGER_UPDATE)
    for ddl in trigger_ddls:
        session.execute(text(ddl))
    session.commit()


def rebuild_fts_index(session) -> None:
    """Rebuild FTS5 index from messages table.

    Call this after bulk inserts with triggers disabled.

    Args:
        session: SQLModel session
    """
    # Clear any stale index rows first, then repopulate from the source table.
    session.execute(text("DELETE FROM messages_fts;"))
    repopulate_sql = """
        INSERT INTO messages_fts(rowid, subject, body_plain, from_addr, to_addrs)
        SELECT rowid, subject, body_plain, from_addr, to_addrs FROM messages;
    """
    session.execute(text(repopulate_sql))
    session.commit()
101 changes: 61 additions & 40 deletions src/gmail_cli/sync/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from gmail_cli.db import get_session
from gmail_cli.db.models import Message, Attachment
from gmail_cli.db.queries import get_message_count, get_sync_state, update_sync_state
from gmail_cli.db.schema import disable_fts_triggers, enable_fts_triggers, rebuild_fts_index
from gmail_cli.sync.client import GmailClient, HistoryIdTooOldError, BatchFetchError
from gmail_cli.sync.parser import parse_message

Expand Down Expand Up @@ -53,6 +54,9 @@ def full_sync(self, max_messages: Optional[int] = None, force: bool = False, dry
return

try:
# Disable FTS triggers during bulk sync for performance
disable_fts_triggers(session)

# Check for existing partial sync
sync_state = get_sync_state(session)

Expand Down Expand Up @@ -166,20 +170,21 @@ def full_sync(self, max_messages: Optional[int] = None, force: bool = False, dry
try:
parsed_messages = future.result()

# Store messages (thread-safe)
# Store messages (thread-safe, batched commits)
with self._db_lock:
for message, attachments in parsed_messages:
try:
self._store_message(session, message, attachments)
synced_count += 1
except Exception as e:
progress.log(f"[yellow]Failed to store message: {e}[/yellow]")
raise
try:
self._store_batch(session, parsed_messages)
synced_count += len(parsed_messages)
except Exception as e:
progress.log(f"[yellow]Failed to store batch: {e}[/yellow]")
raise

# Update progress title after each batch
batches_processed += 1
elapsed = time.monotonic() - last_heartbeat if batches_processed == 1 else time.monotonic() - last_heartbeat + 30
throughput = synced_count / max(elapsed, 0.1)
percentage = (synced_count / total_messages * 100) if total_messages > 0 else 0
progress.title = f"Syncing messages ({synced_count:,}/{total_messages:,} - {percentage:.1f}%)"
progress.title = f"Syncing messages ({synced_count:,}/{total_messages:,} - {percentage:.1f}% - {throughput:.1f} msg/s)"

except Exception as e:
progress.log(f"[yellow]Failed to fetch batch: {e}[/yellow]")
Expand Down Expand Up @@ -209,6 +214,11 @@ def full_sync(self, max_messages: Optional[int] = None, force: bool = False, dry
# So we need significant delay between pages
time.sleep(0.5)

# Re-enable FTS triggers and rebuild index
enable_fts_triggers(session)
self.app.print("Rebuilding full-text search index...", tag="info")
rebuild_fts_index(session)

# Update sync state - mark as complete
latest_profile = self.client.get_profile()
latest_history_id = latest_profile.get("historyId")
Expand All @@ -232,6 +242,7 @@ def full_sync(self, max_messages: Optional[int] = None, force: bool = False, dry
self.app.print(f"✓ Sync complete! Synced {actual_db_count:,} messages", tag="success")

except Exception as e:
enable_fts_triggers(session)
update_sync_state(
session, sync_status="error", error_message=str(e)
)
Expand Down Expand Up @@ -377,27 +388,28 @@ def incremental_sync(self) -> None:
continue

for future in done:
try:
parsed_messages = future.result()

# Store messages (thread-safe)
with self._db_lock:
for message, attachments in parsed_messages:
try:
self._store_message(session, message, attachments)
synced_count += 1
except Exception as e:
progress.log(f"[yellow]Failed to store message: {e}[/yellow]")
raise

# Update progress title after each batch
batches_processed += 1
percentage = (synced_count / len(affected_ids) * 100) if len(affected_ids) > 0 else 0
progress.title = f"Syncing changes ({synced_count}/{len(affected_ids)} - {percentage:.1f}%)"

except Exception as e:
progress.log(f"[yellow]Failed to fetch batch: {e}[/yellow]")
raise
try:
parsed_messages = future.result()

# Store messages (thread-safe, batched commits)
with self._db_lock:
try:
self._store_batch(session, parsed_messages)
synced_count += len(parsed_messages)
except Exception as e:
progress.log(f"[yellow]Failed to store batch: {e}[/yellow]")
raise

# Update progress title after each batch
batches_processed += 1
elapsed = time.monotonic() - last_heartbeat if batches_processed == 1 else time.monotonic() - last_heartbeat + 30
throughput = synced_count / max(elapsed, 0.1)
percentage = (synced_count / len(affected_ids) * 100) if len(affected_ids) > 0 else 0
progress.title = f"Syncing changes ({synced_count}/{len(affected_ids)} - {percentage:.1f}% - {throughput:.1f} msg/s)"

except Exception as e:
progress.log(f"[yellow]Failed to fetch batch: {e}[/yellow]")
raise
except Exception as e:
progress.log(f"[red]Error in batch processing: {e}[/red]")
raise
Expand Down Expand Up @@ -513,20 +525,29 @@ def _fetch_and_parse_batch(

raise Exception("Failed to fetch batch after retries")

def _store_batch(
    self, session: Session, parsed_messages: list
) -> None:
    """Store or update a batch of messages and their attachments in the database.

    Commits once per batch instead of per message for better performance.

    Args:
        session: Database session
        parsed_messages: List of (Message, List[Attachment]) tuples
    """
    for message, attachments in parsed_messages:
        # Refresh the modification timestamp before persisting.
        message.updated_at = datetime.utcnow()

        # merge() performs an upsert: inserts a new row or updates an
        # existing one matched by primary key.
        session.merge(message)

        # Attachments are upserted the same way, keyed by their own PKs.
        for attachment in attachments:
            session.merge(attachment)

    # Single commit for the entire batch
    session.commit()
def _calculate_sync_stats(self, session: Session, max_messages: Optional[int]) -> dict:
Expand Down
88 changes: 88 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.