-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
__init__.py
155 lines (134 loc) · 4.85 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from datasette import hookimpl
from typing import List
import json
TILE_LAYER = "https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png"
TILE_LAYER_OPTIONS = {
"maxZoom": 19,
"detectRetina": True,
"attribution": '© <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors',
}
@hookimpl
def extra_js_urls(database, table, columns, view_name, datasette):
if not find_columns(database, table, columns, view_name, datasette):
return []
return [
{
"url": datasette.urls.static_plugins(
"datasette-cluster-map", "datasette-cluster-map.js"
),
"module": True,
}
]
@hookimpl
def extra_body_script(database, table, columns, view_name, datasette):
location_columns = find_columns(database, table, columns, view_name, datasette)
if not location_columns:
return []
config = (
datasette.plugin_config("datasette-cluster-map", database=database, table=table)
or {}
)
js = []
js.append(
"window.DATASETTE_CLUSTER_MAP_TILE_LAYER = {};".format(
json.dumps(config.get("tile_layer") or TILE_LAYER)
)
)
js.append(
"window.DATASETTE_CLUSTER_MAP_TILE_LAYER_OPTIONS = {};".format(
json.dumps(config.get("tile_layer_options") or TILE_LAYER_OPTIONS)
)
)
if config.get("container"):
js.append(
"window.DATASETTE_CLUSTER_MAP_CONTAINER = {};".format(
json.dumps(config["container"])
)
)
# latitude_column and longitude_column
js.append(
"window.DATASETTE_CLUSTER_MAP_LATITUDE_COLUMN = {};".format(
json.dumps(location_columns[0])
)
)
js.append(
"window.DATASETTE_CLUSTER_MAP_LONGITUDE_COLUMN = {};".format(
json.dumps(location_columns[1])
)
)
js.append("window.datasette = window.datasette || {};")
js.append(
"datasette.cluster_map = {\n"
+ " MARKERCLUSTER_URL: '{}',\n".format(
datasette.urls.static_plugins(
"datasette-cluster-map", "leaflet.markercluster.min.js"
)
)
+ " MARKERCLUSTER_CSS_URL: '{}'\n}};".format(
datasette.urls.static_plugins(
"datasette-cluster-map", "leaflet.markercluster.css"
)
)
)
return "\n".join(js)
def find_columns(database, table, columns, view_name, datasette):
print(
"find_columns: database={}, table={}, columns={}, view_name={}".format(
database, table, columns, view_name
)
)
if view_name not in ("database", "table"):
return []
if not columns:
return []
# If columns are configured, check for those
columns = [column.lower() for column in columns]
config = (
datasette.plugin_config("datasette-cluster-map", database=database, table=table)
or {}
)
latitude_column = config.get("latitude_column")
longitude_column = config.get("longitude_column")
if not latitude_column or not longitude_column:
# Detect those columns instead
location_columns = location_columns_from_columns(columns)
if not location_columns:
return []
latitude_column, longitude_column = location_columns
if latitude_column.lower() in columns and longitude_column.lower() in columns:
return [latitude_column, longitude_column]
def _match(pattern, column):
# latitude matches "latitude" or "foo_latitude"
return column.lower() == pattern or column.lower().endswith("_" + pattern)
LATITUDE_PATTERNS = ["latitude", "lat"]
LONGITUDE_PATTERNS = ["longitude", "lon", "lng", "long"]
LOCATION_PRIORITIES = (
("latitude", "longitude"),
("lat", "lon"),
("lat", "lng"),
("lat", "long"),
)
def location_columns_from_columns(columns: List[str]) -> List[str]:
latitude_col = None
longitude_col = None
lowercase_columns = [col.lower() for col in columns]
cols_to_case = {col.lower(): col for col in columns}
# First look for the priority pairings - return if found
for lat, lon in LOCATION_PRIORITIES:
if lat in lowercase_columns and lon in lowercase_columns:
return [cols_to_case[lat], cols_to_case[lon]]
# Now try for the wildcard patterns instead
for col in columns:
if any(_match(lat, col) for lat in LATITUDE_PATTERNS):
if latitude_col is not None:
# Already have latitude, so this is ambiguous
return []
latitude_col = col
elif any(_match(lon, col) for lon in LONGITUDE_PATTERNS):
if longitude_col is not None:
# Already have longitude, so this is ambiguous
return []
longitude_col = col
if latitude_col is None or longitude_col is None:
return []
return [latitude_col, longitude_col]