# Schema Migration Demo

This notebook demonstrates the database schema migration from the current 1:1 identifier mapping to the new schema supporting multiple source identifiers per canonical ID.

## What this covers

1. **Current schema** - The existing 1:1 mapping between source identifiers and canonical IDs
2. **Sample data** - Loading data representing the current state of the catalogue pipeline
3. **Migration steps** - Creating the new schema and migrating existing data
4. **Free ID pool** - Pre-generating canonical IDs for later minting
5. **Verification** - Confirming the migration was successful with basic read/write tests

For a detailed demonstration of the ID Minter functionality (batch operations, predecessor inheritance, race condition handling), see [id_minter_demo.ipynb](id_minter_demo.ipynb).

## Prerequisites

- Docker installed
- [uv](https://docs.astral.sh/uv/) installed

## Setup

```bash
cd /Users/kennyr/workspace/docs/rfcs/XXX-stable_identifiers

# Install dependencies and create virtual environment
uv sync

# Start the MySQL container
docker-compose up -d
```

Then select the `.venv` Python interpreter for this notebook.
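
The MySQL container can take a few seconds to start accepting connections after `docker-compose up -d` returns. The sketch below is one way to wait for it; `wait_for_port` is a hypothetical helper, not part of this repository, and an open port is only a coarse signal (the server may still be initialising).

```python
import socket
import time

def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Refused or timed out: wait briefly and try again.
            time.sleep(interval)
    return False
```

With the settings used in this notebook, the call would be `wait_for_port('localhost', 3306)` before the first `get_connection()`.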

In [1]:
import pymysql
import csv
from typing import Optional, Tuple

from id_minter import generate_canonical_id

# Database connection settings
DB_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'root',
    'password': 'rootpassword',
    'database': 'id_minter'
}

def get_connection():
    """Get a database connection."""
    return pymysql.connect(**DB_CONFIG, cursorclass=pymysql.cursors.DictCursor)

def execute_query(query: str, params: Optional[tuple] = None, fetch: bool = False):
    """Execute a query and optionally fetch results."""
    conn = get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, params)
            if fetch:
                return cursor.fetchall()
            conn.commit()
            return cursor.rowcount
    finally:
        conn.close()

# Test connection
try:
    conn = get_connection()
    conn.close()
    print("✓ Connected to MySQL successfully")
except Exception as e:
    print(f"✗ Connection failed: {e}")
    print("\nMake sure docker-compose is running:")
    print("  cd /Users/kennyr/workspace/docs/rfcs/XXX-stable_identifiers")
    print("  docker-compose up -d")

✓ Connected to MySQL successfully


## Step 1: Create the Current (Legacy) Schema

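This is the existing schema with a 1:1 relationship between canonical IDs and source identifiers. The primary key on `CanonicalId` allows each canonical ID to appear only once, and the `UniqueFromSource` key allows each source identifier to appear only once.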

In [2]:
# Drop existing tables if they exist (for demo reset)
execute_query("DROP TABLE IF EXISTS identifiers")
execute_query("DROP TABLE IF EXISTS identifiers_old")
execute_query("DROP TABLE IF EXISTS canonical_ids")

# Create the current (legacy) schema
current_schema = """
CREATE TABLE identifiers (
    CanonicalId VARCHAR(255) NOT NULL,
    OntologyType VARCHAR(255) NOT NULL,
    SourceId VARCHAR(255) NOT NULL,
    SourceSystem VARCHAR(255) NOT NULL,
    PRIMARY KEY (CanonicalId),
    UNIQUE KEY UniqueFromSource (OntologyType, SourceSystem, SourceId)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
"""

execute_query(current_schema)
print("✓ Current (legacy) schema created")

# Verify the schema
result = execute_query("DESCRIBE identifiers", fetch=True)
print("\nCurrent 'identifiers' table structure:")
for row in result:
    print(f"  {row['Field']:20} {row['Type']:20} {row['Key']}")

✓ Current (legacy) schema created

Current 'identifiers' table structure:
  CanonicalId          varchar(255)         PRI
  OntologyType         varchar(255)         MUL
  SourceId             varchar(255)         
  SourceSystem         varchar(255)         
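
To make the 1:1 constraint concrete, the sketch below recreates the same two keys in an in-memory SQLite database and tries two inserts that the legacy table should reject. SQLite is a stand-in chosen so the example runs without the MySQL container, and the sample values are invented.

```python
import sqlite3

# In-memory SQLite stands in for MySQL here (an assumption made for portability).
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE identifiers (
        CanonicalId  TEXT NOT NULL,
        OntologyType TEXT NOT NULL,
        SourceId     TEXT NOT NULL,
        SourceSystem TEXT NOT NULL,
        PRIMARY KEY (CanonicalId),
        UNIQUE (OntologyType, SourceSystem, SourceId)
    )
""")

insert = "INSERT INTO identifiers VALUES (?, ?, ?, ?)"
conn.execute(insert, ("abcd2345", "Work", "b1000001", "sierra-system-number"))

def try_insert(row):
    """Return True if the row was accepted, False if a key constraint rejected it."""
    try:
        conn.execute(insert, row)
        return True
    except sqlite3.IntegrityError:
        return False

# Same canonical ID, different source identifier: blocked by the primary key.
second_source_ok = try_insert(("abcd2345", "Work", "CALM-1", "calm-record-id"))
# Same source identifier, different canonical ID: blocked by the unique key.
second_canonical_ok = try_insert(("wxyz6789", "Work", "b1000001", "sierra-system-number"))

print(second_source_ok, second_canonical_ok)
```

Both inserts fail, which is the limitation the migration removes: a canonical ID cannot gain a second source identifier while `CanonicalId` is the primary key.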


## Step 2: Load Sample Data

Load the sample identifiers from the CSV file representing the current state of the catalogue pipeline.

In [3]:
# Load sample data from CSV
csv_path = '/Users/kennyr/workspace/docs/rfcs/XXX-stable_identifiers/identifiers_sample.csv'

sample_data = []
with open(csv_path, 'r') as f:
    reader = csv.DictReader(f)
    for row in reader:
        sample_data.append({
            'canonical_id': row['CanonicalId'],
            'ontology_type': row['OntologyType'],
            'source_system': row['SourceSystem'],
            'source_id': row['SourceId']
        })

print(f"✓ Loaded {len(sample_data)} records from CSV")

# Count by source system
source_counts = {}
for record in sample_data:
    system = record['source_system']
    source_counts[system] = source_counts.get(system, 0) + 1

print("\nRecords by source system:")
for system, count in sorted(source_counts.items(), key=lambda x: -x[1]):
    print(f"  {system:30} {count:6} records")

print(f"\nSample records:")
for record in sample_data[:3]:
    print(f"  {record['canonical_id']} <- {record['source_system']}/{record['source_id']}")

✓ Loaded 10000 records from CSV

Records by source system:
  mets-image                       7781 records
  sierra-system-number             1613 records
  miro-image-number                 136 records
  label-derived                     103 records
  calm-record-id                     80 records
  mets                               67 records
  lc-names                           66 records
  ebsco-alt-lookup                   50 records
  calm-ref-no                        48 records
  library-of-congress-names          20 records
  lc-subjects                        17 records
  nlm-mesh                           12 records
  medical-subject-headings            3 records
  tei-manuscript-id                   2 records
  library-of-congress-subject-headings      2 records

Sample records:
  gbum7y2b <- sierra-system-number/1890040
  be99823c <- mets-image/b28065037/FILE_0175_OBJECTS
  thhp9d2t <- mets-image/b22372210/FILE_0045_OBJECTS


In [4]:
# Insert sample data into current schema
conn = get_connection()
cursor = conn.cursor()

insert_query = """
INSERT INTO identifiers (CanonicalId, OntologyType, SourceSystem, SourceId)
VALUES (%s, %s, %s, %s)
"""

for record in sample_data:
    cursor.execute(insert_query, (
        record['canonical_id'],
        record['ontology_type'],
        record['source_system'],
        record['source_id']
    ))

conn.commit()
cursor.close()
conn.close()

print(f"✓ Inserted {len(sample_data)} records into legacy schema")

# Verify counts
result = execute_query("""
    SELECT SourceSystem, COUNT(*) as count 
    FROM identifiers 
    GROUP BY SourceSystem
    ORDER BY count DESC
""", fetch=True)

print("\nRecords by source system in database:")
for row in result:
    print(f"  {row['SourceSystem']:30} {row['count']:6} records")

✓ Inserted 10000 records into legacy schema

Records by source system in database:
  mets-image                       7781 records
  sierra-system-number             1613 records
  miro-image-number                 136 records
  label-derived                     103 records
  calm-record-id                     80 records
  mets                               67 records
  lc-names                           66 records
  ebsco-alt-lookup                   50 records
  calm-ref-no                        48 records
  library-of-congress-names          20 records
  lc-subjects                        17 records
  nlm-mesh                           12 records
  medical-subject-headings            3 records
  library-of-congress-subject-headings      2 records
  tei-manuscript-id                   2 records


## Step 3: Run the Migration

Migrate to the new schema with:
- `canonical_ids` table for ID registry and pre-generation
- `identifiers` table allowing multiple source IDs per canonical ID

### Step 3a: Create the canonical_ids table

In [5]:
print("Creating canonical_ids table...")

canonical_ids_schema = """
CREATE TABLE canonical_ids (
    CanonicalId VARCHAR(8) NOT NULL PRIMARY KEY,
    Status ENUM('free', 'assigned') NOT NULL DEFAULT 'free',
    CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_free (Status, CanonicalId)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
"""

execute_query(canonical_ids_schema)
print("✓ canonical_ids table created")

# Verify
result = execute_query("DESCRIBE canonical_ids", fetch=True)
print("\n'canonical_ids' table structure:")
for row in result:
    print(f"  {row['Field']:15} {row['Type']:30} {row['Key']}")

Creating canonical_ids table...
✓ canonical_ids table created

'canonical_ids' table structure:
  CanonicalId     varchar(8)                     PRI
  Status          enum('free','assigned')        MUL
  CreatedAt       timestamp                      


### Step 3b: Populate canonical_ids from existing identifiers

In [6]:
print("Populating canonical_ids from existing data...")

populate_query = """
INSERT INTO canonical_ids (CanonicalId, Status)
SELECT DISTINCT CanonicalId, 'assigned' FROM identifiers
"""

rows_inserted = execute_query(populate_query)
print(f"✓ Inserted {rows_inserted} canonical IDs")

# Verify
result = execute_query("SELECT COUNT(*) as count FROM canonical_ids", fetch=True)
print(f"\nTotal canonical IDs: {result[0]['count']}")

result = execute_query("SELECT * FROM canonical_ids LIMIT 5", fetch=True)
print("\nSample canonical_ids records:")
for row in result:
    print(f"  {row['CanonicalId']}  {row['Status']:10}  {row['CreatedAt']}")

Populating canonical_ids from existing data...
✓ Inserted 10000 canonical IDs

Total canonical IDs: 10000

Sample canonical_ids records:
  a22kzjax  assigned    2026-02-05 09:22:42
  a22yfu6v  assigned    2026-02-05 09:22:42
  a23v5jrt  assigned    2026-02-05 09:22:42
  a24b67g9  assigned    2026-02-05 09:22:42
  a26ap7qq  assigned    2026-02-05 09:22:42


### Step 3c: Create new identifiers table with updated schema

In [7]:
print("Creating new identifiers table...")

new_identifiers_schema = """
CREATE TABLE identifiers_new (
    OntologyType VARCHAR(255) NOT NULL,
    SourceSystem VARCHAR(255) NOT NULL,
    SourceId VARCHAR(255) NOT NULL,
    CanonicalId VARCHAR(8) NOT NULL,
    CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (OntologyType, SourceSystem, SourceId),
    FOREIGN KEY (CanonicalId) REFERENCES canonical_ids(CanonicalId),
    INDEX idx_canonical (CanonicalId)
) ENGINE=InnoDB DEFAULT CHARSET=latin1
"""

execute_query(new_identifiers_schema)
print("✓ identifiers_new table created")

# Verify
result = execute_query("DESCRIBE identifiers_new", fetch=True)
print("\n'identifiers_new' table structure:")
for row in result:
    print(f"  {row['Field']:15} {row['Type']:30} {row['Key']}")

Creating new identifiers table...
✓ identifiers_new table created

'identifiers_new' table structure:
  OntologyType    varchar(255)                   PRI
  SourceSystem    varchar(255)                   PRI
  SourceId        varchar(255)                   PRI
  CanonicalId     varchar(8)                     MUL
  CreatedAt       timestamp                      
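
The effect of moving the primary key can be sketched in isolation. The example below uses in-memory SQLite as a stand-in for MySQL (an assumption, so it runs without the container), omits `CreatedAt`, and uses invented values. It shows two source identifiers sharing one canonical ID, and the foreign key rejecting a canonical ID that is not registered in `canonical_ids`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when enabled
conn.executescript("""
    CREATE TABLE canonical_ids (
        CanonicalId TEXT NOT NULL PRIMARY KEY,
        Status TEXT NOT NULL DEFAULT 'free' CHECK (Status IN ('free', 'assigned'))
    );
    CREATE TABLE identifiers (
        OntologyType TEXT NOT NULL,
        SourceSystem TEXT NOT NULL,
        SourceId TEXT NOT NULL,
        CanonicalId TEXT NOT NULL REFERENCES canonical_ids(CanonicalId),
        PRIMARY KEY (OntologyType, SourceSystem, SourceId)
    );
""")

conn.execute("INSERT INTO canonical_ids VALUES ('abcd2345', 'assigned')")
rows = [
    ("Work", "sierra-system-number", "b1000001", "abcd2345"),
    ("Work", "calm-record-id", "CALM-1", "abcd2345"),
]
# Two different source identifiers pointing at the same canonical ID.
conn.executemany("INSERT INTO identifiers VALUES (?, ?, ?, ?)", rows)

sources_for_id = conn.execute(
    "SELECT COUNT(*) FROM identifiers WHERE CanonicalId = ?", ("abcd2345",)
).fetchone()[0]

try:
    # 'zzzz9999' was never registered in canonical_ids.
    conn.execute("INSERT INTO identifiers VALUES ('Work', 'mets', 'b2', 'zzzz9999')")
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

print(sources_for_id, orphan_rejected)
```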


### Step 3d: Copy data and swap tables

In [8]:
print("Copying data to new identifiers table...")

copy_query = """
INSERT INTO identifiers_new (OntologyType, SourceSystem, SourceId, CanonicalId)
SELECT OntologyType, SourceSystem, SourceId, CanonicalId FROM identifiers
"""

rows_copied = execute_query(copy_query)
print(f"✓ Copied {rows_copied} records")

# Atomic table swap
print("\nSwapping tables (atomic rename)...")
execute_query("RENAME TABLE identifiers TO identifiers_old, identifiers_new TO identifiers")
print("✓ Tables swapped")

# Verify final state
result = execute_query("SELECT COUNT(*) as count FROM identifiers", fetch=True)
print(f"\nRecords in new 'identifiers' table: {result[0]['count']}")

result = execute_query("SHOW TABLES", fetch=True)
print("\nTables in database:")
for row in result:
    print(f"  {list(row.values())[0]}")

Copying data to new identifiers table...
✓ Copied 10000 records

Swapping tables (atomic rename)...
✓ Tables swapped

Records in new 'identifiers' table: 10000

Tables in database:
  canonical_ids
  identifiers
  identifiers_old


## Step 4: Pre-generate Free IDs

Maintain a pool of pre-generated IDs so that collisions are resolved when the pool is filled rather than at minting time.

In [9]:
def pre_generate_ids(count: int) -> int:
    """Pre-generate a batch of free IDs and return how many were inserted."""
    conn = get_connection()
    generated = 0
    attempts = 0
    max_attempts = count * 2  # Allow for some collisions

    try:
        with conn.cursor() as cursor:
            while generated < count and attempts < max_attempts:
                new_id = generate_canonical_id()
                # INSERT IGNORE skips an existing CanonicalId instead of raising,
                # so a collision shows up as rowcount == 0 and we try again.
                cursor.execute(
                    "INSERT IGNORE INTO canonical_ids (CanonicalId, Status) VALUES (%s, 'free')",
                    (new_id,)
                )
                if cursor.rowcount > 0:
                    generated += 1
                attempts += 1
        conn.commit()
    finally:
        # Close the connection even if an insert fails unexpectedly
        conn.close()

    return generated

# Pre-generate 100 free IDs
print("Pre-generating free IDs...")
generated = pre_generate_ids(100)
print(f"✓ Generated {generated} new free IDs")

# Check pool status
result = execute_query("""
    SELECT Status, COUNT(*) as count 
    FROM canonical_ids 
    GROUP BY Status
""", fetch=True)

print("\nCanonical ID pool status:")
for row in result:
    print(f"  {row['Status']:10} {row['count']:5} IDs")

Pre-generating free IDs...
✓ Generated 100 new free IDs

Canonical ID pool status:
  free         100 IDs
  assigned   10000 IDs
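
How a minter might draw from this pool can be sketched as follows. This is not the `IDMinter` implementation, which is not shown in this notebook; `claim_free_id` is a hypothetical helper, the IDs are invented, and in-memory SQLite stands in for MySQL. Under concurrent writers in MySQL, the select would likely need row locking (for example `SELECT ... FOR UPDATE`), which this single-connection sketch does not model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE canonical_ids (
        CanonicalId TEXT NOT NULL PRIMARY KEY,
        Status TEXT NOT NULL DEFAULT 'free'
    )
""")
conn.executemany(
    "INSERT INTO canonical_ids (CanonicalId) VALUES (?)",
    [("aaaa2222",), ("bbbb3333",), ("cccc4444",)],
)

def claim_free_id(conn):
    """Hypothetical helper: mark one free ID as assigned and return it (None if the pool is empty)."""
    with conn:  # commits on success, rolls back on error
        row = conn.execute(
            "SELECT CanonicalId FROM canonical_ids "
            "WHERE Status = 'free' ORDER BY CanonicalId LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        conn.execute(
            "UPDATE canonical_ids SET Status = 'assigned' WHERE CanonicalId = ?",
            (row[0],),
        )
        return row[0]

# Three IDs are available, so the fourth claim finds the pool empty.
claimed = [claim_free_id(conn) for _ in range(4)]
free_left = conn.execute(
    "SELECT COUNT(*) FROM canonical_ids WHERE Status = 'free'"
).fetchone()[0]
print(claimed, free_left)
```

Because every ID in the pool is already a unique row in `canonical_ids`, claiming one never needs a collision check; an empty pool is the only failure case to handle.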


## Step 5: Verify Migration with ID Minter

Confirm the migration was successful by testing basic read/write operations with the ID Minter.

In [10]:
import importlib
import id_minter
importlib.reload(id_minter)
from id_minter import IDMinter

# Instantiate the minter
minter = IDMinter(get_connection)
print("✓ ID Minter initialized")

# Test batch lookup - verify we can read existing data
print("\n" + "="*60)
print("Testing batch lookup on migrated data...")
print("="*60)

# Get some existing Sierra IDs
existing = execute_query("""
    SELECT OntologyType, SourceSystem, SourceId, CanonicalId 
    FROM identifiers 
    WHERE SourceSystem = 'sierra-system-number'
    LIMIT 3
""", fetch=True)

# Key the expected values by the full (OntologyType, SourceSystem, SourceId) tuple
expected_by_key = {
    (r['OntologyType'], r['SourceSystem'], r['SourceId']): r['CanonicalId']
    for r in existing
}
lookup_ids = list(expected_by_key)
found = minter.lookup_ids(lookup_ids)

print(f"\nLooked up {len(lookup_ids)} IDs, found {len(found)}:")
for (ont, system, sid), cid in found.items():
    status = "✓" if cid == expected_by_key.get((ont, system, sid)) else "✗"
    print(f"  {status} {ont}/{system}/{sid} -> {cid}")

✓ ID Minter initialized

Testing batch lookup on migrated data...

Looked up 3 IDs, found 3:
  ✓ Item/sierra-system-number/i19854572 -> a22yfu6v
  ✓ Item/sierra-system-number/1016923 -> a329mxe7
  ✓ Work/sierra-system-number/b13439844 -> a45fug5q


In [11]:
import random

# Test minting a new ID - verify we can write to the new schema
print("="*60)
print("Testing mint_ids on migrated schema...")
print("="*60)

# Get pool status before
pool_before = execute_query("""
    SELECT Status, COUNT(*) as count FROM canonical_ids GROUP BY Status
""", fetch=True)
free_before = next((r['count'] for r in pool_before if r['Status'] == 'free'), 0)

# Mint a new ID
test_id = f"MIGRATION-TEST-{random.randint(100000, 999999)}"
results = minter.mint_ids([
    (('Work', 'test-system', test_id), None)
])

# Get pool status after
pool_after = execute_query("""
    SELECT Status, COUNT(*) as count FROM canonical_ids GROUP BY Status
""", fetch=True)
free_after = next((r['count'] for r in pool_after if r['Status'] == 'free'), 0)

print(f"\nMinted new ID:")
for (ont, system, sid), cid in results.items():
    print(f"  {ont}/{system}/{sid} -> {cid}")

print(f"\nFree IDs consumed: {free_before - free_after}")
print("\n✓ Migration verified - read and write operations successful!")

Testing mint_ids on migrated schema...

Minted new ID:
  Work/test-system/MIGRATION-TEST-872008 -> a26hunyp

Free IDs consumed: 1

✓ Migration verified - read and write operations successful!


## Summary

This notebook demonstrated the complete schema migration:

1. **Legacy schema created** - 1:1 mapping with PK on `CanonicalId`
2. **Sample data loaded** - 10,000 records from the catalogue pipeline
3. **Migration executed**:
   - Created `canonical_ids` table for ID registry
   - Populated with existing IDs (status: 'assigned')
   - Created new `identifiers` table with composite PK
   - Atomic table swap
4. **Free IDs pre-generated** - Pool for efficient minting
5. **Migration verified** - Read and write operations successful

### Key Schema Changes

| Aspect | Legacy | New |
|--------|--------|-----|
| Primary key | `CanonicalId` | `(OntologyType, SourceSystem, SourceId)` |
| Mapping | 1:1 (source → canonical) | Many:1 (multiple sources → one canonical) |
| ID generation | On-demand with collision retry | Pre-generated pool |
| Timestamps | None | `CreatedAt` for provenance |

For detailed ID Minter functionality (batch operations, predecessor inheritance, race conditions), see [id_minter_demo.ipynb](id_minter_demo.ipynb).

## Cleanup

Run this to stop and remove the Docker container and its volume:

In [12]:
# Close connections
if 'minter' in dir() and minter.conn and minter.conn.open:
    minter.conn.close()
    print("✓ Minter connection closed")

# Stop and remove the container (-v also removes the volume)
!docker compose down -v
print("✓ Docker container stopped")

✓ Minter connection closed