In [None]:
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**2.
Biomarker Identification (20 points)**:<br>
a.
Identify PMPs with high specificity for tissue differentiation, minimizing false positives for Tissue #1 while allowing some false negatives. Use statistical or machine learning approaches to assign confidence (e.g., p-values) to each PMP (15 points).<br>
b.
Calculate the mean variant read fraction (VRF) for each PMP in both tissues (5 points).

In [None]:

!pip install pyspark


from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
#from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier
#from tqdm import tqdm




In [None]:

spark = SparkSession.builder \
    .appName("rf Feature Importance") \
    .getOrCreate()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder

In [None]:
df = spark.read.csv('/content/drive/MyDrive/phased_methylation_pattern/normalized_data.csv', header=True, inferSchema=True)


In [None]:
# Step 5: Rename columns to avoid issues with special characters
for col in df.columns:
    new_col = col.replace('`', '').replace('.', '_')  # Replace problematic characters
    df = df.withColumnRenamed(col, new_col)

In [None]:
#df.write.parquet('/content/drive/MyDrive/phased_methylation_pattern/pmps_dataset.parquet')

In [None]:
# Step 4: Load your dataset from Parquet format
#df = spark.read.parquet('/content/drive/MyDrive/phased_methylation_pattern/pmps_dataset.parquet')


In [None]:
df.printSchema()
df.show()

root
 |-- strand: integer (nullable = true)
 |-- 000: double (nullable = true)
 |-- 001: double (nullable = true)
 |-- 010: double (nullable = true)
 |-- 011: double (nullable = true)
 |-- 100: double (nullable = true)
 |-- 101: double (nullable = true)
 |-- 110: double (nullable = true)
 |-- 111: double (nullable = true)
 |-- Tissue: string (nullable = true)
 |-- CpG_1: double (nullable = true)
 |-- CpG_2: double (nullable = true)
 |-- CpG_3: double (nullable = true)

+------+--------------------+--------------------+---+---+---+---+---+--------------------+------+-------------------+------------------+-------------------+
|strand|                 000|                 001|010|011|100|101|110|                 111|Tissue|              CpG_1|             CpG_2|              CpG_3|
+------+--------------------+--------------------+---+---+---+---+---+--------------------+------+-------------------+------------------+-------------------+
|     0| 0.07472939217318901|0.003157894736842105|0.

In [None]:
from pyspark.ml.feature import VectorAssembler


In [None]:
# Step 1: Install necessary libraries

# Step 6: Prepare features and labels
feature_columns = ['000', '001', '010', '011', '100', '101', '110', '111', 'CpG_1', 'CpG_2', 'CpG_3']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df_transformed = assembler.transform(df)

In [None]:
from pyspark.ml.feature import StringIndexer

# Create a StringIndexer to convert 'Tissue' to numeric
indexer = StringIndexer(inputCol="Tissue", outputCol="Tissue_indexed")

# Fit the indexer on the data and transform
indexed_df = indexer.fit(df_transformed).transform(df_transformed)

In [None]:
# Step 7: Split the data into training and testing sets
train_df, test_df = indexed_df.randomSplit([0.7, 0.3], seed=42)

In [None]:

# Update the labelCol to use the indexed column
rf = RandomForestClassifier(featuresCol='features', labelCol='Tissue_indexed', numTrees=100)

model = rf.fit(indexed_df)

In [None]:

importances = model.featureImportances
print("Feature Importances:")
for col, importance in zip(feature_columns, importances):
    print(f"{col}: {importance}")


predictions = model.transform(test_df)
predictions.select('Tissue', 'prediction').show()


Feature Importances:
000: 0.5508724833003564
001: 0.033179412949510524
010: 0.049460542663780825
011: 0.020231926455488665
100: 0.09587597929801375
101: 0.06367075454053728
110: 0.02173009661668143
111: 0.020674414166899847
CpG_1: 0.05806796380022913
CpG_2: 0.037626590191887674
CpG_3: 0.04860983601661431
+------+----------+
|Tissue|prediction|
+------+----------+
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
| cfDNA|       0.0|
+------+----------+
only showing top 20 rows



**From the above feature importance result, it can be observed that out of all PMPs, `0001 is contributing maximum in discriminating cfDNA and Islet**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics

In [None]:
model.save("/content/drive/MyDrive/phased_methylation_pattern/rf_model")

In [None]:
rf_predictions_test = model.transform(test_df)

In [None]:
rf_predictions_train = model.transform(train_df)

In [None]:

# Evaluate on test data
# Accuracy
test_accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="Tissue_indexed", predictionCol="prediction", metricName="accuracy")
test_accuracy = test_accuracy_evaluator.evaluate(rf_predictions_test)

In [None]:
test_accuracy

0.7930496050646033

In [None]:

# Evaluate on train data
# Accuracy
train_accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="Tissue_indexed", predictionCol="prediction", metricName="accuracy")
train_accuracy = train_accuracy_evaluator.evaluate(rf_predictions_train)

In [None]:
train_accuracy

0.7930666178230422

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator



# Assuming 'Tissue_indexed' is a binary label:
test_roc_evaluator = BinaryClassificationEvaluator(labelCol="Tissue_indexed", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

# OR, if 'Tissue_indexed' is multiclass, use MulticlassClassificationEvaluator with 'areaUnderROC':
# test_roc_evaluator = MulticlassClassificationEvaluator(labelCol="Tissue_indexed", predictionCol="prediction", metricName="areaUnderROC")

test_roc_auc = test_roc_evaluator.evaluate(rf_predictions_test)

In [None]:
test_roc_auc

0.8371008664916569

In [None]:
# AUC-ROC (assuming binary classification)
train_roc_evaluator = BinaryClassificationEvaluator(labelCol="Tissue_indexed", rawPredictionCol="rawPrediction")
train_roc_auc = train_roc_evaluator.evaluate(rf_predictions_train)

In [None]:
train_roc_auc

0.8372336469694129

In [None]:


test_metrics = BinaryClassificationMetrics(rf_predictions_test.select('label', 'prediction').rdd.map(lambda x: (float(x[1]), float(x[0]))))
test_mcc = test_metrics.areaUnderPR

train_metrics = BinaryClassificationMetrics(rf_predictions_train.select('label', 'prediction').rdd.map(lambda x: (float(x[1]), float(x[0]))))
train_mcc = train_metrics.areaUnderPR

# Print results
print(f"Test Accuracy: {test_accuracy}")
print(f"Test AUC-ROC: {test_roc_auc}")
print(f"Test MCC: {test_mcc}")
print(f"Train Accuracy: {train_accuracy}")
print(f"Train AUC-ROC: {train_roc_auc}")
print(f"Train MCC: {train_mcc}")


In [None]:

feature_columns = ['feature1', 'feature2', 'feature3']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df_transformed = assembler.transform(df)

rf = RandomForestClassifier(featuresCol='features', labelCol='target')
model = rf.fit(df_transformed)


In [None]:
#ddf = dd.read_csv('/content/drive/MyDrive/phased_methylation_pattern/normalized_data.csv')


In [None]:
ddf.info()

<class 'dask_expr.DataFrame'>
Columns: 13 entries, strand to CpG_3
dtypes: float64(11), int64(1), string(1)

In [None]:
ddf.tail(5)


Unnamed: 0,strand,`000,`001,`010,`011,`100,`101,`110,`111,Tissue,CpG_1,CpG_2,CpG_3
15392178,1,0.005273,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Islet,0.451547,0.456004,0.454591
15392179,1,0.003261,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Islet,0.451547,0.456004,0.454911
15392180,1,0.005273,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Islet,0.451547,0.456415,0.454591
15392181,1,0.003261,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Islet,0.451547,0.456415,0.454911
15392182,1,0.003261,0.0,0.0,0.0,0.0,0.0,0.0,0.0,Islet,0.451547,0.456553,0.454911


In [None]:
ddf.value_counts('Tissue')

Unnamed: 0_level_0,count
Tissue,Unnamed: 1_level_1
cfDNA,11602083
Islet,3790100


In [None]:
ddf.head()

Unnamed: 0,strand,`000,`001,`010,`011,`100,`101,`110,`111,Tissue,CpG_1,CpG_2,CpG_3
0,0,0.074729,0.003158,0.0,0.0,0.0,0.0,0.0,0.000455,cfDNA,0.453237,0.452484,0.450617
1,0,0.075354,0.0,0.0,0.0,0.0,0.0,0.0,0.000455,cfDNA,0.453237,0.452484,0.450937
2,0,0.074174,0.005965,0.0,0.0,0.0,0.0,0.0,0.000455,cfDNA,0.453237,0.452484,0.451165
3,0,0.074382,0.004912,0.0,0.0,0.0,0.0,0.0,0.000455,cfDNA,0.453237,0.452484,0.451759
4,0,0.075354,0.0,0.0,0.0,0.0,0.0,0.0,0.000455,cfDNA,0.453237,0.452484,0.451942


In [None]:
#methylation_columns = ['000', '001', '010', '011', '100', '101', '110', '111']

In [None]:
ddf.info()

<class 'dask_expr.DataFrame'>
Columns: 13 entries, strand to CpG_3
dtypes: category(1), float64(11), int64(1)

In [None]:
#!nvidia-smi
#!pip install cuml-cu11 dask-cuda --extra-index-url=https://pypi.ngc.nvidia.com


In [None]:
#!pip install cuml-cu11 --extra-index-url=https://pypi.ngc.nvidia.com
#!apt install -y libcusparse11

In [None]:

X = ddf[['`000', '`001', '`010', '`011', '`100', '`101', '`110', '`111',	'CpG_1',	'CpG_2',	'CpG_3']]
y = ddf['Tissue']


In [None]:

X = X.to_dask_array(lengths=True)
y = y.to_dask_array(lengths=True)


X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)



In [None]:
X_train.shape

(12313746, 11)

In [None]:
import dask_xgboost as dxgb
from dask_ml.xgboost import DaskXGBClassifier  # Import DaskXGBClassifier from dask_ml.xgboost


model = DaskXGBClassifier(n_estimators=100, use_label_encoder=True)  # Use DaskXGBClassifier directly
model.fit(X_train, y_train)


In [None]:
model = dxgb.DaskXGBClassifier(n_estimators=100, use_label_encoder=True)
model.fit(X_train, y_train)

In [None]:
# Step 7: Calculate feature importance
importance = model.get_booster().get_score(importance_type='gain')

# Convert to DataFrame for better visualization
importance_df = pd.DataFrame(importance.items(), columns=['Feature', 'Gain']).sort_values(by='Gain', ascending=False)

# Step 8: Visualize feature importance
plt.figure(figsize=(10, 6))
plt.barh(importance_df['Feature'], importance_df['Gain'], color='steelblue')
plt.xlabel('Gain')
plt.title('Feature Importance (Gain) from XGBoost with Dask')
plt.show()

In [None]:

X_train_pd = X_train.compute()
X_test_pd = X_test.compute()
y_train_pd = y_train.compute()
y_test_pd = y_test.compute()

In [None]:

"""print("Before SMOTE:", Counter(y_train_pd))
sm = SMOTE(random_state=42)
X_train_resampled, y_train_resampled = sm.fit_resample(X_train_pd, y_train_pd)
print("After SMOTE:", Counter(y_train_resampled))"""

Before SMOTE: Counter({'cfDNA': 7541437, 'Islet': 2464427})


In [None]:
from cuml.ensemble import RandomForestClassifier as cuRF
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

# Initialize GPU-accelerated Random Forest
clf = cuRF(n_estimators=100, random_state=42, max_depth=10, max_features='auto')

# Train on the balanced dataset
clf.fit(X_train_pd.values, y_train_pd.values)

In [None]:


# Predictions
y_test_pred = clf.predict(X_test_pd.values)

# Evaluate Performance
print("Test Accuracy:", accuracy_score(y_test_pd, y_test_pred))

# Confusion matrix and classification report
print("\nConfusion Matrix:")
print(confusion_matrix(y_test_pd, y_test_pred))
print("\nClassification Report:")
print(classification_report(y_test_pd, y_test_pred))
