In [2]:
import pandas as pd
from datetime import datetime
from pathlib import Path
from typing import List, Dict
import time

# Columns the input workbook must contain; main() aborts the run when
# check_columns() reports any of them missing. The first three feed the
# indicator columns built in add_columns(), the two eligibility columns are
# counted in euc_analysis(), and 'Data Fetch Status' is used by
# aggregate_results() to select successfully fetched rows.
REQUIRED_COLUMNS = [
    'MIPS Reporting Requirements', 
    'APM Participation', 
    '(Clinician Level) (HARDSHIP EXCEPTION)', 
    'MIPS Eligibility: Individual', 
    'MIPS Eligibility: Group',
    'Data Fetch Status'
]

def load_data(file_path: Path) -> pd.DataFrame:
    """Read the Excel workbook at ``file_path`` into a dataframe.

    Args:
        file_path: Location of the workbook to read.

    Returns:
        The dataframe produced by ``pandas.read_excel``.

    Raises:
        Exception: Whatever ``pandas.read_excel`` raised; the error is
            reported on stdout first and then propagated unchanged.
    """
    try:
        print("Reading input file...")
        frame = pd.read_excel(file_path)
    except Exception as exc:
        # Report the failure for the notebook user, then let the caller see it.
        print(f"Error reading {file_path}: {exc}")
        raise
    else:
        return frame

def check_columns(data: pd.DataFrame, required_columns: List[str]) -> List[str]:
    """Report which of ``required_columns`` are absent from ``data``.

    Args:
        data: Dataframe whose column labels are inspected.
        required_columns: Column names that are expected to be present.

    Returns:
        The missing column names, in the order they appear in
        ``required_columns``; an empty list when nothing is missing.
        When at least one column is missing, the list is also printed.
    """
    absent = [name for name in required_columns if name not in data.columns]
    if not absent:
        return absent

    print("Missing columns in the dataset:")
    print(absent)
    return absent

def add_columns(data: pd.DataFrame) -> pd.DataFrame:
    """Add the 0/1 indicator columns used by the later analyses.

    The dataframe is modified in place and also returned.

    Columns added:
        * ``Required to Report Traditional MIPS`` -- 1 when the
          ``MIPS Reporting Requirements`` cell is a string containing
          "required to report" (case-insensitive), otherwise 0.
        * ``APM Participation Y/N`` -- 1 when the ``APM Participation`` cell
          is a number greater than zero, otherwise 0. Cells that cannot be
          read as a number (blank, None, free text) count as 0 instead of
          raising ``TypeError``; numeric text such as ``"2"`` is accepted.
        * ``EUC (Y/N)`` -- 1 when the hardship-exception cell is exactly
          ``'Extreme and uncontrollable circumstances'``, otherwise 0.

    Args:
        data: Dataframe containing the three source columns named above.

    Returns:
        The same dataframe object with the three indicator columns added.
    """
    # NOTE(review): this substring test would also match a value such as
    # "Not required to report" -- confirm no such value occurs in the input.
    data['Required to Report Traditional MIPS'] = data['MIPS Reporting Requirements'].apply(
        lambda x: 1 if isinstance(x, str) and 'required to report' in x.lower() else 0
    )

    # Coerce first so that text / missing cells become NaN (NaN > 0 is False)
    # rather than blowing up the comparison on mixed-type Excel columns.
    apm_numeric = pd.to_numeric(data['APM Participation'], errors='coerce')
    data['APM Participation Y/N'] = (apm_numeric > 0).fillna(False).astype('int64')

    data['EUC (Y/N)'] = data['(Clinician Level) (HARDSHIP EXCEPTION)'].apply(
        lambda x: 1 if x == 'Extreme and uncontrollable circumstances' else 0
    )
    return data

def aggregate_results(data: pd.DataFrame) -> Dict[str, pd.DataFrame]:
    """Build the count/percentage summary table for each analysis tab.

    Every table has three columns (a label column, ``Count`` and
    ``Percentage``), one row per distinct value of the analysed column and a
    final ``Total`` row whose percentage is fixed at 100.

    Args:
        data: Dataframe that already carries the indicator columns
            ``Required to Report Traditional MIPS`` and
            ``APM Participation Y/N`` in addition to the raw input columns.

    Returns:
        A dict keyed by tab name, in insertion order: ``MIPS_Reporting``,
        ``Aggregate_MIPS_Reporting``, ``APM_Participation``,
        ``Aggregate_APM_Participation``, ``APM_vs_MIPS``,
        ``Non_APM_vs_MIPS`` and ``Hardship_Exception``.
    """
    results = {}
    fetched_ok = data[data['Data Fetch Status'] == 'Success']

    def tabulate(frame: pd.DataFrame, source_column: str, label: str) -> pd.DataFrame:
        # Value frequencies of `source_column`, presented under `label`.
        print(f"Analyzing data for {label}")
        counts = frame[source_column].value_counts().reset_index()
        counts = counts.set_axis([label, 'Count'], axis=1)
        grand_total = counts['Count'].sum()
        share = counts['Count'] / grand_total * 100
        counts['Percentage'] = share.round().astype(int)
        total_row = pd.DataFrame({label: ['Total'], 'Count': [grand_total], 'Percentage': [100]})
        return pd.concat([counts, total_row], ignore_index=True)

    def record(tab: str, banner: str, frame: pd.DataFrame, source_column: str, label: str = None) -> None:
        # Announce the analysis, then store its table under `tab`.
        print(f"Analyzing data for {banner}")
        shown_as = source_column if label is None else label
        results[tab] = tabulate(frame, source_column, shown_as)

    record('MIPS_Reporting', 'MIPS Reporting', data, 'MIPS Reporting Requirements')
    record('Aggregate_MIPS_Reporting', 'Aggregate MIPS Reporting', fetched_ok,
           'Required to Report Traditional MIPS')
    record('APM_Participation', 'APM Participation', data, 'APM Participation')
    record('Aggregate_APM_Participation', 'Aggregate APM Participation', fetched_ok,
           'APM Participation Y/N')

    # The APM / non-APM comparisons use every row, not only successful fetches.
    in_apm = data[data['APM Participation Y/N'] == 1]
    not_in_apm = data[data['APM Participation Y/N'] == 0]

    record('APM_vs_MIPS', 'APM vs MIPS', in_apm,
           'Required to Report Traditional MIPS', 'Required to Report MIPS')
    record('Non_APM_vs_MIPS', 'Non-APM vs MIPS', not_in_apm,
           'Required to Report Traditional MIPS', 'Required to Report MIPS')
    record('Hardship_Exception', 'Hardship Exception', data,
           '(Clinician Level) (HARDSHIP EXCEPTION)')

    return results

def euc_analysis(data: pd.DataFrame) -> pd.DataFrame:
    """Summarize the rows flagged as Extreme and Uncontrollable Circumstances.

    Only rows whose ``EUC (Y/N)`` indicator equals 1 are considered. For that
    subset the function counts how many rows are required to report MIPS,
    participate in an APM, are ``'MIPS Eligible Individual'`` and are
    ``'MIPS Eligible Group'``, and expresses each count as a whole-number
    percentage of the subset size.

    Args:
        data: Dataframe carrying the ``EUC (Y/N)``,
            ``Required to Report Traditional MIPS`` and
            ``APM Participation Y/N`` indicator columns plus the two
            ``MIPS Eligibility`` columns.

    Returns:
        A dataframe with columns ``Category``, ``Count`` and ``Percentage``:
        four category rows followed by a ``Total`` row holding the number of
        EUC rows (percentage fixed at 100). When there are no EUC rows, every
        category percentage is 0 rather than the function failing on 0/0.
    """
    print("Analyzing data for Extreme and Uncontrollable Circumstances")
    euc_np_is = data[data['EUC (Y/N)'] == 1]
    total_euc_count = euc_np_is.shape[0]

    required_to_report_count = euc_np_is['Required to Report Traditional MIPS'].sum()
    apm_participation_count = euc_np_is['APM Participation Y/N'].sum()
    mips_eligible_individual_count = (
        euc_np_is['MIPS Eligibility: Individual'].value_counts().get('MIPS Eligible Individual', 0)
    )
    mips_eligible_group_count = (
        euc_np_is['MIPS Eligibility: Group'].value_counts().get('MIPS Eligible Group', 0)
    )

    euc_summary = pd.DataFrame({
        'Category': [
            'Required to Report MIPS',
            'Participated in an APM',
            'MIPS Eligible Individual',
            'MIPS Eligible Group',
        ],
        'Count': [
            required_to_report_count,
            apm_participation_count,
            mips_eligible_individual_count,
            mips_eligible_group_count,
        ],
    })

    if total_euc_count:
        euc_summary['Percentage'] = (euc_summary['Count'] / total_euc_count * 100).round().astype(int)
    else:
        # No EUC rows: 0/0 would yield NaN, which cannot be cast to int.
        euc_summary['Percentage'] = 0

    total_row = pd.DataFrame({'Category': ['Total'], 'Count': [total_euc_count], 'Percentage': [100]})
    return pd.concat([euc_summary, total_row], ignore_index=True)

def create_readme(input_file_path: Path, num_records: int, analysis_time: float) -> pd.DataFrame:
    """Assemble the two-column README sheet describing this run.

    Args:
        input_file_path: Path of the analysed workbook; stored as a string.
        num_records: Number of records in the input file.
        analysis_time: Elapsed analysis time in seconds; shown with two
            decimals.

    Returns:
        A dataframe with columns ``Information`` and ``Details``: four run
        facts (the first being the current local date and time), a blank
        separator row, a header row, and one row describing each analysis
        tab.
    """
    run_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Each pair is (Information, Details); keeping them side by side makes it
    # harder for a tab name and its description to drift apart.
    rows = [
        ('Date and time of analysis', run_stamp),
        ('Input file name', str(input_file_path)),
        ('Number of records in input file', num_records),
        ('Time to analyze (excluding time to write data to the output file)',
         f"{analysis_time:.2f} seconds"),
        ('', ''),
        ('Analysis Tab Name', 'Analysis Description'),
        ('MIPS_Reporting', 'Analysis of Required to Report Traditional MIPS'),
        ('Aggregate_MIPS_Reporting', 'Aggregated MIPS Reporting Requirements'),
        ('APM_Participation', 'Analysis of NPI participation in APMs'),
        ('Aggregate_APM_Participation', 'Analysis of APM Participation Y/N'),
        ('APM_vs_MIPS', 'APM Participation vs. MIPS Reporting'),
        ('Non_APM_vs_MIPS', 'Non-APM Participation vs. MIPS Reporting'),
        ('Hardship_Exception', 'Hardship Exception Analysis'),
        ('EUC_Analysis', 'Extreme and Uncontrollable Circumstances Analysis'),
    ]
    information, details = zip(*rows)
    return pd.DataFrame({'Information': list(information), 'Details': list(details)})

def save_to_excel(data: pd.DataFrame, results: Dict[str, pd.DataFrame], euc_summary: pd.DataFrame, readme_df: pd.DataFrame, output_file_path: Path) -> None:
    """Write every output table to one Excel workbook, one sheet per table.

    Sheets are written in this order: ``README``, ``Processed Data``, one
    sheet per entry of ``results`` (named after its key), then
    ``EUC_Analysis``. Row indexes are not written.

    Args:
        data: The processed input rows.
        results: Summary tables keyed by sheet name.
        euc_summary: The EUC summary table.
        readme_df: The README table.
        output_file_path: Destination workbook path.
    """
    def sheets():
        # Yield (sheet name, table) pairs lazily, in the order they are written.
        yield 'README', readme_df
        yield 'Processed Data', data
        yield from results.items()
        yield 'EUC_Analysis', euc_summary

    print("Writing results to output file...")
    with pd.ExcelWriter(output_file_path) as writer:
        for sheet_name, table in sheets():
            table.to_excel(writer, sheet_name=sheet_name, index=False)

def main(input_file_path: str) -> None:
    """Run the full analysis for one input workbook.

    Steps: load the workbook, verify the required columns, add the indicator
    columns, build the summary tables and the EUC summary, then write
    everything (plus a README sheet) to a timestamped
    ``<YYYYmmddHHMM>_analysis-output.xlsx`` next to the input file.

    The function prints progress messages and returns early, without
    raising, when the input file does not exist or required columns are
    missing.

    Args:
        input_file_path: Path to the Excel workbook to analyse.
    """
    print("Starting analysis now...")
    input_path = Path(input_file_path)
    if not input_path.exists():
        # Name the path that was tried instead of pointing at a source line
        # number, which goes stale whenever the file is edited.
        print(
            f"Error: Couldn't find the data input file '{input_path}'. "
            "Please make sure input_file_path is set to the name and location of your input file."
        )
        return

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    start_time = time.perf_counter()

    data = load_data(input_path)
    missing_columns = check_columns(data, REQUIRED_COLUMNS)
    if missing_columns:
        print("Exiting due to missing columns.")
        return  # Exit if there are missing columns

    num_records = len(data)
    data = add_columns(data)
    results = aggregate_results(data)
    euc_summary = euc_analysis(data)

    # Measured before writing, so the README figure excludes output time.
    analysis_time = time.perf_counter() - start_time

    readme_df = create_readme(input_path, num_records, analysis_time)
    timestamp = datetime.now().strftime("%Y%m%d%H%M")
    output_file_path = input_path.parent / f"{timestamp}_analysis-output.xlsx"
    save_to_excel(data, results, euc_summary, readme_df, output_file_path)
    print("Analysis complete. Results saved to:", output_file_path)

# Script entry point: set input_file_path to the name and location of the
# Excel workbook to analyse, then run the cell/script.
# Template: input_file_path = 'your_input_file_path.xlsx'
input_file_path = '20240610134119 NPI-11 REPORT.xlsx'
main(input_file_path)


Starting analysis now...
Reading input file...
Analyzing data for MIPS Reporting
Analyzing data for MIPS Reporting Requirements
Analyzing data for Aggregate MIPS Reporting
Analyzing data for Required to Report Traditional MIPS
Analyzing data for APM Participation
Analyzing data for APM Participation
Analyzing data for Aggregate APM Participation
Analyzing data for APM Participation Y/N
Analyzing data for APM vs MIPS
Analyzing data for Required to Report MIPS
Analyzing data for Non-APM vs MIPS
Analyzing data for Required to Report MIPS
Analyzing data for Hardship Exception
Analyzing data for (Clinician Level) (HARDSHIP EXCEPTION)
Analyzing data for Extreme and Uncontrollable Circumstances
Writing results to output file...
Analysis complete. Results saved to: 202406101359_analysis-output.xlsx
