## データのインポート
日本国内の新型コロナウイルス (COVID-19) 感染状況追跡ダッシュボードの GitHub リポジトリからデータを取得します。

[GitHub](https://github.com/reustle/covid19japan)

In [2]:
%sh
# Fetch the latest patient data into DBFS through the /dbfs mount.
# --timestamping: only download when the remote file is newer than the local copy.
wget --timestamping --directory-prefix=/dbfs/japan_covid19/patient_data/ https://data.covid19japan.com/patient_data/latest.json


## データの読み取り

In [4]:
# DBFS directory holding the downloaded patient JSON (the directory wget writes to via /dbfs)
jsonCovid19Path = "/japan_covid19/patient_data/"

In [5]:
# The JSON file is one array spread over many lines, so multi-line parsing is required.
df = spark.read.json(jsonCovid19Path, multiLine=True)
df.printSchema()
display(df)

ageBracket,charterFlightPassenger,cityPrefectureNumber,confirmedPatient,cruisePassengerDisembarked,dateAnnounced,deceasedDate,detectedCityTown,detectedPrefecture,gender,knownCluster,mhlwPatientNumber,notes,patientId,patientStatus,prefecturePatientNumber,prefectureSourceURL,relatedPatients,residence,sourceURL
30,,,True,,2020-01-15,,,Kanagawa,M,,1,Recovered (as of 2/28),15,Recovered,Kanagawa#1,https://www.pref.kanagawa.jp/docs/ga4/bukanshi/occurrence.html,,,
40,,,True,,2020-01-24,,,Tokyo,M,,2,Chinese traveler,TOK1,Recovered,Tokyo#1,,,"Wuhan, China",https://www.mhlw.go.jp/stf/newpage_09531.html
30,,,True,,2020-01-25,,,Tokyo,F,,3,Chinese traveler,TOK2,Recovered,Tokyo#2,,,"Wuhan, China",https://www.mhlw.go.jp/stf/newpage_09531.html
40,,,True,,2020-01-26,,,Aichi,M,,4,Chinese traveler,18,,Aichi#1,https://www.pref.aichi.jp/uploaded/attachment/321138.pdf,28,"Wuhan, China",https://www.mhlw.go.jp/stf/newpage_09531.html
40,,,True,,2020-01-28,,,Aichi,M,,5,Chinese traveler,19,Hospitalized,Aichi#2,https://www.pref.aichi.jp/uploaded/attachment/321141.pdf,,"Wuhan, China",https://www.mhlw.go.jp/stf/newpage_09531.html
60,,,True,,2020-01-28,,,Nara,M,,6,Bus driver for tourism bus,20,,Nara#1,,2226,Nara,https://www.mhlw.go.jp/stf/newpage_09531.html
40,,,True,,2020-01-28,,,Hokkaido,F,,7,Chinese traveler,21,Discharged,Hokkaido#1,http://www.pref.hokkaido.lg.jp/hf/kth/kak/hasseijoukyou.htm#1/28,,"Wuhan, China",https://www.mhlw.go.jp/stf/newpage_09531.html
40,,,True,,2020-01-29,,,Osaka,F,,8 & 148,"Travel guide, Reinfected",OSK1,Hospitalized,Osaka#1,,Nara#1,Osaka,https://www.mhlw.go.jp/stf/newpage_09531.html
50,True,,True,,2020-01-30,,,Unspecified,M,Charter Flight,9,Japanese lives in China,1,Discharged,,,,"Wuhan, China",https://www.mhlw.go.jp/stf/newpage_09531.html
50,,,True,,2020-01-30,,,Mie,M,,10,"Returned from Wuhan 1/13, Discharged 2/17",23,Recovered,Mie#1,https://www.pref.mie.lg.jp/YAKUMUS/HP/m0068000071_00004.htm,,Mie,https://www.mhlw.go.jp/stf/newpage_09531.html


## deltaフォーマットでそのまま保存:Bronze

Parquet で保存する場合との違いは、`format` を `'parquet'` から `'delta'` に変更するだけです。

In [7]:
# Bronze layer location: the raw JSON data stored as-is in Delta format.
deltaCovid19Path_raw = "/japan_covid19/delta/rawdata/"
# Recursively clear any previous run so the table is rebuilt from scratch.
dbutils.fs.rm(deltaCovid19Path_raw, True)

In [8]:
# Writing Delta is the same call as writing Parquet; only the format name differs, e.g.
#   df.write.mode('overwrite').format('parquet').save(path)
df.write.format('delta').mode('overwrite').save(deltaCovid19Path_raw)

## 整形加工:Silver

In [10]:
# Silver layer location: cleaned, column-reduced data.
deltaCovid19Path_silver = "/japan_covid19/delta/silver/"
# Recursively clear any previous run so the table recreated at this location starts empty.
dbutils.fs.rm(deltaCovid19Path_silver, True)

In [11]:
from pyspark.sql.functions import col, lit, month, to_date

# Boundary between the initial table load and the rows replayed later as incremental inserts.
# Full dates are compared instead of month numbers: `month(date) < 4` ignores the year and
# would put January-March rows of later years into the initial load.
SPLIT_DATE = "2020-04-01"

jpnCovid = spark.read.format('delta').load(deltaCovid19Path_raw)
# Convert the announcement string into a DateType column
df_date = jpnCovid.withColumn('date', to_date('dateAnnounced', "yyyy-MM-dd"))
df_silver = df_date.select("date", "detectedPrefecture", "ageBracket")

split_date = to_date(lit(SPLIT_DATE))

# Initial load: rows announced before the split date.
# Rows with a NULL date compare as NULL and are excluded here and below.
df_silver_3M = df_silver.filter(col("date") < split_date)
df_silver_3M.createOrReplaceTempView("df_silver_3M")

# Incremental dataset: rows announced on/after the split date
df_silver_add = df_silver.filter(col("date") >= split_date)
df_silver_add.createOrReplaceTempView("df_silver_add")

In [12]:
%sql
-- Recreate the Silver table as a Delta table stored at the silver path,
-- populated (CREATE TABLE ... AS SELECT) from the df_silver_3M temp view.
DROP TABLE IF EXISTS jpnCovid_silver;

CREATE TABLE jpnCovid_silver
USING DELTA
LOCATION '/japan_covid19/delta/silver'
AS SELECT * FROM df_silver_3M


In [13]:
# Show the five most recent rows of the Silver table (plain string: the query has no placeholders)
display(spark.sql("SELECT * FROM jpnCovid_silver ORDER BY date DESC LIMIT 5"))

date,detectedPrefecture,ageBracket
2020-03-31,Aichi,60
2020-03-31,Kumamoto,70
2020-03-31,Aichi,50
2020-03-31,Hyogo,30
2020-03-31,Fukuoka,60


## ストリーミングとバッチの統合
Delta Lake のテーブルは、バッチ処理用のテーブルであると同時に、ストリーミングのソースおよびシンクとしても利用できます。ストリーミングによるデータ取り込み、バッチによる履歴データのバックフィル、対話型クエリを、すべて同じテーブルに対してそのまま実行できます。

## 集計の可視化:Gold

In [16]:
%sql
-- Gold: daily case count (CT) and running total (ACUM_CT) for three prefectures.
-- The window SUM runs over the per-day COUNTs, ordered by date within each prefecture.
SELECT
  date,
  detectedPrefecture,
  COUNT(1) AS CT,
  SUM(COUNT(1)) OVER (PARTITION BY detectedPrefecture ORDER BY date) AS ACUM_CT
FROM jpnCovid_silver
WHERE detectedPrefecture IN ('Tokyo', 'Osaka', 'Aichi')
GROUP BY date, detectedPrefecture
ORDER BY ACUM_CT DESC

date,detectedPrefecture,CT,ACUM_CT
2020-03-31,Tokyo,80,529
2020-03-30,Tokyo,13,449
2020-03-29,Tokyo,70,436
2020-03-28,Tokyo,66,366
2020-03-27,Tokyo,40,300
2020-03-26,Tokyo,48,260
2020-03-31,Osaka,28,244
2020-03-30,Osaka,8,216
2020-03-25,Tokyo,41,212
2020-03-29,Osaka,17,208


## ストリームデータの確認

テーブルにバッチでデータが追加される状態をストリームで観察可能

In [18]:
# Read the Silver Delta directory as a streaming source; queries on "streamview"
# pick up new commits as they are written to the table.
stream = (
    spark.readStream
    .format("delta")
    .load(deltaCovid19Path_silver)
)
stream.createOrReplaceTempView("streamview")

In [19]:
%sql
-- Streaming aggregation: per-day case count for three prefectures, refreshed as rows arrive
SELECT
  date,
  detectedPrefecture,
  COUNT(1) AS CT
FROM streamview
WHERE detectedPrefecture IN ('Tokyo', 'Osaka', 'Aichi')
GROUP BY date, detectedPrefecture



date,detectedPrefecture,CT
2020-02-23,Aichi,2
2020-03-31,Tokyo,80
2020-02-28,Osaka,2
2020-02-24,Tokyo,3
2020-03-28,Aichi,4
2020-02-14,Aichi,1
2020-04-01,Tokyo,66
2020-03-20,Aichi,6
2020-01-25,Tokyo,1
2020-01-28,Aichi,1


In [20]:
# Distinct announcement dates of the incremental dataset, ascending;
# each one is replayed as a separate insert.
df1 = spark.sql("SELECT DISTINCT date FROM df_silver_add ORDER BY date")
datelist = [row.date for row in df1.collect()]
# The bare len(datelist) expression mid-cell was discarded; report the count explicitly.
print(f"{len(datelist)} dates to insert")
print(datelist)

In [21]:
import time

# Pause between inserts so the streaming query can be seen picking up each new commit.
INSERT_INTERVAL_SEC = 2

# Replay the incremental data one announcement date per INSERT (one table commit per date).
for date_item in datelist:
    date_str = date_item.strftime('%Y-%m-%d')
    # date_str comes from strftime on a date value, so inlining it as a SQL literal is safe.
    spark.sql(
        "insert into jpnCovid_silver select * from df_silver_add where date ='" + date_str + "'"
    )
    print(date_str + " data inserted ")
    time.sleep(INSERT_INTERVAL_SEC)

In [22]:
# List the Delta transaction log: each commit adds a numbered .json entry plus a .crc file.
display(dbutils.fs.ls(deltaCovid19Path_silver + "_delta_log/"))

path,name,size
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000000.crc,00000000000000000000.crc,89
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000000.json,00000000000000000000.json,1364
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000001.crc,00000000000000000001.crc,89
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000001.json,00000000000000000001.json,801
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000002.crc,00000000000000000002.crc,89
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000002.json,00000000000000000002.json,799
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000003.crc,00000000000000000003.crc,89
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000003.json,00000000000000000003.json,801
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000004.crc,00000000000000000004.crc,90
dbfs:/japan_covid19/delta/silver/_delta_log/00000000000000000004.json,00000000000000000004.json,801


## Upserts や deleteをサポート

**Note**: Delta Lake 0.3.0でリリース

Delta Lake は、UPDATE、DELETE、MERGE INTO を含む標準的な DML をサポートしており、従来のファイルフォーマットでは難しかったビッグデータセットを管理するためのより多くの操作を提供します。

## DELETE

In [25]:
%sql
-- Remove every Tokyo row from the Silver table
DELETE FROM jpnCovid_silver
WHERE detectedPrefecture = 'Tokyo'

In [26]:
%sql
-- Daily count and running total after the DELETE (Tokyo rows should be gone)
SELECT
  date,
  detectedPrefecture,
  COUNT(1) AS CT,
  SUM(COUNT(1)) OVER (PARTITION BY detectedPrefecture ORDER BY date) AS ACUM_CT
FROM jpnCovid_silver
WHERE detectedPrefecture IN ('Tokyo', 'Osaka', 'Aichi')
GROUP BY date, detectedPrefecture
ORDER BY date ASC

date,detectedPrefecture,CT,ACUM_CT
2020-01-26,Aichi,1,1
2020-01-28,Aichi,1,2
2020-01-29,Osaka,1,1
2020-02-14,Aichi,1,3
2020-02-15,Aichi,1,4
2020-02-16,Aichi,2,6
2020-02-18,Aichi,1,7
2020-02-19,Aichi,1,8
2020-02-20,Aichi,1,9
2020-02-21,Aichi,2,11


## UPDATE

In [28]:
%sql
-- Rename the Osaka rows in place
UPDATE jpnCovid_silver
SET detectedPrefecture = 'NEW_Osaka'
WHERE detectedPrefecture = 'Osaka'

In [29]:
%sql
-- Daily count and running total after the UPDATE; 'NEW_Osaka' is included to show the renamed rows
SELECT
  date,
  detectedPrefecture,
  COUNT(1) AS CT,
  SUM(COUNT(1)) OVER (PARTITION BY detectedPrefecture ORDER BY date) AS ACUM_CT
FROM jpnCovid_silver
WHERE detectedPrefecture IN ('Tokyo', 'Osaka', 'Aichi', 'NEW_Osaka')
GROUP BY date, detectedPrefecture
ORDER BY date ASC

date,detectedPrefecture,CT,ACUM_CT
2020-01-26,Aichi,1,1
2020-01-28,Aichi,1,2
2020-01-29,NEW_Osaka,1,1
2020-02-14,Aichi,1,3
2020-02-15,Aichi,1,4
2020-02-16,Aichi,2,6
2020-02-18,Aichi,1,7
2020-02-19,Aichi,1,8
2020-02-20,Aichi,1,9
2020-02-21,Aichi,2,11


## MERGE INTO 
新規行の追加および既存行の更新(Upsert)が可能です

In [31]:
# Imported here so the cell does not depend on an import made in an earlier cell.
from pyspark.sql.functions import to_date

# Append two hand-made test rows (detectedPrefecture = 'batch') to the Silver table.
# INSERT INTO ... SELECT * matches columns by position, so the column order must be
# the table's order: date, detectedPrefecture, ageBracket.
items = [('2020-05-01', 'batch', 5), ('2020-05-02', 'batch', 20)]
cols = ['date', 'detectedPrefecture', 'ageBracket']
tempdf = spark.createDataFrame(items, cols)
insert_table = tempdf.withColumn('date', to_date('date', "yyyy-MM-dd"))
insert_table.createOrReplaceTempView("insert_table")
# Show the rows being inserted (the saved output of this cell is this table)
display(insert_table)

spark.sql("INSERT INTO jpnCovid_silver SELECT * FROM insert_table")

date,detectedPrefecture,ageBracket
2020-05-01,batch,5
2020-05-02,batch,20


In [32]:
%sql
-- Confirm the test rows were appended to the source table
SELECT *
FROM jpnCovid_silver
WHERE detectedPrefecture = 'batch'

date,detectedPrefecture,ageBracket
2020-05-01,batch,5
2020-05-02,batch,20


In [33]:
# Source rows for the MERGE demo.
# 2020-05-01 matches an existing 'batch' row (new ageBracket 50);
# 2020-05-03 is a new row with a NULL ageBracket.
items = [('2020-05-01', 'batch', 50), ('2020-05-03', 'batch', None)]
cols = ['date', 'detectedPrefecture', 'ageBracket']
tempdf = spark.createDataFrame(items, cols)
merge_table = tempdf.withColumn('date', to_date(tempdf['date'], "yyyy-MM-dd"))
merge_table.createOrReplaceTempView("merge_table")
display(merge_table)

date,detectedPrefecture,ageBracket
2020-05-01,batch,50.0
2020-05-03,batch,


In [34]:
%sql
-- Upsert keyed on (date, detectedPrefecture).
-- Matching target rows are overwritten with the source values; unmatched source rows are inserted.
-- A WHEN MATCHED clause may also DELETE instead of UPDATE.
MERGE INTO jpnCovid_silver AS d
USING merge_table AS m
  ON d.date = m.date
 AND d.detectedPrefecture = m.detectedPrefecture
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

In [35]:
%sql
-- The existing row's ageBracket is updated and the new row has been added
SELECT *
FROM jpnCovid_silver
WHERE detectedPrefecture = 'batch'

date,detectedPrefecture,ageBracket
2020-05-01,batch,50.0
2020-05-02,batch,20.0
2020-05-03,batch,


## Schema Evolution
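Delta テーブルは書き込み時にスキーマを検証し(スキーマ強制)、テーブル定義と一致しないデータの追加を拒否するため、安全にデータを追加できます。`mergeSchema` オプションを指定すると、新しい列をテーブル定義に追加しながら書き込むこともできます(スキーマ進化)。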

In [37]:
# Rebuild the Tokyo rows deleted earlier, this time with an extra gender column ("Gender").
df_new_Tokyo = (
    df_date
    .select("date", "detectedPrefecture", "ageBracket", "Gender")
    .filter("detectedPrefecture = 'Tokyo'")
)

In [38]:
from pyspark.sql.utils import AnalysisException

# Schema enforcement demo: the Silver table has no gender column yet, so a plain append
# is expected to be rejected. The error is caught and shown so "Run All" continues past it.
try:
    df_new_Tokyo.write.format("delta").mode("append").save(deltaCovid19Path_silver)
except AnalysisException as err:
    print("Append rejected by Delta schema enforcement:")
    print(err)
else:
    print("WARNING: append without mergeSchema unexpectedly succeeded")

In [39]:
# With mergeSchema=true the append adds the new column to the table definition (schema evolution).
(
    df_new_Tokyo.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(deltaCovid19Path_silver)
)

In [40]:
%sql
-- Confirm the deleted Tokyo rows are back, now carrying the gender column
SELECT
  Gender,
  detectedPrefecture,
  COUNT(1) AS CT
FROM jpnCovid_silver
WHERE detectedPrefecture IN ('Tokyo')
GROUP BY detectedPrefecture, Gender
ORDER BY Gender

Gender,detectedPrefecture,CT
,Tokyo,647
F,Tokyo,975
M,Tokyo,1611


## Time Travel
テーブルのバージョン番号あるいはTimestampからテーブルの状態を再生できます。

In [42]:
%sql
-- One row per table commit (version, timestamp, operation, parameters, ...)
DESCRIBE HISTORY jpnCovid_silver

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend
25,2020-04-22T09:17:40.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3786880841438384),0201-031853-reals908,24.0,WriteSerializable,True
24,2020-04-22T09:16:16.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,MERGE,Map(predicate -> ((d.`date` = m.`date`) AND (d.`detectedPrefecture` = m.`detectedPrefecture`))),,List(3786880841438384),0201-031853-reals908,23.0,WriteSerializable,False
23,2020-04-22T09:15:25.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3786880841438384),0201-031853-reals908,22.0,WriteSerializable,True
22,2020-04-22T09:15:08.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,UPDATE,Map(predicate -> (detectedPrefecture#176346 = Osaka)),,List(3786880841438384),0201-031853-reals908,21.0,WriteSerializable,False
21,2020-04-22T09:14:08.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,DELETE,"Map(predicate -> [""(default.jpncovid_silver.`detectedPrefecture` = 'Tokyo')""])",,List(3786880841438384),0201-031853-reals908,20.0,WriteSerializable,False
20,2020-04-22T08:55:41.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3786880841438384),0201-031853-reals908,19.0,WriteSerializable,True
19,2020-04-22T08:55:33.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3786880841438384),0201-031853-reals908,18.0,WriteSerializable,True
18,2020-04-22T08:55:25.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3786880841438384),0201-031853-reals908,17.0,WriteSerializable,True
17,2020-04-22T08:55:17.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3786880841438384),0201-031853-reals908,16.0,WriteSerializable,True
16,2020-04-22T08:55:10.000+0000,5897928613386536,ryoma.nagata@avantcorp.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(3786880841438384),0201-031853-reals908,15.0,WriteSerializable,True


In [43]:
%sql
-- Time travel: query version 0, the initial CREATE TABLE AS SELECT load from df_silver_3M
-- (before any inserts, DELETE, UPDATE or MERGE), so the Tokyo rows are still present.
SELECT
  date,
  detectedPrefecture,
  COUNT(1) AS CT,
  SUM(COUNT(1)) OVER (PARTITION BY detectedPrefecture ORDER BY date) AS ACUM_CT
FROM jpnCovid_silver VERSION AS OF 0
WHERE detectedPrefecture IN ('Tokyo')
GROUP BY date, detectedPrefecture
ORDER BY date ASC

date,detectedPrefecture,CT,ACUM_CT
2020-01-24,Tokyo,1,1
2020-01-25,Tokyo,1,2
2020-01-30,Tokyo,1,3
2020-02-13,Tokyo,2,5
2020-02-14,Tokyo,2,7
2020-02-15,Tokyo,7,14
2020-02-16,Tokyo,4,18
2020-02-18,Tokyo,3,21
2020-02-19,Tokyo,3,24
2020-02-21,Tokyo,3,27
