# Import Claude Code Logs to PostgreSQL

This notebook imports Claude Code conversation logs from JSONL files into a PostgreSQL database with full-text search support.

## Schema Overview

- **sessions** - One row per Claude Code session
- **messages** - One row per log entry (user message, assistant response, system, etc.)
- **content_blocks** - One row per content block within a message (text, tool_use, tool_result, thinking)
- **import_metadata** - One row per project, recording the timestamp of the last imported entry
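
To make the mapping concrete, here is a minimal sketch of how one log entry might be split into a `messages` row and its `content_blocks` rows. The sample entry and the helper name `split_entry` are hypothetical, chosen for illustration; the field names follow the import code later in this notebook rather than any documented log format.

```python
def split_entry(entry):
    """Split one parsed log entry into a message envelope and its content blocks.

    Hypothetical helper for illustration only; it mirrors the schema above
    but does not touch the database.
    """
    message = entry.get('message')
    if not isinstance(message, dict):
        message = {}
    envelope = {
        'uuid': entry.get('uuid'),
        'type': entry.get('type'),
        'role': message.get('role'),
        'timestamp': entry.get('timestamp'),
    }
    content = message.get('content')
    if isinstance(content, str):
        # Plain-string content becomes a single text block
        content = [{'type': 'text', 'text': content}]
    blocks = [
        {'block_index': idx, 'block_type': block.get('type', 'unknown')}
        for idx, block in enumerate(content or [])
        if isinstance(block, dict)
    ]
    return envelope, blocks


# Made-up sample entry (not taken from a real log)
sample_entry = {
    'type': 'assistant',
    'uuid': 'example-uuid',
    'timestamp': '2025-01-01T00:00:00.000Z',
    'message': {
        'role': 'assistant',
        'content': [
            {'type': 'text', 'text': 'Reading the file now.'},
            {'type': 'tool_use', 'id': 'tool-1', 'name': 'Read', 'input': {}},
        ],
    },
}

envelope, blocks = split_entry(sample_entry)
print(envelope['type'], [b['block_type'] for b in blocks])
```

One entry yields one envelope and as many blocks as the message content holds; an entry with no `message` yields an envelope and no blocks.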

## Prerequisites

1. PostgreSQL 13+ running (can be on a remote host)
2. Create a `.env` file in the project root (copy from `.env.example`)
3. Install dependencies: `pip install -r requirements.txt`
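
Before connecting, it can help to confirm which settings the `.env` file actually provides. The sketch below uses the `CLAUDE_LOGS_DB_*` variable names read in the Setup cell; the helper name `missing_settings` is hypothetical, and since the contents of `.env.example` are not shown here, treat the list of names as an assumption.

```python
import os

# Variable names as read by the Setup cell in this notebook
EXPECTED_VARS = [
    'CLAUDE_LOGS_DB_HOST',
    'CLAUDE_LOGS_DB_PORT',
    'CLAUDE_LOGS_DB_NAME',
    'CLAUDE_LOGS_DB_USER',
    'CLAUDE_LOGS_DB_PASSWORD',
]


def missing_settings(env=None):
    """Return the expected variable names that are unset or empty.

    Hypothetical helper for illustration; pass a dict to check something
    other than the current process environment.
    """
    env = os.environ if env is None else env
    return [name for name in EXPECTED_VARS if not env.get(name)]


missing = missing_settings()
if missing:
    print(f"Not set (defaults will be used): {', '.join(missing)}")
else:
    print("All database settings are present")
```

Run it after `load_dotenv()` so that values from `.env` are visible in the environment.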

## Setup

In [1]:
import json
import os
from pathlib import Path
from uuid import UUID

import psycopg2
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Database configuration from environment
DB_CONFIG = {
    'host': os.getenv('CLAUDE_LOGS_DB_HOST', 'localhost'),
    'port': int(os.getenv('CLAUDE_LOGS_DB_PORT', '5432')),
    'database': os.getenv('CLAUDE_LOGS_DB_NAME', 'claude_logs'),
    'user': os.getenv('CLAUDE_LOGS_DB_USER', 'postgres'),
    'password': os.getenv('CLAUDE_LOGS_DB_PASSWORD', ''),
}

CLAUDE_LOGS_DIR = Path.home() / '.claude' / 'projects'

print(f"Database host: {DB_CONFIG['host']}:{DB_CONFIG['port']}")
print(f"Database name: {DB_CONFIG['database']}")
print(f"Claude logs directory: {CLAUDE_LOGS_DIR}")

Database host: s2ag:5432
Database name: claude_logs
Claude logs directory: /home/romilly/.claude/projects


## Test Database Connection

In [2]:
def test_connection():
    """Test database connection."""
    try:
        conn = psycopg2.connect(**DB_CONFIG)
        cur = conn.cursor()
        cur.execute('SELECT version()')
        version = cur.fetchone()[0]
        cur.close()
        conn.close()
        print("Connected successfully!")
        print(f"PostgreSQL version: {version}")
        return True
    except psycopg2.OperationalError as e:
        print(f"Connection failed: {e}")
        return False

test_connection()

Connected successfully!
PostgreSQL version: PostgreSQL 13.15 (Raspbian 13.15-0+deb11u1) on arm-unknown-linux-gnueabihf, compiled by gcc (Raspbian 10.2.1-6+rpi1) 10.2.1 20210110, 32-bit


True

## Create Schema

Run this once to create the tables and indexes. It is safe to re-run because every statement uses `IF NOT EXISTS`.

In [None]:
SCHEMA_SQL = """
-- Sessions table: one row per Claude Code session
CREATE TABLE IF NOT EXISTS sessions (
    id SERIAL PRIMARY KEY,
    session_uuid UUID UNIQUE NOT NULL,
    project_path TEXT,
    summary TEXT,
    created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    total_input_tokens INT DEFAULT 0,
    total_output_tokens INT DEFAULT 0
);

-- Messages table: one row per log entry (envelope only, content in content_blocks)
CREATE TABLE IF NOT EXISTS messages (
    id SERIAL PRIMARY KEY,
    session_id INT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    uuid TEXT,
    type TEXT NOT NULL,           -- 'user', 'assistant', 'system', 'summary', etc.
    role TEXT,                     -- 'user', 'assistant' (from message.role)
    timestamp TIMESTAMPTZ,
    cwd TEXT,
    input_tokens INT,
    output_tokens INT,
    version TEXT
);

-- Content blocks table: one row per content block within a message
CREATE TABLE IF NOT EXISTS content_blocks (
    id SERIAL PRIMARY KEY,
    message_id INT NOT NULL REFERENCES messages(id) ON DELETE CASCADE,
    block_index INT NOT NULL,      -- order within the message
    block_type TEXT NOT NULL,      -- 'text', 'tool_use', 'tool_result', 'thinking'
    
    -- Text content (for text, thinking, tool_result blocks)
    text_content TEXT,
    
    -- Tool use fields
    tool_name TEXT,                -- for tool_use blocks
    tool_input JSONB,              -- for tool_use blocks (the input parameters)
    tool_use_id TEXT,              -- links tool_use to its tool_result
    
    -- Full-text search on text content
    content_tsvector tsvector GENERATED ALWAYS AS (
        to_tsvector('english', COALESCE(text_content, ''))
    ) STORED
);

-- Import metadata: tracks last import timestamp per project for idempotent imports
CREATE TABLE IF NOT EXISTS import_metadata (
    project_path TEXT PRIMARY KEY,
    last_import_timestamp TIMESTAMPTZ NOT NULL
);

-- Indexes for messages
CREATE INDEX IF NOT EXISTS idx_messages_session_id ON messages(session_id);
CREATE INDEX IF NOT EXISTS idx_messages_type ON messages(type);
CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages(timestamp DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session_timestamp ON messages(session_id, timestamp DESC);

-- Unique index on messages.uuid as safety net for idempotent imports
CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_uuid_unique
    ON messages(uuid) WHERE uuid IS NOT NULL;

-- Indexes for content_blocks
CREATE INDEX IF NOT EXISTS idx_content_blocks_message_id ON content_blocks(message_id);
CREATE INDEX IF NOT EXISTS idx_content_blocks_type ON content_blocks(block_type);
CREATE INDEX IF NOT EXISTS idx_content_blocks_tool_use_id ON content_blocks(tool_use_id);
CREATE INDEX IF NOT EXISTS idx_content_blocks_tool_name ON content_blocks(tool_name);
CREATE INDEX IF NOT EXISTS idx_content_blocks_fts ON content_blocks USING GIN(content_tsvector);
"""

# Migration SQL for existing databases (safe to re-run)
MIGRATION_SQL = """
-- Add summary column if it doesn't exist
ALTER TABLE sessions ADD COLUMN IF NOT EXISTS summary TEXT;

-- Create import_metadata table if it doesn't exist
CREATE TABLE IF NOT EXISTS import_metadata (
    project_path TEXT PRIMARY KEY,
    last_import_timestamp TIMESTAMPTZ NOT NULL
);

-- Add unique index on messages.uuid
CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_uuid_unique
    ON messages(uuid) WHERE uuid IS NOT NULL;
"""

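def create_schema():
    """Create database schema (and run migrations for existing databases)."""
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # `with conn` commits on success and rolls back on error;
        # the cursor context manager closes the cursor.
        with conn, conn.cursor() as cur:
            cur.execute(SCHEMA_SQL)
            cur.execute(MIGRATION_SQL)
    finally:
        # Always release the connection, even if a statement fails
        conn.close()
    print("Schema created successfully!")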

create_schema()

## Discover Log Files

In [4]:
def find_jsonl_files():
    """Discover all JSONL files in Claude Code log directory."""
    if not CLAUDE_LOGS_DIR.exists():
        print(f"Error: Claude logs directory not found at {CLAUDE_LOGS_DIR}")
        return []
    
    jsonl_files = list(CLAUDE_LOGS_DIR.glob('**/*.jsonl'))
    print(f"Found {len(jsonl_files)} JSONL files")
    return jsonl_files

log_files = find_jsonl_files()

# Show first few files
for f in log_files[:10]:
    print(f"  {f.relative_to(CLAUDE_LOGS_DIR)}")
if len(log_files) > 10:
    print(f"  ... and {len(log_files) - 10} more")

Found 220 JSONL files
  -home-romilly-git-active-video-transcriber/agent-81b3a20c.jsonl
  -home-romilly-git-active-video-transcriber/agent-a286637.jsonl
  -home-romilly-git-active-video-transcriber/agent-aeac6d77.jsonl
  -home-romilly-git-active-video-transcriber/agent-867d63b3.jsonl
  -home-romilly-git-active-video-transcriber/agent-aed946a.jsonl
  -home-romilly-git-active-video-transcriber/f7f84c65-45e6-4255-949a-645d3f32f3a8.jsonl
  -home-romilly-git-active-video-transcriber/e91a98a1-1fcb-4f9f-904c-afd06ec20677.jsonl
  -home-romilly-git-active-video-transcriber/agent-e021f857.jsonl
  -home-romilly-git-active-video-transcriber/agent-a4b03da.jsonl
  -home-romilly-git-active-video-transcriber/agent-92ee2965.jsonl
  ... and 210 more


## Import Functions

In [None]:
def get_or_create_session(cur, session_uuid, project_path):
    """
    Get session ID, creating it if needed.
    Returns session_id.
    """
    try:
        session_uuid = UUID(session_uuid) if isinstance(session_uuid, str) else session_uuid
    except (ValueError, AttributeError):
        pass
    
    cur.execute(
        """
        INSERT INTO sessions (session_uuid, project_path, created_at, updated_at)
        VALUES (%s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        ON CONFLICT (session_uuid) DO UPDATE SET updated_at = CURRENT_TIMESTAMP
        RETURNING id
        """,
        (str(session_uuid), project_path)
    )
    return cur.fetchone()[0]


def get_last_import_timestamp(cur, project_path):
    """Get the last import timestamp for a project, or None if never imported."""
    cur.execute(
        "SELECT last_import_timestamp FROM import_metadata WHERE project_path = %s",
        (project_path,)
    )
    row = cur.fetchone()
    return row[0] if row else None


def update_last_import_timestamp(cur, project_path, timestamp):
    """Update (upsert) the last import timestamp for a project."""
    cur.execute(
        """INSERT INTO import_metadata (project_path, last_import_timestamp)
           VALUES (%s, %s)
           ON CONFLICT (project_path) DO UPDATE SET last_import_timestamp = %s""",
        (project_path, timestamp, timestamp)
    )


def insert_content_blocks(cur, message_id, content):
    """
    Insert content blocks for a message.
    
    Content can be:
    - A string (user messages)
    - A list of block dicts (assistant messages)
    
    Returns number of blocks inserted.
    """
    if content is None:
        return 0
    
    # Handle string content (user messages)
    if isinstance(content, str):
        if not content.strip():
            return 0
        cur.execute(
            """
            INSERT INTO content_blocks (message_id, block_index, block_type, text_content)
            VALUES (%s, %s, %s, %s)
            """,
            (message_id, 0, 'text', content)
        )
        return 1
    
    # Handle list of content blocks (assistant messages)
    if not isinstance(content, list):
        return 0
    
    block_count = 0
    for idx, block in enumerate(content):
        if not isinstance(block, dict):
            continue
        
        block_type = block.get('type', 'unknown')
        
        if block_type == 'text':
            cur.execute(
                """
                INSERT INTO content_blocks (message_id, block_index, block_type, text_content)
                VALUES (%s, %s, %s, %s)
                """,
                (message_id, idx, 'text', block.get('text', ''))
            )
            block_count += 1
            
        elif block_type == 'thinking':
            cur.execute(
                """
                INSERT INTO content_blocks (message_id, block_index, block_type, text_content)
                VALUES (%s, %s, %s, %s)
                """,
                (message_id, idx, 'thinking', block.get('thinking', ''))
            )
            block_count += 1
            
        elif block_type == 'tool_use':
            cur.execute(
                """
                INSERT INTO content_blocks (
                    message_id, block_index, block_type, 
                    tool_name, tool_input, tool_use_id
                )
                VALUES (%s, %s, %s, %s, %s, %s)
                """,
                (
                    message_id, idx, 'tool_use',
                    block.get('name'),
                    json.dumps(block.get('input', {})),
                    block.get('id')
                )
            )
            block_count += 1
            
        elif block_type == 'tool_result':
            # tool_result content can be string or list
            result_content = block.get('content', '')
            if isinstance(result_content, list):
                # Extract text from content blocks within the result
                result_content = '\n'.join(
                    item.get('text', '') for item in result_content 
                    if isinstance(item, dict) and item.get('type') == 'text'
                )
            
            cur.execute(
                """
                INSERT INTO content_blocks (
                    message_id, block_index, block_type, 
                    text_content, tool_use_id
                )
                VALUES (%s, %s, %s, %s, %s)
                """,
                (
                    message_id, idx, 'tool_result',
                    result_content,
                    block.get('tool_use_id')
                )
            )
            block_count += 1
    
    return block_count


def import_jsonl_file(filepath, conn):
    """
    Import a single JSONL file into the database.
    Idempotent: skips entries already imported based on timestamp tracking.
    Returns: (message_count, block_count, error_count, skipped_count)
    """
    from datetime import datetime
    
    cur = conn.cursor()
    message_count = 0
    block_count = 0
    error_count = 0
    skipped_count = 0
    session_id = None
    summary_text = None
    max_timestamp = None
    
    try:
        project_path = str(filepath.parent.relative_to(CLAUDE_LOGS_DIR))
    except ValueError:
        project_path = str(filepath.parent)
    
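    # Get the cutoff timestamp for this project.
    # Caveat: the cutoff is stored per project directory, not per file, so once
    # one file in a project has been imported, entries in the project's other
    # files that are at or before that cutoff are skipped.
    cutoff = get_last_import_timestamp(cur, project_path)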
    
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                try:
                    data = json.loads(line)
                    entry_type = data.get('type')
                    
                    # Skip file-history-snapshot entries entirely
                    if entry_type == 'file-history-snapshot':
                        skipped_count += 1
                        continue
                    
                    # Capture summary text (no timestamp, always process)
                    if entry_type == 'summary':
                        summary_text = data.get('summary')
                        skipped_count += 1  # not a message we insert
                        continue
                    
                    # Timestamp-based filtering for regular entries
                    entry_timestamp = data.get('timestamp')
                    if entry_timestamp:
                        # Parse the ISO timestamp once ('Z' suffix -> explicit UTC offset)
                        entry_dt = datetime.fromisoformat(entry_timestamp.replace('Z', '+00:00'))
                        if cutoff and entry_dt <= cutoff:
                            skipped_count += 1
                            continue
                        # Track max timestamp for updating cutoff after import
                        if max_timestamp is None or entry_dt > max_timestamp:
                            max_timestamp = entry_dt
                    
                    # Create or get session
                    if data.get('sessionId') and session_id is None:
                        session_id = get_or_create_session(
                            cur, 
                            data['sessionId'],
                            project_path
                        )
                    
                    if not session_id:
                        continue
                    
                    # Extract message metadata
                    message = data.get('message', {})
                    usage = message.get('usage', {}) if isinstance(message, dict) else {}
                    
                    # Insert message with ON CONFLICT safety net
                    msg_uuid = data.get('uuid')
                    if msg_uuid:
                        cur.execute(
                            """
                            INSERT INTO messages (
                                session_id, uuid, type, role,
                                timestamp, cwd, input_tokens, output_tokens, version
                            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                            ON CONFLICT (uuid) WHERE uuid IS NOT NULL DO NOTHING
                            RETURNING id
                            """,
                            (
                                session_id,
                                msg_uuid,
                                entry_type,
                                message.get('role') if isinstance(message, dict) else None,
                                entry_timestamp,
                                data.get('cwd'),
                                usage.get('input_tokens') if isinstance(usage, dict) else None,
                                usage.get('output_tokens') if isinstance(usage, dict) else None,
                                data.get('version')
                            )
                        )
                        result = cur.fetchone()
                        if result is None:
                            # Already existed (conflict), skip
                            skipped_count += 1
                            continue
                        message_id = result[0]
                    else:
                        cur.execute(
                            """
                            INSERT INTO messages (
                                session_id, uuid, type, role,
                                timestamp, cwd, input_tokens, output_tokens, version
                            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
                            RETURNING id
                            """,
                            (
                                session_id,
                                None,
                                entry_type,
                                message.get('role') if isinstance(message, dict) else None,
                                entry_timestamp,
                                data.get('cwd'),
                                usage.get('input_tokens') if isinstance(usage, dict) else None,
                                usage.get('output_tokens') if isinstance(usage, dict) else None,
                                data.get('version')
                            )
                        )
                        message_id = cur.fetchone()[0]
                    
                    message_count += 1
                    
                    # Insert content blocks
                    content = message.get('content') if isinstance(message, dict) else None
                    block_count += insert_content_blocks(cur, message_id, content)
                    
                    # Commit every 100 messages
                    if message_count % 100 == 0:
                        conn.commit()
                
                except (json.JSONDecodeError, ValueError) as e:
                    error_count += 1
                    if error_count <= 3:
                        print(f"  Error at line {line_num}: {e}")
                    continue
        
        # Final commit
        conn.commit()
        
        # Update session summary if we found one
        if session_id and summary_text:
            cur.execute(
                "UPDATE sessions SET summary = %s WHERE id = %s",
                (summary_text, session_id)
            )
            conn.commit()
        
        # Update session token totals
        if session_id:
            cur.execute(
                """
                UPDATE sessions 
                SET total_input_tokens = (
                    SELECT COALESCE(SUM(input_tokens), 0) FROM messages 
                    WHERE session_id = %s
                ),
                total_output_tokens = (
                    SELECT COALESCE(SUM(output_tokens), 0) FROM messages 
                    WHERE session_id = %s
                )
                WHERE id = %s
                """,
                (session_id, session_id, session_id)
            )
            conn.commit()
        
        # Update last import timestamp for this project
        if max_timestamp:
            update_last_import_timestamp(cur, project_path, max_timestamp)
            conn.commit()
        
        return message_count, block_count, error_count, skipped_count
    
    except Exception as e:
        print(f"  Failed to import: {e}")
        conn.rollback()
        return 0, 0, -1, 0
    
    finally:
        cur.close()
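
The timestamp-based skip used in `import_jsonl_file` can be seen in isolation in the sketch below. The helper names `parse_log_timestamp` and `is_new_entry` are hypothetical and not part of the import code, and the sample timestamps are made up.

```python
from datetime import datetime, timezone


def parse_log_timestamp(value):
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace('Z', '+00:00'))


def is_new_entry(entry_timestamp, cutoff):
    """Return True if the entry should be imported.

    Entries without a timestamp, or imports without a cutoff, always pass;
    otherwise only entries strictly newer than the cutoff do.
    """
    if not entry_timestamp or cutoff is None:
        return True
    return parse_log_timestamp(entry_timestamp) > cutoff


cutoff = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_new_entry('2025-01-01T11:59:59.000Z', cutoff))
print(is_new_entry('2025-01-01T12:00:01.000Z', cutoff))
```

Because the import compares with `<=`, an entry whose timestamp equals the cutoff is treated as already imported.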

## Run Import

In [None]:
def import_all_logs(files):
    """Import all discovered log files. Idempotent - safe to re-run."""
    conn = psycopg2.connect(**DB_CONFIG)
    
    total_messages = 0
    total_blocks = 0
    total_errors = 0
    total_skipped = 0
    
    for i, filepath in enumerate(files, 1):
        messages, blocks, errors, skipped = import_jsonl_file(filepath, conn)
        total_messages += messages
        total_blocks += blocks
        total_skipped += skipped
        if errors > 0:
            total_errors += errors
        if messages > 0 or errors > 0:
            print(f"[{i}/{len(files)}] {filepath.name}")
            print(f"  -> {messages} messages, {blocks} blocks, {skipped} skipped ({errors} errors)")
    
    conn.close()
    
    print("\n=== Import Complete ===")
    print(f"New messages: {total_messages}")
    print(f"New content blocks: {total_blocks}")
    print(f"Skipped entries: {total_skipped}")
    print(f"Errors: {total_errors}")

# Run the import
import_all_logs(log_files)

## Verify Import

In [None]:
def show_stats():
    """Show database statistics."""
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()
    
    cur.execute("SELECT COUNT(*) FROM sessions")
    session_count = cur.fetchone()[0]
    
    cur.execute("SELECT COUNT(*) FROM sessions WHERE summary IS NOT NULL")
    sessions_with_summary = cur.fetchone()[0]
    
    cur.execute("SELECT COUNT(*) FROM messages")
    message_count = cur.fetchone()[0]
    
    cur.execute("SELECT COUNT(*) FROM content_blocks")
    block_count = cur.fetchone()[0]
    
    cur.execute("""
        SELECT block_type, COUNT(*) 
        FROM content_blocks 
        GROUP BY block_type 
        ORDER BY COUNT(*) DESC
    """)
    block_types = cur.fetchall()
    
    cur.execute("""
        SELECT 
            COALESCE(SUM(input_tokens), 0) as total_input,
            COALESCE(SUM(output_tokens), 0) as total_output
        FROM messages
    """)
    tokens = cur.fetchone()
    
    cur.execute("SELECT COUNT(*) FROM import_metadata")
    tracked_projects = cur.fetchone()[0]
    
    print(f"Sessions: {session_count} ({sessions_with_summary} with summaries)")
    print(f"Messages: {message_count}")
    print(f"Content blocks: {block_count}")
    print("\nBlock types:")
    for block_type, count in block_types:
        print(f"  {block_type}: {count}")
    print(f"\nTotal tokens: {tokens[0] + tokens[1]:,}")
    print(f"  Input: {tokens[0]:,}")
    print(f"  Output: {tokens[1]:,}")
    print(f"\nTracked projects: {tracked_projects}")
    
    cur.close()
    conn.close()

show_stats()

## Quick Test: Full-Text Search

In [4]:
def search(query_text, limit=5):
    """Search text content across all block types."""
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()
    
    cur.execute("""
        SELECT 
            cb.id,
            cb.block_type,
            m.timestamp,
            m.type as message_type,
            LEFT(cb.text_content, 200) as content_preview,
            ts_rank(cb.content_tsvector, q.query) AS relevance
        FROM content_blocks cb
        JOIN messages m ON cb.message_id = m.id,
             plainto_tsquery('english', %s) q(query)
        WHERE cb.content_tsvector @@ q.query
        ORDER BY relevance DESC, m.timestamp DESC
        LIMIT %s
    """, (query_text, limit))
    
    results = cur.fetchall()
    
    print(f"Search: '{query_text}' ({len(results)} results)\n")
    for block_id, block_type, timestamp, msg_type, preview, relevance in results:
        print(f"[{relevance:.3f}] {timestamp} ({msg_type}/{block_type})")
        print(f"  {preview}...")
        print()
    
    cur.close()
    conn.close()

# Try a search
search('Hexagonal', 20)

Search: 'Hexagonal' (20 results)

[0.098] 2025-12-17 15:56:54.592000+00:00 (user/tool_result)
  Web search results for query: "Hexagonal Architecture Ports and Adapters Alistair Cockburn history origins 2005"

Links: [{"title":"Hexagonal architecture (software) - Wikipedia","url":"https://en.wik...

[0.097] 2025-12-08 09:59:38.032000+00:00 (user/tool_result)
  Excellent! Now I have all the information I need. Let me compile a comprehensive recommendations report:

## Hexagonal Architecture Implementation Recommendations for Transcriber Application

Based on...

[0.097] 2025-12-08 09:59:37.917000+00:00 (assistant/text)
  Excellent! Now I have all the information I need. Let me compile a comprehensive recommendations report:

## Hexagonal Architecture Implementation Recommendations for Transcriber Application

Based on...

[0.097] 2025-12-08 09:57:17.199000+00:00 (user/tool_result)
  Web search results for query: "hexagonal architecture Python best practices 2025"

Links: [{"title":"Stru

## Find Failed Tool Calls

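Example query to find tool calls whose results mention error-related terms (`error`, `failed`, `exception`, `traceback`, `denied`). This is a keyword heuristic, so it also matches successful calls whose output merely contains one of those words.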
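
The query in the next cell pairs each `tool_result` block with its `tool_use` block through `tool_use_id`, then keeps results that mention an error-related word. A rough pure-Python equivalent is sketched here: the sample blocks are made up, the helper name `flag_suspect_results` is hypothetical, and plain word matching only approximates PostgreSQL's stemmed full-text matching.

```python
import re

ERROR_TERMS = {'error', 'failed', 'exception', 'traceback', 'denied'}


def flag_suspect_results(blocks):
    """Pair tool results with their tool calls and flag error-like results.

    Hypothetical helper for illustration. Returns (tool_name, text) tuples
    for results whose text contains any of ERROR_TERMS as a whole word.
    """
    # Index tool_use blocks by their id so results can look up the tool name
    calls = {
        b.get('tool_use_id'): b.get('tool_name')
        for b in blocks
        if b.get('block_type') == 'tool_use'
    }
    flagged = []
    for b in blocks:
        if b.get('block_type') != 'tool_result' or b.get('tool_use_id') not in calls:
            continue
        text = b.get('text_content') or ''
        words = set(re.findall(r'[a-z]+', text.lower()))
        if words & ERROR_TERMS:
            flagged.append((calls[b['tool_use_id']], text))
    return flagged


# Made-up sample rows shaped like the content_blocks table
sample_blocks = [
    {'block_type': 'tool_use', 'tool_use_id': 't1', 'tool_name': 'Bash'},
    {'block_type': 'tool_result', 'tool_use_id': 't1',
     'text_content': 'Permission denied'},
    {'block_type': 'tool_use', 'tool_use_id': 't2', 'tool_name': 'Read'},
    {'block_type': 'tool_result', 'tool_use_id': 't2',
     'text_content': 'def handle_error(): ...'},
]

for tool_name, text in flag_suspect_results(sample_blocks):
    print(f"{tool_name}: {text}")
```

The second sample result is flagged only because the file content it returns contains the word `error`, which illustrates how a keyword match can report a successful call as a failure.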

In [8]:
def find_failed_tool_calls(limit=10):
    """Find tool calls where the result contains error indicators."""
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()
    
    cur.execute("""
        SELECT 
            m.timestamp,
            s.project_path,
            cb_call.tool_name,
            LEFT(cb_result.text_content, 300) as result_preview
        FROM content_blocks cb_result
        JOIN content_blocks cb_call 
            ON cb_call.tool_use_id = cb_result.tool_use_id 
            AND cb_call.block_type = 'tool_use'
        JOIN messages m ON cb_result.message_id = m.id
        JOIN sessions s ON m.session_id = s.id
        WHERE cb_result.block_type = 'tool_result'
          AND cb_result.content_tsvector @@ to_tsquery('english', 
              'error | failed | exception | traceback | denied'
          )
        ORDER BY m.timestamp DESC
        LIMIT %s
    """, (limit,))
    
    results = cur.fetchall()
    
    print(f"Found {len(results)} failed tool calls:\n")
    for timestamp, project, tool_name, preview in results:
        print(f"{timestamp} [{project}]")
        print(f"  Tool: {tool_name}")
        print(f"  Result: {preview}...")
        print()
    
    cur.close()
    conn.close()

find_failed_tool_calls()

Found 10 failed tool calls:

2025-12-22 11:34:12.594000+00:00 [-home-romilly-git-active-claude-code-log-tools]
  Tool: Write
  Result: The file /home/romilly/git/active/claude-code-log-tools/notebooks/01_import_logs.ipynb has been updated. Here's the result of running `cat -n` on a snippet of the edited file:
     1→{
     2→ "cells": [
     3→  {
     4→   "cell_type": "markdown",
     5→   "metadata": {},
     6→   "source": [
  ...

2025-12-22 11:32:32.674000+00:00 [-home-romilly-git-active-claude-code-log-tools]
  Tool: Read
  Result: <cell id="cell-0"><cell_type>markdown</cell_type># Import Claude Code Logs to PostgreSQL

This notebook imports Claude Code conversation logs from JSONL files into a PostgreSQL database with full-text search support.

## Prerequisites

1. PostgreSQL 13+ running (can be on a remote host)
2. Create a ...

2025-12-22 11:15:41.162000+00:00 [-home-romilly-git-active-claude-code-log-tools]
  Tool: Read
  Result:      1→# PostgreSQL Full-Text Search Setup fo

## Tool Usage Summary

See which tools are used most frequently.

In [None]:
def tool_usage_summary():
    """Show tool usage statistics."""
    conn = psycopg2.connect(**DB_CONFIG)
    cur = conn.cursor()
    
    cur.execute("""
        SELECT tool_name, COUNT(*) as usage_count
        FROM content_blocks
        WHERE block_type = 'tool_use'
          AND tool_name IS NOT NULL
        GROUP BY tool_name
        ORDER BY usage_count DESC
    """)
    
    results = cur.fetchall()
    
    print("Tool usage summary:\n")
    for tool_name, count in results:
        print(f"  {tool_name}: {count}")
    
    cur.close()
    conn.close()

tool_usage_summary()