In [2]:
# Make this cell runnable on a fresh kernel: obtain (or reuse) the Spark session
# here instead of relying on a `spark` variable that only exists if the kernel
# pre-injected it or a later cell was executed first. getOrCreate() hands back
# the already-running session when there is one.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AirlinesClassification").getOrCreate()

# Only log ERROR-level messages so Spark's logging does not drown the cell outputs.
spark.sparkContext.setLogLevel("ERROR")

In [3]:
# 1. Load Dataset

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import os

spark = SparkSession.builder.appName("AirlinesClassification").getOrCreate()

# Dataset location. Defaults to the original local path, but can be redirected
# without editing the notebook by setting the AIRLINES_CSV environment variable.
DATA_PATH = os.environ.get("AIRLINES_CSV", "/home/sita/Downloads/airlines.csv")

# Load dataset: the first row is the header and column types are inferred.
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)

# Quick sanity check of what was loaded: schema, a few rows, and the row count.
print("=== Dataset Loaded ===")
df.printSchema()
df.show(5)
print("Total Rows:", df.count())


                                                                                

=== Dataset Loaded ===
root
 |-- index: integer (nullable = true)
 |-- airline: string (nullable = true)
 |-- flight: string (nullable = true)
 |-- source_city: string (nullable = true)
 |-- departure_time: string (nullable = true)
 |-- stops: string (nullable = true)
 |-- arrival_time: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- class: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- days_left: integer (nullable = true)
 |-- price: integer (nullable = true)

+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|index| airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|
+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+
|    0|SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1|

In [5]:
# 1.2 Binary target: Label = 1 ("expensive") when price is above the median, else 0.
# The median is the approximate 0.5 quantile (relative error 0.01), not the exact one.

(median_price,) = df.approxQuantile("price", [0.5], 0.01)

is_expensive = col("price") > median_price
df = df.withColumn("Label", when(is_expensive, 1).otherwise(0))

# Spot-check the new column against the threshold that produced it.
df.select("price", "Label").show(10)
print("Median Price:", median_price)


                                                                                

+-----+-----+
|price|Label|
+-----+-----+
| 5953|    0|
| 5953|    0|
| 5956|    0|
| 5955|    0|
| 5955|    0|
| 5955|    0|
| 6060|    0|
| 6060|    0|
| 5954|    0|
| 5954|    0|
+-----+-----+
only showing top 10 rows

Median Price: 7412.0


In [6]:
# 1.3 String indexing (categorical -> numeric)

categorical_cols = ["airline", "source_city", "stops", "destination_city", "class"]

# One StringIndexer per categorical column; its output is written to "<column>_idx".
indexers = []
for col_name in categorical_cols:
    indexers.append(StringIndexer(inputCol=col_name, outputCol=col_name + "_idx"))

# Fit and apply the indexers one after another, each on the frame left by the previous one.
for indexer in indexers:
    fitted_indexer = indexer.fit(df)
    df = fitted_indexer.transform(df)

df.show(5)


                                                                                

+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+-----+-----------+---------------+---------+--------------------+---------+
|index| airline| flight|source_city|departure_time|stops| arrival_time|destination_city|  class|duration|days_left|price|Label|airline_idx|source_city_idx|stops_idx|destination_city_idx|class_idx|
+-----+--------+-------+-----------+--------------+-----+-------------+----------------+-------+--------+---------+-----+-----+-----------+---------------+---------+--------------------+---------+
|    0|SpiceJet|SG-8709|      Delhi|       Evening| zero|        Night|          Mumbai|Economy|    2.17|        1| 5953|    0|        5.0|            0.0|      1.0|                 0.0|      0.0|
|    1|SpiceJet|SG-8157|      Delhi| Early_Morning| zero|      Morning|          Mumbai|Economy|    2.33|        1| 5953|    0|        5.0|            0.0|      1.0|                 0.0|      0.0|
|    2| AirAsia

In [7]:
# 1.4 Assemble the model inputs into a single vector column named "Features"

numeric_features = ["duration", "days_left"]
indexed_features = [
    "airline_idx",
    "source_city_idx",
    "stops_idx",
    "destination_city_idx",
    "class_idx",
]
# Order matters: it fixes the position of each input inside the assembled vector.
feature_cols = numeric_features + indexed_features

assembler = VectorAssembler(outputCol="Features", inputCols=feature_cols)
df = assembler.transform(df)

df.select("Features", "Label").show(5, truncate=False)


+------------------------------+-----+
|Features                      |Label|
+------------------------------+-----+
|[2.17,1.0,5.0,0.0,1.0,0.0,0.0]|0    |
|[2.33,1.0,5.0,0.0,1.0,0.0,0.0]|0    |
|[2.17,1.0,4.0,0.0,1.0,0.0,0.0]|0    |
|(7,[0,1,4],[2.25,1.0,1.0])    |0    |
|(7,[0,1,4],[2.33,1.0,1.0])    |0    |
+------------------------------+-----+
only showing top 5 rows



In [8]:
# Train/test split with 0.7 / 0.3 weights and a fixed seed

train, test = df.randomSplit([0.7, 0.3], seed=42)

# Report the size of each split.
for caption, subset in (("Train rows:", train), ("Test rows:", test)):
    print(caption, subset.count())


                                                                                

Train rows: 210119




Test rows: 90034


                                                                                

In [9]:
# 2. Fit a logistic regression (default hyperparameters) on the training split

lr = LogisticRegression(labelCol="Label", featuresCol="Features")
model = lr.fit(train)

# Show the fitted parameters.
coefficients, intercept = model.coefficients, model.intercept
print("Coefficients:", coefficients)
print("Intercept:", intercept)


                                                                                

Coefficients: [0.06921004979018004,-0.1299200898234925,-0.4010513120144647,0.009670846055984816,0.40834009006313604,-0.0026495549012668664,26.68598976305024]
Intercept: 1.491114542388675


In [10]:
# Evaluate the baseline model on the test split

pred = model.transform(test)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="Label")

# One shared evaluator; the metric is selected per call through a param override.
acc, f1 = (
    evaluator.evaluate(pred, {evaluator.metricName: metric})
    for metric in ("accuracy", "f1")
)

print("=== MODEL PERFORMANCE ===")
print("Accuracy:", round(acc, 4))
print("F1 Score:", round(f1, 4))




=== MODEL PERFORMANCE ===
Accuracy: 0.8993
F1 Score: 0.8992


                                                                                

In [12]:
# 3. Hyperparameter Tuning
#
# 3-fold cross-validation over a 3 x 3 x 3 grid (27 candidate settings) of
# regParam, elasticNetParam and maxIter for the logistic regression `lr`.

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0, 0.01, 0.1])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [10, 30, 50])
             .build())

cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    # Selection metric spelled out; "f1" is this evaluator's default metric.
    evaluator=MulticlassClassificationEvaluator(labelCol="Label", metricName="f1"),
    numFolds=3,
    # Fixed seed so fold assignment is reproducible, matching the seeded
    # train/test split.
    seed=42,
)

print("=== Running Cross Validation ===")

cv_model = cv.fit(train)
best_model = cv_model.bestModel

# Recover the winning setting through public API instead of the private
# `_java_obj` handle: avgMetrics[i] is the cross-validated score of
# paramGrid[i], and larger is better for f1, so the best model is the one
# fitted with the highest-scoring entry of the grid.
best_index = max(range(len(paramGrid)), key=lambda i: cv_model.avgMetrics[i])
best_params = paramGrid[best_index]

print("Best RegParam:", best_params[lr.regParam])
print("Best ElasticNet:", best_params[lr.elasticNetParam])
print("Best MaxIter:", best_params[lr.maxIter])


=== Running Cross Validation ===


                                                                                

Best RegParam: 0.01
Best ElasticNet: 1.0
Best MaxIter: 10


In [13]:
# Score the cross-validated best model on the test split

final_pred = best_model.transform(test)

# Reuse the shared evaluator, switching the metric per call.
final_metrics = {
    metric: evaluator.evaluate(final_pred, {evaluator.metricName: metric})
    for metric in ("accuracy", "f1")
}
final_acc = final_metrics["accuracy"]
final_f1 = final_metrics["f1"]

print("=== FINAL MODEL RESULTS ===")
print("Final Accuracy:", round(final_acc, 4))
print("Final F1 Score:", round(final_f1, 4))


                                                                                

=== FINAL MODEL RESULTS ===
Final Accuracy: 0.9017
Final F1 Score: 0.9016


                                                                                