In [None]:
import json
import datetime as dt
import pandas as pd
from pyspark.sql.functions import col, monotonically_increasing_id, row_number
from pyspark.sql.window import Window
from pyspark.sql.types import *

In [2]:
JDBC = {
    'url':'jdbc:oracle:thin:@realestate_high?TNS_ADMIN=/home/big/study/db/Wallet_REALESTATE'
    ,'props':{
        'user':'dw_realestate',
        'password':'123qwe!@#QWE'
    }   
}

In [3]:
from enum import Enum

# 데이터웨어하우스 ENUM
class DataWarehouse(Enum):
    URL='jdbc:oracle:thin:@realestate_high?TNS_ADMIN=/home/big/study/db/Wallet_REALESTATE'
    PROPS={
        'user':'dw_realestate',
        'password':'123qwe!@#QWE'
    }

In [4]:
def cal_std_day(befor_day):   
    x = dt.datetime.now() - dt.timedelta(befor_day)
    year = x.year
    month = x.month if x.month >= 10 else '0'+ str(x.month)
    day = x.day if x.day >= 10 else '0'+ str(x.day)  
    return str(year) +str(month) +str(day)

In [5]:
# 데이터웨어하우스, 데이터마트에 저장하기 위한 함수
def save_data(config, dataframe, table_name):
    dataframe.write.jdbc(url=config.URL.value,
                        table=table_name,
                        mode='append',
                        properties=config.PROPS.value)

# 데이터웨어하우스, 데이터마트에 덮어쓰기 위한 함수
def overwrite_data(config, dataframe, table_name):
    dataframe.write.jdbc(url=config.URL.value,
                        table=table_name,
                        mode='overwrite',
                        properties=config.PROPS.value)

# 데이터웨어하우스, 데이터마트에서 데이터 가져오기 위한 함수
def find_data(config, table_name):
    return spark.read.jdbc(url=config.URL.value,
                                        table=table_name,
                                        properties=config.PROPS.value)

In [6]:
df_loc = find_data(DataWarehouse, 'LOC')
df_loc.show()

                                                                                

+--------+---------+------------+----------+--------+
|LOC_CODE|SIDO_CODE|SIGUNGU_CODE|      SIDO| SIGUNGU|
+--------+---------+------------+----------+--------+
|   27170|       27|         170|대구광역시|    서구|
|   27200|       27|         200|대구광역시|    남구|
|   30200|       30|         200|대전광역시|  유성구|
|   27140|       27|         140|대구광역시|    동구|
|   26000|       26|         000|부산광역시|    null|
|   26110|       26|         110|부산광역시|    중구|
|   26140|       26|         140|부산광역시|    서구|
|   26170|       26|         170|부산광역시|    동구|
|   26200|       26|         200|부산광역시|  영도구|
|   26230|       26|         230|부산광역시|부산진구|
|   26260|       26|         260|부산광역시|  동래구|
|   26290|       26|         290|부산광역시|    남구|
|   26320|       26|         320|부산광역시|    북구|
|   26350|       26|         350|부산광역시|해운대구|
|   26380|       26|         380|부산광역시|  사하구|
|   26410|       26|         410|부산광역시|  금정구|
|   26440|       26|         440|부산광역시|  강서구|
|   26470|       26|         470|부산광역시|  연제구|
|

In [7]:
loc_code = df_loc.select(['SIDO','LOC_CODE']).filter(df_loc.SIGUNGU.isNull()).collect()
df_loc_code = spark.createDataFrame(loc_code)
df_loc_code.show()

                                                                                

+--------------+--------+
|          SIDO|LOC_CODE|
+--------------+--------+
|    부산광역시|   26000|
|    대구광역시|   27000|
|    인천광역시|   28000|
|    광주광역시|   29000|
|    대전광역시|   30000|
|    울산광역시|   31000|
|세종특별자치시|   36110|
|        경기도|   41000|
|        강원도|   42000|
|      충청북도|   43000|
|      충청남도|   44000|
|      전라북도|   45000|
|      전라남도|   46000|
|      경상북도|   47000|
|      경상남도|   48000|
|    서울특별시|   11000|
|제주특별자치도|   50000|
+--------------+--------+



In [9]:
file_name = '/realestate_data/nationality/nationality_data_'+cal_std_day(13)+'.json'
tmp = spark.read.json(file_name, encoding='UTF-8')
tmp.show()

[Stage 7:>                                                          (0 + 1) / 1]

+---------------------+
|               result|
+---------------------+
|{{APIINFO-0001, 정...|
+---------------------+



                                                                                

In [10]:
tmp.printSchema()

root
 |-- result: struct (nullable = true)
 |    |-- head: struct (nullable = true)
 |    |    |-- returnCode: string (nullable = true)
 |    |    |-- returnMessage: string (nullable = true)
 |    |    |-- totalCount: string (nullable = true)
 |    |-- items: struct (nullable = true)
 |    |    |-- item: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- adminRegn1Name: string (nullable = true)
 |    |    |    |    |-- adminRegn2Name: string (nullable = true)
 |    |    |    |    |-- nationalName: string (nullable = true)
 |    |    |    |    |-- resDate: string (nullable = true)
 |    |    |    |    |-- tot: string (nullable = true)



In [11]:
tmp2 = tmp.select('result').first()
df = spark.createDataFrame(tmp2)
tmp3 = df.select('items').first()
tmp4 = spark.createDataFrame(tmp3).first()
df2 = spark.createDataFrame(tmp4['item'])
df2.show()

                                                                                

+--------------+--------------+------------+----------+---+
|adminRegn1Name|adminRegn2Name|nationalName|   resDate|tot|
+--------------+--------------+------------+----------+---+
|    서울특별시|              |    뉴질랜드|2022-09-23|  1|
|    서울특별시|              |        미국|2022-09-23|  1|
|    서울특별시|              |        중국|2022-09-23|  4|
|    서울특별시|              |      타이완|2022-09-23|  1|
|    서울특별시|              |      필리핀|2022-09-23|  1|
|    부산광역시|              |        미국|2022-09-23|  1|
|    부산광역시|              |        중국|2022-09-23|  1|
|    대구광역시|              |      베트남|2022-09-23|  1|
|    대구광역시|              |        중국|2022-09-23|  1|
|    인천광역시|              |      베트남|2022-09-23|  1|
|    인천광역시|              |우즈베키스탄|2022-09-23|  1|
|    인천광역시|              |        중국|2022-09-23|  6|
|    인천광역시|              |    키르기즈|2022-09-23|  1|
|        경기도|              |      러시아|2022-09-23|  1|
|        경기도|              |        미국|2022-09-23|  3|
|        경기도|              |      

OF_IDX 

RES_DATE 

RES_REGN_CODE 

BUYER_NATION 

TOT

In [12]:
df_foreigner = df2.select(df2.adminRegn1Name.alias('SIDO'),df2.resDate.alias('RES_DATE'),df2.nationalName.alias('BUYER_NATION'),df2.tot.alias('TOT'))
df_foreigner.show()

+----------+----------+------------+---+
|      SIDO|  RES_DATE|BUYER_NATION|TOT|
+----------+----------+------------+---+
|서울특별시|2022-09-23|    뉴질랜드|  1|
|서울특별시|2022-09-23|        미국|  1|
|서울특별시|2022-09-23|        중국|  4|
|서울특별시|2022-09-23|      타이완|  1|
|서울특별시|2022-09-23|      필리핀|  1|
|부산광역시|2022-09-23|        미국|  1|
|부산광역시|2022-09-23|        중국|  1|
|대구광역시|2022-09-23|      베트남|  1|
|대구광역시|2022-09-23|        중국|  1|
|인천광역시|2022-09-23|      베트남|  1|
|인천광역시|2022-09-23|우즈베키스탄|  1|
|인천광역시|2022-09-23|        중국|  6|
|인천광역시|2022-09-23|    키르기즈|  1|
|    경기도|2022-09-23|      러시아|  1|
|    경기도|2022-09-23|        미국|  3|
|    경기도|2022-09-23|      베트남|  1|
|    경기도|2022-09-23|        중국| 16|
|    경기도|2022-09-23|        기타|  2|
|  충청북도|2022-09-23|        중국|  2|
|  충청북도|2022-09-23|      타이완|  1|
+----------+----------+------------+---+
only showing top 20 rows



In [14]:
own_foreigner = df_foreigner.join(df_loc_code, on='SIDO')
own_foreigner = own_foreigner.select(col('LOC_CODE').alias('RES_REGN_CODE'),col('TOT').cast('int'),col('BUYER_NATION'),col('RES_DATE').cast(DateType()))
own_foreigner = own_foreigner.withColumn('OF_IDX', row_number().over(Window.orderBy(monotonically_increasing_id())))
own_foreigner.show()

[Stage 26:>                                                         (0 + 1) / 1]

+-------------+---+------------+----------+------+
|RES_REGN_CODE|TOT|BUYER_NATION|  RES_DATE|OF_IDX|
+-------------+---+------------+----------+------+
|        41000|  1|      러시아|2022-09-23|     1|
|        41000|  3|        미국|2022-09-23|     2|
|        41000|  1|      베트남|2022-09-23|     3|
|        41000| 16|        중국|2022-09-23|     4|
|        41000|  2|        기타|2022-09-23|     5|
|        48000|  4|        중국|2022-09-23|     6|
|        47000|  1|      베트남|2022-09-23|     7|
|        27000|  1|      베트남|2022-09-23|     8|
|        27000|  1|        중국|2022-09-23|     9|
|        26000|  1|        미국|2022-09-23|    10|
|        26000|  1|        중국|2022-09-23|    11|
|        11000|  1|    뉴질랜드|2022-09-23|    12|
|        11000|  1|        미국|2022-09-23|    13|
|        11000|  4|        중국|2022-09-23|    14|
|        11000|  1|      타이완|2022-09-23|    15|
|        11000|  1|      필리핀|2022-09-23|    16|
|        28000|  1|      베트남|2022-09-23|    17|
|        28000|  1|우즈베키

                                                                                

In [15]:
own_foreigner.printSchema()

root
 |-- RES_REGN_CODE: string (nullable = true)
 |-- TOT: integer (nullable = true)
 |-- BUYER_NATION: string (nullable = true)
 |-- RES_DATE: date (nullable = true)
 |-- OF_IDX: integer (nullable = false)



In [16]:
own_foreigner.write.jdbc(url=JDBC['url'], table='OWN_FOREIGNER', mode='append', properties=JDBC['props'])

                                                                                