In [2]:
import psycopg
from psycopg import sql
import config
import re
import pandas as pd
from typing import List, Dict, Any
from datetime import datetime

DB 연결 - doc_flow.db

In [3]:
def get_db_connection():
    try:
        conn = psycopg.connect(
            dbname=config.DB_NAME,
            user=config.DB_USER,
            password=config.DB_PASSWORD,
            host=config.DB_HOST,
            port=config.DB_PORT
        )
        print("데이터베이스에 성공적으로 연결되었습니다.")
        return conn
    except Exception as e:
        print(f"데이터베이스 연결 실패: {e}")
        return None

In [4]:
def get_db_connection2():
    try:
        conn = psycopg.connect(
            dbname='ontology_db',
            user='postgres',
            password='1234',
            host='192.168.1.154',
            port='8004'
        )
        print("데이터베이스에 성공적으로 연결되었습니다.")
        return conn
    except Exception as e:
        print(f"데이터베이스 연결 실패: {e}")
        return None

저장된 테이블 정보 확인

In [5]:
def list_tables(schema):
    conn = get_db_connection2()
    if conn is None:
        return []

    try:
        sql = """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = %s
                ORDER BY table_name;
            """
        with conn.cursor() as cur:
            cur.execute(sql, (schema,))
            tables = cur.fetchall()
        conn.close()
        return [t[0] for t in tables]  # [(‘table1’,), (‘table2’,)] → [‘table1’, ‘table2’]
    except Exception as e:
        print(f"테이블 조회 실패: {e}")
        return []


In [6]:
list_tables('mart')

데이터베이스에 성공적으로 연결되었습니다.


['bkpf',
 'bom',
 'bseg',
 'custom_sales_plan',
 'customers',
 'departments',
 'eina',
 'employees',
 'general_ledger',
 'lfa1',
 'likp',
 'lips',
 'mast',
 'mkpf',
 'mseg',
 'orders',
 'products',
 'stpo',
 'vbak',
 'vbap',
 'vbfa',
 'vbrk',
 'vbrp']

In [7]:
def list_columns(table_name, schema):
    """
    information_schema로 컬럼 목록을 가져옴 (순서 포함)
    """
    conn = get_db_connection2()
    if conn is None:
        return []

    sql = """
    SELECT
        column_name,
        data_type
    FROM information_schema.columns
    WHERE table_schema = %s AND table_name = %s
    ORDER BY ordinal_position;
    """
    with conn, conn.cursor() as cur:
        cur.execute(sql, (schema, table_name))
        rows = cur.fetchall()

    # psycopg3는 기본적으로 튜플 반환
    cols = [
        {
            "name": r[0],
            "type": r[1],
        }
        for r in rows
    ]
    return cols


In [8]:
cols_lists: List[Dict[str, List]] = []
for t in list_tables('warehouse'):
    cols = list_columns(t, 'warehouse')
    col_names = [c['name'] for c in cols]
    cols_lists.append({"table": t, "columns": col_names})
print(cols_lists)

데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
데이터베이스에 성공적으로 연결되었습니다.
[{'table': 'bkpf', 'columns': ['belnr', 'xblnr', 'waers', 'cpudt', 'budat']}, {'table': 'likp', 'columns': ['vbeln', 'erdat', 'gbstk', 'wbstk']}, {'table': 'lips', 'columns': ['posnr', 'vgbel', 'vgpos', 'lfimg', 'vrkme', 'matnr', 'arktx', 'vbeln']}, {'table': 'mkpf', 'columns': ['xblnr', 'mblnr']}, {'table': 'mseg', 'columns': ['mblnr', 'zeile', 'vbeln_im', 'vbelp_im', 'menge', 'meins', 'dmbtr', 'waers', 'matnr', 'cpudt_mkpf']}, {'table': 'vbak', 'columns': ['vbeln', 'waerk', 'erdat', 'gbstk']}, {'table': 'vbap', 'columns': ['vbeln', 'posnr', 'vgbel', 'vgpos', 'kwmeng', 'vrkme', 'netwr', 'matnr', 'arktx']}, {'table': 'vbfa', 'columns': ['vbelv', 'posnv', 'vbeln', 'posnn', 'vbtyp_v', 'vbtyp_n', 'erdat', 'erzet']}, {'table': 'vbrk', 'colum

In [9]:
# conn = get_db_connection2()
# try:
#     with conn.cursor() as cur:
#         # 스키마 생성
#         cur.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {s};")
#                     .format(s=sql.Identifier('mart')))

#         for item in cols_lists:
#             table_name = item['table']
#             cols = item.get('columns', [])

#             # 컬럼 리스트 선택
#             select_cols = (sql.SQL(', ').join(map(sql.Identifier, cols))
#                            if cols else sql.SQL('*'))

#             stmt = sql.SQL("""
#                 CREATE TABLE IF NOT EXISTS {mart}.{tbl} AS
#                 SELECT {cols}
#                 FROM {wh}.{tbl};
#             """).format(
#                 mart=sql.Identifier('mart'),
#                 wh=sql.Identifier('warehouse'),
#                 tbl=sql.Identifier(table_name),
#                 cols=select_cols
#             )

#             cur.execute(stmt)
#             print(f"{table_name} 저장 성공")

#     conn.commit()
# except Exception as e:
#     conn.rollback()
#     print(f"실패: {e}")
# finally:
#     conn.close()


In [10]:
# conn = get_db_connection2()

# sql = """ 
# truncate table mart.bseg;
# """

# with conn.cursor() as cur:
#     cur.execute(sql)

# conn.commit()
# conn.close()

In [11]:
# conn = get_db_connection2()

# sql = """ 
# select * from mart.bseg;
# """

# with conn.cursor() as cur:
#     cur.execute(sql)
#     rows = cur.fetchall()
#     for row in rows:
#         print(row)


# conn.close()

### 연결된 DB에 excel 파일 데이터 저장하기

1) doc_flow_db에 저장

In [12]:
def insert_excel_to_table(excel_file, table_name, schema="public"):
    """Excel 파일을 DB 테이블에 바로 삽입"""
    
    # 1. DB 컬럼 목록 가져오기
    cols = list_columns(table_name, schema)
    db_columns = [c["name"] for c in cols]
    print(f"DB 컬럼: {db_columns}")
    
    # 2. Excel 읽기
    df = pd.read_excel(excel_file, dtype=str)
    print(f"Excel 컬럼: {list(df.columns)}")
    
    # 3. 컬럼명 정리 (소문자, 공백제거)
    df.columns = [col.strip().lower().replace(" ", "_") for col in df.columns]
    
    # 4. 매칭되는 컬럼만 선택
    matched_cols = [col for col in db_columns if col in df.columns]
    print(f"매칭된 컬럼: {matched_cols}")
    
    if not matched_cols:
        print("매칭되는 컬럼이 없습니다.")
        return False
    
    # 5. 매칭된 컬럼만으로 데이터 준비
    insert_df = df[matched_cols].fillna('')  # NaN을 빈문자열로
    
    # 6. DB 연결해서 INSERT
    conn = get_db_connection()
    if conn is None:
        return False
    
    try:
        with conn, conn.cursor() as cur:
            # INSERT SQL 생성
            cols_str = ", ".join([f'"{col}"' for col in matched_cols])
            placeholders = ", ".join(["%s"] * len(matched_cols))
            sql = f'INSERT INTO {schema}."{table_name}" ({cols_str}) VALUES ({placeholders})'
            
            # 데이터 삽입
            data = [tuple(row) for row in insert_df.values]
            cur.executemany(sql, data)
            
            print(f"{len(data)}개 행이 삽입되었습니다.")
            return True
            
    except Exception as e:
        print(f"삽입 오류: {e}")
        return False

In [13]:
# 사용 예시
table_name = "bkpf"
excel_file = f"data/{table_name}.xlsx"


cols = list_columns(table_name, 'mart')
cols_list=[]
for c in cols:
    cols_list.append(c["name"])
print(cols_list)

데이터베이스에 성공적으로 연결되었습니다.
['belnr', 'xblnr', 'waers', 'cpudt', 'budat']


2) ontology_db에 저장

In [14]:
# db에 저장된 테이블,컬럼 확인. 부재 시 생성
def ensure_schema(conn, schema: str):
    with conn.cursor() as cur:
        cur.execute(
            sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(schema))
        )

def get_existing_columns(conn, schema: str, table: str) -> List[str]:
    with conn.cursor() as cur:
        cur.execute("""
            SELECT column_name
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
        """, (schema, table))
        return [r[0] for r in cur.fetchall()]

def table_exists(conn, schema: str, table: str) -> bool:
    with conn.cursor() as cur:
        cur.execute("""
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = %s AND table_name = %s
        """, (schema, table))
        return cur.fetchone() is not None
    
def create_table(conn, schema: str, table: str, columns: List[str], add_surrogate_pk: bool = False):
    # 기본적으로 TEXT 컬럼으로 생성. 타입 추론이 필요하면 엑셀 스캔해서 개선 가능.
    col_defs = []
    if add_surrogate_pk:
        col_defs.append(sql.SQL("id BIGSERIAL PRIMARY KEY"))
    col_defs += [
        sql.Composed([sql.Identifier(c), sql.SQL(" TEXT")]) for c in columns
    ]
    with conn.cursor() as cur:
        cur.execute(
            sql.SQL("CREATE TABLE {}.{} ({})").format(
                sql.Identifier(schema),
                sql.Identifier(table),
                sql.SQL(", ").join(col_defs),
            )
        )

def add_missing_columns(conn, schema: str, table: str, columns: List[str]):
    existing = set(get_existing_columns(conn, schema, table))
    missing = [c for c in columns if c not in existing]
    if not missing:
        return
    with conn.cursor() as cur:
        for c in missing:
            cur.execute(
                sql.SQL("ALTER TABLE {}.{} ADD COLUMN {} TEXT").format(
                    sql.Identifier(schema),
                    sql.Identifier(table),
                    sql.Identifier(c),
                )
            )

def ensure_table_ready(conn, schema: str, table: str, columns: List[str], add_surrogate_pk: bool = False):
    ensure_schema(conn, schema)
    if not table_exists(conn, schema, table):
        create_table(conn, schema, table, columns, add_surrogate_pk=add_surrogate_pk)
        print(f"생성: {schema}.{table} ({', '.join(columns)})")
    else:
        add_missing_columns(conn, schema, table, columns)
        print(f"확인/보강 완료: {schema}.{table}")

In [15]:
def insert_excel_to_table2(conn, excel_file, table_name, column_lists, schema):
    """Excel 파일을 DB 테이블에 바로 삽입"""
    
    # 1. DB 컬럼 목록 가져오기
    cols = column_lists
    db_columns = cols
    print(f"DB 컬럼: {db_columns}")
    
    # 2. Excel 읽기
    df = pd.read_excel(excel_file, dtype=str)
    print(f"Excel 컬럼: {list(df.columns)}")
    
    # 3. 컬럼명 정리 (소문자, 공백제거)
    df.columns = [col.strip().lower().replace(" ", "_") for col in df.columns]
    
    # 4. 매칭되는 컬럼만 선택
    matched_cols = [col for col in db_columns if col in df.columns]
    print(f"매칭된 컬럼: {matched_cols}")
    
    if not matched_cols:
        print("매칭되는 컬럼이 없습니다.")
        return False
    
    # 5. 매칭된 컬럼만으로 데이터 준비
    insert_df = df[matched_cols].fillna('')  # NaN을 빈문자열로
    
    try:
        with conn.cursor() as cur:
            # INSERT SQL 생성
            cols_str = ", ".join([f'"{col}"' for col in matched_cols])
            placeholders = ", ".join(["%s"] * len(matched_cols))
            sql = f'INSERT INTO {schema}.{table_name} ({cols_str}) VALUES ({placeholders})'
            
            # 데이터 삽입
            data = [tuple(row) for row in insert_df.values]
            cur.executemany(sql, data)
            
            print(f"{len(data)}개 행이 삽입되었습니다.")
            return True
            
    except Exception as e:
        print(f"삽입 오류: {e}")
        return False

In [16]:
# staging에 적재할 데이터 준비 - raw data
def store_raw_data(conn):
    if conn is None:
        raise SystemExit("DB 연결 실패")

    try:
        for item in cols_lists:
            table_name: str = item['table']
            df = pd.read_excel(f"/data/{table_name}.xlsx", dtype=str)

            # 테이블 자동 생성/보강
            ensure_table_ready(conn, schema="staging", table=table_name, columns=df.columns, add_surrogate_pk=False)

            conn.commit()

            # 이후 기존 로더 호출
            excel_file = f"{table_name}.xlsx"
            insert_excel_to_table(excel_file, table_name, df.columns, schema="staging")

    finally:
        conn.close()

In [17]:
# warehouse에 적재할 데이터 준비 - 특정 컬럼들만 선택
def store_selected_columns(conn):
    if conn is None:
        raise SystemExit("DB 연결 실패")

    try:
        for item in cols_lists:
            table_name: str = item['table']
            column_lists: List[str] = item['columns']

            # 테이블 자동 생성/보강
            ensure_table_ready(conn, schema="mart", table=table_name, columns=column_lists, add_surrogate_pk=False)

            conn.commit()

            # 이후 기존 로더 호출
            excel_file = f"data/{table_name}.xlsx"
            #print(pd.read_excel(excel_file).head())
            insert_excel_to_table2(conn, excel_file, table_name, column_lists, schema="mart")
            conn.commit()

    finally:
        print("DB 저장 성공했다리~~~~ 퇴근하고 싶다링~~~")

In [18]:
def insert_excel_to_table_psycopg3(conn, excel_file, table_name, column_lists, schema="mart"):
    import pandas as pd
    df = pd.read_excel(excel_file, dtype=str)
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]

    db_columns = [c.lower() for c in column_lists]
    matched = [c for c in db_columns if c in df.columns]
    if not matched:
        print("매칭되는 컬럼 없음")
        return False

    insert_df = df[matched].where(pd.notnull(df[matched]), None)
    rows = [tuple(rec) for rec in insert_df.to_numpy()]

    cols_ident = sql.SQL(", ").join(sql.Identifier(c) for c in matched)
    placeholders = sql.SQL(", ").join(sql.Placeholder() for _ in matched)

    stmt = sql.SQL("INSERT INTO {sch}.{tbl} ({cols}) VALUES ({ph})").format(
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table_name),
        cols=cols_ident,
        ph=placeholders
    )

    try:
        with conn.cursor() as cur:
            cur.executemany(stmt, rows) 
        conn.commit()
        print(f"{len(rows)}개 행 삽입 완료")
        return True
    except Exception as e:
        conn.rollback()
        print("삽입 오류:", e)
        return False

In [19]:
conn = get_db_connection2()
print(f"{table_name}")


데이터베이스에 성공적으로 연결되었습니다.
bkpf


In [20]:
# insert_excel_to_table_psycopg3(conn, excel_file, table_name, cols_list, schema='mart')

------

### 전처리

document flow에 사용되는 데이터 전처리

In [32]:
def preprocess_date_column(conn, table_name, column_name, schema):
    try:
        q = sql.SQL("""
            UPDATE {sch}.{tbl}
            SET {col} = CAST({col} AS DATE)
            WHERE {col} IS NOT NULL
    """).format(
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table_name),
        col=sql.Identifier(column_name)
    )
        with conn.cursor() as cur:
            cur.execute(q)
        conn.commit()
    except Exception as e:
        print(f"저장 실패: {e}")
        conn.rollback()

def preprocess_numeric_strip(file_path, column_name):
    # excel 파일의 값을 바꿔야함...
    df = pd.read_excel(file_path, engine='openpyxl', dtype=str)

    if column_name not in df.columns:
        raise ValueError(f"컬럼 '{column_name}' 이 엑셀에 존재하지 않습니다.")

    df[column_name] = pd.to_numeric(df[column_name], errors="coerce")
    df[column_name] = df[column_name].apply(
        lambda x: f"{x:,.2f}".replace(".", ",") if pd.notnull(x) else ""
    )
    df.to_excel(file_path, index=False, engine='openpyxl')
    return df

In [40]:
def preprocess_docflow_data():
    conn = get_db_connection2()
    to_preprocess_date = {
        "vbrk": ["erdat"], 
        "likp": ["erdat"], 
        "mseg": ["cpudt_mkpf"], 
        "bkpf": ["cpudt", "budat"]
    }
    excel_file = f"data/bseg.xlsx"
    column_name = "WRBTR"
    
    for table, columns in to_preprocess_date.items():
        for column in columns:
            preprocess_date_column(conn, table, column, schema='mart')

    # preprocess_numeric_strip(excel_file, column_name)
    # column_lists = list_columns(table_name, schema="public")
    # insert_excel_to_table2(conn, excel_file, table_name, column_lists, schema="mart")

In [41]:
preprocess_docflow_data()

데이터베이스에 성공적으로 연결되었습니다.


In [30]:
conn.close()

In [None]:
table_name = "bseg"
excel_file = f"data/{table_name}.xlsx"


cols = list_columns(table_name)
cols_lists=[]
for c in cols:
    cols_lists.append(c["name"])
print(cols_lists)

In [None]:
preprocess_numeric_strip(excel_file, 'WRBTR')

In [39]:
pd.read_excel(excel_file)['WRBTR'].head()

0    300,000,00
1    300,000,00
2    400,000,00
3    400,000,00
4    400,000,00
Name: WRBTR, dtype: object

In [None]:
insert_excel_to_table2

---

전처리 전, 후 미리보기 / 사용자 확인 후 실제 update 하는 흐름으로 설계

(A) Postgresql 공통 유틸

In [None]:
# 전처리 후 값이 달라지는지 여부 확인
def _is_changed(new_expr_sql, orig_col) -> sql.SQL:
    return sql.SQL("({new}) IS DISTINCT FROM ({orig}::text)").format(
        new=new_expr_sql, orig=sql.Identifier(orig_col)
    )

# update 안전하게 처리
def _set_clause(assignments: dict[str, sql.SQL]) -> sql.SQL:
    parts = [
        sql.SQL("{col} = {expr}").format(col=sql.Identifier(c), expr=e)
        for c, e in assignments.items()
    ]

In [None]:
# 전처리 미리보기 기능
def _db_preview_rows(conn, schema, table, column, new_expr_sql, extra_cols, limit: int=20, offset: int=0):
    condition_sql = _is_changed(new_expr_sql, column)
    cur = conn.cursor()

    select_list = [
        sql.SQL("ctid"),
        sql.SQL("{orig}::text AS before").format(orig=sql.Identifier(column)),
        sql.SQL("{new} AS after").format(new=new_expr_sql)
    ]

    if extra_cols : #미리보기 시 같이 보여주고 싶은 추가 컬럼들
        select_list += [sql.SQL("{c}").format(c=sql.Identifier(c)) for c in extra_cols]

    q = (
        sql.SQL("SELECT ")
        + sql.SQL(", ").join(select_list)
        + sql.SQL(" FROM {s}.{t} WHERE ").format(s=sql.Identifier(schema), t=sql.Identifier(table))
        + condition_sql
        + sql.SQL(" ORDER BY ctid LIMIT %s OFFSET %s")
    )
    cur.execute(q, (limit, offset))
    rows = cur.fetchall()
    colnames = [d.name for d in cur.description]
    cur.close()

    return {
        "rows": [dict(zip(colnames, r)) for r in rows],
        "condition_sql" : condition_sql
    }

In [None]:
# 전처리 대상 행 수 카운트
def _count_rows(conn, schema, table, condition_sql) -> int:
    cur = conn.cursor()

    q = (
        sql.SQL("SELECT COUNT(*) FROM {s}.{t} WHERE ")
        .format(s=sql.Identifier(schema), t=sql.Identifier(table))
        + condition_sql
    )
    
    cur.execute(q)
    (cnt,) = cur.fetchone() #fetchone : 결과 집합에서 한 행만 가져오는 함수 (ex. 1, 'Rannie')
    cur.close()
    return cnt

In [None]:
# DB에 update - after user's confirmation
def _db_apply_update(conn, schema, table, column, new_expr_sql, condition_sql) -> int:
    """변화가 생기는 행만 update"""
    cur = conn.cursor()

    q = (
        sql.SQL("UPDATE {s}.{t} SET ").format(s=sql.Identifier(schema), t=sql.Identifier(table))
        + _set_clause({column: new_expr_sql})
        + sql.SQL(" WHERE ")
        + condition_sql
    )

    cur.execute()
    conn.commit()
    updated = cur.rowcount
    cur.close()
    return updated

(B) Excel 공동 유틸

In [None]:
def _excel_preview_rows(df: pd.DataFrame, column: str, extra_cols=None, limit: int=20, offset: int=0):
    # . , 제거 전처리 (문자 기준, NanN 안전 처리)
    before = df[column].astype("string")
    after = before.fillna("").apply(lambda x: re.sub(r"[.,]", "", x) if x is not pd.NA else x)

    mask_changed = (after != before) & ~(before.isna() & after.isna())
    preview_df = pd.DataFrame({
        "before": before[mask_changed],
        "after": after[mask_changed]
    })

    if extra_cols:
        for c in extra_cols:
            preview_df[c] = df.loc[mask_changed, c]

    #페이징
    preview_df = preview_df.iloc[offset: offset + limit].copy()
    
    return preview_df

def _excel_apply_transform(df, column):
    df[column] = df[column].astype("string").fillna("").apply(lambda x:re.sub(r"[.,]", "", x))
    return df

숫자/금액 데이터 : ',' '.' 전처리

In [None]:
# 엑셀 파일
def format_number_column_excel(file_path: str, column_name: str):
    df = pd.read_excel(file_path, engine='openpyxl', dtype=str)

    if column_name not in df.columns:
        raise ValueError(f"컬럼 '{column_name}' 이 엑셀에 존재하지 않습니다.")

    df[column_name] = pd.to_numeric(df[column_name], errors="coerce")
    df[column_name] = df[column_name].apply(
        lambda x: f"{x:,.2f}".replace(".", ",") if pd.notnull(x) else ""
    )
    df.to_excel(file_path, index=False, engine='openpyxl')
    return df


In [None]:
# postgresql
def format_number_column_psql(conn, schema, table_name, column_name, dry_run: bool = False):
    cur = conn.cursor()

    # 미리 확인 - 전처리 될 행 확인

    return {"updated rows": updated}

날짜 데이터 전처리

In [None]:
# 시간이 들어가지 않도록 (날짜만 남기기)


# 날짜 데이터 형식 변환

이상치 처리

In [None]:
#음수가 나올 수 없는 값의 음수 제거/보정

공백, 중복 등 처리

In [None]:
#소문자, 대문자 변환

#결측치 처리 1) null값 형식 통일

#결측치 처리 2) 전체 null인 컬럼 삭제

#앞뒤 공백 제거 : strip()

#중복값 제거 (PK 중복 등)


타입 변환