In [None]:
import pyspark.sql.functions as F
from pyspark.ml.feature import PCA
from pyspark.ml.feature import PCAModel, VarianceThresholdSelector
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.functions import udf

In [None]:
# Keeping the number of partitions low is very important. The data initially
# loaded as 3300 partitions of ~70 MB each; the config below reduces that to
# 417. Working with 3300 partitions took 5x more compute time because of heavy
# network I/O and led to workers being shut down.
spark = (
    SparkSession.builder
    .appName("ControlPartitionSize")
    # 700 MiB per input partition.
    .config("spark.sql.files.maxPartitionBytes", 700 * 1024 * 1024)
    .config("spark.sql.shuffle.partitions", 100)
    # Read in 1 MiB chunks: trial GCP accounts rate-limit reads from buckets.
    .config("spark.hadoop.fs.gs.inputstream.buffer.size", 1024 * 1024)
    .getOrCreate()
)

In [None]:
# Load the raw parquet data from the project bucket.
tahoe_data_path = "gs://medical-data-for-project/huggingface.co/datasets/vevotx/Tahoe-100M/resolve/main/data/"
expression_df = spark.read.parquet(tahoe_data_path)

In [None]:
# Major performance bottleneck with no alternative available: F.zip_with
# could potentially be used instead, but it did not work in this case.
def make_sparse_vector(genes, expressions, size=63000):
    """Build a SparseVector of length ``size`` from parallel index/value lists.

    Args:
        genes: Iterable of vector indices, or None.
        expressions: Iterable of values paired positionally with ``genes``,
            or None.
        size: Length of the resulting vector. Defaults to 63000, the width
            this notebook uses for the feature vectors.

    Returns:
        An empty (all-zero) SparseVector when either input is None; otherwise
        a SparseVector built from the ``{index: value}`` pairs.
    """
    if genes is None or expressions is None:
        return SparseVector(size, {})
    # zip() stops at the shorter input; a repeated index keeps its last value.
    return SparseVector(size, dict(zip(genes, expressions)))
# Expose the builder as a Spark UDF returning an ML vector column.
make_sparse_vector_udf = udf(make_sparse_vector, VectorUDT())

# Build the vectors from the raw frame loaded earlier. `df` is only assigned in
# a later cell, so referencing it here fails on a fresh kernel.
# After every major computation the data was persisted to the GCP bucket to
# avoid recomputing it.
(
    expression_df
    .withColumn("features", make_sparse_vector_udf("genes", "expressions"))
    .write
    .mode("overwrite")
    .parquet("gs://bigdata_27/features/")
)

In [None]:
# Reload the persisted feature vectors.
df = spark.read.parquet("gs://bigdata_27/features/")

# VarianceThresholdSelector was used to reduce the column count from
# 63000 (63k) to 6250. Given the scale of the task it was quite fast, taking
# 4-5 hrs to complete. It was chosen because PCA could not be run on 63000
# columns: that required at least 15 GB of driver memory.
# NOTE(review): this step writes the column "selectedFeatures", while the later
# PCA cells read/drop "selected_features" - confirm where the rename happens.
selector = VarianceThresholdSelector(
    featuresCol="features",
    outputCol="selectedFeatures",
    varianceThreshold=0.10,
)
selector_model = selector.fit(df)

# Keep only the selected features and persist the result.
(
    selector_model.transform(df)
    .drop("features")
    .write
    .mode("overwrite")
    .parquet("gs://bigdata_27/selected_features/")
)

In [None]:
# Stratified sampling on "moa-fine": the target distribution stays unchanged,
# so the sample is representative both for testing and for training a PCA
# model on a smaller subset. lit(0.01) selects 1% of the data; this value was
# varied over 0.01%, 1%, 12.5%, 25% and 50%, and every subset was written to
# GCP to avoid redoing the computation.
# NOTE(review): scaled_df is not assigned in any cell visible here - confirm
# which step produces it.
fractions = (
    scaled_df
    .select("moa-fine")
    .distinct()
    .withColumn("fraction", lit(0.01))
    .rdd
    .collectAsMap()
)

stratified_sample = scaled_df.sampleBy("moa-fine", fractions, seed=42)
sampled_df = stratified_sample.coalesce(20)

In [None]:
# PCA was trained on 1% of the data (950k rows). A large number of partitions
# caused problems here as well, so they were reduced. This step took 5 hours
# of computation.
sampled_df = spark.read.parquet("gs://bigdata_27/sampled_data/")

pca = PCA(inputCol="selected_features", outputCol="pca_features", k=256)
pca_model = pca.fit(sampled_df)

# Persist the fitted PCA model, replacing any earlier copy.
model_writer = pca_model.write().overwrite()
model_writer.save("gs://bigdata_27/pca_models/")

In [None]:
# Reload the fitted PCA model from the bucket.
pca_model = PCAModel.load("gs://bigdata_27/pca_models/")

# Project the sample and preview one row. The guard keeps this cell
# re-runnable: transform() rejects a frame that already has the output column.
if "pca_features" not in sampled_df.columns:
    sampled_df = pca_model.transform(sampled_df)
sampled_df.select("pca_features").show(1)

# Data was written whenever a major computation was taking place.
# NOTE(review): scaled_df is not assigned in any cell visible here - confirm
# which step produces it.
(
    pca_model.transform(scaled_df)
    .drop("selected_features")
    .write
    .mode("overwrite")
    .parquet("gs://bigdata_27/transformed_data/")
)
transformed_df = spark.read.parquet("gs://bigdata_27/transformed_data/")

In [None]:
# Data is read from specific bucket directories depending on the experiment.
# The test set has 10k rows (0.01% of the data), since 10k is a large enough
# sample size for good statistical confidence.
from pyspark.sql.functions import col, sum as _sum, when

train_df = spark.read.parquet("gs://bigdata_27/ten_percent_subset/")
test_df = spark.read.parquet("gs://bigdata_27/test_data/")

# One aggregate per column: the number of null entries in that column.
null_count_exprs = []
for column_name in train_df.columns:
    is_null_flag = when(col(column_name).isNull(), 1).otherwise(0)
    null_count_exprs.append(_sum(is_null_flag).alias(column_name))

null_counts = train_df.select(null_count_exprs)
null_counts.show()

## Spark Model Development Pipeline

In [None]:
# Initially a one-hot encoder and string indexer were used for some
# categorical columns, but they resulted in data leakage (the model scored
# 99.8 F1 on 12.5% of the data). They were removed, and this pipeline is a much
# simpler version of what was used before.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier, MultilayerPerceptronClassifier, LogisticRegression

# The PCA projection is the only input assembled into "features".
feature_cols = ["pca_features"]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)

# Index the "moa-fine" target into the "label_index" column.
label_indexer = StringIndexer(outputCol="label_index", inputCol="moa-fine")

# Two classifiers are configured; only `lr` is placed in the pipeline below.
rf = RandomForestClassifier(
    labelCol="label_index",
    featuresCol="features",
    numTrees=100,
    maxBins=512,
)

lr = LogisticRegression(
    labelCol="label_index",
    featuresCol="features",
    predictionCol="prediction",
    regParam=0.01,
    elasticNetParam=0.0,  # L2 regularization
    maxIter=100,
)

stages = [assembler, label_indexer, lr]
pipeline = Pipeline(stages=stages)

# Fit on the training subset, then score the test rows.
model = pipeline.fit(train_df)
predictions = model.transform(test_df)


## Model Evaluation

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# A single evaluator is reused; its metric is switched before each pass.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label_index",
)

accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy:.4f}")

# Evaluated in order: F1, then weighted precision, then weighted recall.
f1, precision, recall = (
    evaluator.setMetricName(metric_name).evaluate(predictions)
    for metric_name in ("f1", "weightedPrecision", "weightedRecall")
)

print(f"F1 Score = {f1:.4f}")
print(f"Precision = {precision:.4f}")
print(f"Recall = {recall:.4f}")