In [1]:
# Import các thư viện cần thiết
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import (DecisionTreeClassifier,
                                       RandomForestClassifier, 
                                       GBTClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from hdfs import InsecureClient

In [2]:
# Kết nối với HDFS và tạo Spark Session
client = InsecureClient('http://localhost:9870', user='thanhtin')
spark = (SparkSession.builder.appName("trainmodel_phanlop")
         .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
         .getOrCreate())

In [3]:
# Đường dẫn file HDFS
train_path = "hdfs://localhost:9000/thanhtin/train-airline.csv"
test_path = "hdfs://localhost:9000/thanhtin/test-airline.csv"

# Đọc file train và test từ HDFS vào DataFrame
train_df = spark.read.csv(train_path, header=True, inferSchema=True)
test_df = spark.read.csv(test_path, header=True, inferSchema=True)

# Loại bỏ các giá trị null nếu có
train_df = train_df.na.drop()
test_df = test_df.na.drop()

# Hiển thị một vài dòng dữ liệu để kiểm tra
train_df.show(5)
test_df.show(5)


+------+------+-----------------+---+---------------+--------+---------------+---------------------+---------------------------------+----------------------+-------------+--------------+---------------+------------+----------------------+----------------+----------------+----------------+---------------+----------------+-----------+--------------------------+------------------------+--------------------+
|    id|Gender|    Customer Type|Age| Type of Travel|   Class|Flight Distance|Inflight wifi service|Departure/Arrival time convenient|Ease of Online booking|Gate location|Food and drink|Online boarding|Seat comfort|Inflight entertainment|On-board service|Leg room service|Baggage handling|Checkin service|Inflight service|Cleanliness|Departure Delay in Minutes|Arrival Delay in Minutes|        satisfaction|
+------+------+-----------------+---+---------------+--------+---------------+---------------------+---------------------------------+----------------------+-------------+-------------

In [4]:
# Mã hóa các cột chuỗi thành dạng số bằng StringIndexer
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index").fit(train_df)
    for column in ["Gender", "Customer Type", "Type of Travel", "Class"]
]

for indexer in indexers:
    train_df = indexer.transform(train_df)
    test_df = indexer.transform(test_df)

# Cập nhật danh sách các cột đặc trưng sau khi thêm các cột đã được mã hóa
feature_columns = ["Age", "Flight Distance", "Inflight wifi service", "Departure/Arrival time convenient",
                   "Ease of Online booking", "Gate location", "Food and drink", "Online boarding", 
                   "Seat comfort", "Inflight entertainment", "On-board service", "Leg room service", 
                   "Baggage handling", "Checkin service", "Inflight service", "Cleanliness", 
                   "Departure Delay in Minutes", "Arrival Delay in Minutes", 
                   "Gender_index", "Customer Type_index", "Type of Travel_index", "Class_index"]

# Tạo cột 'features' với VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_df = assembler.transform(train_df)
test_df = assembler.transform(test_df)

# Chuyển cột 'satisfaction' thành nhãn số (label) chỉ trên tập train, sau đó dùng lại cho test
satisfaction_indexer = StringIndexer(inputCol="satisfaction", outputCol="label").fit(train_df)
train_df = satisfaction_indexer.transform(train_df)
test_df = satisfaction_indexer.transform(test_df)

# Chia dữ liệu train thành tập train và tập validation
train_data, val_data = train_df.randomSplit([0.8, 0.2], seed=42)

# Hiển thị kết quả sau khi tiền xử lý
train_df.select("features", "label").show(5)


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[13.0,460.0,3.0,4...|  0.0|
|[25.0,235.0,3.0,2...|  0.0|
|[26.0,1142.0,2.0,...|  1.0|
|[25.0,562.0,2.0,5...|  0.0|
|[61.0,214.0,3.0,3...|  1.0|
+--------------------+-----+
only showing top 5 rows



In [5]:
# Khởi tạo và huấn luyện các mô hình
# dt = DecisionTreeClassifier(featuresCol="features", labelCol="label",maxDepth=15)
# dt_model = dt.fit(train_data)

rf = RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=150, maxDepth=15)
rf_model = rf.fit(train_data)

# gbt = GBTClassifier(featuresCol="features", labelCol="label", maxIter=20, maxDepth=5, stepSize=0.1)
# gbt_model = gbt.fit(train_data)



# Dự đoán trên tập validation
# predictions = dt_model.transform(val_data)
# predictions.select("features", "label", "prediction").show(5)
rf_predictions = rf_model.transform(val_data)
rf_predictions.select("features", "label", "prediction").show(5)
# gbt_predictions = gbt_model.transform(val_data)
# gbt_predictions.select("features", "label", "prediction").show(5)


+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|[27.0,421.0,1.0,2...|  0.0|       0.0|
|[47.0,1676.0,4.0,...|  1.0|       1.0|
|[48.0,3088.0,4.0,...|  1.0|       1.0|
|[23.0,173.0,3.0,0...|  0.0|       0.0|
|[24.0,108.0,0.0,0...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 5 rows



In [6]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Khởi tạo evaluator với các tham số khác nhau
evaluator_accuracy = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator_precision = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedPrecision")
evaluator_recall = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="weightedRecall")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

# Đánh giá các chỉ số cho mô hình Decision Tree
# dt_accuracy = evaluator_accuracy.evaluate(predictions)
# dt_precision = evaluator_precision.evaluate(predictions)
# dt_recall = evaluator_recall.evaluate(predictions)
# dt_f1 = evaluator_f1.evaluate(predictions)
# 
# print(f"Decision Tree Accuracy: {dt_accuracy}")
# print(f"Decision Tree Precision: {dt_precision}")
# print(f"Decision Tree Recall: {dt_recall}")
# print(f"Decision Tree F1 Score: {dt_f1}")




# Đánh giá các chỉ số cho Random Forest (nếu cần)
rf_accuracy = evaluator_accuracy.evaluate(rf_predictions)
rf_precision = evaluator_precision.evaluate(rf_predictions)
rf_recall = evaluator_recall.evaluate(rf_predictions)
rf_f1 = evaluator_f1.evaluate(rf_predictions)

print(f"Random Forest Accuracy: {rf_accuracy}")
print(f"Random Forest Precision: {rf_precision}")
print(f"Random Forest Recall: {rf_recall}")
print(f"Random Forest F1 Score: {rf_f1}")

# Đánh giá các chỉ số cho Gradient Boosted Tree (nếu cần)
# gbt_accuracy = evaluator_accuracy.evaluate(gbt_predictions)
# gbt_precision = evaluator_precision.evaluate(gbt_predictions)
# gbt_recall = evaluator_recall.evaluate(gbt_predictions)
# gbt_f1 = evaluator_f1.evaluate(gbt_predictions)
# 
# print(f"Gradient Boosted Tree Accuracy: {gbt_accuracy}")
# print(f"Gradient Boosted Tree Precision: {gbt_precision}")
# print(f"Gradient Boosted Tree Recall: {gbt_recall}")
# print(f"Gradient Boosted Tree F1 Score: {gbt_f1}")


Random Forest Accuracy: 0.9606861696957916
Random Forest Precision: 0.9608028940880025
Random Forest Recall: 0.9606861696957917
Random Forest F1 Score: 0.9606183636171961


In [7]:
# Decision Tree Feature Importance
rf_feature_importance = rf_model.featureImportances
print("RF Feature Importances::")
for feature, importance in zip(feature_columns, rf_feature_importance):
    print(f"{feature}: {importance}")


RF Feature Importances::
Age: 0.021609152457023408
Flight Distance: 0.019788950240227806
Inflight wifi service: 0.15192206261007707
Departure/Arrival time convenient: 0.010472722481763615
Ease of Online booking: 0.036346568538793894
Gate location: 0.010521217006041557
Food and drink: 0.008976829630579095
Online boarding: 0.2262816890741332
Seat comfort: 0.046526996582901235
Inflight entertainment: 0.05835812617868721
On-board service: 0.026558173958785812
Leg room service: 0.03246666131651929
Baggage handling: 0.02043205817729557
Checkin service: 0.019138710466751217
Inflight service: 0.02033539486456418
Cleanliness: 0.019838119528695783
Departure Delay in Minutes: 0.005569149592915699
Arrival Delay in Minutes: 0.007122243945039225
Gender_index: 0.0025944932616326463
Customer Type_index: 0.0421501555580666
Type of Travel_index: 0.11441645318863294
Class_index: 0.09857407134087294


In [8]:
# Dự đoán trên tập test (sử dụng mô hình Random Forest)
test_predictions = rf_model.transform(test_df)
test_predictions.select("features","label", "prediction").show(10)

NameError: name 'dt_model' is not defined

In [9]:
# Lưu mô hình vào HDFS thư mục /thanhtin
rf_model.save("hdfs://localhost:9000/thanhtin/rf_model")


In [10]:
spark.stop()