# Survival Analysis on TCGA High Grade Serous Ovarian Cancer

In [None]:
%pip install pandas numpy scikit-learn lifelines

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from lifelines import KaplanMeierFitter
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.pipeline import Pipeline
from sksurv.linear_model import CoxnetSurvivalAnalysis
from sksurv.util import Surv
from sksurv.metrics import concordance_index_censored
from matplotlib.backends.backend_pdf import PdfPages
from sklearn.compose import ColumnTransformer
from lifelines.statistics import logrank_test
from scipy import stats
import datetime

## Load Data and preprocess survival columns to a binary format

In [None]:
df = pd.read_csv("./data/hgsoc_tcga_gdc_clinical_data.tsv", sep='\t')

time_col = "Overall Survival (Months)"
event_col_raw = "Overall Survival Status"
event_col = 'Overall Survival Final'
df = df.dropna(subset=[event_col_raw])

df[event_col] = df[event_col_raw].str.extract(r'(\d+)').astype(int)

# Ensure event_col is binary (1=event occurred/death, 0=censored)
df[event_col] = df[event_col].astype(int)
df[event_col].value_counts()

## Select features

In [None]:
''' After careful consideration of the features, here are the ones I think would be the most informative given the information provided. Of course, the model can be further improved by using mutations, CNVs or other genomic biomarkers. Check out model_b to see how to I incorporate all biomarker and clinical information into a model. But for this exercise let's keep it simple and prioritize clinical features'''
df= df[["Patient ID", "Sample ID", "Race Category", "FIGO Stage", "Oncotree Code", "Diagnosis Age", "Fraction Genome Altered", time_col, event_col]]
feature_columns = [
    "Diagnosis Age",
    "Fraction Genome Altered",
    "Race Category",
    "FIGO Stage",
    "Oncotree Code",
]

df = df.dropna(subset=[time_col, event_col,
                       "Diagnosis Age",
                       "Fraction Genome Altered",
                       "Race Category",
                       "FIGO Stage",
                       "Oncotree Code"])

# Convert to structured array needed by scikit-survival
y = Surv.from_arrays(event=(df[event_col] == 1), time=df[time_col].values)

X = df[feature_columns].copy()

# Specify which columns are categorical and which are numeric. You would need to take a look at the data to do that 
cat_cols = ["Race Category", "FIGO Stage", "Oncotree Code"]
num_cols = ["Diagnosis Age", "Fraction Genome Altered"]
print(f"Number of patients in the cohort: {df['Sample ID'].nunique()}")


## Build a preprocessing pipeline with scikit-learn

In [None]:
# i love writing pipelines
numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler())
])

# Define categorical pipeline to process cat columns
categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("onehot", OneHotEncoder(sparse_output=False, drop="first", handle_unknown="ignore"))
])

preprocessor = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, num_cols),
        ("cat", categorical_transformer, cat_cols)
    ]
)

## Build Coxnet with L1 penalty model and tune parameters

In [None]:
# CoxnetSurvivalAnalysis allows fitting a path of solutions for given alphas.
# We'll use GridSearchCV to select the best alpha by maximizing c-index.
alphas = 10 ** np.linspace(-3, 1, 20)  # you can adjust the range

# Define a simple function for c-index scoring in GridSearchCV
def c_index_scorer(estimator, X_values, y_values):
    predi = estimator.predict(X_values)
    return concordance_index_censored(y_values['event'], y_values['time'], predi)[0]

coxnet = CoxnetSurvivalAnalysis(l1_ratio=1.0, alpha_min_ratio=0.01)

pipe = Pipeline([
    ("preprocessing", preprocessor),
    ("model", coxnet)
])

param_grid = {
    "model__alphas": [alphas]
}

cv = KFold(n_splits=5, shuffle=True, random_state=42)
grid_search = GridSearchCV(pipe, param_grid, scoring=c_index_scorer, cv=cv)
grid_search.fit(X, y)

# Get the best model
best_model = grid_search.best_estimator_.named_steps['model']

# See what you get for the best alpha
best_alpha = best_model.alphas_[0]  # For L1-penalized Cox model, alphas_[0] contains the used alpha
# Lets save feature coefficients
final_coefs = best_model.coef_[0]

#some helper variables
feature_names_num = num_cols
encoder = grid_search.best_estimator_.named_steps['preprocessing'].named_transformers_['cat'].named_steps['onehot']
encoded_cat_names = encoder.get_feature_names_out(cat_cols)
all_feature_names = feature_names_num + encoded_cat_names.tolist()

# Print final coefficients
print("Final Coefficients:")
for name, coef in zip(all_feature_names, final_coefs):
    print(f"{name}: {coef:.4f}")

# print best alpha value
print(f"Best alpha that we can choose: {best_alpha:.4f}")

best_model = grid_search.best_estimator_

print("Best of the best parameters:", grid_search.best_params_)
print("Best CV c-index:", grid_search.best_score_)

# Extract final model
final_coxnet = best_model.named_steps["model"]


# Get feature names from the ColumnTransformer
feature_names_num = num_cols
# Extract encoded categorical feature names
encoder = best_model.named_steps["preprocessing"].named_transformers_["cat"].named_steps["onehot"]
encoded_cat_names = encoder.get_feature_names_out(cat_cols)
all_feature_names = feature_names_num + encoded_cat_names.tolist()

print("Final coefficients at best alpha value:")
for fname, coef in zip(all_feature_names, final_coefs):
    print(f"{fname}: {coef}")


## Galculate Risk Scores and Kaplan-Meier (KM) Curves. Plot KM curves for High and Low Risk group. Use Log-Rank test, Breslow and Tarone-Ware tests. For shorter survival times use Wilcoxon test 

In [None]:
# First, set up local functions for Breslow and Tarone-Ware weights
def breslow_weights(at_risk):
    return at_risk

def tarone_ware_weights(at_risk):
    return np.sqrt(at_risk)

def wilcoxon_weights(events):
    return events

def weighted_logrank_test(time_high, time_low, event_high, event_low, weighting_function):
    n_high = len(time_high)
    n_low = len(time_low)
    combined_times = np.concatenate([time_high, time_low])
    combined_events = np.concatenate([event_high, event_low])
    group_indicator = np.concatenate([np.ones(n_high), np.zeros(n_low)])

    sorted_indices = np.argsort(combined_times)
    times = combined_times[sorted_indices]
    events = combined_events[sorted_indices]
    groups = group_indicator[sorted_indices]

    at_risk_high = np.cumsum(groups[::-1])[::-1]
    at_risk_low = np.cumsum((1 - groups)[::-1])[::-1]
    at_risk_total = at_risk_high + at_risk_low

    weights = weighting_function(at_risk_total)
    observed_high = np.cumsum(events * groups * weights)
    expected_high = np.cumsum(events * (at_risk_high / at_risk_total) * weights)
    test_statistic = np.sum((observed_high - expected_high) ** 2 / (expected_high + 1e-10))
    # Test statistic (chi-squared)
    p_value = 1 - stats.chi2.cdf(test_statistic, df=1)
    return test_statistic, p_value

## Calculate risk scores for High and Low risk groups and perform log-rank and wilcoxon tests, implement automated decision about model value based on ci-index score

In [None]:
risk_scores = best_model.predict(X)

# Split patients into high and low risk by median
median_score = np.median(risk_scores)
high_risk_group = risk_scores >= median_score
low_risk_group = risk_scores < median_score

# Extract time and event data for each group
time_high = df.loc[high_risk_group, time_col]
event_high = df.loc[high_risk_group, event_col]

time_low = df.loc[low_risk_group, time_col]
event_low = df.loc[low_risk_group, event_col]

# Perform Log-Rank Test
logrank_results = logrank_test(time_high, time_low, event_high, event_low)

# Calculate Breslow, Tarone and logrank p-values
test_stat_breslow, breslow_p = weighted_logrank_test(time_high, time_low, event_high, event_low, breslow_weights)
test_stat_tarone, tarone_p = weighted_logrank_test(time_high, time_low, event_high, event_low, tarone_ware_weights)
test_stat_logrank, logrank_p = logrank_results.test_statistic, logrank_results.p_value
test_statistic_wilcoxon, p_value_wilcoxon = weighted_logrank_test(
    time_high, time_low, event_high, event_low, wilcoxon_weights
)
print(f"Wilcoxon Test: Test Statistic = {test_statistic_wilcoxon:.4f}, p-value = {p_value_wilcoxon:.4e}")
# create variable to make automated decision based on the ci-index score
decision=0
if grid_search.best_score_<0.5:
    decision = 'terrible'
elif grid_search.best_score_==0.5:
    decision = 'random performance'
elif grid_search.best_score_>0.5 and grid_search.best_score_<=0.8:
    decision = 'above chance performance'
elif grid_search.best_score_>0.8 and grid_search.best_score_<=0.95:
    decision = 'good'
elif grid_search.best_score_>0.95:
    decision = 'excellent'
else :
    decision = 'missed bucket: rethink your code'

## Fit Cox Proportional Hazard Model and calculate standard errors

In [None]:
from lifelines import CoxPHFitter

# Prepare data for lifelines CoxPHFitter
df_lifelines = df.copy()
df_lifelines["event"] = (df[event_col] == 1).astype(int)
df_lifelines["time"] = df[time_col]

# Select covariates and survival data
lifelines_columns = feature_columns + ["time", "event"]

# Fit CoxPHFitter
cph = CoxPHFitter()
cph.fit(df_lifelines[lifelines_columns], duration_col="time", event_col="event")

# Extract coefficients and standard errors
cph_summary = cph.summary
coefficients = cph_summary["coef"]
standard_errors = cph_summary["se(coef)"]

print("Coefficients:\n", coefficients)
print("Standard Errors:\n", standard_errors)

## Calculate covariate effects on Survival with confidence intervals

In [None]:

# Define z-value for 95% CI
z = 1.96

# Calculate confidence intervals
lower_ci = np.exp(final_coefs - z * standard_errors)
upper_ci = np.exp(final_coefs + z * standard_errors)

# Add to the covariate_effects DataFrame
covariate_effects["Lower CI"] = lower_ci
covariate_effects["Upper CI"] = upper_ci

# Print covariate effects with CIs
print("\nCovariate Effects on Survival with Confidence Intervals:")
print(covariate_effects)

##Plot Hazard Ratios and Confidence Intervals in the notebook

In [None]:
plt.figure(figsize=(10, 6))

# Plot hazard ratios with error bars
plt.errorbar(
    covariate_effects["Hazard Ratio (HR)"],  # HR values
    covariate_effects["Feature"],  # Covariate names
    xerr=[covariate_effects["Hazard Ratio (HR)"] - covariate_effects["Lower CI"],
          covariate_effects["Upper CI"] - covariate_effects["Hazard Ratio (HR)"]],
    fmt='o', color='blue', ecolor='gray', elinewidth=2, capsize=4
)

# Add a vertical line at HR=1
plt.axvline(1, color="black", linestyle="--", linewidth=1)

# Set labels and title
plt.title("Hazard Ratios with Confidence Intervals")
plt.xlabel("Hazard Ratio")
plt.ylabel("Covariate")
plt.tight_layout()
plt.show()

# Set up PDF reporting and write analysis plots into the PDF

In [None]:
report_filename = f"./reports/Survival_Analysis_Report_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
from matplotlib.figure import Figure
with PdfPages(report_filename) as pdf:

    # Figure 1: KM Curves
    plt.figure(figsize=(8, 6))
    kmf_high = KaplanMeierFitter()
    kmf_low = KaplanMeierFitter()

    kmf_high.fit(time_high, event_high, label='High Risk')
    ax = kmf_high.plot_survival_function(ci_show=True, color='red')

    kmf_low.fit(time_low, event_low, label='Low Risk')
    kmf_low.plot_survival_function(ci_show=True, ax=ax, color='blue')

    plt.title("Kaplan-Meier Survival Curves (High vs Low Risk)")
    plt.xlabel("Time (Months)")
    plt.ylabel("Survival Probability")
    plt.legend()
    plt.grid(False)

    plt.text(0.2, 0.2, f'Log-Rank Test p-value: {logrank_p:.4f}', fontsize=8, transform=plt.gca().transAxes)
    plt.text(0.2, 0.15, f'Breslow Test p-value: {breslow_p:.4f}', fontsize=8, transform=plt.gca().transAxes)
    plt.text(0.2, 0.1, f'Tarone-Ware Test p-value: {tarone_p:.4f}', fontsize=8, transform=plt.gca().transAxes)
    plt.text(0.2, 0.05, f'Wilcoxon Test p-value: {p_value_wilcoxon:.4f}', fontsize=8, transform=plt.gca().transAxes)

    pdf.savefig()
    plt.close()

    # Figure 2: Lasso Cox Coefficient Plot
    plt.figure(figsize=(10, 6))
    sorted_indices = np.argsort(final_coefs)
    plt.barh(np.array(all_feature_names)[sorted_indices], final_coefs[sorted_indices])
    plt.title("Feature Coefficients from LASSO Cox Model")
    plt.xlabel("Coefficient")
    plt.ylabel("Features")
    plt.tight_layout()
    pdf.savefig()
    plt.close()

    # Figure 3: Conclusions Page
    fig = Figure()
    text_ax = fig.add_subplot(111)
    text_ax.axis('off')
    conclusion_text = (
        "Conclusions:\n\n"
        f"1. The best alpha chosen by cross-validation is {best_alpha:.4f}.\n"
        f"2. The cross-validated c-index was {grid_search.best_score_:.3f}, indicating {decision} model discriminative ability.\n"
        f"3. Patients stratified by median predicted risk scores show distinct survival curves, "
        f"with the high-risk group demonstrating worse survival.\n"
        f"4. Statistical test results comparing survival curves:\n"
        f"   - Log-Rank Test p-value: {logrank_p:.4f}, Test-statistics: {test_stat_logrank:.4f}\n"
        f"   - Breslow Test p-value: {breslow_p:.4f}, Test-statistics: {test_stat_breslow:.4f}\n"
        f"   - Tarone-Ware Test p-value: {tarone_p:.4f}, Test-statistics: {test_stat_tarone:.4f}\n"
        f"   - Wilcoxon Test p-value: {p_value_wilcoxon:.4f}, Test-statistics: {test_statistic_wilcoxon:.4f}\n"

        f"5. The LASSO penalty shrinks some coefficients to zero, highlighting the most informative features.\n"
    )
    text_ax.text(0.01, 0.98, conclusion_text, verticalalignment='top', fontsize=12, wrap=True)
    pdf.savefig(fig)
    plt.close(fig)

    # Figure 4 Hazard ratios with confidence intervals from Cox Hazard Ratios Model
    plt.figure(figsize=(10, 6))
    plt.errorbar(
        covariate_effects["Hazard Ratio (HR)"],
        covariate_effects["Feature"],
        xerr=[covariate_effects["Hazard Ratio (HR)"] - covariate_effects["Lower CI"],
              covariate_effects["Upper CI"] - covariate_effects["Hazard Ratio (HR)"]],
        fmt='o', color='blue', ecolor='gray', elinewidth=2, capsize=4
    )
    plt.axvline(1, color="black", linestyle="--", linewidth=1)
    plt.title("Hazard Ratios with Confidence Intervals")
    plt.xlabel("Hazard Ratio")
    plt.ylabel("Covariate")
    plt.tight_layout()
    pdf.savefig()
    plt.close()

    # Save covariate effects table in the PDF
    fig = plt.figure(figsize=(10, 6))
    plt.axis('off')
    table = plt.table(cellText=covariate_effects.values,
                      colLabels=covariate_effects.columns,
                      loc='center')
    table.auto_set_font_size(False)
    table.set_fontsize(10)
    table.auto_set_column_width(col=list(range(len(covariate_effects.columns))))
    plt.title("Covariate Effects on Survival")
    pdf.savefig()
    plt.close()

print(f"Covariate analysis with confidence intervals added to {report_filename}")