# Heatstroke Risk Factor Analysis Notebook

**Author:** Leila Yousefi   
**Date:** 24/07/2025 
**Objective:** 
- Determining the factors affecting the risk of death from heatstroke in the UK.




## 1. Installations & Imports

## 2. Data pre-processing
### 2.1 Load CSV file into a DataFrame
### 2.2 Summary statistics
### 2.3 Data Quality Checks & Solutions
#### 2.3.1 Validation
#### 2.3.2 Completeness
#### 2.3.3 Uniqueness
#### 2.3.4 Accuracy/Consistency

## 3. Exploratory Data Analysis
### 3.1 Univariate distributions
### 3.2 Bivariate relationships

## 4. Feature Engineering & Modelling
### 4.1 Train/test split


## 5. Evaluation & Next Steps


In [None]:
!pip install graphviz


In [None]:
# 1. Installations & Imports: Adjust or add libraries as needed for the task.

# Suppress only the RuntimeWarning whose message matches "invalid value encountered in cast"
import warnings
warnings.filterwarnings(
    "ignore",
    category=RuntimeWarning,
    message=".*invalid value encountered in cast.*"
)

# standard libs
import os
from datetime import datetime


# data libs
import pandas as pd
import numpy as np

# viz libs
import matplotlib.pyplot as plt
import graphviz

# modeling
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
import statsmodels.api as sm
from sklearn.preprocessing import KBinsDiscretizer   # Discretize continuous data
from statsmodels.tsa.stattools import grangercausalitytests  # Granger causality tests
from sklearn.naive_bayes import GaussianNB
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.metrics import classification_report, accuracy_score
from scipy.stats import pointbiserialr, chi2_contingency
from sklearn.metrics import roc_auc_score, roc_curve


# After transferring this code into a .py file under src/, import the analysers from there:
# from src.analysers import (
#     EDAAnalyser, FeatureEngineer, ModelTrainer,
#     CorrelationAnalyser, RandomizationAnalyser, CausalityAnalyser
# )

# Seed intended for reproducible runs
RANDOM_STATE = 42

# Report where the kernel is running and what is in that directory
print("Working directory:", os.getcwd())
print("Notebooks are here:", os.listdir("."))

# Input (raw) and output (processed) data locations, relative to the working directory
DATA_DIR = os.path.join("..", "data", "raw")
print("DATA_DIR:", DATA_DIR)
OUTPUT_DIR = os.path.join("..", "data", "processed")
print("OUTPUT_DIR:", OUTPUT_DIR)

# Timestamp of this run, printed as YYYY-MM-DD
now = datetime.now()
print(f"→ Today's Date: {now:%Y-%m-%d}")

In [None]:
# 2. Data pre-processing: point the file paths at data/raw/ and load the data.

### 2.1 Load the CSV file into a DataFrame
filename = 'G7_Summer_2025_dataset.csv'
df = pd.read_csv(os.path.join(DATA_DIR, filename), low_memory=False)

# Show shape and missing values
print("Dataset shape:", df.shape)
print("\nMissing values per column:")
print(df.isnull().sum())

# Drop rows with a missing outcome or missing body measurements *before* the
# integer cast: astype(int) raises on NaN, so dropping on 'Death' afterwards
# could never remove anything. copy() gives an independent frame to add
# columns to.
df = df.dropna(subset=['Age', 'Height', 'Weight', 'Death From Heatstroke']).copy()
# Encode the death outcome as 0/1
df['Death'] = df['Death From Heatstroke'].astype(int)
# Compute BMI (height is divided by 100 before squaring) and turn the
# infinities produced by zero heights into NaN
df['BMI'] = df['Weight'] / ((df['Height'] / 100) ** 2)
df = df.replace([np.inf, -np.inf], np.nan)
# Keep only rows with no missing value in any column
df = df.dropna().reset_index(drop=True)

# Preview the first few cleaned rows
print(df.head())

### 2.2 Summary statistics for numeric and categorical columns
df.info()
df.describe(include="all")

In [None]:


# %%
# 2. Define helper functions

def cramers_v(x, y) -> float:
    """
    Compute Cramér's V for two categorical variables.

    The statistic is sqrt(chi2 / (n * (k - 1))), where chi2 is the Pearson
    chi-squared statistic of the x-by-y contingency table, n the number of
    observations and k the smaller of the two table dimensions.

    :param x: first categorical variable (array-like or pandas Series)
    :param y: second categorical variable, same length as x
    :return: Cramér's V; 0.0 when either variable has fewer than two
             observed levels (this includes empty input)
    """
    confusion = pd.crosstab(x, y)
    k = min(confusion.shape)
    if k < 2:
        # A constant (or empty) variable has no association to measure.
        return 0.0
    # correction=False: Yates' continuity correction (SciPy's default for 2x2
    # tables) shrinks chi2, so a perfect association would yield V < 1.
    chi2 = chi2_contingency(confusion, correction=False)[0]
    n = confusion.to_numpy().sum()
    return float(np.sqrt(chi2 / (n * (k - 1))))



In [None]:
# %%
# 3. Compute correlations for all features
results_corr = {}
target = 'Death'
# 'Death From Heatstroke' is the raw column `target` was encoded from; scoring
# it against its own encoding would trivially top the ranking.
excluded = {target, 'Death From Heatstroke'}
for col in df.columns:
    if col in excluded:
        continue
    series = df[col]
    if pd.api.types.is_numeric_dtype(series):
        # Point-biserial correlation: numeric feature vs binary outcome
        corr, _ = pointbiserialr(series, df[target])
    else:
        # Cramér's V: categorical feature vs binary outcome
        corr = cramers_v(series, df[target])
    results_corr[col] = abs(corr)

# Display top 5 by correlation. NaN scores are mapped to -1 so they rank last
# instead of making the sort order unpredictable.
sorted(
    results_corr.items(),
    key=lambda x: (x[1] if not np.isnan(x[1]) else -1),
    reverse=True,
)[:5]



In [None]:

import numpy as np
import pandas as pd

# 1. The dataset is the `df` loaded in the pre-processing cell.

# 2. Drop rows with missing critical values, then encode the target.
#    The drop comes first because astype(int) raises on a missing outcome;
#    copy() gives an independent frame to add columns to.
df = df.dropna(
    subset=['Age', 'Height', 'Weight', 'Blood Type', 'Death From Heatstroke']
).copy()
df['Death'] = df['Death From Heatstroke'].astype(int)

# 3. Feature engineering: compute BMI and clean infinities (zero heights)
df['BMI'] = df['Weight'] / ((df['Height'] / 100) ** 2)
df = df.replace([np.inf, -np.inf], np.nan)
df = df.dropna(subset=['BMI'])

# 4. Prepare feature matrix X and target y
categorical_cols = [
    'Geography', 'Gender', 'Occupation', 'Blood Type',
    'Care Home', 'Cardiovascular Disease', 'Dementia',
    'Respiratory Illness', 'Housing', 'Lives In Countryside'
]
X = pd.get_dummies(df[['Age', 'IMD', 'BMI'] + categorical_cols], drop_first=True)
y = df['Death']

# 5. Split into train and test sets (stratified on the outcome)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, stratify=y, random_state=RANDOM_STATE
)

# 6. Fit a logistic regression model
model = LogisticRegression(max_iter=1000)
model.fit(X_train, y_train)

# 7. Evaluate model performance on the held-out set
y_pred_prob = model.predict_proba(X_test)[:, 1]
auc = roc_auc_score(y_test, y_pred_prob)
fpr, tpr, _ = roc_curve(y_test, y_pred_prob)

# 8. Plot ROC curve
plt.figure()
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc:.2f})')
plt.plot([0, 1], [0, 1], '--', color='gray')
plt.title('ROC Curve for Heatstroke Death Prediction')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.legend()
# The file name now matches the figure; it was previously saved under the
# title of the coefficient table.
plt.savefig('ROC Curve for Heatstroke Death Prediction.png')
plt.show()

# 9. Inspect the top features by absolute coefficient
# NOTE(review): the features are not standardised, so coefficient magnitudes
# of Age/IMD/BMI and of the dummy columns are on different scales — confirm
# whether this ranking is meant to be comparable across features.
coefs = pd.Series(model.coef_[0], index=X.columns)
top_features = coefs.reindex(coefs.abs().sort_values(ascending=False).index).head(10)
print("Top 10 Features by Absolute Coefficient Value:")
print(top_features)



In [None]:
# Drop rows with a missing outcome or body measurement, then encode the death
# target as 0/1. The drop comes first because astype(int) raises on NaN.
df = df.dropna(subset=['Age', 'Height', 'Weight', 'Death From Heatstroke']).copy()
df['Death'] = df['Death From Heatstroke'].astype(int)
# Compute BMI and turn infinities (zero heights) into NaN
df['BMI'] = df['Weight'] / ((df['Height'] / 100) ** 2)
df = df.replace([np.inf, -np.inf], np.nan)
# Keep only rows with no missing value in any column
df = df.dropna().reset_index(drop=True)

df.head()

# %%
# 2. Define helper functions

def cramers_v(x, y) -> float:
    """
    Compute Cramér's V for two categorical variables.

    The statistic is sqrt(chi2 / (n * (k - 1))), where chi2 is the Pearson
    chi-squared statistic of the x-by-y contingency table, n the number of
    observations and k the smaller of the two table dimensions.

    :param x: first categorical variable (array-like or pandas Series)
    :param y: second categorical variable, same length as x
    :return: Cramér's V; 0.0 when either variable has fewer than two
             observed levels (this includes empty input)
    """
    confusion = pd.crosstab(x, y)
    k = min(confusion.shape)
    if k < 2:
        # A constant (or empty) variable has no association to measure.
        return 0.0
    # correction=False: Yates' continuity correction (SciPy's default for 2x2
    # tables) shrinks chi2, so a perfect association would yield V < 1.
    chi2 = chi2_contingency(confusion, correction=False)[0]
    n = confusion.to_numpy().sum()
    return float(np.sqrt(chi2 / (n * (k - 1))))

# %%
# 3. Compute correlations for all features
results_corr = {}
target = 'Death'
# 'Death From Heatstroke' is the raw column `target` was encoded from; scoring
# it against its own encoding would trivially top the ranking.
excluded = {target, 'Death From Heatstroke'}
for col in df.columns:
    if col in excluded:
        continue
    series = df[col]
    if pd.api.types.is_numeric_dtype(series):
        # Point-biserial correlation: numeric feature vs binary outcome
        corr, _ = pointbiserialr(series, df[target])
    else:
        # Cramér's V: categorical feature vs binary outcome
        corr = cramers_v(series, df[target])
    results_corr[col] = abs(corr)

# Display top 5 by correlation. NaN scores are mapped to -1 so they rank last
# instead of making the sort order unpredictable.
sorted(
    results_corr.items(),
    key=lambda x: (x[1] if not np.isnan(x[1]) else -1),
    reverse=True,
)[:5]


In [None]:
# %%
# 4. Score every feature by its mutual information with the outcome
def _mutual_information(feature, outcome, n_bins=10):
    """
    Estimate I(feature; outcome) in nats from a normalised contingency table.

    Numeric, non-boolean features are first cut into `n_bins` equal-width
    bins; any other feature contributes one category per distinct value.

    :param feature: pandas Series holding the feature values
    :param outcome: pandas Series holding the outcome, aligned with `feature`
    :param n_bins: number of equal-width bins for numeric features
    :return: mutual information (>= 0 up to rounding), NaN when no rows remain
    """
    if pd.api.types.is_numeric_dtype(feature) and not pd.api.types.is_bool_dtype(feature):
        feature = pd.cut(feature, bins=n_bins, labels=False)
    joint = pd.crosstab(feature, outcome, normalize=True).to_numpy()
    if joint.size == 0:
        return np.nan
    # Product of the marginals = joint distribution under independence
    independent = joint.sum(axis=1, keepdims=True) * joint.sum(axis=0, keepdims=True)
    observed = joint > 0  # empty cells contribute nothing to the sum
    return float(np.sum(joint[observed] * np.log(joint[observed] / independent[observed])))


# The earlier version called conditional_mutual_information(col, target, col),
# i.e. I(X; Y | X), which is zero for every feature, and fell back to NaN for
# every non-numeric column. Plain mutual information is used instead. It
# measures statistical dependence, not causation; the variable name is kept
# because later cells read `results_causation`.
results_causation = {}
bins = 10
for col in df.columns:
    # Skip the outcome and the raw column it was encoded from
    if col in (target, 'Death From Heatstroke'):
        continue
    try:
        cmi = _mutual_information(df[col], df[target], n_bins=bins)
    except (TypeError, ValueError):
        # Column could not be binned or tabulated; keep it in the table as NaN
        cmi = np.nan
    results_causation[col] = cmi

# Display top 5 by causation (NaN ranks last)
sorted(results_causation.items(), key=lambda x: (x[1] if not np.isnan(x[1]) else -1), reverse=True)[:5]


In [None]:
# %%
# 5. Visualise correlation heatmap for numeric features
# Pairwise Pearson correlations between the numeric columns only
num_cols = df.select_dtypes(include=[np.number]).columns
grid = df.loc[:, num_cols].corr()
tick_positions = range(len(num_cols))

plt.figure(figsize=(8, 6))
plt.imshow(grid, cmap='viridis', interpolation='nearest')
plt.colorbar(label='Pearson r')
# One tick per column on both axes, labelled with the column name
plt.xticks(tick_positions, num_cols, rotation=45, ha='right')
plt.yticks(tick_positions, num_cols)
plt.title('Numeric Feature Correlation Heatmap')
plt.tight_layout()
# Save before show() so the written file contains the drawn figure
plt.savefig('Numeric Feature Correlation Heatmap.png')
plt.show()


In [None]:
# %%
# 8. Interpretation
# Five highest correlation scores
top_corr = sorted(results_corr.items(), key=lambda x: x[1], reverse=True)[:5]
print("Top 5 features by correlation:")
for feat, val in top_corr:
    print(f"  {feat}: {val:.3f}")

# Five highest conditional-mutual-information scores; NaN is ranked as -1
top_cmi = sorted(
    results_causation.items(),
    key=lambda x: (-1 if np.isnan(x[1]) else x[1]),
    reverse=True,
)[:5]
print("\nTop 5 features by conditional mutual information:")
for feat, val in top_cmi:
    print(f"  {feat}: {val:.3f}")

# NOTE: the text below is a fixed string; it is not derived from the scores above.
print(
    "\nInterpretation:\n"
    "- Older age and higher IMD show strong correlation and causation signals.\n"
    "- Comorbidities (e.g., CVD, dementia) appear as top risk factors.\n"
    "- Rural residency and housing type also influence risk, likely due to access to cooling and emergency services."
)


In [None]:
# %%
# 6. Scatter plots of top 3 numeric correlates vs Death
# Rank every scored feature, then keep the first three numeric ones
ranked = sorted(results_corr.items(), key=lambda x: x[1], reverse=True)
top_nums = [name for name, _ in ranked if pd.api.types.is_numeric_dtype(df[name])][:3]
for col in top_nums:
    # The plot title doubles as the stem of the output file name
    title = f'Scatter of {col} vs Heatstroke Death'
    plt.figure()
    plt.scatter(df[col], df[target], alpha=0.3)
    plt.xlabel(col)
    plt.ylabel('Death (0/1)')
    plt.title(title)
    plt.savefig(f'{title}.png')
    plt.show()

In [None]:
# %%
# 7. Build and display DAG of top 5 causal features
# Rank features by causation score, treating NaN as -1 so it sorts last
ranked_causal = sorted(
    results_causation.items(),
    key=lambda x: (-1 if np.isnan(x[1]) else x[1]),
    reverse=True,
)
top_feats = [feat for feat, _ in ranked_causal[:5]]

dot = graphviz.Digraph('Heatstroke DGP')
# Nodes first: one per selected feature, then the outcome
for feat in top_feats:
    dot.node(feat, feat)
dot.node('Death', 'Death')
# Then one edge from each selected feature to the outcome
for feat in top_feats:
    dot.edge(feat, 'Death')
dot

In [None]:
# 1) EDA
# eda = EDAAnalyser(df)
# print(eda.summary())
# print(eda.missing_summary())

In [None]:
# 2) Features
# NOTE(review): FeatureEngineer is not defined in the visible part of this
# notebook; it is only named in the commented-out `from src.analysers import`
# lines — confirm it is importable before running this cell.
fe = FeatureEngineer(df)
# The raw outcome column is passed as the target name
X, y = fe.get_features_and_target('Death From Heatstroke')
X = fe.one_hot_encode(X, ['Gender'])
X = fe.scale_numeric(X, ['Age'])
# Rebinds X_train/X_test/y_train/y_test, which the modelling cell reads
X_train, X_test, y_train, y_test = fe.train_test_split(X, y)

In [None]:




# 3) Modeling
# NOTE(review): ModelTrainer is not defined in the visible part of this
# notebook; it is only named in the commented-out `from src.analysers import`
# lines — confirm it is importable before running this cell.
mt = ModelTrainer(RandomForestClassifier(random_state=42),
                  param_grid={'n_estimators': [50,100], 'max_depth': [3,5]})
print("CV scores:", mt.cross_validate(X_train, y_train))
# Rebinds `grid`, which the heatmap cells use for the correlation matrix
grid = mt.fit(X_train, y_train)
print("Best params:", grid.best_params_)
print("Test eval:", mt.evaluate(X_test, y_test))

# 4) Stats
# NOTE(review): `df_sample` and the columns 'X'/'Y' are not defined in the
# visible part of this notebook — presumably placeholders; confirm before running.
corr = CorrelationAnalyser(df_sample)
print("Pearson X/Y:", corr.pearson('X','Y'))

In [None]:
# %%
# 4. Score every feature by its mutual information with the outcome
def _mutual_information(feature, outcome, n_bins=10):
    """
    Estimate I(feature; outcome) in nats from a normalised contingency table.

    Numeric, non-boolean features are first cut into `n_bins` equal-width
    bins; any other feature contributes one category per distinct value.

    :param feature: pandas Series holding the feature values
    :param outcome: pandas Series holding the outcome, aligned with `feature`
    :param n_bins: number of equal-width bins for numeric features
    :return: mutual information (>= 0 up to rounding), NaN when no rows remain
    """
    if pd.api.types.is_numeric_dtype(feature) and not pd.api.types.is_bool_dtype(feature):
        feature = pd.cut(feature, bins=n_bins, labels=False)
    joint = pd.crosstab(feature, outcome, normalize=True).to_numpy()
    if joint.size == 0:
        return np.nan
    # Product of the marginals = joint distribution under independence
    independent = joint.sum(axis=1, keepdims=True) * joint.sum(axis=0, keepdims=True)
    observed = joint > 0  # empty cells contribute nothing to the sum
    return float(np.sum(joint[observed] * np.log(joint[observed] / independent[observed])))


def _nan_last(item):
    """Sort key for (feature, score) pairs: the score, with NaN mapped to -1."""
    return -1 if np.isnan(item[1]) else item[1]


# The earlier version called conditional_mutual_information(col, target, col),
# i.e. I(X; Y | X), which is zero for every feature, and fell back to NaN for
# every non-numeric column. Plain mutual information is used instead. It
# measures statistical dependence, not causation; the variable name is kept
# because other cells read `results_causation`.
results_causation = {}
bins = 10
for col in df.columns:
    # Skip the outcome and the raw column it was encoded from
    if col in (target, 'Death From Heatstroke'):
        continue
    try:
        cmi = _mutual_information(df[col], df[target], n_bins=bins)
    except (TypeError, ValueError):
        # Column could not be binned or tabulated; keep it in the table as NaN
        cmi = np.nan
    results_causation[col] = cmi

# Rankings reused by the plots and the printed summary below
ranked_causal = sorted(results_causation.items(), key=_nan_last, reverse=True)
ranked_corr = sorted(results_corr.items(), key=_nan_last, reverse=True)

# %%
# 5. Visualise correlation heatmap for numeric features
num_cols = df.select_dtypes(include=[np.number]).columns
grid = df[num_cols].corr()
plt.figure(figsize=(8,6))
plt.imshow(grid, cmap='viridis', interpolation='nearest')
plt.colorbar(label='Pearson r')
plt.xticks(range(len(num_cols)), num_cols, rotation=45, ha='right')
plt.yticks(range(len(num_cols)), num_cols)
plt.title('Numeric Feature Correlation Heatmap')
plt.tight_layout()
plt.show()

# %%
# 6. Scatter plots of top 3 numeric correlates vs Death
# The raw outcome column is excluded: it is the outcome itself, not a feature.
top_nums = [
    k for k, _ in ranked_corr
    if k != 'Death From Heatstroke' and pd.api.types.is_numeric_dtype(df[k])
][:3]
for col in top_nums:
    plt.figure()
    plt.scatter(df[col], df[target], alpha=0.3)
    plt.xlabel(col)
    plt.ylabel('Death (0/1)')
    plt.title(f'Scatter of {col} vs Heatstroke Death')
    plt.show()

# %%
# 7. Build and display DAG of top 5 causal features
top_feats = [feat for feat, _ in ranked_causal[:5]]
dot = graphviz.Digraph('Heatstroke DGP')
for feat in top_feats:
    dot.node(feat, feat)
dot.node('Death', 'Death')
for feat in top_feats:
    dot.edge(feat, 'Death')
dot

# %%
# 8. Interpretation
print("Top 5 features by correlation:")
for feat, val in ranked_corr[:5]:
    print(f"  {feat}: {val:.3f}")
print("\nTop 5 features by conditional mutual information:")
for feat, val in ranked_causal[:5]:
    print(f"  {feat}: {val:.3f}")

# NOTE(review): the text below is a fixed string; it is not derived from the
# scores printed above — confirm it still matches them.
print("\nInterpretation:\n- Older age and higher IMD show strong correlation and causation signals.\n- Comorbidities (e.g., CVD, dementia) appear as top risk factors.\n- Rural residency and housing type also influence risk, likely due to access to cooling and emergency services.")


In [None]:
class BaseAnalyser:
    """
    Common parent of the analyser classes.

    It keeps a reference to the pandas DataFrame that the subclasses'
    methods read their columns from.
    """
    def __init__(self, df: pd.DataFrame):
        """
        Remember the data to analyse.

        :param df: pandas DataFrame with the data; stored by reference,
                   not copied.
        """
        self.df = df


class CorrelationAnalyser(BaseAnalyser):
    """
    Pairwise correlation between two columns of the stored DataFrame.
    Subclass of BaseAnalyser.
    """
    def _pairwise(self, x: str, y: str, method: str) -> float:
        """
        Correlate columns x and y with the given pandas `corr` method.

        :param x: name of the first column
        :param y: name of the second column
        :param method: value passed to DataFrame.corr(method=...)
        :return: the off-diagonal entry of the 2x2 correlation matrix
        """
        matrix = self.df[[x, y]].corr(method=method)
        return matrix.iloc[0, 1]

    def pearson(self, x: str, y: str) -> float:
        """
        Pearson correlation coefficient of columns x and y.

        :param x: name of the first numeric column
        :param y: name of the second numeric column
        :return: Pearson r (float)
        """
        return self._pairwise(x, y, 'pearson')

    def spearman(self, x: str, y: str) -> float:
        """
        Spearman rank correlation of columns x and y.

        :param x: name of the first column
        :param y: name of the second column
        :return: Spearman rho (float)
        """
        return self._pairwise(x, y, 'spearman')


class RandomizationAnalyser(BaseAnalyser):
    """
    Mendelian (instrumental-variable) randomization analyser.
    Runs two ordinary-least-squares regressions one after the other.
    """
    def mendelian_randomization(self, exposure: str, outcome: str, instrument: str):
        """
        Two-stage least squares, carried out as two separate OLS fits:
         1) exposure regressed on the instrument (with intercept)
         2) outcome regressed on the stage-1 fitted exposure (with intercept)

        NOTE(review): the returned summary comes from a plain OLS fit on the
        fitted exposure; confirm whether its standard errors are meant to be
        used for inference.

        :param exposure: name of the exposure column
        :param outcome: name of the outcome column
        :param instrument: name of the genetic instrument column
        :return: statsmodels RegressionResults of the stage-2 regression
        """
        # Only rows where all three columns are present take part
        complete = self.df.dropna(subset=[exposure, outcome, instrument])

        # First stage: exposure ~ const + instrument
        stage1_design = sm.add_constant(complete[instrument])
        stage1 = sm.OLS(complete[exposure], stage1_design).fit()
        fitted_exposure = stage1.predict(stage1_design)

        # Second stage: outcome ~ const + fitted exposure
        stage2_design = sm.add_constant(fitted_exposure)
        return sm.OLS(complete[outcome], stage2_design).fit()


class CausalityAnalyser(BaseAnalyser):
    """
    Analyser for various causality metrics:
     - Conditional Mutual Information (CMI)
     - Transfer Entropy (TE)
     - Granger Causality (GC)
    """

    @staticmethod
    def _cmi_from_frame(data: pd.DataFrame, n_bins: int) -> float:
        """
        Estimate I(X; Y | Z) from a three-column frame ordered (X, Y, Z).

        Each column is discretized into `n_bins` equal-width bins and the
        estimate is the plug-in sum over the observed (x, y, z) cells, in nats.

        :param data: frame with exactly three numeric columns and no NaN
        :param n_bins: number of bins for discretization
        :return: estimated conditional mutual information
        :raises ValueError: if `data` has no rows
        """
        from collections import Counter

        if data.empty:
            raise ValueError("no complete rows available to estimate conditional mutual information")

        # Discretize each variable into integer bins [0..n_bins-1]
        disc = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy='uniform')
        Xd, Yd, Zd = disc.fit_transform(data).astype(int).T
        n = len(Xd)

        # Joint and marginal cell counts
        c_xyz = Counter(zip(Xd, Yd, Zd))
        c_xz = Counter(zip(Xd, Zd))
        c_yz = Counter(zip(Yd, Zd))
        c_z = Counter(Zd)

        # CMI = sum_{x,y,z} p(x,y,z) * log( p(x,y,z) p(z) / (p(x,z) p(y,z)) ).
        # The ratio is formed from raw counts (the 1/n factors cancel). Every
        # count involved is >= 1, so no epsilon is needed inside the log.
        cmi = 0.0
        for (xi, yi, zi), count in c_xyz.items():
            ratio = (count * c_z[zi]) / (c_xz[(xi, zi)] * c_yz[(yi, zi)])
            cmi += (count / n) * np.log(ratio)
        return float(cmi)

    def conditional_mutual_information(self, x: str, y: str, z: str, n_bins: int = 10) -> float:
        """
        Estimate I(X; Y | Z) by discretizing X, Y, Z into bins.

        Note that conditioning on X itself (z == x) gives zero for any data.

        :param x: name of variable X
        :param y: name of variable Y
        :param z: name of conditioning variable Z
        :param n_bins: number of bins for discretization
        :return: estimated conditional mutual information
        """
        # Select the three columns and drop rows with missing values
        data = self.df[[x, y, z]].dropna()
        return self._cmi_from_frame(data, n_bins)

    def transfer_entropy(self, source: str, target: str, lag: int = 1, n_bins: int = 10) -> float:
        """
        Estimate Transfer Entropy TE(source→target) ≈ I(source_{t-lag}; target_t | target_{t-lag})

        :param source: name of source time series
        :param target: name of target time series
        :param lag: lag order
        :param n_bins: number of bins for discretization
        :return: estimated transfer entropy
        """
        # Build the lagged variables on a private copy so self.df is untouched
        lagged = self.df[[source, target]].dropna().copy()
        lagged['target_lag'] = lagged[target].shift(lag)
        lagged['source_lag'] = lagged[source].shift(lag)
        lagged = lagged.dropna()

        # The CMI must be computed on the lagged frame: self.df does not
        # contain the 'source_lag' / 'target_lag' columns created above.
        return self._cmi_from_frame(lagged[['source_lag', target, 'target_lag']], n_bins)

    def granger_causality(self, source: str, target: str, maxlag: int = 1, **kwargs):
        """
        Perform Granger causality test: does `source` help predict `target`?

        :param source: name of source series
        :param target: name of target series
        :param maxlag: maximum lag to test
        :param kwargs: extra keyword arguments forwarded to statsmodels'
                       grangercausalitytests; `verbose` defaults to False
        :return: dictionary of test results per lag
        """
        data = self.df[[target, source]].dropna()
        # Column order is [target, source]
        arr = data.values
        kwargs.setdefault('verbose', False)
        return grangercausalitytests(arr, maxlag=maxlag, **kwargs)


def run_analyser_cli(argv=None):
    """
    Command-line entry point for the statistical analysers.

    Example: python src/analysers.py --input data.csv --mode pearson --x col1 --y col2

    :param argv: argument list to parse; None means sys.argv[1:]
    :raises SystemExit: with status 2 (via argparse) when an argument that the
                        chosen mode requires is missing
    """
    import argparse

    parser = argparse.ArgumentParser(description="Run statistical analysers on a CSV file")
    parser.add_argument("--input", "-i", required=True,
                        help="Path to input CSV file")
    parser.add_argument("--mode", "-m", required=True,
                        choices=["pearson", "spearman", "mr", "cmi", "te", "gc"],
                        help="Analysis mode")
    parser.add_argument("--x", help="Column X (for correlation, CMI, TE, GC)")
    parser.add_argument("--y", help="Column Y (for correlation, CMI, TE, GC)")
    parser.add_argument("--z", help="Column Z (for CMI)")
    parser.add_argument("--instrument", help="Instrument column (for MR)")
    parser.add_argument("--exposure", help="Exposure column (for MR)")
    parser.add_argument("--outcome", help="Outcome column (for MR)")
    parser.add_argument("--lag", type=int, default=1, help="Lag for TE/GC")
    parser.add_argument("--bins", type=int, default=10, help="Bins for discretization")
    args = parser.parse_args(argv)

    # Column arguments each mode cannot run without. Checking here gives a
    # usage error instead of a KeyError on a None column name later on.
    required = {
        "pearson": ("x", "y"),
        "spearman": ("x", "y"),
        "mr": ("exposure", "outcome", "instrument"),
        "cmi": ("x", "y", "z"),
        "te": ("x", "y"),
        "gc": ("x", "y"),
    }
    missing = [f"--{name}" for name in required[args.mode] if getattr(args, name) is None]
    if missing:
        parser.error(f"mode '{args.mode}' requires {', '.join(missing)}")

    # Load data
    data = pd.read_csv(args.input)
    if args.mode in ["pearson", "spearman"]:
        corr = CorrelationAnalyser(data)
        func = corr.pearson if args.mode == "pearson" else corr.spearman
        print(f"{args.mode}({args.x}, {args.y}) =", func(args.x, args.y))

    elif args.mode == "mr":
        rnd = RandomizationAnalyser(data)
        model = rnd.mendelian_randomization(args.exposure, args.outcome, args.instrument)
        print(model.summary())

    elif args.mode == "cmi":
        caus = CausalityAnalyser(data)
        print("CMI:", caus.conditional_mutual_information(args.x, args.y, args.z, n_bins=args.bins))

    elif args.mode == "te":
        caus = CausalityAnalyser(data)
        print("TE:", caus.transfer_entropy(args.x, args.y, lag=args.lag, n_bins=args.bins))

    elif args.mode == "gc":
        caus = CausalityAnalyser(data)
        res = caus.granger_causality(args.x, args.y, maxlag=args.lag)
        print("Granger Causality results:", res)


if __name__ == "__main__":
    # Running inside a function keeps the CLI's locals (notably the loaded
    # frame) from overwriting module-level names such as `df`.
    run_analyser_cli()



# Example usage:
# df = pd.read_csv('your_data.csv')
# corr = CorrelationAnalyser(df)
# print("Pearson r:", corr.pearson('X', 'Y'))
# rnd = RandomizationAnalyser(df)
# mr_model = rnd.mendelian_randomization('exposure', 'outcome', 'instrument')
# print(mr_model.summary())
# caus = CausalityAnalyser(df)
# print("Conditional MI:", caus.conditional_mutual_information('X','Y','Z'))
# print("Transfer Entropy:", caus.transfer_entropy('X','Y'))
# print("Granger Causality:", caus.granger_causality('X','Y', maxlag=3))

# from src.analysers import (
#     EDAAnalyser, FeatureEngineer, ModelTrainer,
#     CorrelationAnalyser, RandomizationAnalyser, CausalityAnalyser
# )




In [None]:



def cramers_v(confusion_matrix, y=None):
    """
    Compute Cramer's V statistic for categorical-categorical association.

    Two call styles are supported:
      - cramers_v(table): `table` is a 2-D contingency table of counts
        (NumPy array or DataFrame);
      - cramers_v(x, y): two categorical variables of equal length, which are
        cross-tabulated first. This keeps the two-argument calls made
        elsewhere in this notebook working after this definition is run.

    :param confusion_matrix: contingency table, or the first variable when
                             `y` is given
    :param y: optional second categorical variable
    :return: Cramer's V as a float; 0.0 for a table with fewer than two rows
             or columns, or with no observations
    """
    if y is not None:
        confusion_matrix = pd.crosstab(confusion_matrix, y)
    # Work on a plain array: DataFrame.sum() would return per-column totals
    # rather than the grand total.
    table = np.asarray(confusion_matrix, dtype=float)
    r, k = table.shape
    n = table.sum()
    if min(r, k) < 2 or n == 0:
        # Degenerate table: avoids dividing by min(r - 1, k - 1) == 0
        return 0.0
    # correction=False: Yates' continuity correction (SciPy's default for 2x2
    # tables) shrinks chi2, so a perfect association would yield V < 1.
    chi2 = chi2_contingency(table, correction=False)[0]
    phi2 = chi2 / n
    return float(np.sqrt(phi2 / min(r - 1, k - 1)))


class HeatstrokeAnalyzer:
    """
    A class to load, preprocess, explore, model and interpret
    a heatstroke dataset.
    """
    def __init__(self, csv_path: str):
        """
        Initialize with path to CSV file.

        :param csv_path: path of the heatstroke dataset CSV
        """
        self.csv_path = csv_path
        self.df = None
        self.model = None
        self.X = None
        self.y = None
        self.coefficients = None
        self.feature_scores = None  # correlation/association scores

    def load_and_preprocess(self):
        """
        Load the CSV, encode target, compute BMI, drop missing/infinite.

        The cleaned frame is stored in ``self.df``. Previously the method
        discarded its result and never computed BMI.
        """
        df = pd.read_csv(self.csv_path)
        # Drop rows missing critical vars or the outcome *before* the integer
        # cast, because astype(int) raises on NaN.
        df = df.dropna(
            subset=['Age', 'Height', 'Weight', 'Blood Type', 'Death From Heatstroke']
        ).copy()
        # Encode binary target
        df['Death'] = df['Death From Heatstroke'].astype(int)
        # BMI; a zero height produces an infinity, which is removed with the NaNs
        df['BMI'] = df['Weight'] / ((df['Height'] / 100) ** 2)
        df = df.replace([np.inf, -np.inf], np.nan).dropna(subset=['BMI'])
        self.df = df.reset_index(drop=True)


In [None]:
#!/usr/bin/env python3
"""
analysers_enhanced.py

An extension of src/analysers.py to include a HeatstrokeAnalyser that:
  - Loads and preprocesses the heatstroke dataset
  - Computes correlation and causation metrics in a loop for each feature
  - Visualises results via heatmap, scatter plots, and a DAG (Directed Acyclic Graph)
  - Identifies and interprets the most influential risk factors

Requires:
  - pandas, numpy, matplotlib, scipy, graphviz, statsmodels
  - Existing analysers: CorrelationAnalyser, CausalityAnalyser from src/analysers.py

Usage:
    from analysers_enhanced import HeatstrokeAnalyser
    hsa = HeatstrokeAnalyser(csv_path)
    hsa.run_full_analysis()
"""
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import graphviz
from scipy.stats import pointbiserialr
#from src.analysers import CorrelationAnalyser, CausalityAnalyser  # reuse existing analysers


class HeatstrokeAnalyser:
    """
    Class to analyse risk factors for heatstroke death.
    Methods:
      - load_and_clean: load CSV, encode target, compute BMI, drop NaNs
      - test_correlations: loop numeric and categorical features to compute
        point-biserial (numeric) or Cramér's V (categorical) vs binary death outcome
      - test_causation: mutual information between each feature and the outcome
      - visualize_heatmap: plot correlation matrix as heatmap
      - visualize_scatter: scatter plot numeric features against the outcome
      - build_dag: create a simple DAG of the top-scoring features
      - run_full_analysis: orchestrate all steps and print the results
    """

    # Encoded 0/1 outcome column and the raw column it is derived from. The raw
    # column is the outcome itself, so it is never scored as a feature.
    TARGET = 'Death'
    RAW_TARGET = 'Death From Heatstroke'

    def __init__(self, csv_path: str):
        """
        Initialize with path to heatstroke dataset CSV.

        :param csv_path: path of the CSV file read by load_and_clean
        """
        self.csv_path = csv_path
        self.df = None
        self.results = {
            'correlation': {},
            'causation': {}
        }

    def load_and_clean(self):
        """
        Load data, encode target, compute BMI, drop missing or infinite values.

        The cleaned frame is stored in ``self.df`` with a fresh 0..n-1 index.
        """
        df = pd.read_csv(self.csv_path)
        # Drop missing values in critical columns *before* the integer cast:
        # astype(int) raises on NaN, so dropping afterwards is too late.
        df = df.dropna(subset=['Age', 'Height', 'Weight', self.RAW_TARGET]).copy()
        # Encode binary target
        df[self.TARGET] = df[self.RAW_TARGET].astype(int)
        # Compute BMI
        df['BMI'] = df['Weight'] / ((df['Height'] / 100) ** 2)
        # Remove infinities (zero heights) and every row with a remaining NaN
        df = df.replace([np.inf, -np.inf], np.nan).dropna()
        self.df = df.reset_index(drop=True)

    @staticmethod
    def cramers_v(x, y) -> float:
        """
        Compute Cramér's V for two categorical variables.

        :param x: first categorical variable (array-like or pandas Series)
        :param y: second categorical variable, same length as x
        :return: Cramér's V; 0.0 when either variable has fewer than two
                 observed levels (this includes empty input)
        """
        # Imported here because the import section above this class only
        # brings in pointbiserialr from scipy.stats.
        from scipy.stats import chi2_contingency

        confusion = pd.crosstab(x, y)
        k = min(confusion.shape)
        if k < 2:
            return 0.0
        # correction=False: Yates' continuity correction (SciPy's default for
        # 2x2 tables) shrinks chi2, so a perfect association would give V < 1.
        chi2 = chi2_contingency(confusion, correction=False)[0]
        n = confusion.to_numpy().sum()
        return float(np.sqrt(chi2 / (n * (k - 1))))

    @staticmethod
    def _mutual_information(feature, outcome, n_bins) -> float:
        """Estimate I(feature; outcome) in nats; numeric features are cut into n_bins equal-width bins."""
        if pd.api.types.is_numeric_dtype(feature) and not pd.api.types.is_bool_dtype(feature):
            feature = pd.cut(feature, bins=n_bins, labels=False)
        joint = pd.crosstab(feature, outcome, normalize=True).to_numpy()
        if joint.size == 0:
            return np.nan
        # Product of the marginals = joint distribution under independence
        independent = joint.sum(axis=1, keepdims=True) * joint.sum(axis=0, keepdims=True)
        observed = joint > 0  # empty cells contribute nothing to the sum
        return float(np.sum(joint[observed] * np.log(joint[observed] / independent[observed])))

    @staticmethod
    def _nan_last(item):
        """Sort key for (feature, score) pairs: the score, with NaN mapped to -1."""
        return -1 if np.isnan(item[1]) else item[1]

    def _feature_columns(self):
        """Return every column of self.df except the outcome and its raw source column."""
        skipped = (self.TARGET, self.RAW_TARGET)
        return [col for col in self.df.columns if col not in skipped]

    def test_correlations(self):
        """
        For each feature, compute correlation with Death:
          - Numeric: point-biserial
          - Categorical: Cramér's V
        Store absolute values in self.results['correlation'].
        """
        outcome = self.df[self.TARGET]
        for col in self._feature_columns():
            series = self.df[col]
            if pd.api.types.is_numeric_dtype(series):
                # point biserial for numeric vs binary
                corr, _ = pointbiserialr(series, outcome)
            else:
                # categorical
                corr = self.cramers_v(series, outcome)
            self.results['correlation'][col] = abs(corr)

    def test_causation(self, bins=10):
        """
        For each feature, estimate the mutual information I(feature; Death)
        and store it in self.results['causation'].

        The earlier implementation conditioned each feature on itself,
        I(X; Y | X), which is zero for every feature. Mutual information
        measures statistical dependence, not causation; the result key is
        kept for compatibility.

        :param bins: number of equal-width bins used for numeric features
        """
        outcome = self.df[self.TARGET]
        for col in self._feature_columns():
            try:
                score = self._mutual_information(self.df[col], outcome, bins)
            except (TypeError, ValueError):
                # Column could not be binned or tabulated; record it as NaN
                score = np.nan
            self.results['causation'][col] = score

    def visualize_heatmap(self):
        """
        Plot a heatmap of the Pearson correlation matrix of all numeric columns.
        """
        num_cols = self.df.select_dtypes(include=[np.number]).columns
        corr_matrix = self.df[num_cols].corr()
        plt.figure(figsize=(8, 6))
        plt.imshow(corr_matrix, cmap='viridis', interpolation='nearest')
        plt.colorbar(label='Pearson r')
        plt.xticks(range(len(num_cols)), num_cols, rotation=45, ha='right')
        plt.yticks(range(len(num_cols)), num_cols)
        plt.title('Numeric Feature Correlation Heatmap')
        plt.tight_layout()
        plt.show()

    def visualize_scatter(self):
        """
        For the top three numeric features by correlation, scatter the feature
        against the 0/1 Death outcome.
        """
        ranked = sorted(self.results['correlation'].items(),
                        key=self._nan_last, reverse=True)
        top_nums = [k for k, _ in ranked
                    if pd.api.types.is_numeric_dtype(self.df[k])][:3]
        for col in top_nums:
            plt.figure()
            plt.scatter(self.df[col], self.df[self.TARGET], alpha=0.3)
            plt.xlabel(col)
            plt.ylabel('Death (0/1)')
            plt.title(f'Scatter of {col} vs Heatstroke Death')
            plt.show()

    def build_dag(self, top_k=5):
        """
        Construct a simple DAG of the top_k features by causation score, each
        pointing to Death.

        :param top_k: number of features to include
        :return: the graphviz.Digraph that was built
        """
        ranked = sorted(self.results['causation'].items(),
                        key=self._nan_last, reverse=True)
        top_feats = [feat for feat, _ in ranked[:top_k]]
        dot = graphviz.Digraph(comment='Heatstroke DAG')
        # add nodes
        for feat in top_feats:
            dot.node(feat, feat)
        dot.node('Death', 'Death')
        # add edges
        for feat in top_feats:
            dot.edge(feat, 'Death')
        try:
            # display() is only defined when running under IPython/Jupyter
            display(dot)
        except NameError:
            # Plain interpreter: fall back to printing the DOT source
            print(dot.source)
        return dot

    def run_full_analysis(self):
        """
        Orchestrate loading, testing, visualizing, DAG building, and printing
        of the top-ranked features.
        """
        print("Loading and cleaning data...")
        self.load_and_clean()
        print("Testing correlations...")
        self.test_correlations()
        print("Testing causation...")
        self.test_causation()
        print("Correlation results (top 5):")
        for feat, val in sorted(self.results['correlation'].items(),
                                key=self._nan_last, reverse=True)[:5]:
            print(f"  {feat}: {val:.3f}")
        print("Causation results (top 5):")
        for feat, val in sorted(self.results['causation'].items(),
                                key=self._nan_last, reverse=True)[:5]:
            print(f"  {feat}: {val:.3f}")
        print("Generating heatmap...")
        self.visualize_heatmap()
        print("Generating scatter plots...")
        self.visualize_scatter()
        print("Building DAG...")
        self.build_dag()
        print("Analysis complete. Interpret the printed results and visualizations to identify the most influential factors.")


if __name__ == '__main__':
    import argparse

    # Single positional argument: the dataset to analyse
    parser = argparse.ArgumentParser(description='Heatstroke risk factor analysis')
    parser.add_argument('csv_path', help='Path to G7 summer dataset CSV')
    args = parser.parse_args()

    # Build the analyser for that file and run every step end to end
    hsa = HeatstrokeAnalyser(csv_path=args.csv_path)
    hsa.run_full_analysis()


In [None]:
class BaseAnalyser:
    """
    Common parent of the analyser classes.

    It keeps a reference to the pandas DataFrame that the subclasses'
    methods read their columns from.
    """
    def __init__(self, df: pd.DataFrame):
        """
        Remember the data to analyse.

        :param df: pandas DataFrame with the data; stored by reference,
                   not copied.
        """
        self.df = df


class CorrelationAnalyser(BaseAnalyser):
    """
    Pairwise correlation between two columns of the stored DataFrame.
    Subclass of BaseAnalyser.
    """
    def _pairwise(self, x: str, y: str, method: str) -> float:
        """
        Correlate columns x and y with the given pandas `corr` method.

        :param x: name of the first column
        :param y: name of the second column
        :param method: value passed to DataFrame.corr(method=...)
        :return: the off-diagonal entry of the 2x2 correlation matrix
        """
        matrix = self.df[[x, y]].corr(method=method)
        return matrix.iloc[0, 1]

    def pearson(self, x: str, y: str) -> float:
        """
        Pearson correlation coefficient of columns x and y.

        :param x: name of the first numeric column
        :param y: name of the second numeric column
        :return: Pearson r (float)
        """
        return self._pairwise(x, y, 'pearson')

    def spearman(self, x: str, y: str) -> float:
        """
        Spearman rank correlation of columns x and y.

        :param x: name of the first column
        :param y: name of the second column
        :return: Spearman rho (float)
        """
        return self._pairwise(x, y, 'spearman')
class MLAnalyser(BaseAnalyser):
    """
    Analyser for basic machine learning tasks:
     - Logistic Regression
     - Naïve Bayes (Gaussian)
     - Hierarchical Clustering (Agglomerative)
     - K‑Means Clustering
    """
    def _fit_and_report(self, estimator, features, target: str):
        """
        Fit `estimator` on the complete rows and print in-sample metrics.

        Accuracy and the classification report are computed on the same rows
        the model was fitted on, so they are not held-out performance figures.

        :param estimator: unfitted scikit-learn classifier
        :param features: sequence of feature column names
        :param target: name of the target column
        :return: fitted estimator, predictions array
        """
        cols = list(features)  # accept any sequence, not only lists
        df = self.df.dropna(subset=cols + [target])
        X = df[cols]
        y = df[target]
        estimator.fit(X, y)
        preds = estimator.predict(X)
        print("Accuracy:", accuracy_score(y, preds))
        print(classification_report(y, preds))
        return estimator, preds

    def _cluster(self, estimator, features):
        """
        Run `estimator.fit_predict` on the rows with no missing feature value.

        :param estimator: unfitted scikit-learn clustering estimator
        :param features: sequence of feature column names
        :return: array of cluster labels, one per complete row (rows with a
                 missing feature are dropped, so it can be shorter than self.df)
        """
        X = self.df[list(features)].dropna()
        return estimator.fit_predict(X)

    def logistic_regression(self, features: list, target: str, **kwargs):
        """
        Fit a logistic regression model.

        :param features: list of feature column names
        :param target: name of the binary target column
        :param kwargs: keyword arguments passed to LogisticRegression
        :return: fitted model, predictions array
        """
        return self._fit_and_report(LogisticRegression(**kwargs), features, target)

    def naive_bayes(self, features: list, target: str, **kwargs):
        """
        Fit a Gaussian Naïve Bayes model.

        :param features: list of feature column names
        :param target: name of the categorical target column
        :param kwargs: keyword arguments passed to GaussianNB
        :return: fitted model, predictions array
        """
        return self._fit_and_report(GaussianNB(**kwargs), features, target)

    def hierarchical_clustering(self, features: list, n_clusters: int = 2, **kwargs):
        """
        Perform agglomerative (hierarchical) clustering.

        :param features: list of feature column names
        :param n_clusters: desired number of clusters
        :param kwargs: keyword arguments passed to AgglomerativeClustering
        :return: array of cluster labels
        """
        return self._cluster(AgglomerativeClustering(n_clusters=n_clusters, **kwargs), features)

    def k_means(self, features: list, n_clusters: int = 2, **kwargs):
        """
        Perform k-means clustering.

        No random_state is set here; pass one through kwargs for
        reproducible labels.

        :param features: list of feature column names
        :param n_clusters: desired number of clusters
        :param kwargs: keyword arguments passed to KMeans
        :return: array of cluster labels
        """
        return self._cluster(KMeans(n_clusters=n_clusters, **kwargs), features)


In [None]:
# df = pd.read_csv('your_data.csv')
# NOTE(review): the column names 'X', 'Y' and 'instrument' used below look like
# placeholders — confirm they exist in `df` before running this cell.
corr = CorrelationAnalyser(df)
print("Pearson r:", corr.pearson('X', 'Y'))

ml = MLAnalyser(df)

# 1) Logistic regression: predict 'instrument' from 'X' and 'Y'
model, preds = ml.logistic_regression(features=['X', 'Y'], target='instrument')
# prints accuracy & classification report

# 2) Naïve Bayes: same features and target
nb_model, nb_preds = ml.naive_bayes(features=['X', 'Y'], target='instrument')

# 3) Hierarchical clustering: group rows by 'X' and 'Y' into 2 clusters
hc_labels = ml.hierarchical_clustering(features=['X', 'Y'], n_clusters=2)
print("HC clusters:", hc_labels)

# 4) K-Means clustering: same features and cluster count
k_labels = ml.k_means(features=['X', 'Y'], n_clusters=2)
print("KMeans clusters:", k_labels)