-
Notifications
You must be signed in to change notification settings - Fork 114
/
functions.sql
453 lines (414 loc) · 13.9 KB
/
functions.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
DROP FUNCTION IF EXISTS json_arr_to_text_arr;
DROP FUNCTION IF EXISTS json_arr_to_json_element_array;
DROP FUNCTION IF EXISTS create_key;
DROP FUNCTION IF EXISTS create_upload;
DROP FUNCTION IF EXISTS upsert_pin;
DROP FUNCTION IF EXISTS upsert_pins;
DROP FUNCTION IF EXISTS user_used_storage;
DROP FUNCTION IF EXISTS user_auth_keys_list;
DROP FUNCTION IF EXISTS find_deals_by_content_cids;
DROP FUNCTION IF EXISTS upsert_user;
-- transform a JSON array property into an array of SQL text elements
CREATE OR REPLACE FUNCTION json_arr_to_text_arr(_json json)
RETURNS text[] LANGUAGE sql IMMUTABLE PARALLEL SAFE AS
'SELECT ARRAY(SELECT json_array_elements_text(_json))';
-- transform a JSON array property into an array of SQL json elements
CREATE OR REPLACE FUNCTION json_arr_to_json_element_array(_json json)
RETURNS json[] LANGUAGE sql IMMUTABLE PARALLEL SAFE AS
'SELECT ARRAY(SELECT * FROM json_array_elements(_json))';
CREATE OR REPLACE FUNCTION create_key(data json) RETURNS TEXT
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
inserted_key_id BIGINT;
BEGIN
insert into auth_key (name, secret, user_id, inserted_at, updated_at)
VALUES (data ->> 'name',
data ->> 'secret',
(data ->> 'user_id')::BIGINT,
(data ->> 'inserted_at')::timestamptz,
(data ->> 'updated_at')::timestamptz)
returning id into inserted_key_id;
return (inserted_key_id)::TEXT;
END
$$;
-- Creates a content table, with relative pins and pin_requests
CREATE OR REPLACE FUNCTION create_content(data json) RETURNS TEXT
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
pin json;
pin_result_id BIGINT;
inserted_cid TEXT;
BEGIN
-- Set timeout as imposed by heroku
SET LOCAL statement_timeout = '30s';
-- Add to content table if new
insert into content (cid, dag_size, updated_at, inserted_at)
values (data ->> 'content_cid',
(data ->> 'dag_size')::BIGINT,
(data ->> 'updated_at')::timestamptz,
(data ->> 'inserted_at')::timestamptz)
ON CONFLICT ( cid ) DO NOTHING
returning cid into inserted_cid;
-- Iterate over received pins
foreach pin in array json_arr_to_json_element_array(data -> 'pins')
loop
INSERT INTO pin_location (peer_id, peer_name, ipfs_peer_id, region)
SELECT * FROM (
SELECT pin -> 'location' ->> 'peer_id' AS peer_id,
pin -> 'location' ->> 'peer_name' AS peer_name,
pin -> 'location' ->> 'ipfs_peer_id' AS ipfs_peer_id,
pin -> 'location' ->> 'region' AS region
) AS tmp
WHERE NOT EXISTS (
SELECT 42 FROM pin_location WHERE peer_id = pin -> 'location' ->> 'peer_id'
);
INSERT INTO pin (content_cid, status, pin_location_id, updated_at, inserted_at)
SELECT data ->> 'content_cid' AS content_cid,
(pin ->> 'status')::pin_status_type AS status,
id AS pin_location_id,
(data ->> 'updated_at')::timestamptz AS updated_at,
(data ->> 'inserted_at')::timestamptz AS inserted_at
FROM pin_location
WHERE peer_id = pin -> 'location' ->> 'peer_id'
-- Force update on conflict to get result, otherwise needs a follow up select
ON CONFLICT ( content_cid, pin_location_id ) DO UPDATE
SET "updated_at" = (data ->> 'updated_at')::timestamptz
returning id into pin_result_id;
end loop;
return (inserted_cid);
END
$$;
-- Creates an upload with relative content, pins and backups.
CREATE OR REPLACE FUNCTION create_upload(data json) RETURNS TEXT
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
inserted_upload_id BIGINT;
BEGIN
-- Set timeout as imposed by heroku
SET LOCAL statement_timeout = '30s';
PERFORM create_content(data);
insert into upload as upld (user_id,
auth_key_id,
content_cid,
source_cid,
type,
name,
inserted_at,
updated_at,
backup_urls)
values ((data ->> 'user_id')::BIGINT,
(data ->> 'auth_key_id')::BIGINT,
data ->> 'content_cid',
data ->> 'source_cid',
(data ->> 'type')::upload_type,
data ->> 'name',
(data ->> 'inserted_at')::timestamptz,
(data ->> 'updated_at')::timestamptz,
json_arr_to_text_arr(data -> 'backup_urls'))
ON CONFLICT ( user_id, source_cid ) DO UPDATE
SET "updated_at" = (data ->> 'updated_at')::timestamptz,
"name" = data ->> 'name',
"deleted_at" = null,
"backup_urls" = upld.backup_urls || json_arr_to_text_arr(data -> 'backup_urls')
returning id into inserted_upload_id;
return (inserted_upload_id)::TEXT;
END
$$;
-- Creates a pin request with relative content, pins, pin_requests and backups.
CREATE OR REPLACE FUNCTION create_psa_pin_request(data json) RETURNS TEXT
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
-- TODO - Validate UUID type is available
inserted_pin_request_id TEXT;
BEGIN
-- Set timeout as imposed by heroku
SET LOCAL statement_timeout = '30s';
PERFORM create_content(data);
insert into psa_pin_request (
auth_key_id,
content_cid,
source_cid,
name,
origins,
meta,
inserted_at,
updated_at
)
values (
(data ->> 'auth_key_id')::BIGINT,
data ->> 'content_cid',
data ->> 'source_cid',
data ->> 'name',
(data ->> 'origins')::jsonb,
(data ->> 'meta')::jsonb,
(data ->> 'inserted_at')::timestamptz,
(data ->> 'updated_at')::timestamptz
)
returning id into inserted_pin_request_id;
-- TODO - Validate and use UUID type
return (inserted_pin_request_id)::TEXT;
END
$$;
CREATE OR REPLACE FUNCTION upsert_pin(data json) RETURNS TEXT
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
pin_result_id BIGINT;
BEGIN
-- DATA => content_cid, pin(status, location(peer_id, peer_name, region))
-- Add to pin_location table if new
INSERT INTO pin_location (peer_id, peer_name, ipfs_peer_id, region)
SELECT * FROM (
SELECT data -> 'pin' -> 'location' ->> 'peer_id' AS peer_id,
data -> 'pin' -> 'location' ->> 'peer_name' AS peer_name,
data -> 'pin' -> 'location' ->> 'ipfs_peer_id' AS ipfs_peer_id,
data -> 'pin' -> 'location' ->> 'region' AS region
) AS tmp
WHERE NOT EXISTS (
SELECT 42 FROM pin_location WHERE peer_id = data -> 'pin' -> 'location' ->> 'peer_id'
);
-- Add to pin table if new
insert into pin (content_cid, status, pin_location_id, updated_at)
SELECT data ->> 'content_cid' AS content_cid,
(data -> 'pin' ->> 'status')::pin_status_type AS status,
id AS pin_location_id,
(NOW())::timestamptz AS updated_at
FROM pin_location
WHERE peer_id = data -> 'pin' -> 'location' ->> 'peer_id'
ON CONFLICT ( content_cid, pin_location_id ) DO UPDATE
SET "status" = (data -> 'pin' ->> 'status')::pin_status_type,
"updated_at" = NOW()
returning id into pin_result_id;
return (pin_result_id)::TEXT;
END
$$;
CREATE TYPE stored_bytes AS (uploaded TEXT, psa_pinned TEXT, total TEXT);
CREATE OR REPLACE FUNCTION user_used_storage(query_user_id BIGINT)
RETURNS stored_bytes
LANGUAGE plpgsql
AS
$$
DECLARE
used_storage stored_bytes;
uploaded BIGINT;
psa_pinned BIGINT;
total BIGINT;
BEGIN
uploaded :=
(
SELECT COALESCE((
SELECT SUM(dag_size)
FROM (
SELECT c.cid,
c.dag_size
FROM upload u
JOIN content c ON c.cid = u.content_cid
JOIN pin p ON p.content_cid = u.content_cid
WHERE u.user_id = query_user_id::BIGINT
AND u.deleted_at is null
AND p.status = 'Pinned'
GROUP BY c.cid,
c.dag_size
) AS uploaded_content), 0)
);
psa_pinned :=
(
SELECT COALESCE((
SELECT SUM(dag_size)
FROM (
SELECT psa_pr.content_cid,
c.dag_size
FROM psa_pin_request psa_pr
JOIN content c ON c.cid = psa_pr.content_cid
JOIN pin p ON p.content_cid = psa_pr.content_cid
JOIN auth_key a ON a.id = psa_pr.auth_key_id
WHERE a.user_id = query_user_id::BIGINT
AND psa_pr.deleted_at is null
AND p.status = 'Pinned'
GROUP BY psa_pr.content_cid,
c.dag_size
) AS pinned_content), 0)
);
total := uploaded + psa_pinned;
SELECT uploaded::TEXT,
psa_pinned::TEXT,
total::TEXT
INTO used_storage;
return used_storage;
END
$$;
CREATE OR REPLACE FUNCTION users_by_storage_used(
from_percent INTEGER,
to_percent INTEGER DEFAULT NULL,
user_id_gt BIGINT DEFAULT 0,
user_id_lte BIGINT DEFAULT NULL
)
RETURNS TABLE
(
id TEXT,
name TEXT,
email TEXT,
storage_quota TEXT,
storage_used TEXT
)
LANGUAGE plpgsql
AS
$$
DECLARE
-- Default storage quota 1TiB = 1099511627776 bytes
default_quota BIGINT := 1099511627776;
BEGIN
RETURN QUERY
WITH user_account AS (
SELECT u.id::TEXT AS id,
u.name AS name,
u.email AS email,
COALESCE(ut.value::BIGINT, default_quota)::TEXT AS storage_quota,
(user_used_storage(u.id)).total::TEXT AS storage_used
FROM public.user u
LEFT JOIN user_tag ut ON u.id = ut.user_id
WHERE (ut.tag IS NULL OR ut.tag = 'StorageLimitBytes')
AND ut.deleted_at IS NULL
AND NOT EXISTS (
SELECT 42
FROM user_tag r
WHERE u.id = r.user_id
AND r.tag = 'HasAccountRestriction'
AND r.value ILIKE 'true'
AND r.deleted_at IS NULL
)
AND u.id > user_id_gt
AND u.id <= user_id_lte
)
SELECT *
FROM user_account
WHERE user_account.storage_used::BIGINT >= (from_percent/100::NUMERIC) * user_account.storage_quota::BIGINT
AND (to_percent IS NULL OR user_account.storage_used::BIGINT < (to_percent/100::NUMERIC) * user_account.storage_quota::BIGINT)
ORDER BY user_account.storage_used::BIGINT DESC;
END
$$;
CREATE OR REPLACE FUNCTION upsert_pins(data json) RETURNS TEXT[]
LANGUAGE plpgsql
volatile
PARALLEL UNSAFE
AS
$$
DECLARE
pin json;
pin_ids TEXT[];
BEGIN
FOREACH pin IN array json_arr_to_json_element_array(data -> 'pins')
LOOP
SELECT pin_ids || upsert_pin(pin -> 'data') INTO pin_ids;
END LOOP;
RETURN pin_ids;
END
$$;
CREATE OR REPLACE FUNCTION user_auth_keys_list(query_user_id BIGINT, include_deleted BOOLEAN default false)
RETURNS TABLE
(
"id" text,
"name" text,
"secret" text,
"created" timestamptz,
"has_uploads" boolean
)
LANGUAGE sql
AS
$$
SELECT (ak.id)::TEXT AS id,
ak.name AS name,
ak.secret AS secret,
ak.inserted_at AS created,
EXISTS(SELECT 42 FROM upload u WHERE u.auth_key_id = ak.id) AS has_uploads
FROM auth_key ak
WHERE ak.user_id = query_user_id AND
(include_deleted OR ak.deleted_at IS NULL)
$$;
CREATE OR REPLACE FUNCTION find_deals_by_content_cids(cids text[])
RETURNS TABLE
(
status text,
"created" timestamptz,
"updated" timestamptz,
"dealId" bigint,
"dataModelSelector" text,
"statusText" text,
"dealActivation" timestamptz,
"dealExpiration" timestamptz,
"storageProvider" text,
"pieceCid" text,
"batchRootCid" text,
"dataCid" text
)
LANGUAGE sql
STABLE
PARALLEL SAFE
AS
$$
SELECT COALESCE(de.status, 'queued') as status,
de.entry_created as created,
de.entry_last_updated as updated,
de.deal_id as dealId,
ae.datamodel_selector as dataModelSelector,
de.status_meta as statusText,
de.start_time as dealActivation,
de.end_time as dealExpiration,
de.provider as storageProvider,
a.piece_cid as pieceCid,
ae.aggregate_cid as batchRootCid,
ae.cid_v1 as dataCid
FROM cargo.aggregate_entries ae
join cargo.aggregates a using (aggregate_cid)
LEFT JOIN cargo.deals de USING (aggregate_cid)
WHERE ae.cid_v1 = ANY (cids)
ORDER BY de.entry_last_updated
$$;
-- a custom UPSERT operation for user account, so that we can distinguish between
-- newly inserted users and updated ones.
CREATE OR REPLACE FUNCTION upsert_user(_name TEXT, _picture TEXT, _email TEXT, _issuer TEXT, _github TEXT, _public_address TEXT)
RETURNS TABLE (
"id" TEXT,
"issuer" TEXT,
"inserted" BOOLEAN
)
LANGUAGE plpgsql
AS
$$
#variable_conflict use_column
DECLARE
inserted BOOLEAN;
BEGIN
SELECT (COUNT(id) = 0) into inserted FROM public.user WHERE issuer = _issuer;
RETURN QUERY
INSERT INTO public.user AS u (name, picture, email, issuer, github, public_address)
VALUES (_name, _picture, _email, _issuer, _github, _public_address)
ON CONFLICT (issuer) DO UPDATE
SET
name = EXCLUDED.name,
picture = EXCLUDED.picture,
email = EXCLUDED.email,
github = EXCLUDED.github,
public_address = EXCLUDED.public_address
RETURNING u.id::TEXT, u.issuer, inserted;
END
$$;