In [0]:
from pyspark.sql import SparkSession  # This imports the SparkSession class, the entry point for using Spark.
from pyspark.sql.functions import col, when  # Imports functions for column selection and conditional operations.
from pyspark.ml.feature import VectorAssembler, StringIndexer  # Tools for feature engineering: creating feature vectors and indexing categorical data.
from pyspark.ml import Pipeline  # Enables building machine learning pipelines in PySpark.
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier  # Machine learning algorithms for classification tasks.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator  # Evaluates the performance of classification models.

spark = SparkSession.builder \
    .appName("PySpark ML Pipeline") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "1g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")


# Load Dataset (simulated data)
data = [
    (1, 34, 15000, "No"),
    (2, 45, 20000, "Yes"),
    (3, 23, 12000, "No"),
    (4, 36, 30000, "No"),
    (5, 52, 8000, "Yes"),
    (6, 30, 22000, "No"),
    (7, 40, 10000, "Yes"),
    (8, 28, 14000, "No"),
    (9, 35, 25000, "No"),
    (10, 48, 7000, "Yes")
]
columns = ["CustomerID", "Age", "Income", "Churn"]

# Create DataFrame
df = spark.createDataFrame(data, columns)  # Converts the dataset into a Spark DataFrame for processing.

# Data Preprocessing
# Convert target variable 'Churn' into numerical format
indexer = StringIndexer(inputCol="Churn", outputCol="label")  # Converts the 'Churn' column into a numeric label.

# Feature engineering
feature_cols = ["Age", "Income"]  # Specifies the columns to be used as features.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")  # Combines features into a single vector.

# Splitting the dataset into training and testing
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)  # Splits data into 80% training and 20% testing.

# Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="label")  # Initializes the Logistic Regression model.

# Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="label")  # Initializes the Random Forest model.

# Build a Pipeline for Logistic Regression
lr_pipeline = Pipeline(stages=[indexer, assembler, lr])  # Defines a pipeline with preprocessing and Logistic Regression.
#indexer: Encodes the target variable (Churn) into numeric format.
#assembler: Combines feature columns (Age and Income) into a single vector.
#lr: Trains a logistic regression model using the processed features and labels.

# Train Logistic Regression model
lr_model = lr_pipeline.fit(train_data)  # Fits the pipeline to the training data.

# Evaluate Logistic Regression model
if not test_data.isEmpty():  # Ensure test_data is not empty to avoid errors
    lr_predictions = lr_model.transform(test_data)  # Generates predictions for the test dataset.
    lr_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")  # Evaluates accuracy of predictions.
    lr_accuracy = lr_evaluator.evaluate(lr_predictions)  # Calculates accuracy for Logistic Regression.
    print(f"Logistic Regression Accuracy: {lr_accuracy:.2f}")
else:
    print("Test data is empty. Cannot evaluate Logistic Regression model.")

# Build a Pipeline for Random Forest
rf_pipeline = Pipeline(stages=[indexer, assembler, rf])  # Defines a pipeline with preprocessing and Random Forest.

# Train Random Forest model
rf_model = rf_pipeline.fit(train_data)  # Fits the pipeline to the training data.

# Evaluate Random Forest model
if not test_data.isEmpty():  # Ensure test_data is not empty to avoid errors
    rf_predictions = rf_model.transform(test_data)  # Generates predictions for the test dataset.
    rf_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")  # Evaluates accuracy of predictions.
    rf_accuracy = rf_evaluator.evaluate(rf_predictions)  # Calculates accuracy for Random Forest.
    print(f"Random Forest Accuracy: {rf_accuracy:.2f}")

    # Compare Results
    if lr_accuracy > rf_accuracy:
        print("Logistic Regression performs better.")
    else:
        print("Random Forest performs better.")
else:
    print("Test data is empty. Cannot evaluate Random Forest model.")

# Save model (optional)
rf_model.write().overwrite().save("rf_model_path")  # Saves the trained Random Forest model.
lr_model.write().overwrite().save("lr_model_path")  # Saves the trained Logistic Regression model.

# Stop Spark session
# # Stops the Spark session to release resources.


Logistic Regression Accuracy: 1.00
Random Forest Accuracy: 1.00
Random Forest performs better.


In summary, Random Forest generally performs better in real-world scenarios due to its ability to capture complex patterns, handle noise, and manage feature interactions. However, the accuracy of 1.00 on the given dataset for both models might indicate that the dataset is simple or that both models have been fine-tuned to the point of overfitting.