<a href="https://colab.research.google.com/github/janabioinfo-hub/RNA-Model/blob/main/Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# =============================================================================
# RNA-seq XGBoost Classification Pipeline with Job ID folders and File Upload
# =============================================================================

# ======= Setup & Imports =======
!pip install xgboost scikit-learn matplotlib seaborn imbalanced-learn

import os, uuid
import pandas as pd
import numpy as np
import xgboost as xgb
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
from imblearn.over_sampling import RandomOverSampler
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.backends.backend_pdf import PdfPages
from google.colab import drive, files
from IPython.display import display, FileLink
import warnings
warnings.filterwarnings('ignore')

# ======= Mount Google Drive =======
drive.mount("/content/drive")

# ======= Dynamic Job ID & Output Folder Creation =======
BASE_DRIVE_PATH = "/content/drive/MyDrive/RNA_Seq_Jobs"
job_id = str(uuid.uuid4())[:8]
job_folder = os.path.join(BASE_DRIVE_PATH, job_id)
os.makedirs(job_folder, exist_ok=True)
print(f"Job ID: {job_id}")
print(f"Outputs will be saved in: {job_folder}")

# ======= File Upload for Test Counts =======
print("Please drag and drop your test counts CSV (normalized_vst_test.csv) below.")
uploaded = files.upload()
for fn in uploaded.keys():
    dest = os.path.join(job_folder, fn)
    os.rename(fn, dest)
    test_counts_path = dest
    print(f"Test counts file uploaded to {dest}")

# ======= Manually Set Paths for Required Files (Edit if needed) =======
counts_path = "/content/drive/MyDrive/Data/normalized_vst_train.csv"
coldata_path = "/content/drive/MyDrive/Data/Column_Data.csv"
genes_list_path = "/content/drive/MyDrive/Data/ensembl_to_gene_names.csv"

# Output paths (in job folder)
predictions_csv = os.path.join(job_folder, "predicted_phases.csv")
pca_plot_pdf = os.path.join(job_folder, "pca_analysis.pdf")
individual_pca_pdf = os.path.join(job_folder, "individual_test_pca_plots.pdf")
summary_plot_pdf = os.path.join(job_folder, "test_samples_summary_pca.pdf")

# ======= Data Loading =======
counts = pd.read_csv(counts_path, index_col=0, sep=',')
coldata = pd.read_csv(coldata_path, header=None, sep=',')
coldata.columns = ['sample', 'phase']
coldata['sample'] = coldata['sample'].astype(str).str.strip()
genes = pd.read_csv(genes_list_path, header=None).iloc[:, 0].tolist()
print(f"Loaded counts: {counts.shape}, samples: {counts.columns.size}, genes: {counts.index.size}")

# ======= Data Preparation =======
counts_filtered = counts.loc[counts.index.isin(genes)]
X_full = counts_filtered.transpose()
X_full.index = X_full.index.astype(str).str.strip()
data = X_full.merge(coldata, left_index=True, right_on='sample')
print(data['phase'].value_counts())

# ======= Oversampling =======
def oversample_to_majority(df, class_col):
    class_counts = df[class_col].value_counts()
    max_count = class_counts.max()
    features = df.drop(columns=['sample', class_col])
    labels = df[class_col]
    over_sampler = RandomOverSampler(sampling_strategy='not majority', random_state=42)
    X_res, y_res = over_sampler.fit_resample(features, labels)
    balanced_df = pd.DataFrame(X_res, columns=features.columns)
    balanced_df[class_col] = y_res
    return balanced_df

balanced_data = oversample_to_majority(data, 'phase')
print(balanced_data['phase'].value_counts())
X_balanced = balanced_data.drop(columns=['phase'])
y_balanced = balanced_data['phase']
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y_balanced)
print({phase: idx for idx, phase in enumerate(label_encoder.classes_)})

# ======= Train-Validation Split =======
X_train, X_val, y_train, y_val = train_test_split(
    X_balanced, y_encoded, test_size=0.25, stratify=y_encoded, random_state=42)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)

# ======= Model Training =======
model = xgb.XGBClassifier(
    eval_metric='mlogloss', random_state=42, n_estimators=100, max_depth=4,
    learning_rate=0.1, reg_alpha=0.1, reg_lambda=0.1)
model.fit(X_train_scaled, y_train)
y_pred_val = model.predict(X_val_scaled)
val_accuracy = accuracy_score(y_val, y_pred_val)
print(f"Validation Accuracy: {val_accuracy:.4f}")
print(classification_report(y_val, y_pred_val, target_names=label_encoder.classes_))

# ======= Load Test Data & Prediction =======
test_counts = pd.read_csv(test_counts_path, index_col=0)
test_counts_filtered = test_counts.loc[test_counts.index.isin(genes)]
X_test = test_counts_filtered.transpose()
X_test_scaled = scaler.transform(X_test)
test_predictions_encoded = model.predict(X_test_scaled)
test_predictions = label_encoder.inverse_transform(test_predictions_encoded)
test_probabilities = model.predict_proba(X_test_scaled)
print(f"Test samples: {X_test.index.size}, Genes: {X_test.columns.size}")

# ======= Save Predictions =======
predictions_df = pd.DataFrame({
    'sample': X_test.index,
    'predicted_phase': test_predictions
})
for i, phase in enumerate(label_encoder.classes_):
    predictions_df[f'prob_{phase}'] = test_probabilities[:, i]
predictions_df.to_csv(predictions_csv, index=False)
print(f"Predictions saved to: {predictions_csv}")
print(pd.Series(test_predictions).value_counts())

# ======= PCA Analysis & Visualization =======
combined_data = np.vstack([X_train_scaled, X_val_scaled, X_test_scaled])
pca = PCA(n_components=2)
pca.fit(X_train_scaled)
train_pca = pca.transform(X_train_scaled)
val_pca = pca.transform(X_val_scaled)
test_pca = pca.transform(X_test_scaled)
num_train = X_train_scaled.shape[0]
num_val = X_val_scaled.shape[0]
num_test = X_test_scaled.shape[0]
plot_df = pd.DataFrame(
    np.vstack([train_pca, val_pca, test_pca]), columns=['PC1', 'PC2'])
plot_df['dataset'] = (
    ['Training'] * num_train + ['Validation'] * num_val + ['Test'] * num_test)
plot_df['phase'] = np.concatenate([
    label_encoder.inverse_transform(y_train),
    label_encoder.inverse_transform(y_val),
    test_predictions
])

# === Individual PCA Plots for Each Test Sample ===
with PdfPages(individual_pca_pdf) as pdf:
    plt.figure(figsize=(10, 8))
    train_phases = label_encoder.inverse_transform(y_train)
    colors = sns.color_palette("Set1", n_colors=len(label_encoder.classes_))
    phase_colors = {phase: colors[i] for i, phase in enumerate(label_encoder.classes_)}
    for phase in label_encoder.classes_:
        phase_mask = train_phases == phase
        plt.scatter(train_pca[phase_mask, 0], train_pca[phase_mask, 1], c=[phase_colors[phase]], s=60, alpha=0.7, label=f'{phase} (Train)')
    plt.title("Training Data PCA Space (Reference)")
    plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)')
    plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)')
    plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    pdf.savefig()
    plt.close()
    for i, sample_name in enumerate(X_test.index):
        plt.figure(figsize=(10, 8))
        for phase in label_encoder.classes_:
            phase_mask = train_phases == phase
            plt.scatter(train_pca[phase_mask, 0], train_pca[phase_mask, 1], c=[phase_colors[phase]], s=40, alpha=0.3, label=f'{phase} (Train)')
        test_point = test_pca[i]
        predicted_phase = test_predictions[i]
        plt.scatter(test_point[0], test_point[1], c='red', s=200, marker='*', edgecolors='black', linewidth=2, label=f'{sample_name}\n(Predicted: {predicted_phase})')
        prob_text = "Prediction Probabilities:\n"
        for j, phase in enumerate(label_encoder.classes_):
            prob_text += f"{phase}: {test_probabilities[i,j]:.3f}\n"
        plt.text(0.02, 0.98, prob_text, transform=plt.gca().transAxes, fontsize=9, verticalalignment='top', bbox=dict(boxstyle='round', facecolor='white', alpha=0.8))
        plt.title(f"Test Sample: {sample_name}\nPredicted Phase: {predicted_phase}")
        plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)')
        plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        pdf.savefig()
        plt.close()
print(f"Individual PCA plots saved to: {individual_pca_pdf}")

# === Summary PCA Plot for All Test Samples ===
plt.figure(figsize=(12, 10))
for phase in label_encoder.classes_:
    phase_mask = train_phases == phase
    plt.scatter(train_pca[phase_mask, 0], train_pca[phase_mask, 1], c=[phase_colors[phase]], s=40, alpha=0.3, label=f'{phase} (Train)')
for i, sample_name in enumerate(X_test.index):
    test_point = test_pca[i]
    predicted_phase = test_predictions[i]
    plt.scatter(test_point[0], test_point[1], c=[phase_colors[predicted_phase]], s=150, marker='D', edgecolors='black', linewidth=1, alpha=0.9)
    plt.annotate(sample_name, (test_point[0], test_point[1]), xytext=(5,5), textcoords='offset points', fontsize=8, alpha=0.8)
plt.title("All Test Samples Projected on Training PCA Space")
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%} variance)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%} variance)')
plt.legend(title="Training Phases", bbox_to_anchor=(1.05, 1), loc='upper left')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(summary_plot_pdf, format='pdf', bbox_inches='tight', dpi=300)
plt.show()
print(f"Summary PCA plot saved to: {summary_plot_pdf}")

# === Display Downloadable Links to Outputs ===
print("Download Results:")
display(FileLink(predictions_csv))
display(FileLink(individual_pca_pdf))
display(FileLink(summary_plot_pdf))

# ==== Final Job Summary ====
print("\n" + "="*60)
print("ANALYSIS COMPLETE")
print("="*60)
print(f"Job ID: {job_id}")
print(f"Model validation accuracy: {val_accuracy:.4f}")
print(f"Test samples predicted: {len(test_predictions)}")
print("Outputs:")
print(f"- Predictions CSV: {predictions_csv}")
print(f"- Individual PCA PDF: {individual_pca_pdf}")
print(f"- Summary PCA PDF: {summary_plot_pdf}")
print("="*60)
