# Setup

In [None]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type": "native",
        "spark.pyspark.virtualenv.bin.path": "/usr/bin/virtualenv",
        "spark.executor.instances": "1",
        "spark.executor.cores": "4"
    }
}

In [18]:
# Install pinned notebook-scoped packages on the cluster from PyPI.
# NOTE(review): none of the cells shown import pandas, scipy or matplotlib.
for pinned_pkg in ("pandas==1.0.5", "scipy==1.4.1", "matplotlib==3.2.1"):
    sc.install_pypi_package(pinned_pkg, "https://pypi.org/simple")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Collecting pandas==1.0.5
  Downloading https://files.pythonhosted.org/packages/af/f3/683bf2547a3eaeec15b39cef86f61e921b3b187f250fcd2b5c5fb4386369/pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl (10.1MB)
Collecting python-dateutil>=2.6.1 (from pandas==1.0.5)
  Downloading https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.9.0.post0

Collecting scipy==1.4.1
  Downloading https://files.pythonhosted.org/packages/dd/82/c1fe128f3526b128cfd185580ba40d01371c5d299fcf7f77968e22dfcc2e/scipy-1.4.1-cp37-cp37m-manylinux1_x86_64.whl (26.1MB)
Installing collected packages: scipy
Successfully installed scipy-1.4.1

Collecting matplotlib==3.2.1
  Downloading https://files.pythonhosted.org/packages/b2/c2/71fcf957710f3ba1f09088b35776a799ba7dd95f7c2b195ec800933b276b/matplotlib-3.2.1-cp37-cp

In [None]:
# One-time conversion of the raw CSV into the Parquet copy that the next cell reads.
# Disabled by default; set to True only to regenerate the Parquet files.
CONVERT_CSV_TO_PARQUET = False

if CONVERT_CSV_TO_PARQUET:
    # The CSV has quoted, multi-line text fields, and quotes are escaped by doubling them.
    raw_sdf = (spark.read
               .option("header", True)
               .option("inferSchema", True)
               .option("quote", "\"")
               .option("escape", "\"")
               .option("multiLine", True)
               .csv("s3://truegraph/data_random_split.csv")
               .repartition(16))

    # Save as Parquet to S3 (overwrites any previous copy)
    raw_sdf.write.mode("overwrite").parquet("s3://truegraph/data/parquet/data_random_split")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

In [None]:
# Read the Parquet copy of the dataset, repartition it for better parallelism, then cache it.
# persist() must come after repartition(): `sdf` is the frame later cells read, so the
# cached data should be the repartitioned one, not the pre-shuffle read.
sdf = (spark.read.parquet("s3://truegraph/data/parquet/data_random_split")
       .repartition(128)
       .persist())

# Logistic Regression

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover,
    HashingTF, IDF, PCA
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

import random
import numpy as np
SEED = 42
# Seeds Python's and NumPy's global RNGs. Spark components that use randomness take
# their own `seed` argument (as the RandomForest / CrossValidator cells do).
random.seed(SEED)
np.random.seed(SEED)

# Baseline: TF-IDF -> PCA(k=500) -> LogisticRegression with fixed hyperparameters.
# Reads `sdf` from the Parquet-loading cell above.

# Rename and select necessary columns for the pipeline
data = sdf.select(
    col("ID"),
    col("bias").cast("int").alias("label"),  # ensure numeric
    col("content").alias("text"),
    col("split")  # "train", "valid", or "test"
)

# Split by the `split` column, drop rows missing label/text, and cache each split.
train_df = data.filter(col("split") == "train").dropna(subset=["label", "text"]).persist()
val_df   = data.filter(col("split") == "valid").dropna(subset=["label", "text"]).persist()
test_df  = data.filter(col("split") == "test").dropna(subset=["label", "text"]).persist()

# Count only after dropna so the reported sizes are the rows actually used for
# training/evaluation. The counts also materialize the caches above.
print(f"Counts ‚Üí Train: {train_df.count()}, Val: {val_df.count()}, Test: {test_df.count()}")

# Define Spark ML pipeline with PCA and Logistic Regression
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
remover   = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2000)
idf       = IDF(inputCol="rawFeatures", outputCol="features")
pca       = PCA(k=500, inputCol="features", outputCol="pcaFeatures")
lr        = LogisticRegression(featuresCol="pcaFeatures", labelCol="label", maxIter=20, regParam=0.1)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, pca, lr])

# Train the model
model = pipeline.fit(train_df)

# Evaluate on val/test
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

for df, name in [(val_df, "Validation"), (test_df, "Test")]:
    preds = model.transform(df)
    acc = evaluator_acc.evaluate(preds)
    f1  = evaluator_f1.evaluate(preds)
    print(f"{name} ‚Üí Accuracy: {acc:.4f},  F1: {f1:.4f}")

# Save model
model.write().overwrite().save("s3://truegraph/models/bias_classifier_pca")
print("‚úÖ Model saved to s3://truegraph/models/bias_classifier_pca")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Counts ‚Üí Train: 27978, Val: 6996, Test: 1300
Validation ‚Üí Accuracy: 0.5945,  F1: 0.5883
Test ‚Üí Accuracy: 0.6146,  F1: 0.6119
‚úÖ Model saved to s3://truegraph/models/bias_classifier_pca

In [None]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover, HashingTF, IDF, PCA
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import random
import numpy as np
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Experiment: PCA(k=100) features and 3-fold cross-validation over LogisticRegression's
# regParam x maxIter (3 x 4 = 12 combinations), selected by F1.

# Load the Parquet data, keep the modeling columns (renamed), and drop rows
# that have no label or no text.
sdf = spark.read.parquet("s3://truegraph/data/parquet/data_random_split").repartition(128)
clean_df = (sdf
    .select(col("bias").alias("label"), col("content").alias("text"), col("split"), col("ID"))
    .filter(col("label").isNotNull() & col("text").isNotNull()))

# One cached frame per split
train_df = clean_df.filter(col("split") == "train").persist()
val_df   = clean_df.filter(col("split") == "valid").persist()
test_df  = clean_df.filter(col("split") == "test").persist()

print("Counts ‚Üí", "Train:", train_df.count(), "Val:", val_df.count(), "Test:", test_df.count())

# Text -> tokens -> stop-word removal -> hashed TF -> IDF -> PCA(100) -> LR
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
remover   = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2000)
idf       = IDF(inputCol="rawFeatures", outputCol="features")
pca       = PCA(k=100, inputCol="features", outputCol="pcaFeatures")
lr        = LogisticRegression(featuresCol="pcaFeatures", labelCol="label")

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, pca, lr])

# Hyperparameter grid: regParam is added first and maxIter second
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.maxIter, [10, 20, 30, 50]) \
    .build()

# F1 drives model selection; accuracy is only reported
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2).setSeed(SEED)

# Grid search on the train split only
cv_model = cv.fit(train_df)

# Score the held-out splits
for split_df, name in ((val_df, "Validation"), (test_df, "Test")):
    preds = cv_model.transform(split_df)
    acc = acc_evaluator.evaluate(preds)
    f1 = evaluator.evaluate(preds)
    print(f"{name} ‚Üí Accuracy: {acc:.4f}, F1: {f1:.4f}")

# NOTE(review): the recorded output of this cell reports ".../bias_classifier_pca_tuned",
# which does not match the path below, so that output predates the current code.
cv_model.bestModel.write().overwrite().save("s3://truegraph/models/bias_classifier_hyper_tuned")
print("‚úÖ Best model saved to s3://truegraph/models/bias_classifier_hyper_tuned")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Counts ‚Üí Train: 27978 Val: 6996 Test: 1300
Validation ‚Üí Accuracy: 0.5433, F1: 0.5385
Test ‚Üí Accuracy: 0.5662, F1: 0.5627
‚úÖ Best model saved to s3://truegraph/models/bias_classifier_pca_tuned

In [None]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover, HashingTF, IDF, PCA
)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import random
import numpy as np
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Experiment: LR hyperparameters fixed (regParam=0.1, maxIter=20); 3-fold CV tunes only PCA.k.

# Load only the columns this experiment needs and drop rows without a label or text
sdf = (spark.read.parquet("s3://truegraph/data/parquet/data_random_split")
       .select(col("bias").alias("label"), col("content").alias("text"), col("split"))
       .filter(col("label").isNotNull() & col("text").isNotNull()))

# One cached frame per split, created in train/valid/test order
train_df, val_df, test_df = (
    sdf.filter(col("split") == split_name).persist()
    for split_name in ("train", "valid", "test")
)

print("Counts ‚Üí", train_df.count(), val_df.count(), test_df.count())

# Text -> tokens -> stop-word removal -> hashed TF -> IDF -> PCA -> LR.
# PCA.k is deliberately left unset here: the grid below supplies it.
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
remover   = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2000)
idf       = IDF(inputCol="rawFeatures", outputCol="features")
pca       = PCA(inputCol="features", outputCol="pcaFeatures")
lr        = LogisticRegression(featuresCol="pcaFeatures", labelCol="label", regParam=0.1, maxIter=20)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, pca, lr])

# Candidate numbers of principal components.
# NOTE(review): the recorded run picked k=1500, the largest value tried, so the optimum
# may lie beyond this grid.
paramGrid = ParamGridBuilder().addGrid(pca.k, [200, 300, 500, 1000, 1500]).build()

# F1 drives model selection; accuracy is only reported
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
acc_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2).setSeed(SEED)

cv_model = cv.fit(train_df)

# The fitted PCA is the second-to-last stage (LR is last)
bestPipelineModel = cv_model.bestModel
bestPCAmodel      = bestPipelineModel.stages[-2]
print("üîç Best PCA k =", bestPCAmodel.getK())

# Score the held-out splits with the best pipeline
for split_df, name in ((val_df, "Validation"), (test_df, "Test")):
    preds = bestPipelineModel.transform(split_df)
    acc = acc_evaluator.evaluate(preds)
    f1 = evaluator.evaluate(preds)
    print(f"{name} ‚Üí Accuracy: {acc:.4f},  F1: {f1:.4f}")

# Persist the tuned pipeline; the ensemble cells load it from this path
bestPipelineModel.write().overwrite().save("s3://truegraph/models/bias_classifier_pca_tuned")
print("‚úÖ Model saved to s3://truegraph/models/bias_classifier_pca_tuned")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Counts ‚Üí 27978 6996 1300
üîç Best PCA k = 1500
Validation ‚Üí Accuracy: 0.6139,  F1: 0.6110
Test ‚Üí Accuracy: 0.6138,  F1: 0.6131
‚úÖ Model saved to s3://truegraph/models/bias_classifier_pca_tuned

# Random Forest

In [None]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    RegexTokenizer, StopWordsRemover,
    HashingTF, IDF, PCA
)
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import random
import numpy as np
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Baseline random forest (100 trees, depth 10) on PCA(k=100) TF-IDF features.
# It is trained on the train split and scored on the test split only.

# Keep label/text/split, dropping rows without a label or text
sdf = (spark.read.parquet("s3://truegraph/data/parquet/data_random_split")
       .select(col("bias").alias("label"), col("content").alias("text"), col("split"))
       .filter(col("label").isNotNull() & col("text").isNotNull()))

train_df = sdf.filter(col("split") == "train").persist()
test_df  = sdf.filter(col("split") == "test").persist()

print(f"Train rows: {train_df.count()}, Test rows: {test_df.count()}")

# Feature stages: tokens -> stop-word removal -> hashed TF -> IDF -> PCA(100)
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
remover   = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2000)
idf       = IDF(inputCol="rawFeatures", outputCol="features")
pca       = PCA(k=100, inputCol="features", outputCol="pcaFeatures")

# Forest: number of trees and per-tree depth are fixed; seed comes from SEED (42)
rf = RandomForestClassifier(featuresCol="pcaFeatures", labelCol="label",
                            numTrees=100, maxDepth=10, seed=SEED)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, pca, rf])
model = pipeline.fit(train_df)

# Test-set accuracy and F1
preds = model.transform(test_df)
evaluator_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_f1  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
acc, f1 = evaluator_acc.evaluate(preds), evaluator_f1.evaluate(preds)
print(f"Random Forest ‚Üí Test Accuracy: {acc:.4f}, F1: {f1:.4f}")

# Persist the fitted pipeline
model.write().overwrite().save("s3://truegraph/models/bias_rf_pca100_n100_d10")
print("‚úÖ Random Forest model saved to s3://truegraph/models/bias_rf_pca100_n100_d10")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Train rows: 27978, Test rows: 1300
Random Forest ‚Üí Test Accuracy: 0.5377, F1: 0.5053
‚úÖ Random Forest model saved to s3://truegraph/models/bias_rf_pca100_n100_d10

In [None]:
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, HashingTF, IDF, PCA
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import random
import numpy as np
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# 3-fold CV grid search over random-forest hyperparameters on PCA(k=100) TF-IDF
# features, selected by F1. The best pipeline is scored on validation and test.

# Load and prepare data from Parquet
sdf = spark.read.parquet("s3://truegraph/data/parquet/data_random_split") \
    .select(
        col("bias").alias("label"),
        col("content").alias("text"),
        col("split")
    ) \
    .filter(col("label").isNotNull() & col("text").isNotNull())

train_df = sdf.filter(col("split") == "train").persist()
val_df   = sdf.filter(col("split") == "valid").persist()
test_df  = sdf.filter(col("split") == "test").persist()

print(f"Counts ‚Üí Train: {train_df.count()}, Valid: {val_df.count()}, Test: {test_df.count()}")

# Build pipeline stages
tokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W+")
remover   = StopWordsRemover(inputCol="tokens", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2000)
idf       = IDF(inputCol="rawFeatures",   outputCol="features")
pca       = PCA(inputCol="features",      outputCol="pcaFeatures", k=100)
rf        = RandomForestClassifier(
    featuresCol="pcaFeatures",
    labelCol="label",
    seed=SEED
)

pipeline = Pipeline(stages=[tokenizer, remover, hashingTF, idf, pca, rf])

# Hyperparameter grid: 3 x 3 x 3 x 2 = 54 combinations, each fit once per fold.
# featureSubsetStrategy leaves out "auto". For classification forests (numTrees > 1),
# Spark resolves "auto" to "sqrt", and every numTrees value here is >= 100. Listing
# both therefore only retrained identical "sqrt" models.
paramGrid = (ParamGridBuilder()
    .addGrid(rf.numTrees, [100, 200, 500])
    .addGrid(rf.maxDepth, [10, 20, 30])
    .addGrid(rf.minInstancesPerNode, [1, 5, 10])
    .addGrid(rf.featureSubsetStrategy, ["sqrt", "log2"])
    .build()
)

# F1 drives model selection; accuracy is only reported
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)

# CrossValidator
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
).setSeed(SEED)

# Fit on the train split
cvModel = cv.fit(train_df)

# Inspect best RF parameters
bestModel = cvModel.bestModel
bestRF    = bestModel.stages[-1]  # RandomForest is last stage

# On the fitted forest model, getNumTrees is read without a call (the recorded output
# shows a number), unlike the other getters.
print("üîç Best numTrees:           ", bestRF.getNumTrees)
print("üîç Best maxDepth:           ", bestRF.getMaxDepth())
print("üîç Best minInstancesPerNode:", bestRF.getMinInstancesPerNode())
print("Best featureSubsetStrategy:", bestRF.getFeatureSubsetStrategy())

# Evaluate on validation and test
for df, name in [(val_df, "Validation"), (test_df, "Test")]:
    preds = bestModel.transform(df)
    acc   = evaluator_acc.evaluate(preds)
    f1    = evaluator.evaluate(preds)
    print(f"{name} ‚Üí Accuracy: {acc:.4f},  F1: {f1:.4f}")

# Save the tuned model; the ensemble cells load it from this path
bestModel.write().overwrite().save("s3://truegraph/models/bias_rf_pca100_tuned")
print("‚úÖ Random Forest (tuned) saved to s3://truegraph/models/bias_rf_pca100_tuned")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Counts ‚Üí Train: 27978, Valid: 6996, Test: 1300
üîç Best numTrees:            500
üîç Best maxDepth:            20
üîç Best minInstancesPerNode: 5
Validation ‚Üí Accuracy: 0.5640,  F1: 0.5527
Test ‚Üí Accuracy: 0.5592,  F1: 0.5411
‚úÖ Random Forest (tuned) saved to s3://truegraph/models/bias_rf_pca100_tuned

# Ensemble

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType
from pyspark.ml import PipelineModel
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import random
import numpy as np
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Soft-voting ensemble: average the LR and RF class-probability vectors on the test
# split, predict the argmax, then evaluate and save the predictions.

# Load the tuned base pipelines
lr_model = PipelineModel.load("s3://truegraph/models/bias_classifier_pca_tuned")
rf_model = PipelineModel.load("s3://truegraph/models/bias_rf_pca100_tuned")

# Load test data (with ID, used as the join key) from Parquet
sdf = spark.read.parquet("s3://truegraph/data/parquet/data_random_split") \
    .select("ID", col("bias").alias("label"), col("content").alias("text"), "split") \
    .filter(col("label").isNotNull() & col("text").isNotNull())

test_df = sdf.filter(col("split") == "test").cache()

# Probability vectors from each model, renamed so they can sit side by side after the join
lr_preds = lr_model.transform(test_df) \
    .select("ID", "label", "probability") \
    .withColumnRenamed("probability", "prob_lr")

rf_preds = rf_model.transform(test_df) \
    .select("ID", "probability") \
    .withColumnRenamed("probability", "prob_rf")

# Inner join of the two sets of probabilities on ID
ensemble = lr_preds.join(rf_preds, on="ID")

# Element-wise mean of two probability vectors
avg_udf = udf(
    lambda v1, v2: Vectors.dense([(x + y) / 2 for x, y in zip(v1, v2)]),
    VectorUDT()
)
# Index of the largest entry, as a double (the evaluators expect numeric predictions)
argmax_udf = udf(
    lambda v: float(max(range(len(v)), key=lambda i: v[i])),
    DoubleType()
)

# Ensemble probabilities and final prediction, with label/prediction as doubles.
# Cached because the frame feeds two evaluations and a write; without the cache,
# each of those would re-run both base pipelines on the test split.
ensemble = (ensemble
    .withColumn("avg_prob", avg_udf("prob_lr", "prob_rf"))
    .withColumn("prediction", argmax_udf("avg_prob"))
    .withColumn("label", col("label").cast("double"))
    .withColumn("prediction", col("prediction").cast("double"))
    .cache()
)

# Evaluate ensemble performance
evaluator_acc = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

acc = evaluator_acc.evaluate(ensemble)
f1  = evaluator_f1.evaluate(ensemble)
print(f"Ensemble ‚Üí Test Accuracy: {acc:.4f},  F1: {f1:.4f}")

# Save ensemble predictions; the metrics cells below read them back from this path
ensemble.select("ID", "label", "prediction", "avg_prob") \
    .write.mode("overwrite") \
    .parquet("s3://truegraph/models/ensemble_preds_test")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Ensemble ‚Üí Test Accuracy: 0.6377,  F1: 0.6313

In [None]:
# Imports the package for evaluation 
from pyspark.sql.functions import col
from pyspark.mllib.evaluation import MulticlassMetrics

# Load the ensemble predictions previously saved
ensemble = spark.read.parquet("s3://truegraph/models/ensemble_preds_test")

# Confusion matrix
print("Confusion matrix (label √ó prediction):")
ensemble.groupBy("label", "prediction") \
        .count() \
        .orderBy("label", "prediction") \
        .show()

# Build an RDD of (prediction, label) and compute metrics
pred_label_rdd = ensemble.select("prediction", "label") \
                          .rdd.map(lambda r: (r[0], r[1]))
metrics = MulticlassMetrics(pred_label_rdd)

# Get the distinct labels from DataFrame
labels = sorted(ensemble.select("label").distinct().rdd.map(lambda r: r[0]).collect())

# Print per‚Äêclass Precision / Recall / F1
print("Class‚Äêwise metrics:")
for lbl in labels:
    print(f" Class {int(lbl)} ‚Üí "
          f"Precision: {metrics.precision(lbl):.4f},  "
          f"Recall: {metrics.recall(lbl):.4f},  "
          f"F1: {metrics.fMeasure(lbl):.4f}")

# 7) Overall metrics
print(f"\nOverall Accuracy = {metrics.accuracy:.4f}")
print(f"Weighted  F1      = {metrics.weightedFMeasure():.4f}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

‚öôÔ∏è  Confusion matrix (label √ó prediction):
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|  206|
|  0.0|       1.0|   38|
|  0.0|       2.0|  158|
|  1.0|       0.0|   32|
|  1.0|       1.0|  158|
|  1.0|       2.0|  109|
|  2.0|       0.0|   89|
|  2.0|       1.0|   45|
|  2.0|       2.0|  465|
+-----+----------+-----+

üìä Class‚Äêwise metrics:
 Class 0 ‚Üí Precision: 0.6300,  Recall: 0.5124,  F1: 0.5652
 Class 1 ‚Üí Precision: 0.6556,  Recall: 0.5284,  F1: 0.5852
 Class 2 ‚Üí Precision: 0.6352,  Recall: 0.7763,  F1: 0.6987

Overall Accuracy = 0.6377
Weighted  F1      = 0.6313

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types       import DoubleType
from pyspark.ml              import PipelineModel
from pyspark.ml.linalg       import Vectors, VectorUDT
from pyspark.ml.evaluation   import MulticlassClassificationEvaluator

import random, numpy as np
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# Weighted soft-voting ensemble: pick the LR/RF mixing weight by F1 on VALIDATION,
# then report that weight's accuracy, F1 and confusion matrix on TEST.

# Load tuned pipelines from S3
lr_model = PipelineModel.load("s3://truegraph/models/bias_classifier_pca_tuned")
rf_model = PipelineModel.load("s3://truegraph/models/bias_rf_pca100_tuned")

# Read & cache all splits (label cast to double for the evaluators)
sdf = (
    spark.read.parquet("s3://truegraph/data/parquet/data_random_split")
         .select("ID",
                 col("bias").alias("label").cast("double"),
                 col("content").alias("text"),
                 "split")
         .filter(col("label").isNotNull() & col("text").isNotNull())
         .cache()
)
sdf.count()  # materialize the cache

# Split out validation & test
val_df  = sdf.filter(col("split")=="valid").cache();  val_df.count()
test_df = sdf.filter(col("split")=="test").cache();   test_df.count()


def _score_both(df):
    """Score `df` with both base pipelines and join their probabilities on ID.

    Returns (lr_probs, rf_probs, joined):
    - lr_probs has the columns ID, label and prob_lr;
    - rf_probs has the columns ID and prob_rf;
    - joined is the inner join of the two on ID.
    All three are cached and materialized with count() before returning.
    """
    lr_probs = (lr_model.transform(df)
                .select("ID", "label", "probability")
                .withColumnRenamed("probability", "prob_lr")
                .cache())
    rf_probs = (rf_model.transform(df)
                .select("ID", "probability")
                .withColumnRenamed("probability", "prob_rf")
                .cache())
    lr_probs.count(); rf_probs.count()
    joined = lr_probs.join(rf_probs, on="ID").cache()
    joined.count()
    return lr_probs, rf_probs, joined


def make_weighted_udf(w_lr):
    """Build a UDF computing w_lr * prob_lr + (1 - w_lr) * prob_rf element-wise.

    The weight is bound as a parameter of this factory. Each UDF therefore carries
    its own value, instead of a closure over a loop variable that later iterations rebind.
    """
    w_rf = 1.0 - w_lr
    return udf(
        lambda v1, v2: Vectors.dense([w_lr * x + w_rf * y for x, y in zip(v1, v2)]),
        VectorUDT()
    )


# Pre-score both models on VALIDATION, then build the TEST ensemble likewise
lr_val, rf_val, val_ens = _score_both(val_df)
lr_test, rf_test, test_ens = _score_both(test_df)

# UDF for argmax (index of the largest probability, as a double)
argmax_udf = udf(lambda v: float(max(range(len(v)), key=lambda i: v[i])), DoubleType())

# Evaluator
evaluator = MulticlassClassificationEvaluator(labelCol="label",
                                               predictionCol="prediction",
                                               metricName="f1")

# Grid-search the LR weight on VAL over 0.0, 0.1, ..., 1.0
# (i / 10 avoids drift such as 3 * 0.1 == 0.30000000000000004).
best_w, best_f1 = None, -1.0
for w in [i / 10 for i in range(11)]:
    preds = (val_ens
        .withColumn("weighted_prob", make_weighted_udf(w)("prob_lr","prob_rf"))
        .withColumn("prediction",   argmax_udf("weighted_prob"))
    )
    f1 = evaluator.evaluate(preds)
    print(f" w={w:.1f} ‚Üí F1={f1:.4f}")
    if f1 > best_f1:  # strict '>' keeps the smallest weight among ties
        best_f1, best_w = f1, w

print(f"\nüîç Best weight on VAL ‚Üí w_LR={best_w:.1f}, w_RF={1-best_w:.1f}, F1={best_f1:.4f}\n")

# Apply best weight to TEST
weighted_udf = make_weighted_udf(best_w)
test_preds = (test_ens
    .withColumn("weighted_prob", weighted_udf("prob_lr","prob_rf"))
    .withColumn("prediction",   argmax_udf("weighted_prob"))
    .withColumn("label",        col("label").cast("double"))
    .withColumn("prediction",   col("prediction").cast("double"))
)

# Final metrics
acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")\
      .evaluate(test_preds)
f1  = evaluator.evaluate(test_preds)
print(f"Test ‚Üí Accuracy: {acc:.4f}, F1: {f1:.4f}\n")

# Confusion matrix
print("Confusion Matrix:")
test_preds.groupBy("label","prediction").count()\
          .orderBy("label","prediction").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

 w=0.0 ‚Üí F1=0.5527
 w=0.1 ‚Üí F1=0.5713
 w=0.2 ‚Üí F1=0.5872
 w=0.3 ‚Üí F1=0.5995
 w=0.4 ‚Üí F1=0.6075
 w=0.5 ‚Üí F1=0.6163
 w=0.6 ‚Üí F1=0.6213
 w=0.7 ‚Üí F1=0.6221
 w=0.8 ‚Üí F1=0.6170
 w=0.9 ‚Üí F1=0.6119
 w=1.0 ‚Üí F1=0.6110

üîç Best weight on VAL ‚Üí w_LR=0.7, w_RF=0.3, F1=0.6221

Test ‚Üí Accuracy: 0.6269, F1: 0.6229

Confusion Matrix:
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|  206|
|  0.0|       1.0|   50|
|  0.0|       2.0|  146|
|  1.0|       0.0|   36|
|  1.0|       1.0|  168|
|  1.0|       2.0|   95|
|  2.0|       0.0|   97|
|  2.0|       1.0|   61|
|  2.0|       2.0|  441|
+-----+----------+-----+

In [None]:
from pyspark.sql.functions import col
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Stacked ensemble: a small LogisticRegression meta-learner is trained on the base
# models' VALIDATION probabilities and evaluated on their TEST probabilities.

# Load the tuned base models
lr_model = PipelineModel.load("s3://truegraph/models/bias_classifier_pca_tuned")
rf_model = PipelineModel.load("s3://truegraph/models/bias_rf_pca100_tuned")

# Load the validation (meta-train) and test (meta-test) splits
sdf = spark.read.parquet("s3://truegraph/data/parquet/data_random_split") \
    .select("ID",
            col("bias").alias("label").cast("double"),
            col("content").alias("text"),
            "split") \
    .filter(col("label").isNotNull() & col("text").isNotNull())

val_df  = sdf.filter(col("split") == "valid").cache()
test_df = sdf.filter(col("split") == "test").cache()


def _base_probs(df):
    """Score `df` with both base pipelines and join their probability vectors on ID.

    Returns a DataFrame with the columns ID, label, prob_lr and prob_rf.
    """
    lr_probs = (lr_model.transform(df)
                .select("ID", "label", "probability")
                .withColumnRenamed("probability", "prob_lr"))
    rf_probs = (rf_model.transform(df)
                .select("ID", "probability")
                .withColumnRenamed("probability", "prob_rf"))
    return lr_probs.join(rf_probs, on="ID")


# Concatenate the two probability vectors into one "meta" feature vector
assembler = VectorAssembler(
    inputCols=["prob_lr", "prob_rf"],
    outputCol="metaFeatures"
)

# Meta-training set built from the validation split
meta_train = assembler.transform(_base_probs(val_df)).select("metaFeatures", "label")

# Train a small LogisticRegression as the meta-learner
meta_lr = LogisticRegression(
    featuresCol="metaFeatures",
    labelCol="label",
    maxIter=10,
    regParam=0.1
)
meta_model = meta_lr.fit(meta_train)

# Meta-test set built from the actual test split
meta_test = assembler.transform(_base_probs(test_df)).select("metaFeatures", "label")

# Final predictions. They are cached because both evaluators below consume them;
# without the cache, each evaluation would re-run both base pipelines on the test split.
final = meta_model.transform(meta_test).cache()

# Evaluate
e_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
e_f1  = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

print(f"Stacked Ensemble ‚Üí Test Accuracy: {e_acc.evaluate(final):.4f}, F1: {e_f1.evaluate(final):.4f}")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1748314322528_0004,pyspark,idle,Link,Link,‚úî


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

Stacked Ensemble ‚Üí Test Accuracy: 0.6354, F1: 0.6339

In [None]:
from pyspark.sql.functions import col
from pyspark.mllib.evaluation import MulticlassMetrics

# Re-load the saved ensemble test predictions, keeping only label and prediction
ensemble = (spark.read.parquet("s3://truegraph/models/ensemble_preds_test")
            .select("label", "prediction"))

# Confusion matrix as sorted (label, prediction, count) rows
conf_df = ensemble.groupBy("label", "prediction").count().orderBy("label", "prediction")
conf_df.show()

# MulticlassMetrics consumes an RDD of (prediction, label) float pairs
rdd = ensemble.rdd.map(lambda row: (float(row.prediction), float(row.label)))
metrics = MulticlassMetrics(rdd)

# Distinct label values present in the predictions, ascending
labels = sorted(row[0] for row in ensemble.select("label").distinct().collect())

# One (label, precision, recall, f1) tuple per class, shown as a Spark DataFrame
rows = [
    (int(lbl),
     float(metrics.precision(lbl)),
     float(metrics.recall(lbl)),
     float(metrics.fMeasure(lbl)))
    for lbl in labels
]
prf_df = spark.createDataFrame(rows, schema=["label", "precision", "recall", "f1"])

print(" Class-wise Precision / Recall / F1:")
prf_df.show()

# Overall stats
print(f"\nOverall Accuracy = {metrics.accuracy:.4f}")
print(f"Weighted   F1 = {metrics.weightedFMeasure():.4f}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),‚Ä¶

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|  206|
|  0.0|       1.0|   38|
|  0.0|       2.0|  158|
|  1.0|       0.0|   32|
|  1.0|       1.0|  158|
|  1.0|       2.0|  109|
|  2.0|       0.0|   89|
|  2.0|       1.0|   45|
|  2.0|       2.0|  465|
+-----+----------+-----+

üìä Class-wise Precision / Recall / F1:
+-----+------------------+------------------+------------------+
|label|         precision|            recall|                f1|
+-----+------------------+------------------+------------------+
|    0|0.6299694189602446|0.5124378109452736|0.5651577503429356|
|    1|0.6556016597510373|0.5284280936454849|0.5851851851851851|
|    2|0.6352459016393442|0.7762938230383973|0.6987227648384672|
+-----+------------------+------------------+------------------+


Overall Accuracy = 0.6377
Weighted   F1 = 0.6313