In [None]:
!pip install pyspark tensorflow




In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, NaiveBayes, DecisionTreeClassifier, GBTClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import tensorflow as tf
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
from google.colab import drive
import joblib


In [None]:
# Create (or reuse) the Spark session that the rest of the notebook runs on.
spark = (
    SparkSession.builder
    .appName("Text Classification with DL and ML Models")
    .getOrCreate()
)


In [None]:
# Mount Google Drive so the CSV files under MyDrive are readable.
drive.mount('/content/drive')


def _read_labeled_csv(path):
    """Read a CSV with `text` and `label` columns and keep only usable rows.

    The label is cast to int *before* null rows are dropped: a label that
    cannot be parsed as an integer becomes null after the cast, so dropping
    afterwards removes those rows too instead of letting them through.

    Args:
        path: Location of the CSV file (first line is the header).

    Returns:
        A DataFrame with an integer `label` and no nulls in `text` or `label`.
    """
    raw = spark.read.csv(path, header=True, inferSchema=True)
    typed = raw.withColumn("label", raw["label"].cast("int"))
    return typed.dropna(subset=["text", "label"])


train_data = _read_labeled_csv('/content/drive/MyDrive/data/train/train_final.csv')
test_data = _read_labeled_csv('/content/drive/MyDrive/data/test/test_final.csv')

# Preview a few training rows to confirm the file parsed as expected.
train_data.show(5)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
+--------------------+-----+
|                text|label|
+--------------------+-----+
|em được làm fan c...|    0|
|đúng là bọn mắt h...|    2|
|đậu văn cường giờ...|    0|
|côn đồ cục súc vô...|    2|
|từ lý thuyết đến ...|    0|
+--------------------+-----+
only showing top 5 rows



In [None]:

# Keep only the rows of each split that have a non-null label.
train_data, test_data = (
    frame.dropna(subset=["label"]) for frame in (train_data, test_data)
)


In [None]:
from pyspark.sql.functions import length, trim, col

# Keep only rows whose text still has characters left after trimming.
has_text = length(trim(col("text"))) > 0
train_data = train_data.where(has_text)
test_data = test_data.where(has_text)

In [None]:
from pyspark.sql.functions import count, when, isnull
# Report the number of null values per column for each split. Every frame is
# inspected with its *own* column list, so the check stays correct even if the
# train and test schemas ever differ.
for split in (train_data, test_data):
    split.select(
        [count(when(isnull(name), name)).alias(name) for name in split.columns]
    ).show()


+----+-----+
|text|label|
+----+-----+
|   0|    0|
+----+-----+

+----+-----+
|text|label|
+----+-----+
|   0|    0|
+----+-----+



In [None]:

# Print the schema of each split under its own heading.
for heading, frame in (
    ("Train data schema:", train_data),
    ("\nTest data schema:", test_data),
):
    print(heading)
    frame.printSchema()


Train data schema:
root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)


Test data schema:
root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



In [None]:

# Show the (column, dtype) pairs of each split.
print(f"Train data types: {train_data.dtypes}")
print(f"Test data types: {test_data.dtypes}")


Train data types: [('text', 'string'), ('label', 'int')]
Test data types: [('text', 'string'), ('label', 'int')]


In [None]:
# Text-to-feature stages, chained through their input/output columns:
# text -> words -> filtered_words -> raw_features -> features.
tokenizer = Tokenizer(
    inputCol="text",
    outputCol="words",
)
# NOTE(review): the stop-word list is left at its default; the sample rows
# look Vietnamese — confirm the default list is appropriate for this data.
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered_words",
)
# Hash the remaining tokens into a fixed 5000-dimensional term-frequency vector.
hashing_tf = HashingTF(
    inputCol="filtered_words",
    outputCol="raw_features",
    numFeatures=5000,
)
idf = IDF(
    inputCol="raw_features",
    outputCol="features",
)


In [None]:
# Inspect which label values occur in the training data.
distinct_labels = train_data.select("label").distinct()
distinct_labels.show()


+-----+
|label|
+-----+
|    1|
|    2|
|    0|
+-----+



In [None]:
# Feature-extraction stages that every classifier pipeline starts with.
feature_stages = [tokenizer, remover, hashing_tf, idf]

# Final estimator of each pipeline, keyed by the model's display name.
classifiers = {
    'Logistic Regression': LogisticRegression(featuresCol="features", labelCol="label"),
    'Random Forest': RandomForestClassifier(featuresCol="features", labelCol="label", numTrees=100),
    'Naive Bayes': NaiveBayes(featuresCol="features", labelCol="label"),
    'Decision Tree': DecisionTreeClassifier(featuresCol="features", labelCol="label"),
}

# One unfitted pipeline per classifier: shared feature stages + the estimator.
pipelines = {
    name: Pipeline(stages=[*feature_stages, estimator])
    for name, estimator in classifiers.items()
}

# Base location on Google Drive used for the model artifacts.
pipeline_path = '/content/drive/MyDrive/models/'

In [None]:
# Fit every pipeline on the training split and persist the fitted model.
for model_name, pipeline_model in pipelines.items():
    print(f"Training and saving pipeline for {model_name}...")
    model = pipeline_model.fit(train_data)  # Train the pipeline
    # A plain `save()` raises when the target path already exists, which makes
    # this cell fail on every re-run; overwrite the previous artifact instead.
    model.write().overwrite().save(pipeline_path + f'{model_name}_pipeline')

Training and saving pipeline for Logistic Regression...
Training and saving pipeline for Random Forest...
Training and saving pipeline for Naive Bayes...
Training and saving pipeline for Decision Tree...


In [None]:
from pyspark.ml import PipelineModel

# Reload each fitted pipeline from the location it was saved to.
(
    logistic_regression_pipeline,
    random_forest_pipeline,
    naive_bayes_pipeline,
    decision_tree_pipeline,
) = (
    PipelineModel.load(f"{pipeline_path}{saved_name}_pipeline")
    for saved_name in (
        "Logistic Regression",
        "Random Forest",
        "Naive Bayes",
        "Decision Tree",
    )
)

In [None]:
# Accuracy evaluator comparing the `prediction` column against `label`.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)


In [None]:
# Run every reloaded pipeline over the test split.
(
    logistic_regression_predictions,
    random_forest_predictions,
    naive_bayes_predictions,
    decision_tree_predictions,
) = [
    fitted.transform(test_data)
    for fitted in (
        logistic_regression_pipeline,
        random_forest_pipeline,
        naive_bayes_pipeline,
        decision_tree_pipeline,
    )
]

In [None]:
# Compute the evaluator's metric for each model's predictions.
(
    logistic_regression_accuracy,
    random_forest_accuracy,
    naive_bayes_accuracy,
    decision_tree_accuracy,
) = map(
    evaluator.evaluate,
    (
        logistic_regression_predictions,
        random_forest_predictions,
        naive_bayes_predictions,
        decision_tree_predictions,
    ),
)

In [None]:
# Print each model's test accuracy, rounded to four decimal places.
for display_name, accuracy in (
    ("Logistic Regression", logistic_regression_accuracy),
    ("Random Forest", random_forest_accuracy),
    ("Naive Bayes", naive_bayes_accuracy),
    ("Decision Tree", decision_tree_accuracy),
):
    print(f"{display_name} Accuracy: {accuracy:.4f}")

Logistic Regression Accuracy: 0.7795
Random Forest Accuracy: 0.8306
Naive Bayes Accuracy: 0.7163
Decision Tree Accuracy: 0.8380
