In [28]:
import pandas as pd
import numpy as np
from scipy.stats import pearsonr, spearmanr, f_oneway
from sklearn.feature_selection import mutual_info_regression
from sklearn.preprocessing import LabelEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler

In [29]:
def correlation_ratio(categories, values):
    """Compute correlation ratio for categorical-continuous association"""
    categories = np.array(categories)
    values = np.array(values)
    f_cat, _ = pd.factorize(categories)
    cat_num = np.max(f_cat) + 1
    y_avg_array = np.zeros(cat_num)
    n_array = np.zeros(cat_num)
    
    for i in range(cat_num):
        cat_measures = values[np.argwhere(f_cat == i).flatten()]
        n_array[i] = len(cat_measures)
        y_avg_array[i] = np.mean(cat_measures)
    
    y_total_avg = np.sum(np.multiply(y_avg_array, n_array)) / np.sum(n_array)
    numerator = np.sum(np.multiply(n_array, np.power(np.subtract(y_avg_array, y_total_avg), 2)))
    denominator = np.sum(np.power(np.subtract(values, y_total_avg), 2))
    
    if denominator == 0:
        return 0.0
    return np.sqrt(numerator / denominator)

In [30]:
def correlation_ratio(categories, values):
    """Compute correlation ratio for categorical-continuous association"""
    categories = np.array(categories)
    values = np.array(values)
    f_cat, _ = pd.factorize(categories)
    cat_num = np.max(f_cat) + 1
    y_avg_array = np.zeros(cat_num)
    n_array = np.zeros(cat_num)
    
    for i in range(cat_num):
        cat_measures = values[np.argwhere(f_cat == i).flatten()]
        n_array[i] = len(cat_measures)
        y_avg_array[i] = np.mean(cat_measures)
    
    y_total_avg = np.sum(np.multiply(y_avg_array, n_array)) / np.sum(n_array)
    numerator = np.sum(np.multiply(n_array, np.power(np.subtract(y_avg_array, y_total_avg), 2)))
    denominator = np.sum(np.power(np.subtract(values, y_total_avg), 2))
    
    if denominator == 0:
        return 0.0
    return np.sqrt(numerator / denominator)

In [31]:
def multi_correlation_fs_regression(
    X,  # DataFrame of features (mixed types)
    y,  # Target Series
    metrics=['pearson', 'mi'],  # Metrics to use
    redundancy_threshold=0.8,   # Max allowed correlation between features
    top_k=None,                 # Max features to select
    random_state=None,          # Seed for reproducibility
    categorical_features=None   # List of categorical feature names
):
    """
    Feature selection for regression handling both numerical and categorical features
    Returns selected feature names and their scores
    """
    # Identify categorical features if not provided
    if categorical_features is None:
        categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
    numerical_features = [col for col in X.columns if col not in categorical_features]
    
    # Preprocessing pipeline for numerical features
    num_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ])
    
    # Preprocessing pipeline for categorical features
    cat_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy='most_frequent')),
        ('encoder', OneHotEncoder(handle_unknown='ignore', sparse_output=False))
    ])
    
    # Full preprocessing
    preprocessor = ColumnTransformer([
        ('num', num_pipeline, numerical_features),
        ('cat', cat_pipeline, categorical_features)
    ])
    
    X_processed = preprocessor.fit_transform(X)
    X_processed = pd.DataFrame(X_processed, 
                               columns=(numerical_features + 
                                        list(preprocessor.named_transformers_['cat']
                                             .named_steps['encoder']
                                             .get_feature_names_out(categorical_features))))
    
    # Initialize scores DataFrame
    scores = pd.DataFrame(index=X.columns)
    abs_scores = pd.DataFrame(index=X.columns)
    
    # Compute relevance metrics for each feature
    for col in X.columns:
        if col in numerical_features:
            # Numerical feature metrics
            if 'pearson' in metrics:
                r, _ = pearsonr(X[col].fillna(X[col].median()), y)
                scores.loc[col, 'pearson'] = r
                abs_scores.loc[col, 'pearson'] = abs(r)
                
            if 'spearman' in metrics:
                r, _ = spearmanr(X[col].fillna(X[col].median()), y)
                scores.loc[col, 'spearman'] = r
                abs_scores.loc[col, 'spearman'] = abs(r)
                
        else:  # Categorical feature
            # Correlation ratio (eta)
            if 'pearson' in metrics:
                eta = correlation_ratio(X[col], y)
                scores.loc[col, 'pearson'] = eta
                abs_scores.loc[col, 'pearson'] = eta
                
            # ANOVA F-value (converted to correlation-like measure)
            if 'spearman' in metrics:
                groups = [y[X[col] == cat] for cat in X[col].unique()]
                f, _ = f_oneway(*groups)
                scores.loc[col, 'spearman'] = f
                abs_scores.loc[col, 'spearman'] = f
    
    # Mutual Information (handles both types after encoding)
    if 'mi' in metrics:
        # Mark all one-hot encoded columns as discrete
        discrete_features = [
            i for i, col in enumerate(X_processed.columns)
            if any([col.startswith(cat + '_') or col == cat for cat in categorical_features])
        ]
        mi_scores = mutual_info_regression(
            X_processed, y, 
            random_state=random_state,
            discrete_features=discrete_features
        )
        mi_df = pd.Series(mi_scores, index=X_processed.columns)
        
        # Aggregate MI for original categorical features (sum of one-hot components)
        for col in X.columns:
            if col in categorical_features:
                # Sum MI for all one-hot components of this categorical feature
                components = [c for c in mi_df.index if c.startswith(col + '_') or c == col]
                scores.loc[col, 'mi'] = mi_df[components].sum() if components else 0
            else:
                scores.loc[col, 'mi'] = mi_df[col]
                
        abs_scores['mi'] = scores['mi']
    
    # Normalize scores (0 to 1) and combine
    normalized = abs_scores.apply(lambda x: (x - x.min()) / (x.max() - x.min() + 1e-10))
    normalized['combined'] = normalized.mean(axis=1)
    
    # Sort features by combined score
    sorted_features = normalized['combined'].sort_values(ascending=False).index
    
    # Create encoded version for redundancy check
    le = LabelEncoder()
    X_encoded = X.copy()
    for col in categorical_features:
        X_encoded[col] = le.fit_transform(X[col].astype(str))
    
    # Select features with redundancy control
    selected = []
    for feature in sorted_features:
        if top_k is not None and len(selected) >= top_k:
            break
            
        # Calculate maximum correlation with already selected features
        max_corr = 0
        if selected:
            for sel in selected:
                # Handle different feature type combinations
                if feature in numerical_features and sel in numerical_features:
                    corr = abs(pearsonr(X_encoded[feature], X_encoded[sel])[0])
                elif feature in categorical_features and sel in categorical_features:
                    corr = abs(pearsonr(X_encoded[feature], X_encoded[sel])[0])
                else:  # Mixed types - use different metric
                    if feature in numerical_features:
                        corr = correlation_ratio(X_encoded[sel], X_encoded[feature])
                    else:
                        corr = correlation_ratio(X_encoded[feature], X_encoded[sel])
                max_corr = max(max_corr, corr)
        
        # Add feature if below redundancy threshold
        if max_corr < redundancy_threshold:
            selected.append(feature)
    
    return selected, normalized

In [32]:
import pandas as pd

# Load your real data
df = pd.read_csv('../../../data/processed/land_dataset_final_v2.csv')

# Define features and target
X = df.drop(['price_per_m2', 'longitude', 'latitude', 'address_subdivision', 'h_id', 'address_locality', 'price', 'geometry'], axis=1, errors='ignore')
y = df['price_per_m2']

# Identify categorical features
cat_cols = X.select_dtypes(include=['object', 'category']).columns.tolist()

# Perform feature selection
selected_features, scores = multi_correlation_fs_regression(
    X, 
    y,
    metrics=['pearson', 'mi'],       # Use Pearson and MI
    redundancy_threshold=0.8,        # Moderate redundancy control
    top_k=60,                   
    random_state=42,
    categorical_features=cat_cols    # Use detected categorical columns
)

print("Selected features:", selected_features)
print("\nFeature scores:")
print(scores.sort_values('combined', ascending=False))

  r, _ = pearsonr(X[col].fillna(X[col].median()), y)


Selected features: ['address_line_2', 'nearest_cafe', 'nearest_mart', 'n_seven_eleven_in_1km', 'n_bank_in_1km', 'nearest_pre_school', 'nearest_secondary_school', 'nearest_primary_school', 'n_university_in_1km', 'nearest_seven_eleven', 'nearest_hotel', 'nearest_bank', 'Chroy_Changvar_Bridge_3_5km', 'Olympic_Stadium_1_2km', 'nearest_gas_station', 'Bassac_Lane_1_2km', 'Royal_Palace_1_2km', 'Koh_Pich_2_3km', 'Phsar_kandal_1_2km', 'Boeng_Keng_Kang_1_1_2km', 'f_footway', 'Sisowath_Riverside_Park_1_2km', 'Boeng_Keng_Kang_1_nearest', 'nearest_university', 'AEON_Mall_1_2_3km', 'Royal_Palace_nearest', 'Bassac_Lane_nearest', 'Olympic_Stadium_2_3km', 'AEON_Mall_1_1_2km', 'Phsar_Tmey_1_2km', 'Wat_Phnom_2_3km', 'Phnom_Penh_Airport_5_10km', 'Vattanac_Tower_2_3km', 'Russian_Market_2_3km', 'Phsar_Chas_2_3km', 'Phsar_Tmey_nearest', 'Vattanac_Tower_1_2km', 'f_tertiary', 'Phsar_Tmey_2_3km', 'Camko_City_3_5km', 'f_residential', 'Sisowath_Riverside_Park_2_3km', 'Wat_Phnom_3_5km', 'Phsar_Chas_nearest', 'Russ

In [None]:
# Select only the chosen features
X_selected = X[selected_features].copy()

# Identify which selected features are categorical
cat_selected = [col for col in cat_cols if col in selected_features]

from sklearn.preprocessing import OneHotEncoder

if cat_selected:
    ohe = OneHotEncoder(sparse_output=False, handle_unknown='ignore')
    ohe.fit(X_selected[cat_selected])
    X_ohe = pd.DataFrame(
        ohe.transform(X_selected[cat_selected]),
        index=X_selected.index,
        columns=ohe.get_feature_names_out(cat_selected)
    )
    # Drop original categorical columns and concatenate encoded columns
    X_final = pd.concat([X_selected.drop(columns=cat_selected), X_ohe], axis=1)
else:
    X_final = X_selected.copy()

# Add target back for saving
final_df = pd.concat([X_final, y], axis=1)

# Save to CSV
final_df.to_csv("../../../data/preprocessed/feature_selection_by_multi_corr_final_data_60feature.csv", index=False)
print("Saved selected and encoded features to CSV.")

Saved selected and encoded features to CSV.
