# Data Quality Analysis with Nepal Entity Service

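This notebook demonstrates how to analyze and improve data quality in the Nepal Entity Service (nes2). We'll identify issues, validate data integrity, and generate a quality report. The code cells use top-level `await`, so run them in Jupyter or IPython rather than as a plain Python script.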

## Topics Covered

1. Database statistics and overview
2. Identify entities with missing data
3. Check for naming inconsistencies
4. Validate relationship integrity
5. Find orphaned relationships
6. Analyze version history patterns
7. Generate data quality report

## 1. Setup

In [None]:
from pathlib import Path
from collections import defaultdict
from datetime import datetime

from nes2.database.file_database import FileDatabase
from nes2.services.publication import PublicationService
from nes2.services.search import SearchService

In [None]:
# Open the file-backed database and build the services on top of it
db_path = Path("../nes-db/v2")
db = FileDatabase(base_path=str(db_path))

# Both services share the single database handle created above
pub_service, search_service = (
    PublicationService(database=db),
    SearchService(database=db),
)

print("✓ Services initialized")

## 2. Database Statistics

In [None]:
# Collect database statistics
print("Collecting database statistics...\n")

# Count entities by type
entity_types = ["person", "organization", "location"]
type_counts = {}

for entity_type in entity_types:
    entities = await db.list_entities(entity_type=entity_type, limit=1000)
    type_counts[entity_type] = len(entities)

print("Entity Counts by Type:")
print("=" * 50)
for entity_type, count in type_counts.items():
    print(f"  {entity_type.capitalize()}: {count}")

total_entities = sum(type_counts.values())
print(f"\n  Total Entities: {total_entities}")

# Count relationships
all_relationships = await search_service.search_relationships(limit=1000)
print(f"\n  Total Relationships: {len(all_relationships)}")

## 3. Identify Entities with Missing Data

In [None]:
# Find entities with incomplete data
print("Analyzing data completeness...\n")

issues = {"missing_nepali_names": [], "missing_attributes": [], "single_name_only": []}

# Check all entities
for entity_type in entity_types:
    entities = await db.list_entities(entity_type=entity_type, limit=1000)

    for entity in entities:
        # Check for Nepali names
        has_nepali = any(name.ne and name.ne.full for name in entity.names)
        if not has_nepali:
            issues["missing_nepali_names"].append(entity.id)

        # Check for attributes
        if not entity.attributes or len(entity.attributes) == 0:
            issues["missing_attributes"].append(entity.id)

        # Check for multiple names
        if len(entity.names) == 1:
            issues["single_name_only"].append(entity.id)

print("Data Completeness Issues:")
print("=" * 50)
print(f"  Missing Nepali names: {len(issues['missing_nepali_names'])}")
print(f"  Missing attributes: {len(issues['missing_attributes'])}")
print(f"  Single name only: {len(issues['single_name_only'])}")

# Show examples
if issues["missing_nepali_names"]:
    print(f"\n  Examples (missing Nepali names):")
    for entity_id in issues["missing_nepali_names"][:3]:
        entity = await pub_service.get_entity(entity_id)
        if entity:
            print(f"    - {entity.names[0].en.full} ({entity.id})")

## 4. Check Naming Consistency

In [None]:
# Check for naming issues
print("Checking naming consistency...\n")

naming_issues = {"no_primary_name": [], "multiple_primary_names": [], "empty_names": []}

for entity_type in entity_types:
    entities = await db.list_entities(entity_type=entity_type, limit=1000)

    for entity in entities:
        # Count PRIMARY names
        primary_count = sum(1 for name in entity.names if name.kind == "PRIMARY")

        if primary_count == 0:
            naming_issues["no_primary_name"].append(entity.id)
        elif primary_count > 1:
            naming_issues["multiple_primary_names"].append(entity.id)

        # Check for empty names
        for name in entity.names:
            if not name.en or not name.en.full:
                naming_issues["empty_names"].append(entity.id)
                break

print("Naming Consistency Issues:")
print("=" * 50)
print(f"  No PRIMARY name: {len(naming_issues['no_primary_name'])}")
print(f"  Multiple PRIMARY names: {len(naming_issues['multiple_primary_names'])}")
print(f"  Empty names: {len(naming_issues['empty_names'])}")

if naming_issues["multiple_primary_names"]:
    print(f"\n  Examples (multiple PRIMARY names):")
    for entity_id in naming_issues["multiple_primary_names"][:3]:
        entity = await pub_service.get_entity(entity_id)
        if entity:
            primary_names = [n.en.full for n in entity.names if n.kind == "PRIMARY"]
            print(f"    - {entity.id}: {', '.join(primary_names)}")

## 5. Validate Relationship Integrity

In [None]:
# Check relationship integrity
print("Validating relationship integrity...\n")

relationship_issues = {"missing_source": [], "missing_target": [], "invalid_dates": []}

all_relationships = await search_service.search_relationships(limit=1000)

for rel in all_relationships:
    # Check if source entity exists
    source = await pub_service.get_entity(rel.source_entity_id)
    if not source:
        relationship_issues["missing_source"].append(rel.id)

    # Check if target entity exists
    target = await pub_service.get_entity(rel.target_entity_id)
    if not target:
        relationship_issues["missing_target"].append(rel.id)

    # Check date validity
    if rel.start_date and rel.end_date:
        if rel.end_date < rel.start_date:
            relationship_issues["invalid_dates"].append(rel.id)

print("Relationship Integrity Issues:")
print("=" * 50)
print(f"  Missing source entity: {len(relationship_issues['missing_source'])}")
print(f"  Missing target entity: {len(relationship_issues['missing_target'])}")
print(f"  Invalid dates: {len(relationship_issues['invalid_dates'])}")

if relationship_issues["missing_source"]:
    print(f"\n  Orphaned relationships (missing source):")
    for rel_id in relationship_issues["missing_source"][:3]:
        print(f"    - {rel_id}")

## 6. Analyze Version History Patterns

In [None]:
# Analyze version history
print("Analyzing version history patterns...\n")

version_stats = {
    "total_versions": 0,
    "entities_with_multiple_versions": 0,
    "max_versions": 0,
    "authors": defaultdict(int),
}

# Sample entities for version analysis
sample_entities = await db.list_entities(limit=50)

for entity in sample_entities:
    versions = await pub_service.get_entity_versions(entity.id)
    version_count = len(versions)

    version_stats["total_versions"] += version_count

    if version_count > 1:
        version_stats["entities_with_multiple_versions"] += 1

    if version_count > version_stats["max_versions"]:
        version_stats["max_versions"] = version_count

    # Count by author
    for version in versions:
        version_stats["authors"][version.author.slug] += 1

print("Version History Statistics:")
print("=" * 50)
print(f"  Total versions (sample): {version_stats['total_versions']}")
print(
    f"  Entities with multiple versions: {version_stats['entities_with_multiple_versions']}"
)
print(f"  Maximum versions for single entity: {version_stats['max_versions']}")

print(f"\n  Changes by author:")
for author, count in sorted(
    version_stats["authors"].items(), key=lambda x: x[1], reverse=True
):
    print(f"    - {author}: {count} change(s)")

## 7. Generate Data Quality Report

In [None]:
# Generate a comprehensive quality report from the results of the earlier
# cells (type_counts, total_entities, issues, naming_issues,
# relationship_issues, all_relationships). Each section derives a 0-100 score;
# the overall score is the plain average of the three section scores.
print("=" * 70)
print("DATA QUALITY REPORT")
print("=" * 70)

print(f"\nGenerated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Database: {db_path}")

print("\n1. DATABASE OVERVIEW")
print("-" * 70)
print(f"  Total Entities: {total_entities}")
for entity_type, count in type_counts.items():
    print(f"    - {entity_type.capitalize()}: {count}")
print(f"  Total Relationships: {len(all_relationships)}")

print("\n2. DATA COMPLETENESS")
print("-" * 70)
# Default the percentages as well as the score, so the prints below still
# work when the database has no entities (they used to raise NameError).
missing_nepali_pct = 0.0
missing_attrs_pct = 0.0
completeness_score = 100
if total_entities > 0:
    missing_nepali_pct = (len(issues["missing_nepali_names"]) / total_entities) * 100
    missing_attrs_pct = (len(issues["missing_attributes"]) / total_entities) * 100
    # Average of the two "missing" percentages, subtracted from a perfect score
    completeness_score = 100 - (missing_nepali_pct + missing_attrs_pct) / 2

print(f"  Completeness Score: {completeness_score:.1f}%")
print(
    f"  Missing Nepali names: {len(issues['missing_nepali_names'])} ({missing_nepali_pct:.1f}%)"
)
print(
    f"  Missing attributes: {len(issues['missing_attributes'])} ({missing_attrs_pct:.1f}%)"
)

print("\n3. NAMING CONSISTENCY")
print("-" * 70)
naming_score = 100
if total_entities > 0:
    naming_issues_count = sum(len(v) for v in naming_issues.values())
    # One entity can appear under several issues, so clamp the score at zero
    naming_score = max(0, 100 - (naming_issues_count / total_entities) * 100)

print(f"  Naming Score: {naming_score:.1f}%")
print(f"  No PRIMARY name: {len(naming_issues['no_primary_name'])}")
print(f"  Multiple PRIMARY names: {len(naming_issues['multiple_primary_names'])}")
print(f"  Empty names: {len(naming_issues['empty_names'])}")

print("\n4. RELATIONSHIP INTEGRITY")
print("-" * 70)
integrity_score = 100
if len(all_relationships) > 0:
    integrity_issues_count = sum(len(v) for v in relationship_issues.values())
    # One relationship can appear under several issues, so clamp at zero
    integrity_score = max(
        0, 100 - (integrity_issues_count / len(all_relationships)) * 100
    )

print(f"  Integrity Score: {integrity_score:.1f}%")
print(f"  Missing source: {len(relationship_issues['missing_source'])}")
print(f"  Missing target: {len(relationship_issues['missing_target'])}")
print(f"  Invalid dates: {len(relationship_issues['invalid_dates'])}")

print("\n5. OVERALL QUALITY SCORE")
print("-" * 70)
overall_score = (completeness_score + naming_score + integrity_score) / 3
print(f"  Overall Score: {overall_score:.1f}%")

# Map the overall score onto a letter grade in 10-point bands
if overall_score >= 90:
    grade = "A (Excellent)"
elif overall_score >= 80:
    grade = "B (Good)"
elif overall_score >= 70:
    grade = "C (Fair)"
elif overall_score >= 60:
    grade = "D (Poor)"
else:
    grade = "F (Critical)"

print(f"  Grade: {grade}")

print("\n6. RECOMMENDATIONS")
print("-" * 70)
if len(issues["missing_nepali_names"]) > 0:
    print(f"  • Add Nepali names to {len(issues['missing_nepali_names'])} entities")
if len(issues["missing_attributes"]) > 0:
    print(f"  • Add attributes to {len(issues['missing_attributes'])} entities")
if len(naming_issues["multiple_primary_names"]) > 0:
    print(
        f"  • Fix {len(naming_issues['multiple_primary_names'])} entities with multiple PRIMARY names"
    )
if (
    len(relationship_issues["missing_source"]) > 0
    or len(relationship_issues["missing_target"]) > 0
):
    print("  • Clean up orphaned relationships")
if overall_score >= 90:
    print("  • Data quality is excellent! Continue maintaining high standards.")

print("\n" + "=" * 70)

## 8. Summary

In this notebook, we've performed comprehensive data quality analysis:

- ✓ Collected database statistics
- ✓ Identified entities with missing data
- ✓ Checked naming consistency
- ✓ Validated relationship integrity
- ✓ Analyzed version history patterns
- ✓ Generated data quality report with scores

### Next Steps

1. Address identified data quality issues
2. Set up regular quality monitoring
3. Create automated quality checks
4. Document data quality standards

### Resources

- Data Maintainer Guide: `docs/data-maintainer-guide.md`
- Example Scripts: `examples/`
- API Documentation: Run the server and visit `/docs`