In [47]:
from pyspark.sql import SparkSession
from pyproj import Proj,transform, CRS
import numpy as np
import pandas as pd
import mysql.connector
from pyspark.sql.functions import udf, col, substring, isnan, when, count, isnull
from pyspark.sql.types import FloatType
from pyspark.sql.types import StringType
from tqdm import tqdm

In [2]:
# Spark 세션 시작
spark = SparkSession.builder \
    .appName("CSV to MariaDB") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/24 04:05:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# CSV 파일 읽기
df = spark.read.csv("hdfs://master:9000/input/구polygon.csv", header=True, inferSchema=True)

In [None]:
df.select('구코드').show() # select name from data_sdf;

In [3]:
# EPSG 코드 정의 (권장되는 최신 방식)
proj_UTMK = Proj(CRS.from_epsg(5181))
proj_WGS84 = Proj(CRS.from_epsg(4326))

In [26]:
# 좌표 변환 함수 정의
def utmk_to_wgs84(x, y):
    lon, lat = transform(proj_UTMK, proj_WGS84, x, y)
    return float(lon), float(lat)

def transform_polygon(geometry_str):
    def transform_coords(coords_str):
        transformed_coords = []
        for coord_pair in coords_str.split(", "):
            x, y = map(float, coord_pair.split())
            lon, lat = utmk_to_wgs84(x, y)
            transformed_coords.append(f"{lon} {lat}")
        return transformed_coords

    # Handle POLYGON
    if geometry_str.startswith("POLYGON"):
        coordinates_str = geometry_str[9:-2]  # Remove "POLYGON ((" and "))"
        transformed_coords = transform_coords(coordinates_str)
        transformed_polygon = "POLYGON ((" + ", ".join(transformed_coords) + "))"
        return transformed_polygon
    
    # Handle MULTIPOLYGON
    elif geometry_str.startswith("MULTIPOLYGON"):
        # Clean the MULTIPOLYGON string and split individual polygons
        multipolygons_str = geometry_str[15:-2]  # Remove "MULTIPOLYGON (" and "))"
        individual_polygons = multipolygons_str.split(")), ((")  # Split by the individual polygons
        
        transformed_polygons = []
        for polygon_str in individual_polygons:
            polygon_str = polygon_str.replace("(", "").replace(")", "").strip()  # Clean each polygon string
            transformed_coords = transform_coords(polygon_str)
            transformed_polygon = "((" + ", ".join(transformed_coords) + "))"
            transformed_polygons.append(transformed_polygon)
        
        # Combine transformed polygons into MULTIPOLYGON format
        transformed_multipolygon = "MULTIPOLYGON (" + ", ".join(transformed_polygons) + ")"
        return transformed_multipolygon
    
    return geometry_str  # Return unchanged if not POLYGON or MULTIPOLYGON

# UDF를 통해 변환 함수 등록
utmk_to_wgs84_udf_lon = udf(lambda x, y: utmk_to_wgs84(x, y)[0], FloatType())
utmk_to_wgs84_udf_lat = udf(lambda x, y: utmk_to_wgs84(x, y)[1], FloatType())
transform_polygon_udf = udf(transform_polygon, StringType())

In [None]:
# 새로운 x, y 컬럼 추가 (WGS84로 변환)
df = df.withColumn("x", utmk_to_wgs84_udf_lon(df["X좌표"], df["Y좌표"]))
df = df.withColumn("y", utmk_to_wgs84_udf_lat(df["X좌표"], df["Y좌표"]))

# geometry 컬럼 좌표 변환
df = df.withColumn("transformed_geometry", transform_polygon_udf(df["geometry"]))

# 결과 출력
df.show(truncate=False)

In [None]:
# CSV 파일 읽기
# seoul_df = spark.read.csv("hdfs://master:9000/input/서울polygon.csv", header=True, inferSchema=True)
# seoul_df

In [None]:
# 새로운 x, y 컬럼 추가 (WGS84로 변환)
# seoul_df = seoul_df.withColumn("x", utmk_to_wgs84_udf_lon(seoul_df["X좌표"], seoul_df["Y좌표"]))
# seoul_df = seoul_df.withColumn("y", utmk_to_wgs84_udf_lat(seoul_df["X좌표"], seoul_df["Y좌표"]))

# # geometry 컬럼 좌표 변환
# seoul_df = seoul_df.withColumn("transformed_geometry", transform_polygon_udf(seoul_df["geometry"]))

# # 결과 출력
# seoul_df.show(truncate=False)

In [None]:
# seoul_df.columns

In [66]:
# CSV 파일 읽기
dong_df = spark.read.csv("hdfs://master:9000/input/동polygon.csv", header=True, inferSchema=True)
dong_df

DataFrame[동코드: int, 동이름: string, X좌표: double, Y좌표: double, geometry: string]

In [67]:
df

DataFrame[구: string, 동: string, 1층: string, 1층 외: string, 전체: string]

In [None]:
# 새로운 x, y 컬럼 추가 (WGS84로 변환)
dong_df = dong_df.withColumn("x", utmk_to_wgs84_udf_lon(dong_df["X좌표"], dong_df["Y좌표"]))
dong_df = dong_df.withColumn("y", utmk_to_wgs84_udf_lat(dong_df["X좌표"], dong_df["Y좌표"]))

# geometry 컬럼 좌표 변환
dong_df = dong_df.withColumn("transformed_geometry", transform_polygon_udf(dong_df["geometry"]))

# 결과 출력
dong_df.show(truncate=False)

In [68]:
# 동코드에서 왼쪽 5글자를 추출하여 새로운 gu_code 칼럼 생성
dong_df = dong_df.withColumn("gu_code", substring(col("동코드").cast("string"), 1, 5))

# 결과 출력
dong_df.show()

+--------+---------------+--------+--------+--------------------+-------+
|  동코드|         동이름|   X좌표|   Y좌표|            geometry|gu_code|
+--------+---------------+--------+--------+--------------------+-------+
|11110515|     청운효자동|197342.0|453874.0|POLYGON ((197482....|  11110|
|11110530|         사직동|197383.0|452705.0|POLYGON ((197702....|  11110|
|11110540|         삼청동|198340.0|454312.0|POLYGON ((197980....|  11110|
|11110550|         부암동|196781.0|455266.0|POLYGON ((196621....|  11110|
|11110560|         평창동|197186.0|457344.0|POLYGON ((197800....|  11110|
|11110570|         무악동|196342.0|453063.0|POLYGON ((196444....|  11110|
|11110580|         교남동|196830.0|452392.0|POLYGON ((196720....|  11110|
|11110600|         가회동|198812.0|453662.0|POLYGON ((199036....|  11110|
|11110615|종로1·2·3·4가동|199104.0|452849.0|POLYGON ((199061....|  11110|
|11110630|    종로5·6가동|200389.0|452590.0|POLYGON ((200757....|  11110|
|11110640|         이화동|200267.0|453352.0|POLYGON ((200510....|  11110|
|11110650| 

In [31]:
# CSV 파일 읽기
sang_df = spark.read.csv("hdfs://master:9000/input/상권polygon3.csv", header=True, inferSchema=True)
sang_df

                                                                                

DataFrame[상권_코드: int, 상권이름: string, X좌표: double, Y좌표: double, 구코드: int, 구 이름: string, 동코드: int, 동이름: string, geometry: string]

In [32]:
sang_df.show()

+---------+--------------------------------------+--------+--------+------+-------+--------+---------------+--------------------+
|상권_코드|                              상권이름|   X좌표|   Y좌표|구코드|구 이름|  동코드|         동이름|            geometry|
+---------+--------------------------------------+--------+--------+------+-------+--------+---------------+--------------------+
|  3110008|          배화여자대학교(박노수미술관)|197093.0|453418.0| 11110| 종로구|11110515|     청운효자동|POLYGON ((126.965...|
|  3001494|                    종로·청계 관광특구|199796.0|452274.0| 11110| 종로구|11110615|종로1·2·3·4가동|POLYGON ((127.015...|
|  3130021|                          신설종합시장|201854.0|452569.0| 11110| 종로구|11110710|        숭인2동|POLYGON ((127.021...|
|  3130002|      세종마을음식문화거리(금천교시장)|197423.0|453030.0| 11110| 종로구|11110530|         사직동|POLYGON ((126.972...|
|  3130001|                              통인시장|197351.0|453449.0| 11110| 종로구|11110515|     청운효자동|POLYGON ((126.970...|
|  3130020|                동묘시장(동묘벼룩시장)|201471.0|452424.0| 11110|

In [37]:
# 새로운 x, y 컬럼 추가 (WGS84로 변환)
sang_df = sang_df.withColumn("x", utmk_to_wgs84_udf_lon(sang_df["X좌표"], sang_df["Y좌표"]))
sang_df = sang_df.withColumn("y", utmk_to_wgs84_udf_lat(sang_df["X좌표"], sang_df["Y좌표"]))


sang_df

DataFrame[상권_코드: int, 상권이름: string, X좌표: double, Y좌표: double, 구코드: int, 구 이름: string, 동코드: int, 동이름: string, geometry: string, x: float, y: float]

In [40]:
connection = mysql.connector.connect(
    host="localhost",
    user="root",
    password="enqnrhkwk108",
    database="S11P21D108",
    charset="utf8mb4",  # 문자셋 설정
    collation="utf8mb4_general_ci",  # collation 설정
    autocommit=False,  # Auto-commit을 끄고 트랜잭션 처리
    connection_timeout=28800,  # 타임아웃 증가
)
cursor = connection.cursor()
def insert_into_mariadb_area(row):
        sql = """
        INSERT INTO area (id, area_name, dong_code, x_pos, y_pos, polygon)
        VALUES (%s, %s, %s, %s, %s, ST_GeomFromText(%s))
        """
        # 시도 중인 행 삽입
        cursor.execute(sql, (
            row['상권_코드'], row['상권이름'], 
            row['동코드'], row['x'], row['y'], row['geometry']
        ))

# DataFrame의 각 행을 MariaDB에 삽입
for row in sang_df.collect():
    insert_into_mariadb_area(row)

# 변경사항 커밋 및 연결 종료
connection.commit()
cursor.close()
connection.close()

# # 결과 출력
# sang_df.show(truncate=False)

                                                                                

In [21]:
sang_df

DataFrame[상권_코드: int, 상권이름: string, X좌표: double, Y좌표: double, 구코드: int, 구 이름: string, 동코드: int, 동이름: string, geometry: string, x: float, y: float, transformed_geometry: string]

In [22]:
sang_df.show()

100%|██████████| 199/199 [00:01<00:00, 173.28it/s]                  (0 + 1) / 1]
100%|██████████| 641/641 [00:03<00:00, 173.70it/s]
100%|██████████| 18/18 [00:00<00:00, 174.44it/s]
100%|██████████| 56/56 [00:00<00:00, 172.99it/s]
100%|██████████| 42/42 [00:00<00:00, 173.86it/s]
['201566.4738 452608.9068, 201567.1469 452606.980799999, 201568.3303 452607.734200001, 201569.583 452607.8555, 201569.5827 452620.4606, 201573.3594 452612.2038, 201584.3517 452584.699100001, 201599.8581 452551.1107, 201576.2295 452550.783299999, 201564.1556 452552.628000001, 201558.8205 452553.603, 201557.9324 452562.3149, 201557.565 452565.589500001, 201557.0503 452567.1917, 201554.7021 452576.3358, 201542.8273 452572.786699999, 201528.3586 452568.0735, 201523.0936 452566.916200001, 201516.8042 452587.211999999, 201518.2999 452601.377900001, 201525.2719 452601.433, 201525.7085 452619.7349, 201524.9465 452627.7908, 201527.1352 452641.2885, 201528.6546 452640.559800001, 201533.2136 452639.0441, 201536.9056 452637

+---------+--------------------------------------+--------+--------+------+-------+--------+---------------+--------------------+---------+---------+--------------------+
|상권_코드|                              상권이름|   X좌표|   Y좌표|구코드|구 이름|  동코드|         동이름|            geometry|        x|        y|transformed_geometry|
+---------+--------------------------------------+--------+--------+------+-------+--------+---------------+--------------------+---------+---------+--------------------+
|  3110008|          배화여자대학교(박노수미술관)|197093.0|453418.0| 11110| 종로구|11110515|     청운효자동|POLYGON ((196955....| 35.23836|129.78375|POLYGON ((35.2370...|
|  3001494|                    종로·청계 관광특구|199796.0|452274.0| 11110| 종로구|11110615|종로1·2·3·4가동|POLYGON ((201385....|35.262985|129.77203|POLYGON ((35.2772...|
|  3130021|                          신설종합시장|201854.0|452569.0| 11110| 종로구|11110710|        숭인2동|POLYGON ((201937....|35.281437| 129.7759|POLYGON ((35.2821...|
|  3130002|      세종마을음식문화거리(금천교시장)|197423.0|45

In [33]:
connection = mysql.connector.connect(
    host="localhost",
    user="root",
    password="enqnrhkwk108",
    database="S11P21D108",
    charset="utf8mb4",  # 문자셋 설정
    collation="utf8mb4_general_ci",  # collation 설정
    autocommit=False,  # Auto-commit을 끄고 트랜잭션 처리
    connection_timeout=28800,  # 타임아웃 증가
)

In [34]:
area_selected = sang_df.select(
    sang_df['상권_코드'].alias('area_code'),
    sang_df['상권이름'].alias('area_name'),
    sang_df['동코드'].alias('dong_code'),
    sang_df['x'].alias('x_pos'),
    sang_df['y'].alias('y_pos'),
    sang_df['geometry'].alias('geometry')
)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `x` cannot be resolved. Did you mean one of the following? [`상권_코드`, `상권이름`, `X좌표`, `Y좌표`, `구코드`, `구 이름`, `동코드`, `동이름`, `geometry`].

In [14]:
area_selected.show()

100%|██████████| 199/199 [00:01<00:00, 169.47it/s]                  (0 + 1) / 1]
100%|██████████| 641/641 [00:03<00:00, 170.67it/s]
100%|██████████| 18/18 [00:00<00:00, 175.10it/s]
100%|██████████| 56/56 [00:00<00:00, 170.74it/s]
100%|██████████| 42/42 [00:00<00:00, 167.49it/s]
100%|██████████| 34/34 [00:00<00:00, 169.24it/s]
100%|██████████| 51/51 [00:00<00:00, 174.41it/s]
100%|██████████| 17/17 [00:00<00:00, 166.13it/s]
100%|██████████| 15/15 [00:00<00:00, 174.58it/s]
100%|██████████| 101/101 [00:00<00:00, 169.61it/s]
100%|██████████| 38/38 [00:00<00:00, 172.71it/s]
100%|██████████| 20/20 [00:00<00:00, 173.43it/s]
100%|██████████| 52/52 [00:00<00:00, 168.79it/s]
100%|██████████| 39/39 [00:00<00:00, 172.77it/s]
100%|██████████| 65/65 [00:00<00:00, 171.72it/s]
100%|██████████| 11/11 [00:00<00:00, 171.06it/s]
100%|██████████| 58/58 [00:00<00:00, 169.03it/s]
100%|██████████| 41/41 [00:00<00:00, 164.36it/s]
100%|██████████| 20/20 [00:00<00:00, 172.67it/s]
 60%|██████    | 72/120 [00:00<00

+---------+--------------------------------------+---------+---------+---------+--------------------+
|area_code|                             area_name|dong_code|    x_pos|    y_pos|            geometry|
+---------+--------------------------------------+---------+---------+---------+--------------------+
|  3110008|          배화여자대학교(박노수미술관)| 11110515| 35.23836|129.78375|POLYGON ((35.2370...|
|  3001494|                    종로·청계 관광특구| 11110615|35.262985|129.77203|POLYGON ((35.2772...|
|  3130021|                          신설종합시장| 11110710|35.281437| 129.7759|POLYGON ((35.2821...|
|  3130002|      세종마을음식문화거리(금천교시장)| 11110530| 35.24143| 129.7796|POLYGON ((35.2425...|
|  3130001|                              통인시장| 11110515|35.240677|129.78416|POLYGON ((35.2413...|
|  3130020|                동묘시장(동묘벼룩시장)| 11110710|35.278027|129.77419|MULTIPOLYGON (((2...|
|  3130019|                         동대문상가D동| 11110670|35.276726|129.77245|POLYGON ((35.2768...|
|  3130018|동대문문구완구거리(동대문문구완구시장)| 11110670|

100%|██████████| 120/120 [00:00<00:00, 172.28it/s]
                                                                                

In [24]:
area_selected.filter(area_selected.geometry.like("%MULTI%")).show()

100%|██████████| 199/199 [00:01<00:00, 168.64it/s]                  (0 + 1) / 1]
100%|██████████| 641/641 [00:03<00:00, 168.67it/s]
100%|██████████| 18/18 [00:00<00:00, 172.64it/s]
100%|██████████| 56/56 [00:00<00:00, 163.66it/s]
100%|██████████| 42/42 [00:00<00:00, 167.84it/s]
100%|██████████| 34/34 [00:00<00:00, 166.09it/s]
100%|██████████| 51/51 [00:00<00:00, 167.98it/s]
100%|██████████| 17/17 [00:00<00:00, 173.95it/s]
100%|██████████| 15/15 [00:00<00:00, 167.28it/s]
100%|██████████| 101/101 [00:00<00:00, 167.22it/s]
100%|██████████| 38/38 [00:00<00:00, 171.49it/s]
100%|██████████| 20/20 [00:00<00:00, 168.39it/s]
100%|██████████| 52/52 [00:00<00:00, 169.86it/s]
100%|██████████| 39/39 [00:00<00:00, 171.17it/s]
100%|██████████| 65/65 [00:00<00:00, 167.14it/s]
100%|██████████| 11/11 [00:00<00:00, 170.62it/s]
100%|██████████| 58/58 [00:00<00:00, 166.94it/s]
100%|██████████| 41/41 [00:00<00:00, 171.37it/s]
100%|██████████| 20/20 [00:00<00:00, 172.76it/s]
100%|██████████| 120/120 [00:00<0

+--------------------+
|            geometry|
+--------------------+
|POLYGON ((35.2370...|
|POLYGON ((35.2772...|
|POLYGON ((35.2821...|
|POLYGON ((35.2425...|
|POLYGON ((35.2413...|
|MULTIPOLYGON (((2...|
|POLYGON ((35.2768...|
|POLYGON ((35.2757...|
|POLYGON ((35.2760...|
|POLYGON ((35.2748...|
|POLYGON ((35.2738...|
|POLYGON ((35.2739...|
|POLYGON ((35.2738...|
|POLYGON ((35.2725...|
|POLYGON ((35.2690...|
|POLYGON ((35.2700...|
|POLYGON ((35.2674...|
|POLYGON ((35.2663...|
|POLYGON ((35.2620...|
|POLYGON ((35.2551...|
+--------------------+
only showing top 20 rows



100%|██████████| 199/199 [00:01<00:00, 170.15it/s]                  (0 + 1) / 1]
100%|██████████| 641/641 [00:03<00:00, 171.50it/s]
100%|██████████| 18/18 [00:00<00:00, 166.67it/s]
100%|██████████| 56/56 [00:00<00:00, 172.76it/s]
100%|██████████| 42/42 [00:00<00:00, 168.45it/s]
100%|██████████| 34/34 [00:00<00:00, 174.52it/s]
100%|██████████| 51/51 [00:00<00:00, 168.35it/s]
100%|██████████| 17/17 [00:00<00:00, 170.35it/s]
100%|██████████| 15/15 [00:00<00:00, 172.11it/s]
100%|██████████| 101/101 [00:00<00:00, 170.01it/s]
100%|██████████| 38/38 [00:00<00:00, 173.99it/s]
100%|██████████| 20/20 [00:00<00:00, 171.95it/s]
100%|██████████| 52/52 [00:00<00:00, 172.73it/s]
100%|██████████| 39/39 [00:00<00:00, 168.17it/s]
100%|██████████| 65/65 [00:00<00:00, 173.79it/s]
100%|██████████| 11/11 [00:00<00:00, 162.85it/s]
100%|██████████| 58/58 [00:00<00:00, 173.27it/s]
100%|██████████| 41/41 [00:00<00:00, 167.41it/s]
100%|██████████| 20/20 [00:00<00:00, 168.38it/s]
100%|██████████| 120/120 [00:00<0

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_212887/4117932964.py", line 20, in transform_polygon
ValueError: could not convert string to float: '453676.211200001)'


In [None]:
# 필요한 컬럼만 선택하여 DataFrame 생성
df_selected = df.select(
    df['구코드'].alias('gu_code'),
    df['구이름'].alias('gu_name'),
    df['x'].alias('x_pos'),
    df['y'].alias('y_pos'),
    df['transformed_geometry'].alias('geometry')
)

In [None]:
dong_selected = dong_df.select(
    dong_df['동코드'].alias('dong_code'),
    dong_df['동이름'].alias('dong_name'),
    dong_df['gu_code'].alias('gu_code'),
    dong_df['x'].alias('x_pos'),
    dong_df['y'].alias('y_pos'),
    dong_df['transformed_geometry'].alias('geometry')
)

In [54]:
# MariaDB에 삽입할 함수 정의
def insert_into_mariadb_gu(row):
    sql = """
    INSERT INTO gu (id, gu_name, x_pos, y_pos, polygon)
    VALUES (%s, %s, %s, %s, ST_GeomFromText(%s))
    """
    cursor.execute(sql, (row['gu_code'], row['gu_name'], row['x_pos'], row['y_pos'], row['geometry']))

def insert_into_mariadb_dong(row):
    sql = """
    INSERT INTO dong (id, dong_name, gu_code, x_pos, y_pos, polygon)
    VALUES (%s, %s, %s, %s, %s, ST_GeomFromText(%s))
    """
    cursor.execute(sql, (row['dong_code'], row['dong_name'], row['gu_code'], row['x_pos'], row['y_pos'], row['geometry']))

def insert_into_mariadb_area(row):
        sql = """
        INSERT INTO area (id, area_name, dong_code, x_pos, y_pos, polygon)
        VALUES (%s, %s, %s, %s, %s, %s)
        """
        # 시도 중인 행 삽입
        cursor.execute(sql, (
            row['area_code'], row['area_name'], 
            row['dong_code'], row['x_pos'], row['y_pos'], row['geometry']
        ))


In [43]:
connection = mysql.connector.connect(
    host="localhost",
    user="root",
    password="enqnrhkwk108",
    database="S11P21D108",
    charset="utf8mb4",  # 문자셋 설정
    collation="utf8mb4_general_ci",  # collation 설정
    autocommit=False,  # Auto-commit을 끄고 트랜잭션 처리
    connection_timeout=28800,  # 타임아웃 증가
)

cursor = connection.cursor()

In [None]:


# DataFrame의 각 행을 MariaDB에 삽입
for row in df_selected.collect():
    insert_into_mariadb_gu(row)

# 변경사항 커밋 및 연결 종료
connection.commit()
cursor.close()
connection.close()

In [None]:
# DataFrame의 각 행을 MariaDB에 삽입
for row in dong_selected.collect():
    insert_into_mariadb_dong(row)

# 변경사항 커밋 및 연결 종료
connection.commit()
cursor.close()
connection.close()

In [55]:
# DataFrame의 각 행을 MariaDB에 삽입
for row in area_selected.collect():
    insert_into_mariadb_area(row)

# 변경사항 커밋 및 연결 종료
connection.commit()
cursor.close()
connection.close()

100%|██████████| 199/199 [00:01<00:00, 169.83it/s]                  (0 + 2) / 2]
100%|██████████| 218/218 [00:01<00:00, 170.91it/s]
100%|██████████| 58/58 [00:00<00:00, 171.07it/s]]
100%|██████████| 65/65 [00:00<00:00, 171.46it/s]s]
100%|██████████| 18/18 [00:00<00:00, 173.98it/s]s]
100%|██████████| 66/66 [00:00<00:00, 168.48it/s]s]
100%|██████████| 31/31 [00:00<00:00, 164.95it/s]s]
100%|██████████| 58/58 [00:00<00:00, 170.38it/s]s]
100%|██████████| 202/202 [00:01<00:00, 169.44it/s]
100%|██████████| 641/641 [00:03<00:00, 169.77it/s]
100%|██████████| 18/18 [00:00<00:00, 169.63it/s]s]
100%|██████████| 56/56 [00:00<00:00, 169.63it/s]s]
100%|██████████| 42/42 [00:00<00:00, 169.47it/s]s]
100%|██████████| 34/34 [00:00<00:00, 169.46it/s]s]
100%|██████████| 320/320 [00:01<00:00, 170.02it/s]
100%|██████████| 51/51 [00:00<00:00, 170.42it/s]
100%|██████████| 17/17 [00:00<00:00, 171.95it/s]]
100%|██████████| 15/15 [00:00<00:00, 162.99it/s]]
100%|██████████| 101/101 [00:00<00:00, 171.28it/s]
100%|█

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_205585/4117932964.py", line 20, in transform_polygon
ValueError: could not convert string to float: '453676.211200001)'


100%|██████████| 355/355 [00:02<00:00, 169.35it/s]
100%|██████████| 113/113 [00:00<00:00, 169.14it/s]
 45%|████▌     | 176/389 [00:01<00:01, 169.03it/s]24/09/24 01:59:21 WARN PythonUDFRunner: Incomplete task 1.0 in stage 25 (TID 36) interrupted: Attempting to kill Python Worker
24/09/24 01:59:21 WARN TaskSetManager: Lost task 1.0 in stage 25.0 (TID 36) (master executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 0 in stage 25.0 failed 1 times, most recent failure: Lost task 0.0 in stage 25.0 (TID 35) (master executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_205585/4117932964.py", line 20, in transform_polygon
ValueError: could not convert string to float: '453676.211200001)'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)


In [None]:
cursor.close()
connection.close()

In [43]:
# CSV 파일 읽기
dong_fee_df = spark.read.csv("hdfs://master:9000/input/동_임대료.csv", header=True, inferSchema=True)

In [45]:
dong_fee_df.show()

+------+---------------+-------+-------+-------+
|    구|             동|    1층| 1층 외|   전체|
+------+---------------+-------+-------+-------+
|종로구|         종로구|230,674|160,559|195,617|
|종로구|     청운효자동|188,588|115,520|152,054|
|종로구|         사직동|235,516|132,729|184,123|
|종로구|         삼청동|153,171|   NULL|153,171|
|종로구|         부암동|115,405| 80,929| 98,167|
|종로구|         평창동| 83,479|119,522|101,501|
|종로구|         무악동|   NULL|   NULL|   NULL|
|종로구|         교남동|215,432|152,824|184,128|
|종로구|         가회동|233,527|110,071|171,799|
|종로구|종로1·2·3·4가동|269,591|130,688|200,140|
|종로구|    종로5·6가동|273,313|281,611|277,462|
|종로구|         이화동|178,429|105,291|141,860|
|종로구|         혜화동|159,892| 91,260|125,576|
|종로구|        창신1동|262,631|127,691|195,161|
|종로구|        창신2동|105,489| 76,719| 91,104|
|종로구|        창신3동|214,241|   NULL|214,241|
|종로구|        숭인1동|183,505| 58,548|121,027|
|종로구|        숭인2동|205,581|111,597|158,589|
|  중구|           중구|282,431|256,317|269,374|
|  중구|         소공동|259,429|197,948|228,689|
+

In [51]:
dong_fee_df.filter((dong_fee_df["1층"].isNULL()))

TypeError: 'Column' object is not callable

In [56]:
dong_fee_df.filter((dong_fee_df["1층"].isNull()))


DataFrame[구: string, 동: string, 1층: string, 1층 외: string, 전체: string]

In [59]:
from pyspark.sql import functions as F

# 먼저 각 구별로 '구' == '동'인 경우의 값을 가져옴
df_with_aggregated_values = dong_fee_df.filter(dong_fee_df["구"] == dong_fee_df["동"]).select(
    "구", 
    F.col("1층").alias("aggregated_1층"),
    F.col("1층 외").alias("aggregated_1층 외"),
    F.col("전체").alias("aggregated_전체")
)

# 원본 데이터프레임에 조인해서 NULL 값을 채움
dong_fee_df_filled = dong_fee_df.alias("original").join(
    df_with_aggregated_values.alias("aggregated"),
    on="구", 
    how="left"
).select(
    "original.구", 
    "original.동", 
    F.coalesce("original.1층", "aggregated.aggregated_1층").alias("1층"),
    F.coalesce("original.1층 외", "aggregated.aggregated_1층 외").alias("1층 외"),
    F.coalesce("original.전체", "aggregated.aggregated_전체").alias("전체")
)

dong_fee_df_filled.show()


+------+---------------+-------+-------+-------+
|    구|             동|    1층| 1층 외|   전체|
+------+---------------+-------+-------+-------+
|종로구|         종로구|230,674|160,559|195,617|
|종로구|     청운효자동|188,588|115,520|152,054|
|종로구|         사직동|235,516|132,729|184,123|
|종로구|         삼청동|153,171|160,559|153,171|
|종로구|         부암동|115,405| 80,929| 98,167|
|종로구|         평창동| 83,479|119,522|101,501|
|종로구|         무악동|230,674|160,559|195,617|
|종로구|         교남동|215,432|152,824|184,128|
|종로구|         가회동|233,527|110,071|171,799|
|종로구|종로1·2·3·4가동|269,591|130,688|200,140|
|종로구|    종로5·6가동|273,313|281,611|277,462|
|종로구|         이화동|178,429|105,291|141,860|
|종로구|         혜화동|159,892| 91,260|125,576|
|종로구|        창신1동|262,631|127,691|195,161|
|종로구|        창신2동|105,489| 76,719| 91,104|
|종로구|        창신3동|214,241|160,559|214,241|
|종로구|        숭인1동|183,505| 58,548|121,027|
|종로구|        숭인2동|205,581|111,597|158,589|
|  중구|           중구|282,431|256,317|269,374|
|  중구|         소공동|259,429|197,948|228,689|
+

In [62]:
sang_df

DataFrame[상권_코드: int, 상권이름: string, X좌표: double, Y좌표: double, 구코드: int, 구 이름: string, 동코드: int, 동이름: string, geometry: string, x: float, y: float]

In [65]:
from pyspark.sql import functions as F

# 동으로 끝나는 경우에는 동 이름과 동코드 매핑
dong_fee_df_with_codes = dong_fee_df_filled \
    .withColumn("is_dong", F.when(F.col("동").endswith("동"), True).otherwise(False)) \
    .join(
        sang_df.select(F.col("동이름").alias("동이름_sang"), F.col("동코드").alias("동코드_sang"), F.col("구코드").alias("구코드_sang")), 
        (F.col("동") == F.col("동이름_sang")) & F.col("is_dong"), "left"
    ) \
    .withColumn("동코드", F.when(F.col("is_dong"), F.col("동코드_sang"))) \
    .withColumn("구코드", F.when(F.col("is_dong"), F.col("구코드_sang")))

# 구로 끝나는 경우에는 구 이름과 구코드 매핑
dong_fee_df_with_codes = dong_fee_df_with_codes \
    .join(
        sang_df.select(F.col("구 이름").alias("구이름_sang"), F.col("구코드").alias("구코드_sang_2")), 
        (F.col("동") == F.col("구이름_sang")) & (F.col("동").endswith("구")), "left"
    ) \
    .withColumn("구코드", F.when(F.col("동").endswith("구"), F.col("구코드_sang_2")).otherwise(F.col("구코드")))

# 필요없는 칼럼 정리
dong_fee_df_with_codes = dong_fee_df_with_codes.drop("is_dong", "동이름_sang", "구이름_sang", "구코드_sang", "구코드_sang_2", "동코드_sang")

# (구, 동) 기준으로 중복 행 제거
dong_fee_df_unique = dong_fee_df_with_codes.dropDuplicates(["구", "동"])

# 결과 출력
dong_fee_df_unique.show()

+------+-------+-------+-------+-------+--------+------+
|    구|     동|    1층| 1층 외|   전체|  동코드|구코드|
+------+-------+-------+-------+-------+--------+------+
|강남구| 강남구|199,480|162,554|181,017|    NULL| 11680|
|강남구|논현1동|205,202|161,758|183,480|11680521| 11680|
|강남구|논현2동|172,159|160,802|166,481|11680531| 11680|
|강남구| 신사동|180,804|133,231|157,018|11680510| 11680|
|강동구| 강동구|140,386| 97,985|119,185|    NULL| 11740|
|강동구| 강일동|201,817| 87,068|144,443|11740515| 11740|
|강동구|고덕1동|133,568|108,680|121,124|11740550| 11740|
|강동구|고덕2동|186,486|149,372|167,929|11740560| 11740|
|강동구|   길동|140,377| 80,584|110,481|11740685| 11740|
|강동구|둔촌1동|140,386| 97,985|119,185|    NULL|  NULL|
|강동구|둔촌2동|122,431| 70,754| 96,593|11740700| 11740|
|강동구|명일1동|150,706|100,557|125,631|11740530| 11740|
|강동구|명일2동|167,823| 96,449|132,136|11740540| 11740|
|강동구| 상일동|139,345|189,428|164,386|11740520| 11740|
|강동구|성내1동|120,784| 81,606|101,195|11740640| 11740|
|강동구|성내2동|138,906|110,612|124,759|11740650| 11740|
|강동구|성내3동|131,230|106,118

In [71]:
from pyspark.sql import functions as F

# dong_df의 컬럼을 사용하여 dong_fee_df_unique에 있는 동코드와 구코드 업데이트
dong_fee_df_updated = dong_fee_df_unique \
    .join(dong_df.select(F.col("동코드").alias("new_동코드"), 
                         F.col("gu_code").alias("new_구코드"), 
                         F.col("동이름")), 
          dong_fee_df_unique["동"] == dong_df["동이름"], "left") \
    .withColumn("동코드", F.coalesce(F.col("new_동코드"), F.col("동코드"))) \
    .withColumn("구코드", F.coalesce(F.col("new_구코드"), F.col("구코드"))) \
    .drop("new_동코드", "new_구코드", "동이름")

# '1층'과 '1층 외' 컬럼을 숫자형으로 변환 (콤마 제거 후)
dong_fee_df_updated = dong_fee_df_updated \
    .withColumn("1층", F.regexp_replace("1층", ",", "")) \
    .withColumn("1층 외", F.regexp_replace("1층 외", ",", ""))

# 결과 출력
dong_fee_df_updated.show()

+------+-------+------+------+-------+--------+------+
|    구|     동|   1층|1층 외|   전체|  동코드|구코드|
+------+-------+------+------+-------+--------+------+
|강남구| 강남구|199480|162554|181,017|    NULL| 11680|
|강남구|논현1동|205202|161758|183,480|11680521| 11680|
|강남구|논현2동|172159|160802|166,481|11680531| 11680|
|강남구| 신사동|180804|133231|157,018|11620685| 11620|
|강남구| 신사동|180804|133231|157,018|11680510| 11680|
|강동구| 강동구|140386| 97985|119,185|    NULL| 11740|
|강동구| 강일동|201817| 87068|144,443|11740515| 11740|
|강동구|고덕1동|133568|108680|121,124|11740550| 11740|
|강동구|고덕2동|186486|149372|167,929|11740560| 11740|
|강동구|   길동|140377| 80584|110,481|11740685| 11740|
|강동구|둔촌1동|140386| 97985|119,185|11740690| 11740|
|강동구|둔촌2동|122431| 70754| 96,593|11740700| 11740|
|강동구|명일1동|150706|100557|125,631|11740530| 11740|
|강동구|명일2동|167823| 96449|132,136|11740540| 11740|
|강동구| 상일동|139345|189428|164,386|11740520| 11740|
|강동구|성내1동|120784| 81606|101,195|11740640| 11740|
|강동구|성내2동|138906|110612|124,759|11740650| 11740|
|강동구|성내3동|1312

In [70]:
connection = mysql.connector.connect(
    host="localhost",
    user="root",
    password="enqnrhkwk108",
    database="S11P21D108",
    charset="utf8mb4",  # 문자셋 설정
    collation="utf8mb4_general_ci",  # collation 설정
    autocommit=False,  # Auto-commit을 끄고 트랜잭션 처리
    connection_timeout=28800,  # 타임아웃 증가
)
cursor = connection.cursor()
def insert_into_mariadb_rent(row):
        sql = """
        INSERT INTO rent (gu_code, dong_code, first_floor, other_floor)
        VALUES (%s, %s, %s, %s)
        """
        # 시도 중인 행 삽입
        cursor.execute(sql, (
            row['구코드'], row['동코드'], 
            row['1층'], row['1층 외']
        ))

# DataFrame의 각 행을 MariaDB에 삽입
for row in dong_fee_df_updated.collect():
    insert_into_mariadb_rent(row)

# 변경사항 커밋 및 연결 종료
connection.commit()
cursor.close()
connection.close()

DatabaseError: 1265 (01000): Data truncated for column 'first_floor' at row 1