
Mid-term for HDS5210
Your supervisor is concerned about 4-year survival risk for COPD patients. She has asked you to do some analysis using a newer metric, BODE, which improves on a previous metric and promises to provide insight into survival risk.

BODE is defined here: https://www.mdcalc.com/calc/3916/bode-index-copd-survival#evidence

Your assignment is to implement the BODE calculation and use it to compute BODE scores and BODE survival rates for a group of patients. Then evaluate the average BODE score and average BODE survival rate for each area hospital.

Your patient input file will have the following columns: NAME,SSN,LANGUAGE,JOB,HEIGHT_M,WEIGHT_KG,fev_pct,dyspnea_description,distance_in_meters,hospital
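Before any scoring, it can help to confirm that a file actually has these columns. A minimal sketch, assuming an in-memory sample in place of the real file: `check_patient_header` and `EXPECTED_COLUMNS` are hypothetical names, and the single data row is a made-up placeholder, not real patient data.

```python
import csv
import io

# Column names taken from the assignment text above.
EXPECTED_COLUMNS = [
    "NAME", "SSN", "LANGUAGE", "JOB", "HEIGHT_M", "WEIGHT_KG",
    "fev_pct", "dyspnea_description", "distance_in_meters", "hospital",
]


def check_patient_header(text):
    """Return the CSV header of `text`, raising ValueError if any expected column is missing."""
    reader = csv.DictReader(io.StringIO(text))
    header = reader.fieldnames or []  # fieldnames is None for an empty file
    missing = [col for col in EXPECTED_COLUMNS if col not in header]
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return header


# Made-up sample with one placeholder row (hypothetical values).
sample = (
    "NAME,SSN,LANGUAGE,JOB,HEIGHT_M,WEIGHT_KG,fev_pct,"
    "dyspnea_description,distance_in_meters,hospital\n"
    "Jane Doe,000-00-0000,English,Nurse,1.65,60,55,WHEN HURRYING,300,BJC\n"
)
print(check_patient_header(sample) == EXPECTED_COLUMNS)  # True
```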

BODE calculations require a BMI value, so you will have to create a function for it.

Your output should be in the form of two CSV files, patient_output.csv and hospital_output.csv.

Patient_output will have the following columns: NAME,BODE_SCORE,BODE_RISK,HOSPITAL

Hospital_output will have the following columns: HOSPITAL_NAME, COPD_COUNT, PCT_OF_COPD_CASES_OVER_BEDS, AVG_SCORE, AVG_RISK
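One way to keep both output files aligned with the column lists above is to drive `csv.DictWriter` from the header list itself, since by default it raises a `ValueError` when a row has a key that is not in `fieldnames`. This sketch writes to an in-memory buffer instead of `patient_output.csv`; `to_csv_text` is a hypothetical helper and the row values are placeholders.

```python
import csv
import io

PATIENT_HEADER = ["NAME", "BODE_SCORE", "BODE_RISK", "HOSPITAL"]
HOSPITAL_HEADER = [
    "HOSPITAL_NAME", "COPD_COUNT", "PCT_OF_COPD_CASES_OVER_BEDS",
    "AVG_SCORE", "AVG_RISK",
]


def to_csv_text(header, rows):
    """Render a list of dicts as CSV text; DictWriter raises ValueError on unknown keys."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=header)
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


# Placeholder row, not real output.
text = to_csv_text(PATIENT_HEADER, [
    {"NAME": "Jane Doe", "BODE_SCORE": 3, "BODE_RISK": 0.67, "HOSPITAL": "BJC"},
])
print(text)
```

For real files, open them with `open(path, 'w', newline='')`, as the `csv` module documentation recommends.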

Each function you create should have documentation and a suitable number of test cases. If the input data could be invalid, make sure to raise a ValueError.
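Doctest can check the `ValueError` requirement directly: an expected exception is written as a `Traceback (most recent call last):` line, an indented `...`, and the final exception line, and doctest compares only that last line. The function below is a hypothetical illustration, not part of the assignment.

```python
import doctest


def parse_positive(value):
    """
    Convert a CSV field to a positive float.

    >>> parse_positive("1.75")
    1.75
    >>> parse_positive("-2")
    Traceback (most recent call last):
        ...
    ValueError: expected a positive number, got '-2'
    """
    number = float(value)  # float() itself raises ValueError for text like "abc"
    if number <= 0:
        raise ValueError(f"expected a positive number, got {value!r}")
    return number


if __name__ == "__main__":
    # Check only this function's examples; prints nothing when they all pass.
    doctest.run_docstring_examples(parse_positive, {"parse_positive": parse_positive})
```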

For this assignment, use the doctest, json, and csv libraries. Pandas is not allowed for this assignment.

In [1]:
import doctest
import json
import csv

### Step 1: Calculate BMI

In [2]:
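def calculate_bmi(weight_kg, height_m):
    """
    Calculate the Body Mass Index (BMI) given the weight and height.

    Parameters:
    weight_kg (float): The weight in kilograms.
    height_m (float): The height in meters.

    Returns:
    float: The calculated BMI rounded to one decimal place.

    Raises:
    ValueError: If weight or height is not positive.

    >>> calculate_bmi(70, 1.75)
    22.9
    >>> calculate_bmi(70, 0)
    Traceback (most recent call last):
        ...
    ValueError: Weight and height must be positive numbers.
    """
    if weight_kg <= 0 or height_m <= 0:
        raise ValueError("Weight and height must be positive numbers.")
    bmi = weight_kg / (height_m ** 2)
    return round(bmi, 1)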

In [3]:
def interpret_bmi(bmi):
    """
    Interpret the BMI value and categorize it based on standard weight status categories.

    Parameters:
    bmi (float): The BMI value to interpret.

    Returns:
    str: The category of the BMI, which can be one of the following:
         - "Underweight" if BMI is less than 18.5.
         - "Normal weight" if BMI is at least 18.5 but less than 25.
         - "Overweight" if BMI is at least 25 but less than 30.
         - "Obese" if BMI is 30 or above.

    Raises:
    ValueError: If bmi is not positive.

    >>> interpret_bmi(22.9)
    'Normal weight'
    >>> interpret_bmi(30)
    'Obese'
    """
    if bmi <= 0:
        raise ValueError("BMI must be a positive number.")
    if bmi < 18.5:
        return "Underweight"
    elif bmi < 25:
        return "Normal weight"
    elif bmi < 30:
        return "Overweight"
    else:
        return "Obese"

def display_bmi_result(weight, height, bmi, category):
    """
    Display the BMI calculation results in a formatted and readable manner.

    Parameters:
    weight (float): The weight of the person in kilograms.
    height (float): The height of the person in meters.
    bmi (float): The calculated BMI value.
    category (str): The BMI category based on the BMI interpretation.

    Returns:
    None: This function only prints the formatted BMI result.
    """
    print("\nBMI Calculation Results")
    print("-----------------------")
    print(f"Weight: {weight:.1f} kg")
    print(f"Height: {height:.2f} m")
    print(f"BMI: {bmi:.1f}")
    print(f"Category: {category}")
    print("-----------------------")


In [4]:
# Test Case 1: Normal weight
print("Test Case 1: Normal weight")
weight1, height1 = 70, 1.75
bmi1 = calculate_bmi(weight1, height1)
category1 = interpret_bmi(bmi1)
display_bmi_result(weight1, height1, bmi1, category1)

Test Case 1: Normal weight

BMI Calculation Results
-----------------------
Weight: 70.0 kg
Height: 1.75 m
BMI: 22.9
Category: Normal weight
-----------------------


In [5]:
# Test Case 2: Overweight
print("\nTest Case 2: Overweight")
weight2, height2 = 90, 1.80
bmi2 = calculate_bmi(weight2, height2)
category2 = interpret_bmi(bmi2)
display_bmi_result(weight2, height2, bmi2, category2)


Test Case 2: Overweight

BMI Calculation Results
-----------------------
Weight: 90.0 kg
Height: 1.80 m
BMI: 27.8
Category: Overweight
-----------------------


In [6]:
# Test Case 3: Underweight
print("\nTest Case 3: Underweight")
weight3, height3 = 50, 1.70
bmi3 = calculate_bmi(weight3, height3)
category3 = interpret_bmi(bmi3)
display_bmi_result(weight3, height3, bmi3, category3)



Test Case 3: Underweight

BMI Calculation Results
-----------------------
Weight: 50.0 kg
Height: 1.70 m
BMI: 17.3
Category: Underweight
-----------------------


### Step 2: Calculate BODE Score

In [15]:
def normalize_dyspnea_description(description):
    """
    Convert a dyspnea description from the dataset into its mMRC grade (0-4).

    The patient file uses short phrases from the modified Medical Research
    Council (mMRC) dyspnea scale, which is the dyspnea component of BODE.

    >>> normalize_dyspnea_description("STOPS AFTER A FEW MINUTES")
    3
    >>> normalize_dyspnea_description("  when hurrying ")
    1
    >>> normalize_dyspnea_description("UNKNOWN")
    Traceback (most recent call last):
        ...
    ValueError: Invalid dyspnea description: UNKNOWN
    """
    description = description.upper().strip()
    # Phrase found in the dataset -> mMRC grade
    mmrc_grades = {
        "ONLY STRENUOUS EXERCISE": 0,
        "WHEN HURRYING": 1,
        "WALKING UPHILL": 1,
        "SLOWER THAN PEERS": 2,
        "STOPS WHEN WALKING AT PACE": 2,
        "STOPS AFTER 100 YARDS": 3,
        "STOPS AFTER A FEW MINUTES": 3,
        "UNABLE TO LEAVE HOME": 4,
        "BREATHLESS WHEN DRESSING": 4,
    }
    for phrase, grade in mmrc_grades.items():
        if phrase in description:
            return grade
    raise ValueError(f"Invalid dyspnea description: {description}")

def cal_bode_score(bmi, fev_pct, dyspnea_description, distance_in_meters):
    """
    Calculate the BODE score (0-10) from BMI, FEV1 percentage, dyspnea
    description, and 6-minute walk distance, using the MDCalc point table.

    Parameters:
    - bmi (float): Patient's BMI value.
    - fev_pct (float): FEV1 as a percentage of predicted.
    - dyspnea_description (str): Patient's dyspnea description.
    - distance_in_meters (float): 6-minute walk distance in meters.

    Returns:
    int: The BODE score.

    Raises:
    ValueError: If a numeric input is out of range or the dyspnea description is unknown.

    >>> cal_bode_score(22, 70, 'ONLY STRENUOUS EXERCISE', 400)
    0
    >>> cal_bode_score(18, 40, 'STOPS WHEN WALKING AT PACE', 200)
    6
    >>> cal_bode_score(21, 36, 'STOPS AFTER 100 YARDS', 250)
    6
    >>> cal_bode_score(15, 30, 'UNABLE TO LEAVE HOME', 100)
    10
    >>> cal_bode_score(25, 65, 'WHEN HURRYING', 350)
    0
    """
    if bmi <= 0 or fev_pct < 0 or distance_in_meters < 0:
        raise ValueError("BMI must be positive; FEV1 % and distance cannot be negative.")

    bode_score = 0
    # B: body-mass index, 1 point if BMI <= 21
    if bmi <= 21:
        bode_score += 1

    # O: airflow obstruction, FEV1 % predicted (>=65: 0, 50-64: 1, 36-49: 2, <=35: 3)
    if fev_pct >= 65:
        bode_score += 0
    elif fev_pct >= 50:
        bode_score += 1
    elif fev_pct >= 36:
        bode_score += 2
    else:
        bode_score += 3

    # D: dyspnea, mMRC grade 0-1 -> 0, 2 -> 1, 3 -> 2, 4 -> 3
    mmrc_grade = normalize_dyspnea_description(dyspnea_description)
    bode_score += max(mmrc_grade - 1, 0)

    # E: exercise capacity, 6-minute walk (>=350: 0, 250-349: 1, 150-249: 2, <=149: 3)
    if distance_in_meters >= 350:
        bode_score += 0
    elif distance_in_meters >= 250:
        bode_score += 1
    elif distance_in_meters >= 150:
        bode_score += 2
    else:
        bode_score += 3

    return bode_score

def run_doctests():
    """
    Run all doctests in the module.
    """
    doctest.testmod()

if __name__ == "__main__":
    # Run the doctests
    run_doctests()
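Each BODE component is a step function of a single number, so the cut-offs can also be stored as data and looked up with the standard `bisect` module. This is an alternative sketch, not the notebook's method. It assumes the MDCalc point table (BMI ≤ 21 scores 1; FEV1 % predicted ≥ 65, 50-64, 36-49, ≤ 35 scores 0 to 3; 6-minute walk ≥ 350, 250-349, 150-249, ≤ 149 m scores 0 to 3) and takes dyspnea as an mMRC grade from 0 to 4, where grades 0-1 score 0 and grades 2, 3, 4 score 1, 2, 3. `step_points` and `bode_score_from_table` are hypothetical names.

```python
from bisect import bisect_right


def step_points(value, cutoffs, points):
    """Return points[i], where i is how many cut-offs are <= value."""
    return points[bisect_right(cutoffs, value)]


def bode_score_from_table(bmi, fev_pct, mmrc_grade, walk_m):
    """Hypothetical table-driven BODE score (0-10)."""
    if not 0 <= mmrc_grade <= 4:
        raise ValueError("mMRC grade must be between 0 and 4.")
    score = 1 if bmi <= 21 else 0
    # FEV1 % predicted: <36 -> 3, 36-49 -> 2, 50-64 -> 1, >=65 -> 0
    score += step_points(fev_pct, [36, 50, 65], [3, 2, 1, 0])
    # mMRC grade 0-1 -> 0, 2 -> 1, 3 -> 2, 4 -> 3
    score += max(mmrc_grade - 1, 0)
    # 6-minute walk distance: <150 -> 3, 150-249 -> 2, 250-349 -> 1, >=350 -> 0
    score += step_points(walk_m, [150, 250, 350], [3, 2, 1, 0])
    return score


print(bode_score_from_table(18, 40, 2, 200))  # 6
```

Keeping the thresholds in lists makes each boundary visible in one place, which is where off-by-one mistakes (for example, whether exactly 350 m scores 0 or 1) tend to hide.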


### Step 3: Calculate BODE Risk

In [18]:
def calculate_bode_risk(bode_score):
    """
    Classify the BODE score into a risk category for patients with chronic obstructive pulmonary disease (COPD).

    The four categories are the BODE score quartiles; MDCalc lists approximate
    4-year survival of 80%, 67%, 57%, and 18% for them, respectively.

    Parameters:
    bode_score (int): The calculated BODE score (0-10, the sum of the four component scores).

    Returns:
    str: The risk category based on the BODE score, which can be one of the following:
         - "Low" for scores of 0 to 2.
         - "Moderate" for scores of 3 to 4.
         - "High" for scores of 5 to 6.
         - "Very High" for scores of 7 to 10.

    Raises:
    ValueError: If bode_score is outside the range 0-10.

    >>> calculate_bode_risk(3)
    'Moderate'
    >>> calculate_bode_risk(6)
    'High'
    >>> calculate_bode_risk(8)
    'Very High'
    >>> calculate_bode_risk(11)
    Traceback (most recent call last):
        ...
    ValueError: BODE score must be between 0 and 10.
    """
    if not 0 <= bode_score <= 10:
        raise ValueError("BODE score must be between 0 and 10.")
    if bode_score <= 2:
        return "Low"
    elif bode_score <= 4:
        return "Moderate"
    elif bode_score <= 6:
        return "High"
    else:
        return "Very High"


if __name__ == "__main__":
    # Re-run the doctests so the examples above are checked too
    run_doctests()


In [17]:
calculate_bode_risk(8)

'Very High'

In [19]:
calculate_bode_risk(6)

'High'

In [20]:
calculate_bode_risk(3)

'Moderate'
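Step 3 returns a label for each quartile, while the Step 5 code reports the quartile's approximate 4-year survival instead. Keeping both in one table is one way to stop the two functions from drifting apart. `BODE_QUARTILES` and `bode_quartile` are hypothetical names; the survival fractions are the same values the Step 5 code uses.

```python
BODE_QUARTILES = [
    # (highest score in the quartile, risk label, approx. 4-year survival)
    (2, "Low", 0.80),
    (4, "Moderate", 0.67),
    (6, "High", 0.57),
    (10, "Very High", 0.18),
]


def bode_quartile(bode_score):
    """Return (label, survival) for a BODE score from 0 to 10."""
    if not 0 <= bode_score <= 10:
        raise ValueError("BODE score must be between 0 and 10.")
    for upper, label, survival in BODE_QUARTILES:
        if bode_score <= upper:
            return label, survival


print(bode_quartile(5))  # ('High', 0.57)
```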

### Step 4: Load Hospital Data

In [21]:
import json

def load_hospital_data(file_path):
    """
    Load hospital data from a JSON file.

    Parameters:
    file_path (str): The path to the JSON file containing hospital data.

    Returns:
    dict or list: The data loaded from the JSON file, which can be a list of dictionaries or a single dictionary,
    depending on the structure of the JSON file.

    Raises:
    FileNotFoundError: If the file specified by file_path is not found.
    JSONDecodeError: If the file is not a valid JSON format.
    """
    with open(file_path, 'r') as file:
        hospital_data = json.load(file)
    return hospital_data


In [22]:
import json

def load_hospital_data(file_path):
    """
    Load hospital data from a JSON file.

    Parameters:
    file_path (str): The path to the JSON file containing hospital data.

    Returns:
    dict or list: The data loaded from the JSON file, which can be a list of dictionaries or a single dictionary,
    depending on the structure of the JSON file.

    Raises:
    FileNotFoundError: If the file specified by file_path is not found.
    ValueError: If the file is not valid JSON.
    """
    try:
        with open(file_path, 'r') as file:
            hospital_data = json.load(file)
        return hospital_data
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Error: The file {file_path} was not found.") from e
    except json.JSONDecodeError as e:
        # json.JSONDecodeError requires (msg, doc, pos), so re-raise as a plain ValueError
        raise ValueError(f"Error: The file {file_path} contains invalid JSON.") from e


In [34]:
import json

def load_hospital_data(file_path):
    """Load hospital data from a JSON file and return the parsed list of health systems."""
    with open(file_path, 'r') as file:
        hospital_data = json.load(file)
    return hospital_data

hospital_data = load_hospital_data('hospitals (1).json')
print(json.dumps(hospital_data, indent=2))

[
  {
    "system": "BJC",
    "hospitals": [
      {
        "name": "BJC",
        "beds": 2000
      },
      {
        "name": "BJC WEST COUNTY",
        "beds": 1000
      },
      {
        "name": "MISSOURI BAPTIST",
        "beds": 800
      }
    ]
  },
  {
    "system": "SSM",
    "hospitals": [
      {
        "name": "SAINT LOUIS UNIVERSITY",
        "beds": 1000
      },
      {
        "name": "ST.MARY'S",
        "beds": 500
      }
    ]
  },
  {
    "system": "ST.LUKE'S",
    "hospitals": [
      {
        "name": "ST.LUKE'S",
        "beds": 800
      }
    ]
  }
]
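The file nests hospitals inside health systems, so the per-hospital metrics need a flat mapping from hospital name to bed count. A minimal sketch, using `json.loads` on a trimmed copy of the structure above (two of the six hospitals); `beds_by_hospital` is a hypothetical helper that raises `ValueError` if the same hospital name appears twice.

```python
import json


def beds_by_hospital(systems):
    """Flatten [{"system": ..., "hospitals": [{"name": ..., "beds": ...}]}] into {name: beds}."""
    beds = {}
    for system in systems:
        for hospital in system["hospitals"]:
            name = hospital["name"]
            if name in beds:
                raise ValueError(f"Duplicate hospital name: {name}")
            beds[name] = int(hospital["beds"])
    return beds


sample = json.loads("""
[
  {"system": "BJC", "hospitals": [{"name": "BJC", "beds": 2000}]},
  {"system": "SSM", "hospitals": [{"name": "ST.MARY'S", "beds": 500}]}
]
""")
print(beds_by_hospital(sample))  # {'BJC': 2000, "ST.MARY'S": 500}
```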


### Step 5: Main business logic

Call the BODE score and BODE risk functions for each patient.

For each hospital, count the number of COPD cases and calculate the average BODE score and average BODE risk.
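The per-hospital aggregation can be sketched with `collections.defaultdict`: keep a running count, score sum, and risk sum per hospital, then divide once at the end. The patient tuples and bed counts below are made-up placeholders, `summarize_hospitals` is a hypothetical name, and the percentage follows the notebook's own definition of `PCT_OF_COPD_CASES_OVER_BEDS` (COPD cases divided by beds, times 100).

```python
from collections import defaultdict


def summarize_hospitals(patients, beds):
    """
    patients: iterable of (name, bode_score, bode_risk, hospital) tuples.
    beds: dict mapping hospital name -> bed count.
    Returns one row per hospital in `beds`:
    [HOSPITAL_NAME, COPD_COUNT, PCT_OF_COPD_CASES_OVER_BEDS, AVG_SCORE, AVG_RISK].
    """
    totals = defaultdict(lambda: [0, 0.0, 0.0])  # [count, score sum, risk sum]
    for _name, score, risk, hospital in patients:
        if hospital in beds:  # patients at unknown hospitals are not counted
            entry = totals[hospital]
            entry[0] += 1
            entry[1] += score
            entry[2] += risk
    rows = []
    for hospital, bed_count in beds.items():
        count, score_sum, risk_sum = totals[hospital]
        avg_score = score_sum / count if count else 0
        avg_risk = risk_sum / count if count else 0
        pct = count / bed_count * 100 if bed_count > 0 else 0
        rows.append([hospital, count, pct, avg_score, avg_risk])
    return rows


rows = summarize_hospitals(
    [("A", 2, 0.80, "BJC"), ("B", 6, 0.57, "BJC"), ("C", 9, 0.18, "ELSEWHERE")],
    {"BJC": 2000, "ST.MARY'S": 500},
)
print(rows)
```

Hospitals with no matching patients get zeros rather than a division error, mirroring the notebook's `if copd_count > 0` branch.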

In [46]:
import csv
import json
patient_csv_path = '/content/patient.csv'
def load_patient_data(csv_path):
    """
    Load patient data from a CSV file.

    Parameters:
    csv_path (str): Path to the CSV file.

    Returns:
    list: A list of dictionaries, one per valid patient row, with the keys
    'NAME', 'HEIGHT_M', 'WEIGHT_KG', 'fev_pct', 'dyspnea_description',
    'distance_in_meters', and 'hospital'. Rows with non-numeric values are skipped.

    Raises:
    ValueError: If the CSV file is missing any required column.
    """
    required_columns = {'NAME', 'HEIGHT_M', 'WEIGHT_KG', 'fev_pct', 'dyspnea_description', 'distance_in_meters', 'hospital'}
    patient_data = []

    # utf-8-sig strips a byte-order mark, if present, from the first header name
    with open(csv_path, mode='r', newline='', encoding='utf-8-sig') as f:
        reader = csv.DictReader(f)

        # fieldnames is None for an empty file
        missing_cols = required_columns - set(reader.fieldnames or [])
        if missing_cols:
            raise ValueError(f"Missing columns: {missing_cols}")
        for row in reader:
            try:
                patient_data.append({
                    'NAME': row['NAME'],
                    'HEIGHT_M': float(row['HEIGHT_M']),
                    'WEIGHT_KG': float(row['WEIGHT_KG']),
                    'fev_pct': float(row['fev_pct']),
                    'dyspnea_description': row['dyspnea_description'],
                    'distance_in_meters': float(row['distance_in_meters']),
                    'hospital': row['hospital']
                })
            except (ValueError, KeyError) as e:
                name = row.get('NAME', 'Unknown')
                print(f"Skipping {name} due to error: {e}")

    return patient_data

try:
    patients = load_patient_data(patient_csv_path)
    print(f"Loaded {len(patients)} patients from '{patient_csv_path}'.")
except ValueError as e:
    print(f"Error loading patient data: {e}")

patient_csv = "patient.csv"
hospital_json = "hospitals.json"

patient_output_file = "patient_output.csv"
hospital_output_file = "hospital_output.csv"

hospital_data = load_hospital_data('hospitals (1).json')

hospital_metrics = {}
for entry in hospital_data:
    for hospital in entry['hospitals']:
        hospital_metrics[hospital['name']] = {
            'total_bode_score': 0,
            'total_risk': 0,
            'copd_count': 0,
            'beds': hospital['beds']
        }

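def calculate_bmi(weight_kg, height_m):
    """Return BMI in kg/m^2; raise ValueError for non-positive weight or height."""
    if weight_kg <= 0 or height_m <= 0:
        raise ValueError("Weight and height must be positive numbers.")
    return weight_kg / (height_m ** 2)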

def normalize_dyspnea_description(description):
    """
    Convert a dyspnea description from the dataset into its mMRC grade (0-4).

    >>> normalize_dyspnea_description("STOPS AFTER A FEW MINUTES")
    3
    >>> normalize_dyspnea_description("WHEN HURRYING")
    1
    """
    description = description.upper().strip()
    # Phrase found in the dataset -> mMRC grade
    mmrc_grades = {
        "ONLY STRENUOUS EXERCISE": 0,
        "WHEN HURRYING": 1,
        "WALKING UPHILL": 1,
        "SLOWER THAN PEERS": 2,
        "STOPS WHEN WALKING AT PACE": 2,
        "STOPS AFTER 100 YARDS": 3,
        "STOPS AFTER A FEW MINUTES": 3,
        "UNABLE TO LEAVE HOME": 4,
        "BREATHLESS WHEN DRESSING": 4,
    }
    for phrase, grade in mmrc_grades.items():
        if phrase in description:
            return grade
    raise ValueError(f"Invalid dyspnea description: {description}")

def calculate_bode_score(bmi, fev_pct, dyspnea_description, distance_in_meters):
    """
    Calculate the BODE score (0-10) using the MDCalc point table.

    >>> calculate_bode_score(18, 40, 'STOPS WHEN WALKING AT PACE', 200)
    6
    """
    if bmi <= 0 or fev_pct < 0 or distance_in_meters < 0:
        raise ValueError("BMI must be positive; FEV1 % and distance cannot be negative.")
    bode_score = 0
    # B: 1 point if BMI <= 21
    if bmi <= 21:
        bode_score += 1
    # O: FEV1 % predicted (>=65: 0, 50-64: 1, 36-49: 2, <=35: 3)
    if fev_pct < 36:
        bode_score += 3
    elif fev_pct < 50:
        bode_score += 2
    elif fev_pct < 65:
        bode_score += 1
    # D: mMRC grade 0-1 -> 0, 2 -> 1, 3 -> 2, 4 -> 3
    bode_score += max(normalize_dyspnea_description(dyspnea_description) - 1, 0)
    # E: 6-minute walk (>=350: 0, 250-349: 1, 150-249: 2, <=149: 3)
    if distance_in_meters < 150:
        bode_score += 3
    elif distance_in_meters < 250:
        bode_score += 2
    elif distance_in_meters < 350:
        bode_score += 1
    return bode_score

def calculate_bode_risk(bode_score):
    """
    Return the approximate 4-year survival for a BODE score (0-10), by quartile.

    >>> calculate_bode_risk(5)
    0.57
    """
    if 0 <= bode_score <= 2:
        return 0.80
    elif 3 <= bode_score <= 4:
        return 0.67
    elif 5 <= bode_score <= 6:
        return 0.57
    elif 7 <= bode_score <= 10:
        return 0.18
    else:
        raise ValueError("BODE score must be between 0 and 10.")

patient_results = []
with open(patient_csv, 'r', newline='', encoding='utf-8-sig') as csvf:
    reader = csv.DictReader(csvf)
    header_names = reader.fieldnames or []
    # Find the name column case-insensitively in case the header is not exactly 'NAME'
    name_column = next((col for col in header_names if col.lower() == 'name'), None)
    if name_column is None:
        raise ValueError("Name column not found in the CSV file.")

    for row in reader:
        name = row[name_column]
        try:
            height_m = float(row['HEIGHT_M'])
            weight_kg = float(row['WEIGHT_KG'])
            fev_pct = float(row['fev_pct'])
            dyspnea_description = row['dyspnea_description']
            distance_in_meters = float(row['distance_in_meters'])
            hospital_name = row['hospital']
            bmi = calculate_bmi(weight_kg, height_m)
            bode_score = calculate_bode_score(bmi, fev_pct, dyspnea_description, distance_in_meters)
            bode_risk = calculate_bode_risk(bode_score)
        except ValueError as e:
            # Non-numeric fields, invalid measurements, or unknown dyspnea descriptions
            print(f"Skipping {name} due to invalid data: {e}")
            continue

        patient_results.append([name, bode_score, bode_risk, hospital_name])
        if hospital_name in hospital_metrics:
            hospital_metrics[hospital_name]['total_bode_score'] += bode_score
            hospital_metrics[hospital_name]['total_risk'] += bode_risk
            hospital_metrics[hospital_name]['copd_count'] += 1

hospital_output_list = []
for hospital_name, metrics in hospital_metrics.items():
    copd_count = metrics['copd_count']
    if copd_count > 0:
        avg_bode_score = metrics['total_bode_score'] / copd_count
        avg_bode_risk = metrics['total_risk'] / copd_count
    else:
        avg_bode_score = 0
        avg_bode_risk = 0
    pct_of_copd_cases = (copd_count / metrics['beds']) * 100 if metrics['beds'] > 0 else 0
    hospital_output_list.append([hospital_name, copd_count, pct_of_copd_cases, avg_bode_score, avg_bode_risk])

# Write patient_output.csv
with open(patient_output_file, 'w', newline='') as csvf:
    writer = csv.writer(csvf)
    writer.writerow(['NAME', 'BODE_SCORE', 'BODE_RISK', 'HOSPITAL'])
    writer.writerows(patient_results)

# Write hospital_output.csv
with open(hospital_output_file, 'w', newline='') as csvf:
    writer = csv.writer(csvf)
    writer.writerow(['HOSPITAL_NAME', 'COPD_COUNT', 'PCT_OF_COPD_CASES_OVER_BEDS', 'AVG_SCORE', 'AVG_RISK'])
    writer.writerows(hospital_output_list)




Loaded 1000 patients from '/content/patient.csv'.
