-
Notifications
You must be signed in to change notification settings - Fork 30
/
003_items.sql
453 lines (396 loc) · 15.6 KB
/
003_items.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
SET SEARCH_PATH TO pgstac, public;

-- Core item store: one row per STAC item, range-partitioned on datetime.
-- content holds the full item document; the remaining columns are values
-- derived from it by the staging triggers and items_update_triggerfunc.
CREATE TABLE items (
    id            text        NOT NULL,
    geometry      geometry    NOT NULL,
    collection_id text        NOT NULL,
    datetime      timestamptz NOT NULL,
    end_datetime  timestamptz NOT NULL,
    properties    jsonb       NOT NULL,
    content       jsonb       NOT NULL
)
PARTITION BY RANGE (datetime);
-- properties_idx(content): flatten content->'properties' into a single-level
-- jsonb object (stored in items.properties, which has a GIN index).
--   * Nested object keys are joined with '.' (e.g. {"eo":{"cc":1}} -> "eo.cc").
--   * Array elements do not add a path segment: their leaves are collected
--     under the parent path (e.g. {"a":[{"b":1}]} -> "a.b").
--   * Only scalar leaves (not arrays/objects) are kept. Distinct values for one
--     path are aggregated: a single value is stored unwrapped, several values
--     are stored as a jsonb array.
--   * The top-level "datetime" key is removed. Returns '{}' when nothing is left.
CREATE OR REPLACE FUNCTION properties_idx (IN content jsonb) RETURNS jsonb AS $$
-- extract_all: recursively walk every node below properties.
-- path     = key path with array positions omitted (used for the output key)
-- fullpath = key path including 0-based array indexes (carried along; not
--            used by the final result)
with recursive extract_all as
(
select
ARRAY[key]::text[] as path,
ARRAY[key]::text[] as fullpath,
value
FROM jsonb_each(content->'properties')
union all
-- Expand one level: object members via jsonb_each, array members via
-- jsonb_array_elements (WITH ORDINALITY is 1-based, hence arr_key - 1).
select
CASE WHEN obj_key IS NOT NULL THEN path || obj_key ELSE path END,
path || coalesce(obj_key, (arr_key- 1)::text),
coalesce(obj_value, arr_value)
from extract_all
left join lateral
jsonb_each(case jsonb_typeof(value) when 'object' then value end)
as o(obj_key, obj_value)
on jsonb_typeof(value) = 'object'
left join lateral
jsonb_array_elements(case jsonb_typeof(value) when 'array' then value end)
with ordinality as a(arr_value, arr_key)
on jsonb_typeof(value) = 'array'
where obj_key is not null or arr_key is not null
)
-- paths: scalar leaves only, keyed by the dotted path.
, paths AS (
select
array_to_string(path, '.') as path,
value
FROM extract_all
WHERE
jsonb_typeof(value) NOT IN ('array','object')
-- grouped: distinct leaf values per dotted path.
), grouped AS (
SELECT path, jsonb_agg(distinct value) vals FROM paths group by path
) SELECT coalesce(jsonb_object_agg(path, CASE WHEN jsonb_array_length(vals)=1 THEN vals->0 ELSE vals END) - '{datetime}'::text[], '{}'::jsonb) FROM grouped
;
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE SET JIT TO OFF;
-- Secondary indexes on the partitioned items table.
CREATE INDEX datetime_idx ON items (datetime);
CREATE INDEX end_datetime_idx ON items (end_datetime);
CREATE INDEX properties_idx ON items USING GIN (properties jsonb_path_ops);
CREATE INDEX collection_idx ON items (collection_id);
CREATE INDEX geometry_idx ON items USING GIST (geometry);

-- (datetime, id) is the conflict target used by the ignore/upsert staging
-- triggers (ON CONFLICT (datetime, id)).
CREATE UNIQUE INDEX items_id_datetime_idx ON items (datetime, id);

-- Each item belongs to a collection; deleting a collection deletes its items.
ALTER TABLE items
    ADD CONSTRAINT items_collections_fk
    FOREIGN KEY (collection_id) REFERENCES collections (id)
    ON DELETE CASCADE
    DEFERRABLE;
-- Run ANALYZE on every items partition whose planner row estimate
-- (all_items_partitions.est_cnt, taken from pg_class.reltuples) is 0,
-- presumably so newly loaded partitions get statistics.
-- search_path is pinned like the other pgstac functions, so the view and
-- partitions resolve regardless of the caller's search_path.
CREATE OR REPLACE FUNCTION analyze_empty_partitions() RETURNS VOID AS $$
DECLARE
    part_name text;
BEGIN
    FOR part_name IN
        SELECT partition FROM all_items_partitions WHERE est_cnt = 0
    LOOP
        -- partition is regclass::text output, which PostgreSQL already quotes
        -- and schema-qualifies as needed. It is therefore interpolated with %s;
        -- %I would wrap a qualified name such as pgstac.items_p2021w01 in a
        -- single pair of quotes and reference a non-existent relation.
        EXECUTE format('ANALYZE %s;', part_name);
    END LOOP;
END;
$$ LANGUAGE PLPGSQL SET SEARCH_PATH TO pgstac, public;
-- Name of the weekly items partition for a timestamp: 'items_p' + ISO year
-- + 'w' + two-digit ISO week, e.g. 'items_p2021w01'.
-- NOTE(review): to_char on a timestamptz renders in the session TimeZone, so
-- the result is session-dependent even though the function is declared
-- IMMUTABLE. Confirm that callers always run with a fixed TimeZone.
CREATE OR REPLACE FUNCTION items_partition_name(timestamptz) RETURNS text AS $$
    SELECT to_char(
        $1,
        '"items_p"IYYY"w"IW'
    );
$$ LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- True when pg_class contains a relation whose name equals $1.
-- NOTE(review): the lookup is not restricted to the pgstac schema or to
-- partitions of items; any relation with that name in any schema matches.
CREATE OR REPLACE FUNCTION items_partition_exists(text) RETURNS boolean AS $$
    SELECT EXISTS (
        SELECT 1
        FROM pg_catalog.pg_class AS rel
        WHERE rel.relname = $1
    );
$$ LANGUAGE SQL;
-- True when a relation named after the weekly partition for $1
-- (see items_partition_name) exists. Delegates to the text overload.
CREATE OR REPLACE FUNCTION items_partition_exists(timestamptz) RETURNS boolean AS $$
    SELECT items_partition_exists(items_partition_name($1));
$$ LANGUAGE SQL;
-- Create partition `partition` of items covering [partition_start, partition_end),
-- plus a unique index on id within that partition (named <partition>_id_pk).
-- Errors are not re-raised: duplicate_table is reported as a NOTICE, and any
-- other error is reported at INFO level with its message, SQLSTATE and context.
CREATE OR REPLACE FUNCTION items_partition_create_worker(partition text, partition_start timestamptz, partition_end timestamptz) RETURNS VOID AS $$
DECLARE
    ddl text;
    ctx text;
BEGIN
    -- Built inside the protected block so formatting errors are handled too.
    ddl := format(
$f$
CREATE TABLE IF NOT EXISTS %1$I PARTITION OF items
FOR VALUES FROM (%2$L) TO (%3$L);
CREATE UNIQUE INDEX IF NOT EXISTS %4$I ON %1$I (id);
$f$,
        partition,
        partition_start,
        partition_end,
        concat(partition, '_id_pk')
    );
    EXECUTE ddl;
EXCEPTION
    WHEN duplicate_table THEN
        RAISE NOTICE 'Partition % already exists.', partition;
    WHEN others THEN
        GET STACKED DIAGNOSTICS ctx = PG_EXCEPTION_CONTEXT;
        RAISE INFO 'Error Name:%', SQLERRM;
        RAISE INFO 'Error State:%', SQLSTATE;
        RAISE INFO 'Error Context:%', ctx;
END;
$$ LANGUAGE PLPGSQL SET SEARCH_PATH to pgstac, public;
-- Ensure the weekly partition that holds ts exists and return its name.
-- The partition spans the week starting at date_trunc('week', ts).
CREATE OR REPLACE FUNCTION items_partition_create(ts timestamptz) RETURNS text AS $$
DECLARE
    part_name  text := items_partition_name(ts);
    week_start timestamptz;
BEGIN
    IF NOT items_partition_exists(part_name) THEN
        week_start := date_trunc('week', ts);
        PERFORM items_partition_create_worker(
            part_name,
            week_start,
            week_start + '1 week'::interval
        );
        RAISE NOTICE 'partition: %', part_name;
    END IF;
    RETURN part_name;
END;
$$ LANGUAGE PLPGSQL SET SEARCH_PATH TO pgstac, public;
-- Ensure a weekly partition exists for every week from the week of st through
-- the week of et, inclusive. Returns one partition name per week.
CREATE OR REPLACE FUNCTION items_partition_create(st timestamptz, et timestamptz) RETURNS SETOF text AS $$
    SELECT items_partition_create(week_start)
    FROM generate_series(
        date_trunc('week', st),
        date_trunc('week', et),
        '1 week'::interval
    ) AS week_start;
$$ LANGUAGE SQL;
-- Landing table for create_item(s). Rows are copied into items and then
-- deleted by items_staging_insert_triggerfunc.
CREATE UNLOGGED TABLE items_staging (
    content jsonb NOT NULL
);
-- Statement-level AFTER INSERT trigger on items_staging (transition table
-- newdata). Steps:
--   1. Ensure the weekly partitions touched by the batch exist.
--   2. Mark the cached search stats for those partitions stale.
--   3. Copy the rows into items; duplicate (datetime, id) raises.
--   4. Empty items_staging.
--   5. Re-apply partition constraints.
CREATE OR REPLACE FUNCTION items_staging_insert_triggerfunc() RETURNS TRIGGER AS $$
DECLARE
    part    record;
    touched text[];
BEGIN
    -- One row per weekly partition hit by this batch, with its week start.
    CREATE TEMP TABLE new_partitions ON COMMIT DROP AS
        SELECT
            items_partition_name(stac_datetime(content)) AS partition,
            date_trunc('week', min(stac_datetime(content))) AS partition_start
        FROM newdata
        GROUP BY items_partition_name(stac_datetime(content));

    -- Null statslastupdated for cached searches overlapping these partitions
    -- so their stats get regenerated; rows locked by others are skipped.
    SELECT array_agg(partition) INTO touched FROM new_partitions;
    UPDATE search_wheres
    SET statslastupdated = NULL
    WHERE _where IN (
        SELECT _where
        FROM search_wheres AS sw
        WHERE sw.partitions && touched
        FOR UPDATE SKIP LOCKED
    );

    FOR part IN
        SELECT
            np.partition,
            np.partition_start,
            np.partition_start + '1 week'::interval AS partition_end
        FROM new_partitions AS np
    LOOP
        RAISE NOTICE 'Getting partition % ready.', part.partition;
        IF NOT items_partition_exists(part.partition) THEN
            RAISE NOTICE 'Creating partition %.', part.partition;
            PERFORM items_partition_create_worker(part.partition, part.partition_start, part.partition_end);
        END IF;
        PERFORM drop_partition_constraints(part.partition);
    END LOOP;

    INSERT INTO items (id, geometry, collection_id, datetime, end_datetime, properties, content)
    SELECT
        content->>'id',
        stac_geom(content),
        content->>'collection',
        stac_datetime(content),
        stac_end_datetime(content),
        properties_idx(content),
        content
    FROM newdata;

    DELETE FROM items_staging;

    FOR part IN SELECT np.partition FROM new_partitions AS np LOOP
        RAISE NOTICE 'Setting constraints for partition %.', part.partition;
        PERFORM partition_checks(part.partition);
    END LOOP;

    DROP TABLE IF EXISTS new_partitions;
    RETURN NULL;
END;
$$ LANGUAGE PLPGSQL;
CREATE TRIGGER items_staging_insert_trigger
    AFTER INSERT ON items_staging
    REFERENCING NEW TABLE AS newdata
    FOR EACH STATEMENT
    EXECUTE PROCEDURE items_staging_insert_triggerfunc();

-- Landing table for loads that should skip items whose (datetime, id) already
-- exists; processed by items_staging_ignore_insert_triggerfunc.
CREATE UNLOGGED TABLE items_staging_ignore (
    content jsonb NOT NULL
);
-- Statement-level AFTER INSERT trigger on items_staging_ignore (transition
-- table newdata). Same pipeline as items_staging_insert_triggerfunc, except
-- that rows whose (datetime, id) already exist in items are skipped
-- (ON CONFLICT ... DO NOTHING). Afterwards the processed rows are removed from
-- items_staging_ignore, and partition constraints are re-applied.
CREATE OR REPLACE FUNCTION items_staging_ignore_insert_triggerfunc() RETURNS TRIGGER AS $$
DECLARE
    p record;
    _partitions text[];
BEGIN
    -- One row per weekly partition hit by this batch, with its week start.
    CREATE TEMP TABLE new_partitions ON COMMIT DROP AS
        SELECT
            items_partition_name(stac_datetime(content)) AS partition,
            date_trunc('week', min(stac_datetime(content))) AS partition_start
        FROM newdata
        GROUP BY 1;

    -- set statslastupdated in cache to be old enough cache always regenerated
    SELECT array_agg(partition) INTO _partitions FROM new_partitions;
    UPDATE search_wheres
    SET statslastupdated = NULL
    WHERE _where IN (
        SELECT _where FROM search_wheres sw WHERE sw.partitions && _partitions
        FOR UPDATE SKIP LOCKED
    );

    FOR p IN SELECT new_partitions.partition, new_partitions.partition_start, new_partitions.partition_start + '1 week'::interval AS partition_end FROM new_partitions
    LOOP
        RAISE NOTICE 'Getting partition % ready.', p.partition;
        IF NOT items_partition_exists(p.partition) THEN
            RAISE NOTICE 'Creating partition %.', p.partition;
            PERFORM items_partition_create_worker(p.partition, p.partition_start, p.partition_end);
        END IF;
        PERFORM drop_partition_constraints(p.partition);
    END LOOP;

    INSERT INTO items (id, geometry, collection_id, datetime, end_datetime, properties, content)
    SELECT
        content->>'id',
        stac_geom(content),
        content->>'collection',
        stac_datetime(content),
        stac_end_datetime(content),
        properties_idx(content),
        content
    FROM newdata
    ON CONFLICT (datetime, id) DO NOTHING;

    -- Empty this trigger's own staging table. It previously deleted from
    -- items_staging, so items_staging_ignore was never cleared and grew with
    -- every load.
    DELETE FROM items_staging_ignore;

    FOR p IN SELECT new_partitions.partition FROM new_partitions
    LOOP
        RAISE NOTICE 'Setting constraints for partition %.', p.partition;
        PERFORM partition_checks(p.partition);
    END LOOP;

    DROP TABLE IF EXISTS new_partitions;
    RETURN NULL;
END;
$$ LANGUAGE PLPGSQL;
CREATE TRIGGER items_staging_ignore_insert_trigger
    AFTER INSERT ON items_staging_ignore
    REFERENCING NEW TABLE AS newdata
    FOR EACH STATEMENT
    EXECUTE PROCEDURE items_staging_ignore_insert_triggerfunc();

-- Landing table for upsert_item(s); processed by
-- items_staging_upsert_insert_triggerfunc.
CREATE UNLOGGED TABLE items_staging_upsert (
    content jsonb NOT NULL
);
-- Statement-level AFTER INSERT trigger on items_staging_upsert (transition
-- table newdata). Same pipeline as items_staging_insert_triggerfunc, except
-- that an existing row with the same (datetime, id) gets its content replaced
-- when the content differs. items_update_trigger then re-derives the other
-- columns. Afterwards the processed rows are removed from
-- items_staging_upsert, and partition constraints are re-applied.
CREATE OR REPLACE FUNCTION items_staging_upsert_insert_triggerfunc() RETURNS TRIGGER AS $$
DECLARE
    p record;
    _partitions text[];
BEGIN
    -- One row per weekly partition hit by this batch, with its week start.
    CREATE TEMP TABLE new_partitions ON COMMIT DROP AS
        SELECT
            items_partition_name(stac_datetime(content)) AS partition,
            date_trunc('week', min(stac_datetime(content))) AS partition_start
        FROM newdata
        GROUP BY 1;

    -- set statslastupdated in cache to be old enough cache always regenerated
    SELECT array_agg(partition) INTO _partitions FROM new_partitions;
    UPDATE search_wheres
    SET statslastupdated = NULL
    WHERE _where IN (
        SELECT _where FROM search_wheres sw WHERE sw.partitions && _partitions
        FOR UPDATE SKIP LOCKED
    );

    FOR p IN SELECT new_partitions.partition, new_partitions.partition_start, new_partitions.partition_start + '1 week'::interval AS partition_end FROM new_partitions
    LOOP
        RAISE NOTICE 'Getting partition % ready.', p.partition;
        IF NOT items_partition_exists(p.partition) THEN
            RAISE NOTICE 'Creating partition %.', p.partition;
            PERFORM items_partition_create_worker(p.partition, p.partition_start, p.partition_end);
        END IF;
        PERFORM drop_partition_constraints(p.partition);
    END LOOP;

    INSERT INTO items (id, geometry, collection_id, datetime, end_datetime, properties, content)
    SELECT
        content->>'id',
        stac_geom(content),
        content->>'collection',
        stac_datetime(content),
        stac_end_datetime(content),
        properties_idx(content),
        content
    FROM newdata
    ON CONFLICT (datetime, id) DO UPDATE SET
        content = EXCLUDED.content
    WHERE items.content IS DISTINCT FROM EXCLUDED.content;

    -- Empty this trigger's own staging table. It previously deleted from
    -- items_staging, so items_staging_upsert was never cleared and grew with
    -- every load.
    DELETE FROM items_staging_upsert;

    FOR p IN SELECT new_partitions.partition FROM new_partitions
    LOOP
        RAISE NOTICE 'Setting constraints for partition %.', p.partition;
        PERFORM partition_checks(p.partition);
    END LOOP;

    DROP TABLE IF EXISTS new_partitions;
    RETURN NULL;
END;
$$ LANGUAGE PLPGSQL;
CREATE TRIGGER items_staging_upsert_insert_trigger
    AFTER INSERT ON items_staging_upsert
    REFERENCING NEW TABLE AS newdata
    FOR EACH STATEMENT
    EXECUTE PROCEDURE items_staging_upsert_insert_triggerfunc();
-- Row-level BEFORE UPDATE trigger on items. It re-derives every indexed column
-- from NEW.content. If the resulting row is identical to the stored row, it
-- returns NULL, which skips the update.
CREATE OR REPLACE FUNCTION items_update_triggerfunc() RETURNS TRIGGER AS $$
BEGIN
    NEW.id            := NEW.content->>'id';
    NEW.datetime      := stac_datetime(NEW.content);
    NEW.end_datetime  := stac_end_datetime(NEW.content);
    NEW.collection_id := NEW.content->>'collection';
    NEW.geometry      := stac_geom(NEW.content);
    NEW.properties    := properties_idx(NEW.content);

    IF TG_OP = 'UPDATE' AND NEW IS NOT DISTINCT FROM OLD THEN
        RETURN NULL;  -- nothing changed: skip this row
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE PLPGSQL;
CREATE TRIGGER items_update_trigger
    BEFORE UPDATE ON items
    FOR EACH ROW
    EXECUTE PROCEDURE items_update_triggerfunc();
/*
all_items_partitions: one row per partition attached to items. Columns:
  - partition: the partition's name
  - tstzrange: its [start, end) bounds, parsed out of the FOR VALUES clause
  - pstart / pend: those bounds as separate columns
  - est_cnt: the planner's row estimate (pg_class.reltuples)
Partitions whose bound text does not match the pattern (e.g. a DEFAULT
partition) produce no row, because regexp_matches returns no rows for them.
CASCADE also drops dependent views such as items_partitions.
*/
DROP VIEW IF EXISTS all_items_partitions CASCADE;
CREATE VIEW all_items_partitions AS
WITH base AS (
    SELECT
        c.oid::pg_catalog.regclass::text AS partition,
        pg_catalog.pg_get_expr(c.relpartbound, c.oid) AS _constraint,
        regexp_matches(
            pg_catalog.pg_get_expr(c.relpartbound, c.oid),
            E'\\(''\([0-9 :+-]*\)''\\).*\\(''\([0-9 :+-]*\)''\\)'
        ) AS t,
        c.reltuples::bigint AS est_cnt
    FROM pg_catalog.pg_class AS c
    INNER JOIN pg_catalog.pg_inherits AS i
        ON i.inhrelid = c.oid
    WHERE i.inhparent = 'items'::regclass
)
SELECT
    partition,
    tstzrange(t[1]::timestamptz, t[2]::timestamptz),
    t[1]::timestamptz AS pstart,
    t[2]::timestamptz AS pend,
    est_cnt
FROM base
ORDER BY tstzrange(t[1]::timestamptz, t[2]::timestamptz) DESC;
-- Subset of all_items_partitions whose planner row estimate is non-zero.
CREATE OR REPLACE VIEW items_partitions AS
SELECT
    partition,
    tstzrange,
    pstart,
    pend,
    est_cnt
FROM all_items_partitions
WHERE est_cnt > 0;
-- Return the stored STAC document of the item with id _id, or NULL if none.
-- NOTE(review): id is only unique per partition and per (datetime, id). If
-- several rows share _id, the first row the query yields is returned.
CREATE OR REPLACE FUNCTION get_item(_id text) RETURNS jsonb AS $$
    SELECT i.content
    FROM items AS i
    WHERE i.id = _id;
$$ LANGUAGE SQL SET SEARCH_PATH TO pgstac,public;
-- Delete the item with id _id. INTO STRICT raises an error (NO_DATA_FOUND or
-- TOO_MANY_ROWS) unless exactly one row was deleted.
CREATE OR REPLACE FUNCTION delete_item(_id text) RETURNS VOID AS $$
DECLARE
    removed_row items%ROWTYPE;
BEGIN
    DELETE FROM items AS i
    WHERE i.id = _id
    RETURNING * INTO STRICT removed_row;
END;
$$ LANGUAGE PLPGSQL SET SEARCH_PATH TO pgstac,public;
-- Queue one STAC item. The items_staging trigger moves it into items and
-- raises on a duplicate (datetime, id).
CREATE OR REPLACE FUNCTION create_item(data jsonb) RETURNS VOID AS $$
    INSERT INTO items_staging (content)
    SELECT data;
$$ LANGUAGE SQL SET SEARCH_PATH TO pgstac,public;
-- Replace the content of the item whose id is data->>'id'. items_update_trigger
-- re-derives the other columns from the new content. INTO STRICT raises unless
-- exactly one row comes back.
-- NOTE(review): items_update_triggerfunc returns NULL (skips the row) when the
-- new content yields an identical row. Re-sending unchanged content therefore
-- also raises NO_DATA_FOUND. Confirm that callers expect this.
CREATE OR REPLACE FUNCTION update_item(data jsonb) RETURNS VOID AS $$
DECLARE
    updated_row items%ROWTYPE;
BEGIN
    UPDATE items AS i
    SET content = data
    WHERE i.id = data->>'id'
    RETURNING * INTO STRICT updated_row;
END;
$$ LANGUAGE PLPGSQL SET SEARCH_PATH TO pgstac,public;
-- Queue one STAC item for insert-or-replace on (datetime, id). The work is
-- done by the items_staging_upsert trigger.
CREATE OR REPLACE FUNCTION upsert_item(data jsonb) RETURNS VOID AS $$
    INSERT INTO items_staging_upsert (content)
    SELECT data;
$$ LANGUAGE SQL SET SEARCH_PATH TO pgstac,public;
-- Queue every element of the jsonb array `data` as an item. This is a single
-- INSERT, so the items_staging statement trigger handles the batch at once.
CREATE OR REPLACE FUNCTION create_items(data jsonb) RETURNS VOID AS $$
    INSERT INTO items_staging (content)
    SELECT e.item
    FROM jsonb_array_elements(data) AS e(item);
$$ LANGUAGE SQL SET SEARCH_PATH TO pgstac,public;
-- Queue every element of the jsonb array `data` for insert-or-replace, as one
-- batch handled by the items_staging_upsert statement trigger.
CREATE OR REPLACE FUNCTION upsert_items(data jsonb) RETURNS VOID AS $$
    INSERT INTO items_staging_upsert (content)
    SELECT e.item
    FROM jsonb_array_elements(data) AS e(item);
$$ LANGUAGE SQL SET SEARCH_PATH TO pgstac,public;
-- Spatial extent of the items in collection `id`, as a STAC bbox array
-- [[xmin,ymin,xmax,ymax]]. It is built by rewriting the text form of the
-- st_extent() box ("BOX(xmin ymin,xmax ymax)"). The result is NULL when the
-- aggregate yields NULL (no items).
-- Declared STABLE, not IMMUTABLE: the result depends on the contents of items,
-- so the planner must not pre-evaluate it or treat it as constant across data
-- changes.
CREATE OR REPLACE FUNCTION collection_bbox(id text) RETURNS jsonb AS $$
    SELECT (replace(replace(replace(st_extent(geometry)::text,'BOX(','[['),')',']]'),' ',','))::jsonb
    FROM items
    WHERE collection_id = $1;
$$ LANGUAGE SQL STABLE PARALLEL SAFE SET SEARCH_PATH TO pgstac, public;
-- Temporal extent of the items in collection `id`, as a STAC interval array
-- [[min(datetime), max(datetime)]]. The timestamps are rendered with the
-- timestamptz text cast. With no items, min/max are NULL and the result is
-- [[null, null]].
-- Declared STABLE, not IMMUTABLE: the result depends on the contents of items.
CREATE OR REPLACE FUNCTION collection_temporal_extent(id text) RETURNS jsonb AS $$
    SELECT to_jsonb(array[array[min(datetime)::text, max(datetime)::text]])
    FROM items
    WHERE collection_id = $1;
$$ LANGUAGE SQL STABLE PARALLEL SAFE SET SEARCH_PATH TO pgstac, public;
-- Recompute extent.spatial.bbox and extent.temporal.interval of every
-- collection from its items. The top-level "extent" key of collections.content
-- is replaced via jsonb ||. The UPDATE is deliberately unfiltered: all
-- collections are refreshed.
CREATE OR REPLACE FUNCTION update_collection_extents() RETURNS VOID AS $$
    UPDATE collections AS c
    SET content = c.content || jsonb_build_object(
        'extent', jsonb_build_object(
            'spatial',  jsonb_build_object('bbox', collection_bbox(c.id)),
            'temporal', jsonb_build_object('interval', collection_temporal_extent(c.id))
        )
    );
$$ LANGUAGE SQL SET SEARCH_PATH TO pgstac, public;