In [0]:
# ライブラリのインストール
%pip install altair
%pip install vega-datasets

# 5. JHU COVID-19 データセットの分析

このノートブックは[2019 Novel Coronavirus COVID-19 (2019-nCoV) Data Repository by Johns Hopkins CSSE](https://github.com/CSSEGISandData/COVID-19)に対する簡単な処理、分析を行うためのものです。データは定期的に`/databricks-datasets/COVID/CSSEGISandData/`で更新されるので、直接データにアクセスすることができます。

[Altair: Declarative Visualization in Python — Altair 4\.1\.0 documentation](https://altair-viz.github.io/index.html)

In [0]:
# ユーザーごとに一意のパスになるようにユーザー名をパスに含めます
import re
from pyspark.sql.types import * 
import os

# Username を取得
username_raw = dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
# Username の英数字以外を除去し、全て小文字化。Username をファイルパスやデータベース名の一部で使用可能にするため。
username = re.sub('[^A-Za-z0-9]+', '', username_raw).lower()

print(username)

os.environ['username']=''.join(username)

In [0]:
# 標準ライブラリ
import io

# 外部ライブラリ
import requests
import numpy as np
import pandas as pd
import altair as alt
from vega_datasets import data

# 地理情報
topo_usa = 'https://vega.github.io/vega-datasets/data/us-10m.json'
topo_wa = 'https://raw.githubusercontent.com/deldersveld/topojson/master/countries/us-states/WA-53-washington-counties.json'
topo_king = 'https://raw.githubusercontent.com/johan/world.geo.json/master/countries/USA/WA/King.geo.json'

## `jhu_daily` テーブルの作成
* ソース: `/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/`
* COVID-19の日次レポートを格納

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, TimestampType
schema = StructType([
  StructField('FIPS', IntegerType(), True), 
  StructField('Admin2', StringType(), True),
  StructField('Province_State', StringType(), True),  
  StructField('Country_Region', StringType(), True),  
  StructField('Last_Update', TimestampType(), True),  
  StructField('Lat', DoubleType(), True),  
  StructField('Long_', DoubleType(), True),
  StructField('Confirmed', IntegerType(), True), 
  StructField('Deaths', IntegerType(), True), 
  StructField('Recovered', IntegerType(), True), 
  StructField('Active', IntegerType(), True),   
  StructField('Combined_Key', StringType(), True),  
  #StructField('Incident_Rate', DoubleType(), True),  
  #StructField('Case_Fatality_Ratio', DoubleType(), True),  
  StructField('process_date', DateType(), True),    
])

# 上記スキーマに基づいて空のSparkデータフレームを作成します
jhu_daily = spark.createDataFrame([], schema)

## それぞれのファイルに対するループ処理

以下のコードスニペットは、各ファイルに以下の処理を行います。
* 日付を特定するためにファイル名を抽出
* 時間と共にスキーマが変化しているので、それぞれのスキーマに応じてロジックを切り替えてデータを追加

> **注意**<br>
> データが日々更新されているため、スキーマを修正する必要性が出てくる場合があります。

In [0]:
import os
import pandas as pd
import glob
from pyspark.sql.functions import input_file_name, lit, col

# すべてのcsvファイルの一覧を作成
globbed_files = glob.glob("/dbfs/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/*.csv") 
#globbed_files = glob.glob("/dbfs/databricks-datasets/COVID/CSSEGISandData/csse_covid_19_data/csse_covid_19_daily_reports/04*.csv")

file_total = len(globbed_files)
i = 0
for csv in globbed_files:
  i = i + 1 

  # ファイル名
  source_file = csv[5:200]
  process_date = csv[100:104] + "-" + csv[94:96] + "-" + csv[97:99]
  
  print(f"{i}/{file_total} {source_file} {process_date}")

  # 一時データフレームにデータを読み込み
  df_tmp = spark.read.option("inferSchema", True).option("header", True).csv(source_file)
  df_tmp.createOrReplaceTempView("df_tmp")

  # スキーマの取得
  schema_txt = ' '.join(map(str, df_tmp.columns)) 
    
  # 3種類のスキーマ (2020-05-27時点) 
  schema_01 = "Province/State Country/Region Last Update Confirmed Deaths Recovered" # 01-22-2020 〜 02-29-2020
  schema_02 = "Province/State Country/Region Last Update Confirmed Deaths Recovered Latitude Longitude" # 03-01-2020 〜 03-21-2020
  schema_03 = "FIPS Admin2 Province_State Country_Region Last_Update Lat Long_ Confirmed Deaths Recovered Active Combined_Key" # 03-22-2020 以降
  schema_04 = "FIPS Admin2 Province_State Country_Region Last_Update Lat Long_ Confirmed Deaths Recovered Active Combined_Key Incident_Rate Case_Fatality_Ratio" # 2020-05-27時点で発見
  schema_05 = "FIPS Admin2 Province_State Country_Region Last_Update Lat Long_ Confirmed Deaths Recovered Active Combined_Key Incident_Rate Case-Fatality_Ratio" # 2020-05-27時点で発見
  schema_06 = "FIPS Admin2 Province_State Country_Region Last_Update Lat Long_ Confirmed Deaths Recovered Active Combined_Key Incidence_Rate Case-Fatality_Ratio" # 2020-05-27時点で発見
    
  # スキーマのタイプに基づいてデータを追加
  if (schema_txt == schema_01):
    df_tmp = (df_tmp
                .withColumn("FIPS", lit(None).cast(IntegerType()))
                .withColumn("Admin2", lit(None).cast(StringType()))
                .withColumn("Province_State", col("Province/State"))
                .withColumn("Country_Region", col("Country/Region"))
                .withColumn("Last_Update", col("Last Update"))
                .withColumn("Lat", lit(None).cast(DoubleType()))
                .withColumn("Long_", lit(None).cast(DoubleType()))
                .withColumn("Active", lit(None).cast(IntegerType()))
                .withColumn("Combined_Key", lit(None).cast(StringType()))
                #.withColumn("Incident_Rate", lit(None).cast(DoubleType()))
                #.withColumn("Case_Fatality_Ratio", lit(None).cast(DoubleType()))
                .withColumn("process_date", lit(process_date))
                .select("FIPS", 
                        "Admin2", 
                        "Province_State", 
                        "Country_Region", 
                        "Last_Update", 
                        "Lat", 
                        "Long_", 
                        "Confirmed", 
                        "Deaths", 
                        "Recovered", 
                        "Active", 
                        "Combined_Key", 
                        #"Incident_Rate", 
                        #"Case_Fatality_Ratio", 
                        "process_date")
               )
    jhu_daily = jhu_daily.union(df_tmp)
  elif (schema_txt == schema_02):
    df_tmp = (df_tmp
                .withColumn("FIPS", lit(None).cast(IntegerType()))
                .withColumn("Admin2", lit(None).cast(StringType()))
                .withColumn("Province_State", col("Province/State"))
                .withColumn("Country_Region", col("Country/Region"))
                .withColumn("Last_Update", col("Last Update"))
                .withColumn("Lat", col("Latitude"))
                .withColumn("Long_", col("Longitude"))
                .withColumn("Active", lit(None).cast(IntegerType()))
                .withColumn("Combined_Key", lit(None).cast(StringType()))
                #.withColumn("Incident_Rate", lit(None).cast(DoubleType()))
                #.withColumn("Case_Fatality_Ratio", lit(None).cast(DoubleType()))
                .withColumn("process_date", lit(process_date))
                .select("FIPS", 
                        "Admin2", 
                        "Province_State", 
                        "Country_Region", 
                        "Last_Update", 
                        "Lat", 
                        "Long_", 
                        "Confirmed", 
                        "Deaths", 
                        "Recovered", 
                        "Active", 
                        "Combined_Key", 
                        #"Incident_Rate", 
                        #"Case_Fatality_Ratio", 
                        "process_date")
               )
    jhu_daily = jhu_daily.union(df_tmp)

  elif (schema_txt == schema_03):
    df_tmp = (df_tmp
                #.withColumn("Incident_Rate", lit(None).cast(DoubleType()))
                #.withColumn("Case_Fatality_Ratio", lit(None).cast(DoubleType()))
                .withColumn("process_date", lit(process_date))
                .select("FIPS", 
                        "Admin2", 
                        "Province_State", 
                        "Country_Region", 
                        "Last_Update", 
                        "Lat", 
                        "Long_", 
                        "Confirmed", 
                        "Deaths", 
                        "Recovered", 
                        "Active", 
                        "Combined_Key", 
                        #"Incident_Rate", 
                        #"Case_Fatality_Ratio", 
                        "process_date")
             )
    jhu_daily = jhu_daily.union(df_tmp)
    
  elif (schema_txt == schema_04):
    df_tmp = (df_tmp.withColumn("process_date", lit(process_date))
                   .select("FIPS", 
                        "Admin2", 
                        "Province_State", 
                        "Country_Region", 
                        "Last_Update", 
                        "Lat", 
                        "Long_", 
                        "Confirmed", 
                        "Deaths", 
                        "Recovered", 
                        "Active", 
                        #"Combined_Key", 
                        #"Incident_Rate", 
                        "Case_Fatality_Ratio", 
                        "process_date")
             )
    
    jhu_daily = jhu_daily.union(df_tmp)
    
  elif (schema_txt == schema_05):
    df_tmp = (df_tmp.withColumn("process_date", lit(process_date))
                   #.withColumn("Case_Fatality_Ratio", col("Case-Fatality_Ratio"))
                    .select("FIPS", 
                        "Admin2", 
                        "Province_State", 
                        "Country_Region", 
                        "Last_Update", 
                        "Lat", 
                        "Long_", 
                        "Confirmed", 
                        "Deaths", 
                        "Recovered", 
                        "Active", 
                        "Combined_Key", 
                        #"Incident_Rate", 
                        #"Case_Fatality_Ratio", 
                        "process_date")
             )
    
    jhu_daily = jhu_daily.union(df_tmp)    

  elif (schema_txt == schema_06):
    df_tmp = (df_tmp.withColumn("process_date", lit(process_date))
                   #.withColumn("Incident_Rate", col("Incidence_Rate"))
                   #.withColumn("Case_Fatality_Ratio", col("Case-Fatality_Ratio"))
              .select("FIPS", 
                        "Admin2", 
                        "Province_State", 
                        "Country_Region", 
                        "Last_Update", 
                        "Lat", 
                        "Long_", 
                        "Confirmed", 
                        "Deaths", 
                        "Recovered", 
                        "Active", 
                        "Combined_Key", 
                        #"Incident_Rate", 
                        #"Case_Fatality_Ratio", 
                        "process_date")
             )
    
    jhu_daily = jhu_daily.union(df_tmp)    
    
  else:
    print(f"Schema may have changed: {schema_txt}")
    raise
  
  # 進捗表示
  #print("%s | %s" % (process_date, schema_txt))

In [0]:
jhu_daily.createOrReplaceTempView("jhu_daily")
display(jhu_daily)

FIPS,Admin2,Province_State,Country_Region,Last_Update,Lat,Long_,Confirmed,Deaths,Recovered,Active,Combined_Key,process_date
,,,Afghanistan,2021-01-02 05:22:33,33.93911,67.709953,51526,2191,41727.0,0.0,4.2522221790940495,2021-01-01
,,,Albania,2021-01-02 05:22:33,41.1533,20.1683,58316,1181,33634.0,23501.0,2.025173194320598,2021-01-01
,,,Algeria,2021-01-02 05:22:33,28.0339,1.6596,99897,2762,67395.0,29740.0,2.764847793227024,2021-01-01
,,,Andorra,2021-01-02 05:22:33,42.5063,1.5218,8117,84,7463.0,570.0,1.0348650979425895,2021-01-01
,,,Angola,2021-01-02 05:22:33,-11.2027,17.8739,17568,405,11146.0,6017.0,2.305327868852459,2021-01-01
,,,Antigua and Barbuda,2021-01-02 05:22:33,17.0608,-61.7964,159,5,148.0,6.0,3.1446540880503147,2021-01-01
,,,Argentina,2021-01-02 05:22:33,-38.4161,-63.6167,1629594,43319,1426676.0,159599.0,2.658269483073698,2021-01-01
,,,Armenia,2021-01-02 05:22:33,40.0691,45.0382,159738,2828,143355.0,13555.0,1.77039902840902,2021-01-01
,,Australian Capital Territory,Australia,2021-01-02 05:22:33,-35.4735,149.0124,118,3,114.0,1.0,2.542372881355932,2021-01-01
,,New South Wales,Australia,2021-01-02 05:22:33,-33.8688,151.2093,4947,54,0.0,1696.0,1.0915706488781078,2021-01-01


In [0]:
#%sh
#rm -fR /dbfs/tmp/$username/COVID/jhu_daily/

In [0]:
# # jhu_dailyテーブルの保存
# file_path = f'/tmp/{username}/COVID/jhu_daily/'
# jhu_daily.repartition(4).write.format("parquet").save(file_path)

## 2019年の人口推定値のダウンロード

In [0]:
%sh mkdir -p /dbfs/tmp/$username/COVID/population_estimates_by_county/ && wget -O /dbfs/tmp/$username/COVID/population_estimates_by_county/co-est2019-alldata.csv https://raw.githubusercontent.com/databricks/tech-talks/master/datasets/co-est2019-alldata.csv && ls -al /dbfs/tmp/$username/COVID/population_estimates_by_county/

In [0]:
map_popest_county = spark.read.option("header", True).option("inferSchema", True).csv(f"/tmp/{username}/COVID/population_estimates_by_county/co-est2019-alldata.csv")
map_popest_county.createOrReplaceTempView("map_popest_county")
fips_popest_county = spark.sql("select State * 1000 + substring(cast(1000 + County as string), 2, 3) as fips, STNAME, CTYNAME, census2010pop, POPESTIMATE2019 from map_popest_county")
fips_popest_county.createOrReplaceTempView("fips_popest_county")

## 人口推定値の取り込み

人口推定値を取り込むために `jhu_daily_pop` 一時テーブルを作成します。3/22より前のデータは `FIPS` 情報を含んでいないため、推定人口値を取り込む際にはデータセットを3/22以降に限定します。

In [0]:
jhu_daily_pop = spark.sql("""
SELECT f.FIPS, f.Admin2, f.Province_State, f.Country_Region, f.Last_Update, f.Lat, f.Long_, f.Confirmed, f.Deaths, f.Recovered, f.Active, f.Combined_Key, f.process_date, p.POPESTIMATE2019 
  FROM jhu_daily f
    JOIN fips_popest_county p
      ON p.fips = f.FIPS
""")
jhu_daily_pop.createOrReplaceTempView("jhu_daily_pop")

## 最初の探索的データ分析(Exploratory Data Analysis)

### NY郡とKing郡における感染者数、死者数の検証

In [0]:
%sql
select process_date, Admin2, Confirmed, Deaths, Recovered, Active from jhu_daily where Province_State in ('New York') and Admin2 in ('New York City')

process_date,Admin2,Confirmed,Deaths,Recovered,Active
2020-03-22,New York City,9654,63,0.0,0
2020-03-23,New York City,12305,99,0.0,0
2020-03-24,New York City,14904,131,0.0,0
2020-03-25,New York City,17856,199,0.0,0
2020-03-26,New York City,21873,281,0.0,0
2020-03-27,New York City,25573,366,0.0,0
2020-03-28,New York City,29776,517,0.0,0
2020-03-29,New York City,33768,678,0.0,0
2020-03-30,New York City,37453,790,0.0,0
2020-03-31,New York City,43119,932,0.0,0


In [0]:
%sql
select process_date, Admin2, Confirmed, Deaths, Recovered, Active from jhu_daily where Province_State in ('Washington') and Admin2 in ('King')

process_date,Admin2,Confirmed,Deaths,Recovered,Active
2021-01-01,King,62580,1050,0.0,61530.0
2021-01-02,King,62580,1050,0.0,61530.0
2021-01-03,King,65265,1049,0.0,64216.0
2021-01-04,King,65570,1052,0.0,64518.0
2021-01-05,King,65834,1068,0.0,64766.0
2021-01-06,King,66286,1085,0.0,65201.0
2021-01-07,King,66998,1093,0.0,65905.0
2021-01-08,King,67932,1105,0.0,66827.0
2021-01-09,King,68799,1105,0.0,67694.0
2021-01-10,King,68799,1105,0.0,67694.0


### NY郡とKing郡における人口に対する感染者数、死者数の比率の検証

In [0]:
%sql
select process_date, Admin2, 100000.*Confirmed/POPESTIMATE2019 as Confirmed_per100K, 100000.*Deaths/POPESTIMATE2019 as Deaths_per100K, Recovered, Active from jhu_daily_pop where Province_State in ('New York') and Admin2 in ('New York City')

process_date,Admin2,Confirmed_per100K,Deaths_per100K,Recovered,Active
2020-03-22,New York City,592.74049460124,3.8681014253,0.0,0
2020-03-23,New York City,755.50774664058,6.07844509691,0.0,0
2020-03-24,New York City,915.08228004318,8.04319502722,0.0,0
2020-03-25,New York City,1096.33046111453,12.21828862913,0.0,0
2020-03-26,New York City,1342.9679758041,17.25296032556,0.0,0
2020-03-27,New York City,1570.14218649652,22.47182732795,0.0,0
2020-03-28,New York City,1828.19981015604,31.74299106162,0.0,0
2020-03-29,New York City,2073.30236396256,41.6281391485,0.0,0
2020-03-30,New York City,2299.55559812514,48.5047639046,0.0,0
2020-03-31,New York City,2647.43913266114,57.22334172036,0.0,0


In [0]:
%sql
select process_date, Admin2, 100000.*Confirmed/POPESTIMATE2019 as Confirmed_per100K, 100000.*Deaths/POPESTIMATE2019 as Deaths_per100K, Recovered, Active from jhu_daily_pop where Province_State in ('Washington') and Admin2 in ('King')

process_date,Admin2,Confirmed_per100K,Deaths_per100K,Recovered,Active
2021-01-01,King,2777.89861602232,46.60903718158,0.0,61530.0
2021-01-02,King,2777.89861602232,46.60903718158,0.0,61530.0
2021-01-03,King,2897.08458252951,46.56464762236,0.0,64216.0
2021-01-04,King,2910.62339809178,46.69781630002,0.0,64518.0
2021-01-05,King,2922.34224172601,47.40804924755,0.0,64766.0
2021-01-06,King,2942.4063224937,48.1626717543,0.0,65201.0
2021-01-07,King,2974.01168865873,48.51778822807,0.0,65905.0
2021-01-08,King,3015.47153697073,49.05046293871,0.0,66827.0
2021-01-09,King,3053.95728481495,49.05046293871,0.0,67694.0
2021-01-10,King,3053.95728481495,49.05046293871,0.0,67694.0


## 郡ごとのCOVID-19感染者数、死者数

In [0]:
# `usa`データフレームの作成
df_usa = spark.sql("select fips, cast(100000.*Confirmed/POPESTIMATE2019 as int) as confirmed_per100K, cast(100000.*Deaths/POPESTIMATE2019 as int) as deaths_per100K, recovered, active, lat, long_, admin2 as county, province_state as state, process_date, cast(replace(process_date, '-', '') as integer) as process_date_num from jhu_daily_pop where lat is not null and long_ is not null and fips is not null and (lat <> 0 and long_ <> 0)")
df_usa.createOrReplaceTempView("df_usa")

# pandasデータフレームに変換
pdf_usa = df_usa.toPandas()
pdf_usa['confirmed_per100K'] = pdf_usa['confirmed_per100K'].astype('int32')
pdf_usa['deaths_per100K'] = pdf_usa['deaths_per100K'].astype('int32')

In [0]:
def map_usa_cases(curr_date):
  # altairの地形情報を取得
  us_states = alt.topo_feature(topo_usa, 'states')
  us_counties = alt.topo_feature(topo_usa, 'counties')

  # 州の境界線
  base_states = alt.Chart(us_states).mark_geoshape().encode(
    stroke=alt.value('lightgray'), fill=alt.value('white')
  ).properties(
    width=1200,
    height=960,
  ).project(
    type='albersUsa',
  )


  # 郡ごとの感染者数
  base_counties = alt.Chart(us_counties).mark_geoshape().encode(
    color=alt.Color('confirmed:Q', scale=alt.Scale(type='log'), title='Confirmed'),
  ).transform_lookup(
    lookup='id',
    from_=alt.LookupData(pdf_usa[(pdf_usa['confirmed'] > 0) & (pdf_usa['process_date'] == curr_date)], 'fips', ['confirmed'])  
  )

  # 緯度経度に基づく死者数
  points = alt.Chart(pdf_usa[(pdf_usa['deaths'] > 0) & (pdf_usa['process_date'] == curr_date)]).mark_point(opacity=0.75, filled=True).encode(
    longitude='long_:Q',
    latitude='lat:Q',
    size=alt.Size('sum(deaths):Q', scale=alt.Scale(type='symlog'), title='deaths'),
    color=alt.value('#BD595D'),
    stroke=alt.value('brown'),
    tooltip=[
      alt.Tooltip('state', title='state'), 
      alt.Tooltip('county', title='county'), 
      alt.Tooltip('confirmed', title='confirmed'),
      alt.Tooltip('deaths', title='deaths'),       
    ],
  ).properties(
    # 図のタイトル
    title=f'COVID-19 Confirmed Cases and Deaths by County {curr_date}'
  )

  # グラフの表示
  return (base_states + base_counties + points)

In [0]:
def map_usa_cases(curr_date):
  # altairの地形情報を取得
  us_states = alt.topo_feature(topo_usa, 'states')
  us_counties = alt.topo_feature(topo_usa, 'counties')

  # 州の境界線
  base_states = alt.Chart(us_states).mark_geoshape().encode(
    stroke=alt.value('lightgray'), fill=alt.value('white')
  ).properties(
    width=1200,
    height=960,
  ).project(
    type='albersUsa',
  )


  # 郡ごとの感染者数
  base_counties = alt.Chart(us_counties).mark_geoshape().encode(
    color=alt.Color('confirmed_per100K:Q', scale=alt.Scale(domain=(1, 7500), type='log'), title='Confirmed per 100K'),
  ).transform_lookup(
    lookup='id',
    from_=alt.LookupData(pdf_usa[(pdf_usa['confirmed_per100K'] > 0) & (pdf_usa['process_date'] == curr_date)], 'fips', ['confirmed_per100K'])  
  )

  # 緯度経度に基づく死者数
  points = alt.Chart(pdf_usa[(pdf_usa['deaths_per100K'] > 0) & (pdf_usa['process_date'] == curr_date)]).mark_point(opacity=0.75, filled=True).encode(
    longitude='long_:Q',
    latitude='lat:Q',
     size=alt.Size('deaths_per100K:Q', scale=alt.Scale(domain=(1, 1000), type='log'), title='deaths_per100K'),
     #size=alt.Size('deaths_per100K:Q', title='deaths_per100K'),
     color=alt.value('#BD595D'),
     stroke=alt.value('brown'),
    tooltip=[
      alt.Tooltip('state', title='state'), 
      alt.Tooltip('county', title='county'), 
      alt.Tooltip('confirmed_per100K', title='confirmed'),
      alt.Tooltip('deaths_per100K', title='deaths'),       
    ],
  ).properties(
    # 図のタイトル
    title=f'COVID-19 Confirmed Cases and Deaths by County (by 100K) {curr_date}'
  )

   # グラフの表示
  return (base_states + base_counties + points)

In [0]:
# 最初の日 (2020-03-22)
map_usa_cases('2020-03-22')

In [0]:
# 最新日 (2020-04-14)
map_usa_cases('2020-04-14')

# END