CanonMap

CanonMap is a Python library for data matching and canonicalization, built around pluggable database connectors (currently MySQL). It provides an entity resolution pipeline that matches and canonicalizes entity names against database records using multiple blocking strategies and similarity scoring algorithms.

Features

  • Multi-Strategy Blocking: Uses phonetic (Double Metaphone), Soundex, initialism, and exact matching to efficiently filter candidate matches
  • Advanced Scoring: Combines multiple similarity metrics (Levenshtein, Jaro-Winkler, token overlap, trigram, phonetic, soundex, initialism) with configurable weights
  • MySQL Connector: Built-in MySQL connection pooling with transaction support
  • Helper Fields: Automatic generation of indexed helper columns (phonetic, soundex, initialism) for fast blocking queries
  • Data Import: Import CSV/XLSX files with automatic type inference and schema creation
  • Database Management: High-level API for creating tables, fields, constraints, and managing database schemas
  • FastAPI Integration: Example FastAPI application scaffold for REST API deployment
  • CLI Tools: Command-line interface for scaffolding FastAPI applications
  • Production-Ready Logging: Environment-aware logging (Rich for dev, JSON for production)

Installation

pip install canonmap

For development dependencies:

pip install canonmap[dev]

For FastAPI integration:

pip install canonmap[fastapi]

Quick Start

Basic Usage

from canonmap import MySQLConnector, MySQLConfig, MappingPipeline, EntityMappingRequest

# Configure database connection
config = MySQLConfig.from_env()  # Reads from MYSQL_HOST, MYSQL_USER, etc.
# Or manually:
# config = MySQLConfig(
#     host="localhost",
#     user="root",
#     password="password",
#     database="mydb"
# )

connector = MySQLConnector(config)
connector.initialize_pool()

# Create mapping pipeline
pipeline = MappingPipeline(connector)

# Match an entity
request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    top_n=10,
    max_prefilter=1000
)

response = pipeline.run(request)

# Access results
for result in response.results:
    print(f"{result.canonical_entity} (score: {result.score})")

Using Custom Weights

from canonmap import MappingWeights

weights = MappingWeights(
    exact=6.0,          # High weight for exact matches
    levenshtein=1.0,    # Edit distance similarity
    jaro=1.2,           # Jaro-Winkler similarity
    token=2.0,          # Token overlap
    trigram=1.0,        # 3-gram similarity
    phonetic=1.0,       # Phonetic similarity
    soundex=1.0,        # Soundex similarity
    initialism=0.5,     # Initialism matching
    multi_bonus=1.0     # Bonus for multiple matching features
)

response = pipeline.run(request, mapping_weights=weights)

Core Concepts

Entity Mapping Pipeline

The mapping pipeline follows a two-stage process:

  1. Blocking: Filters candidate records using fast indexing strategies (phonetic, soundex, initialism, exact)
  2. Scoring: Computes similarity scores for filtered candidates using multiple string similarity metrics

Blocking Strategies

  • Phonetic: Uses Double Metaphone algorithm to match similar-sounding names
  • Soundex: Uses Soundex codes for phonetic matching
  • Initialism: Matches based on first letters of words (e.g., "IBM" matches "International Business Machines")
  • Exact: Case-insensitive exact matching

Helper Fields

Helper fields are pre-computed indexed columns that speed up blocking queries:

  • __field_name_phonetic__: Double Metaphone codes
  • __field_name_soundex__: Soundex codes
  • __field_name_initialism__: Initialism strings

These can be automatically generated using the DBClient.create_helper_fields() method.

Usage Examples

Database Client Operations

from canonmap import MySQLConnector, MySQLConfig, DBClient

connector = MySQLConnector(MySQLConfig.from_env())
client = DBClient(connector)

# Import a CSV file
rows_imported = client.import_table_from_file(
    "customers.csv",
    table_name="customers",
    if_table_exists="append"  # or "replace" or "fail"
)

# Create helper fields for fast matching
client.create_helper_fields({
    "table_name": "customers",
    "field_name": "full_name",
    "transforms": ["phonetic", "soundex", "initialism"],
    "if_exists": "replace"  # or "skip" or "error"
})

# Create a table
client.create_table(
    table_name="products",
    fields_ddl="id BIGINT PRIMARY KEY AUTO_INCREMENT, name VARCHAR(255) NOT NULL"
)

# Add a field
client.create_field(
    table_name="products",
    field_name="description",
    field_ddl="TEXT"
)

Prefiltering with SQL

# Use SQL to prefilter candidates before blocking
request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    prefilter_sql="SELECT * FROM customers WHERE status = 'active' AND created_at > '2020-01-01'",
    top_n=10
)

response = pipeline.run(request)

Development Mode

The run_dev() method provides additional debugging and deterministic behavior:

response = pipeline.run_dev(
    entity_mapping_request=request,
    mapping_weights=weights,
    per_strategy_limits={"phonetic": 500, "soundex": 300},
    global_prefilter_cap=1000,
    debug=True
)

Architecture

Package Structure

canonmap/
├── connectors/
│   └── mysql_connector/
│       ├── connector.py          # Connection pooling
│       ├── config.py              # Configuration management
│       ├── db_client.py           # High-level DB operations
│       ├── services/              # Service layer
│       │   ├── helper_fields_service.py
│       │   ├── import_table_service.py
│       │   ├── schema_service.py
│       │   └── ...
│       └── utils/                 # Utilities
│           ├── blocking.py
│           ├── transforms.py
│           └── ...
├── mapping/
│   ├── mapping_pipeline.py       # Main pipeline
│   ├── models.py                  # Pydantic models
│   └── utils/
│       ├── blocking.py            # Blocking strategies
│       ├── scoring.py             # Similarity scoring
│       └── normalize.py           # Text normalization
├── cli.py                         # CLI interface
└── logger.py                      # Logging configuration

Key Components

MySQLConnector

Manages MySQL connection pooling and provides:

  • Connection context managers
  • Transaction support
  • Query execution with automatic LIMIT enforcement
  • Write protection (optional)

MappingPipeline

Orchestrates the entity matching process:

  • Normalizes input entities
  • Executes blocking strategies in parallel
  • Scores candidates concurrently
  • Returns ranked results

DBClient

High-level database operations:

  • Table and field creation
  • Data import from files
  • Helper field generation
  • Schema management
  • Constraint management

API Reference

Models

EntityMappingRequest

class EntityMappingRequest(BaseModel):
    entity_name: str                     # Entity to match
    candidate_table_name: str            # Table to search
    candidate_field_name: str            # Field to match against
    top_n: int = 20                      # Number of results to return
    max_prefilter: int = 1000            # Max candidates per blocking strategy
    semantic_rerank: bool = False        # Future: semantic reranking
    prefilter_sql: Optional[str] = None  # Optional SQL prefilter

EntityMappingResponse

class EntityMappingResponse(BaseModel):
    results: List[SingleMappedEntity]

class SingleMappedEntity(BaseModel):
    raw_entity: str            # Original input
    canonical_entity: str      # Matched entity
    canonical_table_name: str  # Source table
    canonical_field_name: str  # Source field
    score: float               # Similarity score

MappingWeights

class MappingWeights(BaseModel):
    exact: float = 6.0
    levenshtein: float = 1.0
    jaro: float = 1.2
    token: float = 2.0
    trigram: float = 1.0
    phonetic: float = 1.0
    soundex: float = 1.0
    initialism: float = 0.5
    multi_bonus: float = 1.0

Methods

MappingPipeline.run()

def run(
    self,
    entity_mapping_request: Union[EntityMappingRequest, Dict[str, Any]],
    mapping_weights: Optional[Union[MappingWeights, Dict[str, Any]]] = None,
) -> EntityMappingResponse

Main pipeline execution method. Returns top N matches sorted by score.

MappingPipeline.run_dev()

def run_dev(
    self,
    entity_mapping_request: Union[EntityMappingRequest, Dict[str, Any]],
    mapping_weights: Optional[Union[MappingWeights, Dict[str, Any]]] = None,
    per_strategy_limits: Optional[Dict[str, int]] = None,
    global_prefilter_cap: Optional[int] = None,
    debug: bool = True,
) -> EntityMappingResponse

Development mode with additional controls and debugging.

Configuration

Environment Variables

CanonMap uses environment variables for database configuration:

MYSQL_HOST=localhost
MYSQL_USER=root
MYSQL_PASSWORD=your_password
MYSQL_DATABASE=your_database
MYSQL_POOL_NAME=mypool          # Optional, default: "mypool"
MYSQL_POOL_SIZE=5               # Optional, default: 5
ENV=dev                         # Optional, default: "dev" (affects logging)

Logging

Logging automatically adapts to environment:

  • Development (ENV=dev): Rich, colorized console output
  • Production (ENV=prod): JSON-formatted logs for cloud logging
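As a rough sketch of this dev/prod split, using only the standard library (canonmap's actual logger module differs and adds Rich formatting in dev):

```python
import json
import logging
import os

class JsonFormatter(logging.Formatter):
    # Emit one JSON object per record, as a cloud logging backend expects
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
        })

def get_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    handler = logging.StreamHandler()
    if os.getenv("ENV", "dev") == "prod":
        handler.setFormatter(JsonFormatter())  # machine-readable for prod
    else:
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```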

CLI Usage

CanonMap includes a CLI tool for scaffolding FastAPI applications:

# Install CLI
pip install canonmap

# Create a new FastAPI app
cm make api

# Or specify a name
cm make api --name my_api

This creates a FastAPI application scaffold with:

  • Entity mapping endpoints
  • Database routes
  • Example configuration
  • CORS middleware

FastAPI Integration

The package includes an example FastAPI application in canonmap._example_usage.app. After scaffolding:

# Set up environment
cp .env.example .env
# Edit .env with your database credentials

# Run the API
uvicorn your_api_name.main:app --reload

Example Endpoints

  • POST /entity/map-entity: Map an entity to canonical forms
  • Database management endpoints (see example routes)

Performance Considerations

Helper Fields

For best performance, create helper fields on fields you frequently match against:

client.create_helper_fields({
    "table_name": "customers",
    "field_name": "full_name",
    "transforms": ["phonetic", "soundex", "initialism"]
})

This creates indexed columns that dramatically speed up blocking queries.

Prefiltering

Use prefilter_sql to reduce the candidate set before blocking:

request = EntityMappingRequest(
    entity_name="John Smith",
    candidate_table_name="customers",
    candidate_field_name="full_name",
    prefilter_sql="SELECT * FROM customers WHERE active = 1"
)

Parallel Execution

The pipeline automatically uses parallel execution for:

  • Multiple blocking strategies
  • Candidate scoring
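The fan-out pattern can be sketched with concurrent.futures; the toy strategy functions below are hypothetical stand-ins for the real blocking queries:

```python
from concurrent.futures import ThreadPoolExecutor

def run_blocking(strategies, query):
    # Run each blocking strategy in its own thread and union the
    # candidates, mirroring the parallel blocking phase.
    candidates = set()
    with ThreadPoolExecutor() as pool:
        for result in pool.map(lambda fn: fn(query), strategies):
            candidates.update(result)
    return candidates

# Hypothetical stand-ins for phonetic/exact blocking queries:
strategies = [
    lambda q: {"john smith", "jon smith"},
    lambda q: {"john smith", "john smyth"},
]
candidates = run_blocking(strategies, "john smith")  # union, duplicates removed
```

Threads suit this workload because each strategy spends most of its time waiting on a database round trip.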

Development

Setup

# Clone the repository
git clone https://github.com/vaberry/canonmap.git
cd canonmap

# Install in development mode
pip install -e ".[dev]"

# Run tests
pytest

# Format code
black src/
isort src/

# Type checking
mypy src/

Project Structure

  • src/canonmap/: Main package code
  • tests/: Test suite
  • scripts/: Build and deployment scripts

Dependencies

Core Dependencies

  • pydantic>=2.0.0: Data validation and settings
  • pandas>=2.0: Data manipulation
  • mysql-connector-python>=8.0.0: MySQL connectivity
  • metaphone>=0.6: Phonetic matching
  • python-Levenshtein>=0.20.0: String similarity
  • jellyfish>=0.9.0: String matching algorithms
  • rich>=12.0.0: Rich console output
  • SQLAlchemy>=2.0: Database abstraction

Optional Dependencies

  • fastapi>=0.100.0: FastAPI integration
  • uvicorn>=0.20.0: ASGI server
  • cohere>=4.0.0: Semantic reranking (future feature)
  • python-dotenv>=1.0.0: Environment variable management
  • openpyxl>=3.1: Excel file support
  • chardet>=4.0.0: Character encoding detection

Advanced Features

Schema Generation

Generate comprehensive schema metadata for your database:

schema = client.generate_schema(
    table_fields=["customers.full_name", "products.name"],  # Optional: specific fields
    num_examples=10,  # Number of example values per field
    save_location="./schemas",  # Optional: save to disk
    schema_name="my_schema.json",
    if_schema_exists="replace"
)

This generates metadata including:

  • Data types and column definitions
  • Example values for each field
  • Datetime format inference
  • Nullability and default values

Type Inference for Imports

When importing files, CanonMap infers appropriate MySQL column types, with robust error handling and automatic type widening for out-of-range values.
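As a simplified illustration of what integer widening can look like (the library's actual inference covers more types and edge cases):

```python
def infer_int_type(values):
    # Pick the narrowest MySQL integer type that holds every value,
    # widening automatically when a value is out of range.
    lo, hi = min(values), max(values)
    if -2**31 <= lo and hi < 2**31:
        return "INT"
    if -2**63 <= lo and hi < 2**63:
        return "BIGINT"
    return "DECIMAL(65, 0)"  # fallback beyond BIGINT range

infer_int_type([1, 2, 3])    # "INT"
infer_int_type([1, 2**40])   # widened to "BIGINT"
```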

How It Works Internally

Matching Pipeline Flow

  1. Input Normalization: Entity name is normalized (Unicode NFKD normalization, punctuation removal, whitespace collapse, lowercasing)

  2. Blocking Phase (Parallel using ThreadPoolExecutor):

    • Phonetic Blocking: Generates Double Metaphone codes, queries helper column __field_phonetic__ with LIKE pattern
    • Soundex Blocking: Generates Soundex codes, queries helper column __field_soundex__ or falls back to MySQL SOUNDEX() function
    • Initialism Blocking: Extracts first letters of words, queries helper column __field_initialism__ with exact match
    • Exact Blocking: Normalized exact match with LIKE pattern on the source field
  3. Candidate Union: All candidates from different strategies are combined into a single set (duplicates removed)

  4. Scoring Phase (Parallel using ThreadPoolExecutor):

    • Each candidate is scored against the normalized query using weighted combination of:
      • Exact match (binary, weight: 6.0 default)
      • Levenshtein ratio (weight: 1.0 default)
      • Jaro-Winkler similarity (weight: 1.2 default)
      • Token overlap - first/last token matching (weight: 2.0 default)
      • Trigram similarity - 3-character sequences (weight: 1.0 default)
      • Phonetic similarity - Double Metaphone comparison (weight: 1.0 default)
      • Soundex similarity (weight: 1.0 default)
      • Initialism match (weight: 0.5 default)
    • Multi-bonus (weight: 1.0 default) applied when multiple features match
    • Final score = weighted sum of all features
  5. Ranking: Results sorted by score (descending) with deterministic tie-breaking by entity name

  6. Top-N Selection: Returns the top N results based on top_n parameter

Helper Field Generation Process

Helper fields are generated through a robust batch processing system:

  1. Column Detection: Checks for existing helper columns with naming convention __field_name_transform__ or field_name_transform__
  2. Primary Key Detection: Identifies primary key or auto-increment column for stable paging
  3. Temporary Key Creation: If no stable key exists, creates temporary auto-increment column __cm_tmp_pk__
  4. Batch Processing: Processes records in configurable chunks (default chunk size)
  5. Transform Application: Applies transform functions (phonetic via Double Metaphone, soundex, initialism)
  6. Bulk Updates: Uses CASE statements for efficient bulk updates by primary key
  7. Automatic Retry: Implements retry logic for transient database errors
  8. Cleanup: Removes temporary columns after processing
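Step 6's CASE-statement pattern can be illustrated with a small hypothetical helper that builds one parameterized bulk UPDATE for a batch:

```python
def build_case_update(table: str, column: str, pk: str, updates: dict):
    # One statement that sets `column` for many primary-key values
    # in a single round trip, instead of one UPDATE per row.
    cases, params = [], []
    for key, value in updates.items():
        cases.append("WHEN %s THEN %s")
        params.extend([key, value])
    keys = list(updates)
    placeholders = ", ".join(["%s"] * len(keys))
    sql = (
        f"UPDATE {table} SET {column} = CASE {pk} "
        + " ".join(cases)
        + f" END WHERE {pk} IN ({placeholders})"
    )
    return sql, params + keys
```

For example, `build_case_update("customers", "__full_name_phonetic__", "id", {1: "SM0", 2: "JNSN"})` produces a single UPDATE with a two-branch CASE and a two-value IN list.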

Troubleshooting

Common Issues

"Helper field already exists" error

  • Use if_exists="replace" or if_exists="skip" when creating helper fields
  • Or drop the column manually: ALTER TABLE table_name DROP COLUMN __field_name_phonetic__

Slow matching performance

  • Ensure helper fields are created and indexed
  • Use prefilter_sql to reduce candidate set
  • Adjust max_prefilter to limit candidates per strategy
  • Consider creating indexes on helper columns manually: CREATE INDEX idx_phonetic ON table_name(__field_name_phonetic__)

Connection pool errors

  • Increase MYSQL_POOL_SIZE if you have many concurrent operations
  • Check MySQL server connection limits: SHOW VARIABLES LIKE 'max_connections'
  • Ensure proper connection cleanup (always use context managers)

Import errors with CSV/XLSX

  • Check file encoding (UTF-8 recommended)
  • Verify column names don't conflict with MySQL reserved words
  • For large files, consider chunking or using direct MySQL LOAD DATA
  • Check for out-of-range numeric values (library will attempt automatic type widening)

No matches returned

  • Verify helper fields exist for the blocking strategies you're using
  • Check that prefilter_sql isn't too restrictive
  • Try increasing max_prefilter to allow more candidates
  • Use run_dev() with debug=True to see per-strategy candidate counts

Limitations

  • Currently supports MySQL only (other database connectors planned)
  • Helper fields must be created for optimal performance (the first run without them may be slower)
  • Large candidate sets (>10,000) may require tuning max_prefilter and per_strategy_limits
  • Semantic reranking (semantic_rerank=True) is planned but not yet implemented
  • Helper field generation requires a stable key (primary key or auto-increment column)

Roadmap

  • Additional database connectors (PostgreSQL, SQLite)
  • Semantic reranking using Cohere API
  • Machine learning-based scoring models
  • Real-time streaming matching
  • Distributed matching for very large datasets
  • Additional blocking strategies (fuzzy matching, n-gram indexing)

License

MIT License - see LICENSE file for details.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

Support

For issues, questions, or contributions, please visit the GitHub repository.

Author

Vince Berry - vincent.berry11@gmail.com


CanonMap - Intelligent entity resolution and canonicalization for Python
