## 코로나 API

In [16]:
import pyarrow
import requests
import xmltodict
import time
import pandas as pd
import re
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

from pymongo import MongoClient
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as f

In [136]:
# coronaAPI = spark.read.parquet("hdfs://localhost:9000/data/corona")

In [137]:
url_decide_base = "http://openapi.data.go.kr/openapi/service/rest/Covid19/getCovid19InfStateJson"
url_decide_serviceKey = "S8%2Ftx%2BhEP7bZDZI%2By0P1ZKvPuHpx%2BVUKpt6ay8faxnxR%2FTRO9M5UAy8%2BafhJBNVzQG%2Fgwoym2S4Xbe1dUXivUw%3D%3D"

In [138]:
# corona open api 가져오기
url_pages = "1000" #페이지당열갯수
url_start_date = "20200303" #시작날짜
url_end_date = datetime.today().strftime("%Y%m%d%H%M%S")[:8] #끝날짜
url = url_decide_base + "?serviceKey=" + url_decide_serviceKey + "&pageNo=1&numOfRows=" + url_pages + "&startCreateDt="+ url_start_date + "&endCreateDt=" + url_end_date

# type 변환
req = requests.get(url).content
xmlObject = xmltodict.parse(req)
dict_data = xmlObject['response']['body']['items']['item']

# Dataframe으로 변환 후 전처리 진행
dfDecide = pd.DataFrame(dict_data)
dfDecide.drop(['createDt', 'seq', 'stateTime', 'updateDt', 'resutlNegCnt'], axis=1, inplace=True)
dfDecide.sort_values(['stateDt'], ascending=False, inplace=True)
dfDecide['stateDt'] = pd.to_datetime(dfDecide['stateDt']) - pd.DateOffset(days=1) # 당일 들어오는 데이터는 실제로 전날에 발생한 데이터
dfDecide = dfDecide.astype({"accExamCnt":"int64", "accExamCompCnt":"int64", "careCnt":"int64", "clearCnt":"int64", "deathCnt":"int64", "decideCnt":"int64", "examCnt":"int64", "accDefRate":"float64"}).copy()
dfDecide['newDecideCnt'] = 0

# 새로운 컬럼 생성
for i in range(len(dfDecide)-1):
    dfDecide['newDecideCnt'][i] = int(dfDecide.iloc[i]['decideCnt']) - int(dfDecide.iloc[i+1]['decideCnt'])
    
# Dataframe 형태 정리    
dfDecide = dfDecide[['stateDt', 'newDecideCnt', 'examCnt', 'decideCnt', 'deathCnt', 'clearCnt', 'careCnt', 'accDefRate', 'accExamCnt', 'accExamCompCnt']].copy()
dfDecide.rename(columns={'stateDt':'날짜', 'newDecideCnt':'당일확진자수', 'examCnt':'검사진행수', 'decideCnt':'누적확진자수',\
                         'deathCnt':'누적사망자수', 'clearCnt':'누적격리해제수', 
                         'careCnt':'치료중환자수', 'accDefRate':'누적확진률', 'accExamCnt':'누적검사수',\
                         'accExamCompCnt':'누적검사완료수'}, inplace=True)
dfDecide

Unnamed: 0,날짜,당일확진자수,검사진행수,누적확진자수,누적사망자수,누적격리해제수,치료중환자수,누적확진률,누적검사수,누적검사완료수
0,2021-09-28,2883,1137617,308725,2474,272724,33527,2.312955,14485260,13347643
1,2021-09-27,2289,1135394,305842,2464,270928,32450,2.301263,14425573,13290179
2,2021-09-26,2381,1120111,303553,2456,269132,31965,2.291688,14365935,13245824
3,2021-09-25,2771,1096862,301172,2450,268140,30582,2.276591,14325938,13229076
4,2021-09-24,3269,1056223,298401,2441,266414,29546,2.255638,14285339,13229116
...,...,...,...,...,...,...,...,...,...,...
571,2020-03-06,483,19620,6767,44,118,6134,4.267543,178189,158569
572,2020-03-05,518,21832,6284,42,108,5643,4.397235,164740,142908
573,2020-03-04,438,21810,5766,35,88,5255,4.622748,146541,124731
574,2020-03-03,516,28414,5328,32,41,4750,4.919986,136707,108293


In [139]:
#dfDecide.to_parquet('corona')

In [14]:
# spark 연결
spark = SparkSession \
    .builder \
    .appName("coronaAPI") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017") \
    .config("spark.mongodb.input.database","ojo_db") \
    .config("spark.mongodb.input.collection", "coronaAPI") \
    .config("packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1").getOrCreate()
sc =spark.sparkContext

# # pymongo connect
# client = MongoClient('localhost',27017) # mongodb 27017 port
# db = client.ojo_db

# # mognodb에 넣기위한 json 형태 변환 후 mongodb collection(corona)로 적재
# for k in range(len(dfDecide)):
#     tmp ={}
#     for key,value in zip(dfDecide.columns,dfDecide.values[k]):
#         tmp[key] = value
#     db.coronaAPI.insert_one(tmp)

### 지역별 확진자 수

In [22]:
url_district_base = "http://openapi.data.go.kr/openapi/service/rest/Covid19/getCovid19SidoInfStateJson"
url_district_serviceKey = "S8%2Ftx%2BhEP7bZDZI%2By0P1ZKvPuHpx%2BVUKpt6ay8faxnxR%2FTRO9M5UAy8%2BafhJBNVzQG%2Fgwoym2S4Xbe1dUXivUw%3D%3D"

# corona open api 가져오기
nowtime = datetime.today().strftime("%Y-%m-%d")
url_pages = "1000" #페이지당열갯수
url_start_date = datetime.today().strftime("%Y%m%d%H%M%S")[:8]#"20200303" #시작날짜
url_end_date = datetime.today().strftime("%Y%m%d%H%M%S")[:8] #끝날짜



# open api 가져오기
url_district= url_district_base + "?serviceKey=" + url_district_serviceKey + "&pageNo=1&numOfRows=" + url_pages + "&startCreateDt="+ url_start_date + "&endCreateDt=" + url_end_date
# type 변환 
req = requests.get(url_district).content
xmlObject = xmltodict.parse(req)
dict_data = xmlObject['response']['body']['items']['item']

# 전처리
dfDistrict = pd.DataFrame(dict_data)

# 합계, 검역 삭제 후 dataframe 내에서 index 다시 0부터 차례대로 지정
dfDistrict = dfDistrict[(dfDistrict['gubun'] != '합계') & (dfDistrict['gubun'] != '검역')].reset_index(drop=True).copy()
dfDistrict.drop(['stdDay', 'updateDt', 'seq', 'qurRate', 'gubunCn'], axis=1, inplace=True)
dfDistrict = dfDistrict.astype({"createDt":"datetime64[ns]"}).copy()
# dfDistrict['createDt'] = dfDistrict['createDt'] - pd.DateOffset(days=2)
dfDistrict['stateDt'] = dfDistrict['createDt'].dt.date         # YYYY-MM-DD(문자)
dfDistrict.drop(['createDt'], axis=1, inplace=True)
dfDistrict.drop(['gubunEn'], axis=1, inplace=True)

# type 변환
dfDistrict = dfDistrict.astype({"stateDt":"datetime64[ns]", "defCnt":"int64", "deathCnt":"int64", "incDec":"int64"}).copy()
dfDistrict = dfDistrict[['stateDt', 'gubun', 'defCnt', 'deathCnt', 'incDec', 'isolClearCnt', 'isolIngCnt', 'localOccCnt', 'overFlowCnt']].copy()

# null값 처리
dfDistrict['isolClearCnt'] = dfDistrict['isolClearCnt'].fillna(0).astype("int64")
dfDistrict['isolIngCnt'] = dfDistrict['isolIngCnt'].fillna(0).astype("int64")
dfDistrict['localOccCnt'] = dfDistrict['localOccCnt'].fillna(0).astype("int64")
dfDistrict['overFlowCnt'] = dfDistrict['overFlowCnt'].fillna(0).astype("int64")

# Dataframe 형태 정리
dfDistrict = dfDistrict.astype({"isolClearCnt":"int64", "isolIngCnt":"int64", "localOccCnt":"int64", "overFlowCnt":"int64"}).copy()
dfDistrict.rename(columns={'stateDt':'날짜', 'gubun':'Area', 'defCnt':'확진자수', 'deathCnt':'사망자수', 'incDec':'전일대비증감수',
                           'isolClearCnt':'격리해제수', 'isolIngCnt':'격리중환자수', 'localOccCnt':'지역발생수', 
                           'overFlowCnt':'해외유입수'}, inplace=True)

dfDistrict.to_parquet('coronaAPI_'+ nowtime)

In [21]:
coronaAPI = spark.read.parquet("hdfs://localhost:9000/data/coronaAPI_2021-09-29")
coronaAPI.show()

+-------------------+----+--------+--------+--------------+----------+------------+----------+----------+
|               날짜|Area|확진자수|사망자수|전일대비증감수|격리해제수|격리중환자수|지역발생수|해외유입수|
+-------------------+----+--------+--------+--------------+----------+------------+----------+----------+
|2021-09-29 09:00:00|제주|    2860|       2|             5|      2767|          91|         5|         0|
|2021-09-29 09:00:00|경남|   11393|      37|            84|     10790|         566|        81|         3|
|2021-09-29 09:00:00|경북|    8271|      91|            88|      7594|         586|        88|         0|
|2021-09-29 09:00:00|전남|    3060|      21|            23|      2833|         206|        23|         0|
|2021-09-29 09:00:00|전북|    4480|      61|            41|      3969|         450|        41|         0|
|2021-09-29 09:00:00|충남|    9073|      56|            77|      7953|        1064|        75|         2|
|2021-09-29 09:00:00|충북|    6284|      77|            84|      5656|         551|        83|    

### 코로나 단계

In [21]:
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from pyvirtualdisplay import Display

from datetime import datetime, timedelta
from pymongo import MongoClient
import pandas as pd 
import numpy as np
import json


# pymongo connect
client = MongoClient('localhost',27017) # mongodb 27017 port
db = client.ojo_db

display = Display(visible=0, size=(1024, 768)) 
display.start()
 
path = '/home/ubuntu/chromedriver' 
driver = webdriver.Chrome(path)


driver.get('http://ncov.mohw.go.kr/regSocdisBoardView.do')
info_df = pd.DataFrame(columns=("Area","Stage","Description"))
idx = 0

for i in range(1,18):
    location = driver.find_element_by_xpath(f'//*[@id="main_maplayout"]/button[{i}]')
    location.click()
    
    area = driver.find_element_by_xpath(f'//*[@id="step_map_city{i}"]/h3').text
    stage =  driver.find_element_by_xpath(f'//*[@id="step_map_city{i}"]/h4').text
    description = driver.find_element_by_xpath(f'//*[@id="step_map_city{i}"]/p').text
    
    # 확인용
    area_info = [area,stage,description]
    info_df.loc[idx] = area_info
    idx += 1
    
    driver.implicitly_wait(3)
    
info_df.to_parquet('coronaStage_'+ nowtime)

In [17]:
# info_df = pd.DataFrame(columns=("Area","Stage","Description"))

# nowtime = datetime.today().strftime("%Y-%m-%d")

# dfDistrict.to_parquet('coronaAPI_'+ nowtime)
# info_df.to_parquet('coronaStage_'+ nowtime)

In [None]:
nowtime = datetime.today().strftime("%Y-%m-%d")

df_coronaAPI = pd.DataFrame(xmlObject['response']['body']['items']['item'])
    
driver.get('http://ncov.mohw.go.kr/regSocdisBoardView.do')
df_coronaCrawling = pd.DataFrame(columns=("Area","Stage","Description"))

df_coronaAPI.to_parquet('coronaAPI_'+ nowtime)
df_coronaCrawling.to_parquet('coronaStage_'+ nowtime)

In [144]:
print(type(info_df))
print(type(dfDistrict))

<class 'pandas.core.frame.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


In [15]:
coronaStage = spark.read.parquet("hdfs://localhost:9000/data/coronaStage_2021-09-29")
#coronaStage.show()

In [None]:
df = df.drop('날짜')

In [17]:
coronaStage.show()

+----+------+-----------------------------+-----------------+
|Area| Stage|                  Description|__index_level_0__|
+----+------+-----------------------------+-----------------+
|서울|4 단계|  - 서울 전지역 4단계 (21....|                0|
|부산|3 단계|  - 부산 전지역 3단계 (21....|                1|
|대구|3 단계|  - 대구 전지역 3단계 (21....|                2|
|인천|4 단계| - 인천 일부지역 4단계 (21...|                3|
|광주|3 단계|  - 광주 전지역 3단계 (21....|                4|
|대전|3 단계|  - 대전 전지역 3단계 (21....|                5|
|울산|3 단계|  - 울산 전지역 3단계 (21....|                6|
|세종|3 단계|  - 세종 전지역 3단계 (21....|                7|
|경기|4 단계|  - 경기 전지역 4단계 (21....|                8|
|강원|3 단계|- 춘천시, 원주시, 강릉시, ...|                9|
|충북|3 단계|  - 충북 전지역 3단계 (21....|               10|
|충남|3 단계|  - 충남 전지역 3단계 (21....|               11|
|전북|3 단계| - 전북 일부지역 3단계 (21...|               12|
|전남|3 단계|  - 전남 전지역 3단계 (21....|               13|
|경북|3 단계| - 경북 일부지역 3단계 (21...|               14|
|경남|3 단계|  - 경남 전지역 3단계 (21....|               15|
|제주

### 코로나 지역별 확진자 + 단계

In [57]:
df_INNER_JOIN = pd.merge(dfDistrict, info_df, left_on='Area', right_on='Area', how='left')
df_INNER_JOIN[0:17]
print(type(df_INNER_JOIN))

<class 'pandas.core.frame.DataFrame'>


In [47]:
# mognodb에 넣기위한 json 형태 변환 후 mongodb collection(corona)로 적재
for k in range(len(df_INNER_JOIN)):
    tmp ={}
    for key,value in zip(df_INNER_JOIN.columns,df_INNER_JOIN.values[k]):
        tmp[key] = value
    db.corona_test.insert_one(tmp)

In [26]:
disp = spark.read.parquet("hdfs://localhost:9000/data/corona/coronaAPI_2021-09-30")
disp_st = spark.read.parquet("hdfs://localhost:9000/data/corona/coronaStage_2021-09-30")

df = disp.select('날짜','area','확진자수')

In [27]:
df1 = df.select('날짜')

In [28]:
df1.withColumnRenamed

<bound method DataFrame.withColumnRenamed of DataFrame[날짜: timestamp]>

In [29]:
df1

DataFrame[날짜: timestamp]

In [30]:
df.select(f.to_date(df['날짜'])).show()

+-------------+
|to_date(날짜)|
+-------------+
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
|   2021-09-30|
+-------------+



In [63]:
from pyspark.conf import SparkConf 
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark
import pyspark.sql.functions as f 
from pymongo import MongoClient

spark = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()

disp = spark.read.parquet("hdfs://localhost:9000/data/corona/coronaAPI_2021-09-30")
disp_st = spark.read.parquet("hdfs://localhost:9000/data/corona/coronaStage_2021-09-30")

df = disp.select('날짜','area','확진자수')
df = df.withColumn('date',f.to_date(df['날짜']))
df.date.cast(StringType())
df = df.drop('날짜')


In [64]:
df = df.withColumnRenamed("date", "날짜")

In [65]:
disp_st = disp_st.drop('__index_level_0__')

In [66]:
disp_st = disp_st.withColumnRenamed("Stage", "거리두기단계")

In [67]:
df.show()

+----+--------+----------+
|area|확진자수|      날짜|
+----+--------+----------+
|제주|    2875|2021-09-30|
|경남|   11461|2021-09-30|
|경북|    8323|2021-09-30|
|전남|    3084|2021-09-30|
|전북|    4515|2021-09-30|
|충남|    9133|2021-09-30|
|충북|    6345|2021-09-30|
|강원|    6437|2021-09-30|
|경기|   90130|2021-09-30|
|세종|    1239|2021-09-30|
|울산|    5165|2021-09-30|
|대전|    6985|2021-09-30|
|광주|    4989|2021-09-30|
|인천|   15802|2021-09-30|
|대구|   15499|2021-09-30|
|부산|   12721|2021-09-30|
|서울|  100495|2021-09-30|
+----+--------+----------+



In [68]:
disp_st.show()

+----+------------+-----------------------------+
|Area|거리두기단계|                  Description|
+----+------------+-----------------------------+
|서울|      4 단계|  - 서울 전지역 4단계 (21....|
|부산|      3 단계|  - 부산 전지역 3단계 (21....|
|대구|      3 단계|  - 대구 전지역 3단계 (21....|
|인천|      4 단계| - 인천 일부지역 4단계 (21...|
|광주|      3 단계|  - 광주 전지역 3단계 (21....|
|대전|      3 단계|  - 대전 전지역 3단계 (21....|
|울산|      3 단계|  - 울산 전지역 3단계 (21....|
|세종|      3 단계|  - 세종 전지역 3단계 (21....|
|경기|      4 단계|  - 경기 전지역 4단계 (21....|
|강원|      3 단계|- 춘천시, 원주시, 강릉시, ...|
|충북|      3 단계|  - 충북 전지역 3단계 (21....|
|충남|      3 단계|  - 충남 전지역 3단계 (21....|
|전북|      3 단계| - 전북 일부지역 3단계 (21...|
|전남|      3 단계|  - 전남 전지역 3단계 (21....|
|경북|      3 단계| - 경북 일부지역 3단계 (21...|
|경남|      3 단계|  - 경남 전지역 3단계 (21....|
|제주|      3 단계|  - 제주 전지역 3단계 (21....|
+----+------------+-----------------------------+



In [69]:
df_corona = df.join(disp_st, on=['Area'], how='left_outer')

In [70]:
df_corona = df_corona.withColumnRenamed("area", "지역")

In [71]:
df_corona.show()

+----+--------+----------+------------+-----------------------------+
|지역|확진자수|      날짜|거리두기단계|                  Description|
+----+--------+----------+------------+-----------------------------+
|제주|    2875|2021-09-30|      3 단계|  - 제주 전지역 3단계 (21....|
|경남|   11461|2021-09-30|      3 단계|  - 경남 전지역 3단계 (21....|
|경북|    8323|2021-09-30|      3 단계| - 경북 일부지역 3단계 (21...|
|전남|    3084|2021-09-30|      3 단계|  - 전남 전지역 3단계 (21....|
|전북|    4515|2021-09-30|      3 단계| - 전북 일부지역 3단계 (21...|
|충남|    9133|2021-09-30|      3 단계|  - 충남 전지역 3단계 (21....|
|충북|    6345|2021-09-30|      3 단계|  - 충북 전지역 3단계 (21....|
|강원|    6437|2021-09-30|      3 단계|- 춘천시, 원주시, 강릉시, ...|
|경기|   90130|2021-09-30|      4 단계|  - 경기 전지역 4단계 (21....|
|세종|    1239|2021-09-30|      3 단계|  - 세종 전지역 3단계 (21....|
|울산|    5165|2021-09-30|      3 단계|  - 울산 전지역 3단계 (21....|
|대전|    6985|2021-09-30|      3 단계|  - 대전 전지역 3단계 (21....|
|광주|    4989|2021-09-30|      3 단계|  - 광주 전지역 3단계 (21....|
|인천|   15802|2021-09-30|      4 단계| - 인천 

In [72]:
new_df = df_corona.toJSON().map(lambda x: json.loads(x)).collect()

In [131]:
#print('new_df', type(new_df[0]), new_df)

In [132]:
print(new_df[0])

{'지역': '제주', '확진자수': 2860, '날짜': '2021-09-29', '거리두기단계': '3 단계', '상세내용': '- 제주 전지역 3단계 (21.9.23~10.3.)'}


In [74]:
new_df

[{'지역': '제주',
  '확진자수': 2875,
  '날짜': '2021-09-30',
  '거리두기단계': '3 단계',
  'Description': '- 제주 전지역 3단계 (21.9.23~10.3.)',
  '_id': ObjectId('615543f56cf33354fb1a7e58')},
 {'지역': '경남',
  '확진자수': 11461,
  '날짜': '2021-09-30',
  '거리두기단계': '3 단계',
  'Description': '- 경남 전지역 3단계 (21.9.6~10.3.)',
  '_id': ObjectId('615543f56cf33354fb1a7e59')},
 {'지역': '경북',
  '확진자수': 8323,
  '날짜': '2021-09-30',
  '거리두기단계': '3 단계',
  'Description': '- 경북 일부지역 3단계 (21.9.6~10.3.)\n- 성주군 선남면 3단계 (21.9.6~10.3.)\n- 문경시, 상주시 2단계 (21.9.6~10.3.)\n- 군위군, 의성군, 청송군, 영양군, 영덕군, 청도군, 고령군, 성주군(선남면 제외), 예천군, 봉화군, 울릉군, 울진군 1단계 (21.9.6~10.3.)',
  '_id': ObjectId('615543f56cf33354fb1a7e5a')},
 {'지역': '전남',
  '확진자수': 3084,
  '날짜': '2021-09-30',
  '거리두기단계': '3 단계',
  'Description': '- 전남 전지역 3단계 (21.9.6~10.3.)',
  '_id': ObjectId('615543f56cf33354fb1a7e5b')},
 {'지역': '전북',
  '확진자수': 4515,
  '날짜': '2021-09-30',
  '거리두기단계': '3 단계',
  'Description': '- 전북 일부지역 3단계 (21.9.6~10.3.)\n- 정읍시, 남원시, 김제시, 완주군, 진안군, 무주군, 장수군, 임실군, 순창군, 고창군, 부안군

In [73]:
for i in new_df:
    db.corona.insert_one(i)