In [6]:
from pymongo import MongoClient

# URI trùng với docker-compose (username: root, password: example, host: mongo)
MONGO_URI = "mongodb://root:example@localhost:27017/?authSource=admin"

# Tạo client
client = MongoClient(MONGO_URI)

# Chọn database
db = client["web-attack-db"]

print("Kết nối OK, đã insert thử document.")

Kết nối OK, đã insert thử document.


In [2]:
# Thư mục chứa các file .json (đổi đường dẫn cho phù hợp)
# Ví dụ: r"E:\web-attack-recognization\json" trên Windows
JSON_FOLDER = r"E:\json"

# Nếu True: xóa collection cũ trước khi nạp lại
DROP_EXISTING = True

# Tối ưu khi file lớn
NDJSON_CHUNKSIZE = 50_000
ARRAY_BATCH_SIZE = 50_000

In [3]:
import os
from pymongo.errors import BulkWriteError
def safe_collection_name(path: str) -> str:
    name = os.path.splitext(os.path.basename(path))[0]
    return "".join(ch if ch.isalnum() or ch in "-._" else "_" for ch in name) or "imported_collection"
def clean_nan_records(records):
    """Loại bỏ các trường có giá trị NaN từ danh sách bản ghi"""
    cleaned_records = []
    for record in records:
        # Loại bỏ các trường có giá trị NaN
        cleaned_record = {k: v for k, v in record.items() if pd.notna(v)}
        cleaned_records.append(cleaned_record)
    return cleaned_records

def list_json_files(folder: str):
    if not os.path.isdir(folder):
        raise FileNotFoundError(f"Thư mục không tồn tại: {folder}")
    files = []
    for f in os.listdir(folder):
        p = os.path.join(folder, f)
        if os.path.isfile(p) and f.lower().endswith(".json") and ".ipynb_checkpoints" not in p:
            files.append(p)
    return files

def import_ndjson(coll, json_path: str, chunksize: int = 50_000) -> int:
    total = 0
    try:
        for chunk in pd.read_json(json_path, lines=True, chunksize=chunksize):
            recs = chunk.to_dict(orient="records")
            if recs:
                # Làm sạch bản ghi: loại bỏ các trường có giá trị NaN
                cleaned_recs = clean_nan_records(recs)
                try:
                    coll.insert_many(cleaned_recs, ordered=False)
                except BulkWriteError:
                    pass
                total += len(cleaned_recs)
        return total
    except ValueError:
        # Không phải NDJSON hợp lệ
        raise

def import_json_array(coll, json_path: str, batch_size: int = 50_000) -> int:
    df = pd.read_json(json_path)  # kỳ vọng 1 mảng JSON
    total = 0
    if len(df) == 0:
        return 0

    # Loại bỏ các hàng có tất cả giá trị là NaN
    df = df.dropna(how='all')

    for start in range(0, len(df), batch_size):
        batch = df.iloc[start:start + batch_size].to_dict(orient="records")
        # Làm sạch bản ghi: loại bỏ các trường có giá trị NaN
        cleaned_batch = clean_nan_records(batch)
        try:
            coll.insert_many(cleaned_batch, ordered=False)
        except BulkWriteError:
            pass
        total += len(cleaned_batch)
    return total


In [4]:
from datetime import datetime
files = list_json_files(JSON_FOLDER)
if not files:
    raise SystemExit(f"Không tìm thấy file .json nào trong: {JSON_FOLDER}")

summary = []
print(f"Tìm thấy {len(files)} file JSON trong: {JSON_FOLDER}")



Tìm thấy 16 file JSON trong: E:\json


In [5]:
import pandas as pd
for fp in files:
    coll_name = safe_collection_name(fp)
    coll = db[coll_name]

    if DROP_EXISTING and coll_name in db.list_collection_names():
        print(f"- Drop collection cũ: {coll_name}")
        coll.drop()

    print(f"- Import: {os.path.basename(fp)} -> collection: {coll_name}")

    inserted = 0
    try:
        # Thử NDJSON trước
        inserted = import_ndjson(coll, fp, chunksize=NDJSON_CHUNKSIZE)
        if inserted == 0:
            # Có thể file không phải NDJSON → thử mảng JSON
            inserted = import_json_array(coll, fp, batch_size=ARRAY_BATCH_SIZE)
    except ValueError:
        # Không phải NDJSON hợp lệ → thử mảng JSON
        inserted = import_json_array(coll, fp, batch_size=ARRAY_BATCH_SIZE)

    count_after = coll.estimated_document_count()
    print(f"  Đã nhập {inserted} bản ghi. Tổng hiện có: {count_after}")
    summary.append({
        "file": os.path.basename(fp),
        "collection": coll_name,
        "inserted": inserted,
        "total_in_collection": count_after,
        "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    })

print("\nTÓM TẮT:")
import pandas as pd
pd.DataFrame(summary)

- Import: fox_aminer_labeled.json -> collection: fox_aminer_labeled
  Đã nhập 10581 bản ghi. Tổng hiện có: 10581
- Import: fox_wazuh_labeled.json -> collection: fox_wazuh_labeled
  Đã nhập 462523 bản ghi. Tổng hiện có: 462523
- Import: harrison_aminer_labeled.json -> collection: harrison_aminer_labeled
  Đã nhập 10194 bản ghi. Tổng hiện có: 10194
- Import: harrison_wazuh_labeled.json -> collection: harrison_wazuh_labeled
  Đã nhập 583754 bản ghi. Tổng hiện có: 583754
- Import: russellmitchell_aminer_labeled.json -> collection: russellmitchell_aminer_labeled
  Đã nhập 4056 bản ghi. Tổng hiện có: 4056
- Import: russellmitchell_wazuh_labeled.json -> collection: russellmitchell_wazuh_labeled
  Đã nhập 41488 bản ghi. Tổng hiện có: 41488
- Import: santos_aminer_labeled.json -> collection: santos_aminer_labeled
  Đã nhập 4266 bản ghi. Tổng hiện có: 4266
- Import: santos_wazuh_labeled.json -> collection: santos_wazuh_labeled
  Đã nhập 126513 bản ghi. Tổng hiện có: 126513
- Import: shaw_aminer_

Unnamed: 0,file,collection,inserted,total_in_collection,time
0,fox_aminer_labeled.json,fox_aminer_labeled,10581,10581,2025-09-17 18:38:32
1,fox_wazuh_labeled.json,fox_wazuh_labeled,462523,462523,2025-09-17 18:39:16
2,harrison_aminer_labeled.json,harrison_aminer_labeled,10194,10194,2025-09-17 18:39:16
3,harrison_wazuh_labeled.json,harrison_wazuh_labeled,583754,583754,2025-09-17 18:40:18
4,russellmitchell_aminer_labeled.json,russellmitchell_aminer_labeled,4056,4056,2025-09-17 18:40:18
5,russellmitchell_wazuh_labeled.json,russellmitchell_wazuh_labeled,41488,41488,2025-09-17 18:40:22
6,santos_aminer_labeled.json,santos_aminer_labeled,4266,4266,2025-09-17 18:40:23
7,santos_wazuh_labeled.json,santos_wazuh_labeled,126513,126513,2025-09-17 18:40:38
8,shaw_aminer_labeled.json,shaw_aminer_labeled,2243,2243,2025-09-17 18:40:38
9,shaw_wazuh_labeled.json,shaw_wazuh_labeled,68539,68539,2025-09-17 18:40:45


In [None]:
from airflow import DAG