# IMPORT LIBRARIES AND SET CONSTANTS

In [13]:
import csv
import numpy as np
from pathlib import Path

# Data folders, expressed relative to the current working directory.
DATA_PATH = Path('../data')
PROCESSED_DATA_PATH = DATA_PATH.joinpath('processed')

# Inputs: the text file listing the variables to keep, and the converted CSV
# that process_and_write() reads.
VAR_LIST_FILE_PATH = Path('../documentation/var_list_decription.txt')
PROCESSED_FILE_PATH_CSV = PROCESSED_DATA_PATH.joinpath('converted_from_script_new02.csv')

# Output: the cleaned CSV written by process_and_write().
FINAL_FILE_PATH = PROCESSED_DATA_PATH.joinpath('final_data.csv')

## Data preprocessing
The original script inspected the data with pandas `.info()` and `.head()`. Because pandas is not allowed here, we skip that overview and go straight to processing.

In [14]:
# Output column names, written as the header of the final CSV.
# ORDER MATTERS: rows are handled positionally. The i-th name here labels the
# i-th entry of `var_list` (the raw column kept from the input file), and
# NEW_COL_INDICES derives each transform's column position from this order.
NEW_VAR_NAMES = [
    "State",
    "Sex",
    "GeneralHealth",
    "PhysicalHealthDays",
    "MentalHealthDays",
    "LastCheckupTime",
    "PhysicalActivities",
    "SleepHours",
    "RemovedTeeth",
    "HadHeartAttack",
    "HadAngina",
    "HadStroke",
    "HadAsthma",
    "HadSkinCancer",
    "HadCOPD",
    "HadDepressiveDisorder",
    "HadKidneyDisease",
    "HadArthritis",
    "HadDiabetes",
    "DeafOrHardOfHearing",
    "BlindOrVisionDifficulty",
    "DifficultyConcentrating",
    "DifficultyWalking",
    "DifficultyDressingBathing",
    "DifficultyErrands",
    "SmokerStatus",
    "ECigaretteUsage",
    "ChestScan",
    "RaceEthnicityCategory",
    "AgeCategory",
    "HeightInMeters",
    "WeightInKilograms",
    "BMI",
    "AlcoholDrinkers",
    "HIVTesting",
    "FluVaxLast12",
    "PneumoVaxEver",
    "TetanusLast10Tdap",
    "HighRiskLastYear",
    "CovidPos"
]

### Load Variable List (without Pandas)

In [15]:
# Collect the raw column names to keep from the variable-description file.
# Each useful line has the form "<NAME> - <description>"; only the text before
# the first " - " is kept, and lines without that separator are ignored.
var_list = []
try:
    with open(VAR_LIST_FILE_PATH, 'r', encoding='utf-8') as f:
        for line in f:
            if ' - ' in line:
                var_name = line.split(' - ', 1)[0].strip()
                var_list.append(var_name)
except FileNotFoundError:
    print(f"Error: The file {VAR_LIST_FILE_PATH} was not found.")
except (OSError, UnicodeDecodeError) as e:
    # Only I/O and decoding problems are expected here; anything else is a
    # programming error and should surface instead of being swallowed.
    print(f"An error occurred while reading {VAR_LIST_FILE_PATH}: {e}")

print(f"Loaded {len(var_list)} variables to keep.")
print(var_list)

# Raw columns are paired with NEW_VAR_NAMES purely by position, so a count
# mismatch means mislabeled output or a failure later on. Flag it right away.
if len(var_list) != len(NEW_VAR_NAMES):
    print(
        f"Warning: loaded {len(var_list)} variables but NEW_VAR_NAMES has "
        f"{len(NEW_VAR_NAMES)} entries; columns are matched by position."
    )

Loaded 40 variables to keep.
['_STATE', 'SEXVAR', 'GENHLTH', 'PHYSHLTH', 'MENTHLTH', 'CHECKUP1', 'EXERANY2', 'SLEPTIM1', 'RMVTETH4', 'CVDINFR4', 'CVDCRHD4', 'CVDSTRK3', 'ASTHMA3', 'CHCSCNC1', 'CHCCOPD3', 'ADDEPEV3', 'CHCKDNY2', 'HAVARTH4', 'DIABETE4', 'DEAF', 'BLIND', 'DECIDE', 'DIFFWALK', 'DIFFDRES', 'DIFFALON', '_SMOKER3', 'ECIGNOW2', 'LCSCTSC1', '_RACEGR4', '_AGEG5YR', 'HTM4', 'WTKG3', '_BMI5', 'DRNKANY6', '_AIDTST4', 'FLUSHOT7', 'PNEUVAC4', 'TETANUS1', 'HIVRISK5', 'COVIDPOS']


### Define All Transformation Dictionaries

In [16]:
# Lookup tables: raw numeric code -> human-readable label.
# The transform_* helpers look codes up with `.get(code, np.nan)`, so any code
# that is absent from a table ends up as NaN (and the row is later dropped).

# State / territory names keyed by numeric code.
# NOTE(review): the gaps (3, 7, 14, ...) suggest FIPS codes — confirm against the codebook.
STATE = {
    1: "Alabama",
    2: "Alaska",
    4: "Arizona",
    5: "Arkansas",
    6: "California",
    8: "Colorado",
    9: "Connecticut",
    10: "Delaware",
    11: "District of Columbia",
    12: "Florida",
    13: "Georgia",
    15: "Hawaii",
    16: "Idaho",
    17: "Illinois",
    18: "Indiana",
    19: "Iowa",
    20: "Kansas",
    21: "Kentucky",
    22: "Louisiana",
    23: "Maine",
    24: "Maryland",
    25: "Massachusetts",
    26: "Michigan",
    27: "Minnesota",
    28: "Mississippi",
    29: "Missouri",
    30: "Montana",
    31: "Nebraska",
    32: "Nevada",
    33: "New Hampshire",
    34: "New Jersey",
    35: "New Mexico",
    36: "New York",
    37: "North Carolina",
    38: "North Dakota",
    39: "Ohio",
    40: "Oklahoma",
    41: "Oregon",
    42: "Pennsylvania",
    44: "Rhode Island",
    45: "South Carolina",
    46: "South Dakota",
    47: "Tennessee",
    48: "Texas",
    49: "Utah",
    50: "Vermont",
    51: "Virginia",
    53: "Washington",
    54: "West Virginia",
    55: "Wisconsin",
    56: "Wyoming",
    66: "Guam",
    72: "Puerto Rico",
    78: "Virgin Islands"
}

# Labels for the Sex column.
SEX = {1: 'Male', 2: 'Female'}

# Labels for the GeneralHealth column.
GEN_HEALTH = {
    1: "Excellent",
    2: "Very good",
    3: "Good",
    4: "Fair",
    5: "Poor"
}

# Overrides shared by PhysicalHealthDays and MentalHealthDays: 88 becomes 0.0,
# 77 and 99 become NaN; every other value is kept as parsed (the transforms
# call `.get(value, value)`).
# NOTE(review): presumably 88 = "none", 77/99 = "don't know"/"refused" — confirm.
PHYS_MEN_HEALTH = {77.0: np.nan,
               88.0: 0.0,
               99.0: np.nan
                  }

# Labels for the LastCheckupTime column.
LAST_CHECKUP = {
    1: "Within past year (anytime less than 12 months ago)",
    2: "Within past 2 years (1 year but less than 2 years ago)",
    3: "Within past 5 years (2 years but less than 5 years ago)",
    4: "5 or more years ago"
}

# Shared labels for every column handled by create_yes_no_transformer.
YES_NO_QUESTIONS = {1: 'Yes', 2: 'No'}

def SLEEP_TIME(x):
    """Return ``x`` unchanged, or ``np.nan`` when ``x`` is greater than 24.

    Used for the SleepHours column: a value above 24 cannot be hours slept in
    one day (presumably a "don't know"/"refused" code — confirm against the
    codebook). NaN input compares False against 24 and is returned as is.

    Kept under its original upper-case name so existing callers work; written
    as a ``def`` rather than a lambda bound to a name (PEP 8).
    """
    return np.nan if x > 24 else x

# Labels for the RemovedTeeth column (note the jump from code 3 to code 8).
TEETH_REMOVED = {
    1: "1 to 5",
    2: "6 or more (but not all)",
    3: "All",
    8: "None of them"
}

# Labels for the HadDiabetes column (four-way, so not a plain Yes/No mapping).
DIABETES = {
    1: "Yes",
    2: "Yes (but only during pregnancy - female)",
    3: "No",
    4: "No (pre-diabetes or borderline diabetes)",
}

# Labels for the SmokerStatus column.
SMOKER_STATUS = {
    1: "Current smoker - now smokes every day",
    2: "Current smoker - now smokes some days",
    3: "Former smoker",
    4: "Never smoked"
}

# Labels for the ECigaretteUsage column.
ECIGARETTES = {
    1: "Never used e-cigarettes in my entire life",
    2: "Use them every day",
    3: "Use them some days",
    4: "Not at all (right now)"
}

# Labels for the RaceEthnicityCategory column.
RACE = {
    1: "White only (Non-Hispanic)",
    2: "Black only (Non-Hispanic)",
    3: "Other race only (Non-Hispanic)",
    4: "Multiracial (Non-Hispanic)",
    5: "Hispanic"
}

# Labels for the AgeCategory column.
AGE_CATEGORY = {
    1: "Age 18 to 24",
    2: "Age 25 to 29",
    3: "Age 30 to 34",
    4: "Age 35 to 39",
    5: "Age 40 to 44",
    6: "Age 45 to 49",
    7: "Age 50 to 54",
    8: "Age 55 to 59",
    9: "Age 60 to 64",
    10: "Age 65 to 69",
    11: "Age 70 to 74",
    12: "Age 75 to 79",
    13: "Age 80 or older"
}

# Labels for the TetanusLast10Tdap column.
TETANUS = {
    1: "Yes - received Tdap",
    2: "Yes - received tetanus shot, but not Tdap",
    3: "Yes - received tetanus shot but not sure what type",
    4: "No - did not receive any tetanus shot in the past 10 years",
}

# Labels for the CovidPos column (three-way, so not a plain Yes/No mapping).
COVID = {
    1: "Yes",
    2: "No",
    3: "Tested positive using home test without a health professional"
}


### Define Helper Functions for Row Transformation

In [17]:
# Position of every output column inside a filtered row: {"State": 0, "Sex": 1, ...}
NEW_COL_INDICES = dict(zip(NEW_VAR_NAMES, range(len(NEW_VAR_NAMES))))

# --- Helper functions to safely convert values ---
def safe_float(value):
    """Convert ``value`` to ``float``, returning ``np.nan`` when that fails.

    Args:
        value: Anything ``float()`` accepts (typically a CSV cell string).

    Returns:
        The parsed float, or ``np.nan`` if ``value`` is not numeric, is of an
        unsupported type (e.g. ``None``), or is too large to represent.
    """
    try:
        return float(value)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: e.g. an int too large for a float. Without this
        # clause the "returns NaN on failure" contract would be broken.
        return np.nan

def safe_int(value):
    """Convert ``value`` to ``int`` (via ``float``), returning ``np.nan`` on failure.

    Going through ``float`` lets strings such as ``"3.0"`` parse; any
    fractional part is truncated toward zero by ``int``.

    Args:
        value: Anything ``float()`` accepts (typically a CSV cell string).

    Returns:
        The integer value, or ``np.nan`` if ``value`` is not numeric, is of an
        unsupported type (e.g. ``None``), or is NaN/infinite.
    """
    try:
        return int(float(value))
    except (ValueError, TypeError, OverflowError):
        # int(nan) raises ValueError, but int(inf) raises OverflowError, which
        # previously escaped and would abort the whole processing run.
        return np.nan

# --- Transformation functions for each column ---
# These functions modify the row list in-place.

def transform_state(row):
    """Replace the State code in ``row`` (in place) with its STATE label.

    Codes that cannot be parsed or are missing from STATE become NaN.
    """
    col = NEW_COL_INDICES['State']
    row[col] = STATE.get(safe_int(row[col]), np.nan)

def transform_sex(row):
    """Replace the Sex code in ``row`` (in place) with its SEX label.

    Codes that cannot be parsed or are missing from SEX become NaN.
    """
    col = NEW_COL_INDICES['Sex']
    row[col] = SEX.get(safe_int(row[col]), np.nan)

def transform_gen_health(row):
    """Replace the GeneralHealth code in ``row`` (in place) with its GEN_HEALTH label.

    Codes that cannot be parsed or are missing from GEN_HEALTH become NaN.
    """
    col = NEW_COL_INDICES['GeneralHealth']
    row[col] = GEN_HEALTH.get(safe_int(row[col]), np.nan)

def transform_phys_health(row):
    """Normalise the PhysicalHealthDays value in ``row`` in place.

    The cell is parsed as a float. Values listed in PHYS_MEN_HEALTH (77, 88,
    99) are swapped for their override; anything else is stored as parsed.
    """
    col = NEW_COL_INDICES['PhysicalHealthDays']
    days = safe_float(row[col])
    # The parsed value doubles as the fallback, so ordinary counts pass through.
    row[col] = PHYS_MEN_HEALTH.get(days, days)

def transform_ment_health(row):
    """Normalise the MentalHealthDays value in ``row`` in place.

    The cell is parsed as a float. Values listed in PHYS_MEN_HEALTH (77, 88,
    99) are swapped for their override; anything else is stored as parsed.
    """
    col = NEW_COL_INDICES['MentalHealthDays']
    days = safe_float(row[col])
    row[col] = PHYS_MEN_HEALTH.get(days, days)

def transform_checkup(row):
    """Replace the LastCheckupTime code in ``row`` (in place) with its LAST_CHECKUP label.

    Codes that cannot be parsed or are missing from LAST_CHECKUP become NaN.
    """
    col = NEW_COL_INDICES['LastCheckupTime']
    row[col] = LAST_CHECKUP.get(safe_int(row[col]), np.nan)

def transform_sleep(row):
    """Parse the SleepHours cell of ``row`` as a float and pass it through SLEEP_TIME, in place."""
    col = NEW_COL_INDICES['SleepHours']
    hours = safe_float(row[col])
    row[col] = SLEEP_TIME(hours)

def transform_teeth(row):
    """Replace the RemovedTeeth code in ``row`` (in place) with its TEETH_REMOVED label.

    Codes that cannot be parsed or are missing from TEETH_REMOVED become NaN.
    """
    col = NEW_COL_INDICES['RemovedTeeth']
    row[col] = TEETH_REMOVED.get(safe_int(row[col]), np.nan)

def transform_diabetes(row):
    """Replace the HadDiabetes code in ``row`` (in place) with its DIABETES label.

    Codes that cannot be parsed or are missing from DIABETES become NaN.
    """
    col = NEW_COL_INDICES['HadDiabetes']
    row[col] = DIABETES.get(safe_int(row[col]), np.nan)

def transform_smoker(row):
    """Replace the SmokerStatus code in ``row`` (in place) with its SMOKER_STATUS label.

    Codes that cannot be parsed or are missing from SMOKER_STATUS become NaN.
    """
    col = NEW_COL_INDICES['SmokerStatus']
    row[col] = SMOKER_STATUS.get(safe_int(row[col]), np.nan)

def transform_ecig(row):
    """Replace the ECigaretteUsage code in ``row`` (in place) with its ECIGARETTES label.

    Codes that cannot be parsed or are missing from ECIGARETTES become NaN.
    """
    col = NEW_COL_INDICES['ECigaretteUsage']
    row[col] = ECIGARETTES.get(safe_int(row[col]), np.nan)

def transform_race(row):
    """Replace the RaceEthnicityCategory code in ``row`` (in place) with its RACE label.

    Codes that cannot be parsed or are missing from RACE become NaN.
    """
    col = NEW_COL_INDICES['RaceEthnicityCategory']
    row[col] = RACE.get(safe_int(row[col]), np.nan)

def transform_age(row):
    """Replace the AgeCategory code in ``row`` (in place) with its AGE_CATEGORY label.

    Codes that cannot be parsed or are missing from AGE_CATEGORY become NaN.
    """
    col = NEW_COL_INDICES['AgeCategory']
    row[col] = AGE_CATEGORY.get(safe_int(row[col]), np.nan)

def transform_height(row):
    """Scale the HeightInMeters cell of ``row`` by 1/100, in place.

    Unparsable cells become NaN.
    NOTE(review): presumably the raw value is in centimetres — confirm.
    """
    col = NEW_COL_INDICES['HeightInMeters']
    raw = safe_float(row[col])
    if np.isnan(raw):
        row[col] = np.nan
    else:
        row[col] = raw / 100

def transform_weight(row):
    """Scale the WeightInKilograms cell of ``row`` by 1/100, in place.

    Unparsable cells become NaN.
    NOTE(review): presumably the raw value carries two implied decimals — confirm.
    """
    col = NEW_COL_INDICES['WeightInKilograms']
    raw = safe_float(row[col])
    if np.isnan(raw):
        row[col] = np.nan
    else:
        row[col] = raw / 100

def transform_bmi(row):
    """Scale the BMI cell of ``row`` by 1/100, in place.

    Unparsable cells become NaN.
    NOTE(review): presumably the raw value carries two implied decimals — confirm.
    """
    col = NEW_COL_INDICES['BMI']
    raw = safe_float(row[col])
    if np.isnan(raw):
        row[col] = np.nan
    else:
        row[col] = raw / 100

def transform_tetanus(row):
    """Replace the TetanusLast10Tdap code in ``row`` (in place) with its TETANUS label.

    Codes that cannot be parsed or are missing from TETANUS become NaN.
    """
    col = NEW_COL_INDICES['TetanusLast10Tdap']
    row[col] = TETANUS.get(safe_int(row[col]), np.nan)

def transform_covid(row):
    """Replace the CovidPos code in ``row`` (in place) with its COVID label.

    Codes that cannot be parsed or are missing from COVID become NaN.
    """
    col = NEW_COL_INDICES['CovidPos']
    row[col] = COVID.get(safe_int(row[col]), np.nan)

# Create functions for all Yes/No questions
def create_yes_no_transformer(col_name):
    """Build an in-place row transformer for the Yes/No column ``col_name``.

    The returned function maps the cell's code through YES_NO_QUESTIONS;
    codes that cannot be parsed or are not in that table become NaN.
    """
    def transformer(row):
        # The column position is looked up on each call, not at factory time.
        pos = NEW_COL_INDICES[col_name]
        row[pos] = YES_NO_QUESTIONS.get(safe_int(row[pos]), np.nan)
    return transformer

# Output columns whose codes are decoded with the shared YES_NO_QUESTIONS table.
yes_no_cols = [
    'PhysicalActivities',
    'HadHeartAttack',
    'HadAngina',
    'HadStroke',
    'HadAsthma',
    'HadSkinCancer',
    'HadCOPD',
    'HadDepressiveDisorder',
    'HadKidneyDisease',
    'HadArthritis',
    'DeafOrHardOfHearing',
    'BlindOrVisionDifficulty',
    'DifficultyConcentrating',
    'DifficultyWalking',
    'DifficultyDressingBathing',
    'DifficultyErrands',
    'ChestScan',
    'AlcoholDrinkers',
    'HIVTesting',
    'FluVaxLast12',
    'PneumoVaxEver',
    'HighRiskLastYear',
]

# Every in-place transformer applied to each filtered row: the column-specific
# functions first, then one generated transformer per Yes/No column.
MASTER_TRANSFORM_LIST = [
    transform_state,
    transform_sex,
    transform_gen_health,
    transform_phys_health,
    transform_ment_health,
    transform_checkup,
    transform_sleep,
    transform_teeth,
    transform_diabetes,
    transform_smoker,
    transform_ecig,
    transform_race,
    transform_age,
    transform_height,
    transform_weight,
    transform_bmi,
    transform_tetanus,
    transform_covid,
    *map(create_yes_no_transformer, yes_no_cols),
]


### Process and Save the Final File

This cell performs the main work:
1.  Opens the input and output files.
2.  Reads the input header to build an index map.
3.  Writes the new header (`NEW_VAR_NAMES`) to the output file.
4.  Loops through every row in the large input file.
5.  Filters each row to keep only the columns in `var_list`.
6.  Applies all transformations to the filtered row.
7.  Checks for any `NaN` values (replicating `dropna()`).
8.  If no `NaN`s are found, writes the fully processed row to the final file.

In [18]:
def process_and_write():
    """Filter, transform and clean the converted CSV, writing FINAL_FILE_PATH.

    Steps:
      1. Read the header of PROCESSED_FILE_PATH_CSV and locate every column
         named in ``var_list``.
      2. Only once the header has validated, open FINAL_FILE_PATH for writing
         and emit NEW_VAR_NAMES as the new header.
      3. For each data row: keep the selected columns (in ``var_list`` order),
         run every function in MASTER_TRANSFORM_LIST on the filtered row, and
         write it only if no value is None/NaN (the ``dropna()`` equivalent).

    Rows too short to contain every selected column are counted and skipped
    instead of aborting the run. Problems are reported with ``print``; the
    function always returns None.
    """
    print("Processing started...")
    rows_written = 0
    rows_read = 0
    rows_malformed = 0

    try:
        # newline='' is what the csv module expects, so quoted fields with
        # embedded line breaks are parsed correctly.
        with open(PROCESSED_FILE_PATH_CSV, mode='r', encoding='utf-8', newline='') as infile:
            reader = csv.reader(infile)

            # An empty input has no header; report it rather than surfacing a
            # bare StopIteration with an empty message.
            original_header = next(reader, None)
            if original_header is None:
                print(f"Error: Input file {PROCESSED_FILE_PATH_CSV} is empty. Aborting.")
                return

            # {column_name: index} for the original file.
            header_map = {name: i for i, name in enumerate(original_header)}

            missing = [name for name in var_list if name not in header_map]
            if missing:
                missing_text = ", ".join(repr(name) for name in missing)
                print(f"Error: Column {missing_text} not found in the input CSV. Aborting.")
                return

            # Indices to keep, in var_list order (which must mirror NEW_VAR_NAMES).
            indices_to_keep = [header_map[name] for name in var_list]
            if len(indices_to_keep) != len(NEW_VAR_NAMES):
                print(
                    f"Error: {len(indices_to_keep)} columns selected but "
                    f"{len(NEW_VAR_NAMES)} output names defined. Aborting."
                )
                return
            min_row_len = max(indices_to_keep, default=-1) + 1

            # The output is opened only now, so a bad input never truncates an
            # existing result file.
            with open(FINAL_FILE_PATH, mode='w', encoding='utf-8', newline='') as outfile:
                writer = csv.writer(outfile)
                writer.writerow(NEW_VAR_NAMES)

                for row in reader:
                    rows_read += 1
                    if rows_read % 100000 == 0:
                        print(f"Processed {rows_read} rows...")

                    # A short row lacks at least one selected column; treat it
                    # as having a missing value and drop it.
                    if len(row) < min_row_len:
                        rows_malformed += 1
                        continue

                    # The comprehension builds a fresh list, so the in-place
                    # transforms never touch the reader's row.
                    processed_row = [row[i] for i in indices_to_keep]
                    for transform_func in MASTER_TRANSFORM_LIST:
                        transform_func(processed_row)

                    # dropna() equivalent. np.nan is itself a float, so the
                    # isinstance/isnan test covers it too.
                    has_nan = any(
                        val is None or (isinstance(val, float) and np.isnan(val))
                        for val in processed_row
                    )
                    if not has_nan:
                        writer.writerow(processed_row)
                        rows_written += 1

    except FileNotFoundError as e:
        # e.filename is whichever path was missing: the input file or the
        # output location.
        print(f"Error: File {e.filename} not found.")
        return
    except Exception as e:
        # Deliberate catch-all so the notebook cell reports the problem
        # instead of raising; the output file may be incomplete here.
        print(f"An error occurred during processing: {e}")
        return

    if rows_malformed:
        print(f"Skipped {rows_malformed} malformed rows (too few columns).")
    print(f"Processing complete. Wrote {rows_written} valid rows to {FINAL_FILE_PATH}")

# Run the entire process
process_and_write()

Processing started...
Processed 100000 rows...
Processed 200000 rows...
Processed 300000 rows...
Processed 400000 rows...
Processing complete. Wrote 246022 valid rows to ..\data\processed\final_data.csv
