-
Notifications
You must be signed in to change notification settings - Fork 16
/
overture-buildings-parquet-add-columns.py
146 lines (116 loc) · 5.89 KB
/
overture-buildings-parquet-add-columns.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# This script is used to take an Overture Parquet file and add columns
# useful for partitioning - it can put in both a quadkey and the country
# ISO code. And then it will write out parquet and use gpq to convert the
# parquet to geoparquet.
#
# There is much more to do, my plan is to incorporate it into the open_buildings
# CLI and let people pick which of the columns they want to add. Also could
# be nice to add the ability to get the data downloaded - this just assumes
# you've already got it. Also need to add the command to create the
# countries.parquet, but it's basically the one in https://github.com/OvertureMaps/data/blob/main/duckdb_queries/admins.sql
# but saved to parquet. You also could just use that command to pull it
# directly into your duckdb database, and change this code (perhaps we
# add an option to pull it remote if not present). This also would
# ideally work with any of the Overture data types, and let you choose
# your table names.
import os
import duckdb
import time
import tempfile
import subprocess
import glob
from duckdb.typing import *
def lat_lon_to_quadkey(lat: float, lon: float, level: int) -> str:
    """Convert a WGS84 lat/lon point to a quadkey string at the given level.

    Note: this uses a simple linear (equirectangular) latitude mapping, not
    the Web Mercator projection used by Bing/slippy-map tiles, so the keys
    are suitable for spatial partitioning but are not identical to Bing
    quadkeys for the same point.

    Registered as a DuckDB UDF; the DuckDB-side types are supplied at
    registration time in add_quadkey(), so plain Python annotations suffice.
    """
    norm_lat = (lat + 90.0) / 180.0    # Normalize lat to the 0-1 range
    norm_lon = (lon + 180.0) / 360.0   # Normalize lon to the 0-1 range
    max_bin = (1 << level) - 1
    # Clamp so the extreme edges (lat == 90 or lon == 180) fall into the last
    # cell instead of overflowing to bin (1 << level), which would yield all
    # mask tests false and a spurious all-zero key for the corner point.
    lat_bin = min(int(norm_lat * (1 << level)), max_bin)
    lon_bin = min(int(norm_lon * (1 << level)), max_bin)
    quadkey = ""
    for i in range(level, 0, -1):
        digit = 0
        mask = 1 << (i - 1)
        if (lon_bin & mask) != 0:
            digit += 1
        if (lat_bin & mask) != 0:
            digit += 2
        quadkey += str(digit)
    return quadkey
def midpoint(minval: float, maxval: float) -> float:
    """Return the value halfway between *minval* and *maxval*.

    Registered as a DuckDB UDF in add_quadkey(); DuckDB-side types are given
    at registration, so native Python annotations are used here instead of
    the wildcard-imported duckdb.typing names.
    """
    # Multiplying by 0.5 is exact for IEEE doubles — identical to / 2.0.
    return (minval + maxval) * 0.5
def add_quadkey(con):
    """Add and populate a level-12 `quadkey` column on the `buildings` table.

    Registers the Python helper UDFs with the DuckDB connection, creates the
    column when it is missing, then fills it from the midpoint of each
    feature's bounding box.
    """
    # Register the Python helpers as scalar UDFs on this connection.
    udfs = (
        ('lat_lon_to_quadkey', lat_lon_to_quadkey, [DOUBLE, DOUBLE, INTEGER], VARCHAR),
        ('midpoint', midpoint, [DOUBLE, DOUBLE], DOUBLE),
    )
    for udf_name, fn, arg_types, ret_type in udfs:
        con.create_function(udf_name, fn, arg_types, ret_type)

    # Idempotent: safe to re-run against an already-processed table.
    con.execute("ALTER TABLE buildings ADD COLUMN IF NOT EXISTS quadkey VARCHAR")

    # Compute each row's quadkey from the centre of its bbox at zoom 12.
    fill_sql = """
        UPDATE buildings
        SET quadkey = lat_lon_to_quadkey(
            midpoint(bbox.miny, bbox.maxy),
            midpoint(bbox.minx, bbox.maxx),
            12
        );
    """
    con.execute(fill_sql)
def add_country_iso(con, country_parquet_path):
    """Tag each building with the ISO alpha-2 code of the country it intersects.

    Loads the countries parquet into a `countries` table and spatially joins
    it against `buildings` to fill a new `country_iso` column.
    """
    # NOTE(review): the path is interpolated straight into the SQL text —
    # fine for a trusted local path, but do not pass untrusted input here.
    load_sql = f"CREATE TABLE countries AS SELECT * FROM read_parquet('{country_parquet_path}')"
    con.execute(load_sql)

    # Idempotent column creation, matching add_quadkey's convention.
    con.execute("ALTER TABLE buildings ADD COLUMN IF NOT EXISTS country_iso VARCHAR")

    # Spatial join: requires the DuckDB spatial extension to be loaded.
    join_sql = """
        UPDATE buildings
        SET country_iso = countries.isocountrycodealpha2
        FROM countries
        WHERE ST_Intersects(ST_GeomFromWKB(countries.geometry), ST_GeomFromWKB(buildings.geometry))
    """
    con.execute(join_sql)
def process_parquet_file(input_parquet_path, output_folder, country_parquet_path,
                         overwrite=False, add_quadkey_option=False,
                         add_country_iso_option=False):
    """Process one Overture parquet file into a GeoParquet output.

    Loads the file into a per-file DuckDB database, optionally adds quadkey
    and/or country_iso columns, writes the table back out as parquet, then
    converts it in place to GeoParquet with the external `gpq` tool.

    Parameters:
        input_parquet_path: source Overture parquet file.
        output_folder: directory for the .duckdb working DB and the output.
        country_parquet_path: countries parquet used for the ISO-code join.
        overwrite: remove any pre-existing outputs for this file first.
        add_quadkey_option: add/populate the quadkey column.
        add_country_iso_option: add/populate the country_iso column.

    Raises:
        subprocess.CalledProcessError: if the `gpq convert` step fails.
    """
    os.makedirs(output_folder, exist_ok=True)

    # Derive a per-file ID from the trailing underscore-delimited chunk of
    # the input name (matches the Overture dump naming scheme).
    unique_id = os.path.basename(input_parquet_path).split('_')[-1]
    output_db_path = os.path.join(output_folder, f'{unique_id}.duckdb')
    output_parquet_path = os.path.join(output_folder, f'{unique_id}.parquet')

    # Skip already-processed files unless overwriting was requested.
    if (os.path.exists(output_db_path) or os.path.exists(output_parquet_path)) and not overwrite:
        print(f'Files with ID {unique_id} already exist. Skipping...')
        return
    if overwrite:
        for file_path in (output_db_path, output_parquet_path):
            if os.path.exists(file_path):
                os.remove(file_path)

    print(f"Processing file {input_parquet_path}")
    con = duckdb.connect(output_db_path)
    try:
        con.execute('LOAD spatial;')
        con.execute(f"CREATE TABLE buildings AS SELECT * FROM read_parquet('{input_parquet_path}')")
        if add_quadkey_option:
            add_quadkey(con)
        if add_country_iso_option:
            add_country_iso(con, country_parquet_path)
        # BUG FIX: the original always ordered by quadkey, which raised an
        # error when add_quadkey_option was False (the column never exists).
        order_clause = ' ORDER BY quadkey' if add_quadkey_option else ''
        con.execute(
            f"COPY (SELECT * FROM buildings{order_clause}) TO '{output_parquet_path}' WITH (FORMAT Parquet)"
        )
    finally:
        # Always release the DuckDB file handle, even if a step fails.
        con.close()

    # Convert to GeoParquet via gpq. The temp file is created in the same
    # directory as the output so the final os.rename never crosses
    # filesystems (tempfile's default dir may be on a different mount).
    temp_file = tempfile.NamedTemporaryFile(suffix=".parquet", dir=output_folder, delete=False)
    temp_file.close()  # close so gpq can open/write it itself
    try:
        subprocess.run(['gpq', 'convert', output_parquet_path, temp_file.name], check=True)
        # Atomically replace the plain parquet with the GeoParquet version.
        os.rename(temp_file.name, output_parquet_path)
    except Exception:
        # Don't leave the temp file behind when conversion/rename fails.
        if os.path.exists(temp_file.name):
            os.remove(temp_file.name)
        raise
    print(f"Processing complete for file {input_parquet_path}")
def process_parquet_files(input_path, output_folder, country_parquet_path,
                          overwrite=False, add_quadkey_option=False,
                          add_country_iso_option=False):
    """Process a single parquet file, or every file in a directory.

    When *input_path* is a directory, each entry in it is handed to
    process_parquet_file; otherwise the path itself is processed.
    """
    if os.path.isdir(input_path):
        targets = glob.glob(os.path.join(input_path, "*"))
    else:
        targets = [input_path]
    for target in targets:
        process_parquet_file(target, output_folder, country_parquet_path,
                             overwrite, add_quadkey_option, add_country_iso_option)
# Script entry point: guard the side-effecting driver call so importing this
# module (e.g. to reuse the UDFs or helpers) does not kick off processing.
if __name__ == "__main__":
    # Hard-coded local paths — adjust for your environment.
    input_path = '/Volumes/fastdata/overture/s3-data/buildings/'
    output_folder = '/Users/cholmes/geodata/overture/processed-buildings'
    country_parquet_path = '/Volumes/fastdata/overture/countries.parquet'
    process_parquet_files(input_path, output_folder, country_parquet_path,
                          overwrite=False, add_quadkey_option=True,
                          add_country_iso_option=True)