# 0. JDBC 사용해서 DB에 적재하는 코드 두고 갑니다~

## 0-1. 필요한 라이브러리 설치 및 호출

In [0]:
# %sh
# pip install --quiet psycopg2-binary

In [0]:
%pip install sqlalchemy



[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting sqlalchemy
  Using cached sqlalchemy-2.0.43-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.2 MB)
Collecting typing-extensions>=4.6.0
  Using cached typing_extensions-4.15.0-py3-none-any.whl (44 kB)
Collecting greenlet>=1
  Using cached greenlet-3.2.4-cp310-cp310-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl (584 kB)
Installing collected packages: typing-extensions, greenlet, sqlalchemy
  Attempting uninstall: typing-extensions
    Found existing installation: typing_extensions 4.4.0
    Not uninstalling typing-extensions at /databricks/python3/lib/python3.10/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-604df710-fc7a-4de4-8e65-ce48b9582ce1
    Can't uninstall 'typing_extensions'. No files were found to uninstall.
Successfully installed greenlet-3.2.4 sqlalchemy-2.0.43 typing-extensions-4.15.0
[43mNote: you may need

In [0]:
dbutils.library.restartPython()

In [0]:
import psycopg2
import pyspark
from pyspark import sql as pysql
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField,
    StringType, ShortType, IntegerType, LongType, DoubleType, BooleanType,
    TimestampType, ArrayType, MapType
)
from pyspark.sql import functions as F
import pandas as pd
from sqlalchemy import create_engine

## 0-2. 스키마(DB) 및 테이블 이름 설정

In [0]:
'''SCHEMA_LEVEL = "bronze"
TABLE_NAME = "brz_employee_info"'''

## 0-3. Azure VM에 올라와있는 PostgreSQL과 연결하는 코드 (VM 켜야 연결 가능)

In [0]:
'''conn = psycopg2.connect(
    host="20.196.145.211",
    port=5432,
    dbname="postgres",
    user="postgres",
    password="asdASD123!@#"
)
cur = conn.cursor()'''

## 0-4. (필요하면 사용) 특정 테이블 DROP하는 코드

In [0]:
# # 기존 테이블 드롭 (CASCADE 포함)
# cur.execute(f"DROP TABLE IF EXISTS {SCHEMA_LEVEL}.{TABLE_NAME} CASCADE;")

## 0-5. 정해진 PostgreSQL 스키마로 테이블을 생성하는 코드(본인 테이블에 맞게 바꾸기, VM 켜서 직접 생성해도 무관)

In [0]:
# cur.execute(f"""
# CREATE TABLE IF NOT EXISTS {SCHEMA_LEVEL}.{TABLE_NAME} (
#   id                   varchar(255) NOT NULL PRIMARY KEY,
#   sha                  char(40),
#   author               varchar(100),
#   private              boolean,
#   tags                 text ,
#   likes                double precision,
#   created_at           timestamptz,
#   config               text,
# );
# """)

## 0-6. Psycopg2의 모든 실행 내역 commit하고 종료하기

In [0]:
# conn.commit()
# cur.close()
# conn.close()

## 0-7. INSERT 함수 정의

In [0]:
'''
def insert_data(
    data: Union[pd.DataFrame, pyspark.sql.DataFrame],
    schema: str,
    table: str,
    mode: str = "overwrite"
) -> None:
    # spark 데이터프레임 생성
    columns = list(data.columns)
    if isinstance(data, pd.DataFrame):
        # data = spark.createDataFrame(data, schema=columns)
        data = spark.createDataFrame(data)
    
    # 인증 정보 하드코딩
    jdbc_user = "postgres"
    jdbc_pass = "asdASD123!@#"

    # 연결 문자열 하드코딩
    host = "20.196.145.211"
    port = "5432"
    jdbc_url = f"jdbc:postgresql://{host}:{port}/postgres"

    data.write.format("jdbc").mode(mode).options(
        url=jdbc_url,
        user=jdbc_user,
        password=jdbc_pass,

        # 가끔 테이블 이름을 대소문자 혼용하여 인자로 던져주는 경우를 대비한 코드
        # PostgreSQL의 테이블은 언제나 소문자만 사용함
        dbtable=f"{schema}.{table.lower()}",

        # overwrite일 때 DROP 후 CREATE가 아니라, TRUNCATE TABLE로 데이터만 비움
        truncate=True,

        # 대량 적재 튜닝: INSERT할 때 한 번에 몇 행씩 묶어서 넣을지
        batchsize=1000,

        # 스키마 적재 튜닝: INSERT시 JDBC는 모든 문자열 컬럼에 대해 setString() 함수를 사용해서, 자동으로 PosgreSQL에 text로 적재함
        # 이 옵션을 사용하면 unspecified로 넘겨 PostgreSQL 테이블의 스키마를 확인하고, 해당 스키마에 맞게끔 캐스팅함
        # 대신 넣으려는 데이터와 테이블의 스키마를 정확히 일치시켜야 함. 그렇지 않으면 에러가 발생함
        # 에러가 발생하는 경우 그냥 주석처리하거나, PostgreSQL 테이블 스키마를 TEXT로 전부 바꾸면 해결
        stringtype="unspecified"
    ).save()
'''

## 0-8. (필요하면 사용) 데이터프레임 로드 후 spark 데이터프레임 생성

In [0]:
# # 필요하면 그때 사용
# # spark dataframe schema 정의
# schema = StructType([
#     StructField("id",                   StringType(),       False), # PK, NOT NULL
#     StructField("sha",                  StringType(),       True),  # CHAR(40)
#     StructField("author",               StringType(),       True),  # 
#     StructField("private",              BooleanType(),      True),  # 
#     StructField("tags",                 StringType(),       True),  # '["a","b"]' 같은 문자열
#     StructField("likes",                IntegerType(),      True),  # 
#     StructField("created_at",           TimestampType(),    True),  # 
#     StructField("config",               StringType(),       True),  # JSON 문자열
# ])

In [0]:
# csv 로드 후 spark dataframe 생성
# path = "/Workspace/Shared/BRZ_user/brz_employee_info.csv"
# encoding_type = "UTF-8"
# df = pd.read_csv(path, encoding=encoding_type)

# spark_df = spark.createDataFrame(df, schema=schema)

In [0]:
""" 필요하다면 여기서 최소한의 전처리 하기 """

' 필요하다면 여기서 최소한의 전처리 하기 '

## 0-8. Azure VM의 PostgreSQL에 데이터 적재하기

In [0]:
# 특정 테이블에 데이터를 삽입하는 코드
# mode 종류: overwrite, append, ignore, error
# insert_data(
#     # data=spark_df,
#     data=df,
#     schema=SCHEMA_LEVEL,
#     table=TABLE_NAME,
#     mode="overwrite"
# )

## cloud DB에 적재

In [0]:
SCHEMA_LEVEL = "bronze"
TABLE_NAME = "brz_employee_info"  # 새 테이블로 저장
POSTGRES_CONN = "postgresql://Drawbridge:asdASD123%21%40%23@cloud-postgredb-server.postgres.database.azure.com:5432/postgres"
engine = create_engine(POSTGRES_CONN, connect_args={"sslmode": "require"})

path = "/Workspace/Shared/BRZ_user/brz_employee_info.csv"
encoding_type = "UTF-8"
df = pd.read_csv(path, encoding=encoding_type)

df.to_sql(
    name=TABLE_NAME,
    con=engine,
    schema=SCHEMA_LEVEL,
    if_exists='replace',  # 기존 테이블 덮어쓰기
    index=False
)

print(f"✅ 수정된 데이터를 {SCHEMA_LEVEL}.{TABLE_NAME} 테이블로 저장 완료")

✅ 수정된 데이터를 bronze.brz_employee_info 테이블로 저장 완료
