In [1]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [None]:
# 데이터마트용 계정 생성
mysql> create user bigSVC@'%' identified by 'bigSVCl1234@';

mysql> grant all privileges on *.* to bigSVC@'%' with grant option;
    - with grant option: 권한 전달이 가능한 계정

In [3]:
from datetime import date, datetime
from pyspark.sql.types import *
from pyspark.sql.functions import *
import datetime as dt
import pandas as pd
import matplotlib.pyplot as plt #그래프 패키지 모듈 등록
%matplotlib inline 


In [4]:
import platform

from matplotlib import font_manager, rc
plt.rcParams['axes.unicode_minus'] = False

if platform.system() == 'Darwin':  # 맥OS 
    rc('font', family='AppleGothic')
elif platform.system() == 'Windows':  # 윈도우
    path = "c:/Windows/Fonts/malgun.ttf"
    font_name = font_manager.FontProperties(fname=path).get_name()
    rc('font', family=font_name)
else:
    rc('font', family='D2Coding')

In [5]:
def cal_std_day(befor_day):   
    x = dt.datetime.now() - dt.timedelta(befor_day)
    year = x.year
    month = x.month if x.month >= 10 else '0'+ str(x.month)
    day = x.day if x.day >= 10 else '0'+ str(x.day)  
    return str(year)+ '-' +str(month)+ '-' +str(day)

### JDBC 연결정보

In [6]:
# dmdb 연결정보
conf_dm = {
      'url':'jdbc:mysql://localhost:3306/etlMysqlDM?characterEncoding=utf8&serverTimezone=Asia/Seoul'
     ,'props':{
      'user':'bigMysql',
      'password':'bigMysql1234@'   
      }
}

# service db 연결정보
conf_svc = {
      'url':'jdbc:mysql://localhost:3306/etlMysqlSVC?characterEncoding=utf8&serverTimezone=Asia/Seoul'
     ,'props':{
      'user':'bigSVC',
      'password':'bigSVC1234@'   
      }
}

## 활용할 db 준비
- create database etlMysqlSVC;
- mysql -u bigSVC -p
- use etlMysqlSVC;

## Load DataMart

### 1. 지역별 단위면적(km)당 인구수와 코로나 확진자 수 데이터 저장

- base data load
    - LOC table : 지역별 인구와 면적
    - dm db data read -> 처리코드 -> service db data load

In [8]:
spark.read.jdbc(url=conf_dm['url'], table='LOC', properties=conf_dm['props']).show()

                                                                                

+----+------+---------+
| LOC|  AREA|     POPU|
+----+------+---------+
|서울|  605 | 9736027 |
|부산|  770 | 3396109 |
|대구|  883 | 2412642 |
|인천| 1065 | 3014739 |
|광주|  501 | 1462545 |
|대전|  540 | 1469543 |
|울산| 1062 | 1138419 |
|세종|  465 |  376779 |
|경기|10195 |13925862 |
|강원|16830 | 1555876 |
|충북| 7407 | 1633472 |
|충남| 8246 | 2181835 |
|전북| 8070 | 1817186 |
|전남|12348 | 1865459 |
|경북|19034 | 2677709 |
|경남|10541 | 3377331 |
|제주| 1850 |  697476 |
+----+------+---------+



In [9]:
# db table 읽기 함수, 저장 함수
    # 데이터 찾아서 읽어올 일이 있으면 이 함수 불러올 것이다
#read
def find_data(config, table_name):
    return spark.read.jdbc(url=config['url'], table=table_name, properties=config['props'])

#write
def save_data(config, df, table_name):
    return df.write.jdbc(url=config['url'], table=table_name, mode='append', properties=config['props'])

#### 단위면적당 인구수, 코로나 발생현황 data
- dmdb LOC, CORONA_PATIENTS

In [10]:
popu = find_data(conf_dm, 'LOC')
patients = find_data(conf_dm, 'CORONA_PATIENTS')
type(popu)
type(patients)

pyspark.sql.dataframe.DataFrame

pyspark.sql.dataframe.DataFrame

In [11]:
popu.columns
patients.columns

['LOC', 'AREA', 'POPU']

['LOC', 'DEATH_CNT', 'DEF_CNT', 'LOC_OCC_CNT', 'QUR_RATE', 'STD_DAY']

In [12]:
['LOC', 'DEATH_CNT', 'DEF_CNT', 'LOC_OCC_CNT', 'QUR_RATE', 'STD_DAY']
# 지역   누적사망자수  누적 발생수  지역발생수     만명당 발생률  기준일자

['LOC', 'DEATH_CNT', 'DEF_CNT', 'LOC_OCC_CNT', 'QUR_RATE', 'STD_DAY']

In [13]:
popu.count()
patients.count()

                                                                                

17

102

In [14]:
popu.join(patients, on='LOC').tail(10)

                                                                                

[Row(LOC='충남', AREA='8246 ', POPU='2181835 ', DEATH_CNT='174', DEF_CNT='19997', LOC_OCC_CNT='93', QUR_RATE='944', STD_DAY='2022-01-17'),
 Row(LOC='충남', AREA='8246 ', POPU='2181835 ', DEATH_CNT='166', DEF_CNT='19779', LOC_OCC_CNT='110', QUR_RATE='933', STD_DAY='2022-01-15'),
 Row(LOC='충남', AREA='8246 ', POPU='2181835 ', DEATH_CNT='177', DEF_CNT='20142', LOC_OCC_CNT='140', QUR_RATE='950', STD_DAY='2022-01-18'),
 Row(LOC='충남', AREA='8246 ', POPU='2181835 ', DEATH_CNT='170', DEF_CNT='19893', LOC_OCC_CNT='105', QUR_RATE='939', STD_DAY='2022-01-16'),
 Row(LOC='충북', AREA='7407 ', POPU='1633472 ', DEATH_CNT='121', DEF_CNT='12497', LOC_OCC_CNT='38', QUR_RATE='782', STD_DAY='2022-01-17'),
 Row(LOC='충북', AREA='7407 ', POPU='1633472 ', DEATH_CNT='120', DEF_CNT='12263', LOC_OCC_CNT='58', QUR_RATE='768', STD_DAY='2022-01-13'),
 Row(LOC='충북', AREA='7407 ', POPU='1633472 ', DEATH_CNT='120', DEF_CNT='12333', LOC_OCC_CNT='67', QUR_RATE='772', STD_DAY='2022-01-14'),
 Row(LOC='충북', AREA='7407 ', POPU='163

In [15]:
# 이 중에서 10만명당 data를 볼 수 있는 데이터들을 뽑을것
popPatient = popu.join(patients, on='LOC')\
                    .select('LOC',
                            ceil(col('POPU')/col('AREA')).alias('단위면적당인구'),
                            'QUR_RATE', 'STD_DAY')\
                    .orderBy(col('STD_DAY'))

In [16]:
popPatient.show(5)
popPatient.tail(5)

+----+--------------+--------+----------+
| LOC|단위면적당인구|QUR_RATE|   STD_DAY|
+----+--------------+--------+----------+
|대구|          2733|    1024|2022-01-13|
|울산|          1072|     653|2022-01-13|
|대전|          2722|     888|2022-01-13|
|경남|           321|     668|2022-01-13|
|부산|          4411|     819|2022-01-13|
+----+--------------+--------+----------+
only showing top 5 rows



                                                                                

[Row(LOC='전남', 단위면적당인구=152, QUR_RATE='433', STD_DAY='2022-01-18'),
 Row(LOC='전북', 단위면적당인구=226, QUR_RATE='629', STD_DAY='2022-01-18'),
 Row(LOC='제주', 단위면적당인구=378, QUR_RATE='724', STD_DAY='2022-01-18'),
 Row(LOC='충남', 단위면적당인구=265, QUR_RATE='950', STD_DAY='2022-01-18'),
 Row(LOC='충북', 단위면적당인구=221, QUR_RATE='786', STD_DAY='2022-01-18')]

- 각 DB에 고유번호를 추가해서 레코드를 유일하게 구별하는 컬럼으로 사용
    - Spark 데이터 프레임의 StatFunctions 패키지 함수 중 monotonically_increasing_id
    - 데이터 프레임의 로우에 되는매핑되는  고유 id 반환

In [17]:
popPatFin = popPatient.withColumn('CP_IDX',monotonically_increasing_id())
popPatFin.show(2)

+----+--------------+--------+----------+------+
| LOC|단위면적당인구|QUR_RATE|   STD_DAY|CP_IDX|
+----+--------------+--------+----------+------+
|강원|            93|     869|2022-01-13|     0|
|경기|          1366|    1495|2022-01-13|     1|
+----+--------------+--------+----------+------+
only showing top 2 rows



In [18]:
save_data(conf_svc, popPatFin, 'CO_POP_DENSITY')

                                                                                

### 2.  백신접종 완료자와 코로나 확진자 수 데이터 저장
- 백신접종 3차 접종자수(누적): 인구 10만명당 접종자 수
- 코로나 발생현황: QUR_RATE(인구 10만명 당 발생자 수)

In [19]:
vaccine = find_data(conf_dm, 'CORONA_VACCINE')

In [20]:
vaccine.show(3)
vaccine.printSchema()

+----+----------+----+---------+
| LOC|   STD_DAY|V_TH|    V_CNT|
+----+----------+----+---------+
|충북|2022-01-18|  v2|1379378.0|
|충북|2022-01-18|  v1|  1408620|
|충북|2022-01-18|  v3| 783588.0|
+----+----------+----+---------+
only showing top 3 rows

root
 |-- LOC: string (nullable = true)
 |-- STD_DAY: string (nullable = true)
 |-- V_TH: string (nullable = true)
 |-- V_CNT: string (nullable = true)



In [21]:
# V_CNT열 형변환
vaccine = vaccine.withColumn('V_CNT', vaccine['V_CNT'].cast(IntegerType()))
vaccine.show(2)

+----+----------+----+-------+
| LOC|   STD_DAY|V_TH|  V_CNT|
+----+----------+----+-------+
|충북|2022-01-18|  v2|1379378|
|충북|2022-01-18|  v1|1408620|
+----+----------+----+-------+
only showing top 2 rows



In [22]:
## long df -> wide df 변환(V_TH 컬럼값을 컬럼으로 변환)
## pivot사용: pandas df로 변환
pdVac = vaccine.pandas_api()
type(pdVac)



pyspark.pandas.frame.DataFrame

In [25]:
pdVacv1 = pdVac.pivot_table(index=['LOC', 'STD_DAY'], columns='V_TH', values='V_CNT')

In [26]:
# pdVacv1.head()
pdVacv1 = pdVacv1.reset_index()
pdVacv1.head(1)

                                                                                

V_TH,LOC,STD_DAY,v1,v2,v3
0,전남,2022-01-15,1631047.0,1599517.0,1007985.0


In [27]:
# spark df로 변환
vaccine = pdVacv1.to_spark()



In [28]:
vaccine.show(2)

[Stage 77:>                                                         (0 + 1) / 1]

+----+----------+---------+---------+---------+
| LOC|   STD_DAY|       v1|       v2|       v3|
+----+----------+---------+---------+---------+
|전남|2022-01-15|1631047.0|1599517.0|1007985.0|
|울산|2022-01-17| 956456.0| 930969.0| 467195.0|
+----+----------+---------+---------+---------+
only showing top 2 rows



                                                                                

In [29]:
# 지역별 인구 10만명당 백신 접종률 계산
# 인구 data 필요
popu = find_data(conf_dm, 'LOC')
# 코로나 현황 data 필요
patients = find_data(conf_dm,'CORONA_PATIENTS')

In [30]:
popu.columns
patients.columns

['LOC', 'AREA', 'POPU']

['LOC', 'DEATH_CNT', 'DEF_CNT', 'LOC_OCC_CNT', 'QUR_RATE', 'STD_DAY']

In [31]:
vacRate= vaccine.join(popu, on='LOC')\
                .select('LOC',
                        'STD_DAY',
                        ceil(col('v3')/col('POPU') *100000).alias('THRD_RATE'))

In [32]:
vacRate.show(1)

+----+----------+---------+
| LOC|   STD_DAY|THRD_RATE|
+----+----------+---------+
|경북|2022-01-16|    45861|
+----+----------+---------+
only showing top 1 row



In [33]:
vacRate.columns
patients.columns

['LOC', 'STD_DAY', 'THRD_RATE']

['LOC', 'DEATH_CNT', 'DEF_CNT', 'LOC_OCC_CNT', 'QUR_RATE', 'STD_DAY']

In [34]:
coPatVac= vacRate.join(patients, on=['LOC','STD_DAY'])\
                    .select('LOC','STD_DAY','THRD_RATE','QUR_RATE')

In [35]:
coPatVac.show(3)

                                                                                

+----+----------+---------+--------+
| LOC|   STD_DAY|THRD_RATE|QUR_RATE|
+----+----------+---------+--------+
|경북|2022-01-16|    45861|     637|
|경북|2022-01-15|    45392|     633|
|경북|2022-01-17|    45875|     641|
+----+----------+---------+--------+
only showing top 3 rows



In [36]:
coPatVac = coPatVac.withColumn('CV_IDX',monotonically_increasing_id())

In [37]:
coPatVac.show(2)

+----+----------+---------+--------+------+
| LOC|   STD_DAY|THRD_RATE|QUR_RATE|CV_IDX|
+----+----------+---------+--------+------+
|경북|2022-01-16|    45861|     637|     0|
|경북|2022-01-15|    45392|     633|     1|
+----+----------+---------+--------+------+
only showing top 2 rows



In [38]:
save_data(conf_svc, coPatVac, 'CO_VACCINE_PATIENT')

                                                                                

### 3. 다중이용시설과 코로나 확진자 수의 data
- dmdb LOC_FACILITY_CNT
    - 인구 10만명당 다중 이용시설 수
- dmdb CORONA_PATIENTS
- dmdb LOC

In [41]:
# 지역별 인구 10만명당 다중 이용 시설 수
# 인구 data 필요
popu = find_data(conf_dm, 'LOC')
# 코로나 현황 data 필요
patients = find_data(conf_dm,'CORONA_PATIENTS')

In [46]:
conf_dm

{'url': 'jdbc:mysql://localhost:3306/etlMysqlDM?characterEncoding=utf8&serverTimezone=Asia/Seoul',
 'props': {'user': 'bigMysql', 'password': 'bigMysql1234@'}}

In [47]:
facil = find_data(conf_dm, 'LOC_FACILITY')

In [48]:
facil.show(2)

+----+-------+
| LOC|FAC_CNT|
+----+-------+
|경북|    944|
|대전|    767|
+----+-------+
only showing top 2 rows



In [49]:
# 인구 10만명당 다중 이용시설 수
facPop = popu.join(facil, on='LOC')\
            .select('LOC',
                    ceil(facil.FAC_CNT/popu.POPU*100000).alias('FAC_POPU'))

In [50]:
facPop.show(2)

+----+--------+
| LOC|FAC_POPU|
+----+--------+
|강원|      38|
|경기|      46|
+----+--------+
only showing top 2 rows



In [51]:
coFacPat = patients.join(facPop, on='LOC')\
                    .select('LOC', 'FAC_POPU','QUR_RATE','STD_DAY')\
                    .withColumn('CF_IDX',monotonically_increasing_id())

In [52]:
coFacPat.show(2)

+----+--------+--------+----------+------+
| LOC|FAC_POPU|QUR_RATE|   STD_DAY|CF_IDX|
+----+--------+--------+----------+------+
|경북|      36|     641|2022-01-17|     0|
|경북|      36|     626|2022-01-13|     1|
+----+--------+--------+----------+------+
only showing top 2 rows



In [53]:
save_data(conf_svc, coFacPat, 'CO_FACT_PATIENTS')

### 4. 요일별 코로나 확진자 수를 구해보자
- 코로나현황 data : STD_DAY 기준일 data -> 요일로 변경(dayofweek()) -> 요일별 그룹 생성 후 신규확진자수 합산

In [54]:
# 코로나 현황 data 필요
patients = find_data(conf_dm,'CORONA_PATIENTS')

In [55]:
# 기준일을 요일로 변환 
patWeek = patients.withColumn('DAY_OF_WEEK', dayofweek(col('STD_DAY')))

In [56]:
patWeek.show(10)

+----+---------+-------+-----------+--------+----------+-----------+
| LOC|DEATH_CNT|DEF_CNT|LOC_OCC_CNT|QUR_RATE|   STD_DAY|DAY_OF_WEEK|
+----+---------+-------+-----------+--------+----------+-----------+
|충남|      165|  19553|        126|     923|2022-01-13|          5|
|전북|      120|  10724|         99|     600|2022-01-14|          6|
|서울|     2009| 239014|        858|    2513|2022-01-13|          5|
|대구|      382|  24526|         77|    1028|2022-01-14|          6|
|전북|      121|  11094|        114|     621|2022-01-17|          2|
|강원|      110|  13368|         82|     869|2022-01-13|          5|
|부산|      329|  27456|        156|     819|2022-01-13|          5|
|세종|        4|   2315|          6|     622|2022-01-13|          5|
|대전|      184|  12965|         64|     893|2022-01-14|          6|
|광주|       55|   9660|        224|     670|2022-01-14|          6|
+----+---------+-------+-----------+--------+----------+-----------+
only showing top 10 rows



In [57]:
weekP = patWeek.groupby(patWeek.DAY_OF_WEEK).agg(sum(col('LOC_OCC_CNT')).alias('PATIENTS'))

In [58]:
weekP.show()

+-----------+--------+
|DAY_OF_WEEK|PATIENTS|
+-----------+--------+
|          1|  3813.0|
|          6|  4133.0|
|          3|  3763.0|
|          5|  3776.0|
|          7|  4077.0|
|          2|  3551.0|
+-----------+--------+



In [59]:
weekP = weekP.withColumn('DAY_OF_WEEK', when(weekP.DAY_OF_WEEK == 1, 'MON')
                                         .when(weekP.DAY_OF_WEEK == 2, 'TUE')
                                         .when(weekP.DAY_OF_WEEK == 3, 'WED')
                                         .when(weekP.DAY_OF_WEEK == 4, 'THE')
                                         .when(weekP.DAY_OF_WEEK == 5, 'FRI')
                                         .when(weekP.DAY_OF_WEEK == 6, 'SAT')
                                         .when(weekP.DAY_OF_WEEK == 7, 'SUN'))

In [60]:
weekP.show()

+-----------+--------+
|DAY_OF_WEEK|PATIENTS|
+-----------+--------+
|        MON|  3813.0|
|        SAT|  4133.0|
|        WED|  3763.0|
|        FRI|  3776.0|
|        SUN|  4077.0|
|        TUE|  3551.0|
+-----------+--------+



In [61]:
# long -> wide형으로 변경 후 저장
pdWeek = weekP.pandas_api()

In [62]:
# pdWeek
pdWeekpvt = pdWeek.pivot_table(columns='DAY_OF_WEEK', values='PATIENTS')
pdWeek = pdWeekpvt.reset_index()

In [63]:
weekP = pdWeek.to_spark()
weekP.show()

+--------+------+------+------+------+------+------+
|   index|   FRI|   MON|   SAT|   SUN|   TUE|   WED|
+--------+------+------+------+------+------+------+
|PATIENTS|3776.0|3813.0|4133.0|4077.0|3551.0|3763.0|
+--------+------+------+------+------+------+------+



In [64]:
weekP.withColumn('STD_DAY', lit(cal_std_day(365*3+4))).show()

+--------+------+------+------+------+------+------+----------+
|   index|   FRI|   MON|   SAT|   SUN|   TUE|   WED|   STD_DAY|
+--------+------+------+------+------+------+------+----------+
|PATIENTS|3776.0|3813.0|4133.0|4077.0|3551.0|3763.0|2022-01-18|
+--------+------+------+------+------+------+------+----------+



In [65]:
save_data(conf_svc, weekP, 'CO_WEEKDAY')