<a href="https://colab.research.google.com/github/jahnavivummaneni/Breast-Cancer-EarlyPrediction/blob/main/breast_2025_bigdata.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# 1. Install Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# 2. Download Spark 3.3.1 (from Apache archive)
!wget -O spark-3.3.1-bin-hadoop3.tgz https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

# 3. Extract Spark
!tar -xzf spark-3.3.1-bin-hadoop3.tgz

# 4. Install findspark
!pip install -q findspark


--2025-07-18 12:55:08--  https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz
Resolving archive.apache.org (archive.apache.org)... 65.108.204.189, 2a01:4f9:1a:a084::2
Connecting to archive.apache.org (archive.apache.org)|65.108.204.189|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 299350810 (285M) [application/x-gzip]
Saving to: ‘spark-3.3.1-bin-hadoop3.tgz’


2025-07-18 12:55:26 (16.5 MB/s) - ‘spark-3.3.1-bin-hadoop3.tgz’ saved [299350810/299350810]



In [None]:
import os
import findspark

# Point the process at the Java runtime and the unpacked Spark distribution,
# then let findspark make pyspark importable from that SPARK_HOME.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
    }
)

findspark.init()


In [None]:
# Step 2: Initialize SparkSession
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("BreastCancerPrediction")
    .getOrCreate()
)

# Load the CSV dataset: the first row is the header and column types are inferred.
csv_reader = spark.read.options(header=True, inferSchema=True)
df = csv_reader.csv("breast-cancer-dataset.csv")

# Show the inferred column names and types.
df.printSchema()


root
 |-- S/N: integer (nullable = true)
 |-- Year: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Menopause: integer (nullable = true)
 |-- Tumor Size (cm): string (nullable = true)
 |-- Inv-Nodes: string (nullable = true)
 |-- Breast: string (nullable = true)
 |-- Metastasis: string (nullable = true)
 |-- Breast Quadrant: string (nullable = true)
 |-- History: string (nullable = true)
 |-- Diagnosis Result: string (nullable = true)



In [None]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

# Build the preprocessing pipeline: numeric casts -> label/category indexing
# -> feature vector assembly.

# Step 1: Cast numeric columns to double.
# A value that cannot be parsed as a number becomes null after the cast, so
# the stages below must tolerate nulls in these columns.
numeric_cols = ["Age", "Tumor Size (cm)", "Inv-Nodes", "Metastasis"]
for numeric_col in numeric_cols:
    df = df.withColumn(numeric_col, col(numeric_col).cast("double"))

# Step 2: Index categorical columns.
# "Diagnosis_idx" is the label column used by the model further down.
# NOTE(review): Menopause_idx, Breast_idx and Quadrant_idx are produced here
# but are not listed in the assembler's inputCols, so they never reach the
# feature vector — confirm whether that is intended.
indexers = [
    StringIndexer(inputCol="Diagnosis Result", outputCol="Diagnosis_idx"),
    StringIndexer(inputCol="Menopause", outputCol="Menopause_idx"),
    StringIndexer(inputCol="Breast", outputCol="Breast_idx"),
    StringIndexer(inputCol="Breast Quadrant", outputCol="Quadrant_idx")
]

# Step 3: Assemble features.
# handleInvalid="skip" drops rows that have a null/NaN in any input column.
# With the default ("error") the transform raises as soon as such a row is
# evaluated, which a small .show(5) can miss but a full action cannot.
assembler = VectorAssembler(
    inputCols=numeric_cols,
    outputCol="features",
    handleInvalid="skip"
)

# Step 4: Create pipeline and transform
pipeline = Pipeline(stages=indexers + [assembler])
processed_df = pipeline.fit(df).transform(df)

# Step 5: Show result
processed_df.select("features", "Diagnosis_idx").show(5)


+------------------+-------------+
|          features|Diagnosis_idx|
+------------------+-------------+
|[40.0,2.0,0.0,0.0]|          0.0|
|[39.0,2.0,0.0,0.0]|          0.0|
|[45.0,4.0,0.0,0.0]|          0.0|
|[26.0,3.0,0.0,0.0]|          0.0|
|[21.0,1.0,0.0,0.0]|          0.0|
+------------------+-------------+
only showing top 5 rows



In [None]:
# Keep only the rows where every numeric feature column is populated.
required_cols = ["Age", "Tumor Size (cm)", "Inv-Nodes", "Metastasis"]
df_clean = df.na.drop(subset=required_cols)

# Fit the pipeline on the cleaned rows, then apply it to those same rows.
fitted_on_clean = pipeline.fit(df_clean)
processed_df = fitted_on_clean.transform(df_clean)


In [None]:
from pyspark.ml.feature import Imputer

# Fill missing values in the numeric feature columns, writing the result back
# under the same column names (Imputer's default strategy is the mean).
impute_cols = ["Age", "Tumor Size (cm)", "Inv-Nodes", "Metastasis"]
imputer = Imputer().setInputCols(impute_cols).setOutputCols(impute_cols)

imputer_model = imputer.fit(df)
df_imputed = imputer_model.transform(df)

# Re-run the preprocessing pipeline on the imputed data.
fitted_on_imputed = pipeline.fit(df_imputed)
processed_df = fitted_on_imputed.transform(df_imputed)


In [None]:
# Rebuild the assembler so rows with an invalid (null/NaN) value in any input
# column are skipped instead of causing the transform to fail.
assembler = (
    VectorAssembler(handleInvalid="skip")
    .setInputCols(["Age", "Tumor Size (cm)", "Inv-Nodes", "Metastasis"])
    .setOutputCol("features")
)

# Same indexers as before, followed by the new assembler.
pipeline = Pipeline(stages=[*indexers, assembler])
pipeline_model = pipeline.fit(df)
processed_df = pipeline_model.transform(df)


In [None]:
# 80/20 random split of the processed rows, with a fixed seed.
splits = processed_df.randomSplit([0.8, 0.2], seed=42)
train_df, test_df = splits

# Report how many rows landed in each partition.
n_train = train_df.count()
n_test = test_df.count()
print(f"Training samples: {n_train}")
print(f"Testing samples: {n_test}")


Training samples: 179
Testing samples: 33


In [None]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees on the assembled feature vector, predicting the
# indexed diagnosis label. Hyperparameters are gathered in one place.
gbt_params = {
    "labelCol": "Diagnosis_idx",
    "featuresCol": "features",
    "maxIter": 100,
    "maxDepth": 5,
    "stepSize": 0.1,
    "seed": 42,
}
gbt = GBTClassifier(**gbt_params)

# Fit on the training split, then score the held-out split.
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)


In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# One evaluator is reused for every metric; only its metric name changes.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="Diagnosis_idx",
)

# Evaluator metric name -> label printed for it.
metric_labels = {
    "accuracy": "Accuracy",
    "f1": "F1",
    "weightedPrecision": "Weighted Precision",
    "weightedRecall": "Weighted Recall",
}

for metric_name, label in metric_labels.items():
    evaluator.setMetricName(metric_name)
    value = evaluator.evaluate(gbt_predictions)
    print(f"{label}: {value:.2f}")


Accuracy: 0.91
F1: 0.91
Weighted Precision: 0.92
Weighted Recall: 0.91
