In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import (
    NaiveBayes,
    DecisionTreeClassifier,
    RandomForestClassifier,
    LogisticRegression,
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
import time

In [None]:
# Spark session shared by every cell below.
# To simulate a cluster environment, change the instance size to test multi instance performance
# NOTE(review): with master("local") the spark.executor.* values are presumably not
# honoured, since everything runs inside the driver JVM — confirm against the Spark docs.
# NOTE(review): spark.driver.maxResultSize (3g) is larger than spark.driver.memory (2g);
# confirm that this is intended.

spark_settings = {
    "spark.driver.cores": "2",
    "spark.driver.memory": "2g",
    "spark.executor.cores": "2",
    "spark.executor.memory": "2g",
    "spark.driver.maxResultSize": "3g",
    "spark.executor.instances": "4",
}

session_builder = SparkSession.builder.master("local").appName("Sar_Image_Analysis")

# Apply the settings in the order they are listed above.
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()

# Read Training Data along with its labels

In [None]:
from functools import reduce


def _read_labelled_csv(path, label):
    """Read one CSV file and tag every row with its class name.

    Args:
        path: Location of the CSV file handed to ``spark.read.csv``.
        label: Class name written into the added ``string_label`` column.

    Returns:
        The loaded DataFrame with a ``string_label`` column added.
    """
    return spark.read.csv(path).withColumn("string_label", F.lit(label))


# One DataFrame per class, each carrying its class name in "string_label".
alongside = _read_labelled_csv("data/alongside.csv", "alongside")
building = _read_labelled_csv("data/building.csv", "building")
road = _read_labelled_csv("data/road.csv", "road")
vegetation = _read_labelled_csv("data/vegetation.csv", "vegetation")
water = _read_labelled_csv("data/water.csv", "water")

# Stack the per-class frames into one training dataset.
# This assumes that all datasets have the same schema.
training_dataset = reduce(
    lambda stacked, part: stacked.union(part),
    [alongside, building, road, vegetation, water],
)

# Convert Feature format to float and Assemble Vectors

In [None]:
"""
Convert all feature columns to FloatType. This is necessary for MLlib to work with the data.
We assume that all columns except the last one are features. The last column is the label.
"""

# Every column except the trailing "string_label" column is treated as a feature.
feature_columns = training_dataset.columns[:-1]

# Build the float-cast projections first, then apply them in a single select
# together with the untouched label column.
float_feature_exprs = [
    F.col(name).cast(FloatType()).alias(name) for name in feature_columns
]
label_expr = F.col("string_label")

training_dataset = training_dataset.select(*float_feature_exprs, label_expr)

In [None]:
# Pack the individual feature columns into one vector column named "features",
# then keep only that vector and the class name.
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)
assembled_dataset = assembler.transform(training_dataset)
training_dataset = assembled_dataset.select("features", "string_label")

In [None]:
# Encode the class names in "string_label" as numeric indices in a new "label"
# column; indices follow ascending alphabetical order of the class names.
string_indexer = StringIndexer(
    stringOrderType="alphabetAsc",
    inputCol="string_label",
    outputCol="label",
)
label_indexer_model = string_indexer.fit(training_dataset)
training_dataset = label_indexer_model.transform(training_dataset)

In [None]:
"""
Split the dataset into training and test sets.
Note: In a real-world scenario, you would typically want to use a stratified split
to ensure that each class is represented in both sets.
For simplicity, we will use a random split here.
"""
# 80/20 random split with a fixed seed.
split_parts = training_dataset.randomSplit([0.8, 0.2], seed=42)
train, test = split_parts

# Initialize the classifier evaluator (accuracy of "prediction" against "label").
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="label",
    predictionCol="prediction",
)

# 1-Machine learning models for classification

## 1.1-Naive Bayes Model

In [None]:
# Configure the estimator separately from fitting it on the training split.
naive_bayes = NaiveBayes(
    featuresCol="features",
    labelCol="label",
    modelType="multinomial",
    smoothing=0.01,
)
naive_bayes_model = naive_bayes.fit(train)

# Score the held-out split and report its accuracy.
predictions = naive_bayes_model.transform(test)
accuracy = evaluator.evaluate(predictions)

print("Test set accuracy = " + str(accuracy))

## 1.2-Random Forest Classifier

In [None]:
# Configure the estimator separately from fitting it on the training split.
random_forest = RandomForestClassifier(
    numTrees=10,
    maxDepth=10,
    featuresCol="features",
    labelCol="label",
)
random_forest_model = random_forest.fit(train)

# Score the held-out split and report its accuracy.
predictions = random_forest_model.transform(test)
accuracy = evaluator.evaluate(predictions)

print("Test set accuracy = " + str(accuracy))

## 1.3-Decision Tree Classifier

In [None]:
# Configure the estimator separately from fitting it on the training split.
decision_tree = DecisionTreeClassifier(
    maxDepth=10,
    featuresCol="features",
    labelCol="label",
)
decision_tree_model = decision_tree.fit(train)

# Score the held-out split and report its accuracy.
predictions = decision_tree_model.transform(test)
accuracy = evaluator.evaluate(predictions)

print("Test set accuracy = " + str(accuracy))

## 1.4-Logistic Regression Classifier

In [None]:
# Elastic-net regularised logistic regression fitted on the training split.
logistic_regression_model = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.01,
    elasticNetParam=0.8,
    standardization=False,
).fit(train)

# Score the held-out split.
predictions = logistic_regression_model.transform(test)

accuracy = evaluator.evaluate(predictions)
# Report the result like the other model cells do; previously the accuracy was
# computed here but never shown.
print("Test set accuracy = " + str(accuracy))

In [None]:
# Summary object exposed by the fitted logistic regression model.
training_summary = logistic_regression_model.summary

# (display name, value) pairs; weightedFMeasure is a method, the rest are attributes.
metric_pairs = [
    ("Accuracy", training_summary.accuracy),
    ("FPR", training_summary.weightedFalsePositiveRate),
    ("TPR", training_summary.weightedTruePositiveRate),
    ("F-measure", training_summary.weightedFMeasure()),
    ("Precision", training_summary.weightedPrecision),
    ("Recall", training_summary.weightedRecall),
]
metrics = dict(metric_pairs)

for metric_name, metric_value in metrics.items():
    print(f"{metric_name}: {metric_value}")

# 2-Cross Validation for all Models tested above

In [None]:
# No hyper-parameters are searched: the builder is left empty on purpose.
paramGrid = ParamGridBuilder().build()

# Unfitted estimators, one per classifier compared in section 1.
nb_estimator = NaiveBayes(
    featuresCol="features",
    labelCol="label",
    modelType="multinomial",
    smoothing=0.01,
)

rf_estimator = RandomForestClassifier(
    numTrees=10,
    maxDepth=10,
    featuresCol="features",
    labelCol="label",
)

dt_estimator = DecisionTreeClassifier(
    maxDepth=10,
    featuresCol="features",
    labelCol="label",
)

lr_estimator = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    standardization=False,
    elasticNetParam=0.8,
    regParam=0.01,
    maxIter=300,
)

# (estimator, display name) pairs consumed by the cross-validation loop.
models = [
    (nb_estimator, "Naive Bayes"),
    (rf_estimator, "Random Forest"),
    (dt_estimator, "Decision Tree"),
    (lr_estimator, "Logistic Regression"),
]

In [None]:
# Collected as (model name, cross-validated accuracy, elapsed seconds).
results = []

for model, model_name in models:
    # perf_counter is a monotonic clock, so the measured duration cannot be
    # distorted by system clock adjustments.
    start = time.perf_counter()
    cv = CrossValidator(
        estimator=Pipeline(stages=[model]),
        estimatorParamMaps=paramGrid,
        evaluator=evaluator,
        numFolds=10,
        parallelism=10,
        seed=42,  # fixed fold assignment so reruns give the same numbers
    )

    cvModel = cv.fit(training_dataset)

    # avgMetrics holds the metric averaged over the held-out folds, one entry per
    # param map. Re-scoring cvModel on training_dataset would instead measure the
    # refitted model on the very rows it was trained on. Accuracy is
    # larger-is-better, so the best entry is the maximum.
    accuracy = max(cvModel.avgMetrics)
    results.append((model_name, accuracy, time.perf_counter() - start))

for name, accuracy, duration in results:
    print(f"{name}: Accuracy = {accuracy*100:.2f}% | Time = {duration:.2f}s")

# 3-Run a trained model on full scale SAR image

In [None]:
# We use decision tree as the final model.
# Unlike the earlier experiment, it is fitted on the whole labelled dataset
# rather than on the training split only.
final_tree = DecisionTreeClassifier(
    maxDepth=10,
    featuresCol="features",
    labelCol="label",
)
decision_tree_model = final_tree.fit(training_dataset)

In [None]:
# Read the full scale SAR image in parquet format
indiana_raw = spark.read.parquet("data/indiana.parquet")

# Apply the same preprocessing steps as for the training data:
# every column is cast to FloatType under its original name.
indiana_float_exprs = [
    F.col(name).cast(FloatType()).alias(name) for name in indiana_raw.columns
]
indiana_float = indiana_raw.select(*indiana_float_exprs)

# Assemble all columns into the "features" vector and keep only that vector.
assembler = VectorAssembler(outputCol="features", inputCols=indiana_float.columns)
indiana_df = assembler.transform(indiana_float).select("features")

In [None]:
# Apply the trained decision tree model to classify the data
indiana_predictions = decision_tree_model.transform(indiana_df)

# Count rows per predicted class. "prediction" is the numeric label index, not
# the class name. The aggregate is tiny, so it is cached and reused below rather
# than scoring the whole image a second time just to get the total.
class_counts = indiana_predictions.groupBy("prediction").count().cache()

# Total number of predictions = sum of the per-class counts.
# The sum is None when there are no rows at all, hence the fallback to 0.
total = class_counts.agg(F.sum("count")).first()[0] or 0

# Share of each predicted class, rounded to a whole percent and listed in
# label-index order so the output is stable between runs.
result = class_counts.withColumn(
    "percentage", F.round(F.col("count") * 100 / total)
).orderBy("prediction")

result.show()