In [8]:
%pip install opencv-python pillow scikit-image pyyaml

^C
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [7]:
import os
import cv2
import numpy as np
import pandas as pd
from PIL import Image
from skimage.feature import hog, local_binary_pattern
import yaml
import warnings

warnings.filterwarnings("ignore")

# ============================================================================
# CONFIGURATION - UPDATE THESE PATHS FOR YOUR SETUP
# ============================================================================
# Every image is resized to IMG_WIDTH x IMG_HEIGHT pixels before it is cut
# into a GRID_ROWS x GRID_COLS grid of cells (800/8 = 100 px wide, 600/8 = 75 px tall).
IMG_WIDTH = 800
IMG_HEIGHT = 600
GRID_ROWS = 8
GRID_COLS = 8

# Path to labels.csv (contains ImageFileName, TrainOrTest, c01-c64 columns)
LABELS_CSV = r'C:\Users\sakumavat\Downloads\test\labels.csv'

# Directory containing your actual images; each ImageFileName value from
# labels.csv is joined onto this directory to locate the file.
RAW_IMAGES_DIR = r'C:\Users\sakumavat\Downloads\test\processed_images'

# Base output directory; each extraction run writes into a sub-directory
# named after its iteration.
EXPERIMENTS_DIR = r'C:\Users\sakumavat\Downloads\test\results'

# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def load_labels(csv_path):
    """Read the label table at ``csv_path`` and keep only the training rows.

    Returns an empty DataFrame (after printing an error) when the file does
    not exist. When the table has a ``TrainOrTest`` column, only the rows whose
    value equals ``'Train'`` are returned; without that column every row is kept.
    """
    if os.path.exists(csv_path):
        labels = pd.read_csv(csv_path)
        if 'TrainOrTest' not in labels.columns:
            return labels
        is_train = labels['TrainOrTest'] == 'Train'
        return labels[is_train]

    print(f"Error: {csv_path} not found.")
    return pd.DataFrame()

def get_cell_coordinates(cell_idx, grid_rows=GRID_ROWS, grid_cols=GRID_COLS, img_w=IMG_WIDTH, img_h=IMG_HEIGHT):
    """Return the pixel box ``(x1, y1, x2, y2)`` of a grid cell.

    ``cell_idx`` is 0-based and counts cells row by row (left to right, then
    top to bottom). The top-left corner is truncated to whole pixels and the
    bottom-right corner is the truncated corner plus one (fractional) cell
    size, again truncated.
    """
    grid_row, grid_col = divmod(cell_idx, grid_cols)
    width = img_w / grid_cols
    height = img_h / grid_rows

    left = int(grid_col * width)
    top = int(grid_row * height)
    right = int(left + width)
    bottom = int(top + height)
    return left, top, right, bottom

def extract_hog_features(cell_gray, params):
    """Compute the HOG descriptor of one grayscale cell.

    ``params`` must provide ``'orientations'``, ``'pixels_per_cell'`` and
    ``'cells_per_block'``; they are forwarded unchanged to ``hog``. The cell
    is treated as single-channel (``channel_axis=None``) and only the feature
    vector is returned (no visualisation image).
    """
    hog_kwargs = {
        'orientations': params['orientations'],
        'pixels_per_cell': params['pixels_per_cell'],
        'cells_per_block': params['cells_per_block'],
    }
    return hog(cell_gray, visualize=False, channel_axis=None, **hog_kwargs)

def extract_lbp_features(cell_gray, params):
    """Extract a normalised LBP histogram from a grayscale cell.

    ``params`` must provide ``'radius'``, ``'n_points'`` and ``'method'``,
    which are forwarded to ``local_binary_pattern``.

    The number of histogram bins follows the number of distinct codes the
    chosen method can emit, so no code falls outside the histogram range:

    * ``'uniform'``      -> ``n_points + 2``
    * ``'nri_uniform'``  -> ``n_points * (n_points - 1) + 3``
    * ``'default'``/``'ror'`` -> ``2 ** n_points``

    Any other method keeps the historical ``n_points + 2`` binning.

    Returns a 1-D array of length ``n_bins`` (``density=True`` histogram).
    """
    radius = params['radius']
    n_points = params['n_points']
    method = params['method']
    lbp = local_binary_pattern(cell_gray, n_points, radius, method)

    points = int(n_points)
    if method == 'nri_uniform':
        n_bins = points * (points - 1) + 3
    elif method in ('default', 'ror'):
        # Plain / rotation-invariant LBP codes are n_points-bit integers.
        n_bins = 2 ** points
    else:
        n_bins = points + 2

    hist, _ = np.histogram(lbp.ravel(), bins=n_bins, range=(0, n_bins), density=True)
    return hist

def extract_color_histogram(cell_rgb, params):
    """Build a per-channel colour histogram for one cell.

    The first three channels of ``cell_rgb`` (indexed on the last axis) are
    each histogrammed into ``params['bins']`` bins over ``[0, 256)`` with
    ``density=True``; the three histograms are concatenated in channel order
    0, 1, 2 into one 1-D array.
    """
    n_bins = params['bins']
    per_channel = [
        np.histogram(cell_rgb[:, :, channel], bins=n_bins, range=(0, 256), density=True)[0]
        for channel in range(3)
    ]
    return np.concatenate(per_channel)

def _to_yaml_safe(value):
    """Recursively convert tuples and NumPy values into plain YAML-friendly types."""
    if isinstance(value, dict):
        return {key: _to_yaml_safe(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_to_yaml_safe(item) for item in value]
    if isinstance(value, np.ndarray):
        return value.tolist()
    if isinstance(value, np.generic):
        return value.item()
    return value


def save_iteration_metadata(output_dir, params, metadata):
    """Write the run parameters and summary metadata to ``parameters.yml``.

    Args:
        output_dir: Existing directory that receives ``parameters.yml``.
        params: Feature-extraction parameters (nested dict); stored as-is
            except that tuples become lists so the file stays plain YAML.
        metadata: Dict with the keys ``'name'``, ``'description'``,
            ``'total_images'``, ``'total_cells'``, ``'label_counts'``
            (label id -> count) and ``'feature_dims'``.

    Label ids are written under their readable names (0 None, 1 Ball, 2 Bat,
    3 Stump); any other id is written as ``Unknown(<id>)``.
    """
    filepath = os.path.join(output_dir, "parameters.yml")
    label_names = {0: "None", 1: "Ball", 2: "Bat", 3: "Stump"}
    readable_label_counts = {
        label_names.get(label, f"Unknown({label})"): count
        for label, count in metadata['label_counts'].items()
    }

    data = {
        "iteration_name": metadata['name'],
        "description": metadata['description'],
        "parameters": params,
        "metadata": {
            "total_images": metadata['total_images'],
            "total_cells": metadata['total_cells'],
            "label_counts": readable_label_counts,
            "feature_dimensions": metadata['feature_dims']
        }
    }
    # yaml.dump would emit tuples as "!!python/tuple", which yaml.safe_load
    # refuses to read; normalise to plain lists/scalars first.
    with open(filepath, "w", encoding="utf-8") as f:
        yaml.dump(_to_yaml_safe(data), f, default_flow_style=False, sort_keys=False)

# ============================================================================
# MAIN FEATURE EXTRACTION
# ============================================================================

def _extract_image_rows(filepath, filename, row, params):
    """Build one feature row per labelled grid cell of a single image.

    Returns ``(rows, dims)`` where ``rows`` is a list of dicts (metadata,
    label and every HOG/LBP/colour value) and ``dims`` holds the feature
    vector lengths of the first processed cell (empty dict if the label row
    has no ``cXX`` columns). Any loader or extractor error propagates.
    """
    with Image.open(filepath) as pil_img:
        # Convert to RGB *before* resizing: Pillow forces NEAREST resampling
        # for palette ("P") and bilevel ("1") images, silently ignoring LANCZOS.
        resized = pil_img.convert("RGB").resize(
            (IMG_WIDTH, IMG_HEIGHT), Image.Resampling.LANCZOS
        )
    img_rgb = np.array(resized)
    img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_RGB2GRAY)

    rows = []
    dims = {}
    for i in range(GRID_ROWS * GRID_COLS):
        # Label columns are named c01..cNN, one per grid cell.
        label_col = f"c{i+1:02d}"
        if label_col not in row:
            continue
        label = int(row[label_col])

        x1, y1, x2, y2 = get_cell_coordinates(i)
        cell_rgb = img_rgb[y1:y2, x1:x2]
        cell_gray = img_gray[y1:y2, x1:x2]

        hog_vec = extract_hog_features(cell_gray, params['hog'])
        lbp_vec = extract_lbp_features(cell_gray, params['lbp'])
        color_vec = extract_color_histogram(cell_rgb, params['color'])

        if not dims:
            dims = {
                'hog': len(hog_vec),
                'lbp': len(lbp_vec),
                'color': len(color_vec),
                'total': len(hog_vec) + len(lbp_vec) + len(color_vec)
            }

        feature_row = {
            "image_file_name": filename,  # Links to original image
            "cell_number": i + 1,
            "row_idx": i // GRID_COLS,
            "col_idx": i % GRID_COLS,
            "label": label
        }
        feature_row.update((f"hog_{j}", val) for j, val in enumerate(hog_vec))
        feature_row.update((f"lbp_{j}", val) for j, val in enumerate(lbp_vec))
        feature_row.update((f"color_{j}", val) for j, val in enumerate(color_vec))
        rows.append(feature_row)

    return rows, dims


def run_extraction(iteration_name, iteration_desc, params):
    """Extract per-cell features for every training image and save them.

    Reads the label table, skips images missing from ``RAW_IMAGES_DIR``,
    extracts HOG/LBP/colour features for each labelled grid cell and writes
    ``features.csv`` plus ``parameters.yml`` into
    ``EXPERIMENTS_DIR/<iteration_name>``.

    An image that fails to load or process is reported and skipped as a
    whole: none of its cells are written and it is not counted as processed.

    Args:
        iteration_name: Name of the run; also the output sub-directory name.
        iteration_desc: Free-text description stored in the metadata file.
        params: Dict with the sub-dicts ``'hog'``, ``'lbp'`` and ``'color'``.

    Returns:
        Path of the written ``features.csv``, or ``None`` when there are no
        labels or no features could be extracted.
    """
    # Setup output directory
    output_dir = os.path.join(EXPERIMENTS_DIR, iteration_name)
    os.makedirs(output_dir, exist_ok=True)

    print(f"--- Starting Feature Extraction: {iteration_name} ---")
    print(f"Output Directory: {output_dir}\n")

    # Load labels
    df_labels = load_labels(LABELS_CSV)
    if df_labels.empty:
        print("No labels found or empty file.")
        return None

    total_images = len(df_labels)
    print(f"Found {total_images} images in labels.csv")

    # Validate images exist (single pass; the result drives the extraction loop)
    print("Validating image files...")
    available = []
    missing_count = 0
    for _, row in df_labels.iterrows():
        filename = row['ImageFileName']
        filepath = os.path.join(RAW_IMAGES_DIR, filename)
        if os.path.exists(filepath):
            available.append((filename, filepath, row))
        else:
            missing_count += 1

    if missing_count:
        print(f"⚠️  WARNING: {missing_count} images not found (will be skipped)")
        print(f"✓  Will process {len(available)} available images\n")
    else:
        print(f"✓  All {total_images} images found!\n")

    # Extract features
    all_features = []
    label_counts = {0: 0, 1: 0, 2: 0, 3: 0}
    feature_dims = {}
    processed_count = 0
    failed_images = []

    for position, (filename, filepath, row) in enumerate(available, start=1):
        print(f"Processing {position}/{len(available)}: {filename}")

        try:
            image_rows, image_dims = _extract_image_rows(filepath, filename, row, params)
        except Exception as e:
            # Best effort: one unreadable image must not abort the whole run.
            print(f"ERROR processing {filename}: {e}")
            failed_images.append(filename)
            continue

        # Commit only after the whole image succeeded, so counts and rows
        # never contain a partially processed image.
        all_features.extend(image_rows)
        for feature_row in image_rows:
            label = feature_row["label"]
            label_counts[label] = label_counts.get(label, 0) + 1
        if not feature_dims:
            feature_dims = image_dims
        processed_count += 1

    # Save results
    if not all_features:
        print("\nERROR: No features extracted! Check if images exist and are readable.")
        return None

    if failed_images:
        print(f"\n⚠️  WARNING: {len(failed_images)} images failed and were skipped")
    print(f"\n✓ Successfully extracted features from {processed_count} images ({len(all_features)} cells)")
    print("Saving features.csv...")

    df_features = pd.DataFrame(all_features)
    csv_path = os.path.join(output_dir, "features.csv")
    df_features.to_csv(csv_path, index=False)

    # Save metadata
    metadata = {
        "name": iteration_name,
        "description": iteration_desc,
        "total_images": processed_count,
        "total_cells": len(all_features),
        "label_counts": label_counts,
        "feature_dims": feature_dims
    }
    save_iteration_metadata(output_dir, params, metadata)

    print(f"✓ Features saved to: {csv_path}")
    print("✓ Done!\n")
    return csv_path

# ============================================================================
# RUN FEATURE EXTRACTION
# ============================================================================

# Name of this extraction run; run_extraction uses it as the output
# sub-directory under EXPERIMENTS_DIR.
ITERATION_NAME = "final_iter_03_baseline"
# Free-text description stored alongside the parameters in parameters.yml.
ITERATION_DESC = "Baseline feature extraction with HOG, LBP, and Color Histogram"

# Parameters for the three feature extractors, keyed by extractor name.
PARAMS = {
    # Forwarded to skimage's hog()
    'hog': {
        'orientations': 9,
        'pixels_per_cell': (8, 8),
        'cells_per_block': (2, 2)
    },
    # Forwarded to skimage's local_binary_pattern()
    'lbp': {
        'radius': 1,
        'n_points': 8,
        'method': 'uniform'
    },
    # Number of histogram bins per colour channel
    'color': {
        'bins': 32
    }
}

# Runs the extraction at cell execution time; the result is the path of the
# written features.csv, or None when nothing could be extracted.
features_csv_path = run_extraction(ITERATION_NAME, ITERATION_DESC, PARAMS)

ModuleNotFoundError: No module named 'cv2'

In [1]:
%pip install xgboost pandas numpy matplotlib seaborn scikit-learn

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, roc_auc_score, roc_curve
from sklearn.preprocessing import label_binarize
import xgboost as xgb
import pickle
import warnings

warnings.filterwarnings("ignore")

# ============================================================================
# CONFIGURATION - UPDATE THESE PATHS FOR YOUR SETUP
# ============================================================================

# Path to features.csv (generated by feature extraction cell above).
# NOTE: this is a hard-coded copy of the extraction output location; update
# it by hand whenever the extraction iteration name or results directory changes.
FEATURES_CSV_PATH = r"C:\Users\sakumavat\Downloads\test\results\final_iter_03_baseline\features.csv"

# Base output directory for trained models and results; each training run
# writes into a sub-directory named after its iteration.
OUTPUT_BASE_DIR = r"C:\Users\sakumavat\Downloads\test\experiments\training"

# Seed shared by the models and the train/test split so runs are repeatable.
RANDOM_STATE = 42

# ============================================================================
# MODEL DEFINITIONS
# ============================================================================

def get_model(model_name):
    """Build a fresh, untrained classifier for ``model_name``.

    Supported names are ``"SVM"`` (RBF SVC with probabilities and balanced
    class weights), ``"RF"`` (100-tree random forest, balanced class weights)
    and ``"XGB"`` (XGBoost with a softprob objective). All are seeded with
    ``RANDOM_STATE``.

    Raises:
        ValueError: if ``model_name`` is not one of the supported names.
    """
    # Lambdas keep construction lazy: only the requested model is instantiated.
    builders = {
        "SVM": lambda: SVC(kernel='rbf', class_weight='balanced', probability=True, random_state=RANDOM_STATE),
        "RF": lambda: RandomForestClassifier(n_estimators=100, class_weight='balanced', n_jobs=-1, random_state=RANDOM_STATE),
        "XGB": lambda: xgb.XGBClassifier(objective='multi:softprob', eval_metric='mlogloss', random_state=RANDOM_STATE),
    }
    if model_name not in builders:
        raise ValueError(f"Unknown model: {model_name}")
    return builders[model_name]()

# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def load_data(csv_path):
    """Load the extracted-features CSV and split it into model inputs.

    Returns a tuple ``(X, y, metadata)``:

    * ``X`` - every column except the bookkeeping ones (the HOG, LBP and
      colour feature values),
    * ``y`` - the ``label`` column (0=None, 1=Ball, 2=Bat, 3=Stump),
    * ``metadata`` - the bookkeeping columns ``image_file_name``,
      ``cell_number``, ``row_idx``, ``col_idx`` and ``label``.

    Raises:
        FileNotFoundError: if ``csv_path`` does not exist.
    """
    if os.path.exists(csv_path):
        table = pd.read_csv(csv_path)

        # Bookkeeping columns: kept for reporting, never fed to the model.
        meta_cols = ['image_file_name', 'cell_number', 'row_idx', 'col_idx', 'label']

        features = table.drop(columns=meta_cols)
        labels = table['label']
        return features, labels, table[meta_cols]

    raise FileNotFoundError(f"{csv_path} not found. Run feature extraction first!")

def plot_roc_auc(y_test, y_score, classes, output_path):
    """Plot one-vs-rest ROC curves (one per class) and save the figure.

    Args:
        y_test: True labels.
        y_score: Per-class scores/probabilities, one column per entry of
            ``classes`` and in the same order.
        classes: Class labels in column order (e.g. ``model.classes_``).
        output_path: File path the figure is written to.

    Works for any number of classes: colours are reused cyclically, and the
    two-class case (where ``label_binarize`` yields a single column) is
    expanded to one indicator column per class.
    """
    y_score = np.asarray(y_score)
    y_test_bin = label_binarize(y_test, classes=classes)
    if y_test_bin.shape[1] == 1:
        # Binary problem: the single column marks the second class only;
        # add the complementary column so columns line up with y_score.
        y_test_bin = np.hstack([1 - y_test_bin, y_test_bin])
    n_classes = y_test_bin.shape[1]

    colors = ['blue', 'red', 'green', 'orange']
    label_names = {0: "None", 1: "Ball", 2: "Bat", 3: "Stump"}

    plt.figure(figsize=(10, 8))
    try:
        for i in range(n_classes):
            fpr, tpr, _ = roc_curve(y_test_bin[:, i], y_score[:, i])
            area = roc_auc_score(y_test_bin[:, i], y_score[:, i])
            label_name = label_names.get(classes[i], str(classes[i]))
            # Cycle through the palette so classes beyond the fourth are still drawn.
            plt.plot(fpr, tpr, color=colors[i % len(colors)], lw=2,
                     label=f'ROC curve of class {label_name} (area = {area:0.2f})')

        plt.plot([0, 1], [0, 1], 'k--', lw=2)
        plt.xlim([0.0, 1.0])
        plt.ylim([0.0, 1.05])
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('Receiver Operating Characteristic (ROC) - Multiclass')
        plt.legend(loc="lower right")
        plt.savefig(output_path)
    finally:
        # Always release the figure, even if computing or saving a curve fails.
        plt.close()

def save_artifacts(output_dir, model, scaler, metrics_df, predictions_df):
    """Persist the trained model, optional scaler, metrics and predictions.

    Writes into ``output_dir`` (created if missing):

    * ``model.pkl`` - pickled ``model``
    * ``scaler.pkl`` - pickled ``scaler``; skipped only when ``scaler`` is ``None``
    * ``metrics.csv`` - ``metrics_df`` without the index
    * ``predicted_labels.csv`` - ``predictions_df`` without the index
    """
    os.makedirs(output_dir, exist_ok=True)

    # Save model as pickle (for later use/deployment)
    with open(os.path.join(output_dir, "model.pkl"), "wb") as f:
        pickle.dump(model, f)

    # Compare against None explicitly: truthiness would skip any scaler-like
    # object that happens to evaluate as falsy.
    if scaler is not None:
        with open(os.path.join(output_dir, "scaler.pkl"), "wb") as f:
            pickle.dump(scaler, f)

    # Save metrics and predictions as CSV
    metrics_df.to_csv(os.path.join(output_dir, "metrics.csv"), index=False)
    predictions_df.to_csv(os.path.join(output_dir, "predicted_labels.csv"), index=False)

# ============================================================================
# MAIN TRAINING FUNCTION (USES 100% OF DATA)
# ============================================================================

def train_and_evaluate(model_name, iteration_name):
    """Train ``model_name`` on 100% of the extracted features and save it.

    Loads ``FEATURES_CSV_PATH``, standardises the features, fits the model on
    every row, reports metrics measured on that same training data, and
    writes the model, scaler, metrics, a training ROC plot and the predicted
    labels (wide ``c01..c64`` layout, one row per image) into
    ``OUTPUT_BASE_DIR/<iteration_name>``.

    Args:
        model_name: One of the names accepted by ``get_model``.
        iteration_name: Name of the run; also the output sub-directory name.

    NOTE: all reported metrics are training-set metrics and therefore say
    nothing about generalisation.
    """
    # Setup output directory
    output_dir = os.path.join(OUTPUT_BASE_DIR, iteration_name)
    os.makedirs(output_dir, exist_ok=True)

    print(f"{'='*60}")
    print(f"Training: {model_name} ({iteration_name})")
    print(f"{'='*60}\n")

    # Load features
    print("Loading features...")
    X, y, metadata = load_data(FEATURES_CSV_PATH)

    print(f"✓ Features shape: {X.shape}")
    print(f"✓ Labels shape: {y.shape}")
    print(f"Label distribution:")
    print(y.value_counts().sort_index())
    print()

    # Use full dataset (no split for production)
    print("Using 100% of data for training (production model)...")
    X_train, y_train = X, y

    # Scale features
    print("Scaling features...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)

    # Train model
    print(f"Training {model_name}...")
    model = get_model(model_name)
    model.fit(X_train_scaled, y_train)

    # Evaluate on training data
    print("\nEvaluating on training data...")
    y_train_pred = model.predict(X_train_scaled)
    y_train_prob = model.predict_proba(X_train_scaled)

    train_report = classification_report(y_train, y_train_pred, output_dict=True)
    train_auc = roc_auc_score(label_binarize(y_train, classes=model.classes_), y_train_prob, multi_class='ovr')
    train_accuracy = accuracy_score(y_train, y_train_pred)

    print(f"✓ Train Accuracy: {train_accuracy:.4f}")
    print(f"✓ Train ROC AUC: {train_auc:.4f}")

    # Generate metrics: one row per report entry that carries per-class style
    # numbers (the scalar "accuracy" entry is covered by the Overall row).
    metrics_list = [
        {
            "Dataset": "Train",
            "Class": label,
            "Precision": metrics['precision'],
            "Recall": metrics['recall'],
            "F1-Score": metrics['f1-score'],
            "Support": metrics['support']
        }
        for label, metrics in train_report.items()
        if isinstance(metrics, dict)
    ]
    metrics_list.append({
        "Dataset": "Train",
        "Class": "Overall",
        "ROC_AUC": train_auc,
        "Accuracy": train_accuracy
    })
    metrics_df = pd.DataFrame(metrics_list)

    # Generate predictions
    full_metadata = metadata.copy()
    full_metadata['True_Label'] = y
    full_metadata['Predicted_Label'] = y_train_pred
    # One probability column per class the model actually learned, so a data
    # set with more or fewer than four classes does not break the assignment.
    for column, class_label in enumerate(model.classes_):
        full_metadata[f"Prob_{class_label}"] = y_train_prob[:, column]
    full_metadata['Split'] = 'Train'

    # Plot ROC curve
    plot_roc_auc(y_train, y_train_prob, model.classes_, os.path.join(output_dir, "roc_auc_train.png"))

    # Reshape to wide format (matching labels.csv format)
    wide_df = full_metadata.pivot(index='image_file_name', columns='cell_number', values='Predicted_Label')
    wide_df.columns = [f"c{col:02d}" for col in wide_df.columns]
    wide_df = wide_df.reset_index()
    wide_df.rename(columns={'image_file_name': 'ImageFileName'}, inplace=True)
    wide_df['TrainOrTest'] = 'Train'

    # Reorder columns to match labels.csv; cells absent from the features
    # file are filled with label 0.
    cols = ["ImageFileName", "TrainOrTest"] + [f"c{i+1:02d}" for i in range(64)]
    for col in cols:
        if col not in wide_df.columns:
            wide_df[col] = 0
    wide_df = wide_df[cols]

    # Save everything
    save_artifacts(output_dir, model, scaler, metrics_df, wide_df)

    print(f"\n✓ Model saved to: {os.path.join(output_dir, 'model.pkl')}")
    print(f"✓ Scaler saved to: {os.path.join(output_dir, 'scaler.pkl')}")
    print(f"✓ All artifacts saved to: {output_dir}")
    print(f"{'='*60}\n")

# ============================================================================
# RUN TRAINING
# ============================================================================

# Select model: "SVM", "RF", or "XGB"
MODEL_TO_TRAIN = "SVM"
# Output sub-directory name for this training run. It is not derived from
# MODEL_TO_TRAIN, so keep the two in sync by hand when switching models.
ITERATION_NAME = "final_iter_01_SVM"

# Trains on the full feature set and writes all artifacts at cell execution time.
train_and_evaluate(MODEL_TO_TRAIN, ITERATION_NAME)

--- Starting Training: SVM (final_iter_01_SVM) ---
Loading data...
Using full dataset for training...
Scaling features...
Training SVM...
Evaluating on Train Data...
Evaluating on Train Data...
Train Accuracy: 0.3855
Train ROC AUC: 0.6919
Train Accuracy: 0.3855
Train ROC AUC: 0.6919
Artifacts saved to C:\Users\sakumavat\Downloads\test\experiments\training\final_iter_01_SVM
Artifacts saved to C:\Users\sakumavat\Downloads\test\experiments\training\final_iter_01_SVM


# Optional: Test Model with Train/Test Split

**NOTE:** This cell is for evaluation only. It splits the extracted cell features 80/20 (stratified by label) to estimate how well the model generalizes; nothing it trains is saved.

The actual production model (Cell 4 above) uses 100% of data for training.

In [None]:
# ============================================================================
# TEST EVALUATION WITH TRAIN/TEST SPLIT (FOR TESTING ONLY)
# ============================================================================

def test_model_with_split(model_name, test_size=0.2):
    """Estimate model performance with a stratified train/test split.

    Loads ``FEATURES_CSV_PATH``, holds out ``test_size`` of the rows, fits the
    scaler and the model on the remainder, prints train and test metrics,
    shows the test confusion matrix and flags a train/test accuracy gap above
    0.1 as overfitting. Nothing is saved.

    Args:
        model_name: One of the names accepted by ``get_model``.
        test_size: Fraction of rows held out for testing.

    Returns:
        Dict with ``'train_acc'``, ``'test_acc'``, ``'train_auc'`` and
        ``'test_auc'``.

    NOTE(review): the split is made per feature row (one row per image cell),
    so cells of the same image can end up on both sides of the split; the
    test scores may therefore be optimistic - consider a per-image split.
    """
    print(f"\n{'='*60}")
    print(f"TEST MODE: Evaluating {model_name} with {test_size*100:.0f}% test split")
    print(f"{'='*60}\n")

    # Load data
    print("Loading features...")
    X, y, _ = load_data(FEATURES_CSV_PATH)

    print(f"Total samples: {len(X)}")
    print(f"Features shape: {X.shape}\n")

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=test_size,
        random_state=RANDOM_STATE,
        stratify=y
    )

    print(f"Training samples: {len(X_train)}")
    print(f"Testing samples: {len(X_test)}\n")

    # Scale features (fit on the training part only to avoid test leakage)
    print("Scaling features...")
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Train model
    print(f"Training {model_name}...\n")
    model = get_model(model_name)
    model.fit(X_train_scaled, y_train)

    # Derive report/tick labels from the classes the model actually saw, so
    # data with missing or extra classes does not raise or get mislabelled.
    label_names = {0: "None", 1: "Ball", 2: "Bat", 3: "Stump"}
    class_labels = list(model.classes_)
    class_names = [label_names.get(label, str(label)) for label in class_labels]

    # Evaluate on train data
    print("--- Train Set Performance ---")
    y_train_pred = model.predict(X_train_scaled)
    y_train_prob = model.predict_proba(X_train_scaled)

    train_acc = accuracy_score(y_train, y_train_pred)
    train_auc = roc_auc_score(
        label_binarize(y_train, classes=model.classes_),
        y_train_prob,
        multi_class='ovr'
    )

    print(f"Train Accuracy: {train_acc:.4f}")
    print(f"Train ROC AUC: {train_auc:.4f}")
    print("\nTrain Classification Report:")
    print(classification_report(y_train, y_train_pred, labels=class_labels, target_names=class_names))

    # Evaluate on test data
    print("\n--- Test Set Performance ---")
    y_test_pred = model.predict(X_test_scaled)
    y_test_prob = model.predict_proba(X_test_scaled)

    test_acc = accuracy_score(y_test, y_test_pred)
    test_auc = roc_auc_score(
        label_binarize(y_test, classes=model.classes_),
        y_test_prob,
        multi_class='ovr'
    )

    print(f"Test Accuracy: {test_acc:.4f}")
    print(f"Test ROC AUC: {test_auc:.4f}")
    print("\nTest Classification Report:")
    print(classification_report(y_test, y_test_pred, labels=class_labels, target_names=class_names))

    # Confusion matrix (fixed label order keeps rows/columns aligned with the ticks)
    print("\nConfusion Matrix (Test Set):")
    cm = confusion_matrix(y_test, y_test_pred, labels=class_labels)
    print(cm)

    # Plot confusion matrix
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.title(f'{model_name} - Confusion Matrix (Test Set)')
    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.show()

    # Summary
    print(f"\n{'='*60}")
    print(f"SUMMARY:")
    print(f"  Train Accuracy: {train_acc:.4f} | Test Accuracy: {test_acc:.4f}")
    print(f"  Train ROC AUC:  {train_auc:.4f} | Test ROC AUC:  {test_auc:.4f}")

    overfitting = (train_acc - test_acc) > 0.1
    print(f"  Overfitting: {'⚠️  Yes (difference > 10%)' if overfitting else '✓ No (good generalization)'}")
    print(f"{'='*60}\n")

    return {
        'train_acc': train_acc,
        'test_acc': test_acc,
        'train_auc': train_auc,
        'test_auc': test_auc
    }

# ============================================================================
# RUN TEST EVALUATION
# ============================================================================

# Select model to test: "SVM", "RF", or "XGB"
TEST_MODEL = "SVM"
# Runs the 80/20 evaluation at cell execution time; `results` holds the
# train/test accuracy and ROC AUC values returned by the function.
results = test_model_with_split(TEST_MODEL, test_size=0.2)