# DATA CLEANING SCRIPT - (For Metabase CSVs)

## 📘 INSTRUCTIONS


### 🔧 SETUP YOUR FILE PATHS

Before running the notebook, please provide the following input and output file paths in the **"FILE PATHS"** section:

### 📝 INPUT INFORMATION (Required)

- **Country**: Type the source country name (`"Zimbabwe"` or `"ZIM"` or `"Malawi"` or `"MWI"`) — case insensitive.
- **CSV File to Clean**: Full file path to the raw `.csv` file that needs cleaning.
- **Country-Specific Data Dictionary**: Excel file containing the variable mappings for your chosen country.
  - 🔗 [MWI Dictionary (Google Sheet)](https://docs.google.com/spreadsheets/d/132qyY3YDJPto6NISo6Bg6mcfhTeya5o1njVGZPWbAaM/edit?gid=0#gid=0)
  - 🔗 [ZIM Dictionary (Google Sheet)](https://docs.google.com/spreadsheets/d/1IDfRghpXX971MYcd7mLkYX4TiN2ydUV2tOGsdTcHosI/edit?gid=913417091#gid=913417091)
- **Validation Dictionary**: Excel file that holds cross-country validation rules.
  - 🔗 [Validation Dictionary (Google Sheet)](https://docs.google.com/spreadsheets/d/1_OLakvc_kNANlZ3MDi5Me6mBDaRDmo6Ma-gu01Z-efE/edit?gid=0#gid=0)

> 📥 **Please download all the dictionaries above as Excel (.xlsx) files before running the notebook.**
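
Because the country name is accepted in several spellings, it helps to reduce it to a single code before any country-specific logic runs. The following is a minimal sketch of that step; `COUNTRY_ALIASES` and `normalize_country` are hypothetical names used for illustration and are not part of the notebook.

```python
# Hypothetical helper: map any accepted spelling to the short code
# used by the country-specific variables ("zim" or "mwi").
COUNTRY_ALIASES = {
    "zimbabwe": "zim",
    "zim": "zim",
    "malawi": "mwi",
    "mwi": "mwi",
}


def normalize_country(name: str) -> str:
    """Return "zim" or "mwi" for a case-insensitive country name."""
    key = name.strip().lower()
    if key not in COUNTRY_ALIASES:
        raise ValueError(f"Unsupported country: {name!r}")
    return COUNTRY_ALIASES[key]


print(normalize_country("Zimbabwe"))  # zim
print(normalize_country("MWI"))       # mwi
```

Normalizing once at the top means the rest of the run only ever has to compare against one code per country.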

### 💾 OUTPUT INFORMATION (Required)
- **Pickle Output Path**: Where to save the cleaned data as a `.pkl` file.
- **CSV Output Path**: Where to save the cleaned data as a `.csv` file.
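
The two output formats are not interchangeable: a pickle preserves column dtypes, while a CSV stores everything as text. The sketch below illustrates the difference on a tiny stand-in frame. It assumes the outputs are written with pandas' `to_pickle` and `to_csv`; the notebook's actual save step is not shown here, and the temporary paths are placeholders.

```python
import os
import tempfile

import pandas as pd

# Small stand-in for the cleaned data.
cleaned = pd.DataFrame({
    "uid": ["A1", "B2"],
    "datetimeadmission.value": pd.to_datetime(
        ["2024-07-14 08:05", "2024-07-22 08:03"]
    ),
})

with tempfile.TemporaryDirectory() as tmp:
    pkl_path = os.path.join(tmp, "cleaned.pkl")  # placeholder output paths
    csv_path = os.path.join(tmp, "cleaned.csv")

    cleaned.to_pickle(pkl_path)
    cleaned.to_csv(csv_path, index=False)

    from_pkl = pd.read_pickle(pkl_path)
    from_csv = pd.read_csv(csv_path)

# The pickle keeps the datetime dtype; the CSV reads back as text
# unless the column is parsed again on load.
print(from_pkl.dtypes["datetimeadmission.value"])
print(from_csv.dtypes["datetimeadmission.value"])
```

For further analysis in Python the `.pkl` file is usually the more convenient input, while the `.csv` file is easier to share or open in other tools.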


## ▶️ HOW TO RUN

Once all file paths have been filled in:
1. Click **`Run All`** in the top menu.
2. Wait for the notebook to finish execution — the output files will be saved to the specified locations.

> 🛠️ Make sure all paths are correct before running to avoid errors.
---


## CONFIGURATION

### FILE PATHS

#### Mandatory File Paths

In [1]:
# INPUT DATA
#-----------------
# Specify source country: "zim" for Zimbabwe, "mwi" for Malawi
country = "zim"

# File To Clean
csv_filepath = r" "  # <--------------- Put csv path here

In [2]:
# DICTIONARIES
#------------------
# Data Dictionary ZIM
dict_filepath_zim = r" "  # <--------------- Put Data dictionary path here
# Data Dictionary MWI
dict_filepath_mwi = r" " # <--------------- Put Data dictionary path here
# Validation Dictionary
feature_dict_filepath  = r" "  # <--------------- Put Validation dictionary path here

In [3]:
# OUTPUT DATA
#------------------
# CLEAN File
merged_data_csv = r" " # <--------------- Put output csv path here
merged_data_pkl = r" " # <--------------- Put output pkl path here
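
A blank or mistyped input path only surfaces as an error partway through the run, so a quick check before `Run All` can save time. The sketch below is one way to do that; `find_missing_inputs` is a hypothetical helper, not part of the notebook, and the example dictionary uses placeholder values rather than real paths.

```python
import os


def find_missing_inputs(paths: dict) -> list:
    """Return the labels of input paths that are blank or do not exist."""
    missing = []
    for label, path in paths.items():
        if not path.strip() or not os.path.isfile(path):
            missing.append(label)
    return missing


# Placeholder values; in the notebook these would be csv_filepath, the
# dictionary for the chosen country, and feature_dict_filepath.
example_paths = {"csv_filepath": " ", "feature_dict_filepath": " "}
print(find_missing_inputs(example_paths))
```

Output paths are deliberately left out of the check, since those files are not expected to exist before the run.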

#### Optional File Paths

##### 1st Stage Cleaning

In [4]:
# #-------REPORTS----------
# pre_process_report_filepath_zim = " "      # <--------------- Put preprocess report path here
# column_cleaning_report_filepath_zim = " "  # <--------------- Put column cleaning report path here
# forward_fill_report_path_zim = " "         # <--------------- Put forward filling report path here
# dtype_conversion_report_filepath_zim = " " # <--------------- Put dtype conversion report path here
# post_process_report_filepath_zim = " "     # <--------------- Put post process report path here
# report_frame_shift_zim = " "
# dropped_columns_zim = " "
# forward_fill_nones_report_zim = " "

# #--------OUTPUTS-------------
# # frame Shift data
# frame_shift_csv_zim= " "  # <--------------- frame shift csv path here
# frame_shift_pkl_zim= " "  # <--------------- frame shift pkl path here
# # summary dictionary in excel
# output_csv_filepath_zim= " "   # <--------------- output summary csv
# output_excel_filepath_zim= " " # <--------------- output summary excel

##### Numeric Validation

In [5]:
# # ------REPORTS--------
# # Flags - Features Flagged With Out of Range Values
# report_numeric_flags_zim = " "           # <--------------- File Path
# # Flags - Features Flagged With Out of Range Values After Fixes
# report_numeric_disallowed_ZIM = " "     # <--------------- File Path
# # Deleted Values - All deleted Non-Numeric Values
# report_dtypes_disallowed_ZIM = " "      # <--------------- File Path

##### Boolean Validation

In [6]:
# # ------REPORTS--------
# # Deleted & Fixed Values - All Non-Boolean Values & ALL Standardized Boolean Values
# report_dtypes_disallowed_ZIM = r" " # <--------------- File Path

##### Categorical Validation

In [7]:
# # ------REPORTS--------
# # flags
# report_non_numeric_flags_zim = r" "  # <--------------- File Path
# # Value Replacements

# # Disallowed Deletes

# # Value Mapping

##### DateTime Validation

In [8]:
# # ------REPORTS--------
# report_dtypes_disallowed_ZIM = " "

---

## CLEANING & VALIDATION PIPELINE

### SETUP

#### Environment Setup

In [9]:
# Import Necessary Libraries & Dependencies
#--------------------------------------------
import pandas as pd
import numpy as np
import logging
import random
import re
import os
from datetime import datetime
from collections import defaultdict
from decimal import Decimal, InvalidOperation
from typing import Dict, Any, Optional, Collection
import time


# Warnings
import warnings
warnings.filterwarnings('ignore')


# Logging Configuration
# -------------------------
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s',
    handlers=[
        logging.FileHandler("data_cleaning.log"),
        logging.StreamHandler()
    ]
)

# Runtime
#--------------
notebook_start_time = time.time()
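
`notebook_start_time` records when the run began, presumably so that the total runtime can be reported once the pipeline finishes. A minimal sketch of such a report follows; `format_elapsed` is a hypothetical helper that is not part of the notebook.

```python
import time


def format_elapsed(start: float, end: float) -> str:
    """Format the time between two time.time() readings as HH:MM:SS."""
    total_seconds = int(end - start)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


example_start = time.time()
# ... pipeline steps would run here ...
print("Runtime:", format_elapsed(example_start, time.time()))
```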


#### Country-Switch Configuration
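
The cell below keeps one object per setting and country, distinguished by a `_zim` or `_mwi` suffix. One way to switch between them is a small registry keyed by country code, sketched here with shortened stand-in lists. `COUNTRY_SETTINGS` and `settings_for` are hypothetical names, and the notebook may select its lists differently.

```python
# Shortened stand-ins for the real per-country lists.
example_numeric_zim = ["age.value", "birthweight.value", "bwtdis.value"]
example_numeric_mwi = ["age.value", "birthweight.value", "bwbid.value"]
example_weight_zim = ["admissionweight", "aw", "birthweight", "bwtdis"]
example_weight_mwi = ["admissionweight", "birthweight"]

# Hypothetical registry: one entry per country code, one key per setting.
COUNTRY_SETTINGS = {
    "zim": {
        "numeric_features": example_numeric_zim,
        "weight_cols": example_weight_zim,
    },
    "mwi": {
        "numeric_features": example_numeric_mwi,
        "weight_cols": example_weight_mwi,
    },
}


def settings_for(country_code: str) -> dict:
    """Return the settings for "zim" or "mwi"; fail loudly otherwise."""
    try:
        return COUNTRY_SETTINGS[country_code]
    except KeyError:
        raise ValueError(
            f"No settings for country code {country_code!r}"
        ) from None


active = settings_for("mwi")
print(active["weight_cols"])  # ['admissionweight', 'birthweight']
```

Looking everything up through a single function keeps downstream steps free of repeated `if country == ...` branches.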

In [10]:
# ─── LISTS & OBJECTS TO SWITCH ───

# NUMERIC FEATURES
#---------------------------
numeric_features_zim = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
# common features
"admissionweight.value", "age.value", "agec.value", "aw.value", "birthweight.value", "bloodsugarmg.value", "bloodsugarmmol.value", "bsmg.value", "bsmmol.value", "bwtdis.value", "dischhr.value", "dischrr.value",
"dischsats.value", "dischtemp.value", "dischweight.value", "durationlab.value", "durcpr.value", "gestation.value", "gestbirth.value", "hr.value", "length.value", "lengthdis.value", "lengthofstay.value", "lpgluc.value",
"lpprot.value", "manualhr.value", "nnuadmtemp.value", "ofc.value", "ofcdis.value", "par.value", "resprate.value", "rr.value", "rrmother.value", "satsair.value", "satso2.value", "temperature.value", "thompscore.value",
"timespent", "timespentdischarge","alb.value", "alb2.value", "alp1r.value", "alp2r.value", "antenatalcare.value", "apgar1.value", "apgar10.value", "apgar5.value", "balscore.value", "balscorewks.value",
"bbadur.value", "bcisol.value", "bili1r.value", "bilirubin2r.value", "bpdur.value", "ca1r.value", "calcium2r.value", "corrected.value","creat1r.value", "creatinine2r.value", "crp1r.value", "crp2r.value", "cyanosis.value",
"downescr.value", "duramox.value", "duramp.value", "durcef.value", "durclox.value", "durgent.value", "durimi.value", "durmet.value", "durproc.value", "durvanc.value", "foedisttime.value", "gluc1r.value", "glucose2r.value",
"grav.value", "hb1r.value", "hb2r.value", "lchild.value", "mag1r.value", "magnesium2r.value", "matage.value", "matageyrs.value", "phos1r.value", "phosphate2r.value", "plt1r.value", "plt2r.value", "pot1r.value",
"potassium2r.value", "sod1r.value", "sodium2r.value", "transnumber.value", "txhb.value", "ur1r.value", "urea2r.value", "vlnumber.value", "wcc1r.value", "wcc2r.value",
"airentry.value",
"nmarmr.value", "nmbreast.value", "nmeye.value", "nmgen.value", "nmheelear.value", "nmlan.value", "nmplant.value", "nmpop.value", "nmposture.value", "nmscarf.value", "nmskin.value", "nmsquare.value",
"thompalert.value", "thompfeed.value", "thompfont.value", "thompgrasp.value", "thompmoro.value", "thomprefl.value", "thompresp.value", "thompseiz.value", "thomptone.value"
# unique features zim
# problematic features zim
]

numeric_features_mwi = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
# common features
"admissionweight.value", "age.value", "agec.value", "apgar1.value", "apgar10.value", "apgar10dc.value", "apgar1dc.value", "apgar5.value", "apgar5dc.value", "balscore.value", "balscorewks.value", "bidageb.value",
"birthweight.value", "bloodsugarmg.value", "bloodsugarmmol.value", "bsmg.value", "bsmmol.value", "bwbid.value", "dischhr.value", "dischrr.value", "dischsats.value", "dischtemp.value", "dischweight.value",
"durationlab.value", "gestation.value", "hr.value", "lengthoflife.value", "lengthresus.value", "manualhr.value", "matageyrs.value", "modedeliverydc.value", "mothcell.value", "mothersatso2.value", "ofc.value", "par.value",
"paritydeadchildren.value", "paritylivingchildren.value", "rr.value", "satsair.value", "satso2.value", "temperature.value", "temperatureonarrival.value", "tempthermia.value", "thompscore.value", "lengthofstay.value",
"timespent", "timespentdischarge", "aw.value", "bw.value",
"fitsth.value",
"modedelivery.value",
"fontth.value", "graspth.value", "locth.value", "moroth.value", "postth.value", "respth.value", "suckth.value", "toneth.value",
"nmarmr.value", "nmbreast.value", "nmeye.value", "nmgen.value", "nmheelear.value", "nmlan.value", "nmplant.value", "nmpop.value", "nmposture.value", "nmscarf.value", "nmskin.value", "nmsquare.value"
# unique features mwi
]



# BOOLEAN FEATURES
#---------------------------

bool_features_zim      = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
# common features
"agea.value", "anmatsyphtreat.value", "babycrytriage.value", "bc.value", "bc2.value", "bcsens.value", "bf.value", "bili.value", "bili2.value", "birthplacesame.value", "bldgrpyn.value", "bone.value", "bone2.value",
"bsmonyn.value", "corddelay.value", "cpap.value", "crp.value", "crp2.value", "datevdrlsamehiv.value", "disccovidrisk.value", "dobyn.value", "dysmorphic.value", "envtemp.value", "fbc.value", "fbc2.value", "feversr.value",
"foedist.value", "foehrtdoc.value", "foehrtrec.value", "gluc.value", "gluc2.value", "haart.value", "hivpcr.value", "hivpcrinf.value", "hyposxyn.value", "hyposymptoms.value", "inorout.value", "kmc.value",
"matadm.value", "matadmit.value", "matbldgrpyn.value", "matchorio.value", "matdischarge.value", "mathivp.value", "mathivtest.value", "matsub.value", "mec.value", "mf.value", "mgso4.value", "murmur.value", "none.value",
"none2.value", "nvpazt.value", "nvpgiven.value", "nvplr.value", "phototherapy.value", "placebirthtown.value", "prom.value", "readmission.value", "revclin.value", "review.value", "riskcovid.value", "rom.value",
"rpr.value", "sent.value", "stethoscope.value", "surf.value", "talipessev.value", "torch.value", "transfusion.value", "travelhistory.value", "ue.value", "ue2.value", "ycolour.value", "brprobs.value", "externalsource.value",
"glustx.value", "typebirthot.value"
# unique features zim
# problematic features
]
bool_features_mwi      = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
# common features
"agea.value", "anmatsyphtreat.value", "babycrytriage.value", "bidagea.value", "biddobyn.value", "birthplacesame.value", "bsmonyn.value", "datevdrlsamehiv.value", "disccovidrisk.value", "dobyn.value", "dysmorphic.value",
"feversr.value", "haart.value", "healthed.value", "hyposymptoms.value", "inorout.value", "itn.value", "mathivtest.value", "motherpresent.value", "nvpgiven.value", "ortolani.value", "phototherapy.value","readmission.value",
"revclin.value", "riskcovid.value", "stethoscope.value", "talipessev.value", "ycolour.value", "brprobs.value", "hyposxyn.value", "lbwbinarydischarge", "prematuritywithrespiratorydistresssyndrome.value", "symptomsyn.value"
# unique features mwi
# problematic features mwi
]



# CATEGORICAL FEATURES
#---------------------------
cat_features_zim       = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
# common features
"abdomen.value", "activity.value", "admittedfrom.value", "admreason.value", "admreasonadd.value", "admward.value", "ageb.value", "agecat.value", "ageestimate.value", "anstercrse.value", "ansteroids.value",
"anus2.value", "anvdrl.value", "anvdrlreport.value", "anvdrlresult.value", "bbadel.value", "bbaloc.value", "bc1othr.value", "bc1r.value", "bc2r.value", "bcresist.value",
"birthfacility.value", "birthplace.value", "bsunit.value", "cadre.value", "cadredis.value", "causedeath.value", "chestausc.value", "colour.value", "contcausedeath.value", "corrected.value", "covidconfirmation.value",
"covidrepresults.value", "cprout.value", "crt.value", "crybirth.value", "csforg.value", "csreason.value", "dangersigns.value", "dangersigns2.value", "delivinter.value", "diagdis1.value", "diagnoses.value",
"downesinterp.value", "ethnicity.value", "feeddsch.value", "feedsadm.value", "feedsdsc.value", "femorals.value", "feverons.value", "firm.value", "foebrad.value", "foedec.value", "foetach.value", "folate.value",
"fontanelle.value", "furthertriage.value", "gastons.value", "gender.value", "genitalia.value", "goodprog.value", "gscvsom.value", "headshape.value", "highrisk.value", "hivpcrinfr.value", "hivtestreport.value",
"hivtestresult.value", "imaging.value", "iron.value", "jaundice.value", "lengthhaart.value", "lp.value", "lpglucavail.value", "lpprotavail.value", "maritalstat.value", "mataddrhadistrict.value", "mataddrmwdistrict.value",
"mataddrprovince.value", "matadmplace.value", "matbldgrp.value", "matcomorbidities.value", "mathivpr.value", "matoutcome.value", "matpart.value", "matrpr.value", "matrprr.value", "matrprt.value", "matsymptoms.value",
"medsdis.value", "medsgiven.value", "methodestgest.value", "moretrans.value", "motherdobyn.value", "mothersdiagnosis.value", "mskproblems.value", "neorons.value", "neotreeoutcome.value", "ortolani.value", "othprobs.value",
"palate.value", "partnertrsyph.value", "passedmec.value", "placebirth.value","pregconditions.value", "premdia.value", "presentation.value", "probslab.value", "puinfant.value", "punewborn.value",
"reason.value", "refdia.value", "referredfrom.value", "referredfrom2.value", "religion.value", "respons.value", "respsr.value", "respsup.value", "resus.value", "retractions.value", "revclintyp.value", "reviewcadre.value",
"rfsepsis.value", "romlength.value", "rvi.value", "sexdis.value", "signsdehydrations.value", "signsrd.value", "skin.value", "specrev.value", "specrevtyp.value", "spine.value", "srneuroother.value", "stoolsinfant.value",
"suckreflex.value", "tempadm.value", "tempbirth.value", "teo.value", "testthispreg.value", "tone.value", "torchre.value", "transtype.value", "ttv.value", "typebirth.value", "umbilicus.value", "vitk.value", "vlknown.value",
"vomiting.value", "wob.value", "$symptomreviewneurology.value",
"agecategory", "awgroup.value", "birthtrauma.value", "birthweightcategory", "birthweightcategorydischarge", "bloodsfinal.value", "bloodsinitial.value", "bornbeforearrival.value", "bwgroup.value", "cleftlipand/orpalatewithrd.value",
"cleftpalate.value", "congenitalabnormality.value", "congenitaldislocationofthehipcdh.value", "considercongenitalheartdisease.value", "convulsions.value", "dehydration.value", "delivery.value", "difficultyfeeding.value",
"edlizsummarytablescore.value", "extremelylowbirthweight<1000g.value", "extremelypremature<28weeks.value", "feedingreview.value", "gastroschisis.value","gestgroup.value", "hivhighrisk.value", "hivlowrisk.value",
"hivunknown.value", "hyperthermia.value", "hypoglycaemianotsymptomatic.value", "hypoglycaemiasymptomatic.value", "hypoxicischaemicencephalopathy.value", "lengthoflife.value", "lowbirthweight15002499g.value",
"macrosomia>4000g.value", "macrosomiabigbaby.value", "meconiumexposureasymptomaticbaby.value", "mildhypothermia.value", "mildtalipesclubfoot.value", "moderatehypothermia.value", "moderatetalipesclubfoot.value", "myelomeningocele.value",
"neonatalsepsis.value", "normalbaby.value", "omphalocele.value", "other.value", "pathologicaljaundice.value", "physiologicaljaundice.value", "plandrugamin.value", "pneumonia/bronchiolitis.value", "possiblemeconiumaspiration.value",
"premature3236weeks.value", "premature32to37weeks.value", "prematuritywithrd.value", "prematuritywithrds.value", "prescom.value", "riskfactorsforsepsisasymptomaticbaby.value", "riskofhypoglycaemia.value", "safekeeping.value",
"severehypothermia.value", "suspectedhypoxicischaemicencephalopathy.value", "suspectedhypoxicischaemicencephalopathyhie.value", "suspectedneonatalsepsis.value", "symptomreviewneurology.value", "tempgroup.value",
"tempthermia.value", "termwithrd.value", "transienttachypnoeaofnewbornttn.value", "verylowbirthweight10001499g.value", "verypremature2831+6weeks.value", "verypremature2831weeks.value",
"modedelivery.value", "abandonedbaby.value"
# unique features zim
# problematic features zim
]


cat_features_mwi = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
# common features
"abdomen.value", "activity.value", "admittedfrom.value", "admreaconganom.value", "admreason.value", "admreasurgcond.value", "agecat.value", "ageestimate.value", "ansteroids.value", "antenatalcare.value", "anus2.value",
"anvdrl.value", "anvdrlresult.value", "bidagecat.value", "birthfacility.value", "bsunit.value", "cadre.value", "cadredis.value", "causebid.value", "causedeath.value", "chestausc.value", "chlor.value", "colour.value",
"contcausedeath.value", "covidconfirmation.value", "covidrepresults.value", "crt.value", "crybirth.value", "csreason.value", "dangersigns.value", "dangersigns2.value", "diagdis1.value", "discdiagsurgicalcond.value",
"ethnicity.value", "feedsadm.value", "fefo.value", "femorals.value", "feverons.value", "fontanelle.value", "furthertriage.value", "gastons.value", "gender.value", "genitalia.value", "gscvsom.value", "headshape.value",
"hivtestresult.value", "hivtestresultdc.value", "ipt.value", "jaundice.value", "lengthhaart.value", "lengthresusknown.value", "maritalstat.value", "matcomorbidities.value", "matsymptoms.value", "mecpresent.value",
"mecthickthin.value", "medsgiven.value", "methodestgest.value", "mothersdiagnosis.value", "mskproblems.value", "murmur.value", "neorons.value", "neotreeoutcome.value", "otherprobs.value", "palate.value", "paritycod.value",
"passedmec.value", "placebirth.value", "pregconditions.value", "presentation.value", "probslab.value", "puinfant.value", "punewborn.value", "reason.value", "referredfrom.value", "referredfrom2.value", "religion.value",
"respons.value", "respsr.value", "respsup.value", "resus.value", "revclintyp.value", "rfsepsis.value", "rom.value", "romlength.value", "signsdehydrations.value", "signsrd.value", "skin.value", "spine.value", "srneuroother.value",
"stdsother.value", "stoolsinfant.value", "suckreflex.value", "testthispreg.value", "tetraeye.value", "thermcare.value", "tone.value", "tribe.value", "ttv.value", "typebid.value", "typebirth.value", "umbilicus.value",
"vitk.value", "vomiting.value", "wob.value",
"<28wks/1kg.value", "abdominalobstruction.value", "abscess.value", "agecategory", "ambiguousgenetalia.value", "anaemia.value", "apnoeaofprematurity.value", "atriskofhypoglycaemia.value", "awgroup.value", "birthasphyxia.value",
"birthtrauma.value", "birthweightcategory", "birthweightcategorydischarge", "bowelobstruction.value","bwgroup.value", "cleftlipand/orpalate.value", "cleftlipand/orpalatewithrd.value", "congenitalabnormality.value",
"congenitaldislocationofthehipcdh.value", "congenitalheartdisease.value", "convulsions.value", "dehydration.value", "difficultyfeeding.value", "extremelylowbirthweight<1000g.value", "extremelypremature<28weeks.value",
"feedingreview.value", "gastroschisis.value", "gestationdc.value", "gestgroup.value", "highbirthweight>4000gatbirth.value", "hivhighrisk.value", "hivlowrisk.value", "hivunknown.value", "hyperthermia.value", "hypoglycaemianotsymptomatic.value",
"hypoglycaemiasymptomatic.value", "hypoxicischaemicencephalopathy.value", "lbwbinary", "lowbirthweight15002499g.value", "meningitis.value", "mildhypothermia.value", "mildtalipesclubfoot.value", "moderatehypothermia.value",
"moderatetalipesclubfoot.value", "myelomeningocele.value", "neonatalsepsis.value", "neonatalsepsisearlyonsetasymptomatic.value", "neonatalsepsisearlyonsetsymptomatic.value", "neonatalsepsislateonsetasymptomatic.value",
"omphalocele.value", "pathologicaljaundice.value", "physiologicaljaundice.value", "pneumonia/bronchiolitis.value", "possiblemeconiumaspiration.value", "premature3236weeks.value", "prematuritywithrd.value", "prolongedjaundice.value",
"severehypothermia.value", "suspectedhypoxicischaemicencephalopathy.value", "suspectedneonatalsepsis.value", "symptomreviewneurology.value", "tempgroup.value", "termwithrd.value", "transienttachypnoeaofnewbornttn.value",
"umbilicalhernia.value", "untreatedmaternalsyphilis.value", "verylowbirthweight10001499g.value", "verypremature2831weeks.value", "dumpedbaby.value"
# unique features mwi
]



# OBJECT FEATURES
#---------------------------
obj_features_zim       = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
"admreasonoth.value", "axrr.value", "babyhospnum.value","bbacord.value","bbadelo.value", "bbaot.value", "bc2oth.value", "bc2rcont.value", "bcsecondisol.value", "causedeathother.value", "cleftlip.value",
"contriboth.value", "csreasonoth.value", "csreasonother.value", "cussr.value", "cxrr.value", "diagdis1oth.value", "diagnosesoth.value", "drid.value", "echor.value", "grunting.value", "hcwid.value", "hcwiddis.value",
"kinaddress.value", "kincell.value", "kinname.value", "lxrr.value", "matadmplaceoth.value", "mathospnum.value", "med1.value", "med2.value", "med3.value", "med4.value", "medoth.value", "medsdisoth.value",
"motherscell.value", "otherbirthfacility.value", "otherreferralfacility.value", "othhardistrict.value", "othmwdistrict.value", "othpregcond.value", "othprobsoth.value", "othxrr.value", "planoth.value", "prescomoth.value",
"prog.value", "reasonother.value", "religionother.value", "revclinoth.value", "reviewid.value", "revinfo.value", "specrevd.value", "specrevtypoth.value", "syphact.value", "travelhint.value", "travelhreg.value",
"trhospital.value", "troward.value", "trowardother.value",
"anaemia.value", "cleftlipand/orpalate.value", "neotreeid.value", "nuids.value", "othsym.value", "plandrugabx.value", "plandrugamik.value", "plandrugamp.value", "plandrugazt.value",
"plandrugcaff.value", "plandrugcef.value", "plandrugcefm.value", "plandruggent.value", "plandrugmet.value", "plandrugnvp.value", "prolongedjaundice.value", "uniquekeydischarge", "plan.value"
]


obj_features_mwi       = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge",
"admreasonoth.value", "admreasonothconganom.value", "babyfirstname.value", "babysurname.value", "causebidoth.value", "causedeathoth.value", "causedeathother.value", "cleftlip.value", "contriboth.value",
"csreasonoth.value", "diagdis1oth.value", "ethnicityother.value", "hcwid.value", "hcwiddis.value", "hcwsig.value", "hcwsigdis.value", "matphysaddressdistrict.value", "med1.value", "med2.value", "med3.value", "med4.value",
"medoth.value", "modfactor1.value", "modfactor2.value", "modfactor3.value", "motheraddressvillage.value", "motherfirstname.value", "mothersurname.value", "mothlm.value", "otherbirthfacility.value", "otherprobsoth.value",
"otherreferralfacility.value", "paritycodother.value", "planoth.value", "reasonother.value", "religionother.value", "revclinoth.value", "stuid.value", "tribeother.value", "uidbid.value",
"ageest.value",
"babyfirstnamedc.value", "babysurnamedc.value", "cleftlip/cleftpalate.value","diagnoses.value", "diagnosesoth.value", "diagnosissurgicalcond.value", "edlizsummarytablescore.value", "externalsource.value",
"motherfirstnamedc.value", "mothersurnamedc.value", "neotreeid.value", "nuids.value", "other.value", "uiddc.value", "uniquekeydischarge", "plan.value"
]



# DATETIME FEATURES
#---------------------------
dt_features_zim        = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge", "dobtob.value", "datetimeadmission.value","datetimedischarge.value",
# common features
"anvdrldate.value", "axrdate.value", "bcdate1.value", "bcdate2.value", "bilidate1.value", "bilidate2.value", "bonedate1.value", "bonedate2.value", "clinrevdat.value", "crpdate1.value", "crpdate2.value", "cussdate.value",
"cxrdate.value", "datedischvitals.value", "datedischweight.value", "datefbc2.value", "datehivtest.value", "datelastattendedbaby.value", "datelastattendedmother.value", "datelp.value",
"datetimedeath.value", "datetimedeathmother.value", "dateweaned.value", "echodate.value", "fbcdate1.value", "glucdate1.value", "glucdate2.value", "hivpcrinfd.value",
"lxrdate.value", "mathivpd.value", "othxrdate.value", "torchdate.value", "uedate1.value", "uedate2.value", "completedat", "endscriptdatetime.value", "ingestedat", "startedat",
]

dt_features_mwi        = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge", "dobtob.value", "datetimeadmission.value","datetimedischarge.value",
# common features
"anvdrldate.value", "clinrevdat.value", "dateadmission.value", "datedischvitals.value", "datedischweight.value", "datehivtest.value", "datetimedeath.value",
"datetimetemponarrival.value", "dateweaned.value", "endscriptdatetime.value", "completedat", "dateadmissiondc.value", "ingestedat", "startedat", "symptomsdt.value",
]
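
The feature lists above are maintained by hand, so a feature can end up under more than one type for the same country; `corrected.value`, for example, appears in both the numeric and the categorical ZIM lists. The sketch below shows one way to surface such overlaps. `ID_COLUMNS`, `find_overlaps`, and `example_lists` are hypothetical names, and the lists are shortened stand-ins for the real ones.

```python
from collections import defaultdict

# Identifier columns that every list shares on purpose.
ID_COLUMNS = {
    "facility", "uid", "uniquekey", "startedatdischarge",
    "completedatdischarge", "ingestedatdischarge",
}


def find_overlaps(lists_by_type: dict) -> dict:
    """Map each feature found in more than one type list to those types."""
    seen = defaultdict(list)
    for type_name, features in lists_by_type.items():
        for feature in set(features) - ID_COLUMNS:
            seen[feature].append(type_name)
    return {f: sorted(t) for f, t in seen.items() if len(t) > 1}


# Shortened stand-ins for the real lists, for illustration only.
example_lists = {
    "numeric": ["facility", "uid", "age.value", "corrected.value"],
    "categorical": ["facility", "uid", "gender.value", "corrected.value"],
}
print(find_overlaps(example_lists))
# {'corrected.value': ['categorical', 'numeric']}
```

Whether an overlap is intentional depends on how the later validation steps treat each type, so the result is best read as a list of features to review rather than as errors.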



# WEIGHT FEATURES - Numeric Validation
#--------------------------------------
weight_cols_zim        = ['admissionweight', 'aw', 'birthweight', 'bwtdis']
weight_cols_mwi        = ['admissionweight', 'birthweight']


# Skip Validation - Numeric Validation
#--------------------------------------
skip_columns_zim       = ['age','bloodsugarmmol', 'dischrr', 'hr', 'rr', 'length', 'nnuadmtemp', 'wcc1r']
skip_columns_mwi       = ['age', 'bloodsugarmg', 'bloodsugarmmol', 'dischhr', 'dischrr', 'bsmg', 'hr', 'rr', 'dischtemp']



# VALUES TO DELETE - Categorical Validation
# ---------------------------------------------

values_to_delete_zim = {

("birthfacility"): ["Name of Birth Facility"],

("causedeath"): ["2023-10-19 22:00:00", "AS", "2024-07-16 16:56:00", "2024-10-24 11:06:00"],

("csreason"): ["Reason for C-Section"],

("diagdis1"): ["J", "CA", "2024-07-14 08:05:00", "2024-07-22 08:03:00", "2024-10-25 11:00:00"],


("jaundice"): ["MJ", "DJ", "3.0", "5.0", "4.0", "2.0", "1.0", "Jaundice Kramer staging"],


("medsgiven"): ["{Medications given during admission,Medications given during admission}"],

("referredfrom"): ["Name of Referring Facility"],

("referredfrom2"): ["Referred from?"],

("respsup"): ["Respiratory Support", "{Respiratory Support,Respiratory Support}"],

("signsdehydrations"): ["32.0", "26.0", "29.0", "35.0", "34.0", "30.0", "37.0", "33.0", "38.0", "40.0"],

("admward"): ["Was the baby admitted through Embassy or Admissions?"],

("anvdrlreport"): ["How did you receive the information about syphilis testing?"],

("bbadel"): ["Who delivered the baby?"],

("bbaloc"): ["Baby born where?"],

("bc1othr"): ["If other what?"],

("bc1r"): ["Initial blood culture result"],

("bc2r"): ["Result of final blood culture"],

("birthplace"): ["Place of birth"],

("testthispreg"): ["A"],

("delivinter"): ["0.0"],

("goodprog"): ["What progress did the baby make? "],

("highrisk"): ["Does this baby need a high risk sticker?"],

("hivpcrinfr"): ["HIV PCR Result for Infant"],

("hivtestreport"): ["Where did you get the HIV testing information from?"],

("imaging"): ["Imaging during admission"],

("lp"): ["Lumbar puncture performed?"],

("mataddrhadistrict"): ["If Harare, which District?"],

("matdischarge"): ["If the mother was admitted, is she now discharged?"],

("mathivpr"): ["Maternal HIV test result (postnatal)"],

("matrprr"): ["Maternal RPR Result"],

("medsdis"): ["Medications prescribed on discharge"],

("moretrans"): ["Platelets and packed cells", "If more than one kind then what kinds?", "Packed cells and platelets", "If more than one type then what type?"],

("motherdobyn"): ["Do you know the mother's date of birth?"],

("othprobs"): ["Other Problems"],

("partnertrsyph"): ["Was the partner treated?"],

("reviewcadre"): ["Cadre of Reviewer"],

("specrev"): ["Was the baby reviewed by a specialty?"],

("specrevtyp"): ["Which Specialities? (add all)", "Which Speciality?"],

("transtype"): ["What kind of transfusion?", "What type of transfusion?"]

}
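
Many of the entries above are question labels or stray timestamps that leaked into a categorical column. The sketch below shows one way such a mapping could be applied with pandas. It rests on two assumptions that the source does not confirm: that each key corresponds to a column named `<key>.value`, and that a disallowed value is replaced with a missing value rather than the row being dropped. `delete_disallowed`, `example_deletes`, and `example_df` are hypothetical names.

```python
import numpy as np
import pandas as pd


def delete_disallowed(
    df: pd.DataFrame, values_to_delete: dict, suffix: str = ".value"
) -> pd.DataFrame:
    """Return a copy of df with the listed values replaced by NaN."""
    cleaned = df.copy()
    for base_name, bad_values in values_to_delete.items():
        column = base_name + suffix  # assumed column naming
        if column not in cleaned.columns:
            continue  # no matching column in this export
        mask = cleaned[column].isin(bad_values)
        cleaned.loc[mask, column] = np.nan
    return cleaned


# Tiny stand-ins for the real mapping and data.
example_deletes = {"lp": ["Lumbar puncture performed?"], "testthispreg": ["A"]}
example_df = pd.DataFrame(
    {"lp.value": ["Yes", "Lumbar puncture performed?", "No"]}
)
result = delete_disallowed(example_df, example_deletes)
print(result["lp.value"].isna().tolist())  # [False, True, False]
```

Working on a copy leaves the raw frame untouched, which makes it possible to report exactly which values were removed.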


# MWI
# Delete Not Allowed Values
values_to_delete_mwi = {
("contcausedeath"): ["March 18, 2022, 7:45 AM", "March 21, 2022, 6:55 AM", "March 21, 2022, 11:30 PM", "March 24, 2022, 8:20 AM", "March 25, 2022, 2:25 PM", "March 29, 2022, 9:35 AM", "March 26, 2022, 12:30 PM",
                     "March 26, 2022, 2:30 PM", "April 29, 2022, 9:25 AM", "March 28, 2022, 12:30 PM", "April 7, 2022, 10:00 AM", "March 31, 2022, 11:40 AM", "April 19, 2022, 12:40 AM", "April 23, 2022, 11:38 PM",
                     "April 12, 2022, 3:35 PM", "April 20, 2022, 10:46 PM", "April 26, 2022, 7:52 AM", "April 30, 2022, 7:45 AM", "April 28, 2022, 9:00 AM", "April 28, 2022, 11:50 AM", "April 28, 2022, 4:30 PM",
                     "April 29, 2022, 9:00 PM", "May 2, 2022, 12:50 PM", "April 30, 2022, 6:00 PM", "May 4, 2022, 9:55 AM", "May 12, 2022, 3:55 AM", "May 7, 2022, 12:15 PM", "May 14, 2022, 6:20 AM",
                     "May 15, 2022, 8:08 AM", "May 21, 2022, 5:00 PM", "May 25, 2022, 10:20 AM", "May 26, 2022, 3:45 AM", "May 28, 2022, 6:40 AM", "May 24, 2022, 1:02 PM", "March 12, 2022, 8:30 PM",
                     "March 13, 2022, 12:10 AM", "March 17, 2022, 11:45 PM"],


("hivtestresultdc"): ["March 22, 2022, 10:43 AM", "March 19, 2022, 11:19 AM", "March 23, 2022, 9:37 AM", "March 11, 2022, 4:23 AM", "April 4, 2022, 9:15 AM", "March 12, 2022, 6:06 AM",
                      "April 6, 2022, 9:43 AM", "March 19, 2022, 11:52 AM", "March 31, 2022, 9:18 AM", "March 19, 2022, 9:38 AM", "March 16, 2022, 9:18 AM", "March 18, 2022, 8:08 PM",
                      "March 17, 2022, 12:58 PM", "March 23, 2022, 9:52 AM", "March 29, 2022, 11:13 AM", "March 21, 2022, 6:47 AM", "March 21, 2022, 6:06 AM", "April 23, 2022, 7:46 AM",
                      "March 24, 2022, 7:35 PM", "April 27, 2022, 9:54 AM", "April 19, 2022, 9:01 AM", "March 25, 2022, 4:46 AM", "March 29, 2022, 10:02 AM", "March 25, 2022, 1:58 PM", "March 26, 2022, 3:53 PM",
                      "March 28, 2022, 11:39 AM", "March 28, 2022, 6:29 PM", "March 28, 2022, 2:24 PM", "April 7, 2022, 2:09 PM", "April 12, 2022, 6:31 AM", "March 31, 2022, 12:28 PM", "April 19, 2022, 1:20 AM",
                      "May 17, 2022, 9:25 AM", "April 28, 2022, 9:59 AM", "May 15, 2022, 12:47 PM", "April 12, 2022, 9:36 AM", "May 18, 2022, 11:55 AM", "April 18, 2022, 10:10 AM", "April 25, 2022, 10:00 AM",
                      "April 23, 2022, 11:13 AM", "April 9, 2022, 8:11 AM", "April 24, 2022, 12:40 PM", "April 23, 2022, 10:17 AM", "April 12, 2022, 6:53 AM", "May 5, 2022, 10:20 AM", "April 20, 2022, 4:25 AM",
                      "April 25, 2022, 7:55 AM", "April 28, 2022, 10:44 AM", "April 28, 2022, 1:39 PM", "May 8, 2022, 9:57 AM", "April 28, 2022, 12:31 PM", "May 4, 2022, 10:56 AM", "April 28, 2022, 9:09 PM",
                      "April 29, 2022, 3:34 AM", "April 29, 2022, 1:05 PM", "April 29, 2022, 6:33 PM", "May 3, 2022, 1:38 PM", "May 10, 2022, 4:36 AM", "May 22, 2022, 8:17 AM", "May 20, 2022, 8:45 AM",
                      "May 7, 2022, 2:33 PM", "May 13, 2022, 10:38 AM", "May 14, 2022, 8:57 AM", "May 15, 2022, 10:27 AM", "May 15, 2022, 10:34 AM", "June 16, 2022, 8:48 AM", "May 14, 2022, 7:16 AM",
                      "May 22, 2022, 9:21 AM", "May 15, 2022, 7:40 AM", "May 16, 2022, 8:46 AM", "June 7, 2022, 10:07 AM", "May 27, 2022, 12:05 PM", "May 21, 2022, 1:43 PM", "June 20, 2022, 12:23 PM",
                      "May 24, 2022, 8:26 AM", "May 24, 2022, 12:36 PM", "May 26, 2022, 4:33 AM", "May 26, 2022, 9:05 AM", "June 7, 2022, 10:15 AM", "May 24, 2022, 2:41 PM"],

("respsup"): ["40.0", "48.0", "46.0", "51.0", "53.0", "50.0", "52.0", "65.0", "38.0", "42.0", "20.0", "24.0", "98.0", "44.0", "30.0", "28.0", "57.0", "49.0", "45.0", "32.0", "56.0", "47.0", "60.0", "36.0", "43.0",
              "58.0", "54.0", "4.0", "64.0", "12.0", "69.0", "39.0", "41.0", "21.0"],


("otherprobs"): ["2022-03-29T10:45:16.139Z", "2022-03-24T11:21:05.456Z", "2022-03-31T09:39:49.521Z", "2022-04-13T09:45:49.513Z", "2022-03-24T11:54:47.029Z", "2022-04-01T09:26:04.134Z", "2022-03-25T09:45:19.971Z",
                 "2022-03-31T09:57:34.948Z", "2022-04-07T11:16:31.754Z", "2022-04-25T07:49:23.173Z", "2022-05-05T09:57:16.553Z", "2022-04-28T09:03:35.718Z", "2022-04-07T11:42:10.626Z", "2022-05-26T09:31:00.919Z",
                 "2022-05-05T10:03:36.992Z", "2022-05-16T12:51:33.519Z", "2022-04-14T09:42:07.117Z", "2022-05-26T12:04:15.038Z", "2022-04-25T10:15:04.993Z", "2022-05-05T10:04:39.048Z", "2022-04-28T11:17:18.006Z",
                 "2022-04-25T10:19:58.107Z", "2022-04-23T11:17:53.618Z", "2022-05-12T10:23:23.650Z", "2022-05-12T10:00:09.455Z", "2022-05-05T11:02:14.071Z", "2022-05-26T08:21:08.877Z", "2022-05-21T08:47:33.462Z",
                 "2022-05-16T10:41:40.396Z", "2022-05-20T09:01:01.379Z", "2022-05-19T10:30:50.709Z", "2022-05-19T10:36:23.850Z", "2022-06-23T08:55:49.181Z", "2022-06-15T09:22:55.734Z", "2022-05-17T08:48:06.510Z",
                 "2022-06-16T10:10:51.821Z", "2022-06-02T12:07:27.955Z", "2022-06-23T12:27:52.105Z", "2022-05-26T10:10:49.493Z", "2022-06-08T10:18:20.742Z"],


("hcwsigdis"): ["29.0", "33.0", "32.0", "34.0", "35.0", "28.0", "27.0", "36.0", "25.0", "30.0", "37.0", "31.0"],

("thermcare"): ["AM", "{AM,VitK,TEO}", "{BP,GENT,AM}", "{CEF,AM,OTH}", "{BP,GENT,AM,VitK}", "{BP,GENT}", "{GENT,BP,AM}", "CEF", "NONE", "{BP,GENT,AM,OTH}", "{BP,GENT,VitK,TEO}", "{NVP,AM}", "{BP,GENT,CEF,AM}",
                "{CHLX,VitK,TEO}", "{BP,GENT,CEF}", "{AM,NVP,CEF}", "{BP,GENT,AM,NVP}", "{CEF,AM,BP,GENT}", "{BP,GENT,AM,CEF,OTH}", "{BP,GENT,MET}", "{CEF,AM,MET}", "{BP,GENT,AM,VitK,TEO}", "{CEF,BP,GENT,AM}",
                "{CEF,AM,FLU,Mero}", "{BP,AM,GENT,VitK,TEO}", "{BP,GENT,AM,CEF}", "{BP,GENT,PCM}", "{VitK,TEO}", "{GENT,BP,VitK,TEO}", "{BP,GENT,CEF,VitK,TEO}", "{BP,GENT,NVP}", "{GENT,AM,BP,VitK,TEO}",
                "{BP,GENT,CHLX,AM,VitK,TEO}"],

("neotreeoutcome"): ["FALSE"],

("medsgiven"): ["EMMP", "ELTE", "MPGR", "PRDI", "PECH", "DAMU", "PRMA", "JEJE", "BRNA", "VEKU", "NAKH", "NAME", "WAKU", "BLPH", "FEZA"]

}



# VALUE ALIGNMENT - Categorical Validation
# ---------------------------------------------

# ZIM
value_mappings_zim = {
    "admreason": {
        "GSch": ["GSchis"],
        "Conv": ["CONV"],
        "Fev": ["FEV"]
    },
    "ageestimate": {
        "INF": ["inf"]
    },
    "feedsadm": {
        "NFY": ["Has not had a breast feed yet"]
    },
    "fontanelle": {
        "Flat": ["Full"]
    },
    "furthertriage": {
        "Stable": ["Stable "]
    },
    "puinfant": {
        "Unk": ["Not sure"]
    },
    "stoolsinfant": {
        "Unk": ["Not sure"]
    },
    "birthplace": {
        "SMCH": ["HCH"]
    },
    "matadmplace": {
        "ICU": [" ICU"]
    },
    "delivinter": {
        "NONE": ["None"]
    },
    "none": {
        "True": ["TRUE"]
    },
    "none2": {
        "True": ["TRUE"]
    },
    "passedmec": {
        "Not sure": ["Unk"]
    }
}

# Malawi Alignment:

value_mappings_mwi = {
    "activity": {
        "Convulsions": ["Conv"]
    },
    "admreason": {
        "FEV": ["Fev"]
    },
    "ageestimate": {
        "INF": ["inf"]
    },
    "dangersigns": {
        "Convulsions": ["Conv"]
    },
    "fontanelle": {
        "Full": ["Flat"]
    },
    "furthertriage": {
        "Stable": ["Stable "]
    },
    "passedmec": {
        "Unk": ["Not sure"]
    },
    "srneuroother": {
        "Fl": ["FL"],
        "Convulsions": ["Conv"]
    },
    "ttv": {
        "TTV3": ["TTV3 "],
        "TTV4+": ["TTV4", "TTV5", "TTV5orM"]
    },
    "fefo": {
        "FeFo4+": ["FeFo4orM"]
    },
    "colour": {
        "Yellow": ["Yell"]
    },
    "tribe": {
        "N": [" N"]
    },
    "ipt": {
        "IPT5+": ["IPT5orM"]
    }
}
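
Both mapping dictionaries share the shape column -> {canonical value: [variants to replace]}. The sketch below shows one way such a mapping could be applied. The helper name `apply_value_mappings` and the demo data are assumptions made for illustration; this is not the notebook's own alignment function.

```python
import pandas as pd

# Hypothetical excerpt in the same shape as value_mappings_mwi:
# column -> {canonical value: [variants to replace]}
example_mappings = {
    "ttv": {"TTV3": ["TTV3 "], "TTV4+": ["TTV4", "TTV5", "TTV5orM"]},
    "colour": {"Yellow": ["Yell"]},
}

def apply_value_mappings(df: pd.DataFrame, mappings: dict) -> pd.DataFrame:
    """Replace each listed variant with its canonical value, column by column."""
    out = df.copy()
    for col, canon_to_variants in mappings.items():
        if col not in out.columns:
            continue  # Mapping refers to a column that is absent from this extract
        # Invert to variant -> canonical so Series.replace can use it directly
        variant_to_canon = {
            variant: canon
            for canon, variants in canon_to_variants.items()
            for variant in variants
        }
        out[col] = out[col].replace(variant_to_canon)
    return out

# Invented demo rows; values not listed as variants are left untouched
demo = pd.DataFrame({"ttv": ["TTV3 ", "TTV5", "TTV1"], "colour": ["Yell", "Pink", None]})
aligned = apply_value_mappings(demo, example_mappings)
```

Inverting the mapping once per column keeps the replacement to a single `Series.replace` call, and working on a copy leaves the input frame unchanged.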


In [11]:
# ─── Build cfg via globals() ───

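# 1. Normalize to a suffix (fail fast on an unrecognized country instead of silently defaulting to ZIM)
country_key = country.strip().lower()
if country_key in ("malawi", "mwi"):
    suffix = "mwi"
elif country_key in ("zimbabwe", "zim"):
    suffix = "zim"
else:
    raise ValueError(f"Unknown country {country!r}; expected 'Zimbabwe'/'ZIM' or 'Malawi'/'MWI'.")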

# 2. Define how your short keys map to the base variable names
keys_map = {
    "num":             "numeric_features",
    "bool":            "bool_features",
    "cat":             "cat_features",
    "obj":             "obj_features",
    "dt":              "dt_features",
    "weight_cols":     "weight_cols",
    "skip_columns":    "skip_columns",
    "values_to_delete":"values_to_delete",
    "value_mappings":  "value_mappings",
    "dict_filepath":   "dict_filepath",
}

# 3. Grab each country‐specific global into cfg
cfg = {
    key: globals()[f"{base}_{suffix}"]
    for key, base in keys_map.items()
}

# 4. Sheet name - Numeric Validation
sheet_name = f"numeric_{suffix}"


# 5. Sanity check (optional)
print(f"→ Running for {suffix.upper()}")
for k, v in cfg.items():
    print(f"   • {k:<14}: {type(v).__name__}, len={len(v)}")
print("   • sheet_name:", sheet_name)


→ Running for ZIM
   • num           : list, len=126
   • bool          : list, len=84
   • cat           : list, len=227
   • obj           : list, len=83
   • dt            : list, len=47
   • weight_cols   : list, len=4
   • skip_columns  : list, len=8
   • values_to_delete: dict, len=39
   • value_mappings: dict, len=13
   • dict_filepath : str, len=151
   • sheet_name: numeric_zim


### Functions

#### Universal Functions

In [12]:
# FUNCTION - Get DataFrame Status Report
# Summarizes column dtypes, duplicate rows/columns, and fully-missing columns. Optional report path.
# ---------------------------------------------------------------------------------------------------
def get_report(
    df: pd.DataFrame,
    txt_file_path: Optional[str] = None
) -> None:
    """
    Steps:
    1. Compute summary: dtypes, duplicate rows, duplicate columns, fully-missing columns.
    2. If `txt_file_path` is provided, attempt to write the report; skip on error.
    3. Otherwise, do nothing (report is optional).
    """
    # Step 1: Build report lines
    lines: list[str] = []
    lines.append('Summary of Column Data Types')
    lines.append('=============================')
    dtype_counts = df.dtypes.apply(lambda x: str(x)).value_counts()  # Count columns by dtype
    for dtype, count in dtype_counts.items():
        lines.append(f'{dtype}: {count} columns')  # Add each dtype count

    # Duplicate rows
    duplicate_count = df.duplicated().sum()
    lines.append(f"\nDuplicate Rows: {duplicate_count}")
    lines.append('===================')

    # Duplicate columns
    duplicate_cols = df.columns[df.columns.duplicated()]
    num_duplicate_cols = len(duplicate_cols)
    lines.append(f"\nDuplicate Columns: {num_duplicate_cols}")
    lines.append('===================\n')
    if num_duplicate_cols > 0:
        lines.append("Columns with duplicates:")
        for col in duplicate_cols:
            lines.append(f" - {col}")  # List each duplicated column

    # Columns with 100% missing values
    missing_100 = df.isna().all()
    missing_100_count = missing_100.sum()
    lines.append(f"\nColumns with 100% Missing Values: {missing_100_count}")
    if missing_100_count > 0:
        lines.append("Columns with no values:")
        for col in df.columns[missing_100]:
            lines.append(f" - {col}")  # List fully-missing columns

    lines.append('===================')

    # Step 2: Optional writing of the report
    if txt_file_path:
        try:
            with open(txt_file_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(lines))  # Write report content
        except Exception:
            pass  # Skip report if path invalid or write fails


In [13]:
# FUNCTION - Inspect Random Column
# Samples and prints key metrics for a randomly selected DataFrame column.
# ------------------------------------------------------------------------------
def inspect_random_column(df: pd.DataFrame) -> None:
    """
    Randomly selects one column from the DataFrame and prints:
      1. Column name and data type.
      2. Total number of rows and percentage missing.
      3. Number and list of unique values.
      4. A sample of up to 10 non-missing values.
    """
    # 1. Select a random column
    selected_column_name = random.choice(df.columns)  # Choose one column name at random
    col_series = df[selected_column_name]             # Extract the column as a Series

    # 2. Compute basic metrics
    total_entries = len(col_series)                    # Total entries in the column
    total_missing = col_series.isna().sum()            # Count of missing (NaN) entries

    # 3. Print summary information
    print(f'Column: {selected_column_name}')          # Display the chosen column name
    print(f'Column dtype: {col_series.dtype}')        # Display the data type of the column
    print(f'\nTotal rows: {total_entries}')          # Show total number of rows
    pct_missing = (total_missing / total_entries) * 100 if total_entries else 0.0  # Avoid division by zero on an empty frame
    print(f'Missing: {total_missing} ({pct_missing:.3f}%)')  # Show missing count and percentage
    print('Number of Unique Values:', col_series.nunique())  # Display count of unique values
    print('\nAll Unique Values in Column:\n', col_series.unique())  # List all unique values

    # 4. Display a sample of non-missing values
    non_missing_series = col_series.dropna()           # Filter out missing values
    if not non_missing_series.empty:
        sample_size = min(10, len(non_missing_series)) # Limit sample size to up to 10
        print('\nSample Values:')                     # Header for sample values
        print(
            non_missing_series
            .sample(n=sample_size, random_state=42)
            .to_string(index=False)
        )  # Print sampled values without indices
    else:
        print('\n(No non-missing values to sample)')     # Message if no values to sample


In [14]:
# FUNCTION - Remove Suffixes
# Simplifies column names by stripping any suffix after the first period ('.').
# ------------------------------------------------------------------------------
def remove_suffixes(
    df: pd.DataFrame
) -> pd.DataFrame:
    """
    1. Rename each column by keeping only the text before the first period.
    2. Return the DataFrame with simplified column names.

    Parameters:
        df (pd.DataFrame): Input DataFrame with potentially suffixed column names.

    Returns:
        pd.DataFrame: DataFrame with column names simplified.
    """
    # Step 1: Strip suffix from columns
    df = df.rename(columns=lambda x: x.split('.')[0])  # Split on first period and use the prefix as new column name

    # Step 2: Return updated DataFrame
    return df  # DataFrame now has no suffixes on column names
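
A side effect worth keeping in mind: stripping suffixes can leave several columns with the same name. The small sketch below uses invented column names to show the effect, applying the same `split('.')[0]` rename as `remove_suffixes`.

```python
import pandas as pd

# Invented example columns: two of them share the prefix "age"
demo = pd.DataFrame({
    "age.value": [3.0, None],
    "age.label": ["3 days", "5"],
    "uid": ["A-1", "B-2"],
})

# Same rename rule as remove_suffixes: keep only the text before the first period
renamed = demo.rename(columns=lambda name: name.split(".")[0])

# Names that now occur more than once
duplicated_names = renamed.columns[renamed.columns.duplicated()].tolist()
```

Here `renamed` has two columns called `age`, so a de-duplication step is needed afterwards if one column per name is expected.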

In [15]:
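# FUNCTION - Log Notebook Runtime
# Records the notebook start time and formats elapsed seconds for display.
# ------------------------------------------------------------------------------
start_time = time.time()  # Reference point for measuring total runtime

def format_time(seconds: float) -> str:
    """Format a duration in seconds as 'M minutes and S.SS seconds'."""
    minutes = int(seconds // 60)
    remaining_seconds = seconds % 60
    return f"{minutes} minutes and {remaining_seconds:.2f} seconds"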

#### 1st Stage Cleaning Functions

In [16]:
# FUNCTION - Clean Column Headers
# Remove all whitespace and normalize dot placement, then lowercase every column name.
# ------------------------------------------------------------------------------
def clean_columns(
    df: pd.DataFrame
) -> pd.DataFrame:
    """
    Steps:
    1. Define a helper `clean_name` to process individual column names.
    2. Apply `clean_name` to each column in the DataFrame.
    3. Return the DataFrame with cleaned headers.
    """
    # Step 1: Helper to clean a single column name
    def clean_name(name):
        name = name.strip()                      # Remove leading/trailing whitespace
        name = re.sub(r"\s*\.\s*", ".", name)  # Normalize spaces around dots
        name = re.sub(r"\s+", " ", name)        # Collapse multiple spaces to single
        name = name.replace(' ', '')               # Remove all remaining spaces
        name = name.lower()                        # Convert to lowercase
        return name

    # Step 2: Apply cleaning function to each column name
    df.columns = [clean_name(col) for col in df.columns]  # Update DataFrame headers

    # Step 3: Return the modified DataFrame
    return df  # DataFrame now has standardized, lowercase column names


In [17]:
# FUNCTION - Remove Frame Shift (with optional reporting)
# Drops rows whose 'UID' lacks a dash, optionally saves the dropped rows, and optionally writes a report.
# ---------------------------------------------------------------------------------------
def remove_frame_shift(
    df: pd.DataFrame,
    report_filepath: Optional[str] = None,
    pkl_filepath: Optional[str] = None,
    csv_filepath: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Identify rows where 'UID' does not contain '-'.
    2. Split into dirty_df (no dash) and clean_df (with dash).
    3. Optionally save dirty_df to pickle and CSV if file paths are valid.
    4. Optionally write a report of affected rows to report_filepath.
    5. Return clean_df for further use.
    """
    # Step 1: Detect frame shift issues (UID missing dash)
    has_dash = df['UID'].astype(str).str.contains('-')  # Boolean mask

    # Step 2: Create dirty and clean subsets
    dirty_df = df[~has_dash].copy()  # Rows missing dash
    clean_df = df[has_dash].copy()   # Rows with dash

    # Step 3: Optionally save dirty data
    if pkl_filepath:
        try:
            dirty_df.to_pickle(pkl_filepath)
        except Exception:
            pass  # Skip if path invalid or write fails
    if csv_filepath:
        try:
            dirty_df.to_csv(csv_filepath, index=False)
        except Exception:
            pass  # Skip if path invalid or write fails

    # Step 4: Optionally generate report
    if report_filepath:
        try:
            total_affected = len(dirty_df)
            affected_uids = sorted(dirty_df['UID'].astype(str).unique())
            report_lines = [
                f"Total number of affected rows: {total_affected}",
                "Affected UIDs:"
            ]
            report_lines.extend(affected_uids)
            with open(report_filepath, 'w', encoding='utf-8') as report_file:
                report_file.write("\n".join(report_lines))  # Overwrite existing report
        except Exception:
            pass  # Skip reporting on error

    # Step 5: Return the cleaned DataFrame
    return clean_df  # DataFrame without frame-shifted rows
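
The dash test at the heart of this function can be tried on a toy frame. The UID format and the shifted values below are invented for illustration; the only assumption carried over from the function is that a valid UID contains a dash.

```python
import pandas as pd

# Invented rows: the second row has a timestamp where the UID should be,
# and the last row has no UID at all
demo = pd.DataFrame({
    "UID": ["A1B2-0001", "2022/03/22 10:43", "C3D4-0002", None],
    "admreason": ["Fev", "40.0", "Conv", "Fev"],
})

# Literal (non-regex) search for a dash; missing UIDs become text without a dash
has_dash = demo["UID"].astype(str).str.contains("-", regex=False)

dirty_df = demo[~has_dash].copy()  # Rows treated as frame-shifted
clean_df = demo[has_dash].copy()   # Rows kept for further cleaning
```

Because the UID is cast to text first, a missing UID contains no dash and lands in `dirty_df` together with the shifted rows.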


In [18]:
# FUNCTION - Merge Duplicate Columns
# Consolidates columns with identical names by keeping the one with the most non-null values
# and filling its missing entries from the less complete duplicates.
# ------------------------------------------------------------------------------
def merge_duplicate_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Steps:
    1. Iterate through each unique column name in the DataFrame.
    2. For names appearing once, keep the series unchanged.
    3. For duplicates:
       a. Determine which series has the most non-null values.
       b. Use that as the primary, filling its NaNs with values from the others.
       c. Log the merge action (optional reporting not applicable here).
    4. Reconstruct and return a new DataFrame with one column per unique name.
    """
    merged = {}  # Container for the merged Series objects

    # Step 1: Loop over each unique column name
    for col in df.columns.unique():
        # Gather all Series objects with this column name
        dup_series = [df.iloc[:, i] for i, c in enumerate(df.columns) if c == col]

        # Step 2: If only one series exists, keep it as-is
        if len(dup_series) == 1:
            merged[col] = dup_series[0]

        # Step 3: Merge multiple series
        else:
            # 3a. Count non-null entries for each duplicate
            counts = [s.notna().sum() for s in dup_series]
            idx_fuller = counts.index(max(counts))         # Index of the most complete series
            fuller = dup_series[idx_fuller].copy()         # Primary series to keep

            # 3b. Fill NaNs in primary with values from other duplicates
            for i, s in enumerate(dup_series):
                if i != idx_fuller:
                    fuller = fuller.combine_first(s)         # Fill missing spots

            merged[col] = fuller                           # Store merged result
            # Log action (no external report file generated)
            logging.info(
                f"Merged duplicates for column '{col}'; kept column with {max(counts)} non-null values."
            )  # This log is optional and won’t raise errors if logging isn’t configured

    # Step 4: Reconstruct DataFrame from merged series and preserve original index
    new_df = pd.DataFrame(merged)
    new_df.index = df.index
    return new_df  # Return the cleaned, de-duplicated DataFrame
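
The merge rule relies on `Series.combine_first`, which keeps the caller's values and only fills its gaps from the other series. A minimal sketch with invented numbers:

```python
import pandas as pd

# Two invented duplicates of the same column, sharing one index
fuller = pd.Series([1.0, None, 3.0, None])   # Primary series
sparser = pd.Series([None, 2.0, 9.0, None])  # Other duplicate

# Gaps in `fuller` are filled from `sparser`; existing values in `fuller` win
merged = fuller.combine_first(sparser)
```

At position 2 both series hold a value, and the primary's `3.0` is kept rather than `9.0`. The last position stays missing because neither duplicate has a value there.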


In [19]:
# FUNCTION - Load and Aggregate the Data Dictionary
# Reads an Excel file (multiple sheets) and builds a mapping:
# key (lowercase) -> {values, labels, value_labels, datatype}
# ------------------------------------------------------------------------------
def load_and_aggregate_dictionary(
    dict_filepath: str
) -> Dict[str, Any]:
    """
    Steps:
    1. Load the Excel workbook from path.
    2. For each sheet:
       a. Parse into DataFrame if possible.
       b. Verify required columns exist; otherwise skip.
       c. Standardize column names and normalize text fields.
       d. Iterate rows to populate `aggregated_dict`:
          - Initialize new key entries or verify datatype consistency.
          - Collect unique values and labels.
          - Map each value_label to its raw value.
    3. Return the aggregated dictionary mapping.
    """
    # Step 1: Load workbook
    try:
        xlsx = pd.ExcelFile(dict_filepath)  # Open the Excel file
    except Exception as e:
        logging.error(f"Failed to load dictionary file: {e}")  # Log on error
        raise  # Propagate exception

    aggregated_dict: Dict[str, Any] = {}  # Container for final mappings

    # Step 2: Process each sheet in the workbook
    for sheet in xlsx.sheet_names:
        try:
            df_sheet = xlsx.parse(sheet)  # Parse sheet into DataFrame
        except Exception as e:
            logging.error(f"Failed to parse sheet '{sheet}': {e}")  # Log parse failure
            continue  # Skip to next sheet

        # 2b. Verify required columns are present
        expected_cols = ['Key', 'Label', 'Data Type', 'Value', 'Value Label']
        df_sheet.columns = [str(col).strip() for col in df_sheet.columns]  # Trim headers (also safe for non-string headers)
        if not all(col in df_sheet.columns for col in expected_cols):
            logging.warning(f"Sheet '{sheet}' missing expected columns. Skipping.")  # Warn and skip
            continue

        # 2c. Standardize column names and normalize text fields
        df_sheet = df_sheet.rename(columns={
            'Key': 'key',
            'Label': 'label',
            'Data Type': 'datatype',
            'Value': 'value',
            'Value Label': 'value_label'
        })  # Standardize names
        df_sheet['key'] = df_sheet['key'].astype(str).str.strip().str.lower()       # Clean key
        df_sheet['value'] = df_sheet['value'].astype(str).str.strip()                # Clean raw value
        df_sheet['label'] = df_sheet['label'].astype(str).str.strip()                # Clean label
        df_sheet['value_label'] = df_sheet['value_label'].astype(str).str.strip()    # Clean value_label
        df_sheet['datatype'] = df_sheet['datatype'].astype(str).str.strip().str.lower()  # Clean datatype

        # 2d. Populate aggregated_dict row by row
        for _, row in df_sheet.iterrows():
            key = row['key']  # Normalized key
            if key in ('', 'nan'):
                continue  # Skip empty keys (astype(str) turns missing keys into 'nan')

            # Initialize or verify datatype consistency
            if key not in aggregated_dict:
                aggregated_dict[key] = {
                    'values': set(),
                    'labels': set(),
                    'value_labels': {},
                    'datatype': row['datatype']
                }
            else:
                if aggregated_dict[key]['datatype'] != row['datatype']:
                    logging.warning(
                        f"Datatype inconsistency for key '{key}' in sheet '{sheet}'. Using first encountered datatype."
                    )  # Warn but keep original datatype

            # Add raw value if valid
            if row['value'] not in [None, '', 'nan']:
                aggregated_dict[key]['values'].add(row['value'])
            # Add label if valid
            if row['label'] not in [None, '', 'nan']:
                aggregated_dict[key]['labels'].add(row['label'])
            # Map value_label to raw value if valid
            if row['value_label'] not in [None, '', 'nan']:
                aggregated_dict[key]['value_labels'][row['value_label']] = row['value']

    logging.info("Completed aggregating dictionary from Excel.")  # Log completion
    return aggregated_dict  # Return the final mapping
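
To make the returned structure concrete, the sketch below runs the same aggregation over a tiny in-memory sheet instead of an Excel file. The keys, labels, and datatype names are invented for illustration, and the loop is a condensed restatement rather than the function itself.

```python
import pandas as pd

# Invented dictionary rows, already renamed and normalized to strings
sheet = pd.DataFrame({
    "key": ["fontanelle", "fontanelle", "gestation"],
    "label": ["Fontanelle", "Fontanelle", "Gestation (weeks)"],
    "datatype": ["dropdown", "dropdown", "number"],
    "value": ["Flat", "Bulg", "nan"],
    "value_label": ["Flat", "Bulging", "nan"],
})

aggregated = {}
for row in sheet.itertuples(index=False):
    # Create the entry on first sight of a key; later rows reuse it
    entry = aggregated.setdefault(row.key, {
        "values": set(),
        "labels": set(),
        "value_labels": {},
        "datatype": row.datatype,
    })
    if row.value != "nan":
        entry["values"].add(row.value)                     # Allowed raw values
    if row.label != "nan":
        entry["labels"].add(row.label)                     # Question labels
    if row.value_label != "nan":
        entry["value_labels"][row.value_label] = row.value  # Description -> raw value
```

After the loop, `aggregated["fontanelle"]["value_labels"]` maps `"Bulging"` to `"Bulg"`, while the numeric `gestation` key ends up with an empty `values` set.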


In [20]:
# FUNCTION - Value Replacements
# Cleans columns using a base-name dictionary mapping and optionally writes a detailed report.
# ------------------------------------------------------------------------------
def clean_columns_using_base_name(
    df: pd.DataFrame,
    dict_mapping: Dict[str, Any],
    column_cleaning_report_filepath: Optional[str] = None,
    max_warnings: int = 10
) -> pd.DataFrame:
    """
    For each column, remove known suffixes and:
      - Replace values via `value_labels` mapping.
      - Delete values in `labels` set (set to NaN).
    Optionally writes a report if `column_cleaning_report_filepath` is provided.
    """
    # 1. Helper: derive base name by stripping suffixes
    def get_base_name(col: str) -> str:
        for known_suffix in ('.valuedischarge', '.value'):
            if col.endswith(known_suffix):
                return col[:-len(known_suffix)]  # Remove the suffix, including its leading dot
        return col                               # No suffix to remove

    # 2. Initialize reports & counters
    columns_with_no_dict: list[str] = []  # Columns missing mapping
    replaced_values_info: DefaultDict[str, list[tuple[Any, Any]]] = defaultdict(list)
    not_found_values_info: DefaultDict[str, list[Any]] = defaultdict(list)
    deleted_labels_count: DefaultDict[str, int] = defaultdict(int)
    columns_with_extra_missing: list[str] = []

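    # 3. Split columns by whether their base name exists in the mapping
    columns_to_clean = [col for col in df.columns if get_base_name(col) in dict_mapping]
    columns_with_no_dict.extend(col for col in df.columns if get_base_name(col) not in dict_mapping)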

    # 4. Process each column for replacements and deletions
    for col in columns_to_clean:
        base = get_base_name(col)                         # Get lookup key
        mapping = dict_mapping.get(base)                  # Fetch mapping
        if not mapping:
            columns_with_no_dict.append(col)              # Record missing mapping
            continue                                      # Skip cleaning

        allowed = mapping.get('values', set())            # Allowed raw values
        val_labels = mapping.get('value_labels', {})      # Description->raw mapping
        labels_set = mapping.get('labels', set())         # Labels to delete

        unique_vals = df[col].dropna().unique()           # Unique non-null entries
        warning_count = 0

        for val in unique_vals:
            s = str(val).strip()                          # Normalize value string

            # 4a. Skip if already allowed or valid combination
            if s in allowed:
                continue
            if s.startswith('{') and s.endswith('}'):
                parts = [x.strip() for x in s[1:-1].split(',')]
                if all(p in allowed for p in parts):
                    continue                              # Valid multi-value combo

            # 4b. Replacement via label->value mapping
            if s in val_labels:
                new = val_labels[s]
                df.loc[df[col] == val, col] = new         # Replace with raw value
                replaced_values_info[col].append((val, new))

            # 4c. Deletion for values in labels set
            elif s in labels_set:
                df.loc[df[col] == val, col] = np.nan       # Delete (set to NaN)
                deleted_labels_count[col] += 1

            # 4d. Unknown values
            else:
                warning_count += 1                         # Count every unknown value
                not_found_values_info[col].append(val)

        # 4e. Track columns with more unknowns than max_warnings
        if warning_count > max_warnings:
            columns_with_extra_missing.append(col)

    total_deleted = sum(deleted_labels_count.values())   # Total deletions across all columns

    # 5. Optional: write cleaning report if filepath is provided
    if column_cleaning_report_filepath:
        try:
            with open(column_cleaning_report_filepath, 'w', encoding='utf-8') as f:
                # Section 1: Missing dictionary mappings
                f.write("Section 1 - Columns with No Dictionary Mappings\n")
                if columns_with_no_dict:
                    for c in columns_with_no_dict:
                        f.write(f"  - {c}\n")
                else:
                    f.write("  (All columns had a mapping.)\n")

                # Section 2: Replacements made
                f.write("\nSection 2 - Replaced Values\n")
                any_rep = False
                for c, reps in replaced_values_info.items():
                    if reps:
                        any_rep = True
                        f.write(f"\n  Column '{c}':\n")
                        for old, new in reps:
                            f.write(f"    - {old} -> {new}\n")
                if not any_rep:
                    f.write("  (No replacements made.)\n")

                # Section 3: Deletions performed
                f.write("\nSection 3 - Deleted Labels\n")
                if deleted_labels_count:
                    for c, cnt in deleted_labels_count.items():
                        f.write(f"  - {c}: {cnt} deletions\n")
                    f.write(f"\nTotal deletions: {total_deleted}\n")
                else:
                    f.write("  (No deletions made.)\n")

                # Section 4: Unknown values encountered
                f.write("\nSection 4 - Unknown Values\n")
                if not_found_values_info:
                    for c, vals in not_found_values_info.items():
                        if vals:
                            f.write(f"  - {c}: {len(vals)} unknowns\n")
                else:
                    f.write("  (No unknown values encountered.)\n")

                f.write("\nEnd of Report\n")
        except Exception:
            pass   # Skip report on any write error

    return df  # Return cleaned DataFrame with replacements and deletions applied
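
The multi-value check in step 4a is easy to get wrong, so here it is in isolation. The helper name `is_allowed` and the allowed set are assumptions made for this sketch; the logic mirrors the brace-and-comma handling above.

```python
def is_allowed(raw: object, allowed: set) -> bool:
    """Return True for an allowed single value or a '{A,B,...}' combo of allowed values."""
    s = str(raw).strip()
    if s in allowed:
        return True  # Plain single value
    if s.startswith("{") and s.endswith("}"):
        # Strip the braces, split on commas, and require every part to be allowed
        parts = [part.strip() for part in s[1:-1].split(",")]
        return all(part in allowed for part in parts)
    return False

# Invented allowed set for the demonstration
allowed_codes = {"BP", "GENT", "AM", "CEF"}

single_ok = is_allowed("CEF", allowed_codes)
combo_ok = is_allowed("{BP, GENT,AM}", allowed_codes)
combo_bad = is_allowed("{BP,XYZ}", allowed_codes)
empty_combo = is_allowed("{}", allowed_codes)
```

An empty combo such as `{}` is rejected because its single part is an empty string, which is not in the allowed set.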


In [21]:
# FUNCTION - Forward Fill Paired Columns & Drop Label Columns
# Forward-fills missing .value/.valuedischarge entries from paired label columns when numeric or datetime,
# then drops the label columns. Report is optional.
# ------------------------------------------------------------------------------
def forward_fill_numeric_datetime(
    df: pd.DataFrame,
    forward_fill_report_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Identify pairs of value and label columns based on suffix patterns.
    2. For each pair:
       a. Forward-fill missing numeric values from label column.
       b. Forward-fill remaining missing datetime values from label column.
       c. Record the number of filled entries and drop the label column.
    3. Optionally write a report if `forward_fill_report_path` is provided.
    4. Return the DataFrame with filled values and labels removed.
    """
    # Step 1: Build mapping of value->label columns
    pairs: dict[str, str] = {}
    for col in df.columns:
        if col.endswith('.value') and col[:-6] + '.label' in df.columns:
            pairs[col] = col[:-6] + '.label'            # Pair .value with .label
        elif col.endswith('.valuedischarge') and col[:-15] + '.labeldischarge' in df.columns:
            pairs[col] = col[:-15] + '.labeldischarge'  # Pair .valuedischarge with .labeldischarge

    # Prepare report container
    fill_report: dict[str, int] = {}

    # Step 2: Process each value-label pair
    for value_col, label_col in pairs.items():
        # 2a. Mask rows where value is missing and label is present
        mask = df[value_col].isna() & df[label_col].notna()

        # 2b. Numeric forward-fill attempt
        numeric_converted = pd.to_numeric(df.loc[mask, label_col], errors='coerce')  # Convert labels to numeric
        numeric_mask = numeric_converted.notna()  # Identify successful numeric conversions
        df.loc[mask, value_col] = df.loc[mask, value_col].fillna(numeric_converted)  # Fill numeric

        # 2c. Datetime forward-fill for remaining
        mask_remaining = df[value_col].isna() & df[label_col].notna()  # Updated mask
        datetime_converted = pd.to_datetime(df.loc[mask_remaining, label_col], errors='coerce')  # Convert labels to datetime
        datetime_mask = datetime_converted.notna()  # Successful datetime conversions
        df.loc[mask_remaining, value_col] = df.loc[mask_remaining, value_col].fillna(datetime_converted)  # Fill datetime

        # 2d. Record total fills and drop the label column
        fill_count = int(numeric_mask.sum() + datetime_mask.sum())  # Sum numeric + datetime fills
        fill_report[value_col] = fill_count
        df.drop(columns=[label_col], inplace=True)  # Remove label column after filling

    # Step 3: Optional report writing
    if forward_fill_report_path:
        try:
            with open(forward_fill_report_path, 'w', encoding='utf-8') as f:
                total_cols = sum(1 for cnt in fill_report.values() if cnt > 0)  # Columns with any fills
                f.write("Forward Fill Report\n")
                f.write("===================\n\n")
                f.write(f"Total columns forward filled: {total_cols}\n\n")
                f.write("Details for columns with fills:\n")
                for col, count in fill_report.items():
                    if count > 0:
                        f.write(f"  - {col}: {count} values filled\n")
        except Exception:
            pass  # Skip reporting if path is invalid or write fails

    # Step 4: Return the updated DataFrame
    return df  # Values filled and label columns removed
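
The pairing step above depends on stripping each value suffix by its exact length before appending the label suffix. A minimal standalone sketch of that idea follows; `SUFFIX_PAIRS` and `pair_value_label_columns` are hypothetical names used only for illustration, and the column names are made up.

```python
# Hypothetical helper, not part of the notebook: pair value columns with
# their label columns by swapping the suffix.
SUFFIX_PAIRS = {".value": ".label", ".valuedischarge": ".labeldischarge"}


def pair_value_label_columns(columns: list[str]) -> dict[str, str]:
    """Return {value_column: label_column} for every pair present in `columns`."""
    available = set(columns)
    pairs: dict[str, str] = {}
    for col in columns:
        for value_suffix, label_suffix in SUFFIX_PAIRS.items():
            if col.endswith(value_suffix):
                # Slice by len(value_suffix) so the base name has no stray dot.
                label_col = col[: -len(value_suffix)] + label_suffix
                if label_col in available:
                    pairs[col] = label_col
                break
    return pairs


# Made-up column names for illustration.
example_columns = [
    "age.value", "age.label",
    "weight.valuedischarge", "weight.labeldischarge",
    "uid",
]
pairs = pair_value_label_columns(example_columns)
print(pairs)
```

Deriving the slice from `len(value_suffix)` avoids hard-coding a character count for each suffix.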


In [22]:
# FUNCTION - Drop Unwanted Columns
# Drops label columns and selects the best candidate per prefix group; optional report generation.
# -------------------------------------------------------------------------------------------------
def drop_unwanted_columns(
    df: pd.DataFrame,
    drop_report_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Drop unwanted columns from the DataFrame as follows:

    1. Drop all columns with "label" in their name.
    2. For the remaining columns, group them by their prefix (the part before the first dot,
       or the whole name if no dot exists).
    3. For each group:
       - If the group has columns with different suffixes (e.g. "uid.value" and "uid.valuedischarge"),
         then keep the column ending with ".value" and drop the others.
       - If the group also includes a column with the bare prefix (e.g. "age" alongside "age.value"),
         compare the bare column and the ".value" column by counting non-null values. Keep the one with
         more non-null entries; if they are equal, keep the bare column.
       - If neither a bare column nor a ".value" column exists, keep the first column in the group.
    4. If `drop_report_path` is provided, write a report showing, for each prefix group,
       the kept column and the dropped column(s).
    5. Return a DataFrame that contains only the kept columns.
    """
    # Step 1: Drop all columns with "label" in their name.
    filtered_cols = [col for col in df.columns if "label" not in col]
    df = df[filtered_cols]

    # Step 2: Group columns by prefix
    groups: dict[str, list[str]] = {}
    for col in df.columns:
        prefix = col.split('.')[0] if '.' in col else col
        groups.setdefault(prefix, []).append(col)

    decisions: dict[str, tuple[str, list[str]]] = {}

    # Step 3: Decide which column to keep per group
    for prefix, cols in groups.items():
        candidate_bare  = prefix if prefix in cols else None
        candidate_value = f"{prefix}.value" if f"{prefix}.value" in cols else None

        if candidate_bare and candidate_value:
            # compare non-null counts
            count_bare  = df[candidate_bare].count()
            count_value = df[candidate_value].count()
            chosen = candidate_bare if count_bare >= count_value else candidate_value
        elif candidate_value:
            chosen = candidate_value
        elif candidate_bare:
            chosen = candidate_bare
        else:
            chosen = cols[0]

        dropped = [c for c in cols if c != chosen]
        decisions[prefix] = (chosen, dropped)

    # Step 4: Optional report writing
    if drop_report_path:
        report_lines = ["Drop Columns Report", "==================="]
        for prefix, (kept, dropped) in decisions.items():
            if dropped:
                report_lines.append(f"Prefix '{prefix}': Kept column: {kept}")
                report_lines.append(f"   Dropped columns: {', '.join(dropped)}")
            else:
                report_lines.append(f"Prefix '{prefix}': Only column kept: {kept}")
        report_text = "\n".join(report_lines)
        try:
            with open(drop_report_path, 'w', encoding='utf-8') as f:
                f.write(report_text)
        except Exception:
            pass  # silently skip any write errors

    # Step 5: Build and return final DataFrame
    kept_columns = [decisions[p][0] for p in decisions]
    final_cols = [col for col in df.columns if col in kept_columns]
    return df[final_cols]
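
The selection rule in Step 3 can be read in isolation from pandas. The sketch below is a simplified stand-in: it takes pre-computed non-null counts as a plain dictionary. `group_by_prefix`, `choose_column`, and the sample columns and counts are hypothetical.

```python
# Hypothetical, pandas-free sketch of the "keep one column per prefix" rule.
def group_by_prefix(columns: list[str]) -> dict[str, list[str]]:
    """Group column names by the part before the first dot."""
    groups: dict[str, list[str]] = {}
    for col in columns:
        groups.setdefault(col.split(".")[0], []).append(col)
    return groups


def choose_column(prefix: str, cols: list[str], non_null: dict[str, int]) -> str:
    """Pick the column to keep for one prefix group."""
    bare = prefix if prefix in cols else None
    value = f"{prefix}.value" if f"{prefix}.value" in cols else None
    if bare and value:
        # Ties go to the bare column, matching the >= comparison in the function above.
        return bare if non_null[bare] >= non_null[value] else value
    return value or bare or cols[0]


# Made-up columns and non-null counts for illustration.
columns = ["age", "age.value", "uid.value", "uid.valuedischarge", "notes.valuedischarge"]
counts = {"age": 3, "age.value": 5, "uid.value": 4, "uid.valuedischarge": 4,
          "notes.valuedischarge": 1}

kept = {p: choose_column(p, cols, counts) for p, cols in group_by_prefix(columns).items()}
print(kept)
```

Here `age.value` wins because it has more non-null entries than `age`, `uid.value` wins because no bare `uid` column exists, and `notes.valuedischarge` is kept as the only member of its group.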


In [23]:
# FUNCTION - Convert Columns to Correct Data Types (with optional report)
# Attempts dictionary-based conversion, falls back to inference, and optionally generates a report.
# ------------------------------------------------------------------------------
def convert_columns_and_create_report(
    df: pd.DataFrame,
    dict_mapping: dict[str, dict[str, any]],
    sample_size: int = 10000,
    random_state: int = 24,
    report_path: Optional[str] = 'datatype_conversion_report.txt'
) -> pd.DataFrame:
    # Helper: get base name - remove suffixes from df to lookup in dictionary
    def get_base_name(col: str) -> str:
        if col.endswith('.value'):
            return col[:-6]
        elif col.endswith('.valuedischarge'):
            return col[:-15]  # '.valuedischarge' is 15 characters long
        return col

    # Helper: get a sample of non-missing, standardized values.
    def get_sample(series: pd.Series) -> pd.Series:
        non_missing = series[series.notnull()]
        if non_missing.empty:
            return pd.Series([], dtype=str)
        sample = non_missing.sample(n=min(sample_size, len(non_missing)), random_state=random_state)
        return sample.astype(str).str.strip()

    # Valid boolean tokens.
    bool_valids = {"true", "false", "yes", "no", "y", "n"}
    bool_true = {"true", "yes", "y"}

    # Prepare report dictionary.
    conv_report = {
        'converted_boolean': [],
        'should_be_boolean': {},
        'converted_numeric_int': [],
        'converted_numeric_float': [],
        'should_be_numeric': {},
        'converted_datetime': [],
        'should_be_datetime': {},
        'converted_categorical': [],
        'should_be_categorical': {},
        'converted_object': [],
        'columns_not_in_dict': [],
        'should_be_inferred': {}
    }

    # Process every column except label columns (.label/.labeldischarge).
    process_columns = [col for col in df.columns if not (col.endswith('.label') or col.endswith('.labeldischarge'))]

    # Helper: infer conversion (using a sample of non-missing values).
    def infer_conversion(series: pd.Series) -> tuple[str, pd.Series]:
        s = series.astype(str).str.strip()

        # Try Boolean Conversion
        if s.str.lower().isin(bool_valids).all():
            conv = s.apply(lambda x: True if x.lower() in bool_true else False)
            return 'boolean', conv

        # Try Numeric Conversion
        numeric_sample = pd.to_numeric(s, errors='coerce')
        if numeric_sample.notnull().all():
            new_series = pd.to_numeric(s, errors='coerce')
            non_null = new_series.dropna()
            if (not non_null.empty and non_null.apply(lambda x: np.isfinite(x) and np.isclose(x, round(x))).all()):
                return 'numeric_int', new_series.astype('Int64')
            else:
                return 'numeric_float', new_series

        # Try Datetime Conversion
        # infer_datetime_format is deprecated in pandas 2.x, where format inference is the default.
        dt_sample = pd.to_datetime(s, errors='coerce')
        if dt_sample.notnull().all():
            return 'datetime', dt_sample

        # Try Categorical if few unique values.
        if s.nunique() / len(s) <= 0.10:
            return 'categorical', s.astype('category')

        # Otherwise consider as Object.
        return 'object', s

    # Loop through columns.
    for col in process_columns:
        base = get_base_name(col)
        lower_key = base.lower()
        mapping = dict_mapping.get(lower_key)
        converted = False

        # If the column is found in the dictionary:
        if mapping is not None:
            # Get datatype from the mapping
            dtype = str(mapping.get('datatype', '')).strip().lower()  # str() guards against NaN/None entries
            # If no datatype is provided, infer the type using a sample.
            if dtype in ['', 'nan', 'none']:
                inf_type, _ = infer_conversion(get_sample(df[col]))
                dtype = inf_type
            allowed_values = mapping.get('values', set())
            logging.info(f"Column '{col}' (base '{base}') using dictionary expected type '{dtype}'")

            # TRY DICTIONARY BASED - CONVERSION
            try:
                # ----BOOLEAN CONVERSION-------
                if dtype in ['boolean', 'bool']:
                    def to_bool(x):
                        if pd.isnull(x):
                            return x
                        s = str(x).strip().lower()
                        if s in bool_true:
                            return True
                        elif s in bool_valids:
                            return False
                        return np.nan
                    new_series = df[col].apply(to_bool).astype('boolean')
                    df[col] = new_series
                    conv_report['converted_boolean'].append(col)

                # For single_select_option/dropdown with 2 allowed values,
                # check if allowed values are yes/no variants (including Y/N).
                elif (dtype in ['single_select_option', 'dropdown']) and (len(allowed_values) == 2):
                    allowed_values_lower = {str(v).strip().lower() for v in allowed_values if v is not None and str(v).strip() != ""}
                    yes_no_set = {"yes", "no", "y", "n", "true", "false", "1"}
                    if allowed_values_lower.issubset(yes_no_set):
                        def to_bool(x):
                            if pd.isnull(x):
                                return x
                            s = str(x).strip().lower()
                            if s in {"yes", "y", "true", "1"}:
                                return True
                            elif s in {"no", "n", "false"}:
                                return False
                            return np.nan
                        new_series = df[col].apply(to_bool).astype('boolean')
                        df[col] = new_series
                        conv_report['converted_boolean'].append(col)
                    else:
                        new_series = df[col].astype('category')
                        df[col] = new_series
                        conv_report['converted_categorical'].append(col)

                # -------NUMERIC CONVERSION------
                elif any(sub in dtype for sub in ['number', 'numeric', 'period']):
                    new_series = df[col].apply(lambda x: str(x).replace(",", ".") if pd.notnull(x) else x)
                    new_series = pd.to_numeric(new_series, errors='coerce')
                    non_null = new_series.dropna()
                    if not non_null.empty and non_null.apply(lambda x: np.isfinite(x) and np.isclose(x, round(x))).all():
                        new_series = new_series.astype('Int64')
                        num_type = 'numeric_int'
                    else:
                        num_type = 'numeric_float'
                    df[col] = new_series
                    if num_type == 'numeric_int':
                        conv_report['converted_numeric_int'].append(col)
                    else:
                        conv_report['converted_numeric_float'].append(col)

                # ------------DATETIME CONVERSION----------
                elif any(sub in dtype for sub in ['datetime', 'date']):
                    new_series = df[col].apply(lambda x: pd.to_datetime(x, errors='coerce') if pd.notnull(x) else x)
                    df[col] = new_series
                    conv_report['converted_datetime'].append(col)

                # ------------CATEGORICAL CONVERSION----------
                elif ('multi_select_option' in dtype or ((dtype in ['single_select_option', 'dropdown']) and (len(allowed_values) > 2))):
                    new_series = df[col].astype('category')
                    df[col] = new_series
                    conv_report['converted_categorical'].append(col)

                # ------------String Conversion----------
                elif any(sub in dtype for sub in ['string', 'uid']):
                    new_series = df[col].apply(lambda x: str(x) if pd.notnull(x) else x)
                    df[col] = new_series
                    conv_report['converted_object'].append(col)

                # ------FALLBACK TO OBJECT-------
                else:
                    new_series = df[col].apply(lambda x: str(x) if pd.notnull(x) else x)
                    df[col] = new_series
                    conv_report['converted_object'].append(col)
                converted = True

            except Exception as e:
                logging.error(f"Error converting '{col}' using type '{dtype}': {e}")
                conv_report.setdefault('should_be_' + dtype, {})[col] = f"Conversion error: {e}"

                # COLUMNS THAT FAILED DICTIONARY CONVERSION - USE SAMPLE INFERENCE CONVERSION
                try:
                    inf_type, _ = infer_conversion(get_sample(df[col]))
                    # ----NUMERIC CONVERSION----
                    if inf_type.startswith('numeric'):
                        new_series = pd.to_numeric(df[col].astype(str).str.strip(), errors='coerce')
                        non_null = new_series.dropna()
                        if not non_null.empty and non_null.apply(lambda x: np.isfinite(x) and np.isclose(x, round(x))).all():
                            new_series = new_series.astype('Int64')
                        df[col] = new_series
                        if inf_type.startswith('numeric_int'):
                            conv_report['converted_numeric_int'].append(col)
                        else:
                            conv_report['converted_numeric_float'].append(col)

                    # ----DATETIME CONVERSION----
                    elif inf_type == 'datetime':
                        new_series = pd.to_datetime(df[col], errors='coerce')
                        df[col] = new_series
                        conv_report['converted_datetime'].append(col)

                    # ----BOOLEAN CONVERSION----
                    elif inf_type == 'boolean':
                        new_series = df[col].apply(lambda x: True if str(x).strip().lower() in bool_true else False if pd.notnull(x) else pd.NA).astype('boolean')
                        df[col] = new_series
                        conv_report['converted_boolean'].append(col)

                    # ----CATEGORICAL CONVERSION----
                    elif inf_type == 'categorical':
                        new_series = df[col].astype('category')
                        df[col] = new_series
                        conv_report['converted_categorical'].append(col)

                    # -----FALL BACK TO OBJECT--------
                    else:
                        new_series = df[col].apply(lambda x: str(x) if pd.notnull(x) else x)
                        df[col] = new_series
                        conv_report['converted_object'].append(col)
                except Exception as ex:
                    logging.error(f"Fallback inference error for column '{col}': {ex}")
                    conv_report.setdefault('should_be_inferred', {})[col] = f"Fallback error: {ex}"
        else:
            # COLUMNS NOT FOUND IN DICTIONARY - USE SAMPLE INFERENCE CONVERSION
            conv_report['columns_not_in_dict'].append(col)
            try:
                inf_type, _ = infer_conversion(get_sample(df[col]))
                if inf_type.startswith('numeric'):
                    new_series = pd.to_numeric(df[col].astype(str).str.strip(), errors='coerce')
                    non_null = new_series.dropna()
                    if not non_null.empty and non_null.apply(lambda x: np.isfinite(x) and np.isclose(x, round(x))).all():
                        new_series = new_series.astype('Int64')
                    df[col] = new_series
                    if inf_type.startswith('numeric_int'):
                        conv_report['converted_numeric_int'].append(col)
                    else:
                        conv_report['converted_numeric_float'].append(col)
                elif inf_type == 'datetime':
                    new_series = pd.to_datetime(df[col], errors='coerce')
                    df[col] = new_series
                    conv_report['converted_datetime'].append(col)
                elif inf_type == 'boolean':
                    new_series = df[col].apply(lambda x: True if str(x).strip().lower() in bool_true else False if pd.notnull(x) else pd.NA).astype('boolean')
                    df[col] = new_series
                    conv_report['converted_boolean'].append(col)
                elif inf_type == 'categorical':
                    new_series = df[col].astype('category')
                    df[col] = new_series
                    conv_report['converted_categorical'].append(col)
                else:
                    new_series = df[col].apply(lambda x: str(x) if pd.notnull(x) else x)
                    df[col] = new_series
                    conv_report['converted_object'].append(col)
            except Exception as e:
                logging.error(f"Inference conversion error for column '{col}': {e}")
                conv_report.setdefault('should_be_inferred', {})[col] = f"Inference conversion error: {e}"

    # Optional: write the conversion report if report_path is provided
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("===== Combined Data Type Conversion Report =====\n\n")

                # 1. Boolean Section
                f.write("Section 1: Boolean Conversion\n")
                f.write("  Successfully Converted:\n")
                for c in conv_report['converted_boolean']:
                    f.write(f"    - {c}\n")
                f.write("  Flagged as Should be Boolean:\n")
                for c in conv_report.get('should_be_boolean', {}):
                    f.write(f"    - {c}\n")
                f.write("\n")

                # 2. Numeric Section
                f.write("Section 2: Numeric Conversion\n")
                f.write("  Converted to Integer:\n")
                for c in conv_report['converted_numeric_int']:
                    f.write(f"    - {c}\n")
                f.write("  Converted to Float:\n")
                for c in conv_report['converted_numeric_float']:
                    f.write(f"    - {c}\n")
                f.write("  Flagged as Should be Numeric:\n")
                for c in conv_report.get('should_be_numeric', {}):
                    f.write(f"    - {c}\n")
                f.write("\n")

                # 3. Datetime Section
                f.write("Section 3: Datetime Conversion\n")
                f.write("  Successfully Converted:\n")
                for c in conv_report['converted_datetime']:
                    f.write(f"    - {c}\n")
                f.write("  Flagged as Should be Datetime:\n")
                for c in conv_report.get('should_be_datetime', {}):
                    f.write(f"    - {c}\n")
                f.write("\n")

                # 4. Categorical Section
                f.write("Section 4: Categorical Conversion\n")
                f.write("  Successfully Converted:\n")
                for c in conv_report['converted_categorical']:
                    f.write(f"    - {c}\n")
                f.write("  Flagged as Should be Categorical:\n")
                for c in conv_report.get('should_be_categorical', {}):
                    f.write(f"    - {c}\n")
                f.write("\n")

                # 5. String/Object Section
                f.write("Section 5: String/Object Conversion\n")
                for c in conv_report['converted_object']:
                    f.write(f"    - {c}\n")
                f.write("\n")

                # 6. Columns Not Found in Dictionary
                f.write("Section 6: Columns Not Found in Dictionary (Inference used):\n")
                for c in conv_report['columns_not_in_dict']:
                    f.write(f"    - {c}\n")

                # 7. Should Be Inferred
                if conv_report.get('should_be_inferred'):
                    f.write("  Flagged by Inference as Should be Converted:\n")
                    for c in conv_report['should_be_inferred']:
                        f.write(f"    - {c}\n")

                f.write("\n===== End of Combined Data Type Conversion Report =====\n")
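        except Exception as e:
            # Report writing is optional: log the failure and still return the converted data.
            logging.warning(f"Could not write conversion report to '{report_path}': {e}")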

    return df
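
The inference fallback tries types in a fixed order: boolean, numeric, datetime, categorical, then object. The sketch below illustrates that precedence with the standard library only. It is not the notebook's pandas logic: `infer_kind` is a hypothetical name, datetime detection is limited to ISO 8601 strings, and an empty sample is treated as `object`, which is an assumption of this sketch.

```python
from datetime import datetime

BOOL_TOKENS = {"true", "false", "yes", "no", "y", "n"}


def _is_number(text: str) -> bool:
    try:
        float(text)
        return True
    except ValueError:
        return False


def _is_iso_datetime(text: str) -> bool:
    # Assumption: only ISO 8601 strings count as datetimes in this sketch.
    try:
        datetime.fromisoformat(text)
        return True
    except ValueError:
        return False


def infer_kind(values: list, categorical_ratio: float = 0.10) -> str:
    """Return a type label for a sample of non-missing values."""
    cleaned = [str(v).strip() for v in values]
    if not cleaned:
        return "object"  # Assumption: nothing to infer from an empty sample.
    if all(v.lower() in BOOL_TOKENS for v in cleaned):
        return "boolean"
    if all(_is_number(v) for v in cleaned):
        whole = all(float(v).is_integer() for v in cleaned)
        return "numeric_int" if whole else "numeric_float"
    if all(_is_iso_datetime(v) for v in cleaned):
        return "datetime"
    # Few distinct values relative to the sample size suggests a categorical column.
    if len(set(cleaned)) / len(cleaned) <= categorical_ratio:
        return "categorical"
    return "object"


print(infer_kind(["yes", "No", " Y "]))
print(infer_kind(["1", "2.0", "3"]))
print(infer_kind(["2024-01-05", "2024-02-10"]))
```

Order matters: the boolean check comes first so that yes/no tokens are never considered for the later categories, and the numeric check comes before the datetime check so that plain numbers are not parsed as dates.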


In [24]:
# FUNCTION - Generate a Summary of the Dataframe in EXCEL
# For each column, extract its description and datatype, then optionally save the summary files.
# ------------------------------------------------------------------------------
def map_df_columns_to_descriptions_and_datatypes(
    df: pd.DataFrame,
    excel_filepath: str,
    output_csv_filepath: str,
    output_excel_filepath: str
) -> pd.DataFrame:
    """
    Steps:
    1. Load the Excel dictionary workbook from `excel_filepath`.
    2. For each sheet:
       a. Parse into a DataFrame, skip on parse errors.
       b. Ensure 'Key' and 'Label' columns exist; skip sheet otherwise.
       c. Standardize 'Key' to lowercase and strip whitespace.
       d. Build a mapping of base column name -> description (first occurrence wins).
    3. For each column in `df`:
       a. Strip any suffix after the first dot and lowercase to get base name.
       b. Look up description in the mapping, defaulting to 'Not found in dictionary'.
       c. Record the column's pandas dtype as a string.
    4. Assemble a summary DataFrame with columns ['column','description','datatype'], sort by 'column'.
    5. Attempt to save the summary to CSV and Excel; if writing fails, skip without raising.
    6. Return the summary DataFrame.
    """
    # Step 1: Load dictionary workbook
    try:
        xlsx = pd.ExcelFile(excel_filepath)
    except Exception as e:
        raise ValueError(f"Failed to load Excel file: {e}")

    col_mapping: dict[str, str] = {}  # base_name -> description

    # Step 2: Process each sheet to build the mapping
    for sheet in xlsx.sheet_names:
        try:
            df_sheet = xlsx.parse(sheet)  # Parse the sheet
        except Exception:
            continue  # Skip on parse failure

        # 2b: Check for required columns
        if not {'Key', 'Label'}.issubset(df_sheet.columns):
            continue  # Skip sheets missing necessary headers

        # 2c: Rename and clean 'Key' and 'Label'
        rename_dict = {}
        for col in df_sheet.columns:
            lower = str(col).strip().lower()
            if lower == 'key':
                rename_dict[col] = 'key'
            elif lower == 'label':
                rename_dict[col] = 'label'
        df_sheet = df_sheet.rename(columns=rename_dict)
        df_sheet['key'] = df_sheet['key'].astype(str).str.strip().str.lower()
        df_sheet['label'] = df_sheet['label'].astype(str).str.strip()

        # 2d: Populate mapping (first occurrence retained)
        for _, row in df_sheet.iterrows():
            base = row['key']
            if base and base not in col_mapping:
                col_mapping[base] = row['label']

    # Step 3: Build list of summary entries
    output_rows: list[dict[str, str]] = []
    for col in df.columns:
        base = col.split('.')[0].strip().lower()  # Remove suffix and lowercase
        description = col_mapping.get(base, 'Not found in dictionary')
        datatype = str(df[col].dtype)
        output_rows.append({'column': col, 'description': description, 'datatype': datatype})

    # Step 4: Assemble summary DataFrame
    result_df = pd.DataFrame(output_rows).sort_values(by='column').reset_index(drop=True)

    # Step 5: Optionally save the files, skipping on any write error
    try:
        result_df.to_csv(output_csv_filepath, index=False)
    except Exception:
        pass  # Skip CSV save on failure
    try:
        result_df.to_excel(output_excel_filepath, index=False)
    except Exception:
        pass  # Skip Excel save on failure

    # Step 6: Return the generated summary DataFrame
    return result_df
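
Step 3 of the function above reduces each column name to a lowercase base name before looking it up. A minimal sketch of that lookup without pandas or Excel follows; `describe_columns` and the sample dictionary entries are hypothetical.

```python
# Hypothetical sketch of the base-name lookup; the dictionary contents are made up.
def describe_columns(columns: list[str], descriptions: dict[str, str]) -> list[dict[str, str]]:
    """Map each column to a description keyed by its lowercase base name."""
    rows = []
    for col in columns:
        base = col.split(".")[0].strip().lower()  # Drop everything after the first dot
        rows.append({
            "column": col,
            "description": descriptions.get(base, "Not found in dictionary"),
        })
    return sorted(rows, key=lambda row: row["column"])


sample_descriptions = {"age": "Age at admission", "weight": "Weight in grams"}
summary = describe_columns(["Weight.value", "age.value", "uid"], sample_descriptions)
for row in summary:
    print(row)
```

Because the key is lowercased and cut at the first dot, `Weight.value` and `weight.valuedischarge` would both resolve to the same dictionary entry.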


In [25]:
# FUNCTION - Forward Fill ‘None’ Values from Label Columns
# For each paired .value/.label or .valuedischarge/.labeldischarge column:
# if the value is missing but the label is one of {"none","normal","norm"}, fill the value.
# Optionally writes a report of fills if a valid path is provided.
# ------------------------------------------------------------------------------
def fill_missing_with_label_value(
    df: pd.DataFrame,
    forward_fill_report_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Identify pairs of value and label columns based on naming conventions.
    2. For each pair, create a mask where the value is NaN and label is one of the target strings.
    3. Fill missing values in the value column with the corresponding label entry.
    4. Record the count of fills per value column.
    5. Optionally write a report if `forward_fill_report_path` is provided and writable.
    6. Return the DataFrame with filled values.
    """
    # Step 1: Build mapping of value->label column pairs
    pairs: dict[str, str] = {}
    for col in df.columns:
        if col.endswith('.value') and col[:-6] + '.label' in df.columns:
            pairs[col] = col[:-6] + '.label'  # Pair .value with .label
        elif col.endswith('.valuedischarge') and col[:-15] + '.labeldischarge' in df.columns:
            pairs[col] = col[:-15] + '.labeldischarge'  # '.valuedischarge' is 15 characters long

    # Prepare fill count report
    fill_report: dict[str, int] = {}

    # Step 2 and 3: Process each pair to forward-fill 'none' labels
    for value_col, label_col in pairs.items():
        # Mask: value is NaN, label is one of 'none', 'normal', 'norm'
        mask = (
            df[value_col].isna() &
            df[label_col].notna() &
            df[label_col].astype(str).str.strip().str.lower().isin(["none","normal","norm"])
        )
        # Fill NaNs in the value column with the label's value
        df.loc[mask, value_col] = df.loc[mask, label_col]
        # Record how many were filled
        fill_report[value_col] = int(mask.sum())
        # Note: label columns are retained per original logic

    # Step 5: Optionally write the fill report
    if forward_fill_report_path:
        try:
            with open(forward_fill_report_path, 'w', encoding='utf-8') as f:
                total_cols = sum(1 for cnt in fill_report.values() if cnt > 0)
                f.write("Forward Fill Report\n")
                f.write("===================\n\n")
                f.write(f"Total columns forward filled: {total_cols}\n\n")
                f.write("Details for columns with fills:\n")
                for col, count in fill_report.items():
                    if count > 0:
                        f.write(f"  - {col}: {count} values filled\n")
        except Exception:
            pass  # Skip report generation if path invalid or write error

    # Step 6: Return the updated DataFrame
    return df  # Values filled; label columns remain in DataFrame
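
The masking rule above (value missing, label equal to one of the target strings) can be illustrated on plain lists. This is a pandas-free sketch: `fill_from_label` is a hypothetical name, `None` stands in for a missing value, and trimming whitespace around the label is an assumption of the sketch.

```python
TARGET_LABELS = {"none", "normal", "norm"}


def fill_from_label(values: list, labels: list) -> tuple[list, int]:
    """Copy the label into the value where the value is missing and the label is a target."""
    filled = list(values)
    count = 0
    for i, (value, label) in enumerate(zip(values, labels)):
        is_target = label is not None and str(label).strip().lower() in TARGET_LABELS
        if value is None and is_target:
            filled[i] = label
            count += 1
    return filled, count


# Made-up data: only rows 0 and 2 have a missing value with a target label.
values = [None, "Jaundice", None, None]
labels = ["None", "Jaundice", "Normal", "Other"]
filled, n_filled = fill_from_label(values, labels)
print(filled, n_filled)
```

The last row stays missing because its label, `Other`, is not one of the target strings.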

#### Numeric Validation Functions

In [26]:
# FUNCTION - Load & Aggregate Validation Dictionary
# Reads a specified sheet from an Excel file to extract numeric validation ranges.
# ------------------------------------------------------------------------------
def load_and_aggregate_validation_dictionary(
    dict_filepath: str,
    sheet_name: str
) -> dict[str, tuple[float, float]]:
    """
    Steps:
    1. Load the specified sheet from the Excel file into a DataFrame.
    2. Iterate each row:
       a. Extract column name from first cell (column A), normalize to lowercase.
       b. Extract range string from fifth cell (column E).
       c. Skip empty or NaN ranges.
       d. Use regex to parse "min - max" (allowing '-' or '–').
          - On match, convert to floats and store in validation_ranges.
          - On parse error, log and skip.
          - On format mismatch, log a warning.
    3. Return the mapping of column names to (min_value, max_value) tuples.
    """
    # Step 1: Attempt to read the specified sheet
    try:
        df = pd.read_excel(dict_filepath, sheet_name=sheet_name)
    except Exception as e:
        logging.error(f"Failed to load sheet '{sheet_name}' from '{dict_filepath}': {e}")
        raise  # Propagate error if file or sheet is unreadable

    validation_ranges: dict[str, tuple[float, float]] = {}

    # Step 2: Process each row for column name and range
    for idx, row in df.iterrows():
        # 2a. Extract and normalize column name
        col_name = str(row.iloc[0]).strip().lower()
        # 2b. Extract raw range string (column E)
        range_str = row.iloc[4]
        if pd.isna(range_str) or str(range_str).strip() == '':
            continue  # Skip rows without a valid range

        # 2d. Parse range using regex: hyphen or en-dash separator,
        #     with an optional leading minus sign on either bound
        pattern = r'(-?\d+(?:\.\d+)?)\s*[-–]\s*(-?\d+(?:\.\d+)?)'
        match = re.match(pattern, str(range_str).strip())
        if match:
            try:
                min_val = float(match.group(1))  # Convert lower bound
                max_val = float(match.group(2))  # Convert upper bound
                validation_ranges[col_name] = (min_val, max_val)
            except Exception as e:
                logging.error(
                    f"Error parsing numeric range for '{col_name}': '{range_str}' -> {e}"
                )  # Log conversion failure
        else:
            logging.warning(
                f"Unrecognized range format for '{col_name}': '{range_str}'"
            )  # Warn on format mismatch

    logging.info(
        f"Completed loading validation ranges from sheet '{sheet_name}'."
    )  # Log final status
    # Step 3: Return the assembled dictionary
    return validation_ranges  # Mapping: column_name -> (min_value, max_value)
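
Parsing a "min - max" string is the core of the loader above. The sketch below shows one way to do it when a bound may be negative; `RANGE_PATTERN` and `parse_range` are hypothetical names, and unlike the loader this version requires the whole string to be a range.

```python
import re
from typing import Optional

# Each bound is an optional minus sign followed by an integer or decimal number.
# The separator is a hyphen or an en-dash, with optional surrounding whitespace.
RANGE_PATTERN = re.compile(r"^\s*(-?\d+(?:\.\d+)?)\s*[-–]\s*(-?\d+(?:\.\d+)?)\s*$")


def parse_range(text: str) -> Optional[tuple[float, float]]:
    """Return (min, max) as floats, or None if `text` is not a recognizable range."""
    match = RANGE_PATTERN.match(text)
    if match is None:
        return None
    return float(match.group(1)), float(match.group(2))


for raw in ["0 - 100", "35.5–42", "-5 - -1", "n/a"]:
    print(repr(raw), "->", parse_range(raw))
```

Keeping the minus sign out of the digit character class means the separator hyphen can never be absorbed into a bound, so inputs such as `-5--1` still split cleanly.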


In [27]:
# FUNCTION - Raise Flags for Disallowed Numeric Values
# Checks numeric columns against allowed ranges and optionally writes a report.
# ------------------------------------------------------------------------------
def raise_flags_numeric(
    df: pd.DataFrame,
    range_mapping: dict[str, tuple[float, float]],
    report_file: Optional[str] = None,
    skip_columns: list[str] = ['uid', 'facility', 'uniquekey', "startedatdischarge", "completedatdischarge", "ingestedatdischarge"]
) -> pd.DataFrame:
    """
    Steps:
    1. Prepare skip_columns list and make a copy of the DataFrame.
    2. Normalize mapping keys to match DataFrame columns.
    3. Iterate over each column:
       a. Skip UID/facility/uniquekey or user-specified columns.
       b. If column is numeric and in mapping, flag disallowed values and replace on the copy.
    4. Build report_lines summarizing flagged values and skipped columns.
    5. Optionally write the report if `report_file` is provided and writable.
    6. Return the modified DataFrame copy.
    """
    # 1. Normalize skip_columns and copy the DataFrame
    if skip_columns is None:
        skip_columns = []  # An explicit None means no extra user-specified skips
    df_copy = df.copy()    # Work on a copy to preserve original

    # 2. Normalize DataFrame column set and mapping keys
    df_cols = set(df_copy.columns)  # Existing columns
    normalized_mapping: dict[str, tuple[float, float]] = {}
    for key, value in range_mapping.items():
        norm_key = key.lower().strip()  # Lowercase mapping key
        candidate = norm_key
        if norm_key not in df_cols:
            parts = norm_key.split('.')
            while len(parts) > 1:
                parts.pop()              # Remove suffix segment
                candidate = '.'.join(parts)
                if candidate in df_cols:
                    break
        # Record first mapping for each candidate
        if candidate not in normalized_mapping:
            normalized_mapping[candidate] = value
        else:
            logging.warning(f"Duplicate mapping for '{candidate}', using first occurrence.")

    report_info: dict[str, dict[str, Any]] = {}  # Hold flagged details
    skipped_columns_not_processed: list[str] = []

    # 3. Process each column in the copy
    for col in df_copy.columns:
        col_lower = col.lower()
        # 3a. Skip identifiers and user-specified
        if (
            'uid' in col_lower
            or col_lower in ['facility','uniquekey']
            or col in skip_columns
        ):
            skipped_columns_not_processed.append(col)
            continue

        # 3b. Flag and replace disallowed numeric values
        if pd.api.types.is_numeric_dtype(df_copy[col]) and col in normalized_mapping:
            min_val, max_val = normalized_mapping[col]
            # Helper to test validity
            def is_allowed(val):
                if pd.isnull(val):
                    return True  # Preserve NaNs
                try:
                    return min_val <= val <= max_val
                except Exception:
                    return False

            mask = df_copy[col].apply(is_allowed)  # True if allowed
            num_replacements = (~mask).sum()       # Count disallowed
            bad_vals = df_copy[col][~mask].dropna().unique().tolist()

            if num_replacements > 0:
                report_info[col] = {
                    'allowed_range': (min_val, max_val),
                    'disallowed_values': bad_vals,
                    'rows_affected': int(num_replacements)
                }
                # Replace disallowed with NaN
                df_copy[col] = df_copy[col].where(mask, other=np.nan)

    # 4. Build report lines
    report_lines: list[str] = []
    report_lines.append("Numeric Range Disallowed Values Report")
    report_lines.append("========================================")
    report_lines.append("")
    if report_info:
        for col, info in report_info.items():
            report_lines.append(f"Column: {col}")
            report_lines.append(f"  Allowed Range: {info['allowed_range']}")
            report_lines.append(f"  Disallowed Values: {info['disallowed_values']}")
            report_lines.append(f"  Rows Affected: {info['rows_affected']}")
            report_lines.append("")
    else:
        report_lines.append("No disallowed numeric values found in any column.")
        report_lines.append("")

    if skipped_columns_not_processed:
        report_lines.append("Columns Skipped:")
        report_lines.append("----------------")
        for sc in skipped_columns_not_processed:
            report_lines.append(f"  - {sc}")
        report_lines.append("")

    # 5. Optional report writing
    if report_file:
        try:
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))  # Overwrite report
        except Exception as e:
            logging.warning(f"Could not write report to '{report_file}': {e}")  # Skip report on any write error

    # 6. Return the modified copy
    return df_copy  # DataFrame with disallowed numeric values set to NaN
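
The range check in step 3b can be illustrated on a tiny frame. This is a minimal sketch, not the function above: the column name, sample values, and range are hypothetical, and it uses the vectorized `Series.between` (inclusive on both ends by default) in place of the per-cell `is_allowed` helper.

```python
import numpy as np
import pandas as pd

# Hypothetical sample data; the column name and allowed range are illustrative only.
df = pd.DataFrame({"temperature": [36.5, 41.0, 12.0, np.nan]})
ranges = {"temperature": (30.0, 43.0)}

cleaned = df.copy()  # Work on a copy, as raise_flags_numeric does
for col, (lo, hi) in ranges.items():
    if col in cleaned.columns and pd.api.types.is_numeric_dtype(cleaned[col]):
        # Missing values count as allowed; everything else must lie in [lo, hi]
        allowed = cleaned[col].isna() | cleaned[col].between(lo, hi)
        cleaned[col] = cleaned[col].where(allowed, other=np.nan)

# Rows that held a value before and are NaN afterwards were flagged
flagged_rows = int((df["temperature"].notna() & cleaned["temperature"].isna()).sum())
```

Only the out-of-range reading (12.0) is replaced; the original `df` keeps its values because the work happens on the copy.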


In [28]:
# FUNCTION - Validate Numeric Ranges
# Cleans a DataFrame by replacing out-of-range numeric values with NaN and optionally writes a report.
# ------------------------------------------------------------------------------
def validate_numeric_ranges(
    df: pd.DataFrame,
    range_mapping: dict[str, tuple[float, float]],
    report_file: Optional[str] = None,
    skip_columns: list[str] = ['uid', 'facility', 'uniquekey', "startedatdischarge", "completedatdischarge", "ingestedatdischarge"]
) -> pd.DataFrame:
    """
    Steps:
    1. Default skip_columns to an empty list when None is passed.
    2. Normalize DataFrame columns and mapping keys to match.
    3. Iterate each column:
       a. Skip UID/facility/uniquekey or user-specified columns.
       b. If numeric and in mapping, flag and replace out-of-range values.
    4. Build report_lines summarizing disallowed values and skipped columns.
    5. Optionally write report if `report_file` is provided and writable.
    6. Return the DataFrame, which is modified in place.
    """
    # Step 1: Prepare skip list
    if skip_columns is None:
        skip_columns = []  # Default no extra skips

    # Step 2: Normalize mapping keys to DataFrame columns
    df_cols = set(df.columns)
    normalized_mapping: dict[str, tuple[float, float]] = {}
    for key, (min_val, max_val) in range_mapping.items():
        norm_key = key.lower().strip()  # Lowercase and trim
        candidate = norm_key
        if norm_key not in df_cols:
            parts = norm_key.split('.')
            while len(parts) > 1:
                parts.pop()  # Remove last segment
                candidate = '.'.join(parts)
                if candidate in df_cols:
                    break
        if candidate not in normalized_mapping:
            normalized_mapping[candidate] = (min_val, max_val)
        else:
            logging.warning(f"Duplicate mapping for '{candidate}', keeping first mapping.")

    report_info: dict[str, dict[str, Any]] = {}  # Store flagged details per column
    skipped_columns_not_processed: list[str] = []  # Columns skipped

    # Step 3: Process each DataFrame column
    for col in df.columns:
        col_lower = col.lower()
        # 3a. Skip identifiers and user-specified
        if (
            'uid' in col_lower
            or col_lower in ['facility', 'uniquekey']
            or col in skip_columns
        ):
            skipped_columns_not_processed.append(col)
            continue

        # 3b. Flag and replace out-of-range numeric values
        if pd.api.types.is_numeric_dtype(df[col]) and col in normalized_mapping:
            min_val, max_val = normalized_mapping[col]
            # Helper to test validity
            def is_allowed(val):
                if pd.isnull(val):
                    return True  # Preserve NaNs
                try:
                    return min_val <= val <= max_val
                except Exception:
                    return False

            mask_allowed = df[col].apply(is_allowed)  # True where allowed
            num_flags = (~mask_allowed).sum()         # Count of disallowed values
            unique_bad = df[col][~mask_allowed].dropna().unique().tolist()

            if num_flags > 0:
                report_info[col] = {
                    'allowed_range': (min_val, max_val),
                    'disallowed_values': unique_bad,
                    'rows_affected': int(num_flags)
                }
                # Replace disallowed with NaN
                df[col] = df[col].where(mask_allowed, other=np.nan)

    # Step 4: Build report lines
    report_lines: list[str] = []
    report_lines.append("Numeric Range Disallowed Values Report")
    report_lines.append("========================================")
    report_lines.append("")
    if report_info:
        for col, info in report_info.items():
            report_lines.append(f"Column: {col}")
            report_lines.append(f"  Allowed Range: {info['allowed_range']}")
            report_lines.append(f"  Disallowed Values: {info['disallowed_values']}")
            report_lines.append(f"  Rows Affected: {info['rows_affected']}")
            report_lines.append("")
    else:
        report_lines.append("No disallowed numeric values found in any column.")
        report_lines.append("")

    if skipped_columns_not_processed:
        report_lines.append("Columns Skipped:")
        report_lines.append("----------------")
        for sc in skipped_columns_not_processed:
            report_lines.append(f"  - {sc}")
        report_lines.append("")

    # Step 5: Optional report writing
    if report_file:
        try:
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))  # Write out the report
        except Exception as e:
            logging.warning(f"Could not write report to '{report_file}': {e}")  # Skip report on error

    # Step 6: Return DataFrame with disallowed numeric values removed
    return df  # DataFrame modified in place
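
The key normalization in step 2 (shared with `raise_flags_numeric`) may be easier to follow in isolation. The sketch below assumes a hypothetical helper name, `resolve_mapping_key`, and made-up column names; it mirrors the loop that strips dotted suffixes until a DataFrame column matches.

```python
def resolve_mapping_key(key: str, columns: set[str]) -> str:
    """Return the column name a mapping key resolves to (hypothetical helper)."""
    candidate = key.lower().strip()
    if candidate in columns:
        return candidate
    parts = candidate.split(".")
    while len(parts) > 1:
        parts.pop()                    # Drop the last dotted segment
        candidate = ".".join(parts)
        if candidate in columns:
            break
    return candidate                   # First segment if nothing matched


# Illustrative column names only
columns = {"birthweight", "apgar.1min"}
resolved = [
    resolve_mapping_key(k, columns)
    for k in ["BirthWeight.value", "apgar.1min", "unknown.field.x"]
]
```

A key that matches no column still falls back to its first segment (`unknown` here), so it is stored in the mapping but never applied to a column.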

In [29]:
# # FUNCTION - Convert weight to grams
# # Normalizes mixed integer/float weights by assuming decimals indicate kilograms.
# # ------------------------------------------------------------------------------
# def convert_weight_to_grams(
#     df: pd.DataFrame,
#     cols: str | list[str],
#     kg_threshold: int = 10
# ) -> pd.DataFrame:
#     """
#     Steps:
#     1. Ensure `cols` is a list of column names.
#     2. Define `convert_value` to:
#        a. Return NaN for missing values.
#        b. Detect if string has no decimal; parse as int, convert to grams if < kg_threshold.
#        c. If decimal part is all zeros, treat like integer above.
#        d. Otherwise assume value in kilograms, multiply by 1000 precisely.
#     3. Apply `convert_value` to each specified column, logging errors for missing columns.
#     4. Return the DataFrame with converted weight columns.
#     """
#     # 1. Normalize `cols` to list
#     if isinstance(cols, str):
#         cols = [cols]

#     # 2. Helper to convert a single cell's value
#     def convert_value(x):
#         if pd.isna(x):
#             return np.nan  # Preserve missing values
#         s = str(x).strip()  # Normalize to trimmed string

#         # 2b. No decimal point => integer-like
#         if '.' not in s:
#             try:
#                 val = int(s)
#                 return val * 1000 if val < kg_threshold else val
#             except ValueError:
#                 logging.warning(f"Value '{s}' cannot be converted to int.")
#                 return np.nan

#         # 2c. Split integer and fractional parts
#         before, _, after = s.partition('.')
#         # If fractional part empty or all zeros, treat as integer
#         if after == '' or after.strip('0') == '':
#             try:
#                 val = int(before)
#                 return val * 1000 if val < kg_threshold else val
#             except ValueError:
#                 logging.warning(f"Value '{s}' cannot be converted to int.")
#                 return np.nan
#         # 2d. Nonzero fractional => treat as kilograms precisely
#         try:
#             converted = Decimal(s) * Decimal('1000')  # Precise arithmetic
#             return int(converted)
#         except Exception as e:
#             logging.warning(f"Value '{s}' could not be converted: {e}")
#             return np.nan

#     # 3. Apply conversion to each column
#     for col in cols:
#         if col not in df.columns:
#             logging.error(f"Column '{col}' not found in DataFrame.")
#             continue  # Skip missing columns without stopping
#         df[col] = df[col].apply(convert_value)  # Convert all values in the column

#     # 4. Return DataFrame with updated weight columns
#     return df


In [30]:
# FUNCTION - Convert weight to kilograms
# Normalizes mixed gram/kilogram weights by assuming values above kg_threshold are in grams.
# ------------------------------------------------------------------------------
def convert_weight_to_kgs(
    df: pd.DataFrame,
    cols: str | list[str],
    kg_threshold: int = 10
) -> pd.DataFrame:
    """
    For each specified column:
      • Leave NaNs untouched.
      • Parse the value (int/float/str → Decimal for precision).
      • If the numeric value > kg_threshold, divide by 1000.
        Otherwise keep the numeric value as-is.
    Returns the same DataFrame with adjusted columns.
    """

    # 1. Normalise `cols` to a list
    if isinstance(cols, str):
        cols = [cols]

    # 2. Helper for one cell
    def _convert(x):
        if pd.isna(x):
            return np.nan

        s = str(x).strip()
        try:
            val = Decimal(s)
        except (InvalidOperation, ValueError):
            logging.warning(f"Value '{s}' could not be parsed as a number.")
            return np.nan

        return float(val / Decimal('1000')) if val > kg_threshold else float(val)

    # 3. Apply to each requested column
    for col in cols:
        if col not in df.columns:
            logging.error(f"Column '{col}' not found in DataFrame.")
            continue
        df[col] = df[col].apply(_convert)

    # 4. Return the (now-modified) DataFrame
    return df
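
To make the threshold rule concrete, here is a minimal per-value sketch under the same assumption as the function above: anything greater than `kg_threshold` is taken to be grams. The helper name `to_kg` and the sample inputs are hypothetical.

```python
import math
from decimal import Decimal, InvalidOperation


def to_kg(value, kg_threshold: int = 10) -> float:
    """Convert one weight reading to kilograms (hypothetical helper)."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return float("nan")            # Leave missing values missing
    try:
        val = Decimal(str(value).strip())
    except (InvalidOperation, ValueError):
        return float("nan")            # Unparseable text becomes NaN
    # Values above the threshold are assumed to be grams
    return float(val / Decimal("1000")) if val > kg_threshold else float(val)


# Illustrative inputs: grams, kilograms as text, the threshold itself, and junk
results = [to_kg(v) for v in [3200, "2.85", 10, "abc"]]
```

A value equal to the threshold is kept as-is, while a genuine reading just above it (for example 10.5 kg) would be divided by 1000, so the threshold should sit above the heaviest plausible kilogram value in the data.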


In [31]:
# FUNCTION - Purify Columns by Deleting Disallowed Data Types
# Cleans specified columns by removing values incompatible with a target dtype and optionally writes a report.
# ------------------------------------------------------------------------------
def purify_column_num(
    df: pd.DataFrame,
    dtype: str,
    cols: list[str] = None,
    report_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Determine which columns to process, skipping 'facility', 'uid', 'uniquekey', and the discharge timestamp columns.
    2. Define helper to compare old vs new values safely, including NaNs.
    3. For each target column:
       a. If Boolean type, apply boolean conversion and track deletions.
       b. Otherwise, test each cell against the target type, delete invalid entries.
       c. After cleaning, cast the column to the desired pandas dtype.
       d. Record deletion counts and old→new mappings.
    4. Optionally write a detailed report if `report_path` is provided.
    5. Return the DataFrame with purified columns.
    """
    # 1. Identify columns to purify
    skip_cols = {'facility', 'uid', 'uniquekey', 'startedatdischarge', 'completedatdischarge', 'ingestedatdischarge'}
    if not cols:
        cols = [c for c in df.columns if c.lower() not in skip_cols]
    else:
        cols = [c for c in cols if c.lower() not in skip_cols]

    # 2. Helper: detect changes, including NaN differences
    def values_differ(old_val, new_val):
        if pd.isna(old_val) and pd.isna(new_val):
            return False
        if pd.isna(old_val) or pd.isna(new_val):
            return True
        return old_val != new_val

    report_entries: list[str] = []  # Gather report lines

    # 3. Process each column
    for col in cols:
        if col not in df.columns:
            raise KeyError(f"Column '{col}' does not exist.")

        mapping_counts: dict[tuple[Any, Any], int] = {}
        deletion_counts: dict[Any, int] = {}

        # 3a. Boolean conversion
        if dtype.lower() == 'boolean':
            def to_bool(x):
                if pd.isnull(x):
                    return pd.NA
                s = str(x).strip().lower()
                if s in {'yes','y','true','1','in'}:
                    return True
                if s in {'no','n','false','out'}:
                    return False
                return pd.NA

            original = df[col].copy()
            df[col] = df[col].apply(to_bool).astype('boolean')

            # Track changes
            for idx, new in df[col].items():
                old = original.loc[idx]
                if values_differ(old, new):
                    mapping_counts[(old,new)] = mapping_counts.get((old,new),0) + 1
                    if pd.isna(new) and pd.notna(old):
                        deletion_counts[old] = deletion_counts.get(old,0) + 1

            # Report section
            total_deleted = sum(deletion_counts.values())
            report_entries.append(f"Column: {col} (Boolean)")
            report_entries.append(f"Total deleted/converted values: {total_deleted}")
            report_entries.append("Deleted values:")
            if deletion_counts:
                for val, cnt in deletion_counts.items():
                    report_entries.append(f"  {val}: {cnt}")
            else:
                report_entries.append("  None")
            report_entries.append("Old → New mappings:")
            if mapping_counts:
                for (o,n),cnt in mapping_counts.items():
                    report_entries.append(f"  {o} → {n}: {cnt}")
            else:
                report_entries.append("  No changes")
            report_entries.append("")

        # 3b. Other data types
        else:
            for idx, val in df[col].items():
                if pd.isnull(val):
                    continue
                orig = val
                valid = True
                if dtype.lower() == 'datetime':
                    try:
                        pd.to_datetime(val, errors='raise')
                    except Exception:
                        valid = False
                elif dtype.lower() == 'numeric':
                    try:
                        pd.to_numeric(val, errors='raise')
                    except Exception:
                        valid = False
                elif dtype.lower() == 'categorical':
                    if not isinstance(val, str):
                        valid = False
                else:
                    raise ValueError("Unsupported dtype. Use datetime, numeric, Boolean, or categorical.")

                if not valid:
                    deletion_counts[orig] = deletion_counts.get(orig,0) + 1
                    mapping_counts[(orig,pd.NA)] = mapping_counts.get((orig,pd.NA),0) + 1
                    df.at[idx, col] = pd.NA

            # 3c. Cast to target dtype
            if dtype.lower() == 'numeric':
                df[col] = pd.to_numeric(df[col], errors='coerce')
            elif dtype.lower() == 'datetime':
                df[col] = pd.to_datetime(df[col], errors='coerce')
            elif dtype.lower() == 'categorical':
                df[col] = df[col].astype('category')

            # Report section
            total_deleted = sum(deletion_counts.values())
            report_entries.append(f"Column: {col}")
            report_entries.append(f"Total deleted values: {total_deleted}")
            report_entries.append("Deleted values:")
            if deletion_counts:
                for val, cnt in deletion_counts.items():
                    report_entries.append(f"  {val}: {cnt}")
            else:
                report_entries.append("  None")
            report_entries.append("Old → New mappings:")
            if mapping_counts:
                for (o,n),cnt in mapping_counts.items():
                    report_entries.append(f"  {o} → {n}: {cnt}")
            else:
                report_entries.append("  No changes")
            report_entries.append("")

    # 4. Optional report writing
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_entries))
        except Exception as e:
            logging.warning(f"Could not write report to '{report_path}': {e}")  # Skip if unable to write

    # 5. Return the purified DataFrame
    return df
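
For the `numeric` case, the delete-then-cast behaviour can be approximated in a few lines. This is a sketch with hypothetical sample values, using `pd.to_numeric(..., errors="coerce")` on the whole column instead of the per-cell loop above; a value counts as deleted when it was present before and is NaN afterwards.

```python
import pandas as pd

# Hypothetical raw column mixing valid numbers, junk text, and a missing value
raw = pd.Series(["12", "7.5", "n/a", None, "3kg"], name="weight")

converted = pd.to_numeric(raw, errors="coerce")   # Invalid entries become NaN
deleted_mask = raw.notna() & converted.isna()     # Present before, missing after
deletion_counts = raw[deleted_mask].value_counts().to_dict()
```

`deletion_counts` plays the same role as the per-value counts in the report: it lists each removed raw value and how often it occurred.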


#### Boolean Validation Functions

In [32]:
# FUNCTION - Purify Columns by Deleting Disallowed Data Types
# Cleans specified columns by removing values incompatible with a target dtype and optionally writes a report.
# -------------------------------------------------------------------------------------------------------------
def purify_column_bool(
    df: pd.DataFrame,
    dtype: str,
    cols: list[str] = None,
    report_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Determine which columns to process, skipping 'facility', 'uid', 'uniquekey', and the discharge timestamp columns.
    2. Define helper to compare old vs new values safely, including NaNs.
    3. For each target column:
       a. If Boolean type, apply boolean conversion and track deletions.
       b. Otherwise, test each cell against the target type, delete invalid entries.
       c. After cleaning, cast the column to the desired pandas dtype.
       d. Record deletion counts and old→new mappings.
    4. Optionally write a detailed report if `report_path` is provided.
    5. Return the DataFrame with purified columns.
    """
    # 1. Identify columns to purify
    skip_cols = {'facility', 'uid', 'uniquekey', 'startedatdischarge', 'completedatdischarge', 'ingestedatdischarge'}
    if not cols:
        cols = [c for c in df.columns if c.lower() not in skip_cols]
    else:
        cols = [c for c in cols if c.lower() not in skip_cols]

    # 2. Helper: detect changes, including NaN differences
    def values_differ(old_val, new_val):
        if pd.isna(old_val) and pd.isna(new_val):
            return False
        if pd.isna(old_val) or pd.isna(new_val):
            return True
        return old_val != new_val

    report_entries: list[str] = []  # Gather report lines

    # 3. Process each column
    for col in cols:
        if col not in df.columns:
            raise KeyError(f"Column '{col}' does not exist.")

        mapping_counts: dict[tuple[Any, Any], int] = {}
        deletion_counts: dict[Any, int] = {}

        # 3a. Boolean conversion
        if dtype.lower() == 'boolean':
            def to_bool(x):
                if pd.isnull(x):
                    return pd.NA
                s = str(x).strip().lower()
                if s in {'yes','y','true','1','in'}:
                    return True
                if s in {'no','n','false','out'}:
                    return False
                return pd.NA

            original = df[col].copy()
            df[col] = df[col].apply(to_bool).astype('boolean')

            # Track changes
            for idx, new in df[col].items():
                old = original.loc[idx]
                if values_differ(old, new):
                    mapping_counts[(old,new)] = mapping_counts.get((old,new),0) + 1
                    if pd.isna(new) and pd.notna(old):
                        deletion_counts[old] = deletion_counts.get(old,0) + 1

            # Report section
            total_deleted = sum(deletion_counts.values())
            report_entries.append(f"Column: {col} (Boolean)")
            report_entries.append(f"Total deleted/converted values: {total_deleted}")
            report_entries.append("Deleted values:")
            if deletion_counts:
                for val, cnt in deletion_counts.items():
                    report_entries.append(f"  {val}: {cnt}")
            else:
                report_entries.append("  None")
            report_entries.append("Old → New mappings:")
            if mapping_counts:
                for (o,n),cnt in mapping_counts.items():
                    report_entries.append(f"  {o} → {n}: {cnt}")
            else:
                report_entries.append("  No changes")
            report_entries.append("")

        # 3b. Other data types
        else:
            for idx, val in df[col].items():
                if pd.isnull(val):
                    continue
                orig = val
                valid = True
                if dtype.lower() == 'datetime':
                    try:
                        pd.to_datetime(val, errors='raise')
                    except Exception:
                        valid = False
                elif dtype.lower() == 'numeric':
                    try:
                        pd.to_numeric(val, errors='raise')
                    except Exception:
                        valid = False
                elif dtype.lower() == 'categorical':
                    if not isinstance(val, str):
                        valid = False
                else:
                    raise ValueError("Unsupported dtype. Use datetime, numeric, Boolean, or categorical.")

                if not valid:
                    deletion_counts[orig] = deletion_counts.get(orig,0) + 1
                    mapping_counts[(orig,pd.NA)] = mapping_counts.get((orig,pd.NA),0) + 1
                    df.at[idx, col] = pd.NA

            # 3c. Cast to target dtype
            if dtype.lower() == 'numeric':
                df[col] = pd.to_numeric(df[col], errors='coerce')
            elif dtype.lower() == 'datetime':
                df[col] = pd.to_datetime(df[col], errors='coerce')
            elif dtype.lower() == 'categorical':
                df[col] = df[col].astype('category')

            # Report section
            total_deleted = sum(deletion_counts.values())
            report_entries.append(f"Column: {col}")
            report_entries.append(f"Total deleted values: {total_deleted}")
            report_entries.append("Deleted values:")
            if deletion_counts:
                for val, cnt in deletion_counts.items():
                    report_entries.append(f"  {val}: {cnt}")
            else:
                report_entries.append("  None")
            report_entries.append("Old → New mappings:")
            if mapping_counts:
                for (o,n),cnt in mapping_counts.items():
                    report_entries.append(f"  {o} → {n}: {cnt}")
            else:
                report_entries.append("  No changes")
            report_entries.append("")

    # 4. Optional report writing
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_entries))
        except Exception as e:
            logging.warning(f"Could not write report to '{report_path}': {e}")  # Skip if unable to write

    # 5. Return the purified DataFrame
    return df
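
The boolean branch is the part of this function that matters most in practice, so a small standalone sketch may help. It reuses the token sets from `to_bool` above; the constant names and the sample values are assumptions made for illustration.

```python
import pandas as pd

# Token sets copied from to_bool above; the constant names are hypothetical
TRUE_TOKENS = {"yes", "y", "true", "1", "in"}
FALSE_TOKENS = {"no", "n", "false", "out"}


def to_bool(x):
    if pd.isna(x):
        return pd.NA
    s = str(x).strip().lower()
    if s in TRUE_TOKENS:
        return True
    if s in FALSE_TOKENS:
        return False
    return pd.NA                      # Anything unrecognized is deleted


# Illustrative inputs with mixed case, padding, and unrecognized values
raw = pd.Series(["Yes", " no ", "0", "maybe", None, "IN"])
flags = raw.apply(to_bool).astype("boolean")   # Nullable boolean dtype
```

Note that `'1'` maps to `True` but `'0'` is not in the false set, so a `"0"` entry ends up as `<NA>` and would be counted as a deleted value in the report.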


#### Categorical & Object Validation Functions

In [33]:
# FUNCTION - Load and Aggregate Data Dictionary
# Reads an Excel file (multiple sheets) and builds a mapping:
# key (lowercase) -> {values, label_to_value, datatype}
# ------------------------------------------------------------------------------
def load_and_aggregate_data_dictionary2(
    dict_filepath: str
) -> Dict[str, Any]:
    """
    Steps:
    1. Load the Excel workbook from the given filepath.
    2. Iterate through each sheet:
       a. Attempt to parse the sheet into a DataFrame; skip on errors.
       b. Ensure required columns ['Key','Label','Data Type','Value','Value Label'] are present; skip if not.
       c. Rename headers to lowercase keys: key, label, datatype, value, value_label.
       d. Normalize text fields: strip whitespace, unify casing.
       e. For each row, skip empty keys, then:
          - Initialize a new entry if key unseen, storing empty sets/mappings and datatype.
          - Check for datatype consistency on repeated keys.
          - Add raw 'value' to the values set if valid.
          - Map both 'value_label' and 'label' back to the raw 'value'.
    3. Log completion and return the aggregated dictionary.
    """
    # Step 1: Load workbook
    try:
        xlsx = pd.ExcelFile(dict_filepath)  # Open the Excel file
    except Exception as e:
        logging.error(f"Failed to load dictionary file: {e}")
        raise  # Propagate failure

    aggregated_dict: Dict[str, Any] = {}  # Final output container

    # Step 2: Process each sheet
    for sheet in xlsx.sheet_names:
        try:
            df_sheet = xlsx.parse(sheet)  # Read sheet into DataFrame
        except Exception as e:
            logging.error(f"Failed to parse sheet '{sheet}': {e}")
            continue  # Skip this sheet on parse errors

        # 2b: Verify presence of expected columns
        expected_cols = ['Key', 'Label', 'Data Type', 'Value', 'Value Label']
        df_sheet.columns = [str(col).strip() for col in df_sheet.columns]  # Trim header whitespace (headers may be non-string)
        if not all(col in df_sheet.columns for col in expected_cols):
            logging.warning(f"Sheet '{sheet}' missing expected columns. Skipping.")
            continue

        # 2c: Rename to consistent lowercase keys
        df_sheet = df_sheet.rename(columns={
            'Key': 'key',
            'Label': 'label',
            'Data Type': 'datatype',
            'Value': 'value',
            'Value Label': 'value_label'
        })
        # 2d: Normalize text fields
        df_sheet['key'] = df_sheet['key'].astype(str).str.strip().str.lower()
        df_sheet['value'] = df_sheet['value'].astype(str).str.strip()
        df_sheet['label'] = df_sheet['label'].astype(str).str.strip()
        df_sheet['value_label'] = df_sheet['value_label'].astype(str).str.strip()
        df_sheet['datatype'] = df_sheet['datatype'].astype(str).str.strip().str.lower()

        # 2e: Populate aggregated_dict row by row
        for _, row in df_sheet.iterrows():
            key = row['key']  # Normalized dictionary key
            if key in ('', 'nan'):
                continue  # Skip empty keys (NaN becomes the string 'nan' after astype(str))

            # Initialize new entry if unseen
            if key not in aggregated_dict:
                aggregated_dict[key] = {
                    'values': set(),
                    'label_to_value': {},
                    'datatype': row['datatype']
                }
            else:
                # Warn if datatype differs from first encountered
                if aggregated_dict[key]['datatype'] != row['datatype']:
                    logging.warning(
                        f"Datatype inconsistency for key '{key}' in sheet '{sheet}'. "
                        "Using first encountered datatype."
                    )

            # Add valid raw values
            if row['value'] not in [None, '', 'nan']:
                aggregated_dict[key]['values'].add(row['value'])
            # Map both 'value_label' and 'label' back to raw 'value'
            if row['value_label'] not in [None, '', 'nan']:
                aggregated_dict[key]['label_to_value'][row['value_label']] = row['value']
            if row['label'] not in [None, '', 'nan']:
                aggregated_dict[key]['label_to_value'][row['label']] = row['value']

    logging.info("Completed aggregating data dictionary from Excel.")  # Final log
    return aggregated_dict  # Return assembled mapping
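
The shape of the returned mapping is easiest to see on a few rows. The sketch below skips the Excel loading and starts from an in-memory frame whose keys, labels, and the `dropdown` datatype are all hypothetical; it follows the same per-row accumulation as step 2e.

```python
import pandas as pd

# Hypothetical dictionary rows, already renamed and normalized as in steps 2c-2d
sheet = pd.DataFrame({
    "key": ["sex", "sex", "mode"],
    "label": ["Sex", "Sex", "Mode of delivery"],
    "datatype": ["dropdown", "dropdown", "dropdown"],
    "value": ["M", "F", "SVD"],
    "value_label": ["Male", "Female", "Spontaneous vaginal delivery"],
})

aggregated: dict[str, dict] = {}
for row in sheet.itertuples(index=False):
    entry = aggregated.setdefault(
        row.key,
        {"values": set(), "label_to_value": {}, "datatype": row.datatype},
    )
    entry["values"].add(row.value)                       # Allowed raw values
    entry["label_to_value"][row.value_label] = row.value # Option label -> value
    entry["label_to_value"][row.label] = row.value       # Variable label -> value
```

Because the variable label (`"Sex"`) is mapped on every row, it ends up pointing at the last raw value seen for that key (`"F"` here), while each option label keeps its own value.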


In [34]:
# FUNCTION - Raise Flags for Disallowed Non-Numeric Values
# Checks non-numeric columns against allowed values specified in dict_mapping; flags values but does not modify DataFrame.
# ------------------------------------------------------------------------------
def raise_flags_non_numeric(
    df: pd.DataFrame,
    dict_mapping: dict[str, Any],
    report_file: Optional[str] = None,
    skip_columns: list[str] = ['uid', 'facility', 'uniquekey', "startedatdischarge", "completedatdischarge", "ingestedatdischarge"]
) -> pd.DataFrame:
    """
    Steps:
    1. Initialize skip_columns if not provided.
    2. Lowercase the dictionary keys for case-insensitive lookup.
    3. Lowercase skip_columns for consistency.
    4. Prepare containers for report_info and skipped columns.
    5. Iterate over DataFrame columns:
       a. Skip columns containing 'uid', exactly 'facility' or 'uniquekey', or in skip_columns.
       b. Process only non-numeric columns.
       c. If column key exists in dict_mapping, retrieve allowed_values set.
       d. Define is_value_allowed(val) to check membership.
       e. Apply mask to flag disallowed entries.
       f. Count flagged rows and unique bad values, record if any.
    6. Build text report lines summarizing flags and skipped columns.
    7. Optionally write report to report_file, skipping on errors.
    8. Return original DataFrame unchanged.
    """
    # 1. Default skip_columns to empty list
    if skip_columns is None:
        skip_columns = []

    # 2. Create lowercase dict mapping
    aggregated_dict_lower = {k.lower(): v for k, v in dict_mapping.items()}

    # 3. Lowercase skip_columns for matching
    skip_columns_lower = [c.lower() for c in skip_columns]

    # 4. Report containers
    report_info: dict[str, dict[str, Any]] = {}
    skipped_columns: list[str] = []

    # 5. Iterate through columns
    for col in df.columns:
        col_lower = col.lower()
        # 5a. Skip identifiers and user-specified columns
        if (
            'uid' in col_lower or
            col_lower in ['facility','uniquekey'] or
            col_lower in skip_columns_lower
        ):
            skipped_columns.append(col)
            continue

        # 5b. Only non-numeric columns
        if not pd.api.types.is_numeric_dtype(df[col]):
            # 5c. Check if mapping exists
            if col_lower in aggregated_dict_lower:
                allowed_values = aggregated_dict_lower[col_lower]['values']

                # 5d. Helper to test value validity
                def is_value_allowed(val: Any) -> bool:
                    if pd.isnull(val):
                        return True
                    if val in allowed_values:
                        return True
                    # Handle combination strings
                    if isinstance(val, str) and val.startswith('{') and val.endswith('}'):
                        parts = [item.strip() for item in val[1:-1].split(',')]
                        return all(part in allowed_values for part in parts)
                    return False

                # 5e. Apply mask to identify disallowed
                mask = df[col].apply(lambda x: not is_value_allowed(x))  # True for disallowed
                num_flagged = int(mask.sum())
                bad_values = df[col][mask].dropna().unique().tolist()

                # 5f. Record if flags present
                if num_flagged > 0:
                    report_info[col] = {
                        'rows_flagged': num_flagged,
                        'disallowed_values': bad_values
                    }

    # 6. Build report text
    lines: list[str] = []
    lines.append('Disallowed Values Flags Report')
    lines.append('================================')
    lines.append('')
    if report_info:
        for col, info in report_info.items():
            vals = ', '.join(f'"{v}"' for v in info['disallowed_values'])
            lines.append(f'Column: "{col}"')
            lines.append(f'  Rows Flagged: {info["rows_flagged"]}')
            lines.append(f'  Disallowed Values: [{vals}]')
            lines.append('')
    else:
        lines.append('No disallowed values found in any column.')
        lines.append('')

    if skipped_columns:
        lines.append('Columns Skipped (UID, Facility, UniqueKey, or User-Specified):')
        lines.append('---------------------------------------------------------------')
        for c in skipped_columns:
            lines.append(f'  - {c}')
        lines.append('')

    report_text = '\n'.join(lines)

    # 7. Optional report writing
    if report_file:
        try:
            with open(report_file, 'w', encoding='utf-8') as f:
                f.write(report_text)
        except Exception:
            pass  # Skip report on write error

    # 8. Return unchanged DataFrame
    return df
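
The combination-string rule in step 5d can be tried in isolation. The sketch below is a pandas-free restatement of that check, not the function itself; `ALLOWED` and its sample codes are hypothetical and do not come from any of the dictionaries.

```python
from typing import Any

# Hypothetical allowed codes for one column (illustration only).
ALLOWED = {"Y", "N", "NS"}

def is_value_allowed(val: Any, allowed: set[str]) -> bool:
    """True for missing values, allowed codes, or '{a,b}' combinations of allowed codes."""
    if val is None or val != val:  # None or NaN (NaN is never equal to itself)
        return True
    if val in allowed:
        return True
    # Combination strings look like "{Y, NS}": every part must be allowed.
    if isinstance(val, str) and val.startswith("{") and val.endswith("}"):
        parts = [p.strip() for p in val[1:-1].split(",")]
        return all(p in allowed for p in parts)
    return False

checks = {v: is_value_allowed(v, ALLOWED) for v in ["Y", "{Y, NS}", "{Y,maybe}", "maybe"]}
print(checks)
```

A combination is flagged as soon as one of its parts is missing from the allowed set, so `{Y,maybe}` is reported as a whole string.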


In [35]:
# FUNCTION - Clean DataFrame with Dictionary
# Cleans and standardizes column values based on a reference dictionary, with optional report.
# ---------------------------------------------------------------------------------------------
def clean_dataframe_with_dictionary(
    df: pd.DataFrame,
    dict_input: pd.DataFrame | str,
    report_path: Optional[str] = None,
    cols_to_skip: list[str] = ['uid', 'facility', 'uniquekey', "startedatdischarge", "completedatdischarge", "ingestedatdischarge"]
) -> pd.DataFrame:
    """
    Steps:
    1. Initialize cols_to_skip list.
    2. Load or concatenate the dictionary into a DataFrame.
    3. Build mappings for each column key:
       - val_to_label: raw value -> description
       - label_to_val: description -> raw value
    4. Prepare report tracking structures:
       - replacements_made, unknown_values, columns_without_mapping, special_values_not_replaced.
    5. Copy the original DataFrame to avoid in-place modification.
    6. For each column (skipping cols_to_skip):
       a. If no mapping, record and skip.
       b. For each unique non-null value:
          i.   If combination string ("{...}"), split into items.
          ii.  For each item:
               - If special token "none"/"norm"/"normal", record it.
               - Else map via val_to_label or label_to_val, or record as unknown.
          iii. Rebuild combination and replace all occurrences if changed.
          iv.  For single values, apply similar mapping logic.
       c. Remove unused categories for categorical columns.
    7. Final pass to clean up all categorical columns.
    8. Build report text in sections.
    9. If report_path provided, attempt to write report; skip silently on failure.
   10. Return the cleaned DataFrame.
    """
    # 1. Default skip list
    if cols_to_skip is None:
        cols_to_skip = []

    # 2. Load dictionary DataFrame
    if isinstance(dict_input, str) and os.path.exists(dict_input):
        xls = pd.ExcelFile(dict_input)
        dict_df_list = [pd.read_excel(xls, sheet_name=sheet) for sheet in xls.sheet_names]
        dict_df = pd.concat(dict_df_list, ignore_index=True)
    elif isinstance(dict_input, pd.DataFrame):
        dict_df = dict_input.copy()
    else:
        raise ValueError("dict_input must be a file path or pandas DataFrame.")

    # 3. Build column mappings
    from collections import defaultdict
    col_mappings: dict[str, dict[str, dict]] = defaultdict(lambda: {"val_to_label":{}, "label_to_val":{}})
    for _, row in dict_df.iterrows():
        if not all(k in row for k in ["Key","Value","Value Label"]):
            continue
        key = str(row["Key"]).strip().lower()
        val = str(row["Value"]).strip()
        label = str(row["Value Label"]).strip()
        col_mappings[key]["val_to_label"][val] = label
        col_mappings[key]["label_to_val"][label] = val

    # 4. Initialize report trackers
    replacements_made: dict[str, list[tuple[Any,Any]]] = defaultdict(list)
    unknown_values: dict[str, set] = defaultdict(set)
    columns_without_mapping: list[str] = []
    special_values_not_replaced: dict[str, dict[str,int]] = defaultdict(lambda: defaultdict(int))

    # 5. Copy DataFrame
    cleaned_df = df.copy()

    # 6. Process each column
    for col in cleaned_df.columns:
        if col in cols_to_skip:
            continue
        key = col.split('.')[0].strip().lower()
        mapping = col_mappings.get(key)
        # 6a. Skip if no mapping available
        if not mapping or (not mapping["val_to_label"] and not mapping["label_to_val"]):
            columns_without_mapping.append(col)
            continue

        val_to_label = mapping["val_to_label"]
        label_to_val = mapping["label_to_val"]
        unique_vals = cleaned_df[col].dropna().unique()

        # 6b. Iterate unique values
        for orig in unique_vals:
            s = str(orig).strip()
            # 6b.i: Handle combination
            if s.startswith("{") and s.endswith("}"):
                items = [x.strip().strip("'\"") for x in s[1:-1].split(',')]
                new_items = []
                for item in items:
                    low = item.lower()
                    if low in ("none","norm","normal"):
                        special_values_not_replaced[col][low] += 1
                        new_items.append(item)
                    elif item in val_to_label:
                        new_items.append(item)
                    elif item in label_to_val:
                        new_items.append(label_to_val[item])
                    else:
                        new_items.append(item)
                        unknown_values[col].add(item)
                new_s = "{" + ",".join(new_items) + "}"
                if new_s != s:
                    cleaned_df[col] = cleaned_df[col].replace(orig, new_s)
                    replacements_made[col].append((orig, new_s))

            else:
                # 6b.iv: Single value
                low = s.lower()
                if low in ("none","norm","normal"):
                    special_values_not_replaced[col][low] += 1
                elif s in val_to_label:
                    continue
                elif s in label_to_val:
                    new_val = label_to_val[s]
                    cleaned_df[col] = cleaned_df[col].replace(orig, new_val)
                    replacements_made[col].append((orig, new_val))
                else:
                    unknown_values[col].add(s)

        # 6c: Cleanup categories
        if isinstance(cleaned_df[col].dtype, pd.CategoricalDtype):
            cleaned_df[col] = cleaned_df[col].cat.remove_unused_categories()

    # 7. Final cleanup for all categorical
    for col in cleaned_df.columns:
        if isinstance(cleaned_df[col].dtype, pd.CategoricalDtype):
            cleaned_df[col] = cleaned_df[col].cat.remove_unused_categories()

    # 8. Build report sections
    lines: list[str] = []
    # Section 1
    lines.append("Section 1 - Replacements")
    if not replacements_made:
        lines.append("  (No replacements made.)")
    else:
        for c, repls in replacements_made.items():
            lines.append(f"Column '{c}':")
            seen = set()
            for o,n in repls:
                if (o,n) not in seen:
                    lines.append(f"  - {o} -> {n}")
                    seen.add((o,n))
            lines.append("")
    # Section 2
    lines.append("Section 2 - Unknown Values")
    if not unknown_values:
        lines.append("  (No unknown values.)")
    else:
        for c, vals in unknown_values.items():
            lines.append(f"Column '{c}': {len(vals)} unknown values: {list(vals)}")
    # Section 3
    lines.append("Section 3 - No Mapping Available")
    if not columns_without_mapping:
        lines.append("  (All columns had mappings.)")
    else:
        for c in columns_without_mapping:
            lines.append(f"  - {c}")
    # Section 4
    lines.append("Section 4 - Special Values Not Replaced")
    if not special_values_not_replaced:
        lines.append("  (None encountered.)")
    else:
        for c, d in special_values_not_replaced.items():
            lines.append(f"Column '{c}': {d}")

    report_text = '\n'.join(lines)

    # 9. Optional report writing
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write(report_text)
        except Exception:
            pass

    # 10. Return cleaned DataFrame
    return cleaned_df
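
Steps 3 and 6b amount to a two-way lookup: a value that is already a raw code is kept, and a label is turned back into its raw code. A minimal sketch of that idea, assuming made-up dictionary rows (the `Key` / `Value` / `Value Label` roles come from the function above; the rows and the `normalize` helper are hypothetical):

```python
from collections import defaultdict

# Hypothetical dictionary rows: (Key, Value, Value Label).
rows = [("sex", "M", "Male"), ("sex", "F", "Female")]

# Build both lookup directions per column key, as in step 3.
col_mappings = defaultdict(lambda: {"val_to_label": {}, "label_to_val": {}})
for key, val, label in rows:
    col_mappings[key.lower()]["val_to_label"][val] = label
    col_mappings[key.lower()]["label_to_val"][label] = val

def normalize(value: str, mapping: dict) -> tuple[str, bool]:
    """Return (normalized value, known?) for a single, non-combination value."""
    if value in mapping["val_to_label"]:
        return value, True  # already a raw code
    if value in mapping["label_to_val"]:
        return mapping["label_to_val"][value], True  # label -> raw code
    return value, False  # unknown: left as-is and reported

results = [normalize(v, col_mappings["sex"]) for v in ["M", "Female", "Unknown"]]
print(results)
```

Unknown values are never rewritten; they only show up in the report, which matches Section 2 of the report built above.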


In [36]:
# FUNCTION - Align Datasets
# Compares two country-specific dictionaries (ZIM vs. MWI) and optionally generates an alignment report.
# ------------------------------------------------------------------------------
def align_datasets(
    dict_input_zim: pd.DataFrame | str,
    dict_input_mwi: pd.DataFrame | str,
    report_path: Optional[str] = None
) -> None:
    """
    Steps:
    1. Build value mappings for each country:
       a. If `dict_input` is a filepath, read and concatenate all sheets into one DataFrame.
       b. Else if it's a DataFrame, copy it.
       c. Assemble mapping: key (lowercased) -> set of values.
    2. Identify common keys between ZIM and MWI mappings.
    3. For each common key where the allowed-value sets differ:
       a. Compute three sorted lists: values in both, only in ZIM, only in MWI.
       b. Append formatted lines to `report_lines`.
    4. If `report_path` is provided, attempt to write the report; skip silently on failure.
    """
    # 1. Helper to build mapping
    def build_mapping(input_data: pd.DataFrame | str) -> dict[str, set[str]]:
        # a. Load from Excel if path
        if isinstance(input_data, str) and os.path.exists(input_data):
            xls = pd.ExcelFile(input_data)
            df_list = [pd.read_excel(xls, sheet_name=sheet) for sheet in xls.sheet_names]
            dict_df = pd.concat(df_list, ignore_index=True)
        # b. Copy if DataFrame
        elif isinstance(input_data, pd.DataFrame):
            dict_df = input_data.copy()
        else:
            raise ValueError("dict_input must be a file path or pandas DataFrame.")

        mapping: dict[str, set[str]] = {}
        # c. Assemble mapping
        for _, row in dict_df.iterrows():
            if "Key" not in row or "Value" not in row:
                continue
            key = str(row["Key"]).strip().lower()
            value = str(row["Value"]).strip()
            mapping.setdefault(key, set()).add(value)
        return mapping

    # Build mappings for both countries
    mapping_zim = build_mapping(dict_input_zim)
    mapping_mwi = build_mapping(dict_input_mwi)

    # 2. Find common keys
    common_keys = set(mapping_zim.keys()) & set(mapping_mwi.keys())

    report_lines: list[str] = ["Alignment Report"]
    # 3. Compare allowed values
    for key in sorted(common_keys):
        zim_vals = mapping_zim[key]
        mwi_vals = mapping_mwi[key]
        # Skip identical sets
        if zim_vals == mwi_vals:
            continue
        both = sorted(zim_vals & mwi_vals, key=str.lower)
        only_zim = sorted(zim_vals - mwi_vals, key=str.lower)
        only_mwi = sorted(mwi_vals - zim_vals, key=str.lower)

        report_lines.append(f"Column: {key}")
        report_lines.append(f"  Found in both: {both}")
        report_lines.append(f"  Found in ZIM only: {only_zim}")
        report_lines.append(f"  Found in MWI only: {only_mwi}")
        report_lines.append("")  # Blank line

    # 4. Optional report writing
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))
        except Exception:
            pass  # Skip on write failure
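
The comparison in step 3 is plain set algebra on the allowed values of each shared key. A small sketch with hypothetical value sets (not taken from the real ZIM or MWI dictionaries):

```python
# Hypothetical allowed values for one key that exists in both dictionaries.
zim_vals = {"Y", "N", "NS"}
mwi_vals = {"Y", "N", "U"}

both = sorted(zim_vals & mwi_vals, key=str.lower)      # intersection
only_zim = sorted(zim_vals - mwi_vals, key=str.lower)  # ZIM-only values
only_mwi = sorted(mwi_vals - zim_vals, key=str.lower)  # MWI-only values

print(f"Found in both: {both}")
print(f"Found in ZIM only: {only_zim}")
print(f"Found in MWI only: {only_mwi}")
```

Note that `align_datasets` only looks at keys present in both dictionaries, so a key that exists in just one country's dictionary does not appear in the report. The function returns `None`; its only output is the report file.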


In [37]:
# FUNCTION - Remove Disallowed Values with Optional Report
# Replaces specified disallowed values with NaN and optionally writes a detailed report.
# ------------------------------------------------------------------------------
def remove_disallowed_values_with_report(
    values_to_delete: dict[str, Collection[Any]] | list[tuple[Collection[str], Collection[Any]]],
    df: pd.DataFrame,
    report_filepath: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Normalize `values_to_delete` into `col_to_disallowed`: column -> set of disallowed values.
    2. Record original value counts for each mapped column; track missing columns.
    3. Replace occurrences of each disallowed value with NaN in-place.
    4. Build report lines detailing skipped columns and counts of deleted values per column.
    5. If `report_filepath` is provided, attempt to write the report; skip silently on failure.
    6. Return the modified DataFrame.
    """
    # 1. Normalize input to column->disallowed set
    col_to_disallowed: dict[str, set[Any]] = {}
    if isinstance(values_to_delete, dict):
        items = values_to_delete.items()
    elif isinstance(values_to_delete, list):
        items = values_to_delete
    else:
        raise ValueError("values_to_delete must be a dict or list of pairs.")

    for key, disallowed in items:
        # If key is a list/tuple of columns, apply same disallowed set to each
        if isinstance(key, (list, tuple)):
            for col in key:
                col_to_disallowed.setdefault(col, set()).update(disallowed)
        else:
            col_to_disallowed.setdefault(key, set()).update(disallowed)

    # 2. Track processing and original counts
    processed_columns: list[str] = []
    skipped_columns: list[str] = []
    original_counts: dict[str, dict[Any,int]] = {}
    for col, dis_set in col_to_disallowed.items():
        if col in df.columns:
            processed_columns.append(col)
            original_counts[col] = df[col].value_counts(dropna=False).to_dict()
        else:
            skipped_columns.append(col)

    # 3. Replace disallowed values with NaN
    for col in processed_columns:
        dis_list = list(col_to_disallowed[col])
        df.loc[df[col].isin(dis_list), col] = np.nan

    # 4. Build report lines
    lines: list[str] = []
    lines.append("Skipped columns:")
    if skipped_columns:
        for col in skipped_columns:
            lines.append(f"  - {col}")
    else:
        lines.append("  None")

    lines.append("\nDeleted Values:")
    for col in sorted(processed_columns):
        orig = original_counts.get(col, {})
        final = df[col].value_counts(dropna=False).to_dict()

        lines.append(f"Column: {col}")
        lines.append("  Deleted Values:")
        deleted_any = False
        for val in sorted(col_to_disallowed[col], key=lambda x: str(x)):
            before = orig.get(val, 0)
            after = final.get(val, 0)
            delta = before - after
            if delta > 0:
                lines.append(f"    {val!r}: {delta}")
                deleted_any = True
        if not deleted_any:
            lines.append("    None")
        lines.append("")  # blank line

    report_text = "\n".join(lines)

    # 5. Optional report writing
    if report_filepath:
        try:
            with open(report_filepath, 'w', encoding='utf-8') as f:
                f.write(report_text)
        except Exception:
            pass  # skip on write error

    # 6. Return the DataFrame
    return df
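
Step 1 accepts two input shapes for `values_to_delete`. The sketch below restates that normalization on its own so both shapes can be compared; `normalize_spec`, the column names, and the sample values are hypothetical.

```python
from typing import Any

def normalize_spec(spec) -> dict[str, set[Any]]:
    """Flatten a dict or a list of (columns, values) pairs into column -> disallowed set."""
    items = spec.items() if isinstance(spec, dict) else spec
    out: dict[str, set[Any]] = {}
    for key, disallowed in items:
        # A list/tuple key applies the same disallowed values to every listed column.
        cols = key if isinstance(key, (list, tuple)) else [key]
        for col in cols:
            out.setdefault(col, set()).update(disallowed)
    return out

as_dict = normalize_spec({"sex": ["?", "-"]})
as_pairs = normalize_spec([(("sex", "outcome"), ["?"]), (("outcome",), ["n/a"])])
print(as_dict)
print(as_pairs)
```

Because the values are merged with `set.update`, each entry should be a collection such as a list. A bare string would be split into its individual characters.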

In [38]:
# FUNCTION - Apply Value Mappings with Optional Report
# Replaces specified old values with new values (including within combinations) and optionally writes a report.
# ------------------------------------------------------------------------------
def apply_value_mappings_with_report(
    df: pd.DataFrame,
    mappings: dict[str, dict[Any, list[Any]]],
    report_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Initialize an empty list `report_lines` to accumulate report entries.
    2. Define helper `count_occurrences_in_cell` to count occurrences of a target value,
       handling both single entries and comma-separated combinations (with or without braces).
    3. Iterate through each column and its mapping in `mappings`:
       a. Skip if the column does not exist in the DataFrame.
       b. Build `rep_dict` mapping each old_val -> new_val for fast lookup.
       c. Count how many replacements will occur for each old_val by summing occurrences.
       d. Record lines for values with non-zero replacement counts.
       e. Define `map_cell` to apply replacements in each cell:
          - Direct replace if exactly matches old_val.
          - For comma-separated strings, replace within tokens and rebuild with braces if needed.
       f. Apply `map_cell` to the entire column in-place.
       g. Append the recorded lines for this column to `report_lines` if any.
    4. Join `report_lines` into a single string `report_text`.
    5. If `report_path` is provided, attempt to write `report_text` to file; skip on any error.
    6. Return the modified DataFrame.
    """
    # 1. Prepare report container
    report_lines: list[str] = []

    # 2. Helper to count occurrences of a target in a cell
    def count_occurrences_in_cell(cell: Any, target: Any) -> int:
        cell_str = str(cell)
        if cell_str == str(target):
            return 1
        if "," in cell_str:
            has_braces = cell_str.startswith("{") and cell_str.endswith("}")
            inner = cell_str[1:-1] if has_braces else cell_str
            tokens = [tok.strip() for tok in inner.split(",")]
            return tokens.count(str(target))
        return 0

    # 3. Iterate mappings
    for column, col_mapping in mappings.items():
        if column not in df.columns:
            continue  # 3a. Skip missing columns

        # 3b. Build flat old->new dictionary
        rep_dict: dict[str, Any] = {}
        for new_val, old_list in col_mapping.items():
            for old_val in old_list:
                rep_dict[str(old_val)] = new_val

        # 3c. Count how many replacements for each old_val
        column_report: list[str] = []
        for old_str, new_val in rep_dict.items():
            count_replaced = sum(
                count_occurrences_in_cell(cell, old_str) for cell in df[column]
            )
            if count_replaced > 0:
                column_report.append(f"  {old_str} -> {new_val}: {count_replaced}")

        # 3e. Define mapping function for a single cell
        def map_cell(cell: Any) -> Any:
            cell_str = str(cell)
            # Direct replacement
            if cell_str in rep_dict:
                return rep_dict[cell_str]
            # Handle comma-separated values
            if "," in cell_str:
                has_braces = cell_str.startswith("{") and cell_str.endswith("}")
                inner = cell_str[1:-1] if has_braces else cell_str
                tokens = [tok.strip() for tok in inner.split(",")]
                mapped = [rep_dict.get(tok, tok) for tok in tokens]
                new_inner = ", ".join(mapped)
                return f"{{{new_inner}}}" if has_braces else new_inner
            return cell

        # 3f. Apply mapping to column
        df[column] = df[column].apply(map_cell)

        # 3g. Append column report if entries exist
        if column_report:
            report_lines.append(f"Column: {column}")
            report_lines.extend(column_report)
            report_lines.append("")

    # 4. Build final report text
    report_text = "\n".join(report_lines)

    # 5. Optional report writing
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write(report_text)
        except Exception:
            pass  # Skip on write error

    # 6. Return updated DataFrame
    return df
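
The `mappings` argument is keyed by the new value, with a list of old values under it. The sketch below replays the per-cell logic from step 3e on a few sample cells, without pandas; the mapping and the cells are hypothetical.

```python
# Hypothetical mapping for one column: new value -> list of old values.
col_mapping = {"Y": ["Yes", "yes"], "N": ["No"]}

# Flatten to old -> new for lookup (step 3b).
rep_dict = {str(old): new for new, olds in col_mapping.items() for old in olds}

def map_cell(cell):
    cell_str = str(cell)
    if cell_str in rep_dict:  # exact match on the whole cell
        return rep_dict[cell_str]
    if "," in cell_str:  # comma-separated combination
        has_braces = cell_str.startswith("{") and cell_str.endswith("}")
        inner = cell_str[1:-1] if has_braces else cell_str
        tokens = [tok.strip() for tok in inner.split(",")]
        new_inner = ", ".join(rep_dict.get(tok, tok) for tok in tokens)
        return f"{{{new_inner}}}" if has_braces else new_inner
    return cell

mapped = [map_cell(c) for c in ["Yes", "{yes,No}", "No, Maybe", "{Yes}", 3]]
print(mapped)
```

Two details are worth knowing when reading the report: combinations are rebuilt with `", "` as the separator, and a single-item combination such as `{Yes}` is left unchanged because it contains no comma and is not an exact match.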

In [39]:
# FUNCTION - Replace Unallowed with "OTHER"
# Replaces values not in the allowed set with "OTHER" and optionally writes a report.
# ------------------------------------------------------------------------------
def replace_unallowed_with_other(
    df: pd.DataFrame,
    allowed_values_by_col: dict[str, Collection[Any]],
    report_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Steps:
    1. Initialize an empty list to accumulate report lines.
    2. For each column in allowed_values_by_col:
       a. Skip if the column doesn't exist in df.
       b. Convert allowed_values to a set for fast lookup.
       c. Initialize a counter for total replacements.
       d. Define process_cell to:
          - Return the original cell if it's in allowed_set.
          - Otherwise increment the counter and return "OTHER".
       e. Apply process_cell to the entire column.
       f. If replacements occurred, append report entries for this column.
    3. If report_path is provided, attempt to write the report; silently skip on failure.
    4. Return the modified DataFrame.
    """
    # 1. Prepare report container
    report_lines: list[str] = []

    # 2. Iterate over each mapping
    for col, allowed_values in allowed_values_by_col.items():
        # 2a. Skip missing columns
        if col not in df.columns:
            continue
        # 2b. Build a set of allowed values
        allowed_set = set(allowed_values)
        # 2c. Counter for replacements in this column
        total_replacements = 0

        # 2d. Cell processing function
        def process_cell(cell: Any) -> Any:
            nonlocal total_replacements
            # Keep allowed values
            if cell in allowed_set:
                return cell
            # Replace everything else, typically including missing values (NaN), with "OTHER"
            total_replacements += 1
            return "OTHER"

        # 2e. Apply to column
        df[col] = df[col].apply(process_cell)

        # 2f. Record in report if replacements happened
        if total_replacements > 0:
            report_lines.append(f"Column: {col}")
            report_lines.append(f"  TOTAL REPLACED: {total_replacements}")
            report_lines.append("")

    # 3. Optional report writing
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write("\n".join(report_lines))
        except Exception:
            pass  # Skip on write errors

    # 4. Return DataFrame
    return df
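
The membership test in step 2d is exact, so it is case-sensitive and treats combination strings as single values. The sketch below shows what that means for a few sample cells; `allowed_set`, the cells, and the `process_cell_keep_missing` variant are hypothetical and only illustrate one way missing values could be preserved.

```python
import math

allowed_set = {"Y", "N"}  # hypothetical allowed codes

def process_cell(cell):
    """Same membership test as the function above."""
    return cell if cell in allowed_set else "OTHER"

def process_cell_keep_missing(cell):
    """Hypothetical variant that leaves float NaN untouched."""
    if isinstance(cell, float) and math.isnan(cell):
        return cell
    return process_cell(cell)

cells = ["Y", "n", math.nan, "{Y,N}"]
replaced = [process_cell(c) for c in cells]
kept = [process_cell_keep_missing(c) for c in cells]
print(replaced)
print(kept)
```

With the plain test, the lowercase `n`, the missing value, and the combination string all become `"OTHER"`. If that is not wanted for a given column, the allowed list needs to include those forms, or missing values need a guard like the one sketched here.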

#### Datetime Validation Functions

In [40]:
# FUNCTION - Purify Columns by Data Type with Optional Report
# Removes values that cannot be cast to the specified dtype and optionally writes a report.
# ------------------------------------------------------------------------------
def purify_column_dt(
    df: pd.DataFrame,
    dtype: str,
    cols: list[str] | None = None,
    report_path: str | None = None
) -> pd.DataFrame:
    """
    Steps:
    1. Determine which columns to process (exclude 'facility', 'uid', 'uniquekey' and the discharge timestamp columns).
    2. For each column:
       a. If targeting Boolean, apply a safe conversion function and record changes.
       b. Else, validate each non-null cell against the target dtype and delete invalid values.
       c. Track counts of deletions and old->new mappings.
    3. Compile report lines summarizing per-column deletions and mappings.
    4. If `report_path` is provided, attempt to write the report; silently skip on error.
    5. Return the modified DataFrame.
    """
    import pandas as pd
    import numpy as np

    # 1. Identify columns to process (always skip identifier and discharge-timestamp columns)
    skip_cols = {"facility", "uid", "uniquekey", "startedatdischarge", "completedatdischarge", "ingestedatdischarge"}
    if not cols:
        target_cols = [c for c in df.columns if c.lower() not in skip_cols]
    else:
        target_cols = [c for c in cols if c in df.columns and c.lower() not in skip_cols]

    # Helper to compare old vs new values safely
    def values_differ(old, new):
        if pd.isna(old) and pd.isna(new):
            return False
        if pd.isna(old) or pd.isna(new):
            return True
        return old != new

    report_lines: list[str] = []

    # 2. Process each target column
    for col in target_cols:
        mapping_counts: dict[tuple[Any,Any], int] = {}
        deleted_counts: dict[Any,int] = {}

        # 2a. Boolean conversion
        if dtype.lower() == "boolean":
            # Safe converter: yes/y/true -> True; no/n/false -> False; else pd.NA
            def to_bool(x):
                if pd.isnull(x):
                    return pd.NA
                s = str(x).strip().lower()
                if s in {"yes","y","true"}:
                    return True
                if s in {"no","n","false"}:
                    return False
                return pd.NA

            original = df[col].copy()
            # Apply and cast to pandas BooleanDtype
            df[col] = df[col].apply(to_bool).astype('boolean')

            # Count conversions & deletions (pair values by position, so any index works)
            for old_val, new_val in zip(original, df[col]):
                if values_differ(old_val, new_val):
                    mapping_counts[(old_val, new_val)] = mapping_counts.get((old_val, new_val), 0) + 1
                    if pd.isna(new_val) and pd.notna(old_val):
                        deleted_counts[old_val] = deleted_counts.get(old_val, 0) + 1

            total_deleted = sum(deleted_counts.values())
            report_lines.append(f"Column: {col} (Boolean)")
            report_lines.append(f"  Total deleted/converted: {total_deleted}")

        # 2b. Other types: datetime, numeric, categorical
        else:
            for idx, val in df[col].items():
                if pd.isnull(val):
                    continue
                original_val = val
                is_valid = True
                # Validate by dtype
                if dtype.lower() == "datetime":
                    try:
                        pd.to_datetime(val, errors='raise')
                    except Exception:
                        is_valid = False
                elif dtype.lower() == "numeric":
                    try:
                        pd.to_numeric(val, errors='raise')
                    except Exception:
                        is_valid = False
                elif dtype.lower() == "categorical":
                    is_valid = isinstance(val, str)
                else:
                    raise ValueError(f"Unsupported dtype '{dtype}'")

                # Delete invalid
                if not is_valid:
                    df.at[idx, col] = pd.NA
                    deleted_counts[original_val] = deleted_counts.get(original_val, 0) + 1
                    mapping_counts[(original_val, pd.NA)] = mapping_counts.get((original_val, pd.NA), 0) + 1

            total_deleted = sum(deleted_counts.values())
            report_lines.append(f"Column: {col} ({dtype})")
            report_lines.append(f"  Total deleted: {total_deleted}")

        # 2c. Append detailed counts
        if deleted_counts:
            report_lines.append("Deleted Values and Counts:")
            for val, cnt in deleted_counts.items():
                report_lines.append(f"  {val!r}: {cnt}")
        else:
            report_lines.append("  None deleted.")

        report_lines.append("Old->New mappings:")
        if mapping_counts:
            for (old, new), cnt in mapping_counts.items():
                report_lines.append(f"  {old!r} -> {new!r}: {cnt}")
        else:
            report_lines.append("  No changes.")
        report_lines.append("")  # blank line

    # 3. Compile report text
    full_report = "\n".join(report_lines)

    # 4. Optional: write report
    if report_path:
        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                f.write(full_report)
        except Exception:
            pass  # skip on write failure

    # 5. Return the purified DataFrame
    return df
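
The Boolean branch (step 2a) can be checked on a small Series. This is a sketch of the same conversion rule, assuming made-up input values and a non-default index to show that the result does not depend on row labels:

```python
import pandas as pd

def to_bool(x):
    """yes/y/true -> True; no/n/false -> False; anything else -> pd.NA."""
    if pd.isnull(x):
        return pd.NA
    s = str(x).strip().lower()
    if s in {"yes", "y", "true"}:
        return True
    if s in {"no", "n", "false"}:
        return False
    return pd.NA

# Hypothetical raw column with a non-default index.
raw = pd.Series(["Yes", " n ", "TRUE", "maybe", None], index=[10, 11, 12, 13, 14])
converted = raw.apply(to_bool).astype("boolean")

# Values that were present before and are missing afterwards count as deleted.
deleted = [old for old, new in zip(raw, converted) if pd.notna(old) and pd.isna(new)]
print(converted)
print(deleted)
```

Matching is case-insensitive and ignores surrounding whitespace, so `" n "` becomes `False`. Anything outside the two small word lists, such as `"maybe"`, is turned into a missing value and reported as deleted.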

#### Merging Functions

In [41]:
# FUNCTION - Merge Validated DataFrames
# -----------------------------------------------

def merge_validated_data(
    df_numeric: pd.DataFrame,
    df_boolean: pd.DataFrame,
    df_category: pd.DataFrame,
    df_object: pd.DataFrame,
    df_datetime: pd.DataFrame,
    on: list[str],  # e.g. ["facility", "uid", "uniquekey", "startedatdischarge", "completedatdischarge", "ingestedatdischarge", "datetimedischarge.value"]
    how: str = "inner",
    csv_path: Optional[str] = None,
    pkl_path: Optional[str] = None
) -> pd.DataFrame:
    """
    Sequentially merge five DataFrames of different types into a single DataFrame.

    Parameters
    ----------
    df_numeric : pd.DataFrame
        First DataFrame (numeric).
    df_boolean : pd.DataFrame
        Second DataFrame (boolean).
    df_category : pd.DataFrame
        Third DataFrame (categorical).
    df_object : pd.DataFrame
        Fourth DataFrame (object/string).
    df_datetime : pd.DataFrame
        Fifth DataFrame (datetime).
    on : list of str
        Column names to join on for each merge (required; there is no default).
    how : str, default "inner"
        Type of merge to perform ("inner", "left", "outer", etc.).
    csv_path : str, optional
        If provided, path to save the merged DataFrame as CSV.
    pkl_path : str, optional
        If provided, path to save the merged DataFrame as a pickle.

    Returns
    -------
    pd.DataFrame
        The fully merged DataFrame.
    """
    # Merge numeric + boolean
    df_merged = pd.merge(df_numeric, df_boolean, on=on, how=how)

    # Add category
    df_merged = pd.merge(df_merged, df_category, on=on, how=how)

    # Add object
    df_merged = pd.merge(df_merged, df_object, on=on, how=how)

    # Add datetime
    df_merged = pd.merge(df_merged, df_datetime, on=on, how=how)

    # Optionally write out
    if csv_path:
        df_merged.to_csv(csv_path, index=False)
    if pkl_path:
        df_merged.to_pickle(pkl_path)

    return df_merged
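
The four chained merges are a left-to-right fold over the list of frames. The sketch below expresses the same pattern with `functools.reduce` on tiny hypothetical frames, and shows how the `how` argument changes the row count when one frame is missing a key:

```python
from functools import reduce

import pandas as pd

on = ["facility", "uid"]  # hypothetical subset of the real join keys

df_numeric = pd.DataFrame({"facility": ["A", "A"], "uid": [1, 2], "weight": [2.9, 3.4]})
df_boolean = pd.DataFrame({"facility": ["A", "A"], "uid": [1, 2], "referred": [True, False]})
df_category = pd.DataFrame({"facility": ["A"], "uid": [1], "sex": ["F"]})  # uid 2 is missing

frames = [df_numeric, df_boolean, df_category]

# Fold the list: merge the running result with the next frame each time.
inner = reduce(lambda left, right: pd.merge(left, right, on=on, how="inner"), frames)
outer = reduce(lambda left, right: pd.merge(left, right, on=on, how="outer"), frames)

print(len(inner), len(outer))
```

With the default `how="inner"`, a record that is absent from any one of the typed frames is dropped from the merged result, so comparing row counts before and after the merge is a cheap sanity check.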


---

### CLEANING

#### 1st Stage Cleaning

In [None]:
# LOAD DATA - CSVs & DICTIONARIES

# Read the CSV. pandas' default NA handling already treats empty strings, "nan", "NA", etc. as NaN.
df = pd.read_csv(csv_filepath, delimiter=",")
logging.info(f"Loaded CSV with shape: {df.shape}")

# Load and aggregate data dictionaries.
dict_mapping = load_and_aggregate_dictionary(cfg["dict_filepath"])
logging.info(f"Loaded dictionary with keys: {list(dict_mapping.keys())[:10]}")

# Generate a pre-processing report.
get_report(df, None)
logging.info("Pre-processing report generated.")

# Optional Reports

In [None]:
# Remove frame shifted rows
df_removed_frame_shift = remove_frame_shift(df, None, None, None)

# Optional Reports

In [None]:
# Clean column headers.
df_clean_cols = clean_columns(df_removed_frame_shift)


In [None]:
# Merge duplicate columns.
df_merge_cols = merge_duplicate_columns(df_clean_cols)

In [None]:
# Clean impure columns using dictionary mappings.
df_pure_values = clean_columns_using_base_name(df_merge_cols, dict_mapping, None)
logging.info("Column cleaning report generated.")

# Optional Reports
# column_cleaning_report_filepath_zim

In [None]:
## FORWARD FILL NONES
df_forward_fill_nones = fill_missing_with_label_value(df_pure_values, None)

# Optional Reports
# forward_fill_nones_report_zim

In [None]:
# Forward fill missing .value columns using corresponding .label columns.
df_forward_fill = forward_fill_numeric_datetime(df_forward_fill_nones, None)
logging.info("Forward fill complete.")

# Optional Reports
# forward_fill_report_path_zim

In [None]:
# Drop auto-populating/repeat columns.
df_dropped_autopops = drop_unwanted_columns(df_forward_fill, None)

In [None]:
# Convert columns to correct datatypes using the new combined function.
df_dtypes_fixed = convert_columns_and_create_report(df_dropped_autopops, dict_mapping, report_path=None)
logging.info("Data type conversion complete.")

# Optional Report
# dtype_conversion_report_filepath_zim

In [None]:
# Final post-processing report.
get_report(df_dtypes_fixed, None)
logging.info("Post-processing report generated.")

# Optional Report
# post_process_report_filepath_zim

In [None]:
# Aggregate and save the column datatypes & descriptions.
summary_df = map_df_columns_to_descriptions_and_datatypes(df_dtypes_fixed, cfg["dict_filepath"], None, None)

# Optional Reports
# output_csv_filepath_zim, output_excel_filepath_zim

In [None]:
# Key columns: used here to identify duplicate records and later as the merge keys
on_cols = ["facility", "uid", "uniquekey", "startedatdischarge", "completedatdischarge", "ingestedatdischarge"]


# Calculate the number of missing values for each row and store it in a temporary column.
# assign() returns a new DataFrame, so df_dtypes_fixed itself is left unchanged and the
# "original" shape printed below does not include the helper column.
df_with_missing_count = df_dtypes_fixed.assign(
    missing_count=df_dtypes_fixed.isnull().sum(axis=1)
)

# Sort the DataFrame. This is crucial.
# First, sort by 'on_cols' to group duplicates together.
# Second, sort by 'missing_count' in ascending order (fewer missing values first).
# Third, rows tied on every sort key are expected to keep their input order.
df_sorted_for_deduplication = df_with_missing_count.sort_values(
    by=on_cols + ['missing_count'],
    ascending=True # Sort missing_count ascending so fewer missing values come first
)

# Drop duplicates based on 'on_cols', keeping the 'first' one.
# Because of the previous sort, 'first' will now correspond to:
# 1. The record with the fewest missing values among the duplicates.
# 2. If missing values are tied, the one that appeared first in the original (or input to sort_values) order.
df_deduplicated_by_missingness = df_sorted_for_deduplication.drop_duplicates(
    subset=on_cols,
    keep='first'
)

# Drop the temporary 'missing_count' column
df_deduplicated_by_missingness = df_deduplicated_by_missingness.drop(columns=['missing_count'])


print(f"Original DataFrame shape: {df_dtypes_fixed.shape}")
print(f"Deduplicated DataFrame shape: {df_deduplicated_by_missingness.shape}")

# Verify if any duplicates remain based on on_cols (should be 0)
print(f"Duplicates remaining after deduplication: {df_deduplicated_by_missingness.duplicated(subset=on_cols).sum()}")

# inspect the first few rows of the cleaned DataFrame
# print(df_deduplicated_by_missingness.head())

In [None]:
df_first_stage_cleaned_data = df_deduplicated_by_missingness
df_first_stage_cleaned_data.shape
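
The deduplication step above can be tried on a toy frame. This is a minimal sketch, not the notebook's own data: the column names and `key_cols` are hypothetical stand-ins for `on_cols`, and the chain uses `assign()` so the input frame is not modified.

```python
import numpy as np
import pandas as pd

# Toy data (hypothetical): two records share the key ("A", 1); one is more complete.
toy = pd.DataFrame(
    {
        "facility": ["A", "A", "B"],
        "uid": [1, 1, 2],
        "weight": [np.nan, 2.9, 3.4],
        "temp": [np.nan, 36.8, np.nan],
    }
)
key_cols = ["facility", "uid"]  # hypothetical stand-in for on_cols

deduplicated = (
    toy.assign(missing_count=toy.isnull().sum(axis=1))  # helper column on a copy
    .sort_values(by=key_cols + ["missing_count"])       # most complete row first per key
    .drop_duplicates(subset=key_cols, keep="first")     # keep that row, drop the rest
    .drop(columns="missing_count")                      # remove the helper column
    .reset_index(drop=True)
)
print(deduplicated)
```

For key ("A", 1) the row with both measurements present is kept, and `toy` itself never gains a `missing_count` column.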


###
---

### VALIDATION

#### Numeric Validation

In [None]:
# LOAD DATA - CSVs & DICTIONARIES

# Use the first-stage cleaned data as input (no CSV is read in this cell).
df_num = df_first_stage_cleaned_data

# Load and aggregate validation dictionary.
sheet_name = sheet_name # <---------------- as written this is a no-op: sheet_name must already be defined earlier, or be set here to the Excel sheet holding the numeric validations
feature_dict_mapping = load_and_aggregate_validation_dictionary(feature_dict_filepath, sheet_name)

In [None]:
# Assign Correct set of features
df_features_of_interest_num = df_num[cfg["num"]]

In [None]:
# Remove Suffixes from features
df_removed_suffixes_num = remove_suffixes(df_features_of_interest_num)

In [None]:
# Delete disallowed dtypes
dtype = 'numeric' # 'numeric', 'datetime', 'boolean', 'categorical'
cols_to_clean_num = []
df_purified_cols_num = purify_column_num(df_removed_suffixes_num, dtype, cols= cols_to_clean_num, report_path=None)

# Optional Report
# report_dtypes_disallowed_ZIM

In [None]:
# Raise Flags  Numeric
skip_columns= []
numeric_flags= raise_flags_numeric(df_purified_cols_num, feature_dict_mapping, None, skip_columns=skip_columns)

# Optional Reports
# report_numeric_flags_zim

In [None]:
# Convert the weight features to a single unit (kilograms, per convert_weight_to_kgs), because some values are recorded in grams and others in kilograms.
df_weight_fixed = convert_weight_to_kgs(df_purified_cols_num, cols = cfg["weight_cols"])
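
One way such a unit harmonisation could work is sketched below. It is only an illustration under stated assumptions: the helper `to_kilograms` and the cutoff `GRAMS_THRESHOLD` are hypothetical and are not taken from `convert_weight_to_kgs`, whose actual rule is not shown in this section.

```python
import pandas as pd

GRAMS_THRESHOLD = 20  # hypothetical cutoff: larger values are assumed to be grams


def to_kilograms(values: pd.Series, threshold: float = GRAMS_THRESHOLD) -> pd.Series:
    """Divide values above `threshold` by 1000; leave smaller values unchanged."""
    # where() keeps entries that satisfy the condition and substitutes the rest.
    return values.where(values <= threshold, values / 1000)


weights = pd.Series([3.2, 2950.0, None, 1800.0])
weights_kg = to_kilograms(weights)
print(weights_kg.tolist())
```

Missing values stay missing, because a comparison with NaN is false and NaN divided by 1000 is still NaN.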

In [None]:
# Remove disallowed values from numeric features - FIX Clinically Implausible values
# skip_columns_zim = ['age','bloodsugarmmol', 'dischrr', 'hr', 'rr', 'length', 'nnuadmtemp', 'wcc1r']
# skip_columns_mwi = ['age', 'bloodsugarmg', 'bloodsugarmmol', 'dischhr', 'dischrr', 'bsmg', 'hr', 'rr', 'dischtemp']
df_removed_numeric_invalids = validate_numeric_ranges(df_weight_fixed, feature_dict_mapping, None, skip_columns= cfg["skip_columns"])

# Optional Reports
# report_numeric_disallowed_ZIM

In [None]:
# Remove duplicates from the numeric DataFrame after validation.
df_validated_numeric = df_removed_numeric_invalids.drop_duplicates()
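
The idea behind range validation can be sketched as follows. The `bounds` dictionary and its limits are hypothetical placeholders (not clinical guidance and not the contents of the validation dictionary), and the loop is an assumption about the general approach rather than the body of `validate_numeric_ranges`.

```python
import numpy as np
import pandas as pd

# Hypothetical bounds; the real limits come from the validation dictionary.
bounds = {"temp": (30.0, 43.0), "hr": (40, 250)}

toy = pd.DataFrame({"temp": [36.5, 3.65, 38.1], "hr": [140, 999, 120]})

validated = toy.copy()
for col, (low, high) in bounds.items():
    in_range = validated[col].between(low, high)  # inclusive on both ends
    # Keep in-range values; everything else becomes NaN.
    validated[col] = validated[col].where(in_range)

print(validated)
```

Replacing implausible values with NaN, rather than dropping the row, keeps the record available for the other feature groups.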

#### Boolean Validation

In [None]:
# LOAD DATA - CSVs & DICTIONARIES
# Use the first-stage cleaned data as input (no CSV is read in this cell).
df_bool = df_first_stage_cleaned_data

In [None]:
# Assign Correct set of features
df_features_of_interest_bool = df_bool[cfg["bool"]]

In [None]:
# Remove Suffixes from features
df_removed_suffixes_bool = remove_suffixes(df_features_of_interest_bool)

In [None]:
# Delete disallowed dtypes
dtype = 'boolean' # 'numeric', 'datetime', 'boolean', 'categorical'
cols_to_clean_bool = [] # "disccovidrisk", "haart", "nvpgiven", "phototherapy", "hyposymptoms"
df_purified_cols_bool = purify_column_bool(df_removed_suffixes_bool, dtype, cols= cols_to_clean_bool, report_path=None)

# Optional Reports
# report_dtypes_disallowed_ZIM

In [None]:
# Remove duplicates from the boolean DataFrame after validation.
df_validated_bool = df_purified_cols_bool.drop_duplicates()

#### Categorical Validation

In [None]:
# LOAD DATA - CSVs & DICTIONARIES

# Read Data
df_cat = df_first_stage_cleaned_data

# Load and aggregate data dictionary.
dict_mapping = load_and_aggregate_data_dictionary2(cfg["dict_filepath"])
logging.info(f"Loaded dictionary with keys: {list(dict_mapping.keys())[:10]}")


In [None]:
# Assign Correct set of features
df_features_of_interest_cat = df_cat[cfg["cat"]]

In [None]:
# Remove Suffixes from features
df_removed_suffixes_cat = remove_suffixes(df_features_of_interest_cat)

In [None]:

cols_to_skip = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge"]

df_value_replacements_cat = clean_dataframe_with_dictionary(df_removed_suffixes_cat, cfg["dict_filepath"], None, cols_to_skip)

# Optional Reports
#  report_zim

In [None]:
df_delete_disallowed_values_cat = remove_disallowed_values_with_report(cfg["values_to_delete"], df_value_replacements_cat, None)

# Optional Reports
# disallowed_deletes_zim = r"C:\Users\kmeck\OneDrive - Imperial College London\Desktop\Disallowed_deletes_zim.txt"
# disallowed_deletes_mwi = r"C:\Users\kmeck\OneDrive - Imperial College London\Desktop\Disallowed_deletes_mwi.txt"

In [None]:
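# Apply the mappings and generate the report.
# Note: this call uses the Zimbabwe mappings (value_mappings_zim) regardless of the `country` setting.
df_aligned = apply_value_mappings_with_report(df_delete_disallowed_values_cat, value_mappings_zim, None)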

# Optional Reports
# report_file_path

In [None]:
# Remove 'nan' Category - Replace with NaN
# List of columns to exclude
cols_to_exclude = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge"]

# ZIM
# Loop over columns except the excluded ones
for col in [c for c in df_aligned.columns if c not in cols_to_exclude]:
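    # isinstance check replaces pd.api.types.is_categorical_dtype, which is deprecated in recent pandas
    if isinstance(df_aligned[col].dtype, pd.CategoricalDtype):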
        if 'nan' in df_aligned[col].cat.categories:
            # Remove the 'nan' category, converting its entries to NaN
            df_aligned[col] = df_aligned[col].cat.remove_categories(['nan'])

In [None]:
df_validated_cat = df_aligned.drop_duplicates()
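
The effect of removing the `'nan'` category can be seen on a small Series. This is a minimal sketch with made-up values; it relies only on `Series.cat.remove_categories`, which sets entries of a removed category to NaN.

```python
import pandas as pd

# Toy categorical column in which missing values were stored as the string "nan".
toy = pd.Series(["yes", "nan", "no"], dtype="category")
assert isinstance(toy.dtype, pd.CategoricalDtype)

# Removing the category turns its entries into real missing values.
cleaned = toy.cat.remove_categories(["nan"])

print(cleaned.isna().tolist())        # [False, True, False]
print(list(cleaned.cat.categories))   # ['no', 'yes']
```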

#### Object Validation

In [None]:
# LOAD DATA - CSVs & DICTIONARIES

# Use the first-stage cleaned data as input (no CSV is read in this cell).
df_obj = df_first_stage_cleaned_data

# Load and aggregate data dictionary.
dict_mapping = load_and_aggregate_data_dictionary2(cfg["dict_filepath"])
logging.info(f"Loaded dictionary with keys: {list(dict_mapping.keys())[:10]}")


In [None]:
# Assign Correct set of features
df_features_of_interest_obj = df_obj[cfg["obj"]]

In [None]:
# Remove Suffixes from features
df_removed_suffixes_obj = remove_suffixes(df_features_of_interest_obj)

In [None]:
cols_to_skip = ["facility","uid","uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge"]

df_value_replacements_obj = clean_dataframe_with_dictionary(df_removed_suffixes_obj, cfg["dict_filepath"], None, cols_to_skip)

# Optional Reports
# report_zim

In [None]:
df_validated_obj = df_value_replacements_obj.drop_duplicates()

#### Datetime Validation

In [None]:
# LOAD DATA - CSVs & DICTIONARIES

# Use the first-stage cleaned data as input (no CSV is read in this cell).
df_dt = df_first_stage_cleaned_data

In [None]:
# Assign Correct set of features
df_features_of_interest_dt = df_dt[cfg["dt"]]

In [None]:
# Remove Suffixes from features
df_removed_suffixes_dt = remove_suffixes(df_features_of_interest_dt)

In [None]:
# Delete disallowed dtypes
dtype = 'datetime' # 'numeric', 'datetime', 'boolean', 'categorical'
cols_to_clean_dt = []
df_purified_cols_dt = purify_column_dt(df_removed_suffixes_dt, dtype, cols= cols_to_clean_dt, report_path=None)

# Optional Reports
# report_dtypes_disallowed_ZIM


In [None]:
# REPLACE 'nan' with NaT

cols_to_exclude = ["facility","uid","uniquekey"]

# Loop over columns except the excluded ones
for col in [c for c in df_purified_cols_dt.columns if c not in cols_to_exclude]:
    if pd.api.types.is_datetime64_any_dtype(df_purified_cols_dt[col]):
        # Convert the column to object to safely replace 'nan'
        df_purified_cols_dt[col] = df_purified_cols_dt[col].astype(object).replace('nan', pd.NaT)
        # Convert back to datetime, coercing errors to NaT if necessary
        df_purified_cols_dt[col] = pd.to_datetime(df_purified_cols_dt[col], errors='coerce')


In [None]:
df_validated_dt = df_purified_cols_dt.drop_duplicates()
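
The coercion used above can be illustrated on text input. This is a minimal sketch with made-up values: `pd.to_datetime(..., errors="coerce")` converts anything it cannot parse, including the string `"nan"`, to `NaT`. A column that already has a datetime64 dtype cannot hold the string `'nan'`, so in that case the replacement acts mainly as a safeguard.

```python
import pandas as pd

# Toy input: one valid timestamp, one "nan" string, one unparseable value.
raw = pd.Series(["2023-01-15 10:30:00", "nan", "not a date"])

# Unparseable entries become NaT instead of raising an error.
parsed = pd.to_datetime(raw, errors="coerce")

print(parsed.isna().tolist())  # [False, True, True]
```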

###
---

### DATA MERGING & CLEANED OUTPUTS

In [None]:
# Merge the Validated Data Into one Dataframe

df_merged = merge_validated_data(
    df_validated_numeric,
    df_validated_bool,
    df_validated_cat,
    df_validated_obj,
    df_validated_dt,
    on=["facility", "uid", "uniquekey","startedatdischarge", "completedatdischarge", "ingestedatdischarge"],
    how="inner",
    csv_path= merged_data_csv,
    pkl_path= merged_data_pkl
)
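
Because the merge is keyed, a frame with repeated key combinations multiplies rows in the result. The sketch below shows the effect and one possible check using the `validate` argument of `pd.merge`; the frames and `keys` are hypothetical, and `merge_validated_data` as shown does not perform this check.

```python
import pandas as pd

keys = ["facility", "uid"]  # hypothetical stand-in for the merge keys

left = pd.DataFrame({"facility": ["A", "B"], "uid": [1, 2], "weight": [2.9, 3.4]})
right = pd.DataFrame(
    {"facility": ["A", "B", "B"], "uid": [1, 2, 2], "sex": ["F", "M", "M"]}
)

# An inner merge repeats the left row once for every matching right row.
merged = pd.merge(left, right, on=keys, how="inner")
print(len(merged))  # 3

# validate="one_to_one" raises MergeError when keys are not unique on both sides.
try:
    pd.merge(left, right, on=keys, how="inner", validate="one_to_one")
    keys_unique = True
except pd.errors.MergeError:
    keys_unique = False
print(keys_unique)  # False
```

Comparing the row count before and after each merge is a lighter alternative when raising an error is not desirable.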

###
---

## OUTPUT

In [None]:
df_merged

In [None]:
# CODE RUNTIME
#-----------------
notebook_end_time = time.time()
total_runtime = notebook_end_time - notebook_start_time
minutes = int(total_runtime // 60)
seconds = int(total_runtime % 60)
print(f"⏱️ Notebook runtime: {minutes} min {seconds} sec")

# ~END