In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import lit, substring, col
from delta.tables import DeltaTable 
import os
import psycopg2 
import boto3
from botocore.exceptions import ClientError

In [5]:
# input parameters
WK_YM = '202502'

In [2]:
def is_running_in_jupyter():
    try:
        # IPython 환경인지 확인
        shell = get_ipython().__class__.__name__
        if shell == 'ZMQInteractiveShell':
            return True   # Jupyter Notebook 또는 QtConsole
        elif shell == 'TerminalInteractiveShell':
            return False  # 터미널에서 실행되는 IPython
        else:
            return False  # 기타 환경
    except NameError:
        return False      # IPython이 아님 (일반 Python 인터프리터)


In [16]:
def set_config_env(s3_bucket_name):
    if is_running_in_jupyter():
        # jupyter notebook에서는 $HOME/config/.env 파일에서 환경변수 read     
        from dotenv import load_dotenv
        dotenv_path = './../../config/.env'
        load_dotenv(dotenv_path=dotenv_path)    

    # S3_CONFIG, PG_CONFIG 설정 
    S3_CONFIG = {
        "access_key": os.getenv("AWS_ACCESS_KEY_ID"),
        "secret_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "endpoint": os.getenv("AWS_ENDPOINT_URL"),
        "bucket":  s3_bucket_name
    }

    PG_CONFIG = {
        "host": os.getenv("PG_HOST"),
        "port": os.getenv("PG_PORT", "5432"), # 기본값 5432 지정
        "dbname": os.getenv("PG_DB"),
        "user": os.getenv("PG_USER"),
        "password": os.getenv("PG_PASSWORD")
    }
        
    return S3_CONFIG, PG_CONFIG 
    

In [17]:
# 환경변수에서 S3_CONFIG, PG_CONFIG setting
S3_CONFIG = {}
PG_CONFIG = {}
S3_CONFIG, PG_CONFIG = set_config_env(s3_bucket_name="paasup")
    
print(f"S3_ACCESS_KEY is {S3_CONFIG['access_key']}") 
print(f"PG_HOST is {PG_CONFIG['host']}") 


S3_ACCESS_KEY is 3JHV0NGHJP378DJ4CXJ484GFFTIQQ2CW5CQFWJBM64K0QN2LSTIKKBJP1
PG_HOST is demo01-postgresql2-postgresql-ha-postgresql.demo01-postgresql2.svc.cluster.local


In [18]:
def create_spark_session(app_name,  notebook_name, notebook_namespace, s3_access_key, s3_secret_key, s3_endpoint):

    if is_running_in_jupyter():    
        conf = SparkConf()
        # Spark driver, executor 설정
        conf.set("spark.submit.deployMode", "client")
        conf.set("spark.executor.instances", "1")
        conf.set("spark.executor.memory", "1G")
        conf.set("spark.driver.memory", "1G")
        conf.set("spark.executor.cores", "1")
        conf.set("spark.kubernetes.namespace", notebook_namespace)
        conf.set("spark.kubernetes.container.image", "paasup/spark:3.5.2-java17-python3.11-2")
        conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "default-editor")
        conf.set("spark.kubernetes.driver.pod.name", os.environ["HOSTNAME"])
        conf.set("spark.driver.bindAddress", "0.0.0.0")
        conf.set("spark.driver.host", notebook_name+ "-headless." + notebook_namespace + ".svc.cluster.local")
        conf.set("spark.driver.port", "51810")        
        conf.set("spark.broadcast.port", "51811")     
        conf.set("spark.blockManager.port", "51812")
    
        # s3, delta, postgresql 사용 시 필요한 jar 패키지 설정
        jar_list = ",".join([
        "org.apache.hadoop:hadoop-common:3.3.4",
        "org.apache.hadoop:hadoop-aws:3.3.4",
        "com.amazonaws:aws-java-sdk:1.11.655", 
        "io.delta:delta-spark_2.12:3.3.1", # Keep if you might use Delta Lake for other operations
        "org.postgresql:postgresql:42.7.2" # Added for PostgreSQL
        ])
        conf.set("spark.jars.packages", jar_list)  
    
        # s3 세팅
        conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
        conf.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'true')
        conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
        conf.set('spark.hadoop.fs.s3a.access.key', s3_access_key)
        conf.set('spark.hadoop.fs.s3a.secret.key', s3_secret_key)
        conf.set('spark.hadoop.fs.s3a.endpoint', s3_endpoint)
        
        ### ssl 검증 비활성화
        conf.set("spark.driver.extraJavaOptions", "-Dcom.amazonaws.sdk.disableCertChecking=true")
        conf.set("spark.executor.extraJavaOptions", "-Dcom.amazonaws.sdk.disableCertChecking=true")
    
        # deltalake 세팅
        conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 

        # --- SparkSession 빌드 ---
        spark = SparkSession.builder \
            .appName(app_name) \
            .config(conf=conf) \
            .getOrCreate()
    else:
        # --- SparkSession 빌드 ---
        spark = SparkSession.builder \
            .appName(app_name) \
            .getOrCreate()

    return spark

In [19]:
def move_s3_file(s3_config, source_key, dest_key):
    """
    boto3를 사용하여 S3 내에서 파일을 이동(복사 후 삭제)합니다.
    """
    s3_client = boto3.client(
        's3',
        aws_access_key_id=s3_config['access_key'],
        aws_secret_access_key=s3_config['secret_key'],
        endpoint_url=s3_config['endpoint'],
        verify=False # 자체 서명 인증서 사용 시 필요할 수 있음
    )
    try:
        copy_source = {'Bucket': s3_config['bucket'], 'Key': source_key}
        print(f"Copying S3 object from {source_key} to {dest_key}...")
        s3_client.copy_object(CopySource=copy_source, Bucket=s3_config['bucket'], Key=dest_key)
        print(f"Deleting source S3 object: {source_key}...")
        s3_client.delete_object(Bucket=s3_config['bucket'], Key=source_key)
        print(f"Successfully moved {source_key} to {dest_key}.")
        return True
    except ClientError as e:
        print(f"Error moving S3 file: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred during S3 move: {e}")
        return False

In [20]:
def subway_data_append(spark, s3_config, pg_config):  #pg_config, pg_table, pg_url, pg_properties, delete_sql, insert_sql):
    """
    CSV--> DELTA --> POSTGRESQL 처리하는 메인 파이프라인
    """
    pg_url = f"jdbc:postgresql://{pg_config['host']}:{pg_config['port']}/{pg_config['dbname']}"
    pg_properties = {
        "user": pg_config['user'],
        "password": pg_config['password'],
        "driver": "org.postgresql.Driver"
    }

    print(f" bucket name is {s3_config['bucket']}")
    csv_source_key = f"datasource/CARD_SUBWAY_MONTH_{WK_YM}.csv"
    csv_archive_key = f"archive/CARD_SUBWAY_MONTH_{WK_YM}.csv"
    csv_s3_path = f"s3a://{s3_config['bucket']}/{csv_source_key}"
    delta_s3_path = f"s3a://{s3_config['bucket']}/deltalake/subway_passengers"
    pg_table_name = "subway_passengers"
    
    try:    
        print(f"\n--- Starting processing for {pg_table_name} during '{WK_YM}' ---")

        # --- 단계 1: S3 CSV 읽기 ---
        print(f"Reading CSV from {csv_s3_path}...")
        # CSV 스키마나 옵션(overwrite, inferSchema...)은 소스 파일에 맞게 조정
        csv_df = spark.read.option("header", "true").option("overwriteSchema", "true").csv(csv_s3_path)
        # '사용일자' 컬럼이 YYYYMMDD 형식이라고 가정
        # 필요시 yyyymm 컬럼을 추가하거나 기존 컬럼을 확인
        # csv_df = csv_df.withColumn("operation_month", lit(yyyymm)) # 필요시 추가
        csv_df.createOrReplaceTempView("subway_data")
        print(f"CSV for {WK_YM} loaded. Count: {csv_df.count()}")
        csv_df.show(5)

        # --- 단계 2: Delta Lake에 삭제 후 삽입 (Spark SQL 사용) ---
        print(f"Deleting old data for {WK_YM} from Delta table: {delta_s3_path}")
        
        if DeltaTable.isDeltaTable(spark, delta_s3_path):
            # '사용일자' 컬럼 형식에 맞게 WHERE 조건 수정
            print(f" Delta Table {delta_s3_path} 있음") 
            delete_delta_sql = f"delete from delta.`{delta_s3_path}` where `사용일자`  like '{WK_YM}'||'%' "
            delta_mode = "append"
            try:
                spark.sql(delete_delta_sql)
                print(f"Deletion from Delta complete. Inserting new data for {WK_YM}...")
            except Exception as e:
                print(f"Skip on error occurred during Delta DELETE, even though table exists: {e}")
                raise # 예상치 못한 오류는 다시 발생시킴         
        else:
            print(f" Delta Table {delta_s3_path} 없음") 
            delta_mode = "overwrite"
        
        # DataFrame API로 쓰기 (SQL INSERT INTO도 가능하지만, DF API가 S3 쓰기에 더 일반적)
        csv_df.write.format("delta").mode(delta_mode).save(delta_s3_path)
        print(f"Successfully wrote data to Delta table {delta_s3_path}.")
        
        # --- 단계 3: S3 CSV 파일 이동 ---
        print(f"Archiving S3 CSV file...")
        move_s3_file(s3_config, csv_source_key, csv_archive_key)
        

        # --- 단계 4: PostgreSQL에 삭제 후 삽입 ---   
        print(f"Deleting old data for {WK_YM} from PostgreSQL table: {pg_table_name}")        
        conn = None
        cursor = None
        conn = psycopg2.connect (
                    host=pg_config['host'],
                    port=pg_config['port'],
                    dbname=pg_config['dbname'],
                    user=pg_config['user'],
                    password=pg_config['password'] )
        cursor = conn.cursor()

        delete_sql = f" delete from {pg_table_name} where use_date like  '{WK_YM}'||'%' "
        print(f"Executing DELETE query: {delete_sql}")
        cursor.execute(delete_sql)
        deleted_rows = cursor.rowcount
        if cursor: cursor.close()
        if conn: 
            conn.commit()
            conn.close()
        print(f"Successfully executed DELETE query. {deleted_rows} rows deleted.")    

        print(f"Inserting data for {WK_YM} into PostgreSQL table: {pg_table_name}")
         # 데이터를 가공할 Spark SQL 쿼리
        insert_sql = f"""
                        select  `사용일자` use_date
                           ,`노선명` line_no
                           ,`역명`  station_name
                           ,cast(`승차총승객수` as int) pass_in
                           ,cast(`하차총승객수` as int) pass_out
                           ,substring(`등록일자`,1,8) reg_date
                        from delta.`{delta_s3_path}`
                        where `사용일자` like '{WK_YM}'||'%'
                    """      
        print(insert_sql) 
        delta_df = spark.sql(insert_sql)
        delta_df.write \
            .jdbc(url=pg_url,
                  table=pg_table_name,
                  mode="append",
                  properties=pg_properties)
        print(f"Successfully wrote data to PostgreSQL table {pg_table_name}.")

        print(f"--- Processing for {WK_YM} completed successfully! ---")

    except Exception as e:
        print(f"An error occurred during processing {WK_YM}: {e}")
        import traceback
        traceback.print_exc()


In [21]:
# --- 메인 실행 블록 ---
if __name__ == "__main__":
    
    if is_running_in_jupyter():
        WK_YM = '202504'
    else:
        # input parameters by airflow 
        WK_YM = os.environ['WK_YM']

    print("------------------------------")
    print("WK_YM : " + WK_YM)
    print("-------------------------------")

    # --- APP구성 변수 설정  ---
    APP_NAME = 'load_subway_passengers'
    NOTEBOOK_NAME      = "test"
    NOTEBOOK_NAMESPACE = "demo01-kf"    

    # 환경변수에서 S3_CONFIG, PG_CONFIG setting
    S3_CONFIG = {}
    PG_CONFIG = {}
    S3_CONFIG, PG_CONFIG = set_config_env(s3_bucket_name="paasup")
    
    # SparkSession 생성
    spark = create_spark_session(
        app_name=APP_NAME,
        notebook_name=NOTEBOOK_NAME,
        notebook_namespace=NOTEBOOK_NAMESPACE,
        s3_access_key=S3_CONFIG['access_key'],
        s3_secret_key=S3_CONFIG['secret_key'],
        s3_endpoint=S3_CONFIG['endpoint']
        # app_name은 기본값을 사용하므로 전달하지 않아도 됩니다.
    )   
    
    # 데이터 처리 실행
    subway_data_append(spark, S3_CONFIG, PG_CONFIG)
    

    # # SparkSession 종료
    # spark.stop()
    # print("Spark session stopped.")


:: loading settings :: url = jar:file:/usr/local/spark-3.5.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.apache.hadoop#hadoop-common added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk added as a dependency
io.delta#delta-spark_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ba8abe39-4a83-4c29-904f-0f4a777339bb;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-common;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-protobuf_3_7;1.1.1 in central
	found org.apache.hadoop#hadoop-annotations;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found com.google.guava#guava;27.0-jre in central
	found com.google.guava#failureaccess;1.0 in central
	found com.google.guava#listenablefuture;9999.0-empty-to-avoid-conflict-with-guava in central
	found com.google.code.findbugs#jsr305;3.0.2 i

 bucket name is paasup

--- Starting processing for subway_passengers during '202504' ---
Reading CSV from s3a://paasup/datasource/CARD_SUBWAY_MONTH_202504.csv...


25/06/04 04:25:48 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/06/04 04:25:49 WARN AmazonHttpClient: SSL Certificate checking for endpoints has been explicitly disabled.
                                                                                

CSV for 202504 loaded. Count: 18515
+--------+------+-----------------------+------------+------------+--------+
|사용일자|노선명|                   역명|승차총승객수|하차총승객수|등록일자|
+--------+------+-----------------------+------------+------------+--------+
|20250401| 2호선|                   시청|       31730|       30459|20250404|
|20250401| 2호선|             을지로입구|       55089|       57583|20250404|
|20250401| 2호선|              을지로3가|       27974|       28001|20250404|
|20250401| 2호선|              을지로4가|       17026|       17012|20250404|
|20250401| 2호선|동대문역사문화공원(DDP)|       15742|       18349|20250404|
+--------+------+-----------------------+------------+------------+--------+
only showing top 5 rows

Deleting old data for 202504 from Delta table: s3a://paasup/deltalake/subway_passengers
 Delta Table s3a://paasup/deltalake/subway_passengers 있음


25/06/04 04:26:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/06/04 04:26:28 WARN AmazonHttpClient: SSL Certificate checking for endpoints has been explicitly disabled.


Deletion from Delta complete. Inserting new data for 202504...


25/06/04 04:26:31 WARN AmazonHttpClient: SSL Certificate checking for endpoints has been explicitly disabled.


Successfully wrote data to Delta table s3a://paasup/deltalake/subway_passengers.
Archiving S3 CSV file...
Copying S3 object from datasource/CARD_SUBWAY_MONTH_202504.csv to archive/CARD_SUBWAY_MONTH_202504.csv...
Deleting source S3 object: datasource/CARD_SUBWAY_MONTH_202504.csv...
Successfully moved datasource/CARD_SUBWAY_MONTH_202504.csv to archive/CARD_SUBWAY_MONTH_202504.csv.
Deleting old data for 202504 from PostgreSQL table: subway_passengers
Executing DELETE query:  delete from subway_passengers where use_date like  '202504'||'%' 




Successfully executed DELETE query. 18515 rows deleted.
Inserting data for 202504 into PostgreSQL table: subway_passengers

                        select  `사용일자` use_date
                           ,`노선명` line_no
                           ,`역명`  station_name
                           ,cast(`승차총승객수` as int) pass_in
                           ,cast(`하차총승객수` as int) pass_out
                           ,substring(`등록일자`,1,8) reg_date
                        from delta.`s3a://paasup/deltalake/subway_passengers`
                        where `사용일자` like '202504'||'%'
                    


[Stage 21:>                                                         (0 + 1) / 1]

Successfully wrote data to PostgreSQL table subway_passengers.
--- Processing for 202504 completed successfully! ---


                                                                                

In [22]:
spark.stop()