# NovaCred Data Quality Pipeline

This notebook runs the full post-ingestion pipeline for NovaCred credit applications:
1. Structuring and flattening raw JSON
2. Duplicate `_id` analysis and deterministic canonical selection
3. Schema validation and data quality profiling (pre-clean)
4. Cleaning and standardisation with auditable raw vs clean fields
5. Privacy tagging, pseudonymisation, and PII-safe analysis outputs
6. CSV artifact export and acceptance checks

In [1]:
# Setup imports and notebook configuration

from pathlib import Path
import sys

import pandas as pd

REPO_ROOT = Path.cwd().resolve()
if not (REPO_ROOT / 'src').exists():
    REPO_ROOT = REPO_ROOT.parent
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from src import clean, config, data_dictionary, flatten, io_utils, privacy, quality, schema

pd.set_option('display.max_columns', 120)
pd.set_option('display.width', 160)


## Stage 0 - PII-Safe Guardrails

- Raw records are loaded from `data/raw/raw_credit_applications.json`.
- `privacy.redact_record(...)` and `privacy.safe_preview_df(...)` are used for previews.
- This notebook never prints raw SSNs, raw emails, raw IP addresses, or full names.
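
The masking shown in the redacted previews can be approximated as follows. This is a minimal sketch, not the code in `src/privacy.py`: the helper names `mask_email`, `mask_ssn`, and `mask_dob` and the fallback placeholders are hypothetical, and the output shapes are inferred from the sample record in this notebook.

```python
import re


def mask_email(email: str) -> str:
    """Keep the first character of the local part and the full domain."""
    local, sep, domain = email.partition("@")
    if not sep or not local:
        return "[REDACTED_EMAIL]"  # hypothetical placeholder for malformed input
    return f"{local[0]}***@{domain}"


def mask_ssn(ssn: str) -> str:
    """Keep only the last four digits of an SSN-like string."""
    digits = re.sub(r"\D", "", ssn)
    if len(digits) < 4:
        return "[REDACTED_SSN]"  # hypothetical placeholder for short input
    return f"***-**-{digits[-4:]}"


def mask_dob(dob: str) -> str:
    """Keep the year of an ISO date (YYYY-MM-DD) and mask month and day."""
    match = re.fullmatch(r"(\d{4})-\d{2}-\d{2}", dob)
    return f"{match.group(1)}-**-**" if match else "[REDACTED_DOB]"
```

Masking to a fixed shape keeps previews readable while making sure that a malformed value is replaced wholesale rather than partially exposed.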

In [2]:
# Load raw JSON input and show a redacted sample record

records = io_utils.load_raw_json(config.RAW_JSON_PATH)
assert isinstance(records, list), 'Top-level JSON type must be list.'

print(f"Top-level type: {type(records).__name__}")
print(f"Raw record count: {len(records)}")

redacted_first_record = privacy.redact_record(records[0])
redacted_first_record

Top-level type: list
Raw record count: 502


{'_id': 'app_200',
 'applicant_info': {'full_name': '[REDACTED_NAME]',
  'email': 'j***@hotmail.com',
  'ssn': '***-**-4340',
  'ip_address': '[REDACTED_IP]',
  'gender': 'Male',
  'date_of_birth': '2001-**-**',
  'zip_code': '[REDACTED_IP]'},
 'financials': {'annual_income': 73000,
  'credit_history_months': 23,
  'debt_to_income': 0.2,
  'savings_balance': 31212},
 'spending_behavior': [{'category': 'Shopping', 'amount': 480},
  {'category': 'Rent', 'amount': 790},
  {'category': 'Alcohol', 'amount': 247}],
 'decision': {'loan_approved': False,
  'rejection_reason': 'algorithm_risk_score'},
 'processing_timestamp': '2024-01-15T00:00:00Z'}

## Stage 1 - Record Identity and Flattening

- Adds a stable `application_row_id` (0 to n-1, in raw file order) to every record.
- Builds two analysis-friendly tables:
  - `applications_df`: one row per raw JSON record
  - `spending_df`: one row per spending item
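
The two-table layout can be illustrated with a small sketch. It assumes the nested record shape shown in the redacted sample above; `flatten_sketch` and the spending column names (`item_index`, `category`, `amount`) are hypothetical and may differ from what `src/flatten.py` produces.

```python
import pandas as pd


def flatten_sketch(records):
    """Split nested records into one applications table and one spending table."""
    app_rows, spend_rows = [], []
    for row_id, rec in enumerate(records):
        app_id = rec.get("_id")
        financials = rec.get("financials") or {}
        # One row per raw record, keyed by its position in the input list.
        app_rows.append({
            "application_row_id": row_id,
            "application_id": app_id,
            "raw_financial_annual_income": financials.get("annual_income"),
        })
        # One row per spending item, carrying the parent row id for joins.
        for item_index, item in enumerate(rec.get("spending_behavior") or []):
            spend_rows.append({
                "application_row_id": row_id,
                "application_id": app_id,
                "item_index": item_index,
                "category": item.get("category"),
                "amount": item.get("amount"),
            })
    return pd.DataFrame(app_rows), pd.DataFrame(spend_rows)
```

Carrying `application_row_id` into the spending table lets spending items be joined back to the exact raw record even when `application_id` is duplicated.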

In [3]:
# Flatten raw records into applications and spending tables

applications_df = flatten.flatten_applications(records)
spending_df = flatten.flatten_spending_items(records)

print('Applications table shape:', applications_df.shape)
print('Spending table shape:', spending_df.shape)

privacy.safe_preview_df(applications_df, pii_columns=config.DIRECT_PII_COLUMNS, n=5)

Applications table shape: (502, 21)
Spending table shape: (827, 5)


Unnamed: 0,application_row_id,application_id,raw_processing_timestamp,raw_applicant_full_name,raw_applicant_email,raw_applicant_ssn,raw_applicant_ip_address,raw_applicant_gender,raw_applicant_date_of_birth,raw_applicant_zip_code,raw_financial_annual_income,raw_financial_annual_salary,raw_financial_credit_history_months,raw_financial_debt_to_income,raw_financial_savings_balance,raw_decision_loan_approved,raw_decision_interest_rate,raw_decision_approved_amount,raw_decision_rejection_reason,raw_loan_purpose,raw_notes
0,0,app_200,2024-01-15T00:00:00Z,[REDACTED_NAME],j***@hotmail.com,***-**-4340,[REDACTED_IP],Male,2001-**-**,10036,73000,,23,0.2,31212,False,,,algorithm_risk_score,,
1,1,app_037,,[REDACTED_NAME],b***@yahoo.com,***-**-4784,[REDACTED_IP],M,1992-**-**,10032,78000,,51,0.18,17915,False,,,algorithm_risk_score,,
2,2,app_215,,[REDACTED_NAME],s***@mail.com,***-**-5178,[REDACTED_IP],Male,1989-**-**,10075,61000,,41,0.21,37909,True,3.7,59000.0,,vacation,
3,3,app_024,,[REDACTED_NAME],t***@protonmail.com,***-**-1833,[REDACTED_IP],Male,1983-**-**,10077,103000,,70,0.35,0,True,4.3,34000.0,,,
4,4,app_184,2024-01-15T00:00:00Z,[REDACTED_NAME],b***@aol.com,***-**-2475,[REDACTED_IP],M,1999-**-**,10080,57000,,14,0.23,31763,False,,,algorithm_risk_score,,


## Stage 2 - Duplicate `_id` Handling Policy

Duplicate application IDs are assessed **before** `_id` is treated as a key.

Canonical selection for analysis is deterministic:
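1. Parse `processing_timestamp` and, for each `application_id`, pick the row with the latest timestamp.
2. If every timestamp in the group is missing or unparseable, or several rows tie on the latest timestamp, pick the row with the highest `application_row_id`.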

Duplicate classes:
- `exact`: records are identical
- `versioned`: differences are non-material, or reflect a timestamp/version progression
- `conflict`: material differences requiring governance attention
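
The canonical selection rule can be sketched in pandas as below. This is an illustrative version under stated assumptions, not the code behind `quality.analyze_duplicate_ids`; the function name `pick_canonical` is hypothetical, while the column names match the flattened applications table.

```python
import pandas as pd


def pick_canonical(df: pd.DataFrame) -> pd.Series:
    """Return the canonical application_row_id for each application_id."""
    work = df[["application_id", "application_row_id", "raw_processing_timestamp"]].copy()
    # Unparseable or missing timestamps become NaT.
    work["parsed_ts"] = pd.to_datetime(
        work["raw_processing_timestamp"], errors="coerce", utc=True
    )
    # Sort NaT first, then by timestamp, then by row id, so the last row in
    # each group is the latest timestamp with ties broken by the highest row id.
    work = work.sort_values(
        ["application_id", "parsed_ts", "application_row_id"], na_position="first"
    )
    canonical = work.drop_duplicates("application_id", keep="last")
    return canonical.set_index("application_id")["application_row_id"]
```

Because the ordering is fully determined by the data, rerunning the selection on the same input always yields the same canonical rows.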

In [4]:
# Analyze duplicate application IDs and canonical selection metadata

duplicate_report_df, duplicate_meta_df = quality.analyze_duplicate_ids(applications_df)

total_records = len(applications_df)
unique_ids = applications_df['application_id'].nunique(dropna=True)
duplicate_record_count = int(duplicate_meta_df['is_duplicate_id'].sum())

print('Total records:', total_records)
print('Unique application_id values:', unique_ids)
print('Records with duplicate application_id:', duplicate_record_count)

duplicate_report_df

Total records: 502
Unique application_id values: 500
Records with duplicate application_id: 4


Unnamed: 0,application_id,dup_count,classification,canonical_row_id,canonical_reason,example_differences
0,app_001,2,conflict,455,missing_or_unparseable_timestamp_fallback_max_...,raw_applicant_ssn|raw_applicant_ip_address|raw...
1,app_042,2,versioned,354,missing_or_unparseable_timestamp_fallback_max_...,raw_notes


## Stage 4 - Expected Schema and Executable Validation

Schema definitions (field type, requiredness, allowed values/ranges, and PII tags) are declared in `src/schema.py`.
Validation flags are generated at row level and later aggregated into report tables.
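
Row-level validation flags might look like the sketch below: one boolean column per rule, aligned to the input rows. The function `validate_sketch`, the email pattern, and the flag name `flag_dti_out_of_range` are assumptions for illustration; the real rules live in `src/schema.py`. The `[0, 1]` range for debt-to-income follows the data dictionary shown later in this notebook.

```python
import pandas as pd

# Deliberately loose pattern (an assumption): something@something.tld
EMAIL_PATTERN = r"^[^@\s]+@[^@\s]+\.[^@\s]+$"


def validate_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Return one boolean flag column per rule, indexed like the input."""
    email = df["raw_applicant_email"].fillna("").astype(str).str.strip()
    blank_email = email == ""
    # Only non-blank emails can be "invalid"; blanks are a completeness issue.
    invalid_email = ~blank_email & ~email.str.match(EMAIL_PATTERN)

    dti = pd.to_numeric(df["raw_financial_debt_to_income"], errors="coerce")
    dti_out_of_range = dti.notna() & ~dti.between(0, 1)

    return pd.DataFrame(
        {
            "flag_blank_email": blank_email,
            "flag_invalid_email": invalid_email,
            "flag_dti_out_of_range": dti_out_of_range,
        },
        index=df.index,
    )
```

Keeping blank and invalid as separate flags avoids counting the same row under both completeness and validity.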

In [5]:
# Build pre-clean schema dictionary, catalog, and validation outputs

schema_dictionary = schema.schema_dictionary_df()
rule_catalog_df = schema.build_rule_catalog()
application_flags_pre = schema.validate_applications_preclean(applications_df)
spending_flags_pre = schema.validate_spending_preclean(spending_df)

schema_validation_report = quality.build_schema_validation_report(
    applications_df=applications_df,
    application_flags=application_flags_pre,
    spending_df=spending_df,
    spending_flags=spending_flags_pre,
    stage='pre',
    rule_catalog=rule_catalog_df,
)

rule_catalog_df.head(12)


Unnamed: 0,stage,rule_id,rule_key,rule_family,issue_type,field_path,field_path_annotated,source_columns,value_source,dataset_scope,denominator,count_unit,severity,description
0,post,R_APP_001,flag_missing_processing_timestamp,APP,Completeness,processing_timestamp,processing_timestamp_clean,clean_processing_timestamp,clean,curated,application_rows,rows,high,Missing or blank processing timestamp.
1,post,R_APP_002,flag_missing_required_applicant_field,APP,Completeness,applicant_info.*,applicant_info.required_fields_mixed,raw_applicant_full_name|clean_email|raw_applic...,derived,curated,application_rows,rows,high,One or more required applicant fields missing ...
2,post,R_APP_003,flag_missing_ssn_and_ip,APP,Completeness,applicant_info.ssn|applicant_info.ip_address,applicant_info.ssn_raw|applicant_info.ip_addre...,raw_applicant_ssn|raw_applicant_ip_address,raw,curated,application_rows,rows,high,Both SSN and IP address missing/blank.
3,post,R_APP_004,flag_blank_email,APP,Completeness,applicant_info.email,applicant_info.email_clean,clean_email,clean,curated,application_rows,rows,medium,Email missing or blank.
4,post,R_APP_005,flag_invalid_email,APP,Validity,applicant_info.email,applicant_info.email_clean,clean_email,clean,curated,application_rows,rows,medium,Email does not match expected format.
5,post,R_APP_006,flag_gender_needs_normalisation,APP,Consistency,applicant_info.gender,applicant_info.gender_clean,clean_gender,clean,curated,application_rows,rows,low,Gender remains non-canonical after cleaning.
6,post,R_APP_007,flag_invalid_gender,APP,Validity,applicant_info.gender,applicant_info.gender_clean,clean_gender|gender_invalid_flag,derived,curated,application_rows,rows,medium,Gender outside allowed source set.
7,post,R_APP_008,flag_dob_non_iso_format,APP,Consistency,applicant_info.date_of_birth,applicant_info.date_of_birth_clean,clean_date_of_birth,clean,curated,application_rows,rows,low,Clean DOB is non-ISO after parsing.
8,post,R_APP_009,flag_dob_ambiguous_format,APP,Consistency,applicant_info.date_of_birth,applicant_info.date_of_birth_raw,dob_ambiguous_flag,derived,curated,application_rows,rows,medium,DOB ambiguity flag carried from cleaning stage...
9,post,R_APP_010,flag_annual_income_string_type,APP,Consistency,financials.annual_income,financials.annual_income_clean,annual_income_coerce_failed_flag,derived,curated,application_rows,rows,low,Annual income could not be coerced to numeric ...


## Stage 5 - Data Quality Profiling (Pre-Clean)

The issue registry quantifies completeness, uniqueness, validity, consistency, cross-field logic, plausibility, and synthetic indicators.
Counts and percentages are computed from data, with examples logged by `application_id` only.
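
Aggregating row-level flags into counts and percentages could be done along these lines. This is a sketch that assumes a boolean flag table with one column per rule; `summarise_flags` is a hypothetical helper, not the logic of `quality.build_data_quality_report`.

```python
import pandas as pd


def summarise_flags(flags: pd.DataFrame) -> pd.DataFrame:
    """Count flagged rows per rule and express each count as a percentage of rows."""
    counts = flags.sum()
    report = pd.DataFrame(
        {"rule_key": counts.index, "count": counts.to_numpy().astype(int)}
    )
    # The denominator is the number of rows in the flag table.
    report["percent"] = (report["count"] / len(flags) * 100).round(2)
    return report
```

For example, 440 flagged rows out of 502 give 87.65 percent, which is the figure reported for missing processing timestamps.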

In [6]:
# Build pre-clean data quality report and key findings summary

data_quality_report_df = quality.build_data_quality_report(
    applications_df=applications_df,
    application_flags=application_flags_pre,
    duplicate_report=duplicate_report_df,
    duplicate_metadata=duplicate_meta_df,
    spending_df=spending_df,
    spending_flags=spending_flags_pre,
    stage='pre',
    rule_catalog=rule_catalog_df,
)

key_findings = (
    data_quality_report_df.loc[data_quality_report_df['count'] > 0]
    .sort_values(['count', 'rule_id'], ascending=[False, True])
    [['issue_type', 'rule_id', 'description', 'count', 'percent', 'field_path_annotated', 'value_source']]
)

key_findings


Unnamed: 0,issue_type,rule_id,description,count,percent,field_path_annotated,value_source
18,Synthetic indicator,R_APP_019,IP address is private-range (likely masked/syn...,497,99.0,applicant_info.ip_address_raw,raw
0,Completeness,R_APP_001,Missing or blank processing timestamp.,440,87.65,processing_timestamp_raw,raw
7,Consistency,R_APP_008,DOB not in canonical YYYY-MM-DD format.,157,31.27,applicant_info.date_of_birth_raw,raw
5,Consistency,R_APP_006,Gender value requires canonical mapping (M/F t...,111,22.11,applicant_info.gender_raw,raw
8,Consistency,R_APP_009,DOB format is ambiguous NN/NN/YYYY.,39,7.77,applicant_info.date_of_birth_raw,raw
17,Plausibility,R_APP_018,Loan approved with less than 6 months of credi...,18,3.59,decision.loan_approved_raw|financials.credit_h...,raw
16,Plausibility,R_APP_017,Loan approved with zero months of credit history.,11,2.19,decision.loan_approved_raw|financials.credit_h...,raw
1,Completeness,R_APP_002,One or more required applicant fields missing ...,8,1.59,applicant_info.required_fields_raw,raw
9,Consistency,R_APP_010,Annual income stored as string instead of nume...,8,1.59,financials.annual_income_raw,raw
3,Completeness,R_APP_004,Email missing or blank.,7,1.39,applicant_info.email_raw,raw


## Stage 6 - Cleaning and Standardisation (Auditable)

Cleaning choices are deterministic and preserve auditability:
- Keep `raw_*` columns unchanged.
- Write standardised `clean_*` columns.
- Add remediation flags (for parsing, coercion, nullification, and logic checks).

Key choices:
- **DOB ambiguity** (`NN/NN/YYYY` with both parts <= 12): interpreted as **MM/DD/YYYY**, plus `dob_ambiguous_flag=True`.
- **Negative numeric fields** (`credit_history_months`, `savings_balance`) are flagged and nullified in clean columns.
- **DTI out-of-range** values are flagged and nullified in `clean_debt_to_income`.
- **Field drift**: `annual_salary` is mapped into `clean_annual_income` when `annual_income` is missing, with provenance flag.
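
The DOB ambiguity rule can be sketched as a small parser that returns the ISO date together with the ambiguity flag. `parse_dob` is a hypothetical helper, not the function in `src/clean.py`. Treating a first part greater than 12 as `DD/MM/YYYY` is an assumption added here for illustration; the source only specifies the ambiguous case.

```python
import re
from datetime import date


def parse_dob(raw):
    """Return (iso_date_or_None, ambiguous_flag) for a raw DOB value."""
    if raw is None:
        return None, False
    text = str(raw).strip()
    try:
        if re.fullmatch(r"\d{4}-\d{2}-\d{2}", text):
            return date.fromisoformat(text).isoformat(), False
        match = re.fullmatch(r"(\d{2})/(\d{2})/(\d{4})", text)
        if match:
            first, second, year = (int(part) for part in match.groups())
            # Both parts could be a month, so the format cannot be inferred.
            ambiguous = first <= 12 and second <= 12
            if first <= 12:
                month, day = first, second  # MM/DD/YYYY (documented default)
            else:
                day, month = first, second  # DD/MM/YYYY (assumption)
            return date(year, month, day).isoformat(), ambiguous
    except ValueError:
        pass  # impossible calendar date, e.g. month 31
    return None, False
```

Returning the flag alongside the parsed value means the MM/DD interpretation stays auditable and can be revisited without re-reading the raw data.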

In [7]:
# Apply cleaning rules and assemble curated full dataset preview

applications_clean_df = clean.clean_applications(applications_df)
spending_clean_df = clean.clean_spending_items(spending_df)

# Curated full dataset: raw + clean + validation flags + duplicate metadata.
applications_curated_full_df = applications_clean_df.join(application_flags_pre)
applications_curated_full_df = applications_curated_full_df.merge(
    duplicate_meta_df,
    on=['application_row_id', 'application_id'],
    how='left',
)

preview_cols = [
    'application_row_id',
    'application_id',
    'raw_applicant_full_name',
    'raw_applicant_email',
    'raw_applicant_ssn',
    'clean_gender',
    'clean_annual_income',
    'clean_debt_to_income',
    'is_duplicate_id',
    'is_canonical_for_analysis',
    'has_invalid_email',
    'dob_parse_failed_flag',
]
privacy.safe_preview_df(
    applications_curated_full_df[preview_cols],
    pii_columns=config.DIRECT_PII_COLUMNS,
    n=8,
)

Unnamed: 0,application_row_id,application_id,raw_applicant_full_name,raw_applicant_email,raw_applicant_ssn,clean_gender,clean_annual_income,clean_debt_to_income,is_duplicate_id,is_canonical_for_analysis,has_invalid_email,dob_parse_failed_flag
0,0,app_200,[REDACTED_NAME],j***@hotmail.com,***-**-4340,Male,73000.0,0.2,False,True,False,False
1,1,app_037,[REDACTED_NAME],b***@yahoo.com,***-**-4784,Male,78000.0,0.18,False,True,False,False
2,2,app_215,[REDACTED_NAME],s***@mail.com,***-**-5178,Male,61000.0,0.21,False,True,False,False
3,3,app_024,[REDACTED_NAME],t***@protonmail.com,***-**-1833,Male,103000.0,0.35,False,True,False,False
4,4,app_184,[REDACTED_NAME],b***@aol.com,***-**-2475,Male,57000.0,0.23,False,True,False,False
5,5,app_275,[REDACTED_NAME],m***@outlook.com,***-**-4912,Female,110000.0,0.05,False,True,False,False
6,6,app_099,[REDACTED_NAME],n***@outlook.com,***-**-2503,Male,55000.0,0.17,False,True,False,False
7,7,app_246,[REDACTED_NAME],s***@gmail.com,***-**-1864,Female,82000.0,0.29,False,True,False,False


## Post-Clean Validation and Remediation Evidence

- Runs executable post-clean schema validation on cleaned columns.
- Runs post-clean data quality profiling on cleaned outputs.
- Produces before/after evidence table for remediation impact.
- Displays only aggregated/non-PII summaries.


In [8]:
# Run post-clean validation and build remediation evidence tables

application_flags_post = schema.validate_applications_postclean(applications_clean_df)
spending_flags_post = schema.validate_spending_postclean(spending_clean_df)

schema_validation_report_postclean = quality.build_schema_validation_report(
    applications_df=applications_clean_df,
    application_flags=application_flags_post,
    spending_df=spending_clean_df,
    spending_flags=spending_flags_post,
    stage="post",
    application_rules=schema.APPLICATION_RULES_POST,
    spending_rules=schema.SPENDING_RULES_POST,
    rule_catalog=rule_catalog_df,
)

data_quality_report_postclean_df = quality.build_data_quality_report(
    applications_df=applications_clean_df,
    application_flags=application_flags_post,
    duplicate_report=duplicate_report_df,
    duplicate_metadata=duplicate_meta_df,
    spending_df=spending_clean_df,
    spending_flags=spending_flags_post,
    stage="post",
    application_rules=schema.APPLICATION_RULES_POST,
    spending_rules=schema.SPENDING_RULES_POST,
    ssn_column="raw_applicant_ssn",
    rule_catalog=rule_catalog_df,
)

before_after_comparison_df = quality.build_before_after_comparison(
    pre_report=data_quality_report_df,
    post_report=data_quality_report_postclean_df,
    duplicate_report=duplicate_report_df,
    duplicate_metadata=duplicate_meta_df,
    total_records=len(applications_df),
    canonical_count=int(duplicate_meta_df["is_canonical_for_analysis"].sum()),
)

cleaning_actions_df = quality.summarise_cleaning_changes(applications_clean_df)

print("Post-clean schema rules:", len(schema_validation_report_postclean))
print("Post-clean issue rows:", len(data_quality_report_postclean_df))
print("Post-clean non-zero issue rows:", int((data_quality_report_postclean_df["count"] > 0).sum()))

before_after_comparison_df


Post-clean schema rules: 22
Post-clean issue rows: 26
Post-clean non-zero issue rows: 14


Unnamed: 0,metric,rule_id,pre_count,pre_percent,post_count,post_percent,delta_count,delta_percent
0,Missing required applicant fields,R_APP_002,8,1.59,8,1.59,0,0.0
1,Missing processing timestamp,R_APP_001,440,87.65,440,87.65,0,0.0
2,Blank email,R_APP_004,7,1.39,7,1.39,0,0.0
3,Invalid email format,R_APP_005,4,0.8,4,0.8,0,0.0
4,Gender requires normalization,R_APP_006,111,22.11,0,0.0,-111,-22.11
5,DOB non-ISO format,R_APP_008,157,31.27,0,0.0,-157,-31.27
6,Annual income type/coercion issue,R_APP_010,8,1.59,0,0.0,-8,-1.59
7,Annual salary field drift,R_APP_011,5,1.0,5,1.0,0,0.0
8,Negative credit history months,R_APP_012,2,0.4,0,0.0,-2,-0.4
9,Negative savings balance,R_APP_013,1,0.2,0,0.0,-1,-0.2


## Stage 7 - Privacy Tagging and Pseudonymisation

Pseudonymisation strategy (`src/privacy.py`):
- `applicant_pseudo_id = sha256(salt + "|" + seed)`
- Seed precedence: `ssn` -> `email` fallback -> `(full_name + date_of_birth + zip_code)` fallback -> application fallback.
- `pseudo_id_source` and `pseudo_id_fallback_used_flag` are retained for transparency.
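
The hashing scheme and seed precedence can be sketched with the standard library. This is an illustrative version only: the function `pseudo_id_sketch`, the flat input dict, the way the name/DOB/zip components are joined, the email lower-casing, and the source labels other than `ssn` and `email_fallback` are all assumptions, not the behaviour of `src/privacy.py`.

```python
import hashlib


def _present(value) -> bool:
    """True when a value is neither None nor blank."""
    return value is not None and str(value).strip() != ""


def pseudo_id_sketch(applicant: dict, application_id: str, salt: str):
    """Return (applicant_pseudo_id, pseudo_id_source, pseudo_id_fallback_used_flag)."""
    ssn = applicant.get("ssn")
    email = applicant.get("email")
    demographic = [applicant.get(k) for k in ("full_name", "date_of_birth", "zip_code")]

    if _present(ssn):
        source, seed = "ssn", str(ssn).strip()
    elif _present(email):
        source, seed = "email_fallback", str(email).strip().lower()
    elif all(_present(v) for v in demographic):
        # Hypothetical label and joining convention.
        source, seed = "demographic_fallback", "|".join(str(v).strip() for v in demographic)
    else:
        source, seed = "application_fallback", str(application_id)

    # sha256(salt + "|" + seed), hex-encoded (64 characters).
    digest = hashlib.sha256(f"{salt}|{seed}".encode("utf-8")).hexdigest()
    return digest, source, source != "ssn"
```

The salt matters because SSNs come from a small value space: without a secret salt, hashed values could be reversed by enumerating candidates, so the salt should be kept out of version control.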

For `applications_analysis.csv`:
- Keep only canonical rows.
- Remove direct PII columns (`full_name`, `email`, `ssn`, `ip_address`, raw DOB string, clean DOB string).
- Keep the derived, privacy-preserving `age_band` in place of DOB.
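
Deriving `age_band` from a clean ISO date of birth could look like the sketch below. The band labels match those listed in the data dictionary output of this notebook; the helper name `age_band_sketch` and the choice of reference date are assumptions.

```python
from datetime import date

# (exclusive upper age bound, label); ages of 65 and over fall through to "65+".
BANDS = [(25, "<25"), (35, "25-34"), (45, "35-44"), (55, "45-54"), (65, "55-64")]


def age_band_sketch(dob_iso, reference: date):
    """Map an ISO date of birth to an age band, or None when it cannot be derived."""
    if not isinstance(dob_iso, str):
        return None
    try:
        dob = date.fromisoformat(dob_iso)
    except ValueError:
        return None
    # Subtract one year if the birthday has not yet occurred in the reference year.
    age = reference.year - dob.year - (
        (reference.month, reference.day) < (dob.month, dob.day)
    )
    if age < 0:
        return None  # date of birth after the reference date
    for upper, label in BANDS:
        if age < upper:
            return label
    return "65+"
```

Using a fixed reference date rather than today's date keeps the bands reproducible across reruns.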

In [9]:
# Build privacy-safe analysis dataset and PII inventory

applications_analysis_df = privacy.build_analysis_dataset(applications_curated_full_df)
pii_inventory_df = privacy.generate_pii_inventory(
    curated_full_df=applications_curated_full_df,
    analysis_df=applications_analysis_df,
    spending_df=spending_clean_df,
)

print('Curated full rows:', len(applications_curated_full_df))
print('Analysis rows (canonical applications):', len(applications_analysis_df))
print('Unique analysis application_id:', applications_analysis_df['application_id'].nunique())

applications_analysis_df.head(10)

Curated full rows: 502
Analysis rows (canonical applications): 500
Unique analysis application_id: 500


Unnamed: 0,application_id,applicant_pseudo_id,pseudo_id_source,pseudo_id_fallback_used_flag,age_band,age_band_missing_flag,clean_gender,clean_zip_code,clean_annual_income,clean_credit_history_months,clean_debt_to_income,clean_savings_balance,clean_loan_approved,clean_interest_rate,clean_approved_amount,clean_rejection_reason
0,app_001,fc4fb76803a008529455aa4130a4c9f4a5f72f06f7ad43...,email_fallback,True,,True,,,102000.0,37,0.42,0.0,False,,,high_dti_ratio
1,app_002,7fa4238022da5aed441f8c48a907a8f3cbe88186049c8e...,ssn,False,25-34,False,Male,10020.0,41000.0,5,0.36,18200.0,False,,,algorithm_risk_score
2,app_003,e626311f310f7fb80415229777be761b877b33d42ddcc1...,ssn,False,35-44,False,Female,90213.0,65000.0,74,0.43,7090.0,True,3.4,76000.0,
3,app_004,417094dc0567f442dbc01b8d7007ea237a13ba958b0d77...,ssn,False,25-34,False,Female,90217.0,69000.0,9,0.41,10327.0,False,,,high_dti_ratio
4,app_005,61c112b16ecefe0dab71d98f848daab97cb0019083597a...,ssn,False,65+,False,Female,90296.0,39000.0,76,0.06,15011.0,False,,,algorithm_risk_score
5,app_006,6c5853ffe9833775699631aaa163298d4b6e44a7c71d46...,ssn,False,35-44,False,Male,10048.0,82000.0,24,0.21,0.0,True,6.2,27000.0,
6,app_007,4710a710ad478e0137bc642d2a5e92dc84697624514c41...,ssn,False,35-44,False,Female,90284.0,92000.0,77,0.08,21353.0,True,5.2,27000.0,
7,app_008,b82f9f2c882994ac2bd2a6987b1bb019b254c595314fc0...,ssn,False,25-34,False,Female,90244.0,80000.0,24,0.43,22882.0,True,3.6,51000.0,
8,app_009,cce197ca698a62737834e6f500f429b7bb077dfce23bf2...,ssn,False,35-44,False,Female,90261.0,92000.0,15,0.08,39921.0,True,5.9,27000.0,
9,app_010,e591991bb78dfdb464ba8b84330cc953eb36772446a9f0...,ssn,False,25-34,False,Female,90250.0,44000.0,62,0.25,6824.0,False,,,algorithm_risk_score


## Data Dictionary

A canonical dictionary is generated across raw attributes and all output datasets, with business and lineage views.


In [10]:
# Build canonical, business, and lineage data dictionary outputs

output_datasets = {
    "applications_curated_full": applications_curated_full_df,
    "applications_analysis": applications_analysis_df,
    "spending_items_clean": spending_clean_df,
    "data_quality_report_pre": data_quality_report_df,
    "data_quality_report_post": data_quality_report_postclean_df,
    "schema_validation_report_pre": schema_validation_report,
    "schema_validation_report_post": schema_validation_report_postclean,
    "duplicate_id_report": duplicate_report_df,
    "pii_inventory": pii_inventory_df,
    "before_after_comparison": before_after_comparison_df,
    "rule_catalog": rule_catalog_df,
}

data_dictionary_df = data_dictionary.build_data_dictionary(
    records=records,
    output_datasets=output_datasets,
    rule_catalog_df=rule_catalog_df,
    schema_dictionary_df=schema_dictionary,
    pii_inventory_df=pii_inventory_df,
)
data_dictionary_business_df = data_dictionary.build_data_dictionary_business_view(data_dictionary_df)
data_dictionary_lineage_df = data_dictionary.build_data_dictionary_lineage_view(data_dictionary_df)

print("Data dictionary rows:", len(data_dictionary_df))
print("Business view rows:", len(data_dictionary_business_df))
print("Lineage view rows:", len(data_dictionary_lineage_df))

data_dictionary_df.head(12)


Data dictionary rows: 232
Business view rows: 232
Lineage view rows: 232


Unnamed: 0,dataset,field_name,field_path,description,data_type_observed,data_type_expected,nullable,allowed_values_or_range,example_redacted,pii_classification,transform_lineage,used_in_rules,present_in_outputs
0,applications_analysis,age_band,age_band,Derived privacy-preserving age grouping.,str,,True,25-34|35-44|45-54|55-64|65+|<25,,Non-PII,derived_for_analysis_privacy,,applications_analysis
1,applications_analysis,age_band_missing_flag,age_band_missing_flag,Derived validation/remediation indicator.,bool,,False,True|False,True,Non-PII,validation_or_cleaning_logic -> flag,,applications_analysis
2,applications_analysis,applicant_pseudo_id,applicant_pseudo_id,Privacy-preserving pseudonymization output.,str,,False,,fc4fb76803a008529455aa4130a4c9f4a5f72...,Quasi-PII,derived_for_analysis_privacy,,applications_analysis
3,applications_analysis,application_id,_id,External application identifier.,str,string,False,,app_001,Quasi-PII,pipeline_derived_or_passthrough,R_DUP_001|R_DUP_002|R_DUP_003|R_DUP_004|R_DUP_...,applications_analysis|applications_curated_ful...
4,applications_analysis,clean_annual_income,clean_annual_income,Standardized/cleaned field derived during reme...,float,,False,"[0.0, 171000.0]",102000.0,Non-PII,cleaning_logic -> clean_annual_income,,applications_analysis|applications_curated_full
5,applications_analysis,clean_approved_amount,clean_approved_amount,Standardized/cleaned field derived during reme...,float,,True,"[15000.0, 80000.0]",76000.0,Non-PII,cleaning_logic -> clean_approved_amount,R_APP_015,applications_analysis|applications_curated_full
6,applications_analysis,clean_credit_history_months,clean_credit_history_months,Standardized/cleaned field derived during reme...,int,,True,>=0,37,Non-PII,cleaning_logic -> clean_credit_history_months,R_APP_012|R_APP_017|R_APP_018,applications_analysis|applications_curated_full
7,applications_analysis,clean_debt_to_income,clean_debt_to_income,Standardized/cleaned field derived during reme...,float,,True,"[0,1]",0.42,Non-PII,cleaning_logic -> clean_debt_to_income,R_APP_014,applications_analysis|applications_curated_full
8,applications_analysis,clean_gender,clean_gender,Standardized/cleaned field derived during reme...,str,,True,Male|Female,Male,Non-PII,cleaning_logic -> clean_gender,R_APP_002|R_APP_006|R_APP_007,applications_analysis|applications_curated_full
9,applications_analysis,clean_interest_rate,clean_interest_rate,Standardized/cleaned field derived during reme...,float,,True,"[2.5, 6.5]",3.4,Non-PII,cleaning_logic -> clean_interest_rate,R_APP_015,applications_analysis|applications_curated_full


## Stage 8 - Persist CSV Artifacts

In [11]:
# Export curated and quality report CSV artifacts

io_utils.ensure_output_dirs()

# Each output path paired with the DataFrame written to it (order is preserved in the summary).
artifacts = [
    (config.APPLICATIONS_CURATED_FULL_PATH, applications_curated_full_df),
    (config.APPLICATIONS_ANALYSIS_PATH, applications_analysis_df),
    (config.SPENDING_ITEMS_CLEAN_PATH, spending_clean_df),
    (config.DATA_QUALITY_REPORT_PATH, data_quality_report_df),
    (config.DATA_QUALITY_REPORT_POSTCLEAN_PATH, data_quality_report_postclean_df),
    (config.DUPLICATE_ID_REPORT_PATH, duplicate_report_df),
    (config.PII_INVENTORY_PATH, pii_inventory_df),
    (config.SCHEMA_VALIDATION_REPORT_PATH, schema_validation_report),
    (config.SCHEMA_VALIDATION_REPORT_POSTCLEAN_PATH, schema_validation_report_postclean),
    (config.BEFORE_AFTER_COMPARISON_PATH, before_after_comparison_df),
    (config.RULE_CATALOG_PATH, rule_catalog_df),
    (config.DATA_DICTIONARY_PATH, data_dictionary_df),
    (config.DATA_DICTIONARY_BUSINESS_PATH, data_dictionary_business_df),
    (config.DATA_DICTIONARY_LINEAGE_PATH, data_dictionary_lineage_df),
]

for path, df in artifacts:
    io_utils.write_csv(df, path)

# Summarise what was written: one row per output file with its shape.
outputs = pd.DataFrame(
    [{'output_file': str(path), 'rows': len(df), 'cols': df.shape[1]} for path, df in artifacts]
)
outputs


Unnamed: 0,output_file,rows,cols
0,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,502,87
1,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,500,16
2,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,827,11
3,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,26,17
4,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,26,17
5,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,2,6
6,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,25,4
7,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,22,15
8,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,22,15
9,C:\Users\conno\dev\db\nova_dego\DEGO_Project_G...,18,8


## Acceptance Checks

This section enforces required deliverables and key privacy constraints.

In [12]:
# Run acceptance checks for outputs, privacy, and canonical logic

required_paths = [
    config.APPLICATIONS_CURATED_FULL_PATH,
    config.APPLICATIONS_ANALYSIS_PATH,
    config.SPENDING_ITEMS_CLEAN_PATH,
    config.DATA_QUALITY_REPORT_PATH,
    config.DATA_QUALITY_REPORT_POSTCLEAN_PATH,
    config.DUPLICATE_ID_REPORT_PATH,
    config.PII_INVENTORY_PATH,
    config.SCHEMA_VALIDATION_REPORT_PATH,
    config.SCHEMA_VALIDATION_REPORT_POSTCLEAN_PATH,
    config.BEFORE_AFTER_COMPARISON_PATH,
    config.RULE_CATALOG_PATH,
    config.DATA_DICTIONARY_PATH,
    config.DATA_DICTIONARY_BUSINESS_PATH,
    config.DATA_DICTIONARY_LINEAGE_PATH,
]

for path in required_paths:
    assert Path(path).exists(), f"Missing output file: {path}"

assert len(rule_catalog_df) > 0, "rule_catalog.csv must be non-empty."
assert len(data_dictionary_df) > 0, "data_dictionary.csv must be non-empty."
assert len(data_dictionary_business_df) > 0, "data_dictionary_business.csv must be non-empty."
assert len(data_dictionary_lineage_df) > 0, "data_dictionary_lineage.csv must be non-empty."
assert len(schema_validation_report_postclean) > 0, "Post-clean schema validation report must be non-empty."
assert len(data_quality_report_postclean_df) > 0, "Post-clean data quality report must be non-empty."
assert len(before_after_comparison_df) > 0, "Before/after comparison report must be non-empty."

quality.assert_rule_catalog_coverage(data_quality_report_df, rule_catalog_df, "data_quality_report_pre", stage_aware=True)
quality.assert_rule_catalog_coverage(data_quality_report_postclean_df, rule_catalog_df, "data_quality_report_post", stage_aware=True)
quality.assert_rule_catalog_coverage(schema_validation_report, rule_catalog_df, "schema_validation_report_pre", stage_aware=True)
quality.assert_rule_catalog_coverage(schema_validation_report_postclean, rule_catalog_df, "schema_validation_report_post", stage_aware=True)
quality.assert_rule_catalog_coverage(before_after_comparison_df, rule_catalog_df, "before_after_comparison", stage_aware=False)

annotation_columns = ["field_path_annotated", "source_columns", "value_source", "dataset_scope", "denominator", "count_unit"]
for col in annotation_columns:
    assert col in data_quality_report_df.columns, f"Missing annotation column in pre data quality report: {col}"
    assert col in data_quality_report_postclean_df.columns, f"Missing annotation column in post data quality report: {col}"
    assert col in schema_validation_report.columns, f"Missing annotation column in pre schema validation report: {col}"
    assert col in schema_validation_report_postclean.columns, f"Missing annotation column in post schema validation report: {col}"

required_duplicate_cols = [
    "is_duplicate_id",
    "dup_count",
    "rank_within_id",
    "is_canonical_for_analysis",
    "has_conflict",
]
for col in required_duplicate_cols:
    assert col in applications_curated_full_df.columns, f"Missing duplicate metadata column: {col}"

assert applications_analysis_df["application_id"].is_unique, "Analysis dataset must have unique application_id."
assert "applicant_pseudo_id" in applications_analysis_df.columns, "Missing applicant_pseudo_id in analysis dataset."
assert not any(col.startswith("raw_") for col in applications_analysis_df.columns), "Analysis dataset must not contain raw_* columns."
allowed_flag_cols = {"age_band_missing_flag", "pseudo_id_fallback_used_flag"}
flag_cols = [col for col in applications_analysis_df.columns if col.endswith("_flag")]
assert set(flag_cols).issubset(allowed_flag_cols), f"Unexpected flag columns in analysis dataset: {sorted(set(flag_cols) - allowed_flag_cols)}"

for forbidden_col in [
    "raw_applicant_full_name",
    "raw_applicant_email",
    "raw_applicant_ssn",
    "raw_applicant_ip_address",
    "raw_applicant_date_of_birth",
    "clean_email",
    "clean_date_of_birth",
]:
    assert forbidden_col not in applications_analysis_df.columns, f"Forbidden PII column present in analysis dataset: {forbidden_col}"

assert data_quality_report_df["count"].notna().all(), "data_quality_report.csv must contain counts."
assert data_quality_report_df["percent"].notna().all(), "data_quality_report.csv must contain percentages."
assert data_quality_report_postclean_df["count"].notna().all(), "data_quality_report_postclean.csv must contain counts."
assert data_quality_report_postclean_df["percent"].notna().all(), "data_quality_report_postclean.csv must contain percentages."

# Canonical selection policy check: latest processing_timestamp, fallback to max application_row_id.
policy_df = applications_df[["application_id", "application_row_id", "raw_processing_timestamp"]].copy()
policy_df["parsed_ts"] = pd.to_datetime(policy_df["raw_processing_timestamp"], errors="coerce", utc=True)
canonical_lookup = (
    applications_curated_full_df.loc[applications_curated_full_df["is_canonical_for_analysis"], ["application_id", "application_row_id"]]
    .set_index("application_id")["application_row_id"]
)
for app_id, group in policy_df.groupby("application_id", dropna=False):
    max_ts = group["parsed_ts"].max()
    if pd.notna(max_ts):
        expected_row_id = int(group.loc[group["parsed_ts"] == max_ts, "application_row_id"].max())
    else:
        expected_row_id = int(group["application_row_id"].max())
    actual_row_id = int(canonical_lookup.loc[app_id])
    assert actual_row_id == expected_row_id, f"Canonical selection mismatch for {app_id}: expected {expected_row_id}, got {actual_row_id}"

print("All acceptance checks passed.")


All acceptance checks passed.
