In [1]:
# 1.) Download the PDB chain sequences (FASTA format from RCSB) via the HTTPS mirror,
#      then keep only the first 15 000 entries.

import requests

# Use the “files.wwpdb.org” HTTPS mirror instead of FTP
pdb_url = "https://files.wwpdb.org/pub/pdb/derived_data/pdb_seqres.txt"
try:
    resp = requests.get(pdb_url, timeout=60)
    resp.raise_for_status()
    text = resp.text.strip()
    if not text.startswith(">"):
        raise RuntimeError("Downloaded content does not look like FASTA.")
except Exception as e:
    raise RuntimeError(f"Failed to download PDB chain sequences: {e}")

# Write the complete dump to a temporary file
with open("pdb_chains.fasta", "w", encoding="utf-8") as f:
    f.write(text + "\n")

# ─── Split the full FASTA into individual (header, sequence) tuples ─────────────
def split_fasta(filepath):
    sequences = []
    with open(filepath, "r") as f:
        header = None
        seq_lines = []
        for line in f:
            line = line.rstrip()
            if line.startswith(">"):
                if header is not None:
                    sequences.append((header, "".join(seq_lines)))
                header = line
                seq_lines = []
            else:
                seq_lines.append(line)
        # Add the final sequence
        if header is not None:
            sequences.append((header, "".join(seq_lines)))
    return sequences

all_chains = split_fasta("pdb_chains.fasta")

# ─── Keep exactly the first 15 000 chains ─────────────────────────────────────────
subset = all_chains[:15000]

# ─── Write those 15 000 chains back to “pdb_chains.fasta” ───────────────────────
with open("pdb_chains.fasta", "w", encoding="utf-8") as f:
    for header, seq in subset:
        f.write(f"{header}\n")
        f.write(f"{seq}\n")

print(f"✔ Extracted {len(subset)} chains → 'pdb_chains.fasta'")


✔ Extracted 15000 chains → 'pdb_chains.fasta'


In [2]:
# 2.) Use DisProt’s search endpoint with format=fasta
import requests
import os

url = "https://disprot.org/api/search?format=fasta&limit=10000"
try:
    resp = requests.get(url, timeout=15)
    resp.raise_for_status()
except Exception as e:
    raise RuntimeError(f"Failed to GET DisProt FASTA via API: {e}")

text = resp.text.strip()

# 2.2) Quick sanity check: FASTA must start with '>', not '<'
if not text.startswith(">"):
    raise RuntimeError(
        "Downloaded content does not look like FASTA. "
        "If it begins with '<', you're still hitting an HTML page instead of raw FASTA."
    )

# 2.3) Write the 100 DisProt entries to a file
with open("disprot_13000.fasta", "w") as f:
    f.write(text + "\n")

print("✔ Successfully fetched 100 DisProt sequences in FASTA format → 'disprot_1000.fasta'")


✔ Successfully fetched 100 DisProt sequences in FASTA format → 'disprot_1000.fasta'


In [3]:
# 2.1) Collect more data

import requests
import time

# ─── PARAMETERS ─────────────────────────────────────────────────────────────
TOTAL_DESIRED = 25_000   # how many DisProt sequences we want total
PER_PAGE      = 100      # DisProt’s hard cap per request
OUTPUT_FILE   = "disprot_13000.fasta"

accum_seqs = []
offset     = 0

while len(accum_seqs) < TOTAL_DESIRED:
    url = f"https://disprot.org/api/search?format=fasta&limit={PER_PAGE}&offset={offset}"
    try:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
    except Exception as e:
        raise RuntimeError(f"Failed to GET DisProt FASTA (offset={offset}): {e}")

    block = resp.text.strip()
    if not block.startswith(">"):
        raise RuntimeError(
            "Downloaded content does not look like FASTA. "
            "If it begins with '<', you're still hitting an HTML page."
        )

    # Parse out this page’s FASTA sequences (collecting only the raw sequences, not full headers):
    raw_lines = block.splitlines()
    header = None
    seq_buf = ""
    this_page_seqs = []
    for line in raw_lines:
        if line.startswith(">"):
            if header is not None and seq_buf:
                this_page_seqs.append(seq_buf)
            header = line
            seq_buf = ""
        else:
            seq_buf += line.strip()
    if header is not None and seq_buf:
        this_page_seqs.append(seq_buf)

    if not this_page_seqs:
        # No more sequences returned → break out early
        break

    accum_seqs.extend(this_page_seqs)
    offset += PER_PAGE

    # Sleep briefly (so we don’t hammer the server)
    time.sleep(0.4)

# Trim in case we overshot
accum_seqs = accum_seqs[:TOTAL_DESIRED]

# Write out ~25k sequences in FASTA format (with minimal headers)
with open(OUTPUT_FILE, "w") as f:
    for i, seq in enumerate(accum_seqs):
        f.write(f">disprot_sequence_{i+1}\n")
        f.write(seq + "\n")

print(f"✔ Fetched {len(accum_seqs)} DisProt sequences → '{OUTPUT_FILE}'")


✔ Fetched 25000 DisProt sequences → 'disprot_13000.fasta'


In [4]:
# 2.2) Verify Downloaded Sequences
with open("disprot_13000.fasta") as f:
    for _ in range(5):
        print(f.readline().rstrip())


>disprot_sequence_1
EHVIEMDVTSENGQRALKEQSSKAKIVKNRWGRNVVQISNT
>disprot_sequence_2
VYRNSRAQGGG
>disprot_sequence_3


# Sliding Window vs Global Learned Constraints

In [20]:
import numpy as np
import pandas as pd
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
import math

# --- Parameters for Classifiers ---
# For Global Feature Classifier (WINDOW_SIZE_GLOBAL_FEATURES = None means direct global calculation)
WINDOW_SIZE_GLOBAL_FEATURES = None 

# For Sliding Window Classifier
SLIDING_WINDOW_SIZE = 9  # Size of the sliding window
SLIDING_WINDOW_SLIDE_STEP = 9 # Step for sliding (equal to WINDOW_SIZE for non-overlapping)
SLIDING_WINDOW_PASS_K = 5     # Min conditions a window must meet to "pass"
MAX_UNFORGIVEN_FAILED_WINDOWS_SLIDING = 3 # Max uncancelled failed windows for protein to pass

# ─── (A) Build aa_properties (underlying single AA properties) ────────────────
kd_hydro = {
    'A':  1.8, 'R': -4.5, 'N': -3.5, 'D': -3.5, 'C':  2.5,
    'Q': -3.5, 'E': -3.5, 'G': -0.4, 'H': -3.2, 'I':  4.5,
    'L':  3.8, 'K': -3.9, 'M':  1.9, 'F':  2.8, 'P': -1.6,
    'S': -0.8, 'T': -0.7, 'W': -0.9, 'Y': -1.3, 'V':  4.2
}
charge = { # Simplified charge for H, assuming neutral pH for general calculation
    'A':  0, 'R':  1, 'N':  0, 'D': -1, 'C':  0,
    'Q':  0, 'E': -1, 'G':  0, 'H':  0, 'I':  0, 
    'L':  0, 'K':  1, 'M':  0, 'F':  0, 'P':  0,
    'S':  0, 'T':  0, 'W':  0, 'Y':  0, 'V':  0
}
h_donors = {'A':0,'R':2,'N':2,'D':0,'C':0,'Q':2,'E':0,'G':0,'H':1,'I':0,
            'L':0,'K':1,'M':0,'F':0,'P':0,'S':1,'T':1,'W':1,'Y':1,'V':0}
h_acceptors = {'A':0,'R':0,'N':2,'D':2,'C':1,'Q':2,'E':2,'G':0,'H':1,'I':0,
               'L':0,'K':0,'M':0,'F':0,'P':0,'S':1,'T':1,'W':0,'Y':1,'V':0}
flexibility = {
    'A': 0.357, 'R': 0.529, 'N': 0.463, 'D': 0.511, 'C': 0.346,
    'Q': 0.493, 'E': 0.497, 'G': 0.544, 'H': 0.323, 'I': 0.462,
    'L': 0.365, 'K': 0.466, 'M': 0.295, 'F': 0.314, 'P': 0.509,
    'S': 0.507, 'T': 0.444, 'W': 0.305, 'Y': 0.420, 'V': 0.386
}
sidechain_volume = {
    'A':  88.6, 'R': 173.4, 'N': 114.1, 'D': 111.1, 'C': 108.5, 'Q': 143.8, 
    'E': 138.4, 'G':  60.1, 'H': 153.2, 'I': 166.7, 'L': 166.7, 'K': 168.6, 
    'M': 162.9, 'F': 189.9, 'P': 112.7, 'S':  89.0, 'T': 116.1, 'W': 227.8, 
    'Y': 193.6, 'V': 140.0
}
polarity = {
    'A':  8.1, 'R': 10.5, 'N': 11.6, 'D': 13.0, 'C':  5.5, 'Q': 10.5, 
    'E': 12.3, 'G':  9.0, 'H': 10.4, 'I':  5.2, 'L':  4.9, 'K': 11.3, 
    'M':  5.7, 'F':  5.2, 'P':  8.0, 'S':  9.2, 'T':  8.6, 'W':  5.4, 
    'Y':  6.2, 'V':  5.9
}
choufa_helix = {
    'A': 1.45, 'R': 0.79, 'N': 0.73, 'D': 1.01, 'C': 0.77, 'Q': 1.17, 
    'E': 1.51, 'G': 0.53, 'H': 1.00, 'I': 1.08, 'L': 1.34, 'K': 1.07, 
    'M': 1.20, 'F': 1.12, 'P': 0.59, 'S': 0.79, 'T': 0.82, 'W': 1.14, 
    'Y': 0.61, 'V': 1.06
}
choufa_sheet = {
    'A': 0.97, 'R': 0.90, 'N': 0.65, 'D': 0.54, 'C': 1.30, 'Q': 1.23, 
    'E': 0.37, 'G': 0.75, 'H': 0.87, 'I': 1.60, 'L': 1.22, 'K': 0.74, 
    'M': 1.67, 'F': 1.28, 'P': 0.62, 'S': 0.72, 'T': 1.20, 'W': 1.19, 
    'Y': 1.29, 'V': 1.70
}
rel_ASA = {
    'A': 0.74, 'R': 1.48, 'N': 1.14, 'D': 1.23, 'C': 0.86, 'Q': 1.36, 
    'E': 1.26, 'G': 1.00, 'H': 0.91, 'I': 0.59, 'L': 0.61, 'K': 1.29, 
    'M': 0.64, 'F': 0.65, 'P': 0.71, 'S': 1.42, 'T': 1.20, 'W': 0.55, 
    'Y': 0.63, 'V': 0.54
}
beta_branched = {aa: (1 if aa in ('V','I','T') else 0) for aa in kd_hydro.keys()}

aa_properties_base = {} 
canonical_set = set(kd_hydro.keys())
for aa in canonical_set:
    aa_properties_base[aa] = {
        'hydro_norm': (kd_hydro[aa] + 4.5) / 9.0,
        'charge_val': charge[aa], 
        'h_donors': h_donors[aa],
        'h_acceptors': h_acceptors[aa],
        'flexibility': flexibility[aa],
        'volume_norm': sidechain_volume[aa] / 227.8,
        'pol_norm': (polarity[aa] - 4.9) / (13.0 - 4.9),
        'is_aromatic': 1 if aa in ('F','Y','W') else 0,
        'helix_prop': choufa_helix[aa] / 1.51,
        'sheet_prop': choufa_sheet[aa] / 1.70,
        'asa_norm': (rel_ASA[aa] - 0.54) / (1.48 - 0.54),
        'is_beta_branched': beta_branched[aa]
    }

# ─── (B) Load FASTA sequences & Store Raw Sequences with Labels ────────────────
def load_fasta_with_labels(filepath, label, filter_non_canonical=False):
    sequences_with_labels = []
    try:
        with open(filepath) as f:
            header = None; seq_content = ""
            for line in f:
                line = line.strip()
                if line.startswith(">"):
                    if header is not None and seq_content:
                        if (not filter_non_canonical) or (set(seq_content) <= canonical_set):
                            sequences_with_labels.append({'sequence': seq_content, 'label': label, 'header': header})
                    header = line; seq_content = ""
                else: seq_content += line
            if header is not None and seq_content: # Last sequence
                if (not filter_non_canonical) or (set(seq_content) <= canonical_set):
                    sequences_with_labels.append({'sequence': seq_content, 'label': label, 'header': header})
    except FileNotFoundError: print(f"Warning: File not found {filepath}.")
    return sequences_with_labels

all_sequences_data = []
all_sequences_data.extend(load_fasta_with_labels("pdb_chains.fasta", 1))
all_sequences_data.extend(load_fasta_with_labels("disprot_13000.fasta", 0))

print(f"Loaded {len([item for item in all_sequences_data if item['label'] == 1])} PDB sequences.")
print(f"Loaded {len([item for item in all_sequences_data if item['label'] == 0])} DisProt sequences.")

if not all_sequences_data:
    print("Error: No sequences loaded. Exiting."); exit()

# ─── (C) NEW Feature Computation Functions ───────────────────────────────────
def get_aa_composition(sequence_str):
    composition = {aa: 0 for aa in canonical_set}
    valid_len = 0
    for aa in sequence_str:
        if aa in canonical_set:
            composition[aa] += 1
            valid_len += 1
    if valid_len == 0: return {aa: 0.0 for aa in canonical_set}, 0
    for aa in composition: composition[aa] /= valid_len
    return composition, valid_len

def calculate_shannon_entropy(aa_composition):
    entropy = 0.0
    for aa_freq in aa_composition.values(): # Iterate over frequencies directly
        if aa_freq > 0:
            entropy -= aa_freq * math.log2(aa_freq)
    return entropy

def compute_new_seven_features(sequence_str):
    if not sequence_str: return np.zeros(7)
    composition, valid_seq_len = get_aa_composition(sequence_str)
    if valid_seq_len == 0: return np.zeros(7)

    hydro_norm_sum, flex_norm_sum, h_bond_potential_sum = 0, 0, 0
    for aa in sequence_str:
        if aa in aa_properties_base:
            props = aa_properties_base[aa]
            hydro_norm_sum += props['hydro_norm']
            flex_norm_sum += props['flexibility'] / 0.544
            h_bond_potential_sum += props['h_donors'] + props['h_acceptors']

    net_charge_prop = (composition.get('R',0) + composition.get('K',0)) - \
                      (composition.get('D',0) + composition.get('E',0))
    bulky_hydrophobics_list = ['W', 'C', 'F', 'Y', 'I', 'V', 'L']
    
    return np.array([
        hydro_norm_sum / valid_seq_len,
        flex_norm_sum / valid_seq_len,
        h_bond_potential_sum / valid_seq_len,
        abs(net_charge_prop),
        calculate_shannon_entropy(composition),
        composition.get('P', 0),
        sum(composition.get(aa, 0) for aa in bulky_hydrophobics_list)
    ])

def compute_features_for_dataset(sequence_list, window_size_param=None):
    """
    Computes the new 7 features for a list of sequence strings.
    If window_size_param is None, computes global features.
    If window_size_param is an int, computes features for each window and averages them.
    """
    all_feature_vectors = []
    for seq_str in sequence_list:
        if not seq_str: 
            all_feature_vectors.append(np.zeros(7))
            continue
        
        canonical_sequence = "".join([aa for aa in seq_str if aa in canonical_set])
        if not canonical_sequence: 
            all_feature_vectors.append(np.zeros(7))
            continue

        if window_size_param is None or len(canonical_sequence) < window_size_param:
            all_feature_vectors.append(compute_new_seven_features(canonical_sequence))
        else:
            window_derived_feature_sets = [] 
            for i in range(len(canonical_sequence) - window_size_param + 1):
                window_segment_str = canonical_sequence[i : i + window_size_param]
                window_features = compute_new_seven_features(window_segment_str)
                window_derived_feature_sets.append(window_features)
            if not window_derived_feature_sets:
                all_feature_vectors.append(compute_new_seven_features(canonical_sequence)) # Fallback
            else:
                all_feature_vectors.append(np.mean(np.vstack(window_derived_feature_sets), axis=0))
    return all_feature_vectors

# --- Prepare data for Global New Features Classifier ---
new_feature_names = [
    "hydro_norm_avg", "flex_norm_avg", "h_bond_potential_avg",
    "abs_net_charge_prop", "shannon_entropy", "freq_proline", "freq_bulky_hydrophobics"
]
print(f"\nComputing NEW GLOBAL features (WINDOW_SIZE_GLOBAL_FEATURES = {WINDOW_SIZE_GLOBAL_FEATURES})...")
raw_sequences_list = [item['sequence'] for item in all_sequences_data]
labels_list = [item['label'] for item in all_sequences_data]

global_features_calculated = compute_features_for_dataset(raw_sequences_list, window_size_param=WINDOW_SIZE_GLOBAL_FEATURES)

df_global_features = pd.DataFrame(global_features_calculated, columns=new_feature_names)
df_global_features["label"] = labels_list
print("NEW GLOBAL feature computation complete.")

if df_global_features.empty or df_global_features['label'].nunique() < 2:
    print("Error: Not enough data for global features. Exiting."); exit()
    
X_global_train, X_global_test, y_global_train, y_global_test, train_indices_global, test_indices_global = train_test_split(
    df_global_features.drop(columns=["label"]),
    df_global_features["label"],
    np.arange(len(raw_sequences_list)), 
    test_size=0.2, random_state=42, stratify=df_global_features["label"] 
)

df_train_global_features = X_global_train.copy()
df_train_global_features["label"] = y_global_train

# Raw sequences for the test set (will be used by the sliding window classifier)
test_raw_sequences_for_sliding_window = [raw_sequences_list[i] for i in test_indices_global]
y_test_for_sliding_window = y_global_test # True labels for the test set

print(f"\nGlobal Features Training set size: {len(df_train_global_features)}")
print(f"Global Features Testing set size: {len(X_global_test)}")

# --- Calculate Midpoints from Global New Features Training Data ---
if df_train_global_features["label"].nunique() < 2:
    print("\nError: Global features training set lacks class diversity for midpoints."); exit()
else:
    train_means_global = df_train_global_features.groupby("label").mean().rename(index={0:"DisProt", 1:"PDB"})
    if "PDB" not in train_means_global.index or "DisProt" not in train_means_global.index:
        print("\nError: Could not find means for both PDB and DisProt in global training data."); exit()
    else:
        midpoints_global_new_features = {col: (train_means_global.loc["PDB", col] + train_means_global.loc["DisProt", col]) / 2
                                         for col in X_global_train.columns}
        print("\nGlobal Feature Means (DisProt vs. PDB) from NEW GLOBAL FEATURES TRAINING DATA:\n")
        print(train_means_global, "\n")
        print("Chosen Midpoint Thresholds (from NEW GLOBAL FEATURES TRAINING DATA):\n")
        for feat, t in midpoints_global_new_features.items(): print(f"  {feat:18s} = {t:.3f}")
        print()

# --- Helper to count conditions met for the NEW 7 features ---
def count_conditions_for_new_feature_vector(new_feature_vector_values, midpoints_dict, train_means_for_direction):
    row = pd.Series(new_feature_vector_values, index=new_feature_names)
    conditions_met_count = 0
    
    # hydro_norm_avg: PDB typically higher
    if row["hydro_norm_avg"] >= midpoints_dict.get("hydro_norm_avg", 0.0): conditions_met_count +=1
    # flex_norm_avg: PDB typically lower
    if row["flex_norm_avg"] <= midpoints_dict.get("flex_norm_avg", float('inf')): conditions_met_count +=1
    # h_bond_potential_avg: PDB typically lower
    if row["h_bond_potential_avg"] <= midpoints_dict.get("h_bond_potential_avg", float('inf')): conditions_met_count +=1
    # abs_net_charge_prop: PDB typically lower
    if row["abs_net_charge_prop"] <= midpoints_dict.get("abs_net_charge_prop", float('inf')): conditions_met_count +=1
    # shannon_entropy: PDB typically higher (inspect means to confirm this assumption)
    if train_means_for_direction.loc["PDB", "shannon_entropy"] > train_means_for_direction.loc["DisProt", "shannon_entropy"]:
        if row["shannon_entropy"] >= midpoints_dict.get("shannon_entropy", 0.0): conditions_met_count +=1
    else: # PDB shannon_entropy is lower or equal
        if row["shannon_entropy"] <= midpoints_dict.get("shannon_entropy", float('inf')): conditions_met_count +=1
    # freq_proline: PDB typically lower
    if row["freq_proline"] <= midpoints_dict.get("freq_proline", float('inf')): conditions_met_count +=1
    # freq_bulky_hydrophobics: PDB typically higher
    if row["freq_bulky_hydrophobics"] >= midpoints_dict.get("freq_bulky_hydrophobics", 0.0): conditions_met_count +=1
    
    return conditions_met_count

# ----------------------------------------------------------------------------------
# --- 1. New Global Features Threshold-Based Classifier Evaluation ---
# ----------------------------------------------------------------------------------
print("\n\n--- Evaluating New Global Features Threshold-Based Classifier ---")
df_test_global_features_eval = X_global_test.copy()
df_test_global_features_eval["label"] = y_global_test

if df_test_global_features_eval.empty:
    print("Global features test set is empty. Skipping evaluation.")
else:
    df_test_global_features_eval["conditions_met"] = df_test_global_features_eval.apply(
        lambda r: count_conditions_for_new_feature_vector(r[new_feature_names].values, midpoints_global_new_features, train_means_global), axis=1
    )
    
    dist_test_global = df_test_global_features_eval.groupby("label")["conditions_met"].value_counts().unstack(fill_value=0).rename(index={0:"DisProt", 1:"PDB"})
    print("Distribution of ‘conditions_met’ (NEW GLOBAL features) by Label (ON TEST SET):\n")
    print(dist_test_global, "\n")

    results_global = []
    for k_thresh in range(1, 8):
        preds_test_global = (df_test_global_features_eval["conditions_met"] >= k_thresh).astype(int)
        tp = ((preds_test_global == 1) & (y_global_test == 1)).sum()
        fn = ((preds_test_global == 0) & (y_global_test == 1)).sum()
        tn = ((preds_test_global == 0) & (y_global_test == 0)).sum()
        fp = ((preds_test_global == 1) & (y_global_test == 0)).sum()
        acc = (tp + tn) / len(y_global_test) if len(y_global_test) > 0 else 0
        prec_pdb = tp / (tp + fp) if (tp + fp) > 0 else 0
        rec_pdb = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1_pdb = 2*(prec_pdb*rec_pdb)/(prec_pdb+rec_pdb) if (prec_pdb+rec_pdb)>0 else 0
        results_global.append({
            "k": k_thresh, "TP": tp, "FN": fn, "TN": tn, "FP": fp, "Accuracy": f"{acc:.2%}",
            "Precision (PDB)": f"{prec_pdb:.2%}", "Recall (PDB)": f"{rec_pdb:.2%}", "F1-score (PDB)": f"{f1_pdb:.2%}"
        })
    df_results_global = pd.DataFrame(results_global)
    print("Performance of New Global Features Classifier on TEST SET (varying k):\n")
    print(df_results_global.to_string(index=False))

    if not df_results_global.empty:
        try:
            df_results_global['F1_PDB_float'] = df_results_global['F1-score (PDB)'].str.rstrip('%').astype('float') / 100.0
            best_k_row_global = df_results_global.loc[df_results_global['F1_PDB_float'].idxmax()]
            best_k_global = int(best_k_row_global["k"])
            print(f"\n--- Detailed New Global Features Classification Report for best k = {best_k_global} (based on F1 PDB) ---")
            best_preds_global = (df_test_global_features_eval["conditions_met"] >= best_k_global).astype(int)
            print(classification_report(y_global_test, best_preds_global, target_names=["DisProt (0)", "PDB (1)"], zero_division=0))
            cm_global = confusion_matrix(y_global_test, best_preds_global)
            print("Confusion Matrix for best k (New Global Features):\n", pd.DataFrame(cm_global, index=["Actual DisProt","Actual PDB"], columns=["Pred DisProt","Pred PDB"]))
        except Exception as e: print(f"Error in detailed report for global features: {e}")

# ----------------------------------------------------------------------------------
# --- 2. Sliding Window (Larger) Classifier with Failure Cancellation ---
# ----------------------------------------------------------------------------------
print("\n\n--- Testing Sliding Window (Larger) Classifier with Failure Cancellation ---")

def classify_protein_sliding_window_cancellation_new_features(
    sequence_str, window_size, slide_step,
    midpoints_for_eval, window_k_pass_thresh, 
    max_allowed_total_failures, train_means_for_direction_check): # Added train_means

    if not sequence_str: return 1 
    canonical_sequence = "".join([aa for aa in sequence_str if aa in canonical_set])
    if not canonical_sequence or len(canonical_sequence) < window_size: return 1 

    current_consecutive_failures_streak = 0
    num_windows_processed = 0

    for i in range(0, len(canonical_sequence) - window_size + 1, slide_step):
        window_str = canonical_sequence[i : i + window_size]
        num_windows_processed += 1
        
        seven_new_features_for_current_window = compute_new_seven_features(window_str)
        num_conditions_this_window_met = count_conditions_for_new_feature_vector(
            seven_new_features_for_current_window, 
            midpoints_for_eval,
            train_means_for_direction_check # Pass train_means here
        )
        
        window_passes = (num_conditions_this_window_met >= window_k_pass_thresh)
        
        if window_passes: current_consecutive_failures_streak = 0 
        else: current_consecutive_failures_streak += 1
            
    if num_windows_processed == 0: return 0 
    total_unforgiven_failures = current_consecutive_failures_streak
    
    return 1 if total_unforgiven_failures <= max_allowed_total_failures else 0

print(f"\nApplying Sliding Window (Larger) Classifier (NEW features) with Failure Cancellation: window_size={SLIDING_WINDOW_SIZE}, slide_step={SLIDING_WINDOW_SLIDE_STEP}, window_k_pass_thresh={SLIDING_WINDOW_PASS_K}, max_total_unforgiven_failures={MAX_UNFORGIVEN_FAILED_WINDOWS_SLIDING}...")
predictions_sliding_window_test = []
if not test_raw_sequences_for_sliding_window:
    print("No raw sequences in test set for sliding window classifier.")
else:
    for raw_seq in test_raw_sequences_for_sliding_window:
        pred = classify_protein_sliding_window_cancellation_new_features(
            raw_seq, SLIDING_WINDOW_SIZE, SLIDING_WINDOW_SLIDE_STEP,
            midpoints_global_new_features, # Use midpoints from global new features training
            SLIDING_WINDOW_PASS_K,
            MAX_UNFORGIVEN_FAILED_WINDOWS_SLIDING,
            train_means_global # Pass train_means for direction check
        )
        predictions_sliding_window_test.append(pred)
    print("\nSliding Window (Larger) with Failure Cancellation classification complete.")

    if predictions_sliding_window_test:
        print("\nPerformance of Sliding Window (Larger) Classifier with Failure Cancellation (ON TEST SET):\n")
        print(classification_report(y_test_for_sliding_window, predictions_sliding_window_test, target_names=["DisProt (0)", "PDB (1)"], zero_division=0))
        cm_sliding = confusion_matrix(y_test_for_sliding_window, predictions_sliding_window_test)
        print("Confusion Matrix:\n", pd.DataFrame(cm_sliding, index=["Actual DisProt","Actual PDB"], columns=["Pred DisProt","Pred PDB"]))
        acc_sliding = (cm_sliding[0,0] + cm_sliding[1,1]) / np.sum(cm_sliding) if np.sum(cm_sliding) > 0 else 0
        print(f"Accuracy: {acc_sliding:.2%}")
    else:
        print("No predictions made by Sliding Window (Larger) classifier.")


Loaded 15000 PDB sequences.
Loaded 25000 DisProt sequences.

Computing NEW GLOBAL features (WINDOW_SIZE_GLOBAL_FEATURES = None)...
NEW GLOBAL feature computation complete.

Global Features Training set size: 32000
Global Features Testing set size: 8000

Global Feature Means (DisProt vs. PDB) from NEW GLOBAL FEATURES TRAINING DATA:

         hydro_norm_avg  flex_norm_avg  h_bond_potential_avg  \
label                                                          
DisProt        0.401099       0.837249              1.256680   
PDB            0.475068       0.806553              1.073287   

         abs_net_charge_prop  shannon_entropy  freq_proline  \
label                                                         
DisProt             0.115245         3.359850      0.069069   
PDB                 0.036236         3.700737      0.043093   

         freq_bulky_hydrophobics  
label                             
DisProt                 0.210505  
PDB                     0.312975   

Chosen Midpoin

# Validation Experiments

This section implements rigorous validation controls to test if the concept model generalizes beyond the training distribution.

## Task 1: MobiDB Independent Test Set

MobiDB provides consensus disorder predictions and experimental annotations, serving as an independent validation dataset.

In [None]:
# MobiDB Data Loader
import requests
import time

def load_mobidb_consensus(limit=1000, output_file="mobidb_test.fasta"):
    """
    Fetch consensus disorder predictions from MobiDB API.
    Uses MobiDB-lite entries with experimentally validated disorder annotations.
    
    Args:
        limit: Maximum number of sequences to fetch
        output_file: Path to save FASTA file
    
    Returns:
        List of (sequence, label) tuples where label is 0 (disordered) or 1 (ordered)
    """
    print(f"Fetching MobiDB consensus data (limit={limit})...")
    
    sequences_with_labels = []
    
    # MobiDB API endpoint for searching entries
    # We'll fetch both ordered and disordered proteins
    base_url = "https://mobidb.bio.unipd.it/api/search"
    
    # Fetch disordered proteins (with high disorder content)
    disordered_params = {
        "disorder_content_min": 0.7,  # At least 70% disordered
        "limit": limit // 2
    }
    
    failed_fetches_disordered = 0
    total_fetches_disordered = 0
    
    try:
        print("Fetching disordered proteins from MobiDB...")
        resp = requests.get(base_url, params=disordered_params, timeout=60)
        resp.raise_for_status()
        disordered_data = resp.json()
        
        for entry in disordered_data.get('data', [])[:limit//2]:
            acc = entry.get('acc')
            if acc:
                total_fetches_disordered += 1
                # Fetch sequence
                seq_url = f"https://mobidb.bio.unipd.it/api/download?acc={acc}&format=fasta"
                try:
                    seq_resp = requests.get(seq_url, timeout=30)
                    if seq_resp.status_code == 200:
                        fasta_text = seq_resp.text.strip()
                        lines = fasta_text.split('\n')
                        if len(lines) > 1:
                            sequence = ''.join(lines[1:])
                            sequences_with_labels.append((sequence, 0))  # 0 for disordered
                        else:
                            failed_fetches_disordered += 1
                    else:
                        failed_fetches_disordered += 1
                except Exception as seq_err:
                    failed_fetches_disordered += 1
                    print(f"  Warning: Failed to fetch sequence for {acc}: {seq_err}")
                time.sleep(0.2)  # Rate limiting
        
        if total_fetches_disordered > 0:
            failure_rate = failed_fetches_disordered / total_fetches_disordered
            if failure_rate > 0.3:
                print(f"  ⚠ Warning: High failure rate for disordered proteins ({failed_fetches_disordered}/{total_fetches_disordered} = {failure_rate:.1%})")
                print(f"  Dataset may be imbalanced.")
    except Exception as e:
        print(f"Warning: Error fetching disordered proteins: {e}")
    
    # Fetch ordered proteins (with low disorder content)
    ordered_params = {
        "disorder_content_max": 0.1,  # At most 10% disordered
        "limit": limit // 2
    }
    
    failed_fetches_ordered = 0
    total_fetches_ordered = 0
    
    try:
        print("Fetching ordered proteins from MobiDB...")
        resp = requests.get(base_url, params=ordered_params, timeout=60)
        resp.raise_for_status()
        ordered_data = resp.json()
        
        for entry in ordered_data.get('data', [])[:limit//2]:
            acc = entry.get('acc')
            if acc:
                total_fetches_ordered += 1
                # Fetch sequence
                seq_url = f"https://mobidb.bio.unipd.it/api/download?acc={acc}&format=fasta"
                try:
                    seq_resp = requests.get(seq_url, timeout=30)
                    if seq_resp.status_code == 200:
                        fasta_text = seq_resp.text.strip()
                        lines = fasta_text.split('\n')
                        if len(lines) > 1:
                            sequence = ''.join(lines[1:])
                            sequences_with_labels.append((sequence, 1))  # 1 for ordered
                        else:
                            failed_fetches_ordered += 1
                    else:
                        failed_fetches_ordered += 1
                except Exception as seq_err:
                    failed_fetches_ordered += 1
                    print(f"  Warning: Failed to fetch sequence for {acc}: {seq_err}")
                time.sleep(0.2)  # Rate limiting
        
        if total_fetches_ordered > 0:
            failure_rate = failed_fetches_ordered / total_fetches_ordered
            if failure_rate > 0.3:
                print(f"  ⚠ Warning: High failure rate for ordered proteins ({failed_fetches_ordered}/{total_fetches_ordered} = {failure_rate:.1%})")
                print(f"  Dataset may be imbalanced.")
    except Exception as e:
        print(f"Warning: Error fetching ordered proteins: {e}")
    
    # Save to FASTA file
    if sequences_with_labels:
        with open(output_file, 'w') as f:
            for i, (seq, label) in enumerate(sequences_with_labels):
                label_str = "ordered" if label == 1 else "disordered"
                f.write(f">mobidb_{label_str}_{i+1}\n")
                f.write(f"{seq}\n")
        print(f"✔ Saved {len(sequences_with_labels)} MobiDB sequences to '{output_file}'")
        print(f"  - Ordered: {sum(1 for s, l in sequences_with_labels if l == 1)}")
        print(f"  - Disordered: {sum(1 for s, l in sequences_with_labels if l == 0)}")
    else:
        print("Warning: No sequences fetched from MobiDB")
    
    return sequences_with_labels

# Test the MobiDB loader (commented out by default to avoid API calls)
# mobidb_data = load_mobidb_consensus(limit=100)
print("MobiDB loader function defined. Uncomment the line above to fetch data.")

## Task 2: Homology-Aware Cross-Validation

Implements sequence clustering to prevent homology leakage between train and test sets.
Uses MMseqs2 for fast sequence clustering at 30% identity threshold.

In [None]:
# Homology-Aware Cross-Validation
import subprocess
import tempfile
import os
import random
from collections import defaultdict

def install_mmseqs2():
    """
    Install MMseqs2 if not already available.
    Checks for conda availability before attempting installation.
    """
    try:
        subprocess.run(['mmseqs', 'version'], capture_output=True, check=True)
        print("MMseqs2 is already installed.")
        return True
    except (FileNotFoundError, subprocess.CalledProcessError):
        print("MMseqs2 not found. Checking for conda...")
        
        # Check if conda is available
        try:
            subprocess.run(['conda', '--version'], capture_output=True, check=True)
            print("Conda found. Installing MMseqs2 via conda...")
        except (FileNotFoundError, subprocess.CalledProcessError):
            print("Conda not available. Cannot install MMseqs2 automatically.")
            print("Falling back to simple random split.")
            return False
        
        # Try to install MMseqs2
        try:
            result = subprocess.run(
                ['conda', 'install', '-y', '-c', 'bioconda', 'mmseqs2'], 
                capture_output=True, 
                check=True,
                timeout=300  # 5 minute timeout
            )
            print("✔ MMseqs2 installed successfully.")
            return True
        except subprocess.TimeoutExpired:
            print("Warning: MMseqs2 installation timed out.")
            print("Falling back to simple random split.")
            return False
        except Exception as e:
            print(f"Warning: Could not install MMseqs2: {e}")
            print("Falling back to simple random split.")
            return False

def cluster_sequences_mmseqs2(sequences, identity_threshold=0.3):
    """
    Cluster sequences using MMseqs2.
    
    Args:
        sequences: List of (header, sequence) tuples
        identity_threshold: Sequence identity threshold (0.0-1.0)
    
    Returns:
        Dictionary mapping sequence index to cluster ID
    """
    if not sequences:
        return {}
    
    # Create temporary directory for MMseqs2
    with tempfile.TemporaryDirectory() as tmpdir:
        # Write sequences to FASTA
        fasta_path = os.path.join(tmpdir, 'sequences.fasta')
        with open(fasta_path, 'w') as f:
            for i, (header, seq) in enumerate(sequences):
                f.write(f">seq_{i}\n{seq}\n")
        
        # Run MMseqs2 clustering
        db_path = os.path.join(tmpdir, 'seqDB')
        cluster_path = os.path.join(tmpdir, 'clusterDB')
        tmp_path = os.path.join(tmpdir, 'tmp')
        os.makedirs(tmp_path, exist_ok=True)
        
        try:
            # Create sequence database
            subprocess.run(['mmseqs', 'createdb', fasta_path, db_path],
                         check=True, capture_output=True)
            
            # Cluster sequences
            subprocess.run(['mmseqs', 'cluster', db_path, cluster_path, tmp_path,
                          '--min-seq-id', str(identity_threshold), '-c', '0.8'],
                         check=True, capture_output=True)
            
            # Create TSV output
            tsv_path = os.path.join(tmpdir, 'clusters.tsv')
            subprocess.run(['mmseqs', 'createtsv', db_path, db_path, cluster_path, tsv_path],
                         check=True, capture_output=True)
            
            # Parse cluster assignments
            seq_to_cluster = {}
            with open(tsv_path, 'r') as f:
                for line in f:
                    parts = line.strip().split('\t')
                    if len(parts) >= 2:
                        representative = parts[0].replace('seq_', '')
                        member = parts[1].replace('seq_', '')
                        seq_to_cluster[int(member)] = int(representative)
            
            return seq_to_cluster
            
        except subprocess.CalledProcessError as e:
            print(f"Warning: MMseqs2 clustering failed: {e}")
            # Fallback: assign each sequence to its own cluster
            return {i: i for i in range(len(sequences))}

def homology_aware_split(sequences, labels, test_size=0.2, identity_threshold=0.3, random_state=42):
    """
    Split sequences into train/test sets ensuring sequence clusters stay together.
    
    Args:
        sequences: List of protein sequences (strings)
        labels: List of labels (0 or 1)
        test_size: Fraction of data for test set (0.0-1.0)
        identity_threshold: Sequence identity threshold for clustering
        random_state: Random seed for reproducibility
    
    Returns:
        Tuple of (train_indices, test_indices)
    """
    random.seed(random_state)
    np.random.seed(random_state)
    
    print(f"\nPerforming homology-aware split (identity threshold={identity_threshold})...")
    
    # Check if MMseqs2 is available
    mmseqs_available = install_mmseqs2()
    
    if mmseqs_available:
        # Cluster sequences
        seq_tuples = [(f"seq_{i}", seq) for i, seq in enumerate(sequences)]
        seq_to_cluster = cluster_sequences_mmseqs2(seq_tuples, identity_threshold)
    else:
        # Fallback: each sequence is its own cluster (equivalent to random split)
        print("Using fallback: simple random split without clustering.")
        seq_to_cluster = {i: i for i in range(len(sequences))}
    
    # Group sequences by cluster
    cluster_to_seqs = defaultdict(list)
    for seq_idx, cluster_id in seq_to_cluster.items():
        cluster_to_seqs[cluster_id].append(seq_idx)
    
    print(f"Found {len(cluster_to_seqs)} clusters from {len(sequences)} sequences")
    
    # Separate clusters by label for stratified splitting
    clusters_label_0 = []
    clusters_label_1 = []
    
    for cluster_id, seq_indices in cluster_to_seqs.items():
        # Determine majority label for this cluster
        cluster_labels = [labels[i] for i in seq_indices]
        majority_label = 1 if sum(cluster_labels) > len(cluster_labels) / 2 else 0
        
        if majority_label == 0:
            clusters_label_0.append(seq_indices)
        else:
            clusters_label_1.append(seq_indices)
    
    # Shuffle clusters
    random.shuffle(clusters_label_0)
    random.shuffle(clusters_label_1)
    
    # Split clusters into train/test
    n_test_0 = int(len(clusters_label_0) * test_size)
    n_test_1 = int(len(clusters_label_1) * test_size)
    
    test_clusters_0 = clusters_label_0[:n_test_0]
    train_clusters_0 = clusters_label_0[n_test_0:]
    
    test_clusters_1 = clusters_label_1[:n_test_1]
    train_clusters_1 = clusters_label_1[n_test_1:]
    
    # Flatten clusters to get indices
    train_indices = []
    for cluster in train_clusters_0 + train_clusters_1:
        train_indices.extend(cluster)
    
    test_indices = []
    for cluster in test_clusters_0 + test_clusters_1:
        test_indices.extend(cluster)
    
    print(f"Split complete: {len(train_indices)} train, {len(test_indices)} test")
    print(f"Train labels: {sum(labels[i] for i in train_indices)} folded, "
          f"{len(train_indices) - sum(labels[i] for i in train_indices)} disordered")
    print(f"Test labels: {sum(labels[i] for i in test_indices)} folded, "
          f"{len(test_indices) - sum(labels[i] for i in test_indices)} disordered")
    
    return train_indices, test_indices

print("Homology-aware split functions defined.")

## Task 3: Label-Shuffle Control Experiment

Tests for data leakage and overfitting by randomly permuting labels.
Expected result: Performance should drop to ~50% (random chance) if the model is not overfitting.

In [None]:
# Label-Shuffle Control Experiment
import random
from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def run_label_shuffle_control(sequences_data, n_iterations=5, random_state=42):
    """
    Run classification experiment with shuffled labels as a control.
    
    Args:
        sequences_data: List of dicts with 'sequence' and 'label' keys
        n_iterations: Number of shuffle iterations to average
        random_state: Random seed for reproducibility
    
    Returns:
        Dictionary with average performance metrics
    """
    print(f"\nRunning label-shuffle control experiment ({n_iterations} iterations)...")
    print("Expected result: ~50% accuracy if model is not overfitting.\n")
    
    all_accuracies = []
    all_precisions = []
    all_recalls = []
    all_f1s = []
    
    for iteration in range(n_iterations):
        # Set random seed for this iteration
        iter_seed = random_state + iteration
        random.seed(iter_seed)
        np.random.seed(iter_seed)
        
        # Create shuffled labels
        original_labels = [item['label'] for item in sequences_data]
        shuffled_labels = original_labels.copy()
        random.shuffle(shuffled_labels)
        
        # Create shuffled dataset
        shuffled_data = []
        for i, item in enumerate(sequences_data):
            shuffled_data.append({
                'sequence': item['sequence'],
                'label': shuffled_labels[i],
                'header': item.get('header', f'seq_{i}')
            })
        
        # Extract sequences and labels
        sequences_list = [item['sequence'] for item in shuffled_data]
        labels_list = [item['label'] for item in shuffled_data]
        
        # Compute features
        features = compute_features_for_dataset(sequences_list, window_size_param=None)
        df_features = pd.DataFrame(features, columns=new_feature_names)
        df_features['label'] = labels_list
        
        # Train/test split
        X_train, X_test, y_train, y_test = train_test_split(
            df_features.drop(columns=['label']),
            df_features['label'],
            test_size=0.2,
            random_state=iter_seed,
            stratify=df_features['label']
        )
        
        # Compute midpoints from training data
        train_label_0 = X_train[y_train == 0]
        train_label_1 = X_train[y_train == 1]
        
        midpoints = []
        train_means_label_0 = train_label_0.mean()
        train_means_label_1 = train_label_1.mean()
        
        for feat in new_feature_names:
            midpoint = (train_means_label_0[feat] + train_means_label_1[feat]) / 2
            midpoints.append(midpoint)
        
        # Classify using feature count method (using helper from validation runner)
        # Note: This assumes CLASSIFICATION_THRESHOLD and classify_using_feature_count 
        # are defined in the validation runner cell above
        predictions = classify_using_feature_count(
            X_test, new_feature_names, midpoints,
            train_means_label_0, train_means_label_1,
            threshold=5  # Use best threshold (K=5)
        )
        
        # Calculate metrics
        accuracy = accuracy_score(y_test, predictions)
        precision, recall, f1, _ = precision_recall_fscore_support(
            y_test, predictions, average='binary', zero_division=0
        )
        
        all_accuracies.append(accuracy)
        all_precisions.append(precision)
        all_recalls.append(recall)
        all_f1s.append(f1)
        
        print(f"  Iteration {iteration+1}: Accuracy = {accuracy:.4f}, F1 = {f1:.4f}")
    
    # Calculate average metrics
    results = {
        'accuracy': np.mean(all_accuracies),
        'accuracy_std': np.std(all_accuracies),
        'precision': np.mean(all_precisions),
        'precision_std': np.std(all_precisions),
        'recall': np.mean(all_recalls),
        'recall_std': np.std(all_recalls),
        'f1': np.mean(all_f1s),
        'f1_std': np.std(all_f1s)
    }
    
    print(f"\nLabel-Shuffle Control Results (averaged over {n_iterations} iterations):")
    print(f"  Accuracy: {results['accuracy']:.4f} ± {results['accuracy_std']:.4f}")
    print(f"  Precision: {results['precision']:.4f} ± {results['precision_std']:.4f}")
    print(f"  Recall: {results['recall']:.4f} ± {results['recall_std']:.4f}")
    print(f"  F1-score: {results['f1']:.4f} ± {results['f1_std']:.4f}")
    
    if results['accuracy'] < 0.6:
        print("\n✓ PASS: Accuracy dropped to near-random levels, indicating no data leakage.")
    else:
        print("\n⚠ WARNING: Accuracy remains high with shuffled labels, suggesting possible data leakage or overfitting.")
    
    return results

# Run the control experiment (commented out by default)
# shuffle_results = run_label_shuffle_control(all_sequences_data, n_iterations=5)
print("Label-shuffle control function defined. Uncomment the line above to run the experiment.")

## Task 5: Statistical Significance Testing

Implements bootstrap confidence intervals and McNemar's test to assess statistical significance of results.

In [None]:
# Statistical Significance Testing
from scipy.stats import chi2

def bootstrap_confidence_interval(y_true, y_pred, n_bootstrap=1000, confidence_level=0.95, random_state=42):
    """
    Compute bootstrap confidence interval for accuracy.
    
    Args:
        y_true: True labels
        y_pred: Predicted labels
        n_bootstrap: Number of bootstrap samples
        confidence_level: Confidence level (e.g., 0.95 for 95% CI)
        random_state: Random seed
    
    Returns:
        Dictionary with mean accuracy and confidence interval
    """
    np.random.seed(random_state)
    
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    n_samples = len(y_true)
    
    bootstrap_accuracies = []
    
    for i in range(n_bootstrap):
        # Sample with replacement
        indices = np.random.choice(n_samples, size=n_samples, replace=True)
        bootstrap_accuracy = accuracy_score(y_true[indices], y_pred[indices])
        bootstrap_accuracies.append(bootstrap_accuracy)
    
    bootstrap_accuracies = np.array(bootstrap_accuracies)
    
    # Calculate confidence interval
    alpha = 1 - confidence_level
    lower_percentile = (alpha / 2) * 100
    upper_percentile = (1 - alpha / 2) * 100
    
    ci_lower = np.percentile(bootstrap_accuracies, lower_percentile)
    ci_upper = np.percentile(bootstrap_accuracies, upper_percentile)
    
    return {
        'mean': np.mean(bootstrap_accuracies),
        'std': np.std(bootstrap_accuracies),
        'ci_lower': ci_lower,
        'ci_upper': ci_upper,
        'confidence_level': confidence_level
    }

def mcnemar_test(y_true, predictions1, predictions2):
    """
    Perform McNemar's test to compare two classifiers.
    
    Args:
        y_true: True labels
        predictions1: Predictions from first classifier
        predictions2: Predictions from second classifier
    
    Returns:
        Dictionary with test statistic and p-value
    """
    y_true = np.array(y_true)
    predictions1 = np.array(predictions1)
    predictions2 = np.array(predictions2)
    
    # Create contingency table
    # n01: classifier 1 wrong, classifier 2 correct
    # n10: classifier 1 correct, classifier 2 wrong
    correct1 = (predictions1 == y_true)
    correct2 = (predictions2 == y_true)
    
    n01 = np.sum(~correct1 & correct2)
    n10 = np.sum(correct1 & ~correct2)
    
    # McNemar's test statistic with continuity correction
    if n01 + n10 == 0:
        return {
            'statistic': 0.0,
            'p_value': 1.0,
            'n01': n01,
            'n10': n10,
            'interpretation': 'Classifiers are identical'
        }
    
    statistic = (abs(n01 - n10) - 1) ** 2 / (n01 + n10)
    p_value = 1 - chi2.cdf(statistic, df=1)
    
    # Interpretation
    if p_value < 0.05:
        interpretation = 'Classifiers are significantly different (p < 0.05)'
    else:
        interpretation = 'No significant difference between classifiers (p >= 0.05)'
    
    return {
        'statistic': statistic,
        'p_value': p_value,
        'n01': n01,
        'n10': n10,
        'interpretation': interpretation
    }

def compute_statistical_significance(predictions1, predictions2, labels, 
                                     method1_name="Method 1", method2_name="Method 2",
                                     n_bootstrap=1000):
    """
    Compute statistical significance between two sets of predictions.
    
    Args:
        predictions1: Predictions from first method
        predictions2: Predictions from second method
        labels: True labels
        method1_name: Name of first method
        method2_name: Name of second method
        n_bootstrap: Number of bootstrap samples
    
    Returns:
        Dictionary with all statistical test results
    """
    print(f"\nComputing statistical significance: {method1_name} vs {method2_name}")
    print("=" * 70)
    
    # Bootstrap CI for method 1
    print(f"\nBootstrap confidence intervals ({n_bootstrap} samples):")
    ci1 = bootstrap_confidence_interval(labels, predictions1, n_bootstrap=n_bootstrap)
    print(f"  {method1_name}:")
    print(f"    Accuracy: {ci1['mean']:.4f} ± {ci1['std']:.4f}")
    print(f"    95% CI: [{ci1['ci_lower']:.4f}, {ci1['ci_upper']:.4f}]")
    
    # Bootstrap CI for method 2
    ci2 = bootstrap_confidence_interval(labels, predictions2, n_bootstrap=n_bootstrap)
    print(f"  {method2_name}:")
    print(f"    Accuracy: {ci2['mean']:.4f} ± {ci2['std']:.4f}")
    print(f"    95% CI: [{ci2['ci_lower']:.4f}, {ci2['ci_upper']:.4f}]")
    
    # McNemar's test
    print(f"\nMcNemar's test:")
    mcnemar_result = mcnemar_test(labels, predictions1, predictions2)
    print(f"  Test statistic: {mcnemar_result['statistic']:.4f}")
    print(f"  P-value: {mcnemar_result['p_value']:.4f}")
    print(f"  {mcnemar_result['interpretation']}")
    print(f"  Contingency: {method1_name} wrong & {method2_name} correct = {mcnemar_result['n01']}")
    print(f"               {method1_name} correct & {method2_name} wrong = {mcnemar_result['n10']}")
    
    return {
        'method1_ci': ci1,
        'method2_ci': ci2,
        'mcnemar': mcnemar_result
    }

print("Statistical significance functions defined.")

## Task 4: Performance Comparison and Comprehensive Validation

Run all validation experiments and create a comprehensive comparison table.

In [None]:
# Comprehensive Validation Runner

# Classification threshold constant
CLASSIFICATION_THRESHOLD = 5  # Number of features that must meet the "folded" condition

def classify_using_feature_count(X_test, new_feature_names, midpoints, train_means_label_0, train_means_label_1, threshold=CLASSIFICATION_THRESHOLD):
    """
    Classify sequences using feature count method.
    
    Args:
        X_test: Test features DataFrame
        new_feature_names: List of feature names
        midpoints: List of midpoint values for each feature
        train_means_label_0: Mean values for label 0 (disordered)
        train_means_label_1: Mean values for label 1 (ordered)
        threshold: Number of features that must meet conditions for "folded" classification
    
    Returns:
        List of predictions (0 or 1)
    """
    predictions = []
    for _, test_row in X_test.iterrows():
        satisfied_count = 0
        for feat_idx, feat_name in enumerate(new_feature_names):
            feat_val = test_row[feat_name]
            midpoint_val = midpoints[feat_idx]
            
            # Determine direction based on which class has higher mean
            if train_means_label_1[feat_name] >= train_means_label_0[feat_name]:
                if feat_val >= midpoint_val:
                    satisfied_count += 1
            else:
                if feat_val <= midpoint_val:
                    satisfied_count += 1
        
        predictions.append(1 if satisfied_count >= threshold else 0)
    
    return predictions

def run_classification_experiment(X_train, X_test, y_train, y_test, experiment_name="Experiment"):
    """
    Run the classification experiment with given train/test split.
    
    Args:
        X_train: Training features
        X_test: Test features
        y_train: Training labels
        y_test: Test labels
        experiment_name: Name for this experiment
    
    Returns:
        Dictionary with predictions and metrics
    """
    # Compute midpoints from training data
    train_label_0 = X_train[y_train == 0]
    train_label_1 = X_train[y_train == 1]
    
    if len(train_label_0) == 0 or len(train_label_1) == 0:
        print(f"Warning: {experiment_name} - Insufficient data in one class")
        return None
    
    midpoints = []
    train_means_label_0 = train_label_0.mean()
    train_means_label_1 = train_label_1.mean()
    
    for feat in new_feature_names:
        midpoint = (train_means_label_0[feat] + train_means_label_1[feat]) / 2
        midpoints.append(midpoint)
    
    # Use the helper function for classification
    predictions = classify_using_feature_count(
        X_test, new_feature_names, midpoints, 
        train_means_label_0, train_means_label_1
    )
    
    # Calculate metrics
    accuracy = accuracy_score(y_test, predictions)
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_test, predictions, average='binary', zero_division=0
    )
    
    return {
        'predictions': predictions,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

def run_all_validation_experiments():
    """
    Run all validation experiments and create comparison table.
    
    Returns:
        DataFrame with comparison results
    """
    print("\n" + "="*80)
    print("COMPREHENSIVE VALIDATION EXPERIMENTS")
    print("="*80)
    
    results_list = []
    
    # Prepare data
    sequences_list = [item['sequence'] for item in all_sequences_data]
    labels_list = [item['label'] for item in all_sequences_data]
    
    # Compute features for all sequences
    print("\nComputing features for all sequences...")
    all_features = compute_features_for_dataset(sequences_list, window_size_param=None)
    df_all = pd.DataFrame(all_features, columns=new_feature_names)
    df_all['label'] = labels_list
    
    # Experiment 1: Original random split
    print("\n[1/4] Running original random split experiment...")
    X_train_orig, X_test_orig, y_train_orig, y_test_orig = train_test_split(
        df_all.drop(columns=['label']),
        df_all['label'],
        test_size=0.2,
        random_state=42,
        stratify=df_all['label']
    )
    
    result_orig = run_classification_experiment(
        X_train_orig, X_test_orig, y_train_orig, y_test_orig,
        "Original Split"
    )
    
    if result_orig:
        results_list.append({
            'Dataset': 'Original PDB/DisProt Split',
            'Accuracy': f"{result_orig['accuracy']:.4f}",
            'Precision': f"{result_orig['precision']:.4f}",
            'Recall': f"{result_orig['recall']:.4f}",
            'F1-Score': f"{result_orig['f1']:.4f}"
        })
        print(f"  Accuracy: {result_orig['accuracy']:.4f}, F1: {result_orig['f1']:.4f}")
    
    # Experiment 2: Homology-aware split
    print("\n[2/4] Running homology-aware split experiment...")
    try:
        train_indices, test_indices = homology_aware_split(
            sequences_list, labels_list,
            test_size=0.2,
            identity_threshold=0.3,
            random_state=42
        )
        
        X_train_homology = df_all.drop(columns=['label']).iloc[train_indices]
        X_test_homology = df_all.drop(columns=['label']).iloc[test_indices]
        y_train_homology = df_all['label'].iloc[train_indices]
        y_test_homology = df_all['label'].iloc[test_indices]
        
        result_homology = run_classification_experiment(
            X_train_homology, X_test_homology, y_train_homology, y_test_homology,
            "Homology-Aware Split"
        )
        
        if result_homology:
            results_list.append({
                'Dataset': 'Homology-Aware Split (30% identity)',
                'Accuracy': f"{result_homology['accuracy']:.4f}",
                'Precision': f"{result_homology['precision']:.4f}",
                'Recall': f"{result_homology['recall']:.4f}",
                'F1-Score': f"{result_homology['f1']:.4f}"
            })
            print(f"  Accuracy: {result_homology['accuracy']:.4f}, F1: {result_homology['f1']:.4f}")
            
            # Statistical comparison using bootstrap confidence intervals
            # Since test sets differ, we can't use paired tests like McNemar's.
            # Instead, we compare accuracy distributions using bootstrap CIs.
            print("\n  Computing bootstrap confidence intervals for comparison...")
            
            ci_orig = bootstrap_confidence_interval(
                y_test_orig.tolist() if hasattr(y_test_orig, 'tolist') else y_test_orig,
                result_orig['predictions'],
                n_bootstrap=1000
            )
            ci_homology = bootstrap_confidence_interval(
                y_test_homology.tolist() if hasattr(y_test_homology, 'tolist') else y_test_homology,
                result_homology['predictions'],
                n_bootstrap=1000
            )
            
            print(f"  Original Split - Accuracy: {ci_orig['mean']:.4f} [{ci_orig['ci_lower']:.4f}, {ci_orig['ci_upper']:.4f}]")
            print(f"  Homology-Aware - Accuracy: {ci_homology['mean']:.4f} [{ci_homology['ci_lower']:.4f}, {ci_homology['ci_upper']:.4f}]")
            
            # Check if confidence intervals overlap
            if ci_orig['ci_lower'] > ci_homology['ci_upper']:
                print(f"  → Original split significantly better (non-overlapping CIs)")
            elif ci_homology['ci_lower'] > ci_orig['ci_upper']:
                print(f"  → Homology-aware split significantly better (non-overlapping CIs)")
            else:
                print(f"  → No significant difference (overlapping CIs)")
    except Exception as e:
        print(f"  Error in homology-aware split: {e}")
        results_list.append({
            'Dataset': 'Homology-Aware Split (30% identity)',
            'Accuracy': 'Error',
            'Precision': 'Error',
            'Recall': 'Error',
            'F1-Score': 'Error'
        })
    
    # Experiment 3: Label-shuffle control
    print("\n[3/4] Running label-shuffle control experiment...")
    try:
        shuffle_results = run_label_shuffle_control(all_sequences_data, n_iterations=5)
        results_list.append({
            'Dataset': 'Label-Shuffle Control',
            'Accuracy': f"{shuffle_results['accuracy']:.4f} ± {shuffle_results['accuracy_std']:.4f}",
            'Precision': f"{shuffle_results['precision']:.4f} ± {shuffle_results['precision_std']:.4f}",
            'Recall': f"{shuffle_results['recall']:.4f} ± {shuffle_results['recall_std']:.4f}",
            'F1-Score': f"{shuffle_results['f1']:.4f} ± {shuffle_results['f1_std']:.4f}"
        })
    except Exception as e:
        print(f"  Error in label-shuffle: {e}")
        results_list.append({
            'Dataset': 'Label-Shuffle Control',
            'Accuracy': 'Error',
            'Precision': 'Error',
            'Recall': 'Error',
            'F1-Score': 'Error'
        })
    
    # Experiment 4: MobiDB independent test (placeholder - requires API call)
    print("\n[4/4] MobiDB independent test (skipped - requires API call)")
    print("  To run: Uncomment load_mobidb_consensus() call above and add test code here")
    results_list.append({
        'Dataset': 'MobiDB Independent Test',
        'Accuracy': 'Not run',
        'Precision': 'Not run',
        'Recall': 'Not run',
        'F1-Score': 'Not run'
    })
    
    # Create comparison DataFrame
    df_comparison = pd.DataFrame(results_list)
    
    return df_comparison

# Run all experiments (commented out by default)
# comparison_results = run_all_validation_experiments()
# print("\n" + "="*80)
# print("PERFORMANCE COMPARISON TABLE")
# print("="*80)
# print(comparison_results.to_markdown(index=False))
print("Comprehensive validation functions defined. Uncomment the lines above to run all experiments.")

## Results Interpretation

### Expected Outcomes

If the concept model truly captures biophysical folding principles:

1. **Homology-aware CV should maintain >75% accuracy**
   - This demonstrates the model generalizes beyond memorizing similar sequences
   - Any significant drop suggests sequence homology leakage in original split

2. **MobiDB independent test should show >70% accuracy**
   - MobiDB provides consensus predictions from multiple methods
   - Good performance here validates cross-dataset generalization
   - Lower performance may indicate PDB/DisProt-specific biases

3. **Label-shuffle control should drop to ~50% accuracy**
   - Random labels should yield random-chance performance
   - High accuracy with shuffled labels indicates data leakage or feature artifacts
   - This is a critical sanity check for any ML model

4. **Statistical tests should show significant differences**
   - Bootstrap confidence intervals quantify uncertainty
   - McNemar's test compares different split strategies
   - p < 0.05 indicates statistically significant differences

### Interpretation Guidelines

**✓ Model is production-ready if:**
- Homology-aware accuracy > 75%
- MobiDB accuracy > 70%
- Label-shuffle accuracy ~ 50%
- Narrow bootstrap confidence intervals

**⚠ Model needs refinement if:**
- Large accuracy drop in homology-aware split (>15% drop)
- Poor MobiDB performance (<60% accuracy)
- Label-shuffle accuracy > 60%
- Wide bootstrap confidence intervals

**Possible failure modes:**
- **Dataset-specific biases**: Model captures PDB/DisProt artifacts rather than true folding propensity
- **Sequence homology leakage**: Training and test sets contain similar sequences
- **Feature artifacts**: Features correlate with dataset origin rather than disorder
- **Overfitting**: Model memorizes training data rather than learning general patterns

### Next Steps

Based on validation results:
1. If successful: Deploy concept model API with confidence
2. If marginal: Investigate feature engineering and dataset curation
3. If failed: Reconsider concept model approach or add ML components

---

*Note: To run all experiments, uncomment the function calls in the cells above.*
*Some experiments (MobiDB fetch) may require significant time due to API rate limits.*