In [None]:
!pip install python-dotenv tqdm pymysql boto3

In [2]:
import os  
import sys  
import zipfile  
import tempfile  
import shutil  
from tqdm.notebook import tqdm  
from dotenv import load_dotenv  

# 프로젝트 루트 경로 설정  
#sys.path.append('./src')  


from update_objectfile import upload_file_with_metadata  
from data_engineering.obj_storage.boto3 import MinIOClient  
from data_engineering.db.connector import Database  

In [None]:
# 버킷 이름 설정
BUCKET_NAME = "tada"

# 지원하는 파일 확장자
SUPPORTED_EXTENSIONS = {".xlsx", ".json"}

def is_valid_file(filename):
    """지원하는 확장자인지 확인"""
    return any(filename.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS)


# 이미 업로드됐는지 정확하게 확인이 안됨.
#def get_uploaded_files():
#    """DB에서 이미 업로드된 파일 목록 가져오기"""
#    db = Database(
#        host=os.getenv('DB_HOST'),
#        port=int(os.getenv('DB_PORT')),
#        user=os.getenv('DB_USER'),
#        password=os.getenv('DB_PASSWORD'),
#        database=os.getenv('DB_NAME')
#    )
#
#    query = "SELECT category, object_path FROM object_storage"
#    results = db.getter(query)
#    return {(record['category'], record['object_path']) for record in results}


# 이미 업로드한 파일인지 DB에서 확인하는 코드 2차
def is_file_already_uploaded(db, category, object_path):
    """특정 파일이 이미 업로드되었는지 DB에서 직접 확인"""
    query = """
        SELECT COUNT(*) as count
        FROM object_storage
        WHERE category = %s AND object_path = %s
    """
    result = db.getter(query, (category, object_path))
    return result[0]['count'] > 0

In [None]:
# 버킷 이름 설정
BUCKET_NAME = "tada"

# 지원하는 파일 확장자
SUPPORTED_EXTENSIONS = {".xlsx", ".json"}

def is_valid_file(filename):
    """지원하는 확장자인지 확인"""
    return any(filename.lower().endswith(ext) for ext in SUPPORTED_EXTENSIONS)


# 이미 업로드됐는지 정확하게 확인이 안됨.
#def get_uploaded_files():
#    """DB에서 이미 업로드된 파일 목록 가져오기"""
#    db = Database(
#        host=os.getenv('DB_HOST'),
#        port=int(os.getenv('DB_PORT')),
#        user=os.getenv('DB_USER'),
#        password=os.getenv('DB_PASSWORD'),
#        database=os.getenv('DB_NAME')
#    )
#
#    query = "SELECT category, object_path FROM object_storage"
#    results = db.getter(query)
#    return {(record['category'], record['object_path']) for record in results}


# 이미 업로드한 파일인지 DB에서 확인하는 코드 2차
def is_file_already_uploaded(db, category, object_path):
    """특정 파일이 이미 업로드되었는지 DB에서 직접 확인"""
    query = """
        SELECT COUNT(*) as count
        FROM object_storage
        WHERE category = %s AND object_path = %s
    """
    result = db.getter(query, (category, object_path))
    return result[0]['count'] > 0

In [None]:
import os  
import sys  
import zipfile  
import tempfile  
import shutil  
from tqdm.notebook import tqdm  
from dotenv import load_dotenv  

# 환경 변수 로드  
load_dotenv('.env')  

# MinIO 클라이언트 초기화  
from data_engineering.obj_storage.boto3 import MinIOClient  
minio_client = MinIOClient()  

# 데이터셋 경로 설정  
DATASET_PATH = "."  

# 처리할 디렉토리 구조  
DIRECTORIES = [  
    "Training/01.원천데이터",  
    "Training/02.라벨링데이터",  
    "Validation/01.원천데이터",  
    "Validation/02.라벨링데이터"  
]  


# DB 테이블 생성 확인
from data_engineering.db.init_db import create_object_storage_table
create_object_storage_table()

# 이미 업로드된 파일 목록 가져오기
#uploaded_files = get_uploaded_files()
#print(f"이미 업로드된 파일 수: {len(uploaded_files)}")

# 임시 디렉토리 생성
temp_dir = tempfile.mkdtemp()
print(f"임시 디렉토리 생성됨: {temp_dir}")

try:
    # DB 연결 생성
    db = Database(
        host=os.getenv('DB_HOST'),
        port=int(os.getenv('DB_PORT')),
        user=os.getenv('DB_USER'),
        password=os.getenv('DB_PASSWORD'),
        database=os.getenv('DB_NAME')
    )

    # 각 디렉토리 처리
    for dir_path in DIRECTORIES:
        current_path = os.path.join(DATASET_PATH, dir_path)
        print(f"\n처리 중인 디렉토리: {dir_path}")

        # ZIP 파일 목록 가져오기
        zip_files = [f for f in os.listdir(current_path) if f.endswith('.zip')]
        print(f"발견된 ZIP 파일 수: {len(zip_files)}")

        # 각 ZIP 파일 처리
        for zip_filename in tqdm(zip_files, desc="ZIP 파일 처리 중"):
            zip_path = os.path.join(current_path, zip_filename)
            category = f"{dir_path}/{os.path.splitext(zip_filename)[0]}"

            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                for file_info in tqdm(zip_ref.filelist, desc="파일 추출 중"):
                    if is_valid_file(file_info.filename):
                        object_key = f"{category}/{os.path.basename(file_info.filename)}"

                        # DB에서 직접 중복 체크
                        if is_file_already_uploaded(db, category, object_key):
                            print(f"이미 업로드된 파일 건너뛰기: {object_key}")
                            continue

                        # 임시 파일로 추출
                        temp_path = os.path.join(temp_dir, os.path.basename(file_info.filename))
                        with zip_ref.open(file_info) as source, open(temp_path, 'wb') as target:
                            shutil.copyfileobj(source, target)

                        try:
                            # MinIO 업로드 및 메타데이터 저장
                            success = upload_file_with_metadata(
                                local_path=temp_path,
                                category=category,
                                bucket=BUCKET_NAME,
                                object_path=object_key,
                                db=db  # DB 연결 전달
                            )
                            if success:
                                print(f"업로드 성공: {object_key}")
                            else:
                                print(f"업로드 실패: {object_key}")
                        except Exception as e:
                            print(f"처리 실패: {object_key} - {str(e)}")
                        finally:
                            # 임시 파일 삭제
                            if os.path.exists(temp_path):
                                os.remove(temp_path)

except Exception as e:
    print(f"전체 처리 중 오류 발생: {str(e)}")
finally:
    # 리소스 정리
    if 'db' in locals():
        db.close()
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    print("처리 완료 및 리소스 정리")

In [None]:
# 최종 상태 확인  
db = Database(  
    host=os.getenv('DB_HOST'),  
    port=int(os.getenv('DB_PORT')),  
    user=os.getenv('DB_USER'),  
    password=os.getenv('DB_PASSWORD'),  
    database=os.getenv('DB_NAME')  
)  

query = "SELECT COUNT(*) as count FROM object_storage"  
result = db.getter(query)  
print(f"\n총 업로드된 파일 수: {result[0]['count']}")  

# 카테고리별 통계 확인  
query = """  
SELECT category, COUNT(*) as count   
FROM object_storage   
GROUP BY category   
ORDER BY count DESC  
"""  
records = db.getter(query)  
print("\n카테고리별 파일 수:")  
for record in records:  
    print(f"- {record['category']}: {record['count']}개")  