# PII Detection & Data Quality Validation Pipeline

This notebook demonstrates an end-to-end **fintech data quality pipeline** that ingests raw customer data, audits it for quality issues, detects and masks Personally Identifiable Information (PII), and produces a clean, GDPR-compliant dataset.

### Pipeline Stages
| Stage | Module | Purpose |
|-------|--------|---------|
| **Part 1** | `DataProfiler` | Profile data quality — completeness, types, formatting issues |
| **Part 2** | `PIIDetector` | Scan for PII using regex and heuristic matching |
| **Part 3** | `FintechGXValidator` | Validate schema rules with Great Expectations |
| **Part 4** | `DataRemediator` | Clean and normalize data to pass validation |
| **Part 5** | `DataMasker` | Mask all PII fields for safe downstream use |

### Input / Output
- **Input:** `data/raw/customers_raw.csv` — 10 customer records with deliberate quality issues
- **Output:** `data/processed/customers_masked.csv` — cleaned and masked dataset
- **Reports:** Quality profile, PII detection, validation results, cleaning log, masked sample

In [None]:
import os
import sys

import pandas as pd

# Resolve the project root (one level above the notebook directory)
# and make the `src` package importable.
PROJECT_ROOT = os.path.abspath(os.path.join(os.getcwd(), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)
input_path = os.path.join(PROJECT_ROOT, "data", "raw", "customers_raw.csv")

# Part 1: Data Profiler
from src.part1.data_profiler import DataProfiler
from src.utils.logger_config import setup_pipeline_logger

# Part 2: PII Detector
from src.part2.pii_detector import PIIDetector

# Part 3: Data Validator
from src.part3.data_validator import FintechGXValidator

# Part 4: Data Remediator
from src.part4.cleaning import DataRemediator

# Part 5: Data Masker
from src.part5.data_masker import DataMasker

# Initialize our specialized logger
logger = setup_pipeline_logger()
logger.info("Pipeline Logger initialized. Ready for Data Quality Analysis.")

# Verify the data directory exists before any stage tries to read from it
if not os.path.isdir(os.path.join(PROJECT_ROOT, "data")):
    logger.error("Data directory not found!")
else:
    logger.info("Data directory verified.")

INFO: Pipeline Logger initialized. Ready for Data Quality Analysis.
INFO: Data directory verified.


## Part 1: Data Profiling & Quality Assessment
This stage loads the raw customer dataset and performs a comprehensive data quality audit. The `DataProfiler` checks for:
- **Completeness** — missing values and sentinel placeholders (e.g., `invalid_date`)
- **Data Types** — verifying each column matches expected types (INT, STRING, DATE, NUMERIC)
- **Quality Issues** — inconsistent phone formats, invalid category values, duplicate IDs

The output is a structured text report saved to `data/reports/data_quality_report.txt`.
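
As a rough illustration of the completeness and duplicate-ID checks listed above, the sketch below profiles a few records using only the standard library. The sample rows, the `SENTINELS` set, and the `profile_records` helper are hypothetical; they are not taken from the actual `DataProfiler` implementation, which is not shown in this notebook.

```python
from collections import Counter

# Hypothetical placeholders treated as missing; the real profiler may use a different set.
SENTINELS = {"", "invalid_date"}


def profile_records(rows, id_field="customer_id"):
    """Return the row count, per-column missing counts, and duplicated IDs."""
    missing = Counter()
    for row in rows:
        for column, value in row.items():
            # None, empty strings, and sentinel placeholders all count as missing.
            if value is None or str(value).strip() in SENTINELS:
                missing[column] += 1
    id_counts = Counter(row[id_field] for row in rows)
    duplicate_ids = sorted(i for i, n in id_counts.items() if n > 1)
    return {"rows": len(rows), "missing": dict(missing), "duplicate_ids": duplicate_ids}


# Illustrative rows made up for this sketch (not the contents of customers_raw.csv).
sample = [
    {"customer_id": 1, "phone": "555-123-4567", "date_of_birth": "1985-03-15"},
    {"customer_id": 2, "phone": None, "date_of_birth": "invalid_date"},
    {"customer_id": 2, "phone": "(555) 987-6543", "date_of_birth": ""},
]

report = profile_records(sample)
print(report)
```

Counting sentinels together with true nulls keeps the completeness figure honest: a column that is fully populated with `invalid_date` is no more usable than an empty one.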

In [11]:
# Instantiate and run
profiler = DataProfiler(input_path=input_path)
profiler.run_full_analysis(output_report_path='../data/reports/data_quality_report.txt')

INFO: Loaded 10 rows from customers_raw.csv
INFO: Analysis complete. Report saved to: ../data/reports/data_quality_report.txt


## Part 2: PII Detection & Risk Assessment
This stage scans the dataset for **Personally Identifiable Information (PII)** using regex pattern matching and heuristic checks. The `PIIDetector` identifies:
- **Emails** — matched via RFC-style regex patterns
- **Phone Numbers** — matched via flexible digit/separator patterns
- **Addresses** — detected by non-null entries with sufficient length
- **Dates of Birth** — validated as present and non-sentinel values

The scan produces a risk assessment report with exposure analysis and mitigation recommendations.
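
The regex-based part of the scan can be sketched as follows. The two patterns and the `count_pii` helper are simplified assumptions for illustration; the patterns that `PIIDetector` actually uses are not shown in this notebook and may be stricter or looser.

```python
import re

# Simplified, hypothetical patterns; PIIDetector's own patterns may differ.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE_RE = re.compile(r"\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}")


def count_pii(rows):
    """Count values that look like email addresses or phone numbers."""
    counts = {"emails": 0, "phones": 0}
    for row in rows:
        # Missing values become empty strings, which match neither pattern.
        if EMAIL_RE.fullmatch(str(row.get("email") or "").strip()):
            counts["emails"] += 1
        if PHONE_RE.fullmatch(str(row.get("phone") or "").strip()):
            counts["phones"] += 1
    return counts


# Illustrative rows made up for this sketch.
sample = [
    {"email": "john.doe@gmail.com", "phone": "555-123-4567"},
    {"email": "not-an-email", "phone": "(555) 987-6543"},
    {"email": None, "phone": "5551234567"},
    {"email": "jane@company.com", "phone": "12345"},
]

pii_counts = count_pii(sample)
print(pii_counts)
```

The phone pattern tolerates parentheses, dots, dashes, and spaces so that inconsistently formatted numbers are still flagged as PII before they are standardized in Part 4.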

In [12]:
detector = PIIDetector(profiler.df)

# Run the scan and generate report
detector.scan_pii().generate_report('../data/reports/pii_detection_report.txt')

# Quick visual check in the notebook
print("\nPII Scan Summary:")
print(f"Emails found: {detector.risk_results['emails']}")
print(f"Phones found: {detector.risk_results['phones']}")

INFO: Starting PII scanning process...
INFO: Scan complete. Found 10 emails and 10 phone numbers.
INFO: PII Detection Report saved to ../data/reports/pii_detection_report.txt

PII Scan Summary:
Emails found: 10
Phones found: 10


## Part 3: Schema Validation with Great Expectations
This stage applies formal schema validation using **Great Expectations (GX 1.x)**. The `FintechGXValidator` enforces strict rules on the raw data:
- `customer_id` must be unique, positive, and non-null
- Names must be non-null, 2–50 characters, alphabetic only
- `income` must be numeric and within a realistic range (0–10M)
- `account_status` must be one of: `active`, `inactive`, `suspended`
- Dates must conform to `YYYY-MM-DD` format

The validation result (True/False) shows whether the raw data passes all business rules. Failures are expected at this stage — they will be remediated in Part 4.
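
To make the rules above concrete, here is a minimal sketch that expresses them as plain Python checks. It deliberately does not use Great Expectations, and the rule names, the `validate_row` and `validate_rows` helpers, and the sample rows are all hypothetical; `FintechGXValidator` encodes the equivalent logic as GX expectations.

```python
import re
from datetime import datetime

ALLOWED_STATUSES = {"active", "inactive", "suspended"}
NAME_RE = re.compile(r"[A-Za-z]{2,50}")  # alphabetic only, 2 to 50 characters
DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")


def is_iso_date(value):
    """True when value is a real calendar date written as YYYY-MM-DD."""
    text = str(value)
    if not DATE_RE.fullmatch(text):
        return False
    try:
        datetime.strptime(text, "%Y-%m-%d")
        return True
    except ValueError:
        return False


def validate_row(row):
    """Return the names of the rules this single row violates."""
    failures = []
    cid = row.get("customer_id")
    if not isinstance(cid, int) or cid <= 0:
        failures.append("customer_id_positive")
    for field in ("first_name", "last_name"):
        if not NAME_RE.fullmatch(str(row.get(field) or "")):
            failures.append(f"{field}_format")
    income = row.get("income")
    if not isinstance(income, (int, float)) or not 0 <= income <= 10_000_000:
        failures.append("income_range")
    if row.get("account_status") not in ALLOWED_STATUSES:
        failures.append("account_status_allowed")
    if not is_iso_date(row.get("created_date")):
        failures.append("created_date_format")
    return failures


def validate_rows(rows):
    """Map row index to violated rules, adding a uniqueness check on customer_id."""
    seen, results = set(), {}
    for index, row in enumerate(rows):
        failures = validate_row(row)
        cid = row.get("customer_id")
        if cid in seen:
            failures.append("customer_id_unique")
        seen.add(cid)
        if failures:
            results[index] = failures
    return results


# Illustrative rows made up for this sketch.
sample = [
    {"customer_id": 1, "first_name": "John", "last_name": "Doe",
     "income": 75000.0, "account_status": "active", "created_date": "2024-01-10"},
    {"customer_id": 1, "first_name": "J", "last_name": "Smith",
     "income": -5, "account_status": "closed", "created_date": "01/11/2024"},
]

violations = validate_rows(sample)
print(violations)
```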

In [13]:
# Initialize the validator on the raw DataFrame (GX 1.x API)
gx_engine = FintechGXValidator(profiler.df)

# Build, Validate, and Generate the deliverable
results = gx_engine.build_expectations().validate(
    report_path='../data/reports/validation_results.txt'
)

print(f"Validation Success: {results.success}")

INFO: Building Strict Expectations for suite: fintech_suite
INFO: Starting GX 1.x Validation execution...


INFO: Forensic GX Report saved to ../data/reports/validation_results.txt
Validation Success: False


## Part 4: Data Cleaning & Remediation
This stage applies automated fixes to resolve the quality issues identified in Parts 1 and 3. The `DataRemediator` performs:
- **Name Normalization** — applies title case to `first_name` and `last_name`
- **Phone Standardization** — converts all formats (dotted, parenthesized, continuous digits) to `XXX-XXX-XXXX`
- **Date Normalization** — converts all date formats to `YYYY-MM-DD` and replaces `invalid_date` sentinels with NaT
- **Missing Value Imputation** — fills nulls with safe defaults (`[UNKNOWN]`, `0`, `unknown`)

After cleaning, a **re-validation** is run with Great Expectations to confirm the fixes. The cleaning log with before/after statistics is saved to `data/reports/cleaning_log.txt`.
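
The normalization steps described above might look roughly like the following standard-library sketch. The helper names and the `DATE_FORMATS` tuple are assumptions made for illustration, since the input formats that `DataRemediator` handles are not listed in this notebook. The sketch returns `None` where the pandas-based pipeline would produce `NaT`.

```python
import re
from datetime import datetime

# Hypothetical set of accepted input formats; the real remediator may support others.
DATE_FORMATS = ("%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y")


def normalize_phone(value):
    """Reformat a 10-digit phone number as XXX-XXX-XXXX; return None otherwise."""
    digits = re.sub(r"\D", "", str(value or ""))
    if len(digits) != 10:
        return None
    return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"


def normalize_date(value):
    """Return the date as YYYY-MM-DD, or None when no known format matches."""
    text = str(value or "").strip()
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    return None


def title_case_name(value):
    """Title-case a name, falling back to the [UNKNOWN] placeholder when empty."""
    return str(value).strip().title() if value else "[UNKNOWN]"


print(normalize_phone("(555) 987-6543"))
print(normalize_date("01/15/2024"))
print(title_case_name("jOHN"))
```

Stripping every non-digit before reformatting means the dotted, parenthesized, and continuous-digit variants all take the same code path, so no per-format branching is needed.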

In [14]:
# 1. Initialize Remediator
remediator = DataRemediator(profiler.df)

# 2. Execute Cleaning Pipeline
remediator.normalize_names().normalize_phones().normalize_dates().handle_missing()

# 3. Re-Validate to confirm 0 failures
gx_engine_v2 = FintechGXValidator(remediator.df, suite_name="cleaned_suite")
results_after = gx_engine_v2.build_expectations().validate(report_path='../data/reports/validation_final.txt')

# 4. Generate the Log
remediator.generate_log(
    output_path='../data/reports/cleaning_log.txt',
    validation_before=results.statistics['unsuccessful_expectations'],  # Failures from the Part 3 run on raw data
    validation_after=results_after.statistics['unsuccessful_expectations']
)

# 5. Save final CSV
remediator.df.to_csv('../data/processed/customers_cleaned.csv', index=False)
logger.info("Pipeline Execution Complete. Golden Dataset generated.")

INFO: Building Strict Expectations for suite: cleaned_suite
INFO: Starting GX 1.x Validation execution...


INFO: Forensic GX Report saved to ../data/reports/validation_final.txt
INFO: Cleaning log saved to ../data/reports/cleaning_log.txt
INFO: Pipeline Execution Complete. Golden Dataset generated.


## Part 5: PII Masking & GDPR Compliance
This final stage masks all PII fields to produce a **GDPR-compliant** dataset safe for analytics teams. The `DataMasker` applies:
- **Names** — `John Doe` → `J*** D***`
- **Emails** — `john.doe@gmail.com` → `j***@gmail.com`
- **Phones** — `555-123-4567` → `***-***-4567`
- **Addresses** — replaced with `[MASKED ADDRESS]`
- **Dates of Birth** — `1985-03-15` → `1985-**-**`

Business-critical fields (income, account status, created date) remain intact. The masked dataset is saved to `data/processed/customers_masked.csv`, along with a before/after comparison report.
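
The masking rules listed above can be sketched as small string functions. These helpers are hypothetical stand-ins written for illustration, not the `DataMasker` methods themselves, and the `[MASKED EMAIL]` fallback is an assumption of this sketch.

```python
def mask_name(name):
    """Keep the first letter of each name part: 'John Doe' -> 'J*** D***'."""
    return " ".join(part[0] + "***" for part in str(name).split())


def mask_email(email):
    """Keep the first character and the domain: 'john.doe@gmail.com' -> 'j***@gmail.com'."""
    local, sep, domain = str(email).partition("@")
    if not sep or not local:
        return "[MASKED EMAIL]"  # hypothetical fallback for malformed values
    return f"{local[0]}***@{domain}"


def mask_phone(phone):
    """Keep only the last four digits: '555-123-4567' -> '***-***-4567'."""
    digits = "".join(ch for ch in str(phone) if ch.isdigit())
    return f"***-***-{digits[-4:]}" if len(digits) >= 4 else "***-***-****"


def mask_dob(dob):
    """Keep only the first four characters (the year): '1985-03-15' -> '1985-**-**'."""
    return f"{str(dob)[:4]}-**-**"


print(mask_name("John Doe"))
print(mask_email("john.doe@gmail.com"))
print(mask_phone("555-123-4567"))
print(mask_dob("1985-03-15"))
```

A slice-based `mask_dob` like this one turns a placeholder such as `UNKNOWN` into `UNKN-**-**`, which matches a value visible in the output below; a masker could special-case placeholders if that is undesirable.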

In [15]:
# 1. Initialize Masker with the cleaned data from Part 4
masker = DataMasker(remediator.df)

# 2. Execute Masking Chain
masker.mask_names().mask_emails().mask_phones().mask_addresses().mask_dob()

# 3. Save the final GDPR-compliant file
masked_df = masker.save_masked_data('../data/processed/customers_masked.csv')

# 4. Generate the deliverable comparison
masker.generate_masked_sample(profiler.df, '../data/reports/masked_sample.txt')

display(masked_df.head(5))

INFO: Masked dataset saved to ../data/processed/customers_masked.csv
INFO: Masked sample report generated at ../data/reports/masked_sample.txt


Unnamed: 0,customer_id,first_name,last_name,email,phone,date_of_birth,address,income,account_status,created_date
0,1,J***,D***,j***@gmail.com,***-***-4567,1985-**-**,[MASKED ADDRESS],75000.0,active,2024-01-10
1,2,J***,S***,j***@company.com,***-***-6543,1990-**-**,[MASKED ADDRESS],95000.0,active,2024-01-11
2,3,N***,J***,b***@email.com,***-***-5678,1988-**-**,[MASKED ADDRESS],0.0,suspended,2024-01-12
3,4,M***,B***,m***@gmail.com,***-***-6789,UNKN-**-**,[MASKED ADDRESS],120000.0,unknown,2024-01-13
4,5,R***,N***,r***@yahoo.com,***-***-7890,2005-**-**,[MASKED ADDRESS],55000.0,active,UNKNOWN


---
## Pipeline Complete

All five stages have executed successfully. The following deliverables have been generated:

| Deliverable | Location |
|-------------|----------|
| Data Quality Profile | `data/reports/data_quality_report.txt` |
| PII Detection Report | `data/reports/pii_detection_report.txt` |
| Validation Results (Raw) | `data/reports/validation_results.txt` |
| Validation Results (Cleaned) | `data/reports/validation_final.txt` |
| Cleaning Log | `data/reports/cleaning_log.txt` |
| Masked Sample Comparison | `data/reports/masked_sample.txt` |
| Cleaned Dataset | `data/processed/customers_cleaned.csv` |
| Masked Dataset (Final) | `data/processed/customers_masked.csv` |

The masked dataset is intended for sharing with analytics teams: the name, email, phone, address, and date-of-birth fields have been obfuscated, while business-critical fields (income, account status, and created date) are preserved.