In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType
from pyproj import Transformer
from pyspark.sql.functions import pandas_udf

import pandas as pd
import yaml
import os

from utils.spark_path import get_latest_year_month_path, get_current_year_month_path

In [2]:
# Spark session for the silver clean -> s0 address job on the standalone
# cluster, with adaptive query execution and partition coalescing enabled.
_spark_builder = (
    SparkSession.builder
    .appName("silver_clean_to_s0_address")
    .master("spark://spark-master:7077")
)
for _conf_key, _conf_value in (
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
    ("spark.sql.shuffle.partitions", "200"),
):
    _spark_builder = _spark_builder.config(_conf_key, _conf_value)
spark = _spark_builder.getOrCreate()

Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/21 10:59:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Config

In [3]:
CONFIG_PATH = "./config.yaml"

with open(CONFIG_PATH, "r", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

ROOT = cfg["data_lake"]["root"]
LAYERS = cfg["data_lake"]["layers"]

# input: latest year/month snapshot of each silver "clean" domain
address_clean_src_base = os.path.join(
    ROOT,
    LAYERS["silver"]["clean"]["domains"]["address_clean"]["paths"]["parquet"]
)
address_clean_src_path = get_latest_year_month_path(spark, address_clean_src_base)

coord_clean_src_base = os.path.join(
    ROOT,
    LAYERS["silver"]["clean"]["domains"]["coord_clean"]["paths"]["parquet"]
)
coord_clean_src_path = get_latest_year_month_path(spark, coord_clean_src_base)


# output: current year/month directory of the s0 address stage
s0_address_conf = LAYERS["silver"]["stages"]["s0"]["domains"]["address"]
s0_address_base = os.path.join(ROOT, s0_address_conf["paths"]["parquet"])
s0_address_path = get_current_year_month_path(s0_address_base)

# The save step spreads this value with `partitionBy(*s0_partition_cols)`.
# A missing "partition" key (None) would raise TypeError there, and a bare
# string such as "region" would be unpacked into single characters, so the
# configured value is normalised to a list of column names here.
_partition_conf = s0_address_conf.get("partition")
if _partition_conf is None:
    s0_partition_cols = []
elif isinstance(_partition_conf, str):
    s0_partition_cols = [_partition_conf]
else:
    s0_partition_cols = list(_partition_conf)

print("[PATH] address_src_path =", address_clean_src_path)
print("[PATH] coord_src_path   =", coord_clean_src_path)
print("[PATH] s0_address_path  =", s0_address_path)
print("[CONF] s0_partition_cols =", s0_partition_cols)

[PATH] address_src_path = /opt/spark/data/silver/clean/address/year=2026/month=02
[PATH] coord_src_path   = /opt/spark/data/silver/clean/coord/year=2026/month=02
[PATH] s0_address_path  = /opt/spark/data/silver/s0/address/year=2026/month=02
[CONF] s0_partition_cols = ['region']


# Silver clean 데이터 로드 (도로명주소, 위치정보)

In [4]:
# ============================================================
# LOAD
# ============================================================
addr_clean_df = spark.read.parquet(address_clean_src_path)
coord_clean_df = spark.read.parquet(coord_clean_src_path)

addr_clean_df.printSchema()
coord_clean_df.printSchema()

                                                                                

root
 |-- PNU코드: string (nullable = true)
 |-- 도로명주소: string (nullable = true)
 |-- region: string (nullable = true)

root
 |-- 도로명주소: string (nullable = true)
 |-- x_utmk: double (nullable = true)
 |-- y_utmk: double (nullable = true)
 |-- region: string (nullable = true)



# 도로명주소, 위치정보 join

In [5]:
# Attach UTM-K coordinates to every address row, matching on the road-name
# address. The coordinate table's `region` is dropped first so the joined
# frame carries a single `region` column (the address table's).
# NOTE(review): if coord_clean_df holds more than one row per 도로명주소, this
# left join duplicates the matching address rows — confirm key uniqueness upstream.
coord_for_join_df = coord_clean_df.drop("region")
joined_df = addr_clean_df.join(coord_for_join_df, on="도로명주소", how="left")

print("joined_df schema")
joined_df.printSchema()
joined_df.show(5, truncate=False)

joined_df schema
root
 |-- 도로명주소: string (nullable = true)
 |-- PNU코드: string (nullable = true)
 |-- region: string (nullable = true)
 |-- x_utmk: double (nullable = true)
 |-- y_utmk: double (nullable = true)



[Stage 6:>                                                          (0 + 1) / 1]

+-------------------------+-------------------+------+--------------+--------------+
|도로명주소               |PNU코드            |region|x_utmk        |y_utmk        |
+-------------------------+-------------------+------+--------------+--------------+
|경기도 가평군 가랫골길 17|4182025031101400010|경기  |1000681.927311|1976146.147896|
|경기도 가평군 가랫골길 22|4182025031106870000|경기  |1000668.204629|1976181.683314|
|경기도 가평군 가랫골길 24|4182025031105080000|경기  |1000657.463751|1976191.74324 |
|경기도 가평군 가랫골길 38|4182025031105100005|경기  |1000523.771708|1976229.485399|
|경기도 가평군 가랫골길 9 |4182025031101400008|경기  |1000758.497104|1976124.261699|
+-------------------------+-------------------+------+--------------+--------------+
only showing top 5 rows


                                                                                

# 좌표 변환 (EPSG:5179 -> EPSG:4326)

In [6]:
schema = StructType([
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
])

@pandas_udf(schema)
def utmk5179_to_wgs84(x: pd.Series, y: pd.Series) -> pd.DataFrame:
    """Convert UTM-K (EPSG:5179) x/y coordinates to WGS84 (EPSG:4326) lat/lon.

    Args:
        x: UTM-K x values for one Arrow batch.
        y: UTM-K y values for the same batch.

    Returns:
        DataFrame with ``latitude`` and ``longitude`` columns, row-aligned with
        the inputs. Rows whose input is null, or whose transformed value is not
        finite, are returned as null.
    """
    # Import and build the transformer inside the UDF so it is created on the
    # executor instead of being pickled from the driver.
    import numpy as np
    from pyproj import Transformer

    transformer = Transformer.from_crs("EPSG:5179", "EPSG:4326", always_xy=True)

    xx = x.astype("float64")
    yy = y.astype("float64")

    # always_xy=True: input order is (x, y) and output order is (lon, lat).
    lon, lat = transformer.transform(xx.to_numpy(), yy.to_numpy())

    # Build the result on the input index so the masks below align row-for-row.
    out = pd.DataFrame(
        {
            "latitude": np.asarray(lat, dtype="float64"),
            "longitude": np.asarray(lon, dtype="float64"),
        },
        index=x.index,
    )

    # With errcheck=False pyproj reports untransformable points as inf; treat
    # those like missing input rather than persisting inf coordinates.
    invalid = (
        x.isna()
        | y.isna()
        | ~np.isfinite(out["latitude"])
        | ~np.isfinite(out["longitude"])
    )
    out.loc[invalid, ["latitude", "longitude"]] = np.nan
    return out

# 적용
converted_df = (
    joined_df
    .withColumn("wgs84", utmk5179_to_wgs84(F.col("x_utmk"), F.col("y_utmk")))
    .withColumn("latitude", F.col("wgs84.latitude"))
    .withColumn("longitude", F.col("wgs84.longitude"))
    .drop("wgs84")
)

converted_df.printSchema()


root
 |-- 도로명주소: string (nullable = true)
 |-- PNU코드: string (nullable = true)
 |-- region: string (nullable = true)
 |-- x_utmk: double (nullable = true)
 |-- y_utmk: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [7]:
# Keep only the columns persisted to the s0 stage, in output order; the raw
# UTM-K coordinates are not carried forward.
# NOTE(review): this rebinds `joined_df`, so re-running the coordinate
# conversion cell afterwards fails (x_utmk / y_utmk are gone). A distinct
# name would make the cells independently re-runnable.
S0_OUTPUT_COLS = ["region", "PNU코드", "도로명주소", "longitude", "latitude"]
joined_df = converted_df.drop("x_utmk", "y_utmk").select(*S0_OUTPUT_COLS)

joined_df.show(5, truncate=False)

[Stage 11:>                                                         (0 + 1) / 1]

+------+-------------------+-------------------------+------------------+------------------+
|region|PNU코드            |도로명주소               |longitude         |latitude          |
+------+-------------------+-------------------------+------------------+------------------+
|경기  |4182025031101400010|경기도 가평군 가랫골길 17|127.50774450708117|37.78500342081887 |
|경기  |4182025031106870000|경기도 가평군 가랫골길 22|127.50758869417821|37.785323719849806|
|경기  |4182025031105080000|경기도 가평군 가랫골길 24|127.50746672085072|37.785414400065214|
|경기  |4182025031105100005|경기도 가평군 가랫골길 38|127.5059484269042 |37.78575466522971 |
|경기  |4182025031101400008|경기도 가평군 가랫골길 9 |127.50861407143987|37.784806095122065|
+------+-------------------+-------------------------+------------------+------------------+
only showing top 5 rows


                                                                                

# 결과 저장

In [8]:
# Persist the s0 address table: overwrite this month's output directory and
# lay files out by the configured partition columns.
s0_writer = joined_df.write.mode("overwrite").partitionBy(*s0_partition_cols)
s0_writer.parquet(s0_address_path)

print("✅ saved:", s0_address_path)

                                                                                

✅ saved: /opt/spark/data/silver/s0/address/year=2026/month=02


In [9]:
# Stop the Spark session created at the top of the notebook now that the s0 output has been written.
spark.stop()