# **DataWarehouse Data Transform**
- 동일한 컬럼 가지고 있는 데이터끼리 분류하여 '관광지 분류' 컬럼으로 생성  
- 축제행사, 공연 : 날짜 지난 관광지 제거
- 필터링 기능에 포함시킬 컬럼 전처리
  - 요금정보 ['이용요금', '입장료', '상세정보'] : 유료/무료 형태로
  - 유모차 대여 여부 ['유모차 대여 여부'] : 없음/불가/가능
  - 애완동물 동반 가능 여부 ['애완동물 동반 가능 여부'] : 없음/불가/가능
  - 지역 ['지역'] : 경기도/서울/인천
- 관람/체험 연령 관련 컬럼 전처리

## **Packages**

In [1]:
# packages
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from datetime import date, datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from hdfs import InsecureClient
import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt #그래프 패키지 모듈 등록
%matplotlib inline 

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
# matplotlib 한글폰트 
import platform

from matplotlib import font_manager, rc
plt.rcParams['axes.unicode_minus'] = False

if platform.system() == 'Darwin':  # 맥OS 
    rc('font', family='AppleGothic')
elif platform.system() == 'Windows':  # 윈도우
    path = "c:/Windows/Fonts/malgun.ttf"
    font_name = font_manager.FontProperties(fname=path).get_name()
    rc('font', family=font_name)
else:
    rc('font', family='D2Coding')
       
# rc('font', family='NanumGothic') # 나눔폰트 사용시

## **DW에 있는 Data 사용하기 위한 Hdfs 클라이언트 객체 생성**

In [3]:
client = InsecureClient('http://localhost:9870', user='root')

## **Data 불러오기**
- travel_data_dropped.json : 불필요한 컬럼을 제거한 raw data 

In [4]:
df_tmp = spark.read.json('/dw_data/travel_data_dropped.json', encoding='utf-8')

                                                                                

In [5]:
df_tmp.printSchema()

root
 |-- 개요: string (nullable = true)
 |-- 개장기간: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 관람가능연령: string (nullable = true)
 |-- 관람소요시간: string (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 분류: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 우편번호: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 이용시간: string (nullable = true)
 |-- 이용시기: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 입장료: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주소: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 주차요금: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 체험가능연령: string (nullable = true)
 |-- 체험안내: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 행사종료일: long (nullable = true)



## **1️⃣ 동일한 컬럼 가지고 있는 데이터끼리 분류하여 '관광지 분류' 컬럼으로 생성**

[관광지 분류]
- 축제공연행사
    - 축제
    - 공연행사 
- 문화시설
    - 문화시설 
- 레포츠
    - 레포츠 
- 인문관광지
    - 휴양관광지
    - 역사관광지
    - 자연관광지
    - 관광자원 

In [6]:
df_tmp.select("분류").distinct().show()

+----------+
|      분류|
+----------+
|  문화시설|
|자연관광지|
|    레포츠|
|역사관광지|
|      축제|
|  공연행사|
|  관광자원|
|휴양관광지|
+----------+



In [7]:
# package
from pyspark.sql import functions as F


# '분류' values <-> '관광지 분류' values 매핑시키기
def map_classification(category):
    if category in ['축제', '공연행사']:
        return '축제공연행사'
    elif category == '문화시설':
        return '문화시설'
    elif category == '레포츠':
        return '레포츠'
    elif category in ['휴양관광지', '역사관광지', '자연관광지', '관광자원']:
        return '인문관광지'
    else:
        return 'Unknown'

mapping_udf = F.udf(map_classification)


# 매핑된 결과 '관광지 분류'로 생성
df_result = df_tmp.withColumn('관광지 분류', mapping_udf(df_tmp['분류']))

In [8]:
# 확인
df_result.filter(df_result['관광지 분류'] == '인문관광지').select('관광지 분류', '분류').distinct().show()
df_result.filter(df_result['관광지 분류'] == '레포츠').select('관광지 분류', '분류').distinct().show()
df_result.filter(df_result['관광지 분류'] == '축제공연행사').select('관광지 분류', '분류').distinct().show()
df_result.filter(df_result['관광지 분류'] == '문화시설').select('관광지 분류', '분류').distinct().show()

+-----------+----------+
|관광지 분류|      분류|
+-----------+----------+
| 인문관광지|자연관광지|
| 인문관광지|역사관광지|
| 인문관광지|  관광자원|
| 인문관광지|휴양관광지|
+-----------+----------+

+-----------+------+
|관광지 분류|  분류|
+-----------+------+
|     레포츠|레포츠|
+-----------+------+

+------------+--------+
| 관광지 분류|    분류|
+------------+--------+
|축제공연행사|공연행사|
|축제공연행사|    축제|
+------------+--------+

+-----------+--------+
|관광지 분류|    분류|
+-----------+--------+
|   문화시설|문화시설|
+-----------+--------+



## **2️⃣ '축제공연행사' : 날짜 지난 관광지 제거**
- '행사종료일' 기준으로 서비스 출시일 2월8일 이전에 종료되는 데이터 제거
- '행사종료일' = LongType (=Represents 8-byte signed integer numbers)

In [9]:
# 20240208 이후 데이터만 저장 
df_filtered = df_result.filter(df_result.행사종료일 >= 20240208)

In [10]:
# 확인

## 제거하고 남은 데이터 개수 확인 
print('제거하고 남은 데이터 개수: ', df_filtered.count())

## 행사종료일이 '2023'으로 시작하는 데이터 (null 나와야 함)
df_filtered.select('행사종료일').filter(df_filtered.행사종료일.like('2023%')).count()

## 행사종료일이 '2024'으로 시작하는 데이터 (0208 이후 데이터만 포함되어야 함)
df_filtered.select('행사종료일').filter(df_filtered.행사종료일.like('2024%')).distinct().show()

제거하고 남은 데이터 개수:  36


0

+----------+
|행사종료일|
+----------+
|  20241222|
|  20240331|
|  20240229|
|  20240324|
|  20240228|
|  20241205|
|  20240721|
|  20241231|
|  20240225|
|  20240505|
|  20241207|
|  20240317|
|  20240303|
|  20240720|
|  20240226|
|  20240218|
|  20240901|
|  20240509|
|  20240212|
|  20240413|
+----------+
only showing top 20 rows



In [11]:
# 원본데이터의 '축제공연행사' 데이터 중 위 df_filtered(0208 이후 데이터) 만 남기기

## merge 할 때 구분하기 위해 새로운 컬럼 생성 
df_fest = df_filtered.withColumn("구분", lit("yes"))
df_result_new = df_result.withColumn("구분", lit(None))

## 두 데이터 합치기
merged_df = df_result_new.union(df_fest)

## 합친 데이터에서 "구분" 컬럼 None 행 제거
filtered_df = merged_df.filter(
    (merged_df['관광지 분류'] != "축제공연행사") | (merged_df['구분'].isNotNull()))

## "구분" 컬럼 drop
df_fin = filtered_df.drop("구분")

In [12]:
# 확인

## 데이터 개수 확인 
print('데이터 개수: ', df_fin.count())

## 행사종료일이 '2024'으로 시작하는 데이터 (0208 이후 데이터만 포함되어야 함)
df_fin.select('행사종료일').filter(df_fin.행사종료일.like('2024%')).distinct().show()

데이터 개수:  3646
+----------+
|행사종료일|
+----------+
|  20241222|
|  20240331|
|  20240229|
|  20240324|
|  20240228|
|  20241205|
|  20240721|
|  20241231|
|  20240225|
|  20240505|
|  20241207|
|  20240317|
|  20240303|
|  20240720|
|  20240226|
|  20240218|
|  20240901|
|  20240509|
|  20240212|
|  20240413|
+----------+
only showing top 20 rows



## **3️⃣ 필터링 기능에 포함시킬 컬럼 전처리**
- 요금정보 ['이용요금', '입장료', '상세정보'] : 유료/무료 형태로
- 유모차 대여 여부 ['유모차 대여 여부'] : 없음/불가/가능
- 애완동물 동반 가능 여부 ['애완동물 동반 가능 여부'] : 없음/불가/가능

### 📍 요금정보 ['이용요금', '입장료', '상세정보'] : 유료/무료 형태로
- '유/무료 여부' 컬럼 새로 생성

### 📍 유모차 대여 여부 ['유모차 대여 여부'] : 없음/불가/가능

In [13]:
# 원본 데이터 값 확인
df_fin.select('유모차 대여 여부').distinct().show()

+----------------+
|유모차 대여 여부|
+----------------+
|            null|
|            없음|
|            불가|
|            가능|
+----------------+



In [14]:
# Transform
df_fin_replaced = df_fin.withColumn("유모차 대여 여부", 
                                    when((df_fin["유모차 대여 여부"].isNull()) | 
                                         (df_fin["유모차 대여 여부"] == "없음"), "불가")
                                    .otherwise(df_fin["유모차 대여 여부"]))

In [15]:
# 확인

## 불가, 가능으로 구성되어졌는지
df_fin_replaced.select('유모차 대여 여부').distinct().show()

## 갯수 맞게 들어갔는지 
### origin ver
tmp = df_fin.toPandas()
tmp['유모차 대여 여부'].value_counts(dropna=False)
### replaced ver
fin = df_fin_replaced.toPandas()
fin['유모차 대여 여부'].value_counts(dropna=False)

+----------------+
|유모차 대여 여부|
+----------------+
|            불가|
|            가능|
+----------------+



24/01/29 00:56:09 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


유모차 대여 여부
없음      2792
None     539
불가       248
가능        67
Name: count, dtype: int64

유모차 대여 여부
불가    3579
가능      67
Name: count, dtype: int64

### 📍 애완동물 동반 가능 여부 ['애완동물 동반 가능 여부'] : 없음/불가/가능

In [16]:
# 원본 데이터 값 확인
df_fin_replaced.select('애완동물 동반 가능 여부').distinct().show()

+-----------------------+
|애완동물 동반 가능 여부|
+-----------------------+
|                   null|
|                   없음|
|                   불가|
|                   가능|
+-----------------------+



In [17]:
# Transform
df_replaced = df_fin_replaced.withColumn("애완동물 동반 가능 여부", 
                                    when((df_fin["애완동물 동반 가능 여부"].isNull()) | 
                                         (df_fin["애완동물 동반 가능 여부"] == "없음"), "불가")
                                    .otherwise(df_fin["애완동물 동반 가능 여부"]))

In [18]:
# 확인

## 불가, 가능으로 구성되어졌는지
df_replaced.select('애완동물 동반 가능 여부').distinct().show()

## 갯수 맞게 들어갔는지 
### origin ver
tmp = df_fin.toPandas()
tmp['애완동물 동반 가능 여부'].value_counts(dropna=False)
### replaced ver
fin = df_replaced.toPandas()
fin['애완동물 동반 가능 여부'].value_counts(dropna=False)

+-----------------------+
|애완동물 동반 가능 여부|
+-----------------------+
|                   불가|
|                   가능|
+-----------------------+



애완동물 동반 가능 여부
불가      1416
없음      1133
가능       650
None     447
Name: count, dtype: int64

애완동물 동반 가능 여부
불가    2996
가능     650
Name: count, dtype: int64

## **4️⃣ 관람/체험 연령 관련 컬럼 전처리**

(총 데이터 갯수 3648)
- [축제공연행사] 관람가능연령 : 7개 데이터
- [레포츠]
    - 체험가능연령 : 767데이터, 그 중 70%가 예약 관련 정보로 들어가 있음
    - 주차시설 : 연령 관련 정보가 여기 들어가 있는데 63개 데이터
- [인문관광지] 체험가능연령 : 177개 데이터

➡️ 관람가능연령, 체험가능연령 컬럼 제거 <br/>
➡️ 주차시설에 연령 관련 정보 데이터 Null 처리

In [32]:
# "관람가능연령", "체험가능연령" 컬럼 drop
df_dropped = df_replaced.drop("관람가능연령", "체험가능연령")

In [36]:
# "관광지 분류"가 '레포츠' 인 데이터 중 "주차시설"인 컬럼 Null 값으로 바꾸기 
result_df = df_dropped.withColumn(
    "주차시설",
    when(col("관광지 분류") == "레포츠", lit(None)).otherwise(col("주차시설"))
)

In [37]:
# 확인

## Schema 확인 ("주차시설" 유지, "관람가능연령"&"체험가능연령" 삭제)
result_df.printSchema()

root
 |-- 개요: string (nullable = true)
 |-- 개장기간: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 관람소요시간: string (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 분류: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 우편번호: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 이용시간: string (nullable = true)
 |-- 이용시기: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 입장료: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주소: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 주차요금: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 체험안내: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 관광지 분류: string (nullable = true)



In [59]:
## 갯수 맞게 들어갔는지 
### origin ver
origin = df_replaced.toPandas()
tmp = origin[origin['관광지 분류'] == '레포츠']
print('origin 레포츠 주차시설 데이터 갯수: ', tmp['주차시설'].count())

### replaced ver
result = result_df.toPandas()
tmp2 = result[result['관광지 분류'] == '레포츠']
print('replaced 레포츠 주차시설 데이터 갯수: ', tmp2['주차시설'].count())
print('전처리 한 주차시설 데이터 확인: ')
result['주차시설'].value_counts(dropna=False)

origin 레포츠 주차시설 데이터 갯수:  63
replaced 레포츠 주차시설 데이터 갯수:  0
전처리 한 주차시설 데이터 확인: 


주차시설
None                                                                           1615
있음                                                                              378
가능                                                                              227
가능<br>요금(무료)                                                                    165
주차가능                                                                            154
                                                                               ... 
가능(300대 가능)<br>요금(무료)                                                             1
가능(국청사 주차장 / 산성로터리 주차장)<br>요금(무료(국청사 무료 / 산성로터리 주차장 평일 3,000원 / 주말 5,000원))       1
가능 (시민체육공원 10대)                                                                   1
주차불가(별도 주차공간 없음)                                                                  1
가능(신흥제3공영주차장)<br>요금(최초 30분 400원(초과 10분당 200원) 1일 최대 6,000원)                       1
Name: count, Length: 699, dtype: int64

In [64]:
result_df.coalesce(1).write.format('json').save('/dw_data/travel_data_preprocessed_tmp.json')