%md
# Multi-Partner Eligibility Ingestion Pipeline

---

### Ingestion Workflow
1. **Discovery**: Scans landing zone for files matching `file_identifier` patterns.
2. **Schema Enforcement**: Renames and aligns columns to the `standard_schema`.
3. **Cleaning**: Applies business rules (Title Case names, lower-cased emails, `XXX-XXX-XXXX` phone formatting, ISO-8601 date normalization).
4. **Audit Stamping**: Appends `source_file` and `ingestion_timestamp` for lineage.
5. **Partitioning**: Splits records on the presence of the `external_id` primary key: rows with an ID form the clean dataset, rows without one are quarantined with a rejection reason.

In [0]:
#IMPORTS
import pandas as pd
import numpy as np
import json
import re
from datetime import datetime
import os
import glob

## STEP 1: Global Configuration & Metadata

In [0]:
# Stand-in for the partner settings that would usually live in a .json file or DB.
# Every partner entry carries the same five fields:
#   file_identifier - substring searched for in the incoming file name
#   delimiter       - field separator of the partner's extract
#   partner_code    - code stamped onto each record from this partner
#   column_mapping  - partner column name -> standard schema column name
#   date_format     - strptime pattern of the partner's date-of-birth values
config = {
    "acme": dict(
        file_identifier="acme",
        delimiter="|",
        partner_code="ACME_HEALTH",
        column_mapping={
            "MBI": "external_id",
            "FNAME": "first_name",
            "LNAME": "last_name",
            "DOB": "dob",
            "EMAIL": "email",
            "PHONE": "phone",
        },
        date_format="%m/%d/%Y",
    ),
    "bettercare": dict(
        file_identifier="bettercare",
        delimiter=",",
        partner_code="BETTER_CARE",
        column_mapping={
            "subscriber_id": "external_id",
            "first_name": "first_name",
            "last_name": "last_name",
            "date_of_birth": "dob",
            "email": "email",
            "phone": "phone",
        },
        date_format="%Y-%m-%d",
    ),
    # --- TEMPLATE: uncomment to onboard a new vendor ---
    # "healthfirst": dict(
    #     file_identifier="healthfirst",
    #     delimiter=";",
    #     partner_code="HF_03",
    #     column_mapping={
    #         "ID_Num": "external_id",
    #         "Given_Name": "first_name",
    #         "Surname": "last_name",
    #         "Birth_Date": "dob",
    #         "Contact_Email": "email",
    #         "Cell": "phone",
    #     },
    #     date_format="%d-%m-%Y",  # day-first date
    # ),
}

print("Pipeline configuration loaded.")

Pipeline configuration loaded.


## STEP 2: Source System Simulation (Landing Zone)

In [0]:
# --- REAL WORLD CONFIGURATION ---
# Directory that the demo files are written to and that the engine later scans.
# In production, this path would be an environment variable or a mount point,
# e.g., LANDING_ZONE = os.getenv("INPUT_PATH", "/mnt/data/raw/")
# For this demo, we use a local path relative to the current working directory.
LANDING_ZONE = "./landing_zone/"

def setup_demo_environment(landing_zone=None):
    """Simulates a Cloud Landing Zone by creating local sample files.

    Writes one pipe-delimited ACME file and one comma-delimited BetterCare
    file, each seeded with edge-case rows (missing ID, padded/mixed-case
    names, missing email, invalid date, too-short phone).

    Args:
        landing_zone: Directory to create and populate. Defaults to the
            module-level ``LANDING_ZONE`` so existing no-argument calls keep
            working; pass another path to build the fixture elsewhere
            (e.g. a temporary directory).

    Side effects:
        Creates the directory if needed and overwrites the sample files in it.
    """
    target_dir = LANDING_ZONE if landing_zone is None else landing_zone
    os.makedirs(target_dir, exist_ok=True)

    # ACME: Pipe Delimited + Edge Cases
    acme_content = (
        "MBI|FNAME|LNAME|DOB|EMAIL|PHONE\n"
        "1234567890A|John|Doe|03/15/1955|JOHN.DOE@EMAIL.COM|5551234567\n" # Normal
        "|Missing|ID|01/01/1960|test@test.com|1112223333\n"              # EDGE: Missing ID (Should be rejected)
        "9998887776B|  martha  |  SMITH  |05/20/1970|MARTHA@gmail.com|(555) 999-8888\n" # EDGE: Spaces/Casing/Phone
        "4445556667C|Bob|Jones|08/12/1945||555-000-1111"                 # EDGE: Missing Email
    )

    # BETTERCARE: Comma Delimited + Edge Cases
    bettercare_content = (
        "subscriber_id,first_name,last_name,date_of_birth,email,phone\n"
        "BC-001,Alice,Johnson,1965-08-10,alice.j@test.com,555-222-3333\n" # Normal
        "BC-002,Charlie,Brown,1972-99-99,charlie@peanuts.com,5554445555\n" # EDGE: Invalid Date (99-99)
        "BC-003,Bad,Phone,1980-01-01,bad@phone.com,12345"                 # EDGE: Invalid Phone (Too short)
    )

    # HEALTHFIRST: Semicolon Delimited + Edge Cases
    # hf_content = (
    #     "ID_Num;Given_Name;Surname;Birth_Date;Contact_Email;Cell\n"
    #     "HF-999;David;Miller;31-12-1985;david.m@hf.org;4445556666\n"
    #     ";Empty;ID;01-01-1990;none@test.com;0000000000" # One bad row to test rejection
    # )

    # File names carry the partner's file_identifier so discovery can match them.
    samples = {
        "acme_eligibility_20241025.txt": acme_content,
        "bettercare_member_extract.csv": bettercare_content
        # ,"healthfirst_member_extract.csv": hf_content
    }

    for filename, content in samples.items():
        # Explicit encoding keeps the fixture identical across platforms.
        with open(os.path.join(target_dir, filename), 'w', encoding='utf-8') as f:
            f.write(content)
    print(f" Simulation Layer Active: Reading from {target_dir}")

# Create the sample partner files before the engine scans the landing zone.
setup_demo_environment()

 Simulation Layer Active: Reading from ./landing_zone/


## STEP 3: Transformation & Standardization Engine

In [0]:
class EligibilityEngine:
    """Discovers partner eligibility files and standardizes them to one schema.

    Attributes:
        config: Mapping of partner key -> partner settings. Each entry is read
            for 'file_identifier', 'delimiter', 'partner_code',
            'column_mapping' and 'date_format'.
        standard_schema: Ordered list of the standardized output columns.
        processing_summary: Per-file ledger (list of dicts with 'file',
            'status', 'details', plus 'timestamp' on failures). Reset at the
            start of every ``run``.
    """

    # Lineage columns appended after the standard schema in every output frame.
    AUDIT_COLUMNS = ['ingestion_timestamp', 'source_file']

    def __init__(self, config):
        self.config = config
        self.standard_schema = ['external_id', 'first_name', 'last_name', 'dob', 'email', 'phone', 'partner_code']
        # Defined up front so the ledger can be read even before run() is called.
        self.processing_summary = []

    def _clean_phone(self, phone):
        """Standardizes phone numbers to XXX-XXX-XXXX.

        Returns None for missing values and for anything that does not
        contain exactly 10 digits once non-digit characters are removed.
        """
        if pd.isna(phone):
            return None
        digits = re.sub(r'\D', '', str(phone))
        if len(digits) != 10:
            return None
        return f"{digits[:3]}-{digits[3:6]}-{digits[6:]}"

    def transform(self, df, partner_cfg, filename):
        """Applies business rules to a specific partner dataframe.

        Args:
            df: Raw partner frame; text columns are expected to hold strings
                (``run`` reads every column as text for this reason).
            partner_cfg: Settings for this partner ('column_mapping',
                'date_format' and 'partner_code' are read here).
            filename: Name recorded in the 'source_file' audit column.

        Returns:
            A new frame with the standard schema columns followed by
            'ingestion_timestamp' and 'source_file'. The input frame is not
            modified (``rename`` returns a copy that is edited instead).
        """
        # 1. Map Columns
        df = df.rename(columns=partner_cfg['column_mapping'])

        # 2. Schema Enforcement (Fill missing columns with Nulls if they don't exist)
        for col in self.standard_schema:
            if col not in df.columns:
                df[col] = None

        # 3. Vectorized Transformations
        df['first_name'] = df['first_name'].str.strip().str.title()
        df['last_name'] = df['last_name'].str.strip().str.title()
        df['email'] = df['email'].str.strip().str.lower()
        df['phone'] = df['phone'].apply(self._clean_phone)

        # 4. Standardize Date to ISO-8601 (unparseable dates become null via errors='coerce')
        df['dob'] = pd.to_datetime(df['dob'], format=partner_cfg['date_format'], errors='coerce').dt.strftime('%Y-%m-%d')

        # 5. Add Audit Fields
        df['source_file'] = filename
        df['partner_code'] = partner_cfg['partner_code']
        df['ingestion_timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        return df[self.standard_schema + self.AUDIT_COLUMNS]

    def run(self, input_path):
        """Automated Discovery with Global Error Handling.

        Scans ``input_path``, matches each entry to a partner by looking for
        the partner's 'file_identifier' inside the lower-cased file name,
        then reads and transforms every matched file. A failure in one file
        is logged to ``processing_summary`` and does not stop the others.

        Args:
            input_path: Directory to scan (non-recursive).

        Returns:
            One concatenated frame of all successfully transformed files.
            When nothing was ingested, an empty frame that still carries the
            output columns, so downstream column lookups keep working.
        """
        # Sorted so row order in the result does not depend on OS listing order.
        all_files = sorted(glob.glob(os.path.join(input_path, "*")))
        final_dfs = []
        self.processing_summary = []  # The "Formal Log"

        for file in all_files:
            # The lower-cased name is used both for matching and for lineage.
            filename = os.path.basename(file).lower()

            try:
                partner_key = next((k for k in self.config if self.config[k]['file_identifier'] in filename), None)

                if partner_key is None:
                    # 1. LOG & PRINT for unknown files
                    print(f" SKIPPED: {filename} (No matching partner config)")
                    self.processing_summary.append({"file": filename, "status": "Skipped", "details": "Unknown Partner"})
                    continue

                # 2. LOG & PRINT for processing start
                print(f" INGESTING: {filename}...")
                cfg = self.config[partner_key]
                # dtype=str: identifiers and phone numbers are text, not numbers.
                # Type inference would drop leading zeros, turn digit-only
                # columns with blanks into floats ("5551234567.0") and turn
                # fully empty text columns into float64, which breaks `.str`.
                raw_df = pd.read_csv(file, sep=cfg['delimiter'], dtype=str)
                std_df = self.transform(raw_df, cfg, filename)
                final_dfs.append(std_df)

                # Log success
                self.processing_summary.append({"file": filename, "status": "Success", "details": "Transformed"})

            except Exception as e:
                # 3. LOG & PRINT for failures (deliberately broad: one bad file
                # must not abort the rest of the batch)
                error_msg = str(e)
                print(f" ERROR: {filename} failed. Reason: {error_msg}")
                self.processing_summary.append({
                    "file": filename,
                    "status": "Failed",
                    "details": error_msg,
                    "timestamp": datetime.now().strftime("%H:%M:%S")
                })
                # NOTE: In a production environment, this failure would trigger
                # an automated alert (PagerDuty/Email/Slack) to the On-Call Engineer.

        if final_dfs:
            return pd.concat(final_dfs, ignore_index=True)
        # Keep the schema on the empty result so callers can still select columns.
        return pd.DataFrame(columns=self.standard_schema + self.AUDIT_COLUMNS)



In [0]:
# EXECUTION
# Build the engine from the partner config and ingest everything found in the
# landing zone; the result is one standardized frame covering all partners.
engine = EligibilityEngine(config)
unified_eligibility_df = engine.run(LANDING_ZONE)

# Show the full standardized dataset, before the missing-ID quality gate is applied.
print("\n--- FINAL STANDARDIZED DATASET ---")
display(unified_eligibility_df)

 INGESTING: acme_eligibility_20241025.txt...
 INGESTING: bettercare_member_extract.csv...

--- FINAL STANDARDIZED DATASET ---


external_id,first_name,last_name,dob,email,phone,partner_code,ingestion_timestamp,source_file
1234567890A,John,Doe,1955-03-15,john.doe@email.com,555-123-4567,ACME_HEALTH,2026-01-15 23:47:42,acme_eligibility_20241025.txt
,Missing,Id,1960-01-01,test@test.com,111-222-3333,ACME_HEALTH,2026-01-15 23:47:42,acme_eligibility_20241025.txt
9998887776B,Martha,Smith,1970-05-20,martha@gmail.com,555-999-8888,ACME_HEALTH,2026-01-15 23:47:42,acme_eligibility_20241025.txt
4445556667C,Bob,Jones,1945-08-12,,555-000-1111,ACME_HEALTH,2026-01-15 23:47:42,acme_eligibility_20241025.txt
BC-001,Alice,Johnson,1965-08-10,alice.j@test.com,555-222-3333,BETTER_CARE,2026-01-15 23:47:42,bettercare_member_extract.csv
BC-002,Charlie,Brown,,charlie@peanuts.com,555-444-5555,BETTER_CARE,2026-01-15 23:47:42,bettercare_member_extract.csv
BC-003,Bad,Phone,1980-01-01,bad@phone.com,,BETTER_CARE,2026-01-15 23:47:42,bettercare_member_extract.csv


## STEP 4: Data Quality Gate & Partitioning

In [0]:
# Validation: quarantine rows missing the unique identifier.
# An ID counts as missing when it is null, empty, or whitespace-only: a
# blank-padded field is not a usable key, so it is stripped before the
# comparison instead of being compared to '' as-is.
external_ids = unified_eligibility_df['external_id']
is_missing = external_ids.isna() | external_ids.astype(str).str.strip().eq('')
missinexternal_id = unified_eligibility_df[is_missing].copy()
missinexternal_id['rejection_reason'] = "Missing External ID"
display(missinexternal_id)
# NOTE: In production, this 'Quarantine' dataset is exported as a "Rejection Report" 
# and automatically emailed back to the Partner (ACME/BetterCare/Healthfirst) for remediation. 
# This ensures data integrity without stopping the pipeline flow.

external_id,first_name,last_name,dob,email,phone,partner_code,ingestion_timestamp,source_file,rejection_reason
,Missing,Id,1960-01-01,test@test.com,111-222-3333,ACME_HEALTH,2026-01-15 23:47:42,acme_eligibility_20241025.txt,Missing External ID


## STEP 5: Data Persistence & Sink Layer

In [0]:
# Build the clean production frame: keep only the rows that passed the
# missing-ID check, then remove 'source_file' so the result matches the
# Standardized Output Schema.
clean_unified_eligibility_df = (
    unified_eligibility_df[~is_missing]
    .copy()
    .drop(columns=['source_file'])
)
display(clean_unified_eligibility_df)

external_id,first_name,last_name,dob,email,phone,partner_code,ingestion_timestamp
1234567890A,John,Doe,1955-03-15,john.doe@email.com,555-123-4567,ACME_HEALTH,2026-01-15 23:47:42
9998887776B,Martha,Smith,1970-05-20,martha@gmail.com,555-999-8888,ACME_HEALTH,2026-01-15 23:47:42
4445556667C,Bob,Jones,1945-08-12,,555-000-1111,ACME_HEALTH,2026-01-15 23:47:42
BC-001,Alice,Johnson,1965-08-10,alice.j@test.com,555-222-3333,BETTER_CARE,2026-01-15 23:47:42
BC-002,Charlie,Brown,,charlie@peanuts.com,555-444-5555,BETTER_CARE,2026-01-15 23:47:42
BC-003,Bad,Phone,1980-01-01,bad@phone.com,,BETTER_CARE,2026-01-15 23:47:42


## STEP 6: Data Quality Observability & Reporting

### 6.A: File Ingestion Manifest

In [0]:
print("\n--- FILE PROCESSING LEDGER ---")
# One row per file seen by the engine's last run, with its status
# (Success / Skipped / Failed) and details, so crashed or skipped files are visible.
ledger_df = pd.DataFrame(engine.processing_summary)
display(ledger_df)


--- FILE PROCESSING LEDGER ---


file,status,details
acme_eligibility_20241025.txt,Success,Transformed
bettercare_member_extract.csv,Success,Transformed


### 6.B: Record-Level Quality Audit

In [0]:
# --- FINAL AUDIT & QUALITY GATE ---
print("--- Data Pipeline Audit Report ---")

# Row counts of the three frames produced by the earlier split:
# everything ingested, the quarantined rows, and the clean rows.
total_raw_processed = unified_eligibility_df.shape[0]
missing_ids_count = missinexternal_id.shape[0]
valid_ids_count = clean_unified_eligibility_df.shape[0]

for report_line in (
    f"Total Records Processed: {total_raw_processed}",
    f"Clean Records (Gold):    {valid_ids_count} ",
    f"Rejected Records (DLQ): {missing_ids_count} ",
):
    print(report_line)

# Success metric: share of ingested rows that reached the clean dataset.
# The zero check also keeps the division safe, since no clean rows is the
# only case in which the total can be zero.
if valid_ids_count <= 0:
    print("\n Warning: No valid records found in this batch.")
else:
    success_rate = (valid_ids_count / total_raw_processed) * 100
    print(f"\n Pipeline Success Rate: {success_rate:.2f}%")

--- Data Pipeline Audit Report ---
Total Records Processed: 7
Clean Records (Gold):    6 
Rejected Records (DLQ): 1 

 Pipeline Success Rate: 85.71%
