In [9]:
# Parquet 가공기
# 지정 DB의 특정 Table의 모든 Row를 Parquet 형식으로 로컬에 저장한다.
import pandas as pd
import sqlalchemy
import json
import os
import uuid

COUNTER_FILE = 'counter.json'
IS_ANOMALY = False
act_type = "anomaly" if IS_ANOMALY else "normal"

# Init Phase ====================================================

# Counter 초기화 및 불러오기
if os.path.exists(COUNTER_FILE):
    with open(COUNTER_FILE, 'r') as f:
        counter = json.load(f)
else:
    counter = {'normal': 0, 'anomaly': 0}

# Counter 증가 함수
def increment_counter():
    global act_type
    counter[act_type] += 1
    with open(COUNTER_FILE, 'w') as f:
        json.dump(counter, f)
    return counter[act_type]

# ================================================================

target_tables=["logs", "tasks", "jobs"]
dir_name=f"{act_type}_{counter[act_type]}"
os.mkdir(dir_name)

engine = sqlalchemy.create_engine("postgresql://REDACTED:REDACTED@REDACTED:5432/tasklist")

for table in target_tables: 
    query = f"SELECT * FROM {table}"
    df = pd.read_sql(query, engine)
    # P
    for col in df.columns:
        if df[col].apply(lambda x: isinstance(x, uuid.UUID)).any():
            df[col] = df[col].apply(lambda x: x.bytes)
    df.to_parquet(f"{dir_name}/{table}.parquet", engine="pyarrow", index=False)

# 중복 및 덮어쓰기 방지
increment_counter()

11

In [10]:
# MinIO Object 정보 긁기
# 지정한 MinIO(mal_case, norm_case)의 지정 Bucket 내 데이터에 대한 메타데이터를 가져오는 코드이다. 
from minio import Minio, S3Error
import re
import json
import yaml

mal_case = {
    "endpoint": "REDACTED",
    "bucket": "REDACTED",
    "access_key": "REDACTED",
    "secret_key": "REDACTED",
}

norm_case = {
    "endpoint": "REDACTED",
    "bucket": "REDACTED",
    "access_key": "REDACTED",
    "secret_key": "REDACTED",
}

# MinIO 클라이언트 초기화
for di in [mal_case, norm_case]:
    di["client"] = Minio(
        endpoint=di["endpoint"],
        access_key=di["access_key"],
        secret_key=di["secret_key"],
        secure=False
    )

for case in [mal_case, norm_case]:
    try:
        client = case["client"]
        bucket_name = case["bucket"]
        objects = client.list_objects(bucket_name, recursive=True)
        result = {
            "endpoint": case["endpoint"],
            "bucket": case["bucket"],
            "metadata": []
        }
        for obj in objects:
            # 3. 각 객체의 메타데이터 조회
            stat = client.stat_object(bucket_name, obj.object_name)
            metadata = {
                "object_name": obj.object_name,
                "size": stat.size,
                "last_modified": stat.last_modified.isoformat(),
                "etag": stat.etag,
                "content_type": stat.content_type,
                #"user_metadata": stat.metadata  # 사용자 정의 메타데이터
            }
            result["metadata"].append(metadata)
        
            # 4. 메타데이터 출력
            #print(f"Object: {obj.object_name}")
            #print(f"  - Size: {stat.size} bytes")
            #print(f"  - Last Modified: {stat.last_modified}")
            #print(f"  - ETag: {stat.etag}")
            #print(f"  - Content-Type: {stat.content_type}")
            #print(f"  - User Metadata: {stat.metadata}")
        #print(result)
        with open(f"{dir_name}/{bucket_name}.json", "w", encoding="utf-8") as f_json:
            json.dump(result, f_json, indent=2, ensure_ascii=False)
            
    except S3Error as e:
        print("오류 발생:", e)
print("Done")

Done
