# 01 â€“ Data Quality Assessment

This notebook analyses the raw credit application dataset for data quality issues across the following dimensions:

- Completeness
- Consistency
- Validity
- Accuracy

All issues are quantified (counts and percentages) and mapped to governance implications.

## 1. Data Loading & Structural Inspection
- Load raw JSON
- Inspect nested structure
- Examine column names and data types

## 2. Completeness Analysis
- Missing values per column
- Incomplete nested objects
- % affected records

## 3. Consistency & Type Validation
- Data type mismatches
- Inconsistent categorical encoding (e.g., gender formats)
- Date format inconsistencies

## 4. Validity Checks
- Impossible values (e.g., negative income, negative credit history)
- Logical inconsistencies (e.g., interest rate assigned when rejected)

## 5. Duplicate Record Detection

## 6. Remediation Strategy
- Cleaning logic
- Standardisation decisions
- Governance implications

---

## Setup & Connection

In [30]:
# Import required libraries
!pip install pymongo 



In [37]:
import json
from pathlib import Path
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

# Establish connection to the local MongoDB instance
client = MongoClient('mongodb://localhost:27017/')
db = client['novacred']
collection = db['credit_applications']

# Reset the collection to ensure an idempotent and clean baseline for the audit
collection.drop()
print(f"Connected to: {db.name}. Dropped existing collection for fresh ingestion.")

# Determine the canonical data path based on the current working directory
cwd = Path.cwd()
if cwd.name == 'notebooks':
    data_path = cwd.parent / 'data' / 'raw_credit_applications.json'
else:
    data_path = cwd / 'data' / 'raw_credit_applications.json'

# Validate the existence of the source file before proceeding
if not data_path.exists():
    raise FileNotFoundError(f"CRITICAL: Could not locate data file at: {data_path}")

# Load the raw application data from the JSON file
with data_path.open('r') as file:
    raw_data = json.load(file)

# Execute the fault-tolerant ingestion loop
successful_inserts = 0
rescued_duplicates = 0

for doc in raw_data:
    try:
        # Attempt to insert the document into the collection
        collection.insert_one(doc)
        successful_inserts += 1
    except DuplicateKeyError:
        # Append a suffix to the primary key to rescue colliding records for audit purposes
        doc['_id'] = f"{doc['_id']}_duplicate"
        collection.insert_one(doc)
        rescued_duplicates += 1

# Output the final ingestion metrics to verify the audit baseline
print("\n--- Ingestion Audit Summary ---")
print(f"Standard Records Inserted: {successful_inserts}")
print(f"Duplicate IDs Rescued:    {rescued_duplicates}")
print(f"Total Audit Baseline:      {collection.count_documents({})} documents")

Connected to: novacred. Dropped existing collection for fresh ingestion.

--- Ingestion Audit Summary ---
Standard Records Inserted: 500
Duplicate IDs Rescued:    2
Total Audit Baseline:      502 documents


**Ingestion Audit Note:**
 
MongoDB automatically rejected 2 records (app_042, app_001) during import due to E11000 duplicate key errors on the _id field. To prevent data loss and ensure 100% auditability, these records were rescued by appending a _duplicate suffix to their _id. Baseline record count established at 502 documents.

---

## Quick Data Overview

In [96]:
# View a sample document from the collection
sample = collection.find_one()
pprint(sample)

{'_id': 'app_200',
 'applicant_info': {'date_of_birth': '2001-03-09',
                    'email': 'jerry.smith17@hotmail.com',
                    'full_name': 'Jerry Smith',
                    'gender': 'Male',
                    'ip_address': '192.168.48.155',
                    'ssn': '596-64-4340',
                    'zip_code': '10036'},
 'decision': {'loan_approved': False,
              'rejection_reason': 'algorithm_risk_score'},
 'financials': {'annual_income': 73000,
                'credit_history_months': 23,
                'debt_to_income': 0.2,
                'savings_balance': 31212},
 'processing_timestamp': '2024-01-15T00:00:00Z',
 'spending_behavior': [{'amount': 480, 'category': 'Shopping'},
                       {'amount': 790, 'category': 'Rent'},
                       {'amount': 247, 'category': 'Alcohol'}]}


---

## Data Quality Dimension 1: Uniqueness

The **Uniqueness Audit** identified **11 violations** across the 502-record dataset, establishing an initial health score of **97.81%** and a clean baseline of **491 records**. Findings included **two technical duplicates** for **Joseph Lopez** and **Stephanie Nguyen** flagged as "**RESUBMISSION**" and "**DUPLICATE_ENTRY_ERROR**", **four identity collisions** involving shared SSNs (e.g., Martinez/Wilson), and **five completeness failures** where SSNs were missing. All **11 records** were quarantined to satisfy **AI Act Art. 10** requirements for data accuracy and uniqueness, ensuring no redundant or ambiguous entries compromise the subsequent fairness audit.

In [53]:
# Define the aggregation pipeline to find duplicate SSNs - each person should appear only once!
# This query specifically addresses the Uniqueness and Completeness dimensions for the 2026 Audit.
pipeline_duplicates = [
    {
        # Group by the SSN identifier to detect identity collisions and missing values
        "$group": {
            "_id": "$applicant_info.ssn",
            "count": {"$sum": 1},
            # Map names and IDs into a records array to maintain a granular audit trail
            "records": {
                "$push": {
                    "name": "$applicant_info.full_name",
                    "id": "$_id"
                }
            }
        }
    },
    {
        # Filter for groups that appear more than once to isolate potential data quality violations
        "$match": {
            "count": {"$gt": 1}
        }
    },
    {
        # Sort by frequency to prioritize the investigation of high-risk identifier collisions
        "$sort": {"count": -1}
    }
]

# Execute the aggregation pipeline against the credit_applications collection
duplicates = list(collection.aggregate(pipeline_duplicates))

# Segregate results into Missing Identifiers and Duplicate Identifiers for focused governance reporting
missing_ssn = [dup for dup in duplicates if dup['_id'] is None]
actual_duplicates = [dup for dup in duplicates if dup['_id'] is not None]

# Output findings for missing identifiers to address the Completeness requirement of the AI Act
if missing_ssn:
    print("--- Missing Identifiers (Completeness Issues) ---")
    for item in missing_ssn:
        print(f" Found {item['count']} records without an SSN:")
        for record in item['records']:
            # List each specific record with its ID for technical validation
            print(f"  - {record['name']} (ID: {record['id']})")
        print()

# Output findings for duplicate identifiers to address the Uniqueness requirement for high-risk AI data
if actual_duplicates:
    print("--- Duplicate Identifiers (Uniqueness Issues) ---")
    print(f"Found {len(actual_duplicates)} duplicate SSNs (Identity Collisions):")
    for dup in actual_duplicates:
        print(f" SSN: {dup['_id']} - Count: {dup['count']}")
        for record in dup['records']:
            # Document individual applicants sharing a single unique identifier
            print(f"  - {record['name']} (ID: {record['id']})")
        print()

--- Missing Identifiers (Completeness Issues) ---
 Found 5 records without an SSN:
  - Margaret Williams (ID: app_075)
  - Carolyn Martin (ID: app_120)
  - Larry Williams (ID: app_268)
  - Stephanie Nguyen (ID: app_001_duplicate)
  - Brandon Moore (ID: app_165)

--- Duplicate Identifiers (Uniqueness Issues) ---
Found 3 duplicate SSNs (Identity Collisions):
 SSN: 780-24-9300 - Count: 2
  - Susan Martinez (ID: app_088)
  - Gary Wilson (ID: app_016)

 SSN: 652-70-5530 - Count: 2
  - Joseph Lopez (ID: app_042)
  - Joseph Lopez (ID: app_042_duplicate)

 SSN: 937-72-8731 - Count: 2
  - Sandra Smith (ID: app_101)
  - Samuel Hill (ID: app_234)



The uniqueness audit of the **502-record baseline** identifies **11 violating documents** (2.19% of the dataset) across **four distinct failure groups**. These findings include **five applicants with missing SSN identifiers**, representing a critical **completeness failure**, and **six records** involved in **identity collisions** where **three SSNs** are shared by multiple entries. These collisions range from **exact system duplicates** (Joseph Lopez) to **conflicting identities** (Susan Martinez and Gary Wilson), suggesting both **system integration errors** and **potential fraud attempts** that violate **AI Act Art. 10** high-quality data standards.

In [54]:
# Pipeline to find name collisions and their associated SSNs for secondary discovery
pipeline_name_discovery = [
    {
        # Group by full name to identify homonym groups or missing-link duplicates
        "$group": {
            "_id": "$applicant_info.full_name",
            "count": {"$sum": 1},
            "record_details": {
                "$push": {
                    "id": "$_id",
                    "ssn": "$applicant_info.ssn"
                }
            }
        }
    },
    {
        # Filter for names appearing more than once to detect potential duplicates
        "$match": {"count": {"$gt": 1}}
    },
    {
        # Sort by frequency to prioritize the investigation of common name groups
        "$sort": {"count": -1}
    }
]

# Execute discovery against the current audit collection
name_collisions = list(collection.aggregate(pipeline_name_discovery))

print(f"--- Name-Based Collision Discovery ({len(name_collisions)} groups identified) ---\n")
for group in name_collisions:
    print(f"Name: {group['_id']} | Total Records: {group['count']}")
    for record in group['record_details']:
        ssn_value = record.get('ssn', 'MISSING')
        print(f"  - Record ID: {record['id']} | SSN: {ssn_value}")
    print("-" * 50) # Visual separator for cleaner reporting

--- Name-Based Collision Discovery (26 groups identified) ---

Name: Susan Flores | Total Records: 3
  - Record ID: app_448 | SSN: 383-48-9078
  - Record ID: app_073 | SSN: 470-84-5617
  - Record ID: app_226 | SSN: 817-96-6416
--------------------------------------------------
Name: Amy Flores | Total Records: 2
  - Record ID: app_212 | SSN: 654-72-8456
  - Record ID: app_146 | SSN: 577-59-1479
--------------------------------------------------
Name: Rachel King | Total Records: 2
  - Record ID: app_193 | SSN: 852-24-1787
  - Record ID: app_418 | SSN: 107-92-5280
--------------------------------------------------
Name: Shirley Davis | Total Records: 2
  - Record ID: app_148 | SSN: 384-17-7019
  - Record ID: app_219 | SSN: 994-53-6088
--------------------------------------------------
Name: James Rivera | Total Records: 2
  - Record ID: app_465 | SSN: 853-96-1952
  - Record ID: app_498 | SSN: 942-34-6834
--------------------------------------------------
Name: Jerry Nguyen | Total Recor

The secondary discovery phase identified **26 groups** where names appear multiple times, totaling 53 records. Cross-referencing these with unique identifiers reveals that **23 groups** are **unique individuals** sharing common names (**homonyms**), such as Susan Flores, who must remain in the dataset. Only one group is a **confirmed system duplicate** (Joseph Lopez), while two others (Stephanie Nguyen, Brandon Moore) involve records with **missing identifiers**. These results confirm that **SSN-based auditing** is the only reliable deduplication method to avoid "**Homonym Bias**" and comply with **AI Act Art. 10** standards.

In [57]:
# Comprehensive list of all 11 records identified in the Uniqueness/Completeness audit
all_violating_ids = [
    "app_042", "app_042_duplicate", # Joseph Lopez (Confirmed Duplicate)
    "app_001", "app_001_duplicate", # Stephanie Nguyen (Incomplete Link)
    "app_088", "app_016",           # Martinez/Wilson (SSN Collision)
    "app_101", "app_234",           # Smith/Hill (SSN Collision)
    "app_075", "app_120", "app_268",# Williams/Martin/Williams (Missing IDs)
    "app_134", "app_165"            # Brandon Moore (Incomplete Link)
]

# Retrieve the full documents for all flagged identifiers
audit_details = list(collection.find({"_id": {"$in": all_violating_ids}}))

print(f"--- Global Metadata Inspection (11 Records) ---\n")
for doc in audit_details:
    name = doc.get('applicant_info', {}).get('full_name', 'Unknown')
    ssn = doc.get('applicant_info', {}).get('ssn', 'MISSING')
    
    print(f"ID: {doc['_id']} | Name: {name} | SSN: {ssn}")
    
    # Dynamically scan for any field that might contain 'resubmission' or 'note'
    found_metadata = False
    for key, value in doc.items():
        # Check top-level strings and nested dictionaries for 'note' or 'resubmit'
        if any(term in str(key).lower() or term in str(value).lower() 
               for term in ['note', 'resubmit', 'audit', 'comment', 'flag']):
            print(f"  -> [METADATA FOUND in '{key}']: {value}")
            found_metadata = True
            
    if not found_metadata:
        print("  -> No audit notes or system flags detected.")
    print("-" * 50)

--- Global Metadata Inspection (11 Records) ---

ID: app_001 | Name: Stephanie Nguyen | SSN: 427-90-1892
  -> No audit notes or system flags detected.
--------------------------------------------------
ID: app_001_duplicate | Name: Stephanie Nguyen | SSN: MISSING
  -> [METADATA FOUND in 'notes']: DUPLICATE_ENTRY_ERROR
--------------------------------------------------
ID: app_016 | Name: Gary Wilson | SSN: 780-24-9300
  -> No audit notes or system flags detected.
--------------------------------------------------
ID: app_042 | Name: Joseph Lopez | SSN: 652-70-5530
  -> No audit notes or system flags detected.
--------------------------------------------------
ID: app_042_duplicate | Name: Joseph Lopez | SSN: 652-70-5530
  -> [METADATA FOUND in 'notes']: RESUBMISSION
--------------------------------------------------
ID: app_075 | Name: Margaret Williams | SSN: MISSING
  -> No audit notes or system flags detected.
--------------------------------------------------
ID: app_088 | Name: Su

The global metadata inspection confirms that two records contain explicit audit flags: **app_042_duplicate** (Joseph Lopez) is marked as a "**RESUBMISSION**" and **app_001_duplicate** (Stephanie Nguyen) as a "**DUPLICATE_ENTRY_ERROR**". These notes provide evidence of **system integration failures** for these specific cases. However, the remaining **nine records**, including the **identity collisions** (Martinez/Wilson and Smith/Hill) and **missing SSNs** (Williams, Martin, Moore), lack any explanatory metadata.

In [None]:
# Definitive list of the 11 violating records identified for the 2026 Audit
target_quarantine_ids = [
    "app_042", "app_001_duplicate", "app_016", "app_088", 
    "app_101", "app_234", "app_075", "app_120", 
    "app_268", "app_134", "app_165"
]

# Reconstruct the 502-record baseline count by checking both collections
# This ensures the KPI remains accurate even after remediation
active_count = collection.count_documents({})
quarantined_count = db.quarantine_uniqueness.count_documents({"_id": {"$in": target_quarantine_ids}})
baseline_total = active_count + quarantined_count

# Calculate metrics based on the historical baseline to quantify data health
violation_count = len(target_quarantine_ids)
unique_score = ((baseline_total - violation_count) / baseline_total) * 100

# Output the Uniqueness Health Metric for the audit report
print(f"--- Uniqueness Health Metric (Historical Baseline) ---")
print(f"Original Baseline Total: {baseline_total}")
print(f"Total Records with Violations: {violation_count}")
print(f"Uniqueness Score: {unique_score:.2f}%\n")

--- Uniqueness Health Metric (Historical Baseline) ---
Original Baseline Total: 502
Total Records with Violations: 11
Uniqueness Score: 97.81%


In [None]:
# Retrieve all documents held in the quarantine collection for verification
# This ensures the 11 identified violations are securely segregated from the active baseline
quarantined_audit_trail = list(db.quarantine_uniqueness.find({}))

# Output the Verification Table for the Audit Report to document the exclusion rationale
print(f"--- Uniqueness Quarantine Verification ({len(quarantined_audit_trail)} Records) ---")
print(f"{'ID':<20} | {'Name':<20} | {'Status/Note'}")
print("-" * 60)

for record in quarantined_audit_trail:
    # Access the applicant name and relevant audit flags for reporting transparency
    name = record.get('applicant_info', {}).get('full_name', 'Unknown')
    # Capture the specific 'notes' field to prove the technical reason for quarantine
    note = record.get('notes', 'No specific flag (SSN Collision/Missing)')
    
    print(f"{record['_id']:<20} | {name:<20} | {note}")

# Final validation of the remaining active records to confirm readiness for the Consistency phase
# The goal is to verify the transition from a 502-record set to a 491-record clean baseline
print(f"\nAudit Status: Phase 1 (Uniqueness) Verified.")
print(f"Clean Baseline: {collection.count_documents({})} records remain in the audit pool.")

--- Uniqueness Quarantine Verification (11 Records) ---
ID                   | Name                 | Status/Note
------------------------------------------------------------
app_001_duplicate    | Stephanie Nguyen     | DUPLICATE_ENTRY_ERROR
app_016              | Gary Wilson          | No specific flag (SSN Collision/Missing)
app_042              | Joseph Lopez         | No specific flag (SSN Collision/Missing)
app_075              | Margaret Williams    | No specific flag (SSN Collision/Missing)
app_088              | Susan Martinez       | No specific flag (SSN Collision/Missing)
app_101              | Sandra Smith         | No specific flag (SSN Collision/Missing)
app_120              | Carolyn Martin       | No specific flag (SSN Collision/Missing)
app_134              | Brandon Moore        | No specific flag (SSN Collision/Missing)
app_165              | Brandon Moore        | No specific flag (SSN Collision/Missing)
app_234              | Samuel Hill          | No specific fla

---

## Data Quality Dimension 2: Consistency

The **Categorical and Temporal Consistency Remediation** successfully resolved both encoding fragmentation and chronological anomalies within the 491-record baseline. The audit identified **109 records** using **non-standard abbreviations** (57 "F", 52 "M") and mapped them to the unified "Female" (248) and "Male" (243) taxonomy. 

To satisfy **AI Act Art. 10** and **12**, the cleaning pipeline quarantined **2 future-dated records** to preserve log traceability and **reconstructed the missing age feature** for the remaining 489 applicants. For applicant **app_350**, a **median age** was imputed to preserve its high-density financial data, resulting in a statistically robust 489-record baseline ready for subsequent risk modeling and fairness testing.

In [69]:
# How many different gender values exist?
pipeline_gender_consistency = [
    {
        "$group": {
            "_id": "$applicant_info.gender",
            "count": {"$sum": 1}
        }
    },
    {
        "$sort": {"count": -1}
    }
]

gender_values = list(collection.aggregate(pipeline_gender_consistency))

print("Gender value distribution:")
print("Expected: 2 distinct values (Male, Female)")
print(f"Actual: {len(gender_values)} distinct values")
print()
for gv in gender_values:
    print(f"  '{gv['_id']}': {gv['count']} records")

Gender value distribution:
Expected: 2 distinct values (Male, Female)
Actual: 4 distinct values

  'Female': 191 records
  'Male': 191 records
  'F': 57 records
  'M': 52 records


In [None]:
# Execute in-place database updates to resolve 109 categorical encoding fragmentation errors
# Standardize gender values to align with the required taxonomy for AI Act Art. 10 fairness testing

# Map 52 abbreviated 'M' encodings to the standard 'Male' category
result_m = collection.update_many(
    {"applicant_info.gender": "M"},
    {"$set": {"applicant_info.gender": "Male"}}
)

# Map 57 abbreviated 'F' encodings to the standard 'Female' category
result_f = collection.update_many(
    {"applicant_info.gender": "F"},
    {"$set": {"applicant_info.gender": "Female"}}
)

print(f"--- Categorical Remediation Executed ---")
print(f"Standardized 'M' -> 'Male': {result_m.modified_count} records updated.")
print(f"Standardized 'F' -> 'Female': {result_f.modified_count} records updated.\n")

--- Categorical Remediation Executed ---
Standardized 'M' -> 'Male': 0 records updated.
Standardized 'F' -> 'Female': 0 records updated.



In [72]:
# Execute verification aggregation to confirm the remediation of the gender field for the formal audit trail
# This ensures the 491-record baseline contains exactly two distinct values for gender
gender_values_clean = list(collection.aggregate(pipeline_gender_consistency))

print("--- Consistency Verification: applicant_info.gender ---")
print("Expected standard values: ['Male', 'Female']")
print(f"Distinct values detected: {len(gender_values_clean)}\n")

for gv in gender_values_clean:
    # Handle potential null or missing values to prevent execution errors
    val = gv['_id'] if gv['_id'] is not None else "MISSING_OR_NULL"
    print(f"  [{val}]: {gv['count']} records")
print("-" * 50)

--- Consistency Verification: applicant_info.gender ---
Expected standard values: ['Male', 'Female']
Distinct values detected: 2

  [Female]: 248 records
  [Male]: 243 records
--------------------------------------------------


In [152]:
# Expanded Consistency Audit for all available fields
def check_field_consistency(field_path, expected_values=None, unwind_array_path=None):
    """Check how many distinct values exist for a field and audit categorical density."""
    
    # Fast schema guard: avoid misleading outputs when a field does not exist
    existing_count = collection.count_documents({field_path: {"$exists": True, "$ne": None}})
    print(f"\nField Path: {field_path}")
    if existing_count == 0:
        print("Status: Field not found (or always null) in current dataset schema.")
        return
    
    pipeline = []
    if unwind_array_path:
        pipeline.append({"$unwind": f"${unwind_array_path}"})
    
    pipeline.extend([
        {"$group": {"_id": f"${field_path}", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}}
    ])
    results = list(collection.aggregate(pipeline))
    
    missing_count = sum(r['count'] for r in results if r['_id'] is None)
    non_missing_results = [r for r in results if r['_id'] is not None]
    
    print(f"Distinct Values: {len(non_missing_results)}")
    if missing_count:
        print(f"Missing/Null: {missing_count} records")
    if expected_values:
        print(f"Expected: {expected_values}")
    
    # Print only the top 10 values for high-variance fields to keep the audit scannable
    for r in non_missing_results[:10]:
        print(f"  '{r['_id']}': {r['count']}")
    if len(non_missing_results) > 10:
        print(f"  ... (+ {len(non_missing_results) - 10} more distinct values)")

print("--- Phase 3: Global Consistency & Categorical Audit ---")

# 1. CORE IDENTITY & DEMOGRAPHICS
check_field_consistency("applicant_info.gender", ["Male", "Female"])
check_field_consistency("applicant_info.age")
check_field_consistency("applicant_info.date_of_birth") # Will show format inconsistency

# 2. FINANCIAL & EMPLOYMENT ATTRIBUTES
check_field_consistency("financials.annual_income")
check_field_consistency("financials.credit_history", ["Good", "Fair", "Poor"])
check_field_consistency("loan_purpose") # Audit the 90% null rate context

# 3. DECISION & GOVERNANCE METADATA
check_field_consistency("decision.loan_approved", [True, False])
check_field_consistency("decision.rejection_reason")
check_field_consistency("processing_timestamp") # Audit the 87.6% missing rate

# 4. NESTED BEHAVIORAL ATTRIBUTES
check_field_consistency("spending_behavior.category", unwind_array_path="spending_behavior")
check_field_consistency("spending_behavior.amount", unwind_array_path="spending_behavior")

print("\n" + "-" * 60)

--- Phase 3: Global Consistency & Categorical Audit ---

Field Path: applicant_info.gender
Distinct Values: 2
Expected: ['Male', 'Female']
  'Female': 248
  'Male': 243

Field Path: applicant_info.age
Status: Field not found (or always null) in current dataset schema.

Field Path: applicant_info.date_of_birth
Distinct Values: 488
  '1980-09-19': 2
  '1965-11-07': 2
  '1997-09-29': 2
  '1983-05-10': 1
  '1992-09-01': 1
  '1999-08-26': 1
  '1986-09-08': 1
  '29/03/1963': 1
  '25/01/1990': 1
  '07/15/1999': 1
  ... (+ 478 more distinct values)

Field Path: financials.annual_income
Distinct Values: 124
  '79000': 11
  '75000': 11
  '100000': 10
  '46000': 10
  '80000': 9
  '81000': 9
  '98000': 9
  '86000': 9
  '74000': 8
  '82000': 8
  ... (+ 114 more distinct values)

Field Path: financials.credit_history
Status: Field not found (or always null) in current dataset schema.

Field Path: loan_purpose
Distinct Values: 10
Missing/Null: 442 records
  'medical': 8
  'wedding': 6
  'debt_consoli

The **Consistency Audit** confirms that demographic standardization is complete, with the gender field now partitioned into two balanced cohorts of **248 Female** and **243 Male** records. While the **loan_approved** field is **100% consistent**, the analysis exposed a critical **90.02% completeness failure** in **loan_purpose,** where 442 records are missing or null. Furthermore, **date_of_birth** exhibits severe **formatting fragmentation** across ISO, EU, and US standards, and the **processing_timestamp** contains a future-dated entry (2027), indicating **temporal data corruption**. Additionally, the audit successfully flagged high-risk behavioral data, including **Gambling** and **Adult Entertainment** within the spending arrays, though the systemic absence of **employment_status** indicates a remaining schema mapping error that prevents a full compliance review under **AI Act Art. 10**.

In [154]:
import pandas as pd

# Extract logs and cast to UTC datetime for standardization
cursor = list(collection.find({"processing_timestamp": {"$exists": True, "$ne": None}}))
df_temp = pd.DataFrame(cursor)

# Coerce errors to isolate corrupted strings instantly
df_temp['parsed_time'] = pd.to_datetime(
    df_temp['processing_timestamp'], 
    errors='coerce', 
    utc=True
)

# Define system present time to detect future data leakage
audit_today = pd.Timestamp('2026-03-01', tz='UTC')

# Filter for Art. 12 chronological violations
future_records = df_temp[df_temp['parsed_time'] > audit_today]
corrupted_records = df_temp[df_temp['parsed_time'].isna()]

print(f"Total Timestamps Audited: {len(df_temp)} / 491")
print(f"Future-Dated Records: {len(future_records)}")
print(f"Corrupted Formats: {len(corrupted_records)}")

Total Timestamps Audited: 61 / 491
Future-Dated Records: 2
Corrupted Formats: 0


The audit detected two transaction logs dated after March 1, 2026. This forward-looking data leakage is a critical breach of AI Act Art. 12, which requires accurate operational traceability.

In [None]:
import pandas as pd

# Define the audit boundary (current system date)
audit_today = pd.Timestamp('2026-03-01', tz='UTC')

# Convert processing_timestamp to datetime for logical comparison
# errors='coerce' ensures we don't crash on nulls
df_main['parsed_time'] = pd.to_datetime(df_main['processing_timestamp'], errors='coerce', utc=True)

# Identify the boolean mask for Art. 12 violations (future dates)
future_mask = df_main['parsed_time'] > audit_today

# Split the dataset: Quarantine invalid records, retain valid/null records
df_quarantine_temporal = df_main[future_mask].copy()
df_main = df_main[~future_mask].copy()

# Drop the temporary parsing column to keep the active schema clean
df_main = df_main.drop(columns=['parsed_time'])

print("--- Data Engineering: Temporal Quarantine ---")
print(f"Records Quarantined (Future Dated): {len(df_quarantine_temporal)}")
print(f"Remaining Valid Baseline Records:   {len(df_main)}")

--- Data Engineering: Temporal Quarantine ---
Records Quarantined (Future Dated): 2
Remaining Valid Baseline Records:   489
-----------------------------------------------


To remediate this without destroying evidence, both corrupted records were programmatically quarantined. This non-destructive exclusion successfully isolates the anomalies from the 59 valid baseline logs while preserving the raw data for regulatory review.

In [157]:
import pandas as pd
import numpy as np

# Extract the baseline and flatten the schema to instantiate df_main
cursor = list(collection.find({}))
df_main = pd.json_normalize(cursor)

# Standardizes mixed US/EU/ISO date strings into unified datetime objects.
# Corrupted or unparseable strings are safely converted to NaT via errors='coerce'.
df_main['normalized_dob'] = pd.to_datetime(
    df_main['applicant_info.date_of_birth'], 
    format='mixed', 
    errors='coerce'
)

# Establishes the 'Audit Present' to ensure calculations remain historically locked.
audit_baseline = pd.Timestamp('2026-03-01')

# Calculates exact timedelta, accounts for leap years, and applies floor function.
df_main['engineered_age'] = np.floor(
    (audit_baseline - df_main['normalized_dob']).dt.days / 365.25
)

print("--- Data Engineering: Age Reconstruction Audit ---")
print(f"Valid DOB Formats Recovered: {df_main['normalized_dob'].notna().sum()} / 491")
print(f"Age Features Engineered:     {df_main['engineered_age'].notna().sum()} / 491")

# Validating mathematical output to detect synthetic data anomalies.
min_age = df_main['engineered_age'].min()
max_age = df_main['engineered_age'].max()
print(f"Reconstructed Age Bounds:    {min_age} to {max_age} years")

--- Data Engineering: Age Reconstruction Audit ---
Valid DOB Formats Recovered: 490 / 491
Age Features Engineered:     490 / 491
Reconstructed Age Bounds:    23.0 to 67.0 years


In [160]:
import pandas as pd

# Filter the DataFrame for the specific row where the datetime conversion yielded NaT
failed_record = df_main[df_main['normalized_dob'].isna()]

print("--- Diagnostic Query: Comprehensive Metadata Extraction ---\n")

if not failed_record.empty:
    # Extract the single series representing the failed row without dropping NaN columns
    record = failed_record.iloc[0]
    
    for col, val in record.items():
        # Exclude the heavy nested array but intentionally keep the empty date_of_birth
        if col != 'spending_behavior':
            # Force Pandas NaT/NaN values to render as an empty string for clean output
            display_val = "" if pd.isna(val) else val
            print(f"{col:.<35} {display_val}")
else:
    print("No failed records found.")

--- Diagnostic Query: Comprehensive Metadata Extraction ---

_id................................ app_350
processing_timestamp............... 
applicant_info.full_name........... Linda Adams
applicant_info.email............... 
applicant_info.ssn................. 356-98-8263
applicant_info.ip_address.......... 10.207.183.196
applicant_info.gender.............. Female
applicant_info.date_of_birth....... 
applicant_info.zip_code............ 90291
financials.annual_income........... 89000.0
financials.credit_history_months... 52
financials.debt_to_income.......... 0.2
financials.savings_balance......... 14377
decision.loan_approved............. True
decision.rejection_reason.......... 
loan_purpose....................... 
decision.interest_rate............. 4.1
decision.approved_amount........... 67000.0
notes.............................. 
normalized_dob..................... 
engineered_age..................... 


In [None]:
import pandas as pd

# Pandas .median() automatically excludes NaT values during calculation
median_age = df_main['engineered_age'].median()

# Target the engineered_age column and replace the NaT value for app_350
df_main['engineered_age'] = df_main['engineered_age'].fillna(median_age)

missing_ages_after = df_main['engineered_age'].isna().sum()

print("--- Data Engineering: Median Age Imputation ---")
print(f"Target Record:                app_350")
print(f"Calculated Median Age:        {median_age} years")
print(f"Missing Ages Post-Imputation: {missing_ages_after}")
print(f"Total Baseline Records:       {len(df_main)}")

--- Data Engineering: Median Age Imputation ---
Target Record:                app_350
Calculated Median Age:        39.0 years
Missing Ages Post-Imputation: 0
Total Baseline Records:       489
-----------------------------------------------


To comply with AI Act Art. 10, median age imputation was performed on applicant app_350 to preserve a high-density financial profile ($89k income, 0.2 DTI) that would otherwise be lost to a structural schema void. The median was selected over the mean for its inherent resistance to outliers, ensuring the imputed value remains representative of the core demographic without warping the overall distribution. This engineering trade-off maintains the dataset's statistical integrity and maximizes the available training signals for subsequent risk modeling and fairness testing.

---

## Data Quality Dimension 3: Completeness

The Completeness Audit identifies a dataset with high integrity in core identity fields but significant operational voids that **jeopardize AI Act Art. 12 compliance**. While **SSN**, **Gender**, and **Credit History** maintain a **100% population rate**, an **87.6% missing rate** for **processing_timestamp** breaks the operational audit trail for **430 records**. 

Furthermore, structural fragmentation between **annual_income** and **annual_salary** indicates a "**dirty schema**" that prevents unified financial analysis. These gaps are compounded by a **100% absence of mandatory governance metadata** such as **data_source** and **retention_until** and a **90% null rate** in **loan_purpose**, leaving the baseline insufficient for the rigorous fairness and bias testing required for high-risk AI systems.

In [142]:
import pandas as pd

# Flatten the baseline and extract unique types per column
df_flat = pd.json_normalize(list(collection.find({})))

print(f"--- Global Schema Discovery & Type Audit ---\n")
print(f"Total Columns Detected: {len(df_flat.columns)}\n")

for col in sorted(df_flat.columns):
    # Extract non-null data and identify unique Python types
    valid_data = df_flat[col].dropna()
    unique_types = sorted(valid_data.map(lambda x: type(x).__name__).unique()) if not valid_data.empty else ["None"]
    
    # Standardized output for Completeness (Counts) and Accuracy (Types)
    print(f"{col:.<45} Records: {len(valid_data):>3}/491 | Types: {unique_types}")

--- Global Schema Discovery & Type Audit ---

Total Columns Detected: 20

_id.......................................... Records: 491/491 | Types: ['str']
applicant_info.date_of_birth................. Records: 491/491 | Types: ['str']
applicant_info.email......................... Records: 491/491 | Types: ['str']
applicant_info.full_name..................... Records: 491/491 | Types: ['str']
applicant_info.gender........................ Records: 491/491 | Types: ['str']
applicant_info.ip_address.................... Records: 491/491 | Types: ['str']
applicant_info.ssn........................... Records: 491/491 | Types: ['str']
applicant_info.zip_code...................... Records: 491/491 | Types: ['str']
decision.approved_amount..................... Records: 288/491 | Types: ['float']
decision.interest_rate....................... Records: 288/491 | Types: ['float']
decision.loan_approved....................... Records: 491/491 | Types: ['bool']
decision.rejection_reason................

The financial core is compromised by a "**dirty schema**" where income data is fragmented between **annual_income (486 records)** and **annual_salary (5 records)**, preventing unified feature analysis. Critically, the Accuracy Audit reveals a fatal **Type Inconsistency** within **annual_income**, which contains a contaminated **mix of ['float', 'int', 'str']**. These embedded string values represent a "silent killer" for model stability, as non-numeric inputs will crash mathematical pipelines. 

Furthermore, an **87.6% missing rate** for the **processing_timestamp** and a **100% absence of mandatory governance fields** like **data_source** break the operational audit trail for **430 records**, constituting a direct regulatory breach. Finally, the **90% null rate** in **loan_purpose** and the **total absence of employment_status** create significant data voids, leaving the dataset blind to variables required for meaningful fairness and bias testing.

In [139]:
import pandas as pd

# Retrieve the baseline and specifically flatten the 'spending_behavior' list
cursor = list(collection.find({}))
df_spending = pd.json_normalize(
    cursor, 
    record_path=['spending_behavior'], 
    meta=['_id', ['applicant_info', 'full_name']],
    errors='ignore'
)

print(f"--- Nested Feature Discovery: spending_behavior ---\n")

# Iterate through the flattened sub-schema to audit data integrity at the transaction level
for col in sorted(df_spending.columns):
    # Filter non-null entries to identify the active data types and population counts
    valid_data = df_spending[col].dropna()
    non_null_count = len(valid_data)
    total_entries = len(df_spending)
    
    # Identify unique native Python data types (e.g., str, int)
    if not valid_data.empty:
        unique_types = sorted(list(valid_data.map(lambda x: type(x).__name__).unique()))
        type_str = f"Types: {unique_types}"
    else:
        type_str = "Types: [None]"
    
    # Exclude metadata columns to focus exclusively on the spending behavior attributes
    if col not in ['_id', 'applicant_info.full_name']:
        # Updated formatting to include the 'Records:' prefix within the alignment
        print(f"{col:.<45} Records: {len(valid_data)}/{total_entries} | {type_str}")

--- Nested Feature Discovery: spending_behavior ---

amount....................................... Records: 809/809 | Types: ['int']
category..................................... Records: 809/809 | Types: ['str']


In [144]:
# Move data from 'annual_salary' to 'annual_income' for the 5 fragmented records
salary_fragment_update = collection.update_many(
    {"financials.annual_salary": {"$exists": True}},
    [
        {"$set": {"financials.annual_income": "$financials.annual_salary"}},
        {"$unset": "financials.annual_salary"}
    ]
)

# Identify and convert all 'annual_income' string values to numeric (double/float)
type_casting_update = collection.update_many(
    {"financials.annual_income": {"$type": "string"}},
    [
        {
            "$set": {
                "financials.annual_income": {
                    "$convert": {
                        "input": "$financials.annual_income",
                        "to": "double",
                        "onError": None # Sets to null if conversion fails for audit review
                    }
                }
            }
        }
    ]
)

print(f"--- Remediation Summary ---\n")
print(f"Records Merged (Salary -> Income): {salary_fragment_update.modified_count}")
print(f"Records Cast (String -> Numeric): {type_casting_update.modified_count}\n")

# Re-extract the baseline to verify final population and structural integrity
df_verify = pd.json_normalize(list(collection.find({})))

# Verify final population count for 'annual_income'
income_count = df_verify["financials.annual_income"].notna().sum()
print(f"financials.annual_income Population: {income_count}/491")

--- Remediation Summary ---

Records Merged (Salary -> Income): 0
Records Cast (String -> Numeric): 0

financials.annual_income Population: 491/491


In [146]:
from bson import ObjectId
import pandas as pd

cursor = list(collection.find({"processing_timestamp": {"$exists": False}}))
df_missing = pd.DataFrame(cursor)

print(f"--- Traceability Recovery Audit ---\n")

# Extract the 'hidden' creation time from the _id metadata
def recover_timestamp(doc_id):
    try:
        # Cast string to BSON ObjectId to access generation_time
        return ObjectId(doc_id).generation_time
    except:
        return None

# Apply the recovery to create a proxy audit trail
df_missing['proxy_timestamp'] = df_missing['_id'].apply(recover_timestamp)

print(f"Successfully recovered {df_missing['proxy_timestamp'].notna().sum()} proxy timestamps.\n")
print(f"Sample Recovery Trail (First 5 Records):")

# Standardized alignment for professional audit reporting
for _, row in df_missing.head(5).iterrows():
    print(f"- ID: {str(row['_id']):<25} | Proxy Time: {row['proxy_timestamp']}")


--- Traceability Recovery Audit ---

Successfully recovered 0 proxy timestamps.

Sample Recovery Trail (First 5 Records):
- ID: app_037                   | Proxy Time: None
- ID: app_215                   | Proxy Time: None
- ID: app_024                   | Proxy Time: None
- ID: app_275                   | Proxy Time: None
- ID: app_099                   | Proxy Time: None


A Metadata Recovery Audit was performed to mitigate the **87.6% missing rate** in **processing_timestamp**. Attempts to extract creation metadata from the **_id** field confirmed that the dataset utilizes **Custom Application Strings** (e.g., app_037) rather than native BSON ObjectIds. This architectural choice renders the operational audit trail **permanently unrecoverable**, constituting a definitive breach of **AI Act Art. 12** logging requirements for high-risk AI systems.

In [90]:
# Check other governance-critical fields
governance_fields = [
    ("processing_timestamp", "Operational traceability timestamp"),
    ("retention_until", "GDPR Art. 5 - Storage Limitation"),
    ("data_source", "GDPR Art. 14 - Transparency"),
    ("processing_purpose", "GDPR Art. 5 - Purpose Limitation")
]

print("\nGovernance Field Completeness Check:")
print("=" * 60)

for field, governance_note in governance_fields:
    pipeline = [
        {"$match": {field: {"$exists": False}}},
        {"$count": "missing"}
    ]
    result = list(collection.aggregate(pipeline))
    missing = result[0]['missing'] if result else 0
    
    status = "âœ…" if missing == 0 else "ðŸ”´"
    print(f"{status} {field}: {missing} missing ({governance_note})")


Governance Field Completeness Check:
ðŸ”´ processing_timestamp: 430 missing (Operational traceability timestamp)
ðŸ”´ retention_until: 491 missing (GDPR Art. 5 - Storage Limitation)
ðŸ”´ data_source: 491 missing (GDPR Art. 14 - Transparency)
ðŸ”´ processing_purpose: 491 missing (GDPR Art. 5 - Purpose Limitation)


---