
#### 3. Silver table 을 생성합니다. 
  - bronze table 에서 해당일 데이터만 조회 후 적재해주세요.
  - bronze table 과 dimension table 을 조인합니다.
    - join 조건 : seq_num  
  - score 컬럼을 추가하고 value 를 생성해주세요.
    - value 생성조건 : 안내 가능 언어 1개당 + 1점, 연중무휴 +1점

    - ex)
      |영어|일본어|중국어|휴무일|score|
      |---|---|---|---|---|
      |Y|Y|Y|설 당일+추석 당일|3|
      |N|NULL|Y|NULL|1|
      |N|N|Y|연중무휴|2|

  - 데이터를 filtering 합니다. 
    - filter column : 시도명
    - filtering 데이터로 silver table 을 생성해 주세요.

      ```
      경상남도 - Gyeongsangnam-do
      경기도 - Gyeonggi-do
      전라남도 - Jeollanam-do
      대전광역시 - Daejeon
      강원도 - Gangwon-do
      강원특별자치도 - Gangwon-Special
      충청남도 - Chungcheongnam-do
      충청북도 - Chungcheongbuk-do
      부산광역시 - Busan
      울산광역시 - Ulsan
      서울특별시 - Seoul
      대구광역시 - Daegu
      광주광역시 - Gwangju
      제주특별자치도 - Jeju-Special
      경상북도 - Gyeongsangbuk-do
      전라북도 - Jeollabuk-do
      인천광역시 - Incheon

      ```
    - 테이블명 예시 경상남도  :  **`edu2501.{user_schema}.korea_tuor_gyeongsangnam_do_silver`**


|Type|Source Path|수집간격|mode|
|---|---|---|---|
|batch|bronze table|매일 07시 00분 실행|append|

❗️ bronze task Depends



In [0]:
from pyspark.sql.functions import col, lit, when, expr
import re
from datetime import datetime, timedelta


In [0]:
# TODO catalog_name 변경
catalog_name = 'edu2501'
username = spark.sql("SELECT current_user()").first()[0]
clean_username = re.sub("[^a-zA-Z0-9]", "_", username)

print(f'catalog_name = {catalog_name}')
print(f'username = {username}')
print(f'clean_username = {clean_username}')

spark.sql(f'set edu.catalog={catalog_name}')
spark.sql(f"set edu.username={username}")
spark.sql(f"set edu.clean_username={clean_username}")


## ❗️실제 테이블명이 아래와 다른경우 테이블명 입력해주세요.

In [0]:
bronze_table = f"{catalog_name}.{clean_username}.korea_tour_bronze"

dim_table = f"{catalog_name}.{clean_username}.korea_tour_dimension"


### Bronze table 에서 **`시도명 value`** 를 추출합니다. 

In [0]:
filter_col = spark.table(bronze_table).select('시도명').distinct().collect()
print(filter_col)


### DataFrame 반복 사용을 위해 persist() 등록 합니다.

In [0]:
from_date = (datetime.now()+ timedelta(hours=9)).strftime('%Y-%m-%d')
to_date = (datetime.now()+ timedelta(days=1)+ timedelta(hours=9)).strftime('%Y-%m-%d')

bronze_df = spark.table(bronze_table).filter((col('etl_ymdh') >= from_date) & (col('etl_ymdh') < to_date))

bronze_df.persist()


### Spark timezone 설정

In [0]:
spark.conf.set("spark.sql.session.timeZone", "Asia/Seoul")

In [0]:
display(bronze_df)


### 함수 선언

In [0]:
#############################
# 매핑 정보로 테이블 이름을 생성합니다.
#############################

def create_table_name(params) :
    sido_dic = {
    "경상남도" : "gyeongsangnam-do",
    "경기도" : "gyeonggi-do",
    "전라남도" : "jeollanam-do",
    "대전광역시" : "daejeon",
    "강원도" : "gangwon-do",
    "강원특별자치도" : "gangwon-special",
    "충청남도" : "Chungcheongnam-do",
    "충청북도" : "Chungcheongbuk-do",
    "부산광역시" : "Busan",
    "울산광역시" : "Ulsan",
    "서울특별시" : "Seoul",
    "대구광역시" : "Daegu", 
    "광주광역시" : "Gwangju",
    "제주특별자치도" : "Jeju-Special",
    "경상북도" : "Gyeongsangbuk-do",
    "전라북도" : "Jeollabuk-do",
    "인천광역시" : "Incheon"
    }

    temp_value = sido_dic.get(params)
    convert_value = re.sub("[^a-zA-Z0-9]", "_", temp_value).lower()
    result =  f"{catalog_name}.{clean_username}.korea_tour_" + convert_value + '_silver'

    return result

#############################
# score 컬럼을 생성하고 점수를 부여합니다.
#############################

def create_score(bronze_df):
    temp_df = bronze_df \
    .withColumn('score1', when(col('영어안내가능여부') == 'Y', 1).otherwise(0)) \
    .withColumn('score2', when(col('일본어안내가능여부') == 'Y', 1).otherwise(0))\
    .withColumn('score3', when(col('중국어안내가능여부') == 'Y', 1).otherwise(0)) \
    .withColumn('score4', when(col('휴무일') == '연중무휴', 1).otherwise(0)) \
    .withColumn('score', expr("score1 + score2 + score3 + score4"))

    score_df = temp_df.drop(col('score1'), col('score2'), col('score3'), col('score4'))
    
    return score_df

#############################
# 시도명 으로 데이터를 필터링 합니다. 
#############################

def split_df(sido, bronze_df):
    split_df = bronze_df.filter(col('시도명') == sido)
    silver_table_name = create_table_name(sido)
    result_df = create_score(split_df)

    return result_df, silver_table_name

#############################
# silver table 을 생성합니다. 
#############################

def creeate_silver_table(df, silver_table_name):

    df.write.mode('append').option("delta.columnMapping.mode", "name").option('mergeSchema', 'true').saveAsTable(silver_table_name)


### 반복 실행 

In [0]:
for row in filter_col:
    df, sliver_table_name = split_df(row['시도명'], bronze_df)
    print("===="*5, sliver_table_name, "===="*5)
    display(df)
    creeate_silver_table(df, sliver_table_name)

In [0]:
print(sliver_table_list)