### Packages

In [1]:
# packages
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from datetime import date, datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from hdfs import InsecureClient
import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt #그래프 패키지 모듈 등록
%matplotlib inline 

In [2]:
# matplotlib 한글폰트 
import platform

from matplotlib import font_manager, rc
plt.rcParams['axes.unicode_minus'] = False

if platform.system() == 'Darwin':  # 맥OS 
    rc('font', family='AppleGothic')
elif platform.system() == 'Windows':  # 윈도우
    path = "c:/Windows/Fonts/malgun.ttf"
    font_name = font_manager.FontProperties(fname=path).get_name()
    rc('font', family=font_name)
else:
    rc('font', family='D2Coding')
       
# rc('font', family='NanumGothic') # 나눔폰트 사용시

### DW에 있는 Data 사용하기 위한 Hdfs 클라이언트 객체 생성

In [3]:
client = InsecureClient('http://localhost:9870', user='lab09')

### Data 불러오기
- 문화시설 = cultural.json
- 공연행사 = festival1.json
- 축제 = festival2.json
- 역사관광지 = historical.json
- 관광자원 = nature1.json
- 자연관광지 = nature2.json
- 레포츠 = reports.json
- 휴양관광지 = resorts.json

In [4]:
cultural = spark.read.json('/raw_data/cultural.json', encoding='utf-8')
festival1 = spark.read.json('/raw_data/festival1.json', encoding='utf-8')
festival2 = spark.read.json('/raw_data/festival2.json', encoding='utf-8')
historical = spark.read.json('/raw_data/historical.json', encoding='utf-8')
nature1 = spark.read.json('/raw_data/nature1.json', encoding='utf-8')
nature2 = spark.read.json('/raw_data/nature2.json', encoding='utf-8')
reports = spark.read.json('/raw_data/reports.json', encoding='utf-8')
resorts = spark.read.json('/raw_data/resorts.json', encoding='utf-8')

### 한 DF 로 합치기

- merged_df1
    - 문화시설 = cultural.json ✔️
    - 공연행사 = festival1.json ✔️
- 축제 = festival2.json
- 역사관광지 = historical.json
- 관광자원 = nature1.json
- 자연관광지 = nature2.json
- 레포츠 = reports.json
- 휴양관광지 = resorts.json

In [5]:
merged_cols = set(cultural.columns) | set(festival1.columns)

def get_new_columns(columns, merged_columns):
    return [col(x) if x in columns else lit(None).alias(x) for x in merged_columns]

new_cultural = cultural.select(*get_new_columns(cultural.columns, merged_cols))
new_festival1 = festival1.select(*get_new_columns(festival1.columns, merged_cols))

merged_df1 = new_cultural.unionByName(new_festival1)

In [6]:
merged_df1.printSchema()
print('cultural shape: ', (cultural.count(), len(cultural.columns)))
print('festival1 shape: ', (festival1.count(), len(festival1.columns)))
print('merged_df1 shape: ', (merged_df1.count(), len(merged_df1.columns)))

root
 |-- 신용카드 가능 여부: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 주관사 정보: string (nullable = true)
 |-- 행사장소: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 예매처: string (nullable = true)
 |-- 경도: double (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 수용인원: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주최자 연락처: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 진행형태: string (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 위도: double (nullable = true)
 |-- 할인정보: string (nullable = true)
 |-- 프로그램: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 주관사 연락처: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 관리자: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 관람가능연령: string (nullable = true)
 |-- 관람소요시간: string (nullable = true)
 |--

- merged_df2
    - merged_df1 ✔️
        - 문화시설 = cultural.json 
        - 공연행사 = festival1.json 
    - 축제 = festival2.json ✔️
- 역사관광지 = historical.json
- 관광자원 = nature1.json
- 자연관광지 = nature2.json
- 레포츠 = reports.json
- 휴양관광지 = resorts.json

In [7]:
merged_cols = set(merged_df1.columns) | set(festival2.columns)

def get_new_columns(columns, merged_columns):
    return [col(x) if x in columns else lit(None).alias(x) for x in merged_columns]

new_merged_df1 = merged_df1.select(*get_new_columns(merged_df1.columns, merged_cols))
new_festival2 = festival2.select(*get_new_columns(festival2.columns, merged_cols))

merged_df2 = new_merged_df1.unionByName(new_festival2)

In [8]:
merged_df2.printSchema()
print('merged_df1 shape: ', (merged_df1.count(), len(merged_df1.columns)))
print('festival2 shape: ', (festival2.count(), len(festival2.columns)))
print('merged_df2 shape: ', (merged_df2.count(), len(merged_df2.columns)))

root
 |-- 신용카드 가능 여부: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 주관사 정보: string (nullable = true)
 |-- 행사장소: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 예매처: string (nullable = true)
 |-- 경도: double (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 수용인원: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주최자 연락처: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 진행형태: string (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 위도: double (nullable = true)
 |-- 프로그램: string (nullable = true)
 |-- 할인정보: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 주관사 연락처: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 관리자: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 관람가능연령: string (nullable = true)
 |-- 관람소요시간: string (nullable = true)
 |--

- merged_df3
    - merged_df2 ✔️
        - merged_df1 
            - 문화시설 = cultural.json 
            - 공연행사 = festival1.json 
        - 축제 = festival2.json 
    - 역사관광지 = historical.json ✔️
- 관광자원 = nature1.json
- 자연관광지 = nature2.json
- 레포츠 = reports.json
- 휴양관광지 = resorts.json

In [9]:
merged_cols = set(merged_df2.columns) | set(historical.columns)

def get_new_columns(columns, merged_columns):
    return [col(x) if x in columns else lit(None).alias(x) for x in merged_columns]

new_merged_df2 = merged_df2.select(*get_new_columns(merged_df2.columns, merged_cols))
new_historical = historical.select(*get_new_columns(historical.columns, merged_cols))

merged_df3 = new_merged_df2.unionByName(new_historical)

In [10]:
merged_df3.printSchema()
print('merged_df2 shape: ', (merged_df2.count(), len(merged_df2.columns)))
print('historical shape: ', (historical.count(), len(historical.columns)))
print('merged_df3 shape: ', (merged_df3.count(), len(merged_df3.columns)))

root
 |-- 신용카드 가능 여부: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 주관사 정보: string (nullable = true)
 |-- 행사장소: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 예매처: string (nullable = true)
 |-- 경도: double (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 수용인원: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주최자 연락처: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 진행형태: string (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 위도: double (nullable = true)
 |-- 할인정보: string (nullable = true)
 |-- 프로그램: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 주관사 연락처: string (nullable = true)
 |--  유산구분: string (nullable = true)
 |-- 개장일: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 관리자: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 관람가

- merged_df4
    - merged_df3 ✔️
        - merged_df2 
            - merged_df1 
                - 문화시설 = cultural.json 
                - 공연행사 = festival1.json 
            - 축제 = festival2.json 
        - 역사관광지 = historical.json 
    - 관광자원 = nature1.json ✔️
- 자연관광지 = nature2.json
- 레포츠 = reports.json
- 휴양관광지 = resorts.json

In [11]:
merged_cols = set(merged_df3.columns) | set(nature1.columns)

def get_new_columns(columns, merged_columns):
    return [col(x) if x in columns else lit(None).alias(x) for x in merged_columns]

new_merged_df3 = merged_df3.select(*get_new_columns(merged_df3.columns, merged_cols))
new_nature1 = nature1.select(*get_new_columns(nature1.columns, merged_cols))

merged_df4 = new_merged_df3.unionByName(new_nature1)

In [12]:
merged_df4.printSchema()
print('merged_df3 shape: ', (merged_df3.count(), len(merged_df3.columns)))
print('nature1 shape: ', (nature1.count(), len(nature1.columns)))
print('merged_df4 shape: ', (merged_df4.count(), len(merged_df4.columns)))

root
 |-- 신용카드 가능 여부: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 주관사 정보: string (nullable = true)
 |-- 행사장소: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 예매처: string (nullable = true)
 |-- 경도: double (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 수용인원: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주최자 연락처: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 진행형태: string (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 위도: double (nullable = true)
 |-- 프로그램: string (nullable = true)
 |-- 할인정보: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 주관사 연락처: string (nullable = true)
 |--  유산구분: string (nullable = true)
 |-- 개장일: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 관리자: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 관람가

- merged_df5
    - merged_df4 ✔️
        - merged_df3 
            - merged_df2 
                - merged_df1 
                    - 문화시설 = cultural.json 
                    - 공연행사 = festival1.json 
                - 축제 = festival2.json 
            - 역사관광지 = historical.json 
        - 관광자원 = nature1.json
    - 자연관광지 = nature2.json ✔️
- 레포츠 = reports.json
- 휴양관광지 = resorts.json

In [13]:
merged_cols = set(merged_df4.columns) | set(nature2.columns)

def get_new_columns(columns, merged_columns):
    return [col(x) if x in columns else lit(None).alias(x) for x in merged_columns]

new_merged_df4 = merged_df4.select(*get_new_columns(merged_df4.columns, merged_cols))
new_nature2 = nature2.select(*get_new_columns(nature2.columns, merged_cols))

merged_df5 = new_merged_df4.unionByName(new_nature2)

In [14]:
merged_df5.printSchema()
print('merged_df4 shape: ', (merged_df4.count(), len(merged_df4.columns)))
print('nature2 shape: ', (nature2.count(), len(nature2.columns)))
print('merged_df5 shape: ', (merged_df5.count(), len(merged_df5.columns)))

root
 |-- 신용카드 가능 여부: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 주관사 정보: string (nullable = true)
 |-- 행사장소: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 예매처: string (nullable = true)
 |-- 경도: double (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 수용인원: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주최자 연락처: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 진행형태: string (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 위도: double (nullable = true)
 |-- 할인정보: string (nullable = true)
 |-- 프로그램: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 주관사 연락처: string (nullable = true)
 |--  유산구분: string (nullable = true)
 |-- 개장일: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 관리자: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 관람가

- merged_df6
    - merged_df5 ✔️
        - merged_df4 
            - merged_df3 
                - merged_df2 
                    - merged_df1 
                        - 문화시설 = cultural.json 
                        - 공연행사 = festival1.json 
                    - 축제 = festival2.json 
                - 역사관광지 = historical.json 
            - 관광자원 = nature1.json
        - 자연관광지 = nature2.json
    - 레포츠 = reports.json ✔️
- 휴양관광지 = resorts.json

In [15]:
merged_cols = set(merged_df5.columns) | set(reports.columns)

def get_new_columns(columns, merged_columns):
    return [col(x) if x in columns else lit(None).alias(x) for x in merged_columns]

new_merged_df5 = merged_df5.select(*get_new_columns(merged_df5.columns, merged_cols))
new_reports = reports.select(*get_new_columns(reports.columns, merged_cols))

merged_df6 = new_merged_df5.unionByName(new_reports)

In [16]:
merged_df6.printSchema()
print('merged_df5 shape: ', (merged_df5.count(), len(merged_df5.columns)))
print('reports shape: ', (reports.count(), len(reports.columns)))
print('merged_df6 shape: ', (merged_df6.count(), len(merged_df6.columns)))

root
 |-- 신용카드 가능 여부: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 주관사 정보: string (nullable = true)
 |-- 행사장소: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 개장기간: string (nullable = true)
 |-- 예매처: string (nullable = true)
 |-- 경도: double (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 수용인원: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주최자 연락처: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 진행형태: string (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 위도: double (nullable = true)
 |-- 프로그램: string (nullable = true)
 |-- 할인정보: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 주관사 연락처: string (nullable = true)
 |--  유산구분: string (nullable = true)
 |-- 개장일: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 관리자: string (nullable = true)
 |-- 유모차 대여 여

- merged_df_fin
    - merged_df6 ✔️
        - merged_df5 
            - merged_df4 
                - merged_df3 
                    - merged_df2 
                        - merged_df1 
                            - 문화시설 = cultural.json 
                            - 공연행사 = festival1.json 
                        - 축제 = festival2.json 
                    - 역사관광지 = historical.json 
                - 관광자원 = nature1.json
            - 자연관광지 = nature2.json
        - 레포츠 = reports.json 
    - 휴양관광지 = resorts.json ✔️

In [17]:
merged_cols = set(merged_df6.columns) | set(resorts.columns)

def get_new_columns(columns, merged_columns):
    return [col(x) if x in columns else lit(None).alias(x) for x in merged_columns]

new_merged_df6 = merged_df6.select(*get_new_columns(merged_df6.columns, merged_cols))
new_resorts = resorts.select(*get_new_columns(resorts.columns, merged_cols))

merged_df_fin = new_merged_df6.unionByName(new_resorts)

In [18]:
merged_df_fin.printSchema()
print('merged_df6 shape: ', (merged_df6.count(), len(merged_df6.columns)))
print('resorts shape: ', (resorts.count(), len(resorts.columns)))
print('merged_df_fin shape: ', (merged_df_fin.count(), len(merged_df_fin.columns)))

root
 |-- 신용카드 가능 여부: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 주관사 정보: string (nullable = true)
 |-- 행사장소: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 개장기간: string (nullable = true)
 |-- 예매처: string (nullable = true)
 |-- 경도: double (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 수용인원: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주최자 연락처: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 진행형태: string (nullable = true)
 |-- 행사종료일: long (nullable = true)
 |-- 위도: double (nullable = true)
 |-- 할인정보: string (nullable = true)
 |-- 프로그램: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 주관사 연락처: string (nullable = true)
 |--  유산구분: string (nullable = true)
 |-- 개장일: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 관리자: string (nullable = true)
 |-- 유모차 대여 여

In [19]:
merged_df_fin

DataFrame[신용카드 가능 여부: string, 지역: string, 주관사 정보: string, 행사장소: string, 행사시작일: bigint, 개장기간: string, 예매처: string, 경도: double, 명칭: string, 수용인원: string, 쉬는날: string, 이용요금: string, 공연시간: string, 전화번호: string, 주최자 연락처: string, 상세정보: string, 진행형태: string, 행사종료일: bigint, 위도: double, 할인정보: string, 프로그램: string, 애완동물 동반 가능 여부: string, 주관사 연락처: string,  유산구분: string, 개장일: string, 주차시설: string, 관리자: string, 유모차 대여 여부: string, 관람가능연령: string, 관람소요시간: string, 체험안내: string, 주소: string, 행사장 위치안내: string, 체험가능연령: string, 이용시기: string, Unnamed: 0: string, 분류: string, 규모: string, 주최자 정보: string, 부대행사: string, 문의 및 안내: string, 주차요금: string, 이벤트 홈페이지: string, 입장료: string, 이용시간: string, 축제형태: string, 개요: string, 우편번호: string]

In [20]:
merged_df_fin.coalesce(1).write.format('json').save('/dw_data/travel_data.json')

### 불필요한 컬럼 제거

- 위도
- 경도
- 행사장소
- 행사장 위치안내
- 관리자
- 주최자 정보
- 주관사 정보
- 주관사 연락처
- 주최자 연락처
- 신용카드 가능 여부
- 할인정보
- 개장일
- 문의 및 안내
- 수용인원
- 규모
- 이벤트 홈페이지
- 유산구분
- 부대행사
- 진행형태
- 축제형태
- 예매처
- 프로그램

In [21]:
from pyspark.sql.functions import col

In [22]:
merged_df_fin = spark.read.json('/dw_data/travel_data.json', encoding='utf-8')

In [23]:
# merged_df_fin copy
df_fin = merged_df_fin.select('*')

In [24]:
drop_columns = ['위도', '경도', '행사장소', '행사장 위치안내', '관리자', '주최자 정보', '주관사 정보', '주관사 연락처', 
                '주최자 연락처', '신용카드 가능 여부', '할인정보', '개장일', '문의 및 안내', '수용인원', '규모', '이벤트 홈페이지',
                ' 유산구분', '부대행사', '진행형태', '축제형태', '예매처', '프로그램', 'Unnamed: 0']


dropped_df_fin = df_fin.drop(*drop_columns)

In [25]:
dropped_df_fin.printSchema()

root
 |-- 개요: string (nullable = true)
 |-- 개장기간: string (nullable = true)
 |-- 공연시간: string (nullable = true)
 |-- 관람가능연령: string (nullable = true)
 |-- 관람소요시간: string (nullable = true)
 |-- 명칭: string (nullable = true)
 |-- 분류: string (nullable = true)
 |-- 상세정보: string (nullable = true)
 |-- 쉬는날: string (nullable = true)
 |-- 애완동물 동반 가능 여부: string (nullable = true)
 |-- 우편번호: string (nullable = true)
 |-- 유모차 대여 여부: string (nullable = true)
 |-- 이용시간: string (nullable = true)
 |-- 이용시기: string (nullable = true)
 |-- 이용요금: string (nullable = true)
 |-- 입장료: string (nullable = true)
 |-- 전화번호: string (nullable = true)
 |-- 주소: string (nullable = true)
 |-- 주차시설: string (nullable = true)
 |-- 주차요금: string (nullable = true)
 |-- 지역: string (nullable = true)
 |-- 체험가능연령: string (nullable = true)
 |-- 체험안내: string (nullable = true)
 |-- 행사시작일: long (nullable = true)
 |-- 행사종료일: long (nullable = true)



In [26]:
df_fin_pd = dropped_df_fin.toPandas()

In [27]:
df_fin_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4149 entries, 0 to 4148
Data columns (total 25 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   개요             4143 non-null   object 
 1   개장기간           583 non-null    object 
 2   공연시간           499 non-null    object 
 3   관람가능연령         112 non-null    object 
 4   관람소요시간         326 non-null    object 
 5   명칭             4147 non-null   object 
 6   분류             4149 non-null   object 
 7   상세정보           3666 non-null   object 
 8   쉬는날            2393 non-null   object 
 9   애완동물 동반 가능 여부  3199 non-null   object 
 10  우편번호           4028 non-null   object 
 11  유모차 대여 여부      3107 non-null   object 
 12  이용시간           2791 non-null   object 
 13  이용시기           12 non-null     object 
 14  이용요금           1167 non-null   object 
 15  입장료            24 non-null     object 
 16  전화번호           599 non-null    object 
 17  주소             4145 non-null   object 
 18  주차시설    

In [28]:
dropped_df_fin.coalesce(1).write.format('json').save('/dw_data/travel_data_dropped.json')