<h2>Generate Natality 10 Year - 10 Percent Sample Data</h2>

This notebook generates a dataset that spans a broader time range: each year contributes a random 10 percent sample of its rows. The resulting dataset is used for recall-optimized modeling, controllable-factors modeling, and demographic-factors modeling.

In [None]:
from pathlib import Path
import os
import pandas as pd
import numpy as np
import re

BASE_DIR = Path().resolve().parent

In [None]:
def normalize_column_names(df):
    """
    Standardize the column labels of df in place and return df.

    Labels are stripped of surrounding whitespace, lower-cased, and have
    spaces and hyphens replaced by underscores.
    """
    labels = df.columns.str.strip().str.lower()

    # Both separators collapse to the same underscore
    for separator in (" ", "-"):
        labels = labels.str.replace(separator, "_")

    df.columns = labels
    return df


def convert_binary_indicators(df, columns):
    """
    Recode Y/C/N indicator columns of df to floats, in place, and return df.

    For each name in columns that exists in df, values are stringified,
    stripped and upper-cased; "Y" and "C" become 1.0, "N" becomes 0.0 and
    every other value becomes NaN. Names not present in df are ignored.
    """
    code_to_flag = {"Y": 1, "C": 1, "N": 0}
    present = [name for name in columns if name in df.columns]

    for name in present:
        normalized = df[name].astype(str).str.strip().str.upper()
        df[name] = normalized.map(code_to_flag).astype("float")

    return df


def clean_special_values(df):
    """
    Replace the numeric placeholder codes with NaN, in place, and return df.

    The codes are 9, 99, 999, 9999, 99999, 99.9 and 999.9. The replacement
    is applied to the whole frame, so any cell equal to one of these values
    is blanked regardless of which column it sits in.
    """
    placeholder_codes = (9, 99, 999, 9999, 99999, 99.9, 999.9)
    df.replace(dict.fromkeys(placeholder_codes, np.nan), inplace=True)
    return df

def safe_parse_time_of_day(df):
    """
    Safely parse dob_tt (time of birth, HHMM) and Fourier encode it.

    Adds two columns to df, time_sin and time_cos, holding the sine and
    cosine of the minute-of-day mapped onto a 1440-minute circle, and
    returns df. If df has no dob_tt column it is returned untouched.

    Values that are not exactly four digits after zero-padding, or whose
    hour/minute parts fall outside 00-23 / 00-59, yield NaN in both output
    columns. Float-typed input (e.g. 930.0, which is what an integer column
    becomes once it contains a missing value) is accepted: the trailing
    ".0" is removed before validation.
    """

    if "dob_tt" not in df.columns:
        return df

    # Convert to string + strip spaces, drop the ".0" that float columns
    # carry when stringified, then zero-pad to HHMM
    ts = (
        df["dob_tt"]
        .astype(str)
        .str.strip()
        .str.replace(r"\.0+$", "", regex=True)
        .str.zfill(4)
    )

    # Only keep values consisting of exactly 4 digits -> otherwise NaN
    ts = ts.where(ts.str.match(r"^\d{4}$", na=False), np.nan)

    # Extract H and M safely
    hours = pd.to_numeric(ts.str[:2], errors="coerce")
    minutes = pd.to_numeric(ts.str[2:], errors="coerce")

    # NaN compares False in between(), so unparsable rows are invalid too
    valid = hours.between(0, 23) & minutes.between(0, 59)

    # Compute minute-of-day; where() masks impossible times without writing
    # NaN into an integer Series
    minute_of_day = (hours * 60 + minutes).where(valid)

    # Fourier encodings (NaN-safe)
    angle = 2 * np.pi * minute_of_day / 1440
    df["time_sin"] = np.sin(angle)
    df["time_cos"] = np.cos(angle)

    return df

def final_feature_engineering(df):
    """
    Build the modeling target and derived features; return a new frame.

    Steps:
      * rename no_mmorb to morbidity_reported, drop rows whose value is 9
        or missing, then flip the flag (1 - value) so morbidity is the
        positive class;
      * drop imp_sex when present;
      * add a date column from dob_yy + dob_wk when both are present;
      * add time_sin / time_cos via safe_parse_time_of_day;
      * recode sex to 1 for "M" and 0 for anything else.

    Raises KeyError if df has no no_mmorb / morbidity_reported column.
    The caller's frame is not modified.
    """

    df = df.rename(columns={'no_mmorb': 'morbidity_reported'})

    # Rows with an unknown outcome are dropped. Besides the raw code 9, NaN
    # is treated as unknown too, since special-value cleaning may already
    # have turned the 9s into NaN before this function runs.
    outcome = df['morbidity_reported']
    df = df[outcome.notna() & (outcome != 9)].copy()  # copy: safe to assign below

    # flipping the binary so morbidity is the positive class
    df['morbidity_reported'] = 1 - df['morbidity_reported']

    # "ignore" is the drop() option that tolerates a missing column
    df = df.drop(columns=["imp_sex"], errors="ignore")

    # Create date from year + week number; the trailing "1" selects ISO
    # weekday 1 (Monday). Unparsable combinations become NaT.
    if {"dob_yy", "dob_wk"}.issubset(df.columns):
        df["date"] = pd.to_datetime(
            df["dob_yy"].astype(str) + df["dob_wk"].astype(str) + "1",
            format="%G%V%u",
            errors="coerce"
        )

    # Safe time-of-day processing
    df = safe_parse_time_of_day(df)

    # Sex binary
    if "sex" in df.columns:
        df["sex"] = np.where(df["sex"] == "M", 1, 0)

    return df

In [None]:
# Feature Filters
regex_patterns = ['^mm_', 'no_mmorb', #MaternalMorbidity factors, maternal morbidity
                  '^dob_', 'bfacil$','attend$', #date of birth, type of facility of birth, attendant at birth
                  '^rf_', '^ip_','^ld_', 'ab_','ca_','me_', #RiskFactors, InfectionPresent, LaborandDelivery, AbnormalConditions, congenital anomalies, method of delivery
                  '^ob_',
                  'mager$','mrace6','mracehisp', '^mar_p', 'dmar', 'meduc', #mother's demographichs
                  '^cig_rec','wtgain$', 'bmi$', 'pwgt_r', 'dwgt_r', #mother's health factors
                  'fagecomb','frace6','fracehisp','feduc', #father's demographics
                  'dplural', 'sex$','combgest$', 'dbwt', #baby health factors
                  '^prior','illb_r$', 'ilop_r$','ilp_r', #prior births living, dead, and terminated timeline
                  'previs$', 'precare$', #pregnancy care
                  'apgar' #apgar scores, can be either 5 or 10mins
                  'wic','pay$',#funding
                  ]

combined_regex = '|'.join(regex_patterns)

# Columns where Y → 1, N → 0
binary_patterns = ['^mm_','^rf_', '^ip_','^ld_', '^ab_','^ca_','^me_trial', '^ob_',
                   'wic','cig_rec', 'mar_p' ]
binary_regex = '|'.join(binary_patterns)
exception = {'rf_cesarn', 'rf_fedrg','rf_artec', 'me_trial'}

In [None]:
def process_year_file(file, year, chunk_size = 200_000):
    """
    Clean one raw yearly CSV chunk by chunk and write the filtered result.

    Output goes to
    BASE_DIR/data_main/filtered_aligned_natality_data/natality_test_{year}.csv;
    the directory is created if needed and an existing output file is
    replaced. Each chunk is normalized, stripped of f_ columns, subset with
    combined_regex, cleaned of special codes, binary-recoded (except the
    names in exception) and passed through final_feature_engineering
    before being appended.

    file: path of the raw CSV to read.
    year: value used in the output file name and in progress messages.
    chunk_size: number of rows read per chunk.
    """

    yearly_outfile = BASE_DIR / "data_main" / "filtered_aligned_natality_data" / f"natality_test_{year}.csv"
    # Appending fails if the target directory is missing
    yearly_outfile.parent.mkdir(parents=True, exist_ok=True)
    if yearly_outfile.exists():
        yearly_outfile.unlink()

    print(f"Processing year {year}: {file}")

    rows_read = 0
    for i, chunk in enumerate(pd.read_csv(file,
                                          chunksize=chunk_size,
                                          low_memory=False)):
        rows_read += len(chunk)

        chunk = normalize_column_names(chunk)

        # Drop f_ columns (before the regex subset, so unanchored patterns
        # cannot pick them up)
        drop_cols = chunk.filter(regex=r"^f_").columns
        chunk = chunk.drop(columns=drop_cols)

        # Subset via regex; copy so the in-place steps below operate on an
        # independent frame rather than on a slice of the raw chunk
        filter_cols = chunk.filter(regex=combined_regex).columns
        chunk = chunk[filter_cols].copy()

        # Clean special codes
        chunk = clean_special_values(chunk)

        # Convert Y/N → binary
        binary_cols = chunk.filter(regex=binary_regex).columns
        binary_cols = [c for c in binary_cols if c not in exception]
        chunk = convert_binary_indicators(chunk, binary_cols)

        chunk = final_feature_engineering(chunk)

        # The output file was removed above, so only the first chunk
        # carries the header
        chunk.to_csv(
            yearly_outfile,
            mode="a",
            header=(i == 0),
            index=False
        )

        if i % 100 == 0:
            # Report rows actually read so far, including the current chunk
            print(f"  processed {rows_read:,} rows")

    print(f"Finished year {year}: {yearly_outfile}")

In [None]:
RAW_DATA_DIR = BASE_DIR / "data_main" / "raw_natality_data"

chunk_size = 50_000

# Create new cleaned annual files for years after 2013.
# sorted(): glob order is unspecified, so sort for a stable processing order.
for file in sorted(RAW_DATA_DIR.glob("natality_data/*.csv")):
    name = file.name
    # The year is the first run of four digits in the file name
    year_match = re.search(r"(\d{4})", name)
    if year_match is None:
        # Not a yearly data file; skip it instead of aborting the whole run
        print(f"Skipping {name}: no 4-digit year in file name")
        continue
    year = int(year_match.group(1))
    if year > 2013:
        process_year_file(file, year, chunk_size)

In [None]:
def process_one_file(file, outfile, chunk_size=200_000, frac=0.10, random_state=42):
    """
    Append a random sample of the rows of one CSV to outfile.

    The input is streamed in chunks of chunk_size rows and a fraction frac
    of each chunk is appended to outfile. The header is written only if
    outfile does not exist when the call starts.

    file: path of the CSV to sample from.
    outfile: path of the combined output CSV (str or Path).
    chunk_size: rows read per chunk.
    frac: fraction of each chunk to keep.
    random_state: integer seed, or an existing numpy RandomState to draw
        from. One generator is advanced across all chunks of the file, so
        equally sized chunks do not all select the same row positions,
        while the overall result stays reproducible for a given seed.
    """
    print(f"Sampling from {file}")

    outfile = Path(outfile)
    if isinstance(random_state, np.random.RandomState):
        rng = random_state
    else:
        rng = np.random.RandomState(random_state)

    write_header = not outfile.exists()   # header only on first write

    # Iterate chunk by chunk to avoid RAM explosion
    for chunk in pd.read_csv(file, chunksize=chunk_size, low_memory=False):

        # Random sample of this chunk, drawn from the shared generator
        sampled = chunk.sample(frac=frac, random_state=rng)

        # Append to output
        sampled.to_csv(
            outfile,
            mode="a",
            header=write_header,
            index=False
        )
        write_header = False

In [None]:
IN = BASE_DIR / "data_main" / "filtered_aligned_natality_data"
OUT = BASE_DIR / "data_main" / "natality_aligned_10pct_sample.csv"

# Remove old file if exists, so the run starts from an empty output
if OUT.exists():
    OUT.unlink()

chunk_size = 50_000

# Loop through all CSVs in the folder. glob order is unspecified, so sort to
# keep the row order of the combined file reproducible between runs.
for file in sorted(IN.glob("*.csv")):
    process_one_file(file, OUT, chunk_size=chunk_size, frac=0.10)

print(f"\nFinished! Combined sample saved to:\n{OUT}")