In [None]:
import pandas as pd  # For handling data in DataFrames
import os  # For working with file paths
from difflib import get_close_matches  # For approximate string matching
from fuzzywuzzy import fuzz  # For fuzzy string matching
import Levenshtein  # Optimized Levenshtein distance calculations (improves fuzzywuzzy performance)
import configparser  # For reading configurations from a .config file (future-proofing!)

In [110]:
# Load the data dictionaries
def load_excel(file_path, sheet_name=0):
    """Read one sheet of an Excel workbook.

    Args:
        file_path: Location of the workbook, passed straight to ``pd.read_excel``.
        sheet_name: Sheet selector forwarded to ``pd.read_excel``; defaults to
            ``0`` (the first sheet).

    Returns:
        Whatever ``pd.read_excel`` returns for that selector (a DataFrame for
        a single sheet name or position).
    """
    sheet = pd.read_excel(file_path, sheet_name=sheet_name)
    return sheet

# Load and standardize columns
def load_and_prepare_data(cde_file, study_file, cde_sheet=0, study_sheet=0):
    """Load the CDE and study dictionaries and give them a shared column schema.

    Both workbooks are read with ``load_excel``, their source-specific column
    headers are renamed to common snake_case names, and the ``variable_name``
    and ``crf_name`` columns are lowercased so later comparisons are
    case-insensitive.

    Args:
        cde_file: Path to the CDE workbook.
        study_file: Path to the study data-dictionary workbook.
        cde_sheet: Sheet selector for the CDE workbook (default ``0``).
        study_sheet: Sheet selector for the study workbook (default ``0``).

    Returns:
        Tuple ``(cde_data, study_data)`` of renamed DataFrames.

    Raises:
        KeyError: If either frame lacks ``variable_name`` or ``crf_name`` after
            renaming.
    """
    # Source header -> standardized name, per input file.
    cde_cols = {
        "CRF Name": "crf_name",
        "CDE Name": "cde_name",
        "Variable Name": "variable_name",
        "Definition": "definition",
        "Permissible Values": "permissible_values",
        "PV Description": "pv_description",
        "Data Type": "data_type"
    }

    study_cols = {
        "Matched HEAL Core CRF": "crf_name",
        "Variable / Field Name": "variable_name",
        "Field Label": "definition",
        "Choices, Calculations, OR Slider Labels": "permissible_values",
        "Field Note": "pv_description",
        "Field Type": "data_type"
    }

    key_columns = ["variable_name", "crf_name"]

    def _lowercase_strings(column):
        # `.str.lower()` raises AttributeError on non-object columns (e.g. an
        # all-NaN column read as float64) and turns non-string cells into NaN.
        # Mapping only the string cells avoids both problems.
        return column.map(
            lambda value: value.lower() if isinstance(value, str) else value
        )

    # rename() returns a new frame; no in-place mutation of the loaded data.
    cde_data = load_excel(cde_file, sheet_name=cde_sheet).rename(columns=cde_cols)
    study_data = load_excel(study_file, sheet_name=study_sheet).rename(columns=study_cols)

    for label, frame in (("CDE", cde_data), ("study", study_data)):
        missing = [col for col in key_columns if col not in frame.columns]
        if missing:
            raise KeyError(
                f"{label} data is missing required column(s) after renaming: {missing}"
            )

    # Convert relevant columns to lowercase for consistency
    for col in key_columns:
        cde_data[col] = _lowercase_strings(cde_data[col])
        study_data[col] = _lowercase_strings(study_data[col])

    print("Renamed CDE Data Columns:", cde_data.columns)
    print("Renamed Study Data Columns:", study_data.columns)

    return cde_data, study_data

In [111]:
def find_exact_and_crf_matches(cde_data, study_data):
    """Classify each study variable against the CDE table by name and CRF.

    For every row of ``study_data`` (in row order) one label is produced:

    * ``"Exact Match"`` - a CDE row has the same variable name and CRF name.
    * ``"Wrong CRF (via Fuzzy Match)"`` / ``"Wrong CRF"`` - the variable name
      exists in the CDE table; the CRF of the first such CDE row scores
      >= 70 / < 70 against the study CRF with ``fuzz.ratio``.
    * ``"Wrong CRF (via Close Match and Fuzzy CRF)"`` - no identical variable
      name, but ``get_close_matches`` (cutoff 0.8) finds one whose CRF scores
      >= 70 against the study CRF.
    * ``"Not in CDE"`` - anything else.

    Args:
        cde_data: DataFrame with ``variable_name`` and ``crf_name`` columns.
        study_data: DataFrame with ``variable_name`` and ``crf_name`` columns.

    Returns:
        List of label strings, one per study row.
    """
    def _crf_similarity(left, right):
        # A missing CRF (NaN/None) cannot be similar to anything. Scoring it
        # as 0 also avoids calling .lower() on a float.
        if not isinstance(left, str) or not isinstance(right, str):
            return 0
        return fuzz.ratio(left.lower(), right.lower())

    cde_vars = cde_data["variable_name"]
    cde_crfs = cde_data["crf_name"]
    # Built once instead of per study row; difflib only accepts strings.
    candidate_names = [name for name in cde_vars.tolist() if isinstance(name, str)]

    results = []

    for _, study_row in study_data.iterrows():
        study_var = study_row["variable_name"]
        study_crf = study_row["crf_name"]

        same_variable = cde_vars == study_var

        # Step 1: Find exact matches for variable and CRF name
        if (same_variable & (cde_crfs == study_crf)).any():
            results.append("Exact Match")
            continue

        # Step 2: Exact variable name match but different CRF
        if same_variable.any():
            cde_crf = cde_crfs[same_variable].iloc[0]  # Take the first matching CRF
            if _crf_similarity(study_crf, cde_crf) >= 70:  # Threshold for a "close" CRF match
                results.append("Wrong CRF (via Fuzzy Match)")
            else:
                results.append("Wrong CRF")
            continue

        # Step 3: Look for fuzzy matches for variable name
        if isinstance(study_var, str):
            close_matches = get_close_matches(study_var, candidate_names, n=1, cutoff=0.8)
        else:
            close_matches = []

        if close_matches:
            close_match_var = close_matches[0]

            # Check if the closest match has a fuzzy CRF match
            close_match_crf = cde_crfs[cde_vars == close_match_var].iloc[0]
            if _crf_similarity(study_crf, close_match_crf) >= 70:  # Adjust threshold as needed
                results.append("Wrong CRF (via Close Match and Fuzzy CRF)")
            else:
                results.append("Not in CDE")
        else:
            # Step 4: No match found at all
            results.append("Not in CDE")

    return results

# Notes about find_exact_and_crf_matches
Compares study variables to CDE variables to determine:
- **Exact matches** (when both variable name and CRF name match).
  - Filters cde_data for rows where both variable_name and crf_name match exactly.
  - If at least one match is found (not exact_matches.empty), it is classified as an "Exact Match", and the function skips to the next study variable.
- **Partial matches** (when the variable name matches but the CRF name does not).
  - The script compares the CRF names using fuzzy matching (fuzz.ratio()).
  - If the similarity score (crf_score) is ≥ 70, it is labeled "Wrong CRF (via Fuzzy Match)" (meaning the CRF is close but not exact).
  - Otherwise, it is labeled "Wrong CRF".
- **Fuzzy matches** (when the variable name does not match exactly but is close).
  - If the exact variable name is not found, we look for "close" matches using get_close_matches(), which finds the closest string based on character similarity.
  - If a close variable name exists, the function checks whether the CRF name is also a fuzzy match (crf_score ≥ 70). If so, it is classified as "Wrong CRF (via Close Match and Fuzzy CRF)". If not, it is labeled "Not in CDE" (meaning the variable is similar but doesn't belong to the expected CRF).
- **No matches** (when neither the variable name nor a close variable name is found): labeled "Not in CDE".

# Function Encodings 
- "Exact Match": Both variable name and CRF name match exactly. ✅
- "Wrong CRF": Variable name matches exactly, but CRF name does not. ❌
- "Wrong CRF (via Fuzzy Match)": Variable name matches, and CRF is similar but not identical (≥70% similarity). 🔄
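- "Wrong CRF (via Close Match and Fuzzy CRF)": No exact variable name match, but a close variable name was found and its CRF name is similar to the study CRF (≥70% similarity). 🔄
- "Not in CDE": No exact or close variable name was found in the CDE knowledge base, or the close match's CRF name is not similar (<70% similarity). ❌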

In [112]:
def find_close_matches(cde_data, study_data, exact_results):
    """Describe the closest CDE variable name for every non-exact study row.

    Args:
        cde_data: DataFrame with ``variable_name`` and ``crf_name`` columns.
        study_data: DataFrame with ``variable_name`` and ``crf_name`` columns.
        exact_results: Sequence of labels aligned with the rows of
            ``study_data`` by position (as returned by
            ``find_exact_and_crf_matches``).

    Returns:
        List with one entry per study row, in row order:
        ``None`` when the row is already an ``"Exact Match"``;
        ``"Close Match: <var> (Fuzzy CRF Match)"`` or
        ``"Close Match: <var> (CRF Mismatch)"`` when ``get_close_matches``
        (cutoff 0.8) finds a CDE variable, depending on whether the CRF names
        score >= 70 with ``fuzz.ratio``; otherwise ``"No Close Match Found"``.
    """
    def _crf_similarity(left, right):
        # A missing CRF (NaN/None) scores 0 rather than crashing on .lower().
        if not isinstance(left, str) or not isinstance(right, str):
            return 0
        return fuzz.ratio(left.lower(), right.lower())

    cde_vars = cde_data["variable_name"]
    cde_crfs = cde_data["crf_name"]
    # Built once instead of per study row; difflib only accepts strings.
    candidate_names = [name for name in cde_vars.tolist() if isinstance(name, str)]

    close_match_results = []

    # iterrows() yields index *labels*. exact_results is a positional list, so
    # track the row position separately to stay aligned on any index.
    for position, (_, study_row) in enumerate(study_data.iterrows()):
        study_var = study_row["variable_name"]
        study_crf = study_row["crf_name"]

        # Skip if already an exact match
        if exact_results[position] == "Exact Match":
            close_match_results.append(None)
            continue

        # Find close matches for variable name
        if isinstance(study_var, str):
            close_matches = get_close_matches(study_var, candidate_names, n=1, cutoff=0.8)
        else:
            close_matches = []

        if not close_matches:
            # Add fallback message when no close matches are found
            close_match_results.append("No Close Match Found")
            continue

        close_match_var = close_matches[0]

        # Fuzzy match CRF names (first CDE row carrying the matched variable)
        close_match_crf = cde_crfs[cde_vars == close_match_var].iloc[0]
        if _crf_similarity(study_crf, close_match_crf) >= 70:
            close_match_results.append(f"Close Match: {close_match_var} (Fuzzy CRF Match)")
        else:
            close_match_results.append(f"Close Match: {close_match_var} (CRF Mismatch)")

    return close_match_results


# Notes on find_close_matches
This function builds on the previous function by focusing on approximate matches (i.e. close, but not exact) for variable names and CRF names. It refines the matching process by skipping already confirmed exact matches and adding more nuance categories for near matches.
Returns the list of close matches, with each entry corresponding to a variable from the study dataset.

Creates a ranking system for matches:
- Exact Match (previous function).
- Close Match with Fuzzy CRF Match (if CRF similarity ≥ 70).
- Close Match with CRF Mismatch (if CRF similarity < 70).
- No Close Match Found (if no variable match is found).

# Function Encodings
- None: This study variable was already an exact match, so we skipped it. ✅
- "Close Match: {close_match_var} (Fuzzy CRF Match)": Close variable name found, and CRF name is a fuzzy match. 🔄
- "Close Match: {close_match_var} (CRF Mismatch)": Close variable name found, but CRF does not match. ❌
- "No Close Match Found": No sufficiently close variable names were found. ❌

In [113]:
def normalize_text(text):
    """Return ``text`` lowercased with whitespace runs collapsed to one space.

    Leading and trailing whitespace is dropped. Any non-string input
    (NaN, None, numbers) yields an empty string.
    """
    if isinstance(text, str):
        words = text.lower().split()  # split() also discards edge whitespace
        return " ".join(words)
    return ""

def find_pv_and_encoding_mismatches(cde_data, study_data):
    """Compare permissible values (PV) and PV descriptions between study and CDE.

    All text is passed through ``normalize_text`` before comparison.

    Args:
        cde_data: DataFrame with ``variable_name`` and, optionally,
            ``permissible_values`` / ``pv_description`` columns.
        study_data: DataFrame with the same columns.

    Returns:
        List with one entry per study row, in row order:

        * ``"No Comparison Possible"`` - no CDE row has the same variable name.
        * ``"Both PV/Encoding missing"`` - the study row has neither PV nor
          description text.
        * ``"PV Mismatch: <bool>, Encoding Mismatch: <bool>"`` - the study text
          differs from at least one CDE row with the same variable name.
        * ``"Close PV/Encoding Match: <var> (Score: <x.x>)"`` - otherwise, the
          first CDE row with the highest average ``fuzz.ratio`` of PV and
          description, provided that average is >= 70.
        * ``None`` - no CDE row reached the threshold.
    """
    def _normalized_column(frame, column):
        # Mirrors row.get(column, ""): an absent column acts as all-empty text.
        if column not in frame.columns:
            return [""] * len(frame)
        return [normalize_text(value) for value in frame[column].tolist()]

    # Loop-invariant CDE data, normalized once instead of once per study row.
    cde_vars = cde_data["variable_name"]
    cde_var_names = cde_vars.tolist()
    cde_pvs = _normalized_column(cde_data, "permissible_values")
    cde_descs = _normalized_column(cde_data, "pv_description")

    mismatch_results = []

    for _, study_row in study_data.iterrows():
        study_var = study_row["variable_name"]
        study_pv = normalize_text(study_row.get("permissible_values", ""))
        study_desc = normalize_text(study_row.get("pv_description", ""))

        # Step 1: Check for exact variable name matches
        same_variable = (cde_vars == study_var).to_numpy(dtype=bool, na_value=False)
        match_positions = same_variable.nonzero()[0]
        if len(match_positions) == 0:
            # No match found for variable_name, so no PV/Encoding comparison is possible
            mismatch_results.append("No Comparison Possible")
            continue

        # Step 2: Handle missing PV/Encoding values in the study data
        if not study_pv and not study_desc:
            mismatch_results.append("Both PV/Encoding missing")
            continue

        # Step 3: A mismatch is flagged when ANY same-named CDE row differs
        pv_mismatch = any(study_pv != cde_pvs[pos] for pos in match_positions)
        encoding_mismatch = any(study_desc != cde_descs[pos] for pos in match_positions)

        if pv_mismatch or encoding_mismatch:
            mismatch_results.append(
                f"PV Mismatch: {pv_mismatch}, Encoding Mismatch: {encoding_mismatch}"
            )
            continue

        # Step 4: Fuzzy match based on PV/Encoding across the whole CDE table.
        # NOTE(review): this point is only reached when the study text equals
        # every same-named CDE row exactly, so the search will normally report
        # a score-100 match. Confirm that is the intended meaning of
        # "Close PV/Encoding Match".
        # Step 2 guarantees some study text here, so no empty-vs-empty skip is
        # needed.
        best_match = None
        best_score = 0

        for cde_var, cde_pv, cde_desc in zip(cde_var_names, cde_pvs, cde_descs):
            pv_score = fuzz.ratio(study_pv, cde_pv)
            desc_score = fuzz.ratio(study_desc, cde_desc)
            avg_score = (pv_score + desc_score) / 2

            # Strict ">" keeps the first row among equally good candidates.
            if avg_score > best_score and avg_score >= 70:  # Adjust threshold as needed
                best_match = cde_var
                best_score = avg_score

        if best_match:
            mismatch_results.append(
                f"Close PV/Encoding Match: {best_match} (Score: {best_score:.1f})"
            )
        else:
            mismatch_results.append(None)

    return mismatch_results

# Notes on find_pv_and_encoding_mismatches
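PV Mismatch: Happens when the permissible values don’t match exactly (after lowercasing and collapsing whitespace).
Encoding Mismatch: Happens when the PV descriptions don’t match exactly after the same normalization; descriptions that are only worded slightly differently are still flagged as a mismatch.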

# Function Encodings
- "No Comparison Possible": No variable name match, so PV/Encoding cannot be checked. ❌
- "Both PV/Encoding missing": Study did not provide PVs or encoding descriptions, so no comparison is possible. ⚠️
- "PV Mismatch: True, Encoding Mismatch: False": Permissible values (PVs) do not match, but encoding descriptions are the same. ❌
- "PV Mismatch: False, Encoding Mismatch: True": PVs match, but encoding descriptions are different. ❌
- "PV Mismatch: True, Encoding Mismatch: True": Both PVs and encoding descriptions do not match. 🚨 ❌
- "Close PV/Encoding Match: {best_match} (Score: {score})": A close match was found (based on fuzzy similarity). 🔄
- None: No meaningful PV/Encoding match was found. ❌

In [114]:
# Save report function
def save_report(report, output_file):
    """Write ``report`` to ``output_file`` as CSV and announce where it went.

    The DataFrame index is not written. After saving, the absolute path of
    the file is printed.
    """
    report.to_csv(output_file, index=False)
    saved_to = os.path.abspath(output_file)
    print(f"Comparison report saved to: {saved_to}")

In [115]:
# File paths and sheet names
# NOTE(review): these are absolute, machine-specific paths; consider moving
# them to a config cell with paths relative to a project/data directory.
cde_file_path = r"C:\Users\lmaefos\Code Stuffs\CDE_detective\CDE_ID_detective_revamp\KnowledgeBase\Compiled_CORE_CDEs list_English_one sheet_as of 2024-11-11.xlsx"
study_file_path = r"C:\Users\lmaefos\Code Stuffs\CDE_detective\CDE_ID_detective_revamp\out\HDP00337_DataDictionary_BSCIP1_2023-08-07_2024-12-13_enhanced.xlsx"
cde_sheet_name = "ALL"
study_sheet_name = "Sheet1"

# Output file path
output_file_path = r"C:\Users\lmaefos\Code Stuffs\CDE_detective\CDE_ID_detective_revamp\out\HDP00337_Comparison_Report_2024-12-13.csv"

# Load data
cde_data, study_data = load_and_prepare_data(
    cde_file_path,
    study_file_path,
    cde_sheet_name,
    study_sheet_name
)

# Ensure all `variable_name` values are strings
cde_data["variable_name"] = cde_data["variable_name"].astype(str)
study_data["variable_name"] = study_data["variable_name"].astype(str)

# Generate report columns
exact_results = find_exact_and_crf_matches(cde_data, study_data)
close_match_results = find_close_matches(cde_data, study_data, exact_results)
mismatch_results = find_pv_and_encoding_mismatches(cde_data, study_data)

# An exact match is defined on (variable name, CRF name), so the CDE name is
# looked up with both keys. Filtering on the variable name alone can return
# the CDE name of a different CRF that reuses the same variable name.
exact_match_cde_names = [
    cde_data.loc[
        (cde_data["variable_name"] == study_var) & (cde_data["crf_name"] == study_crf),
        "cde_name",
    ].values[0]
    if result == "Exact Match" else None
    for study_var, study_crf, result in zip(
        study_data["variable_name"], study_data["crf_name"], exact_results
    )
]

# Real close matches start with this prefix. "No Close Match Found" merely
# contains "Close Match" and has no ": " to split on, so a substring test
# followed by split(": ")[1] raises IndexError on it.
close_match_prefix = "Close Match: "
best_close_match_names = [
    # Drop the trailing " (Fuzzy CRF Match)" / " (CRF Mismatch)" annotation so
    # the column holds only the matched variable name.
    res[len(close_match_prefix):].rsplit(" (", 1)[0]
    if isinstance(res, str) and res.startswith(close_match_prefix) else None
    for res in close_match_results
]

# "...(Score: 97.5)" -> 97.5 ; [:-1] removes the closing parenthesis.
pv_similarity_scores = [
    float(res.split("Score: ")[1][:-1]) if res and "Score:" in res else None
    for res in mismatch_results
]

# Create the report DataFrame
report = pd.DataFrame({
    "Study Variable": study_data["variable_name"],
    "HEAL Core CRF Name Match": study_data["crf_name"],
    "Exact/CRF Match Result": exact_results,
    "Exact Match CDE Name": exact_match_cde_names,
    "Close Match Result": close_match_results,
    "Best Close Match CDE Name": best_close_match_names,
    "PV/Encoding Result": mismatch_results,
    "PV Similarity Score": pv_similarity_scores,
})

# Collapse the per-column results into one overall label per row
def determine_final_match(row):
    """Reduce one report row to a single overall match label.

    Args:
        row: Mapping (e.g. a DataFrame row) with the keys
            ``"Exact/CRF Match Result"``, ``"Close Match Result"`` and
            ``"PV/Encoding Result"``.

    Returns:
        ``"Exact"``, ``"Close"``, ``"Mismatch"`` or ``"No Match"``, checked in
        that order of precedence.
    """
    if row["Exact/CRF Match Result"] == "Exact Match":
        return "Exact"
    # Prefix test, not substring: "No Close Match Found" also contains
    # "Close Match" and must not be reported as a close match.
    elif str(row["Close Match Result"]).startswith("Close Match:"):
        return "Close"
    elif "Mismatch" in str(row["PV/Encoding Result"]):
        return "Mismatch"
    else:
        return "No Match"

# Label every row with its overall match type, then persist the finished report.
final_match_types = report.apply(determine_final_match, axis=1)
report["Final Match Type"] = final_match_types

# Write the report to disk as CSV
save_report(report, output_file_path)

Renamed CDE Data Columns: Index(['Study Population Focus', 'Domain', 'crf_name', 'CRF Question #',
       'cde_name', 'variable_name', 'definition', 'Short Description',
       'Additional Notes (Question Text)', 'permissible_values',
       'pv_description', 'data_type', 'Disease Specific Instructions',
       'Disease Specific References', 'Population', 'Classification',
       'External Id CDISC', 'CDISC Permissible Values', 'CDISC Data Type',
       'CDISC Notes', 'Additional Information',
       'Map to CDISC variable name if different', 'Map to CDISC format',
       'Notes', 'Unnamed: 24'],
      dtype='object')
Renamed Study Data Columns: Index(['variable_name', 'Form Name', 'Extracted CRF Name', 'crf_name',
       'Match Confidence', 'Section Header', 'data_type', 'definition',
       'permissible_values', 'pv_description',
       'Text Validation Type OR Show Slider Number', 'Text Validation Min',
       'Text Validation Max', 'Identifier?',
       'Branching Logic (Show field