In [1]:
import pandas as pd
import numpy as np
import ast
import glob
import os
from pathlib import Path

# ============================================================================
# CONFIGURATION: Define paths here
# ============================================================================
TRAIN_DIR = "AF_DATA/Train"
TEST_DIR = "AF_DATA/Test"

# ============================================================================
# Helper Functions
# ============================================================================

def circular_variance(angles):
    """
    Calculate circular variance for angles (1 - R).
    R is the mean resultant length after converting angles to unit vectors.
    """
    if len(angles) == 0:
        return np.nan
    
    # Convert angles to radians
    angles_rad = np.deg2rad(angles)
    
    # Convert to unit vectors
    x = np.cos(angles_rad)
    y = np.sin(angles_rad)
    
    # Calculate mean resultant length R
    R = np.sqrt(np.mean(x)**2 + np.mean(y)**2)
    
    # Circular variance = 1 - R
    return 1 - R

def parse_contact_map(contact_map_str):
    """
    Parse contact map string representation safely.
    Handles both integer and string keys.
    """
    try:
        # Try to parse as dictionary
        contact_dict = ast.literal_eval(contact_map_str)
        
        # Convert keys to integers if they're strings
        if isinstance(contact_dict, dict):
            parsed_dict = {}
            for key, value in contact_dict.items():
                try:
                    int_key = int(key)
                    parsed_dict[int_key] = value
                except (ValueError, TypeError):
                    parsed_dict[key] = value
            return parsed_dict
        return contact_dict
    except (ValueError, SyntaxError) as e:
        return None

def extract_contact_features(contact_map_str, num_residues):
    """
    Extract contact map features:
    - Avg_Degree: Mean number of contacts per residue
    - Avg_Seq_Sep: Average sequence separation for all unique contacts
    - Frac_Long_Range: Fraction of contacts with sequence separation >= 5
    """
    contact_dict = parse_contact_map(contact_map_str)
    
    if contact_dict is None:
        return np.nan, np.nan, np.nan
    
    # Collect all unique contacts (i, j) where i < j
    contacts = set()
    degrees = np.zeros(num_residues)
    
    for i, contact_list in contact_dict.items():
        try:
            i = int(i)
            if i >= num_residues:
                continue
            
            # Count contacts for residue i
            if isinstance(contact_list, list):
                for j, has_contact in enumerate(contact_list):
                    if has_contact == 1 and j < num_residues:
                        if i < j:
                            contacts.add((i, j))
                        degrees[i] += 1
        except (ValueError, TypeError, IndexError):
            continue
    
    # Calculate Avg_Degree
    avg_degree = np.mean(degrees) if len(degrees) > 0 else 0.0
    
    # Calculate Avg_Seq_Sep and Frac_Long_Range
    if len(contacts) == 0:
        return avg_degree, 0.0, 0.0
    
    seq_separations = [abs(j - i) for i, j in contacts]
    avg_seq_sep = np.mean(seq_separations)
    frac_long_range = np.mean([sep >= 5 for sep in seq_separations])
    
    return avg_degree, avg_seq_sep, frac_long_range

def process_single_file(filepath):
    """
    Process a single CSV file and extract all 8 features.
    Returns a dictionary with features or None if processing fails.
    """
    try:
        # Read CSV file
        df = pd.read_csv(filepath)
        
        # Check if file is empty
        if df.empty:
            print(f"Warning: Empty file {filepath}")
            return None
        
        # Get protein name from filename
        protein_name = Path(filepath).stem
        
        # ====================================================================
        # 1. Confidence Features
        # ====================================================================
        plddt_values = df['pLDDT'].dropna()
        if len(plddt_values) == 0:
            print(f"Warning: No pLDDT values in {filepath}")
            return None
        
        plddt_mean = plddt_values.mean()
        plddt_std = plddt_values.std()
        
        # ====================================================================
        # 2. Secondary Structure Features
        # ====================================================================
        phi_values = df['phi'].dropna()
        psi_values = df['psi'].dropna()
        
        # Get valid indices where both phi and psi are available
        valid_indices = df.index[df['phi'].notna() & df['psi'].notna()]
        
        if len(valid_indices) == 0:
            print(f"Warning: No valid phi/psi angles in {filepath}")
            return None
        
        # Helix: phi in [-90, -30] and psi in [-80, -10]
        helix_mask = (
            (df.loc[valid_indices, 'phi'] >= -90) & 
            (df.loc[valid_indices, 'phi'] <= -30) &
            (df.loc[valid_indices, 'psi'] >= -80) & 
            (df.loc[valid_indices, 'psi'] <= -10)
        )
        frac_helix = helix_mask.sum() / len(valid_indices)
        
        # Sheet: phi in [-180, -90] and psi in [90, 180]
        sheet_mask = (
            (df.loc[valid_indices, 'phi'] >= -180) & 
            (df.loc[valid_indices, 'phi'] <= -90) &
            (df.loc[valid_indices, 'psi'] >= 90) & 
            (df.loc[valid_indices, 'psi'] <= 180)
        )
        frac_sheet = sheet_mask.sum() / len(valid_indices)
        
        # ====================================================================
        # 3. Rigidity Features
        # ====================================================================
        phi_circ_var = circular_variance(phi_values.values)
        psi_circ_var = circular_variance(psi_values.values)
        
        # Backbone_Rigidity = average of phi and psi circular variance
        if np.isnan(phi_circ_var) or np.isnan(psi_circ_var):
            backbone_rigidity = np.nan
        else:
            backbone_rigidity = (phi_circ_var + psi_circ_var) / 2.0
        
        # ====================================================================
        # 4. Topology Features (Contact Map)
        # ====================================================================
        # Get contact map from first row (should be same for all rows)
        contact_map_str = df['contact_map'].iloc[0]
        num_residues = len(df)
        
        avg_degree, avg_seq_sep, frac_long_range = extract_contact_features(
            contact_map_str, num_residues
        )
        
        # ====================================================================
        # Compile results
        # ====================================================================
        result = {
            'protein_name': protein_name,
            'pLDDT_mean': plddt_mean,
            'pLDDT_std': plddt_std,
            'Frac_Helix': frac_helix,
            'Frac_Sheet': frac_sheet,
            'Backbone_Rigidity': backbone_rigidity,
            'Avg_Degree': avg_degree,
            'Avg_Seq_Sep': avg_seq_sep,
            'Frac_Long_Range': frac_long_range
        }
        
        return result
        
    except Exception as e:
        print(f"Error processing {filepath}: {str(e)}")
        return None

def process_directory(directory_path):
    """
    Process all CSV files in a directory.
    Returns a DataFrame with all extracted features.
    """
    csv_files = glob.glob(os.path.join(directory_path, "*.csv"))
    
    if len(csv_files) == 0:
        print(f"Warning: No CSV files found in {directory_path}")
        return pd.DataFrame()
    
    results = []
    for csv_file in csv_files:
        result = process_single_file(csv_file)
        if result is not None:
            results.append(result)
        else:
            print(f"Skipped {csv_file}")
    
    if len(results) == 0:
        return pd.DataFrame()
    
    return pd.DataFrame(results)

# ============================================================================
# Main Processing
# ============================================================================

print("Processing training data...")
train_df = process_directory(TRAIN_DIR)
print(f"Processed {len(train_df)} training files")

print("\nProcessing test data...")
test_df = process_directory(TEST_DIR)
print(f"Processed {len(test_df)} test files")

# Save results
if len(train_df) > 0:
    train_df.to_csv("train_structural_features.csv", index=False)
    print(f"\nSaved training features to train_structural_features.csv")
else:
    print("\nWarning: No training data to save")

if len(test_df) > 0:
    test_df.to_csv("test_structural_features.csv", index=False)
    print(f"Saved test features to test_structural_features.csv")
else:
    print("Warning: No test data to save")

print("\nProcessing complete!")


Processing training data...
Skipped AF_DATA/Train/9835.result.csv
Skipped AF_DATA/Train/21712.result.csv
Skipped AF_DATA/Train/9838.result.csv
Skipped AF_DATA/Train/7791.result.csv
Skipped AF_DATA/Train/18865.result.csv
Skipped AF_DATA/Train/7794.result.csv
Skipped AF_DATA/Train/18871.result.csv
Skipped AF_DATA/Train/7785.result.csv
Skipped AF_DATA/Train/20466.result.csv
Skipped AF_DATA/Train/18867.result.csv
Skipped AF_DATA/Train/7131.result.csv
Skipped AF_DATA/Train/9826.result.csv
Skipped AF_DATA/Train/9823.result.csv
Skipped AF_DATA/Train/7775.result.csv
Skipped AF_DATA/Train/7134.result.csv
Skipped AF_DATA/Train/18855.result.csv
Skipped AF_DATA/Train/9841.result.csv
Skipped AF_DATA/Train/6741.result.csv
Skipped AF_DATA/Train/7796.result.csv
Skipped AF_DATA/Train/19146.result.csv
Skipped AF_DATA/Train/18858.result.csv
Skipped AF_DATA/Train/6796.result.csv
Skipped AF_DATA/Train/7776.result.csv
Skipped AF_DATA/Train/18861.result.csv
Skipped AF_DATA/Train/18870.result.csv
Skipped AF_D