-
Notifications
You must be signed in to change notification settings - Fork 6
/
build_database.py
222 lines (199 loc) · 7.88 KB
/
build_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import json
from pathlib import Path
import sqlite_utils
import csv
base_path = (Path(__file__) / "..").resolve()
jhu_csse_base = base_path / "COVID-19"
nytimes_base = base_path / "covid-19-data"
latimes_base = base_path / "california-coronavirus-data"
economist_base = base_path / "covid-19-excess-deaths-tracker"
EXTRA_CSVS = [
# file_path, table_name
(nytimes_base / "us-counties.csv", "ny_times_us_counties"),
(nytimes_base / "us-states.csv", "ny_times_us_states"),
]
def load_daily_reports():
daily_reports = list(
(jhu_csse_base / "csse_covid_19_data" / "csse_covid_19_daily_reports").glob(
"*.csv"
)
)
for filepath in daily_reports:
mm, dd, yyyy = filepath.stem.split("-")
day = f"{yyyy}-{mm}-{dd}"
with filepath.open() as fp:
for row in csv.DictReader(fp):
# Weirdly, this column is sometimes \ufeffProvince/State
province_or_state = (
row.get("\ufeffProvince/State")
or row.get("Province/State")
or row.get("Province_State")
or None
)
country_or_region = row.get("Country_Region") or row.get(
"Country/Region"
)
yield {
"day": day,
"country_or_region": country_or_region.strip()
if country_or_region
else None,
"province_or_state": province_or_state.strip()
if province_or_state
else None,
"admin2": row.get("Admin2") or None,
"fips": row.get("FIPS", "").strip() or None,
"confirmed": int(row["Confirmed"] or 0),
"deaths": int(row["Deaths"] or 0),
"recovered": int(row["Recovered"] or 0),
"active": int(row["Active"]) if row.get("Active") else None,
"latitude": row.get("Latitude") or row.get("Lat") or None,
"longitude": row.get("Longitude") or row.get("Long_") or None,
"last_update": row.get("Last Update")
or row.get("Last_Update")
or None,
"combined_key": row.get("Combined_Key") or None,
}
def add_missing_latitude_longitude(db):
# Some rows are missing a latitude/longitude, try to backfill those
with db.conn:
for row in db.conn.execute(
"""
select
country_or_region, province_or_state,
max(latitude), max(longitude)
from
johns_hopkins_csse_daily_reports
where
latitude is not null
group by
country_or_region, province_or_state
"""
).fetchall():
country_or_region, province_or_state, latitude, longitude = row
db.conn.execute(
"""
update johns_hopkins_csse_daily_reports
set latitude = ?, longitude = ?
where country_or_region = ?
and province_or_state = ?
and latitude is null
""",
[latitude, longitude, country_or_region, province_or_state],
)
def load_csv(filepath):
with filepath.open() as fp:
for row in csv.DictReader(fp):
for key in row:
if row[key].isdigit():
# Convert integers
row[key] = int(row[key])
else:
try:
float(row[key])
except ValueError:
pass
else:
# Convert floats
row[key] = float(row[key])
yield row
def load_csv_with_cadence(filepath):
for row in load_csv(filepath):
cadence = None
if "week" in row:
cadence = "weekly"
elif "month" in row:
cadence = "monthly"
if cadence is not None:
row["cadence"] = cadence
yield row
def load_economist_data(db, economist_base):
# economist_excess_deaths table
excess_table = db["economist_excess_deaths"]
if excess_table.exists():
excess_table.drop()
for filepath in (economist_base / "output-data" / "excess-deaths").glob("*.csv"):
excess_table.insert_all(load_csv_with_cadence(filepath), alter=True)
# economist_historical_deaths table
historical_table = db["economist_historical_deaths"]
if historical_table.exists():
historical_table.drop()
for filepath in (economist_base / "output-data" / "historical-deaths").glob(
"*.csv"
):
historical_table.insert_all(load_csv_with_cadence(filepath), alter=True)
if __name__ == "__main__":
db = sqlite_utils.Database(base_path / "covid.db")
# Load John Hopkins CSSE daily reports
table = db["johns_hopkins_csse_daily_reports"]
if table.exists():
table.drop()
table.insert_all(load_daily_reports())
table.create_index(["day"], if_not_exists=True)
table.create_index(["province_or_state"], if_not_exists=True)
table.create_index(["country_or_region"], if_not_exists=True)
table.create_index(["combined_key"], if_not_exists=True)
add_missing_latitude_longitude(db)
# Add a view with the old name
if "daily_reports" in db.table_names():
db["daily_reports"].drop()
if "daily_reports" not in db.view_names():
db.create_view(
"daily_reports", "select * from johns_hopkins_csse_daily_reports"
)
csvs_to_load = EXTRA_CSVS[:]
# Add LA Times CSVs, but only if they are in metadata.json
in_metadata = set(
json.load(open("metadata.json"))["databases"]["covid"]["tables"].keys()
)
for path in latimes_base.glob("*.csv"):
table_name = path.stem.replace("-", "_")
if not table_name.startswith("latimes_"):
table_name = "la_times_{}".format(table_name)
if table_name in in_metadata:
csvs_to_load.append((path, table_name))
# Now do the NYTimes and LA times data
for csv_path, table_name in csvs_to_load:
table = db[table_name]
if table.exists():
table.drop()
table.insert_all(load_csv(csv_path))
db["ny_times_us_counties"].create_index(["state"])
db["ny_times_us_counties"].create_index(["county"])
db["ny_times_us_counties"].create_index(["fips"])
# And the US census data
if "us_census_state_populations_2019" not in db.table_names():
db["us_census_state_populations_2019"].insert_all(
load_csv(base_path / "us_census_state_populations_2019.csv")
)
if "us_census_county_populations_2019" not in db.table_names():
db["us_census_county_populations_2019"].insert_all(
load_csv(base_path / "us_census_county_populations_2019.csv"), pk="fips"
)
# The Economist
load_economist_data(db, economist_base)
# More views
db.create_view("latest_ny_times_counties_with_populations", """
select
ny_times_us_counties.date,
ny_times_us_counties.county,
ny_times_us_counties.state,
ny_times_us_counties.fips,
ny_times_us_counties.cases,
ny_times_us_counties.deaths,
us_census_county_populations_2019.population,
1000000 * ny_times_us_counties.deaths / us_census_county_populations_2019.population as deaths_per_million,
1000000 * ny_times_us_counties.cases / us_census_county_populations_2019.population as cases_per_million
from
ny_times_us_counties
join us_census_county_populations_2019 on ny_times_us_counties.fips = us_census_county_populations_2019.fips
where
"date" = (
select
max(date)
from
ny_times_us_counties
)
order by
deaths_per_million desc
""", replace=True)