In [1]:
from pyspark.sql import SparkSession

# Create (or reuse, if one is already active in this kernel) the SparkSession
spark = SparkSession.builder.appName("Pre Process").getOrCreate()

# Report how many tasks Spark can run in parallel. defaultParallelism reflects the cores
# Spark can actually schedule on (the local core count in local mode). The former lookup of
# "spark.executor.instances" is an executor *count*, usually unset in local mode, so its
# hard-coded "3" fallback printed a made-up number.
cores = spark.sparkContext.defaultParallelism
print(f"You are working with {cores} core(s).")

# Display the SparkSession summary
spark

You are working with 3 core(s).


In [2]:
# Load the EDA output produced by the previous notebook.
# The project root can be overridden with the SENTIMENT_PROJECT_DIR environment variable so the
# notebook runs on machines other than the author's; the default keeps the original location.
# A single trailing "/" is enforced because later cells build paths as `path + "data/..."`.
import os

path = os.environ.get("SENTIMENT_PROJECT_DIR", "C:/Users/vmcch/BIGDATA/sentiment_analysis_project/")
path = path.rstrip("/\\") + "/"
df = spark.read.csv(path + "data/eda", header=True, inferSchema=True)


In [3]:
# Preview the first 10 rows as a pandas DataFrame (only those rows are collected to the driver)
df.limit(num=10).toPandas()

Unnamed: 0,text,label
0,i looked at his sleeping face i suddenly felt ...,1
1,i feel like im starting to settle in here and ...,1
2,i chose to trust that these feelings and belie...,1
3,i hope that if they are ever feeling anything ...,1
4,i wake up really refreshed and feel like i hav...,0
5,i dont hear from you i feel like this insert g...,0
6,i suppose that everyone feels that way when th...,1
7,i feel like such a whiney ass,0
8,my parents and brother came up over the weeken...,1
9,i am not a strict journalist and only capturin...,1


In [4]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re
from nltk.corpus import stopwords
import nltk

# Fetch NLTK's English stopword list (the download is skipped when it is already cached locally)
nltk.download('stopwords')
stop_words = {word for word in stopwords.words('english')}

def preprocess_text(text, stop_word_set=None):
    """Normalise a raw review for bag-of-words style features.

    Steps: drop every character that is not an ASCII letter, digit or whitespace,
    lower-case the result, remove stopwords, and re-join the remaining tokens with
    single spaces.

    Args:
        text: Raw text, or None (a Spark UDF receives None for null cells).
        stop_word_set: Collection of lower-case words to remove. Defaults to the
            module-level ``stop_words`` (NLTK English list) so the Spark UDF can keep
            calling this with a single argument.

    Returns:
        The cleaned string. None input yields "", the same result produced for text
        that contains only stopwords/punctuation.
    """
    if text is None:
        # Without this guard re.sub raises TypeError inside the executor and fails the job.
        return ""
    if stop_word_set is None:
        stop_word_set = stop_words
    # NOTE(review): punctuation (incl. apostrophes) is stripped *before* the stopword lookup,
    # so contractions such as "don't" become "dont" and may slip past the stopword list;
    # non-ASCII letters are also removed (e.g. "café" -> "caf").
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)
    text = text.lower()
    return " ".join(word for word in text.split() if word not in stop_word_set)

# Wrap the Python cleaning function as a Spark SQL UDF that yields a string column
preprocess_text_udf = udf(f=preprocess_text, returnType=StringType())

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\vmcch\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [5]:
# Derive a "clean_text" column by running the preprocessing UDF over "text"
df = df.withColumn("clean_text", preprocess_text_udf(df["text"]))

# Preview 10 cleaned rows without truncating long values
df.select("clean_text").show(n=10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------+
|clean_text                                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------+
|looked sleeping face suddenly felt stress leave body small stress deadline stress life feeling replaced sweet peace                |
|feel like im starting settle im productive                                                                                         |
|chose trust feelings beliefs resolved deep level cheer larger part ready eager change                                              |
|hope ever feeling anything less special remember sharing one story makes really big eyes                                           |
|wake really refreshed feel like energy time wake feeling grog

In [16]:
# 8:1:1 split into train / test / validation; the fixed seed keeps the split repeatable
# for the same input data and partitioning.
# NOTE(review): df is not cached, so each count() below re-runs the Python cleaning UDF.
train_df, test_df, val_df = df.randomSplit(weights=[0.8, 0.1, 0.1], seed=42)

# Report how many rows landed in each split
print("Train size: {}, Validation size: {}, Test size: {}".format(
    train_df.count(), val_df.count(), test_df.count()))

Train size: 209618, Validation size: 26024, Test size: 26126


In [17]:
from pyspark.ml.feature import Tokenizer

# Split each cleaned text into a "words" array column
tokenizer = Tokenizer(inputCol="clean_text", outputCol="words")

# Apply the same tokenizer to every split
train_df, val_df, test_df = (
    tokenizer.transform(split) for split in (train_df, val_df, test_df)
)

train_df.show(5, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                                                                         |label|clean_text                                                                                                                        |words                                                                                                                         

In [18]:
from pyspark.sql.functions import col
from pyspark.ml.feature import HashingTF

# Hashed term-frequency vectors, exposed under the column name "embedding".
# NOTE(review): 100 buckets is very likely far fewer than the number of distinct words, so
# many words will collide; Spark's HashingTF default is 2**18 features.
max_features = 100  # dimensionality of the hashed vector
hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=max_features)

# Hash each split, then rename the output column
train_df, val_df, test_df = (
    hashingTF.transform(split).withColumnRenamed("raw_features", "embedding")
    for split in (train_df, val_df, test_df)
)

# Show the result
train_df.show(5, truncate=False)


+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                                                                         |label|clean_text                                                                                                               

In [19]:
# Persist only text, label and the HashingTF vector for each split (overwriting older runs)
for split_df, relative_path in (
    (train_df, "data/processed/train/hashingTF.parquet"),
    (test_df, "data/processed/test/hashingTF.parquet"),
    (val_df, "data/processed/validation/hashingTF.parquet"),
):
    split_df.select("text", "label", "embedding").write.mode("overwrite").parquet(path + relative_path)

In [20]:
# Drop the HashingTF feature column so the next cell can rebuild features (TF-IDF) from "words".
# Only "embedding" exists at this point: HashingTF's "raw_features" output was renamed to
# "embedding" when the features were built, and no "scaled_embedding" column is created in this
# notebook, so dropping those two names was a silent no-op that misdescribed the schema.
train_df, val_df, test_df = (
    split.drop("embedding") for split in (train_df, val_df, test_df)
)

train_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [21]:
from pyspark.sql.functions import col
from pyspark.ml.feature import HashingTF, IDF

# Dimensionality of the hashed vector (see the collision note on the HashingTF cell)
max_features = 100

# Hashed term frequencies for every split
hashingTF = HashingTF(inputCol="words", outputCol="raw_features", numFeatures=max_features)
train_df, val_df, test_df = (
    hashingTF.transform(split) for split in (train_df, val_df, test_df)
)

# IDF weights are fitted on the training split only, then applied to all splits
idf = IDF(inputCol="raw_features", outputCol="idf_features")
idf_model = idf.fit(train_df)

# Apply the weights and expose the result as "embedding"
train_df, val_df, test_df = (
    idf_model.transform(split).withColumnRenamed("idf_features", "embedding")
    for split in (train_df, val_df, test_df)
)

# Show the result
train_df.show(5, truncate=False)


+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
# Persist only text, label and the TF-IDF vector for each split (overwriting older runs)
for split_df, relative_path in (
    (train_df, "data/processed/train/hashingTFIDF.parquet"),
    (test_df, "data/processed/test/hashingTFIDF.parquet"),
    (val_df, "data/processed/validation/hashingTFIDF.parquet"),
):
    split_df.select("text", "label", "embedding").write.mode("overwrite").parquet(path + relative_path)

In [23]:
# Remove the TF-IDF artefacts, leaving text / label / clean_text for the Word2Vec stage
train_df, val_df, test_df = (
    split.drop("embedding", "words", "raw_features")
    for split in (train_df, val_df, test_df)
)

train_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- clean_text: string (nullable = true)



In [24]:
train_df.show(n=5, truncate=False)  # inspect a few rows after the feature columns were dropped

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------+
|text                                                                                                                                                                                                                         |label|clean_text                                                                                                                        |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-------------------------------------------------

In [25]:
from pyspark.sql.functions import col, udf
from gensim.models import Word2Vec
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np

# Collect the non-null cleaned training texts to the driver for gensim
corpus = [
    row["clean_text"]
    for row in train_df.select("clean_text").collect()
    if row["clean_text"] is not None
]

# gensim expects each sentence as a list of tokens
sentences = list(map(str.split, corpus))

In [26]:
# Train a gensim Word2Vec model on the training split only.
# min_count=1 keeps every word seen at least once.
# NOTE(review): with workers=4 gensim training is not bit-for-bit reproducible; per the gensim
# docs that needs workers=1 plus a fixed PYTHONHASHSEED.
VECTOR_SIZE = 100
print("Đang huấn luyện Word2Vec model trên train_df...")
word2vec_model = Word2Vec(
    sentences=sentences,
    vector_size=VECTOR_SIZE,
    window=5,
    min_count=1,
    workers=4,
)
print("Huấn luyện model hoàn tất!")

def get_embedding_word2vec(text):
    """Turn a cleaned text into a fixed-size dense vector for Spark ML.

    The vectors of all in-vocabulary words are averaged. The mean vector is then min-max
    scaled across its own components into [0, 1] (per document, not per feature).

    Args:
        text: Space-separated cleaned text; None or "" is accepted.

    Returns:
        pyspark.ml.linalg.DenseVector of length VECTOR_SIZE. It is all zeros when the text
        is None/empty, contains no word known to the model, or all components are equal.
    """
    zero_vector = Vectors.dense([0.0] * VECTOR_SIZE)
    if not text:
        # None would otherwise raise AttributeError on .split() and fail the Spark job;
        # "" already produced the zero vector via the "no known words" path below.
        return zero_vector

    keyed_vectors = word2vec_model.wv  # hoisted out of the per-word loop
    word_vectors = [keyed_vectors[word] for word in text.split() if word in keyed_vectors]
    if not word_vectors:
        return zero_vector

    vector = np.mean(word_vectors, axis=0)
    min_val = np.min(vector)
    span = np.max(vector) - min_val
    if span != 0:
        scaled_vector = (vector - min_val) / span
    else:
        # All components equal: shifting by the minimum yields all zeros (avoids 0/0)
        scaled_vector = vector - min_val
    return Vectors.dense(scaled_vector)

# Expose the averaging function as a UDF that returns Spark ML vectors.
# NOTE(review): the function references the global word2vec_model, so the whole gensim model is
# presumably serialised with the UDF for every task; broadcasting word2vec_model.wv would be lighter.
embedding_udf = udf(get_embedding_word2vec, VectorUDT())

# (Re)create the "embedding" column on every split from clean_text
train_df, val_df, test_df = (
    split.withColumn("embedding", embedding_udf(col("clean_text")))
    for split in (train_df, val_df, test_df)
)

# Inspect the schema and a few rows
train_df.printSchema()
train_df.show(5, truncate=False)

Đang huấn luyện Word2Vec model trên train_df...
Huấn luyện model hoàn tất!
root
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- clean_text: string (nullable = true)
 |-- embedding: vector (nullable = true)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+----------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [27]:
# Persist only text, label and the Word2Vec vector for each split (overwriting older runs)
for split_df, relative_path in (
    (train_df, "data/processed/train/w2v.parquet"),
    (test_df, "data/processed/test/w2v.parquet"),
    (val_df, "data/processed/validation/w2v.parquet"),
):
    split_df.select("text", "label", "embedding").write.mode("overwrite").parquet(path + relative_path)

In [28]:
# Shut down the SparkSession (and its underlying SparkContext) now that all splits are written
spark.stop()