In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import os
from pyspark.sql.functions import col, regexp_replace, trim
import preprocess

# Tạo SparkSession
spark = SparkSession.builder.appName("SimpleSparkApp").getOrCreate()

In [2]:
# Lấy dữ liệu từ tất cả các file

from pyspark.sql.functions import size
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
# Đường dẫn đến thư mục chứa các tệp JSON
json_folder_path = "../crawl/data/"

# Lấy danh sách các tệp trong thư mục
json_files = [f for f in os.listdir(json_folder_path) if f.endswith(".json")]

# Define schema for the data
schema = StructType(
    [
        StructField("title", StringType(), True),
        StructField("content", StringType(), True),
        StructField("time", StringType(), True),
        StructField("category", ArrayType(StringType()), True),
        StructField("views", StringType(), True),
        StructField("num_answer", StringType(), True),
        StructField("votes", StringType(), True),
        StructField("solved", StringType(), True),
    ]
)

# Tạo DataFrame rỗng để chứa dữ liệu từ tất cả các tệp JSON
df_all = spark.createDataFrame([], schema)

# Đọc từng tệp JSON và ghép vào DataFrame chung
for json_file in json_files:
    json_file_path = os.path.join(json_folder_path, json_file)
    df = spark.read.json(json_file_path, schema=schema)

    # Thực hiện xử lý dữ liệu như bạn đã làm trước đó
    df_processed = (
        df.withColumn("content", regexp_replace("content", "<.*?>", ""))
        .withColumn("content", regexp_replace("content", "\\s+", " "))
        .withColumn("content", trim(col("content")))
        .withColumn("views", preprocess.convert_to_numeric("views"))
        .withColumn("num_answer", preprocess.convert_to_numeric("num_answer"))
        .withColumn("votes", preprocess.convert_to_numeric("votes"))
    )

    # Ghép DataFrame mới vào DataFrame chung
    df_all = df_all.union(df_processed)

# Loại các bản ghi chứa null hoặc có độ dài của mảng "category" là 0
df_all = df_all.na.drop().filter(size("category") > 0)

In [88]:
# Lấy dữ liệu từ 1 file

from pyspark.sql.functions import size
# Đường dẫn đến tệp JSON
json_file_path = "../crawl/data/tezos.json"

# Đọc tệp JSON vào DataFrame
df = spark.read.json(json_file_path)

# Loại bỏ các tag HTML và xóa khoảng trắng
df_all = (
    df.withColumn("content", regexp_replace("content", "<.*?>", ""))
    .withColumn("content", regexp_replace("content", "\\s+", " "))
    .withColumn("content", trim(col("content")))
    .withColumn("views", preprocess.convert_to_numeric("views"))
    .withColumn("num_answer", preprocess.convert_to_numeric("num_answer"))
    .withColumn("votes", preprocess.convert_to_numeric("votes"))
)

# Loại các bản ghi chứa null hoặc có độ dài của mảng "category" là 0
df_all = df_all.na.drop().filter(size("category") > 0)

In [3]:
# Đếm số bản ghi
df_all.count()

1753827

In [90]:
# Đếm số category

from pyspark.sql.functions import explode
# Sử dụng hàm explode để chuyển mảng thành các phần tử đơn lẻ
categories = df_all.select(explode("category").alias("category")).select("category").distinct()

categories.show(truncate=False)
categories.count()

+---------------+
|category       |
+---------------+
|operation      |
|bigmap         |
|conseil        |
|tezos-client   |
|liquidity      |
|tokens         |
|documentation  |
|codebase       |
|archetype      |
|completium     |
|utxo           |
|metadata       |
|password       |
|burn           |
|endorser       |
|governance     |
|account        |
|staking-balance|
|smartpy        |
|tezbox         |
+---------------+
only showing top 20 rows



220

In [None]:
# Lưu số category

# Đường dẫn đến tệp JSON đầu ra
output_json_path = "./output"

# Ghi DataFrame ra tệp JSON với mode là "overwrite"
categories.coalesce(1).write.mode("overwrite").json(output_json_path)

In [4]:
# Lưu toàn bộ data đã qua tiền xử lý

# Đường dẫn đến tệp JSON đầu ra
output_json_path = "./output"

# Ghi DataFrame ra tệp JSON với mode là "overwrite"
df_all.coalesce(1).write.mode("overwrite").json(output_json_path)

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, HashingTF, StringIndexer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Chuyển đổi cột category từ ArrayType(StringType) thành StringType
first_category_udf = udf(lambda x: x[0] if x else None, StringType())
df_train = df_all.limit(20000).withColumn("category", first_category_udf("category"))

# Chuyển đổi category thành dạng số
indexer = StringIndexer(inputCol="category", outputCol="label", handleInvalid="keep")
tokenizer = Tokenizer(inputCol="content", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Tạo Pipeline
pipeline = Pipeline(stages=[indexer, tokenizer, hashingTF, rf])

# Chia dữ liệu thành tập huấn luyện và tập kiểm tra
(train_data, test_data) = df_train.randomSplit([0.8, 0.2], seed=42)

# Huấn luyện mô hình
model = pipeline.fit(train_data)

In [95]:
# Dự đoán trên tập kiểm tra
predictions = model.transform(test_data)

# Đánh giá mô hình
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.2010443864229765


In [None]:
# Lưu mô hình
model.save("./model")

In [None]:
# Tải lại mô hình
loaded_model = PipelineModel.load("your_model_path")

# Dùng mô hình đã tải lại để dự đoán
predictions = loaded_model.transform(new_data)


In [None]:
# Đóng SparkSession
spark.stop()