In [0]:
# MMR Flat File to CSV Parser - Converts the generated MMR flat file to CSV
# For use in Databricks environment

import pandas as pd
import numpy as np
from datetime import datetime
from typing import Dict, List, Optional, Any
import re

class MMRFlatFileParser:
    """
    Fixed-width MMR flat file parser.

    Each record is sliced into named fields according to ``field_definitions``
    and every non-filler field is converted to a Python value. Parsed records
    are collected into a pandas DataFrame that can be exported to CSV.
    """

    # Flag characters that map to an integer; anything else becomes None.
    _FLAG_VALUES = {"Y": 1, "N": 0}

    # Cleaned value length -> (strptime input format, strftime output format).
    _DATE_FORMATS = {
        8: ("%Y%m%d", "%Y-%m-%d"),
        6: ("%Y%m", "%Y-%m"),
    }

    def __init__(self):
        """
        Initialize the parser with the exact field definitions matching the generator.
        Each field is defined as: (start_position, width, field_name, data_type, description)
        Positions are 0-based for Python string slicing.
        """
        self.field_definitions = [
            (0, 5, "contract_number", "str", "Plan Contract Number"),
            (5, 8, "run_date", "date", "Date the file was produced (YYYYMMDD)"),
            (13, 6, "payment_date", "date", "Payment month for the report (YYYYMM)"),
            (19, 12, "beneficiary_id", "str", "HICN/MBI Beneficiary ID"),
            (31, 7, "surname", "str", "Beneficiary last name"),
            (38, 1, "first_initial", "str", "First initial of beneficiary"),
            (39, 1, "sex_code", "str", "Beneficiary Sex Code: M/F"),
            (40, 8, "date_of_birth", "date", "Beneficiary date of birth (YYYYMMDD)"),
            (48, 4, "filler1", "filler", "Spaces"),
            (52, 5, "state_county_code", "str", "Beneficiary State and County Code"),
            (57, 1, "out_of_area_indicator", "flag", "Out of Area indicator"),
            (58, 1, "part_a_entitlement", "flag", "Part A entitlement indicator"),
            (59, 1, "part_b_entitlement", "flag", "Part B entitlement indicator"),
            (60, 1, "hospice", "flag", "Hospice status indicator"),
            (61, 1, "esrd", "flag", "ESRD indicator"),
            (62, 1, "aged_disabled_msp", "flag", "Aged/Disabled MSP indicator"),
            (63, 1, "filler2", "filler", "Spaces"),
            (64, 1, "filler3", "filler", "Spaces"),
            (65, 1, "new_medicare_medicaid_flag", "flag", "New Medicare Beneficiary Medicaid Status"),
            (66, 1, "lti_flag", "flag", "Long Term Institutional Status"),
            (67, 1, "medicaid_addon_indicator", "flag", "Medicaid Add-on Factor Indicator"),
            (68, 2, "filler4", "filler", "Spaces"),
            (70, 1, "default_risk_factor_code", "str", "Default Risk Adjustment Factor Code"),
            (71, 7, "risk_adjustment_factor_a", "decimal", "Part A Risk Adjustment Factor"),
            (78, 7, "risk_adjustment_factor_b", "decimal", "Part B Risk Adjustment Factor"),
            (85, 2, "payment_months_part_a", "int", "Payment/Adjustment Months Part A"),
            (87, 2, "payment_months_part_b", "int", "Payment/Adjustment Months Part B"),
            (89, 2, "adjustment_reason_code", "str", "Adjustment Reason Code (ARC)"),
            (91, 8, "payment_start_date", "date", "Payment/Adjustment Start Date"),
            (99, 8, "payment_end_date", "date", "Payment/Adjustment End Date"),
            (107, 9, "filler5", "filler", "Spaces"),
            (116, 9, "filler6", "filler", "Spaces"),
            (125, 9, "monthly_risk_adjusted_amount_a", "amount", "Monthly Risk Adjusted Amount Part A"),
            (134, 9, "monthly_risk_adjusted_amount_b", "amount", "Monthly Risk Adjusted Amount Part B"),
            (143, 8, "lis_premium_subsidy", "amount", "LIS Premium Subsidy"),
            (151, 1, "esrd_msp_flag", "str", "ESRD MSP Flag"),
            (152, 10, "mtm_addon", "amount", "Medication Therapy Management Add On"),
            (162, 8, "part_d_manufacturer_discount", "amount", "Part D Manufacturer Discount Amount"),
            (170, 1, "medicaid_dual_status", "str", "Medicaid Full/Partial/Non-dual"),
            (171, 4, "risk_adjustment_age_group", "str", "Risk Adjustment Age Group (RAAG)"),
            (175, 7, "filler7", "filler", "Spaces"),
            (182, 1, "filler8", "filler", "Spaces"),
            (183, 1, "filler9", "filler", "Spaces"),
            (184, 3, "plan_benefit_package_id", "str", "Plan Benefit Package ID"),
            (187, 1, "filler10", "filler", "Spaces"),
            (188, 2, "risk_adjustment_factor_type", "str", "Risk Adjustment Factor Type Code"),
            (190, 1, "frailty_indicator", "flag", "Frailty Indicator (PACE/FIDE SNP)"),
            (191, 1, "orec", "str", "Original Reason for Entitlement Code"),
            (192, 1, "filler11", "filler", "Spaces"),
            (193, 3, "segment_number", "str", "Segment Number"),
            (196, 1, "filler12", "filler", "Spaces"),
            (197, 1, "eghp_flag", "flag", "EGHP Flag"),
            (198, 8, "part_c_basic_premium_a", "amount", "Part C Basic Premium – Part A Amount"),
            (206, 8, "part_c_basic_premium_b", "amount", "Part C Basic Premium – Part B Amount"),
            (214, 8, "rebate_part_a_cost_sharing", "amount", "Rebate for Part A Cost Sharing Reduction"),
            (222, 8, "rebate_part_b_cost_sharing", "amount", "Rebate for Part B Cost Sharing Reduction"),
            (230, 8, "rebate_part_a_supplemental", "amount", "Rebate for Other Part A Mandatory Supplemental Benefits"),
            (238, 8, "rebate_part_b_supplemental", "amount", "Rebate for Other Part B Mandatory Supplemental Benefits"),
            (246, 8, "rebate_part_b_premium_reduction_a", "amount", "Rebate for Part B Premium Reduction – Part A Amount"),
            (254, 135, "filler13", "filler", "Large filler section (positions 255-389)"),
            (389, 2, "payment_months_part_d", "int", "Payment/Adjustment Months Part D"),
            (391, 10, "pace_premium_addon", "amount", "PACE Premium Add On"),
            (401, 10, "pace_cost_sharing_addon", "amount", "PACE Cost Sharing Add-on"),
            (411, 7, "part_c_frailty_factor", "decimal", "Part C Frailty Factor"),
            (418, 7, "msp_reduction_factor", "decimal", "MSP Reduction Factor"),
            (425, 10, "msp_reduction_amount_a", "amount", "MSP Reduction Amount Part A"),
            (435, 10, "msp_reduction_amount_b", "amount", "MSP Reduction Amount Part B"),
            (445, 2, "medicaid_dual_status_code", "str", "Medicaid Dual Status Code"),
            (447, 8, "part_d_coverage_gap_discount", "amount", "Part D Coverage Gap Discount Amount"),
            (455, 2, "part_d_risk_adjustment_type", "str", "Part D Risk Adjustment Factor Type"),
            (457, 1, "filler14", "filler", "Filler"),
            (458, 9, "part_a_monthly_rate", "amount", "Part A Monthly Rate for Payment or Adjustment"),
            (467, 9, "part_b_monthly_rate", "amount", "Part B Monthly Rate for Payment or Adjustment"),
            (476, 9, "part_d_monthly_rate", "amount", "Part D Monthly Rate for Payment or Adjustment"),
            (485, 10, "cleanup_id", "str", "Cleanup ID")
        ]

        # Create lookup dictionary for field names
        self.field_dict = {field[2]: field for field in self.field_definitions}

        # Record width implied by the layout (end of the right-most field).
        # Kept on the instance so parse_record pads to the layout's real width
        # instead of relying on a separately maintained literal.
        self.record_length = max(
            start + width for start, width, *_ in self.field_definitions
        )

    def _format_date(self, text: str) -> str:
        """Reformat a YYYYMMDD / YYYYMM value; return it unchanged if it does not parse."""
        formats = self._DATE_FORMATS.get(len(text))
        if formats is None:
            return text
        input_format, output_format = formats
        try:
            return datetime.strptime(text, input_format).strftime(output_format)
        except ValueError:
            # Not a valid calendar date: keep the raw text rather than lose it.
            return text

    def parse_field_value(self, raw_value: str, data_type: str, field_name: str = "") -> Any:
        """
        Parse and convert field values based on their data type.

        Args:
            raw_value (str): Raw field value from flat file
            data_type (str): Expected data type
            field_name (str): Field name for context (currently unused)

        Returns:
            Any: Converted value. ``None`` for filler fields, blank fields,
            unparseable numbers and flags other than Y/N. Dates that do not
            parse are returned as the stripped raw text.
        """
        cleaned_value = raw_value.strip()

        # Every data type maps a blank field to None, and fillers are never kept.
        if data_type == "filler" or not cleaned_value:
            return None

        if data_type == "int":
            try:
                return int(cleaned_value)
            except ValueError:
                return None

        if data_type in ("decimal", "amount"):
            try:
                return float(cleaned_value)
            except ValueError:
                return None

        if data_type == "date":
            return self._format_date(cleaned_value)

        if data_type == "flag":
            # Y -> 1, N -> 0, anything else -> None
            return self._FLAG_VALUES.get(cleaned_value)

        # "str" and any unrecognised data type: return the stripped text.
        return cleaned_value

    def parse_record(self, record_line: str) -> Dict[str, Any]:
        """
        Parse a single record from the MMR flat file.

        Args:
            record_line (str): Single line from the flat file

        Returns:
            Dict[str, Any]: Parsed field values keyed by field name
            (filler fields are omitted)
        """
        # Pad short lines so every slice below has the full field width.
        record_line = record_line.ljust(self.record_length)

        parsed_record = {}
        for start_pos, width, field_name, data_type, _description in self.field_definitions:
            if data_type == "filler":
                continue  # filler fields never appear in the output
            raw_value = record_line[start_pos:start_pos + width]
            parsed_record[field_name] = self.parse_field_value(raw_value, data_type, field_name)

        return parsed_record

    def _parse_lines(self, lines) -> List[Dict[str, Any]]:
        """
        Parse an iterable of text lines into record dictionaries.

        Blank lines are skipped. ``record_number`` is the 1-based position of
        the line in ``lines``, counting skipped lines. A line whose parsing
        raises is reported with ``print`` and skipped (best effort).
        """
        records = []
        for line_num, line in enumerate(lines, 1):
            line = line.rstrip('\n\r')
            if not line.strip():
                continue
            try:
                parsed_record = self.parse_record(line)
            except Exception as e:
                print(f"Error parsing line {line_num}: {e}")
                continue
            parsed_record['record_number'] = line_num
            records.append(parsed_record)
        return records

    def parse_flat_file(self, file_path: str) -> pd.DataFrame:
        """
        Parse the entire MMR flat file and return as DataFrame.

        Args:
            file_path (str): Path to the MMR flat file

        Returns:
            pd.DataFrame: Parsed data, one row per non-blank line

        Raises:
            FileNotFoundError: If ``file_path`` does not exist
            Exception: For any other error while reading the file
            ValueError: If the file contains no non-blank lines
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                records = self._parse_lines(file)
        except FileNotFoundError as e:
            raise FileNotFoundError(f"File not found: {file_path}") from e
        except Exception as e:
            # Same exception type as before; chained so the cause is not lost.
            raise Exception(f"Error reading file: {e}") from e

        if not records:
            raise ValueError("No valid records found in file")

        return pd.DataFrame(records)

    def parse_flat_file_string(self, file_content: str) -> pd.DataFrame:
        """
        Parse MMR flat file content from a string.

        Args:
            file_content (str): Content of the MMR flat file

        Returns:
            pd.DataFrame: Parsed data, one row per non-blank line

        Raises:
            ValueError: If the content contains no non-blank lines
        """
        records = self._parse_lines(file_content.split('\n'))

        if not records:
            raise ValueError("No valid records found in content")

        return pd.DataFrame(records)

    def export_to_csv(self, df: pd.DataFrame, output_path: str):
        """
        Export DataFrame to clean CSV format.

        pandas stores an integer column that contains missing values as
        float64, which would be written as ``1.0`` / ``0.0``. For the export
        only, float columns of "int" and "flag" fields are cast to the
        nullable ``Int64`` dtype so they are written as ``1`` / ``0`` with
        empty cells for missing values. The DataFrame passed in is not modified.

        Args:
            df (pd.DataFrame): Parsed MMR data
            output_path (str): Output CSV file path
        """
        integer_columns = {}
        for _, _, field_name, data_type, _ in self.field_definitions:
            if data_type not in ("int", "flag") or field_name not in df.columns:
                continue
            column = df[field_name]
            if not pd.api.types.is_float_dtype(column):
                continue  # already integer, or all-missing: written correctly as is
            try:
                integer_columns[field_name] = column.astype("Int64")
            except (TypeError, ValueError):
                # Column holds non-integral values (e.g. edited by the caller);
                # write it unchanged rather than fail the export.
                continue

        export_frame = df.assign(**integer_columns) if integer_columns else df
        export_frame.to_csv(output_path, index=False, encoding='utf-8')

    def get_field_summary(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Generate a summary of parsed fields.

        Args:
            df (pd.DataFrame): Parsed MMR data

        Returns:
            pd.DataFrame: One row per non-filler field present in ``df`` with
            record counts, null counts, distinct-value count and up to three
            sample values (string form truncated to 100 characters)
        """
        summary_data = []

        for _, _, field_name, data_type, description in self.field_definitions:
            if data_type == "filler" or field_name not in df.columns:
                continue

            col = df[field_name]
            summary_data.append({
                'field_name': field_name,
                'data_type': data_type,
                'description': description,
                'total_records': len(col),
                'non_null_count': col.notna().sum(),
                'null_count': col.isna().sum(),
                'unique_values': col.nunique(),
                'sample_values': str(col.dropna().head(3).tolist())[:100]
            })

        return pd.DataFrame(summary_data)

# Databricks-specific functions
def read_mmr_from_dbfs(dbfs_path: str) -> str:
    """
    Read MMR file from Databricks File System.

    The file is opened through the local ``/dbfs`` mount. ``dbfs_path`` may be
    given as ``/mnt/x.txt``, ``mnt/x.txt`` or ``dbfs:/mnt/x.txt``; all three
    resolve to ``/dbfs/mnt/x.txt``.

    Args:
        dbfs_path (str): Location of the file inside DBFS

    Returns:
        str: Entire file content decoded as UTF-8

    Raises:
        Exception: If the file cannot be opened or read (original error chained)
    """
    relative_path = dbfs_path
    # Accept the URI form as well as a plain path.
    if relative_path.startswith("dbfs:"):
        relative_path = relative_path[len("dbfs:"):]
    # Exactly one separator between the mount point and the rest of the path.
    local_path = "/dbfs/" + relative_path.lstrip("/")

    try:
        with open(local_path, 'r', encoding='utf-8') as file:
            return file.read()
    except Exception as e:
        raise Exception(f"Error reading from DBFS: {e}") from e

def mmr_to_spark_dataframe(file_content: str, spark_session):
    """
    Convert MMR content to Spark DataFrame for Databricks.

    The content is first parsed into a pandas DataFrame with
    MMRFlatFileParser.parse_flat_file_string, and that frame is then passed
    to ``spark_session.createDataFrame``.
    """
    parsed_frame = MMRFlatFileParser().parse_flat_file_string(file_content)
    spark_frame = spark_session.createDataFrame(parsed_frame)
    return spark_frame

# Example usage: parse the mock flat file and write the result out as CSV
if __name__ == "__main__":
    input_file = "mmr_mock_flatfile.txt"
    output_file = "mmr_parsed_output.csv"

    # Build the parser before the try block so only file handling is guarded
    parser = MMRFlatFileParser()

    try:
        parser.export_to_csv(parser.parse_flat_file(input_file), output_file)
    except FileNotFoundError:
        # The input is produced by a separate generator step
        print("MMR flat file not found. Run the generator first to create 'mmr_mock_flatfile.txt'")
    except Exception as e:
        print(f"Error: {e}")