# Automated EPIC validation process against secuTrial data entries

created by: Yasaman Safarkhanlo on 2024.10.07

last modified: file name


In [1]:
import os
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime
import chardet
import logging
import re
import io
from typing import Dict, Any, Optional, Tuple, List, Union

In [2]:
def setup_logging():
    """Set up file + console logging and return the 'epic-validation' logger.

    Log files go to ``<BASE_DIR>/logs``, where ``BASE_DIR`` is read from the
    environment and defaults to the current directory. The directory is
    created when missing, and the log file name carries a per-call timestamp.
    """
    # Docker is expected to export BASE_DIR (e.g. /app/data); a local run
    # falls back to "./logs".
    logs_root = Path(os.getenv('BASE_DIR', '.')) / "logs"
    logs_root.mkdir(parents=True, exist_ok=True)

    run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_handler = logging.FileHandler(logs_root / f"validation_service_{run_stamp}.log")
    console_handler = logging.StreamHandler()

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[file_handler, console_handler],
    )

    return logging.getLogger('epic-validation')

# Module-level logger; the helper functions below that do not take a
# `logger` argument log through this global.
logger = setup_logging()

## Read files


In [3]:
def read_and_modify_secuTrial_export(df):
    """
    Turn a raw secuTrial export sheet into a table with proper headers.

    The first six rows are discarded, the next row becomes the column
    header, and fully empty rows are removed. If any step raises, the error
    is logged and the input frame is returned unchanged.
    """
    try:
        # Skip the six leading rows of the export.
        body = df.iloc[6:]
        # The first remaining row holds the real column names.
        body = body.set_axis(body.iloc[0], axis=1)
        body = body.iloc[1:].reset_index(drop=True)
        return body.dropna(how='all')
    except Exception as e:
        logger.error(f"Error processing secuTrial export: {e}")
        return df

def safe_read_file(file_path, custom_reader=None):
    """
    Safely read an Excel or CSV file into a DataFrame.

    Args:
        file_path: Path (str or Path) to a .xlsx, .xls or .csv file.
        custom_reader: Optional callable applied to the loaded DataFrame.
            For Excel files its presence also makes the sheet load with
            ``header=None`` so the callable sees every row.

    Returns:
        The (optionally post-processed) DataFrame, or None when the file is
        missing, unreadable, of an unsupported type, or empty after
        processing. Failures are logged, never raised.
    """
    file_path = Path(file_path)
    file_extension = file_path.suffix.lower()

    try:
        if file_extension in [".xlsx", ".xls"]:
            engine = 'openpyxl' if file_extension == ".xlsx" else 'xlrd'
            excel_kwargs = {'header': None} if custom_reader else {}
            df = pd.read_excel(file_path, engine=engine, **excel_kwargs)
        elif file_extension == ".csv":
            # Order matters: latin1 maps every byte to a character and so
            # never raises, which means it must come last. cp1252 is tried
            # before it because it rejects a few undefined bytes and decodes
            # 0x80-0x9F to the intended characters (e.g. the euro sign).
            encodings = ['utf-8', 'cp1252', 'latin1']
            df = None
            for encoding in encodings:
                try:
                    df = pd.read_csv(file_path, encoding=encoding)
                    break
                except UnicodeDecodeError:
                    continue
            if df is None:
                raise ValueError("Could not read CSV with any encoding")
        else:
            raise ValueError(f"Unsupported file type: {file_extension}")

        result = custom_reader(df) if custom_reader else df

        if result is None or result.empty:
            logger.warning(f"{file_path.name} is empty after processing.")
            return None

        return result

    except FileNotFoundError:
        logger.error(f"File not found at {file_path}")
    except Exception as e:
        logger.error(f"Error reading file at {file_path}: {e}")

    return None

In [4]:
# Project root that contains the "sT-files" and "EPIC-files" export folders.
# NOTE(review): this is a hard-coded, machine-specific path, whereas
# setup_logging() reads BASE_DIR from the environment — confirm whether this
# should be configurable too.
base_dir = Path("/Users/yaskhanloo/Developer/bern-storke-center")

# Dynamically find the latest export folders.
# "Latest" means most recently modified (st_mtime), not the date embedded in
# the folder name; default=None covers the case where no folder matches.
latest_sT_export = max((base_dir / "sT-files").glob("export-*"), key=lambda x: x.stat().st_mtime, default=None)
latest_EPIC_export = max((base_dir / "EPIC-files").glob("export-*"), key=lambda x: x.stat().st_mtime, default=None)

# Without a secuTrial export there is nothing to validate: log and stop.
if latest_sT_export:
    secuTrial_base_dir = latest_sT_export
    REVASC_base_dir = secuTrial_base_dir / "REVASC"
    logger.info(f"Latest secuTrial export found: {secuTrial_base_dir}")
else:
    logger.error("No valid secuTrial export directory found.")
    raise FileNotFoundError("No valid secuTrial export directory found.")

# Same for the EPIC export.
if latest_EPIC_export:
    epic_base_dir = latest_EPIC_export
    logger.info(f"Latest EPIC export found: {epic_base_dir}")
else:
    logger.error("No valid EPIC export directory found.")
    raise FileNotFoundError("No valid EPIC export directory found.")

# Define file paths
# NOTE(review): the file names below are fixed (including a timestamped REVASC
# report name) — confirm they match the files in the newest export folder.
file_path_secuTrial = secuTrial_base_dir / 'SSR_cases_of_2024.xlsx'
file_path_REVASC = REVASC_base_dir / 'report_SSR01_20250218-105747.xlsx'
file_path_EPIC = epic_base_dir / 'encounters.xlsx'

# Read files
# The two secuTrial-style exports go through read_and_modify_secuTrial_export
# to fix their headers; the EPIC file is read as-is.
df_secuTrial = safe_read_file(file_path_secuTrial, custom_reader=read_and_modify_secuTrial_export)
df_REVASC = safe_read_file(file_path_REVASC, custom_reader=read_and_modify_secuTrial_export)
df_EPIC = safe_read_file(file_path_EPIC)

# Log data frame sizes
# safe_read_file returns None on failure, so a failed load is only warned
# about here and execution continues.
if df_secuTrial is not None and df_EPIC is not None and df_REVASC is not None:
    logger.info(f"Data loaded successfully: secuTrial={df_secuTrial.shape}, REVASC={df_REVASC.shape}, EPIC={df_EPIC.shape}")
else:
    logger.warning("One or more dataframes failed to load.")

2025-05-23 14:25:43,438 - epic-validation - INFO - Latest secuTrial export found: /Users/yaskhanloo/Developer/bern-storke-center/sT-files/export-20250520
2025-05-23 14:25:43,439 - epic-validation - INFO - Latest EPIC export found: /Users/yaskhanloo/Developer/bern-storke-center/EPIC-files/export-20250516
  warn("Workbook contains no default style, apply openpyxl's default")
  warn("Workbook contains no default style, apply openpyxl's default")
2025-05-23 14:25:59,443 - epic-validation - INFO - Data loaded successfully: secuTrial=(1803, 174), REVASC=(4980, 256), EPIC=(2543, 18)


### Merge all EPIC files into one


In [5]:
def merge_single_epic_file(file_path, merge_column, merged_df, prefix=""):
    """
    Read one EPIC file and outer-join it onto the running merged DataFrame.

    Args:
        file_path: Path of the file to read (via safe_read_file)
        merge_column: Name of the join column
        merged_df: DataFrame accumulated so far (may be empty)
        prefix: Optional string prepended to every column except the join column

    Returns:
        The updated merged DataFrame; `merged_df` is returned untouched when
        the file cannot be read or lacks the join column.
    """
    incoming = safe_read_file(file_path)
    if incoming is None:
        logger.warning(f"Failed to read {file_path.name}")
        return merged_df

    if merge_column not in incoming.columns:
        logger.warning(f"Merge column '{merge_column}' not found in {file_path.name}")
        return merged_df

    # Namespace the columns so the source file of each column stays visible.
    if prefix:
        renames = {}
        for col in incoming.columns:
            if col != merge_column:
                renames[col] = f"{prefix}{col}"
        incoming = incoming.rename(columns=renames)

    # Any file after the first is joined onto the accumulated frame.
    if not merged_df.empty:
        combined = merged_df.merge(incoming, on=merge_column, how="outer")
        logger.info(f"Merged {file_path.name}: shape={incoming.shape} → total={combined.shape}")
        return combined

    # The first readable file becomes the base of the merge.
    combined = incoming.copy()
    logger.info(f"Using {file_path.name} as base: shape={combined.shape}")
    return combined

def find_merge_column(directory):
    """
    Pick the join column for the EPIC files by inspecting one file.

    Args:
        directory: Folder (str or Path) containing .xlsx/.xls/.csv files

    Returns:
        None when the folder holds no data files. Otherwise the first
        well-known ID column present in the inspected file, else the first
        column whose name contains 'ID', 'CSN' or 'PATIENT', else the
        default 'PAT_ENC_CSN_ID'.
    """
    directory = Path(directory)
    file_patterns = ["*.xlsx", "*.xls", "*.csv"]
    # glob() yields entries in arbitrary order; sorting within each pattern
    # makes the inspected "first file" reproducible across runs and machines.
    all_files = [f for pattern in file_patterns for f in sorted(directory.glob(pattern))]

    if not all_files:
        return None

    # Check first file for common merge columns
    first_file = all_files[0]
    df = safe_read_file(first_file)
    if df is not None and len(df.columns) > 1:
        possible_columns = ['PAT_ENC_CSN_ID', 'PatientID', 'ID', 'Patient_ID', 'CSN_ID']
        for col in possible_columns:
            if col in df.columns:
                logger.info(f"Found merge column: {col}")
                return col

        logger.info(f"Available columns in {first_file.name}: {list(df.columns)}")
        # Return the first column that looks like an ID. Column labels can be
        # non-strings (e.g. numbers read from a header row), hence str().
        for col in df.columns:
            col_upper = str(col).upper()
            if any(word in col_upper for word in ['ID', 'CSN', 'PATIENT']):
                logger.info(f"Using merge column: {col}")
                return col

    return 'PAT_ENC_CSN_ID'  # Default fallback

def merge_all_epic_files(directory, merge_column=None):
    """
    Merge every .xlsx/.xls/.csv file in a folder into one DataFrame.

    Files are processed in a fixed keyword order (enc, flow, imag, img, lab,
    med, mon; unmatched files last) and their columns get a matching prefix.

    Args:
        directory: Folder containing the EPIC files
        merge_column: Join column; auto-detected via find_merge_column when None

    Returns:
        The merged DataFrame (empty if no merge column could be determined).

    Raises:
        FileNotFoundError: if `directory` does not exist.
    """
    directory = Path(directory)
    if not directory.exists():
        logger.error(f"Directory not found: {directory}")
        raise FileNotFoundError(f"{directory} does not exist.")

    all_files = []
    for pattern in ("*.xlsx", "*.xls", "*.csv"):
        all_files.extend(directory.glob(pattern))
    logger.info(f"Found {len(all_files)} data files in {directory.name}")

    # Auto-detect merge column if not provided
    if merge_column is None:
        merge_column = find_merge_column(directory)
        if merge_column is None:
            logger.error("Could not find a suitable merge column")
            return pd.DataFrame()

    # (keyword, column prefix) pairs; the position doubles as merge priority.
    keyword_prefixes = [
        ('enc', 'enct.'),
        ('flow', 'flow.'),
        ('imag', 'img.'),
        ('img', 'img.'),
        ('lab', 'lab.'),
        ('med', 'med.'),
        ('mon', 'mon.'),
    ]

    def match_rank(stem):
        # Index of the first keyword contained in the file stem, or
        # len(keyword_prefixes) when none matches.
        lowered = stem.lower()
        for rank, (keyword, _) in enumerate(keyword_prefixes):
            if keyword in lowered:
                return rank
        return len(keyword_prefixes)

    merged_df = pd.DataFrame()
    for file_path in sorted(all_files, key=lambda p: match_rank(p.stem)):
        rank = match_rank(file_path.stem)
        prefix = keyword_prefixes[rank][1] if rank < len(keyword_prefixes) else ""
        merged_df = merge_single_epic_file(file_path, merge_column, merged_df, prefix)
    return merged_df

In [6]:
try:
    # List all files in the EPIC export directory (visible at DEBUG level only)
    logger.info(f"Listing files in EPIC export directory: {epic_base_dir}")
    all_files = list(Path(epic_base_dir).glob("*"))
    for file in all_files:
        logger.debug(f"  - {file.name}")

    # Merge all EPIC files on the encounter ID
    df_EPIC_all = merge_all_epic_files(epic_base_dir, merge_column="PAT_ENC_CSN_ID")

    if not df_EPIC_all.empty:
        logger.info(f"Final merged DataFrame shape: {df_EPIC_all.shape}")

        # Save the merged dataframe
        output_path = Path(base_dir) / "EPIC-files/merged_epic_files/merged_epic_data.csv"
        # to_csv does not create missing folders; make sure the target exists
        # so a first run on a fresh checkout does not fail at the save step.
        output_path.parent.mkdir(parents=True, exist_ok=True)
        df_EPIC_all.to_csv(output_path, index=False)
        logger.info(f"Merged data saved to: {output_path}")
    else:
        logger.warning("Merged DataFrame is empty. Nothing saved.")

except FileNotFoundError as e:
    logger.error(f"Error: Directory not found - {e}")
except Exception:
    # logger.exception records the traceback, so the message needs no details.
    logger.exception("An unexpected error occurred during merging.")

2025-05-23 14:25:59,484 - epic-validation - INFO - Listing files in EPIC export directory: /Users/yaskhanloo/Developer/bern-storke-center/EPIC-files/export-20250516
2025-05-23 14:25:59,485 - epic-validation - INFO - Found 6 data files in export-20250516
2025-05-23 14:25:59,758 - epic-validation - INFO - Using encounters.xlsx as base: shape=(2543, 18)
2025-05-23 14:26:00,139 - epic-validation - INFO - Merged flowsheet.xlsx: shape=(2543, 31) → total=(2543, 48)
2025-05-23 14:26:00,337 - epic-validation - INFO - Merged imaging.xlsx: shape=(2543, 16) → total=(2543, 63)
2025-05-23 14:26:00,522 - epic-validation - INFO - Merged lab.xlsx: shape=(2543, 14) → total=(2543, 76)
2025-05-23 14:26:00,805 - epic-validation - INFO - Merged medication.xlsx: shape=(2543, 23) → total=(2543, 98)
2025-05-23 14:26:01,029 - epic-validation - INFO - Merged monitor.xlsx: shape=(2543, 18) → total=(2543, 115)
2025-05-23 14:26:01,030 - epic-validation - INFO - Final merged DataFrame shape: (2543, 115)
2025-05-23 1

## Data Cleaning and Merging


### REVASC merge with sT - single year


In [7]:
def merge_secuTrial_with_REVASC(df_secuTrial, df_REVASC, logger):
    """
    Left-join the REVASC report onto the secuTrial cases.

    Rows are matched on secuTrial 'Case ID' == REVASC 'CaseID'. REVASC
    columns whose names clash with secuTrial ones get the '.revas' suffix.
    If the merge raises, the error is logged and a copy of the secuTrial
    frame is returned instead.
    """
    try:
        joined = df_secuTrial.merge(
            df_REVASC,
            how='left',
            left_on='Case ID',
            right_on='CaseID',
            suffixes=('', '.revas'),
        )
        # The REVASC key duplicates 'Case ID' after the join; drop it if present.
        merged_df = joined.drop(columns=['CaseID'], errors='ignore').reset_index(drop=True)
    except Exception as e:
        logger.error(f"REVASC merge failed: {e}. Using secuTrial data only.")
        return df_secuTrial.copy()

    logger.info(f"Successfully merged secuTrial + REVASC: {merged_df.shape}")
    return merged_df

In [8]:
# secuTrial cases enriched with REVASC columns; on a merge failure this is
# just a copy of df_secuTrial (see merge_secuTrial_with_REVASC).
df_secuTrial_w_REVAS = merge_secuTrial_with_REVASC(df_secuTrial, df_REVASC, logger)

2025-05-23 14:26:01,108 - epic-validation - INFO - Successfully merged secuTrial + REVASC: (1803, 429)


### Add FID and SSR


In [9]:
def load_and_process_id_log(id_log_path, logger):
    """
    Load the identification log workbook and reduce it to usable FID/SSR pairs.

    Args:
        id_log_path: Path of the Excel identification log
        logger: Logger used for progress and error messages

    Returns:
        DataFrame whose 'FID' and 'SSR' columns are numeric and non-missing,
        or None if the file cannot be read or those columns cannot be found.
    """
    try:
        raw = pd.read_excel(id_log_path)
        logger.info(f"ID log original columns: {list(raw.columns)}")

        # The real header is stored in the first data row of the sheet.
        raw.columns = raw.iloc[0]
        id_log = raw.iloc[1:].reset_index(drop=True)
        logger.info(f"ID log columns after header fix: {list(id_log.columns)}")

        # Locate the case-number and SSR columns by substring of their labels.
        column_mapping = {}
        for col in id_log.columns:
            if pd.isna(col):  # unnamed trailing columns
                continue
            label = str(col).strip()
            if 'Fall-Nr.' in label:
                target = 'FID'
            elif 'SSR Identification' in label:
                target = 'SSR'
            else:
                target = None
            if target is not None:
                column_mapping[col] = target

        logger.info(f"Column mapping: {column_mapping}")
        id_log = id_log.rename(columns=column_mapping)

        # Keep only columns that have a label.
        id_log = id_log.loc[:, id_log.columns.notna()]

        if not {'FID', 'SSR'}.issubset(id_log.columns):
            logger.error(f"Required columns not found. Available: {list(id_log.columns)}")
            logger.error("Expected to find 'Fall-Nr.' and 'SSR Identification' columns")
            return None

        # Non-numeric entries become NaN and are dropped just below.
        for key in ('FID', 'SSR'):
            id_log[key] = pd.to_numeric(id_log[key], errors='coerce')

        initial_count = len(id_log)
        id_log = id_log.dropna(subset=['FID', 'SSR'])
        final_count = len(id_log)

        removed = initial_count - final_count
        if removed > 0:
            logger.warning(f"Removed {removed} rows with missing FID/SSR")

        logger.info(f"Loaded ID log with {final_count} valid entries")
        return id_log
    except Exception as e:
        logger.error(f"Failed to load ID log: {e}")
        return None

In [10]:
def add_patient_ids(df_epic, df_secuTrial, id_log, logger):
    """
    Add FID and SSR identifier columns to both dataframes.

    EPIC gets 'FID' from 'img.FID' (missing values become 0); secuTrial gets
    'SSR' from the trailing digits of 'Case ID'. When an ID log is given,
    each frame is then left-joined with it to obtain the other identifier.

    Args:
        df_epic: Merged EPIC DataFrame
        df_secuTrial: secuTrial DataFrame
        id_log: DataFrame with 'FID' and 'SSR' columns, or None to skip the joins
        logger: Logger used for warnings and progress messages

    Returns:
        tuple: (df_epic, df_secuTrial) as new DataFrames; the inputs are
        left unmodified.
    """
    # Work on copies so the caller's frames do not gain columns as a side effect.
    df_epic = df_epic.copy()
    df_secuTrial = df_secuTrial.copy()

    # Add FID to EPIC data
    if 'img.FID' in df_epic.columns:
        df_epic['FID'] = df_epic['img.FID'].fillna(0).astype(int)
        df_epic.insert(0, 'FID', df_epic.pop('FID'))
    else:
        logger.warning("img.FID column not found in EPIC data")

    # Add SSR to secuTrial data
    if 'Case ID' in df_secuTrial.columns:
        # expand=False yields a Series (one capture group) instead of a
        # one-column DataFrame.
        df_secuTrial['SSR'] = (
            df_secuTrial['Case ID'].str.extract(r'(\d+)$', expand=False).astype(int)
        )
        df_secuTrial.insert(1, 'SSR', df_secuTrial.pop('SSR'))
        # Clean up any 'nan' columns
        df_secuTrial = df_secuTrial.drop(columns=['nan'], errors='ignore')
    else:
        logger.warning("Case ID column not found in secuTrial data")

    # Merge with ID log. Each join is skipped when its key column could not
    # be derived above; otherwise the merge would raise KeyError right after
    # the warning.
    if id_log is not None:
        epic_ready = 'FID' in df_epic.columns
        secutrial_ready = 'SSR' in df_secuTrial.columns

        if epic_ready:
            df_epic = df_epic.merge(id_log[['FID', 'SSR']], on='FID', how='left')
            df_epic.insert(1, 'SSR', df_epic.pop('SSR'))
        else:
            logger.warning("Skipping ID log merge for EPIC data: FID column missing")

        if secutrial_ready:
            df_secuTrial = df_secuTrial.merge(id_log[['SSR', 'FID']], on='SSR', how='left')
            df_secuTrial.insert(0, 'FID', df_secuTrial.pop('FID'))
        else:
            logger.warning("Skipping ID log merge for secuTrial data: SSR column missing")

        if epic_ready and secutrial_ready:
            logger.info("Successfully added patient IDs to both dataframes")

    return df_epic, df_secuTrial

In [11]:
def find_matching_patients(df_epic, df_secuTrial, logger):
    """
    Find patients that exist in both datasets.

    A patient matches when the same (FID, SSR) pair occurs in both frames.

    Args:
        df_epic: EPIC DataFrame with 'FID' and 'SSR' columns
        df_secuTrial: secuTrial DataFrame with 'FID' and 'SSR' columns
        logger: Logger used for the summary messages

    Returns:
        tuple: (df_epic_common, df_secuTrial_common) - each input restricted
        to rows whose (FID, SSR) pair appears in the other dataset.
    """
    # Find common patients by FID and SSR. The key list must be unique:
    # a pair repeated in either input would otherwise appear several times
    # here and multiply the rows of both filtered frames below.
    common_keys = (
        df_secuTrial[['FID', 'SSR']]
        .merge(df_epic[['FID', 'SSR']], on=['FID', 'SSR'], how='inner')
        .drop_duplicates()
    )

    # Filter to matching patients only
    df_epic_common = df_epic.merge(common_keys, on=['FID', 'SSR'], how='inner')
    df_secuTrial_common = df_secuTrial.merge(common_keys, on=['FID', 'SSR'], how='inner')

    logger.info(f"Found {len(common_keys)} matching patients")
    logger.info(f"EPIC common shape: {df_epic_common.shape}")
    logger.info(f"secuTrial common shape: {df_secuTrial_common.shape}")

    return df_epic_common, df_secuTrial_common

In [12]:
def find_missing_patients(df_epic, df_secuTrial, logger):
    """
    Find patients that exist in only one dataset.

    Patients are compared by their (FID, SSR) pair.

    Returns:
        tuple: (df_secuTrial_only, df_epic_only) - rows of each dataset
        whose (FID, SSR) pair has no counterpart in the other one.
    """
    keys = ['FID', 'SSR']

    def rows_without_partner(left, right):
        # A left join with indicator flags each left row as 'both' or
        # 'left_only'; only the unmatched rows are kept.
        flagged = left.merge(right[keys], on=keys, how='left', indicator=True)
        unmatched = flagged.query('_merge == "left_only"')
        return unmatched.drop(columns=['_merge'])

    df_secuTrial_only = rows_without_partner(df_secuTrial, df_epic)
    df_epic_only = rows_without_partner(df_epic, df_secuTrial)

    logger.info(f"Patients only in secuTrial: {len(df_secuTrial_only)}")
    logger.info(f"Patients only in EPIC: {len(df_epic_only)}")

    return df_secuTrial_only, df_epic_only

In [13]:
def save_patient_analysis(df_epic_common, df_secuTrial_common, 
                         df_epic_only, df_secuTrial_only, output_dir, logger):
    """
    Write the patient matching results to two timestamped Excel workbooks.

    One workbook holds the matched patients of both datasets in full; the
    other holds the unmatched patients, reduced to a handful of identifying
    columns.

    Args:
        df_epic_common, df_secuTrial_common: Rows present in both datasets
        df_epic_only, df_secuTrial_only: Rows present in one dataset only
        output_dir: Target folder (Path); created if missing
        logger: Logger used for the completion message

    Returns:
        tuple: (df_epic_common, df_secuTrial_common), passed through unchanged
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    def keep_existing(frame, wanted):
        # Restrict to the wanted columns, silently skipping absent ones.
        return frame[[name for name in wanted if name in frame.columns]]

    # Workbook 1: matched patients, one sheet per source system.
    common_file = output_dir / f"common_patients_{run_stamp}.xlsx"
    with pd.ExcelWriter(common_file) as writer:
        df_secuTrial_common.to_excel(writer, sheet_name="secuTrial_common", index=False)
        df_epic_common.to_excel(writer, sheet_name="EPIC_common", index=False)

    # Workbook 2: unmatched patients with identifying columns only.
    missing_file = output_dir / f"missing_patients_{run_stamp}.xlsx"
    secuTrial_subset = keep_existing(
        df_secuTrial_only,
        ['FID', 'SSR', 'Last name', 'First name', 'DOB', 'Arrival at hospital'],
    )
    epic_subset = keep_existing(
        df_epic_only,
        ['FID', 'SSR', 'enct.name_last', 'enct.name_first', 'enct.birth_date', 'enct.arrival_date'],
    )

    with pd.ExcelWriter(missing_file) as writer:
        secuTrial_subset.to_excel(writer, sheet_name="only_in_secuTrial", index=False)
        epic_subset.to_excel(writer, sheet_name="only_in_EPIC", index=False)

    logger.info(f"Patient analysis saved to {common_file} and {missing_file}")

    return df_epic_common, df_secuTrial_common

In [14]:
def process_patient_matching(df_epic, df_secuTrial, id_log_path, output_dir, logger):
    """
    Run the whole patient matching workflow.

    Steps: load the ID log, add FID/SSR to both datasets, split patients
    into matched and unmatched, and save the analysis workbooks.

    Returns:
        tuple: (df_epic_common, df_secuTrial_common) - datasets with only
        matching patients. If the ID log cannot be loaded, the two input
        frames are returned unchanged instead.
    """
    id_log = load_and_process_id_log(id_log_path, logger)
    if id_log is None:
        # No ID log means no way to link the datasets; hand back the inputs.
        return df_epic, df_secuTrial

    epic_with_ids, secutrial_with_ids = add_patient_ids(df_epic, df_secuTrial, id_log, logger)

    epic_matched, secutrial_matched = find_matching_patients(
        epic_with_ids, secutrial_with_ids, logger
    )
    secutrial_unmatched, epic_unmatched = find_missing_patients(
        epic_with_ids, secutrial_with_ids, logger
    )

    # save_patient_analysis returns the matched frames it was given.
    epic_matched, secutrial_matched = save_patient_analysis(
        epic_matched, secutrial_matched,
        epic_unmatched, secutrial_unmatched,
        output_dir, logger,
    )
    return epic_matched, secutrial_matched

In [15]:
# Folder for the matching reports and location of the identification log,
# both relative to the project root.
output_dir = base_dir / 'EPIC-export-validation/validation-files'
id_log_path = base_dir / 'EPIC2sT-pipeline/Identification_log_SSR_2024_ohne PW_26.03.25.xlsx'

# Match the merged EPIC data against secuTrial (already enriched with REVASC).
df_epic_common, df_secuTrial_common = process_patient_matching(
    df_EPIC_all, 
    df_secuTrial_w_REVAS, 
    id_log_path, 
    output_dir, 
    logger
)

# NOTE(review): this message is logged unconditionally — process_patient_matching
# returns the unmatched inputs when the ID log fails to load, so "successfully"
# may be misleading in that case.
logger.info("Patient matching completed successfully!")

2025-05-23 14:26:01,415 - epic-validation - INFO - ID log original columns: ['Anzahl', 'Unnamed: 1', 'Unnamed: 2', 'Unnamed: 3', 'Unnamed: 4', 'Unnamed: 5', 'Unnamed: 6', 'Unnamed: 7', 1777, 'Unnamed: 9', 'Unnamed: 10', 'Unnamed: 11', 'Unnamed: 12']
2025-05-23 14:26:01,415 - epic-validation - INFO - ID log columns after header fix: ['Kommentar / Studie', 'Pat.nr.', 'SSR Identification SSR-INS-000....', 'Lysenr', 'Fall-Nr.', 'Name', 'Vorname', 'Geburts- datum', 'Eintritt', 'Geschlecht', 'Follow up done', nan, nan]
2025-05-23 14:26:01,416 - epic-validation - INFO - Column mapping: {'SSR Identification SSR-INS-000....': 'SSR', 'Fall-Nr.': 'FID'}
2025-05-23 14:26:01,418 - epic-validation - INFO - Loaded ID log with 1775 valid entries
2025-05-23 14:26:01,446 - epic-validation - INFO - Successfully added patient IDs to both dataframes
2025-05-23 14:26:01,461 - epic-validation - INFO - Found 1435 matching patients
2025-05-23 14:26:01,461 - epic-validation - INFO - EPIC common shape: (1435, 11

## Comparison phase

#### Map Key!!


In [16]:
# Define reusable mappings with better variable names
BOOLEAN_TO_YES_NO = {0: 'no', 1: 'yes', False: 'no', True: 'yes'}
BILATERAL_LOCATION_MAPPING = {0: 'no', 1: '', 2: 'right', 3: 'left', 4: 'bilateral'}
PROSTHETIC_VALVE_TYPES = {0: 'None', 1: 'Biological', 2: 'Mechanical'}

# Image type mapping - all CT types = 1, all MRI types = 2
IMAGE_TYPE_TO_NUMERIC = {
    'CT': 1, 
    'MRI': 2, 
    'CT (external)': 1, 
    'MRI (external)': 2,
    'CT-angiography': 1, 
    'MR-angiography': 2
}

TRANSPORT_METHODS = {1: 'Ambulance', 2: 'Helicopter', 3: 'Other (taxi,self,relatives,friends...)'}

DISCHARGE_DESTINATIONS = {
    1: 'Home', 
    3: 'Rehabilitation Hospital', 
    2: 'Other acute care hospital', 
    4: 'Nursing home, palliative care center, or other medical facility'
}

BOOLEAN_COLUMNS = [
    'flow.iat_stentintracran', 
    'flow.iat_stentextracran', 
    'flow.stroke_pre', 
    'flow.tia_pre', 
    'flow.ich_pre',
    'flow.hypertension', 
    'flow.diabetes', 
    'flow.hyperlipidemia', 
    'flow.smoking', 
    'flow.atrialfib', 
    'flow.chd',
    'flow.lowoutput', 
    'flow.pad', 
    'flow.decompression', 
    'img.iat_mech', 
    'img.follow_mra', 
    'img.follow_cta',
    'img.follow_ultrasound', 
    'img.follow_dsa', 
    'img.follow_tte', 
    'img.follow_tee', 
    'img.follow_holter',
    'med.aspirin_pre', 
    'med.clopidogrel_pre', 
    'med.prasugrel_pre', 
    'med.ticagrelor_pre', 
    'med.dipyridamole_pre',
    'med.vka_pre', 
    'med.rivaroxaban_pre', 
    'med.dabigatran_pre', 
    'med.apixaban_pre', 
    'med.edoxaban_pre',
    'med.parenteralanticg_pre', 
    'med.antihypertensive_pre', 
    'med.antilipid_pre', 
    'med.hormone_pre',
    'med.treat_antiplatelet', 
    'med.treat_anticoagulant', 
    'med.treat_ivt'
]

BILATERAL_ANATOMY_COLUMNS = ['flow.mca', 'flow.aca', 'flow.pca', 'flow.vertebrobasilar']

In [17]:
def create_epic_value_mappings():
    """
    Assemble the per-column value mappings used to standardize EPIC data.

    Column-specific tables come first, followed by one entry per column in
    BOOLEAN_COLUMNS (yes/no) and BILATERAL_ANATOMY_COLUMNS (side of finding).
    Columns of the same group share a single mapping dict object.

    Returns:
        dict: Column name -> {raw value: standardized value}
    """
    mappings = {
        'enct.non_swiss': {True: 'yes'},
        'enct.sex': {1: 'Male', 2: 'Female'},
        'enct.transport': TRANSPORT_METHODS,
        'enct.discharge_destinat': DISCHARGE_DESTINATIONS,
        'flow.firstangio_result': {2: 'no', 3: 'yes'},
        'flow.prostheticvalves': PROSTHETIC_VALVE_TYPES,
        'img.firstimage_type': IMAGE_TYPE_TO_NUMERIC,
        'img.firstangio_type': IMAGE_TYPE_TO_NUMERIC,
    }

    # Group-wide tables: every listed column points at the same dict.
    mappings.update({name: BOOLEAN_TO_YES_NO for name in BOOLEAN_COLUMNS})
    mappings.update({name: BILATERAL_LOCATION_MAPPING for name in BILATERAL_ANATOMY_COLUMNS})

    return mappings

# Build the column -> value-mapping table once, at module level.
EPIC_VALUE_MAPPINGS = create_epic_value_mappings()

In [19]:
def standardize_boolean_values(value: Any) -> Union[str, Any]:
    """
    Standardize boolean-like values to a consistent 'yes'/'no' format.

    Args:
        value: Input value to standardize

    Returns:
        pd.NA for missing input; 'yes'/'no' for booleans, numbers (by
        truthiness) and recognized strings ('true', 'y', '1', 'f', ...);
        otherwise str(value).
    """
    if pd.isna(value):
        return pd.NA

    # NumPy scalars must be listed explicitly: np.bool_ and np.integer are
    # not subclasses of bool/int, yet they are what pandas columns yield.
    # Without them np.int64(1) would come back as '1' instead of 'yes'.
    if isinstance(value, (bool, np.bool_, int, float, np.integer, np.floating)):
        return "yes" if value else "no"

    if isinstance(value, str):
        value_lower = value.lower().strip()
        if value_lower in ['true', 'yes', 'y', '1', 't']:
            return "yes"
        elif value_lower in ['false', 'no', 'n', '0', 'f']:
            return "no"

    # Not boolean-like: hand back the text form unchanged.
    return str(value)


def convert_value_to_target_type(value: Any, target_type: str) -> Any:
    """
    Convert value to specified target type with proper formatting.

    Args:
        value: Input value to convert
        target_type: Target data type as a string, e.g. 'int', 'float',
            'float-2' (float rounded to 2 decimals), 'date', 'bool', 'text'.
            A non-string target_type returns the value unchanged.

    Returns:
        Converted value, or pd.NA if the input is missing/empty or the
        conversion fails. Dates are returned as 'YYYYMMDD HH:MM' strings,
        booleans as 'yes'/'no', unknown types as str(value).
    """
    if pd.isna(value):
        return pd.NA

    # Handle empty strings
    if value == '':
        return pd.NA

    # Handle various data types
    if not isinstance(target_type, str):
        return value  # If no type specified, return as is

    target_type_lower = target_type.lower()

    # Check for float with decimal specification (e.g., float-1, float-2)
    float_match = re.match(r'float-(\d+)', target_type_lower)
    if float_match:
        try:
            decimal_places = int(float_match.group(1))
            return round(float(value), decimal_places)
        except (ValueError, TypeError):
            return pd.NA

    # Integer types
    if target_type_lower in ['int', 'integer', 'int64', 'int32']:
        # Try an exact integer conversion first: going through float()
        # silently corrupts integers beyond 2**53 (e.g. long ID strings).
        try:
            return int(value)
        except (ValueError, TypeError, OverflowError):
            pass
        # Fall back to float for inputs such as '3.7' or '1e3' (truncated).
        # OverflowError covers infinities, which have no integer value.
        try:
            return int(float(value))
        except (ValueError, TypeError, OverflowError):
            return pd.NA

    # Float types
    elif target_type_lower in ['float', 'double', 'numeric', 'float64', 'float32']:
        try:
            return float(value)
        except (ValueError, TypeError):
            return pd.NA

    # Date/time types
    elif target_type_lower in ['date', 'datetime', 'timestamp']:
        try:
            # Convert to datetime and then to yyyymmdd hh:mm format
            dt = pd.to_datetime(value)
            return dt.strftime('%Y%m%d %H:%M')
        except (ValueError, TypeError, AttributeError):
            return pd.NA

    # Boolean types
    elif target_type_lower in ['bool', 'boolean']:
        return standardize_boolean_values(value)

    # Default to string for text, categorical, etc.
    else:
        return str(value) if value is not None else pd.NA


def values_are_equivalent(val1: Any, val2: Any, target_type: str) -> bool:
    """
    Compare two values with type-aware equivalence checking.

    Args:
        val1: First value to compare
        val2: Second value to compare
        target_type: Target data type for comparison context ('float-N',
            'bool', 'date', ...); non-string values are treated as ''

    Returns:
        True if values are equivalent, False otherwise. Two missing values
        count as equivalent; one missing value never matches a present one.
    """
    # Handle NaN values consistently
    if pd.isna(val1) and pd.isna(val2):
        return True
    elif pd.isna(val1) or pd.isna(val2):
        return False

    target_type_lower = target_type.lower() if isinstance(target_type, str) else ''

    # Check for float with decimal specification (e.g., float-1, float-2)
    float_match = re.match(r'float-(\d+)', target_type_lower)
    if float_match:
        try:
            decimal_places = int(float_match.group(1))
            val1_rounded = round(float(val1), decimal_places)
            val2_rounded = round(float(val2), decimal_places)
            return val1_rounded == val2_rounded
        except (ValueError, TypeError):
            return False

    # Boolean comparison (standardized to yes/no)
    if target_type_lower in ['bool', 'boolean']:
        val1_std = standardize_boolean_values(val1)
        val2_std = standardize_boolean_values(val2)
        return val1_std == val2_std

    # Numeric types comparison. NumPy scalars are listed explicitly because
    # np.integer is not a subclass of int; without them np.int64(5) vs 5.0
    # would fall through to the string comparison ('5' != '5.0').
    numeric_types = (int, float, np.integer, np.floating)
    if isinstance(val1, numeric_types) and isinstance(val2, numeric_types):
        try:
            return abs(float(val1) - float(val2)) < 1e-6
        except (ValueError, TypeError):
            return False

    # Date comparison (already in string format)
    if target_type_lower in ['date', 'datetime', 'timestamp']:
        return str(val1) == str(val2)

    # String comparison (case insensitive)
    elif isinstance(val1, str) and isinstance(val2, str):
        return val1.strip().lower() == val2.strip().lower()

    # Default comparison
    else:
        return str(val1) == str(val2)


def apply_value_mappings_to_dataframe(df: pd.DataFrame, value_mappings: dict) -> pd.DataFrame:
    """
    Translate coded values in selected columns of a DataFrame.

    Args:
        df: Input DataFrame (left unmodified)
        value_mappings: Column name -> {raw value: replacement}; columns
            absent from `df` are ignored

    Returns:
        A copy of `df` in which mapped values are replaced; missing values
        and values without a mapping entry are kept as they are. A short
        summary of the touched columns is printed.
    """
    result = df.copy()

    # Only columns that actually exist in the frame are translated.
    modified_columns = [name for name in value_mappings if name in result.columns]

    for name in modified_columns:
        lookup = value_mappings[name]

        def translate(cell, lookup=lookup):
            # Missing cells pass through; unknown codes keep their raw value.
            if pd.isna(cell):
                return cell
            return lookup.get(cell, cell)

        result[name] = result[name].map(translate)

    total = len(modified_columns)
    print(f"Applied value mappings to {total} columns:")
    # List at most ten columns to keep the summary short.
    for shown in modified_columns[:10]:
        print(f"  - {shown}")
    if total > 10:
        print(f"  ... and {total - 10} more columns")

    return result

### Compare EPIC and secuTrial datasets


##### comparison application

### Mismatch report generation