## Description

This CCLE dataset was generated with standard bulk RNA-seq. The raw read counts were then normalized to RPKM (Reads Per Kilobase of transcript per Million mapped reads) so that expression values are comparable across samples and across genes.

RPKM normalization corrects for two technical effects introduced during sequencing:

1. **Sequencing depth**: samples are not all sequenced to the same depth, so some yield more total reads than others. This difference does not reflect biological status, so we correct for it.  
2. **Gene length**: at the same expression level, a longer gene produces more fragments and therefore collects more reads than a shorter gene. Dividing by gene length takes this into account.

The normalized RPKM value for gene \(i\) in sample \(j\) is:

\[
\mathrm{RPKM}_{i,j} \;=\; \frac{x_{i,j}}{l_i \,\cdot\, \sum_{k} x_{k,j}} \;\times\; 10^6
\]

where \(x_{i,j}\) is the raw read count, \(l_i\) is the length of gene \(i\) in kilobases (kb), and \(\sum_{k} x_{k,j}\), summed over all genes \(k\), is the total number of reads in sample \(j\).
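
As a rough illustration of the RPKM definition, the sketch below computes it with pandas from a genes x samples table of raw counts. The function name `rpkm`, the toy counts, and the gene lengths are made up for this example; the CCLE file loaded in this notebook is already normalized, so this step is not part of the actual pipeline.

```python
import pandas as pd

def rpkm(counts: pd.DataFrame, gene_length_kb: pd.Series) -> pd.DataFrame:
    """RPKM for a genes x samples table of raw read counts."""
    # Total reads per sample: sum over genes, i.e. down each column
    total_reads = counts.sum(axis=0)
    # Divide each column by its sample total and scale to "per million reads"
    per_million = counts.div(total_reads, axis=1) * 1e6
    # Divide each row by the length of that gene in kilobases
    return per_million.div(gene_length_kb, axis=0)

# Toy input (made-up numbers, not CCLE data)
counts = pd.DataFrame(
    {"sample_A": [100, 300, 600], "sample_B": [50, 50, 100]},
    index=["gene1", "gene2", "gene3"],
)
gene_length_kb = pd.Series([1.0, 2.0, 4.0], index=counts.index)

rpkm_values = rpkm(counts, gene_length_kb)
print(rpkm_values)
```

In this toy table, gene2 has three times the raw count of gene1 in sample_A but is twice as long, so its RPKM is only 1.5 times higher.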


In [None]:
import pandas as pd
import numpy as np

In [None]:
# ---------mRNA---------
# Description:
# Expression Data with gene as index and cells/samples as columns
data_mrna_seq_rpkm = pd.read_csv('data_mrna_seq_rpkm.txt',
                    sep = '\t',
                    comment = '#')

data_mrna_seq_rpkm.set_index('Hugo_Symbol',inplace=True)

# Average rows that share the same gene symbol
data_mrna_seq_rpkm = data_mrna_seq_rpkm.groupby(data_mrna_seq_rpkm.index).mean()

In [None]:
data_mrna_seq_rpkm.head()

In [None]:
#--------Mut query----------
# Description:
# Table indicating which samples are mutated and which are not
mutations= pd.read_csv('mutations.txt',
                    sep = '\t',
                    comment = '#')
mutations.set_index('SAMPLE_ID',inplace=True)

In [None]:
mutations.head()

## Why predict on just the five Variant_Type classes

We focus on SNP, DNP, ONP, INS, and DEL because:

1. Biological clarity: these five labels describe the basic mutation mechanisms (single-base or multi-base substitutions, insertions, and deletions), so the model has well-defined classes to learn.  
2. Balanced data: each Variant_Type occurs often enough to give the model a usable number of examples, whereas the more detailed Variant_Classification labels are very unevenly distributed and would leave some classes too small to learn.  
3. Reduced complexity: Variant_Classification depends on gene structure and reading frame (for example, a SNP can be silent or missense depending on the codon), which our expression-only model cannot infer without extra annotation.  
4. Modular workflow: once the model tags a variant as INS or DEL, we can apply separate rules or a second model to predict functional impact, keeping each step simpler and more reliable.  


In [None]:
# ------Mut ALL-----------
# Mutation calls for all genes; only mutated samples are listed
data_mutations = pd.read_csv('data_mutations.txt',
                    sep = '\t',
                    comment = '#')

# Extract TP53 from all genes
data_mutations = data_mutations[data_mutations['Hugo_Symbol'] == 'TP53']

In [None]:
unique_labels = data_mutations['Variant_Type'].unique()
print(unique_labels)

In [None]:
unique_labels = data_mutations['Variant_Classification'].unique()
print(unique_labels)

In [None]:
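# Keep only the sample barcode and the mutation type
data_mutations = data_mutations[['Tumor_Sample_Barcode', 'Variant_Type']]
data_mutations = data_mutations.set_index('Tumor_Sample_Barcode')
# A sample can appear more than once. Count the distinct Variant_Type values
# per sample; a sample with more than one value has an ambiguous label.
variant_check = data_mutations.groupby(data_mutations.index)["Variant_Type"].nunique()
# Drop ambiguous samples, then keep a single row for each remaining sample
ambiguous = variant_check[variant_check > 1].index
data_mutations = data_mutations.drop(index=ambiguous)
data_mutations = data_mutations[~data_mutations.index.duplicated(keep='first')]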

In [None]:
data_mutations.head()

In [None]:
# Count how many mutations are in each Variant_Type
counts = data_mutations['Variant_Type'].value_counts()

# Print them one per line
for variant_type, n in counts.items():
    print(f"{variant_type}: {n}")


In [None]:
data_t = data_mrna_seq_rpkm.T

data_t.head()

In [None]:
data_t.loc['253JBV_URINARY_TRACT']

In [None]:
# 0-based labels: nn.CrossEntropyLoss expects targets in [0, n_classes - 1]
code = {"SNP": 0, "DNP": 1, "DEL": 2, "INS": 3}

In [None]:
# Build target vector y
y = []

# prepare lists to collect rows and their sample name
X_rows = []
sample_names = []
c = 0

# iterate over each mutation record
for bc, mut in data_mutations.iterrows():
    # check if this barcode is in data_t’s index
    if bc in data_t.index:
        # grab the full row from data_t and store it
        X_rows.append(data_t.loc[bc].values)
        y.append(code[mut['Variant_Type']])
        sample_names.append(bc)
    else:
        c += 1

print(f"Number of samples discarded: {c}")

# build a new DataFrame X from the collected rows
X = pd.DataFrame(
    X_rows,
    index=sample_names,
    columns=data_t.columns
)

y = np.array(y)

In [None]:
print(f"X shape: {X.shape}")
print(f"y shape: {y.shape}")
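
The same alignment between expression rows and mutation labels can also be written without an explicit loop. The sketch below is one possible version on toy data: `expr`, `labels`, and `code_map` are hypothetical stand-ins for `data_t`, the `Variant_Type` column, and the label mapping, and it assumes each sample appears only once in the label table.

```python
import numpy as np
import pandas as pd

# Toy stand-ins (made-up values): samples x genes expression and per-sample labels
expr = pd.DataFrame(
    [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]],
    index=["S1", "S2", "S3"],
    columns=["GENE_A", "GENE_B"],
)
labels = pd.Series(["SNP", "DEL", "INS"], index=["S1", "S3", "S9"], name="Variant_Type")
code_map = {"SNP": 0, "DNP": 1, "DEL": 2, "INS": 3}  # illustrative integer codes

# Samples that have both an expression profile and a mutation label
shared = labels.index.intersection(expr.index)
n_discarded = len(labels) - len(shared)

# Select the shared samples from both tables in the same order
X_toy = expr.loc[shared]
y_toy = labels.loc[shared].map(code_map).to_numpy()

print(f"Number of samples discarded: {n_discarded}")
print(X_toy.shape, y_toy.shape)
```

Because both `X_toy` and `y_toy` are selected with the same `shared` index, row `k` of the feature table and entry `k` of the target vector always refer to the same sample.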

**Step 2: Train - Test split (80% - 20%)**

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
# 80% train, 20% test, stratify to preserve class balance
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42,
    stratify=y
)

print(f"Train shape:\n\tX_train: {X_train.shape}\n\ty_train: {y_train.shape}")
print(f"Test shape:\n\tX_test: {X_test.shape}\n\ty_test: {y_test.shape}")
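
To see what `stratify` does, the following sketch splits a toy label vector (made-up, 80% class 0 and 20% class 1) and compares the class fractions in the two parts. The helper `class_fractions` is a hypothetical name introduced only for this illustration.

```python
import numpy as np
from sklearn.model_selection import train_test_split

# Toy imbalanced labels (made up): 80 samples of class 0, 20 of class 1
y_toy = np.array([0] * 80 + [1] * 20)
X_toy = np.arange(len(y_toy)).reshape(-1, 1)

_, _, y_tr, y_te = train_test_split(
    X_toy, y_toy, test_size=0.2, random_state=42, stratify=y_toy
)

def class_fractions(labels):
    """Return {class: fraction of samples} for a 1-D label array."""
    values, counts = np.unique(labels, return_counts=True)
    return {int(v): float(c) / len(labels) for v, c in zip(values, counts)}

train_frac = class_fractions(y_tr)
test_frac = class_fractions(y_te)
print("train:", train_frac)
print("test: ", test_frac)
```

Note that a stratified split needs at least two samples per class; with a class that has a single member, `train_test_split` raises a `ValueError`, which matters here if a Variant_Type is very rare.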

**Step 3: Model selection and Training**

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score

In [None]:
# Instantiate the model
rf = RandomForestClassifier(
    n_estimators=500,      # you can tune this
    max_depth=None,        # full depth; you can limit for speed/regularization
    random_state=42,
    n_jobs=-1              # use all cores
)

In [None]:
# Train
rf.fit(X_train, y_train)

In [None]:
# Predict on the held-out test set
y_pred = rf.predict(X_test)

# Accuracy = fraction of test samples whose predicted label matches the true label
accuracy = np.mean(y_pred == y_test)
print(f'Accuracy: {accuracy*100:.2f}%')
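
Accuracy alone can hide poor performance on rare classes. The `classification_report` and `confusion_matrix` functions imported above give a per-class view; the sketch below shows them on made-up labels (`y_true_toy` and `y_pred_toy` are hypothetical, not results from this notebook).

```python
import numpy as np
from sklearn.metrics import classification_report, confusion_matrix

# Made-up labels: class 0 dominates and absorbs some of the rarer classes
y_true_toy = np.array([0, 0, 0, 0, 2, 2, 3, 3])
y_pred_toy = np.array([0, 0, 0, 0, 0, 2, 0, 3])
class_ids = [0, 2, 3]

# Rows are true classes, columns are predicted classes, in the order of class_ids
cm = confusion_matrix(y_true_toy, y_pred_toy, labels=class_ids)
print(cm)

# Per-class precision, recall and F1
print(classification_report(y_true_toy, y_pred_toy, labels=class_ids, zero_division=0))
```

In this toy case the overall accuracy is 75%, yet only half of the samples in classes 2 and 3 are recovered, which the confusion matrix makes visible.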

In [None]:
# PyTorch classifier
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

# Train-Test split
X_train_np, X_test_np, y_train_np, y_test_np = train_test_split(
    X.values, y, test_size=0.2, random_state=42, stratify=y
)

# Convert to tensors
X_train = torch.tensor(X_train_np, dtype=torch.float32)
y_train = torch.tensor(y_train_np, dtype=torch.long)
X_test  = torch.tensor(X_test_np,  dtype=torch.float32)
y_test  = torch.tensor(y_test_np,  dtype=torch.long)

# Wrap in DataLoaders
train_ds = TensorDataset(X_train, y_train)
test_ds  = TensorDataset(X_test,  y_test)
train_dl = DataLoader(train_ds, batch_size=32, shuffle=True)
test_dl  = DataLoader(test_ds,  batch_size=32)

In [None]:
# Define the model: a small MLP
class MLP(nn.Module):
    def __init__(self, in_features, hidden=128, n_classes=4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_features, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_classes)
        )
    def forward(self, x):
        return self.net(x)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model  = MLP(in_features=X_train.shape[1]).to(device)
loss_fn = nn.CrossEntropyLoss()
opt     = torch.optim.Adam(model.parameters(), lr=1e-3)

# Training loop
n_epochs = 20
for epoch in range(n_epochs):
    model.train()
    total_loss = 0
    for xb, yb in train_dl:
        xb, yb = xb.to(device), yb.to(device)
        preds  = model(xb)
        loss   = loss_fn(preds, yb)
        opt.zero_grad()
        loss.backward()
        opt.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_dl)
    print(f"Epoch {epoch+1}/{n_epochs}, loss: {avg_loss:.4f}")

In [None]:
# Evaluation on the test set
model.eval()
correct = 0
total   = 0
with torch.no_grad():
    for xb, yb in test_dl:
        xb, yb = xb.to(device), yb.to(device)
        preds  = model(xb).argmax(dim=1)
        correct += (preds == yb).sum().item()
        total   += yb.size(0)
print(f"Test accuracy: {correct/total:.2%}")