# 다수의 SQL 파일을 실행하는 스크립트(Postgresql)

In [None]:
import psycopg2
import os

def execute_sql_file(conn, file_path):
    """SQL 파일을 한 줄씩 읽어 쿼리 단위(세미콜론 기준)로 실행하는 함수"""
    print(f"{file_path} 파일을 실행합니다.")
    query_buffer = ""
    
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            query_buffer += line
            # 세미콜론이 포함된 경우 쿼리 단위로 분리
            if ';' in line:
                queries = query_buffer.split(';')
                # 마지막 쿼리는 미완성일 수 있으므로 제외하고 실행
                for query in queries[:-1]:
                    if query.strip():
                        with conn.cursor() as cursor:
                            cursor.execute(query)
                        conn.commit()
                # 미완성 쿼리를 버퍼에 남김
                query_buffer = queries[-1]
    
    # 파일 종료 후 남은 쿼리가 있다면 실행
    if query_buffer.strip():
        with conn.cursor() as cursor:
            cursor.execute(query_buffer)
        conn.commit()
    
    print(f"{file_path} 파일 실행이 완료되었습니다.")

def execute_sql_files_in_directory(db_config, directory):
    """디렉토리 내 모든 SQL 파일을 순차적으로 실행하는 함수"""
    conn = None
    try:
        # PostgreSQL 연결
        conn = psycopg2.connect(**db_config)
        print("Connected to PostgreSQL")
        
        # 디렉토리 내 SQL 파일 실행 (파일 이름 순 정렬)
        for file in sorted(os.listdir(directory)):
            if file.endswith(".sql"):
                execute_sql_file(conn, os.path.join(directory, file))
        
    except Exception as e:
        print(f"Error: {e}")
    finally:
        if conn:
            conn.close()
            print("Connection closed")

if __name__ == "__main__":
    # PostgreSQL 접속 정보
    db_config = {
        "dbname": "DB 이름",
        "user": "사용자",
        "password": "비밀번호",
        "host": "DB IP",
        "port": "DB 포트"
    }
    
    # 실행할 SQL 파일이 있는 디렉토리 경로
    sql_directory = "디렉토리"
    
    execute_sql_files_in_directory(db_config, sql_directory)

# 다수의 CSV파일을 DB에 적재하는 스크립트

In [None]:
import os
import re
import psycopg2
import pandas as pd
from psycopg2.extras import execute_values

schema = "스키마"

def insert_csv_to_db(conn, csv_file_path, table_name, chunk_size=10000):
    """
    대용량 CSV 파일을 chunk 단위로 PostgreSQL에 삽입하는 함수.
    
    Parameters:
        conn: psycopg2 연결 객체
        csv_file_path (str): CSV 파일 경로
        table_name (str): 대상 테이블 이름
        chunk_size (int): 한 번에 insert할 행 수 (기본: 10,000)
    """
    print(f"파일 {csv_file_path} → 테이블 {table_name} 삽입 시작 (Chunk size: {chunk_size})")

    # CSV 읽기 설정
    read_options = {
        "encoding": "utf-8",
        "sep": ",",  # 컬럼 구분자
        "quotechar": '"',  # 따옴표 문자
        "escapechar": "\\",  # 이스케이프 문자
        "parse_dates": True,  # 날짜 자동 변환
        "keep_default_na": False,  # 빈 문자열을 NULL로 변환하지 않음
        "dtype": str,  # 모든 컬럼을 문자열로 로드 후 변환
        "chunksize": chunk_size
    }

    # Chunk 단위로 읽어 DB 삽입
    for chunk in pd.read_csv(csv_file_path, **read_options):
        if chunk.empty:
            print(f"파일 {csv_file_path} 에 데이터가 없어 스킵합니다.")
            return

        # 빈 문자열("")을 None (NULL)로 변환
        chunk = chunk.applymap(lambda x: None if x == "" else x)

        # 컬럼명 매칭
        columns = list(chunk.columns)
        chunk = chunk.where(pd.notna(chunk), None)  # NaN -> None 변환

        # 문자열 길이 초과 방지 (50자로 제한)
        for col in chunk.columns:
            chunk[col] = chunk[col].apply(lambda x: x[:50] if isinstance(x, str) else x)

        values = [tuple(row) for row in chunk.to_numpy()]

        # INSERT 쿼리 (중복된 경우 건너뛰기)
        cols_str = ', '.join(columns)
        query = f"""
        INSERT INTO {table_name} ({cols_str}) 
        VALUES %s 
        """  # 중복 키는 무시하고 삽입하지 않음

        # PostgreSQL에 데이터 삽입
        with conn.cursor() as cur:
            execute_values(cur, query, values)
        conn.commit()
        
        print(f"{csv_file_path} → {len(values)}건 삽입 완료.")

def process_csv_files_in_directory(db_config, directory, chunk_size=10000):
    """
    지정된 디렉토리 내의 모든 CSV 파일을 PostgreSQL에 삽입하는 함수.
    
    파일명 형식: 테이블명_날짜.csv
    """
    conn = None
    try:
        # PostgreSQL 연결
        conn = psycopg2.connect(**db_config)
        print("PostgreSQL 연결 성공")

        # 디렉토리 내 모든 CSV 파일 처리
        for filename in sorted(os.listdir(directory)):
            if filename.lower().endswith(('.csv', '.tsv', '.txt')):
                # 정규식으로 테이블명 추출 (테이블명_날짜.csv 형식)
                match = re.match(r"^([A-Za-z_\d]+)_(\d{8,14})\.(csv|tsv|txt)$", filename)

                if match:
                    table_name = re.sub(r'\d', '', match.group(1))
                    table_name = schema+table_name
                    file_path = os.path.join(directory, filename)
                    insert_csv_to_db(conn, file_path, table_name, chunk_size=chunk_size)
                else:
                    print(f"파일 {filename} 은 형식이 맞지 않아 스킵됩니다.")
    except Exception as e:
        print(f"오류 발생: {e}")
    finally:
        if conn:
            conn.close()
            print("DB 연결 종료")

if __name__ == "__main__":
    # PostgreSQL 접속 정보
    db_config = {
        "dbname": "DB 이름",
        "user": "사용자",
        "password": "비밀번호",
        "host": "DB IP",
        "port": "DB 포트"
    }

    # CSV 파일이 있는 디렉토리
    csv_directory = "디렉토리"

    # CSV 파일을 PostgreSQL에 삽입
    process_csv_files_in_directory(db_config, csv_directory, chunk_size=5000)
