Skip to content

Commit acca57f

Browse files
committed
feat: search objects v2
1 parent ddc5163 commit acca57f

35 files changed

+1166
-244
lines changed

.docker/docker-compose-infra.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ services:
55

66
tenant_db:
77
image: postgres:15
8+
shm_size: '1gb'
89
ports:
910
- '5432:5432'
1011
healthcheck:
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
ALTER TABLE storage.objects ADD COLUMN user_metadata jsonb NULL;
2-
ALTER TABLE storage.s3_multipart_uploads ADD COLUMN user_metadata jsonb NULL;
1+
-- Idempotent: IF NOT EXISTS lets this migration be re-applied safely.
ALTER TABLE storage.objects
    ADD COLUMN IF NOT EXISTS user_metadata jsonb;

ALTER TABLE storage.s3_multipart_uploads
    ADD COLUMN IF NOT EXISTS user_metadata jsonb;
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
-- Add level column to objects
2+
-- Nullable on purpose: rows that already exist keep level = NULL.
ALTER TABLE storage.objects
    ADD COLUMN IF NOT EXISTS level integer;
3+
4+
--- Index Functions
5+
-- Returns the number of '/'-separated segments in "name"
-- (for example 'a/b/c' -> 3). STRICT: a NULL input returns NULL.
CREATE OR REPLACE FUNCTION "storage"."get_level"("name" text)
    RETURNS integer
    LANGUAGE sql
    IMMUTABLE
    STRICT
AS $func$
    SELECT array_length(string_to_array("name", '/'), 1);
$func$;
10+
11+
12+
-- Function to check if object with prefix exists
13+
-- Returns TRUE when at least one row of storage.objects in bucket
-- p_bucket_id has a name that starts with p_name.
--
-- p_name is matched literally: the LIKE metacharacters '\', '%' and '_'
-- are escaped first, so a key such as 'my_dir' no longer matches 'myXdir'.
-- Runs as SECURITY INVOKER, so the lookup is performed with the caller's
-- own privileges.
--
-- NOTE(review): the match has no '/' boundary, so 'foo' also matches
-- 'foobar/x' - confirm this is intended for the prefixes policy.
CREATE OR REPLACE FUNCTION storage.object_exists_with_prefix(
    p_bucket_id TEXT,
    p_name TEXT
)
RETURNS BOOLEAN
LANGUAGE plpgsql
STABLE
SECURITY INVOKER
AS $$
DECLARE
    -- Backslash must be escaped first, otherwise the escapes added for
    -- '%' and '_' would themselves be doubled.
    literal_prefix TEXT := replace(replace(replace(p_name, '\', '\\'), '%', '\%'), '_', '\_');
BEGIN
    RETURN EXISTS (
        SELECT 1
        FROM storage.objects o
        WHERE o.bucket_id = p_bucket_id
          AND o.name LIKE literal_prefix || '%'
    );
END;
$$;
31+
32+
-- Table
33+
-- One row per (bucket, prefix). "level" is derived from "name" through
-- storage.get_level and stored, so it can take part in the primary key.
CREATE TABLE IF NOT EXISTS storage.prefixes (
    bucket_id    text,
    "name"       text COLLATE "C" NOT NULL,
    "level"      integer GENERATED ALWAYS AS (storage.get_level("name")) STORED,
    created_at   timestamptz DEFAULT now(),
    updated_at   timestamptz DEFAULT now(),
    CONSTRAINT "prefixes_bucketId_fkey"
        FOREIGN KEY (bucket_id) REFERENCES storage.buckets (id),
    PRIMARY KEY (bucket_id, "level", "name")
);
42+
43+
ALTER TABLE "storage"."prefixes" ENABLE ROW LEVEL SECURITY;

-- Drop-then-create keeps the policy definition idempotent.
DROP POLICY IF EXISTS "prefixes_allow_select_for_owned_objects" ON storage.prefixes;

-- A prefix row is selectable when storage.object_exists_with_prefix
-- returns true for its (bucket_id, name).
CREATE POLICY "prefixes_allow_select_for_owned_objects"
    ON storage.prefixes
    FOR SELECT
    USING (storage.object_exists_with_prefix(bucket_id, "name"));
51+
52+
-- Functions
53+
-- Returns the parent prefix of "name": the input with its last path
-- segment (and an optional trailing '/') removed. A name without any '/'
-- yields ''. STRICT: a NULL input returns NULL.
CREATE OR REPLACE FUNCTION "storage"."get_prefix"("name" text)
    RETURNS text
    LANGUAGE sql
    IMMUTABLE
    STRICT
AS $func$
    SELECT CASE
        WHEN position('/' IN "name") = 0 THEN ''
        ELSE regexp_replace("name", '[\/]{1}[^\/]+\/?$', '')
    END;
$func$;
63+
64+
-- Returns every ancestor prefix of "name", shortest first, excluding the
-- full name itself: 'a/b/c' -> {a, a/b}; a name without '/' -> {}.
-- STRICT: a NULL input returns NULL.
CREATE OR REPLACE FUNCTION "storage"."get_prefixes"("name" text)
    RETURNS text[]
AS $func$
DECLARE
    parts text[] := string_to_array("name", '/');
    prefixes text[] := '{}';
BEGIN
    -- array_length() is NULL for an empty array (name = ''), and a NULL
    -- bound makes the FOR loop raise; COALESCE turns that into zero
    -- iterations so '' simply returns {}.
    FOR i IN 1..coalesce(array_length(parts, 1), 0) - 1 LOOP
        prefixes := array_append(prefixes, array_to_string(parts[1:i], '/'));
    END LOOP;

    RETURN prefixes;
END;
$func$ LANGUAGE plpgsql IMMUTABLE STRICT;
85+
86+
-- Inserts one storage.prefixes row per ancestor prefix of "_name" (as
-- computed by storage.get_prefixes) for bucket "_bucket_id". Rows that
-- already exist are left untouched (ON CONFLICT DO NOTHING).
--
-- NOTE(review): SECURITY DEFINER without a pinned search_path; every
-- object referenced here is schema-qualified - confirm that is sufficient.
CREATE OR REPLACE FUNCTION "storage"."add_prefixes"(
    "_bucket_id" TEXT,
    "_name" TEXT
)
RETURNS void
LANGUAGE plpgsql
VOLATILE
SECURITY DEFINER
AS $func$
DECLARE
    ancestor_names text[] := "storage"."get_prefixes"("_name");
BEGIN
    -- array_length() is NULL for an empty array, so nothing is inserted
    -- when the name has no ancestors.
    IF array_length(ancestor_names, 1) > 0 THEN
        INSERT INTO "storage"."prefixes" ("name", "bucket_id")
        SELECT unnest(ancestor_names), "_bucket_id"
        ON CONFLICT DO NOTHING;
    END IF;
END;
$func$;
104+
105+
-- Deletes the storage.prefixes row ("_bucket_id", "_name") unless it
-- still has a direct child, i.e. a prefix or an object exactly one level
-- deeper whose name starts with "_name" || '/'.
--
-- Returns false when a child exists and nothing was deleted; returns true
-- otherwise (also when no matching prefix row existed).
--
-- "_name" is matched literally: '\', '%' and '_' are escaped before the
-- value is used in LIKE, so prefix 'a_b' is no longer kept alive by an
-- unrelated sibling such as 'axb/file'.
CREATE OR REPLACE FUNCTION "storage"."delete_prefix" (
    "_bucket_id" TEXT,
    "_name" TEXT
) RETURNS boolean
SECURITY DEFINER
AS $func$
DECLARE
    child_level int := "storage"."get_level"("_name") + 1;
    -- Backslash is escaped first so the escapes added for '%' and '_'
    -- are not doubled afterwards.
    child_pattern text :=
        replace(replace(replace("_name", '\', '\\'), '%', '\%'), '_', '\_') || '/%';
BEGIN
    IF EXISTS (
        SELECT 1
        FROM "storage"."prefixes" p
        WHERE p."bucket_id" = "_bucket_id"
          AND p."level" = child_level
          AND p."name" COLLATE "C" LIKE child_pattern
    )
    OR EXISTS (
        -- get_level() is evaluated on the name rather than reading
        -- objects.level, which can still be NULL for rows that have not
        -- been backfilled.
        SELECT 1
        FROM "storage"."objects" o
        WHERE o."bucket_id" = "_bucket_id"
          AND "storage"."get_level"(o."name") = child_level
          AND o."name" COLLATE "C" LIKE child_pattern
    ) THEN
        -- There are sub-objects, skip deletion
        RETURN false;
    END IF;

    DELETE FROM "storage"."prefixes" p
    WHERE p."bucket_id" = "_bucket_id"
      AND p."level" = "storage"."get_level"("_name")
      AND p."name" = "_name";

    RETURN true;
END;
$func$ LANGUAGE plpgsql VOLATILE;
138+
139+
-- Triggers
140+
-- Row trigger function: registers every ancestor prefix of the new row's
-- name via storage.add_prefixes, then passes the row through unchanged.
CREATE OR REPLACE FUNCTION "storage"."prefixes_insert_trigger"()
    RETURNS trigger
    LANGUAGE plpgsql
    VOLATILE
AS $func$
BEGIN
    PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");

    RETURN NEW;
END;
$func$;
148+
149+
-- Row trigger function for storage.objects: stamps NEW.level from the
-- object's name and registers all ancestor prefixes of that name via
-- storage.add_prefixes. Returns the modified row.
CREATE OR REPLACE FUNCTION "storage"."objects_insert_prefix_trigger"()
    RETURNS trigger
    LANGUAGE plpgsql
    VOLATILE
AS $func$
BEGIN
    NEW."level" := "storage"."get_level"(NEW."name");
    PERFORM "storage"."add_prefixes"(NEW."bucket_id", NEW."name");

    RETURN NEW;
END;
$func$;
159+
160+
-- Row trigger function for DELETE: asks storage.delete_prefix to remove
-- the parent prefix of the deleted row's name. Rows whose name has no
-- parent (get_prefix returns '') are skipped.
CREATE OR REPLACE FUNCTION "storage"."delete_prefix_hierarchy_trigger"()
    RETURNS trigger
    LANGUAGE plpgsql
    VOLATILE
AS $func$
DECLARE
    parent_prefix text := "storage"."get_prefix"(OLD."name");
BEGIN
    IF parent_prefix IS NOT NULL AND parent_prefix <> '' THEN
        PERFORM "storage"."delete_prefix"(OLD."bucket_id", parent_prefix);
    END IF;

    RETURN OLD;
END;
$func$;
175+
176+
-- "storage"."prefixes"
177+
-- After a prefix row is deleted, try to delete its parent prefix as well.
CREATE OR REPLACE TRIGGER "prefixes_delete_hierarchy"
    AFTER DELETE ON "storage"."prefixes"
    FOR EACH ROW
    EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();

-- "storage"."objects"
-- Before an object is inserted: set its level and create its prefixes.
CREATE OR REPLACE TRIGGER "objects_insert_create_prefix"
    BEFORE INSERT ON "storage"."objects"
    FOR EACH ROW
    EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

-- Before an object is renamed OR moved to another bucket: recompute its
-- level and create the prefixes for the new (bucket_id, name). Prefixes
-- are per bucket, so a bucket change needs them just like a rename does.
-- '!=' is used deliberately: a NULL bucket_id on either side does not
-- fire the trigger.
-- NOTE(review): prefixes of the old name/bucket are not removed here -
-- confirm whether cleanup happens elsewhere.
CREATE OR REPLACE TRIGGER "objects_update_create_prefix"
    BEFORE UPDATE ON "storage"."objects"
    FOR EACH ROW
    WHEN (NEW.name != OLD.name OR NEW.bucket_id != OLD.bucket_id)
    EXECUTE FUNCTION "storage"."objects_insert_prefix_trigger"();

-- After an object is deleted, try to delete its parent prefix.
CREATE OR REPLACE TRIGGER "objects_delete_delete_prefix"
    AFTER DELETE ON "storage"."objects"
    FOR EACH ROW
    EXECUTE FUNCTION "storage"."delete_prefix_hierarchy_trigger"();
198+
199+
-- Permissions
200+
-- Grants ALL on storage.prefixes to the service, authenticated and anon
-- roles. Role names come from the storage.* settings and fall back to
-- the defaults below when a setting is missing.
DO $$
DECLARE
    -- current_setting(..., true) can return '' instead of NULL (for
    -- example after the setting was set and then reset in the session);
    -- NULLIF makes the fallback apply in that case too instead of
    -- producing an invalid GRANT statement.
    anon_role text := COALESCE(NULLIF(current_setting('storage.anon_role', true), ''), 'anon');
    authenticated_role text := COALESCE(NULLIF(current_setting('storage.authenticated_role', true), ''), 'authenticated');
    service_role text := COALESCE(NULLIF(current_setting('storage.service_role', true), ''), 'service_role');
BEGIN
    -- NOTE(review): role names are concatenated unquoted, so they are
    -- case-folded and must be plain identifiers - confirm before
    -- switching to format('%I').
    EXECUTE 'GRANT ALL ON TABLE storage.prefixes TO ' || service_role || ',' || authenticated_role || ', ' || anon_role;
END$$;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
2+
-- Lists the prefixes ("folders") and objects of one bucket at a single
-- depth, merged and ordered by name in "C" collation.
--
--   prefix       names must match LIKE prefix || '%'
--   bucket_name  compared against bucket_id
--   limits       maximum number of rows returned (also applied to each
--                branch before merging)
--   levels       depth to list; also the path segment returned as "key"
--   start_after  exclusive lower bound on name ("C" collation)
--
-- Prefix rows are returned with '/' appended to the name and NULL for
-- id, updated_at, created_at and metadata.
--
-- The statement runs through EXECUTE ... USING, so all inputs are bound
-- as parameters, and the column names inside the query text are not
-- confused with the OUT columns of RETURNS TABLE.
--
-- NOTE(review): prefix rows are filtered and ordered on the stored name
-- (no trailing '/') but returned with '/' appended - confirm callers pass
-- start_after in a form that keeps keyset pagination consistent.
-- NOTE(review): '%' and '_' inside "prefix" act as LIKE wildcards.
CREATE OR REPLACE FUNCTION storage.search_v2 (
    prefix text,
    bucket_name text,
    limits int DEFAULT 100,
    levels int DEFAULT 1,
    start_after text DEFAULT ''
) RETURNS TABLE (
    key text,
    name text,
    id uuid,
    updated_at timestamptz,
    created_at timestamptz,
    metadata jsonb
)
SECURITY INVOKER
AS $func$
BEGIN
    RETURN QUERY EXECUTE
        $sql$
        SELECT
            obj.key,
            obj.name,
            obj.id,
            obj.updated_at,
            obj.created_at,
            obj.metadata
        FROM (
            (
                SELECT
                    split_part(name, '/', $4) AS key,
                    name || '/' AS name,
                    NULL::uuid AS id,
                    NULL::timestamptz AS updated_at,
                    NULL::timestamptz AS created_at,
                    NULL::jsonb AS metadata
                FROM storage.prefixes
                WHERE name COLLATE "C" LIKE $1 || '%'
                  AND bucket_id = $2
                  AND level = $4
                  AND name COLLATE "C" > $5
                ORDER BY name COLLATE "C" LIMIT $3
            )
            UNION ALL
            (
                SELECT
                    split_part(name, '/', $4) AS key,
                    name,
                    id,
                    updated_at,
                    created_at,
                    metadata
                FROM storage.objects
                WHERE name COLLATE "C" LIKE $1 || '%'
                  AND bucket_id = $2
                  AND level = $4
                  AND name COLLATE "C" > $5
                ORDER BY name COLLATE "C" LIMIT $3
            )
        ) obj
        ORDER BY obj.name COLLATE "C" LIMIT $3;
        $sql$
        USING prefix, bucket_name, limits, levels, start_after;
END;
$func$ LANGUAGE plpgsql STABLE;
56+
57+
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- postgres-migrations disable-transaction
2+
-- Unique on (name in "C" collation, bucket_id). Built CONCURRENTLY, which
-- cannot run inside a transaction block.
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS idx_name_bucket_unique
    ON storage.objects ("name" COLLATE "C", bucket_id);
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
-- postgres-migrations disable-transaction
2+
-- Backfill prefixes table records
3+
-- We run this with 50k batch size to avoid long running transaction
4+
-- Walks storage.objects in (name COLLATE "C", bucket_id) order using
-- keyset pagination and inserts every ancestor prefix of each object into
-- storage.prefixes. Each batch is committed on its own.
DO $$
DECLARE
    batch_size INTEGER := 50000;
    total_scanned INTEGER := 0;
    row_returned INTEGER := 0;
    -- Keyset cursor: the last (name, bucket_id) seen in the previous batch.
    -- NOTE(review): last_bucket_id is declared COLLATE "C" while the
    -- ORDER BY uses bucket_id's own collation - confirm both agree.
    last_name TEXT COLLATE "C" := NULL;
    last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
    LOOP
        -- Fetch a batch of objects ordered by name COLLATE "C"
        WITH batch AS (
            SELECT bucket_id, name
            FROM storage.objects
            WHERE (last_name IS NULL OR ((name COLLATE "C", bucket_id) > (last_name, last_bucket_id)))
            ORDER BY name COLLATE "C", bucket_id
            LIMIT batch_size
        ),
        batch_count AS (
            SELECT COUNT(*) AS n_rows FROM batch
        ),
        -- Last row of the batch; becomes the cursor for the next iteration.
        batch_tail AS (
            SELECT name AS tail_name, bucket_id AS tail_bucket
            FROM batch
            ORDER BY name COLLATE "C" DESC, bucket_id DESC
            LIMIT 1
        ),
        all_prefixes AS (
            SELECT UNNEST(storage.get_prefixes(name)) AS prefix, bucket_id
            FROM batch
        ),
        -- Data-modifying CTE: executed even though the final SELECT does
        -- not reference it.
        insert_prefixes AS (
            INSERT INTO storage.prefixes (bucket_id, name)
            SELECT bucket_id, prefix FROM all_prefixes
            WHERE coalesce(prefix, '') != ''
            ON CONFLICT DO NOTHING
        )
        -- LEFT JOIN from the count: an empty batch still yields one row
        -- with a count of 0, so row_returned is never NULL and the running
        -- total is not nulled out on the final, empty batch.
        SELECT bc.n_rows, bt.tail_name, bt.tail_bucket
        INTO row_returned, last_name, last_bucket_id
        FROM batch_count bc
        LEFT JOIN batch_tail bt ON true;

        RAISE NOTICE 'Object Row returned: %', row_returned;
        RAISE NOTICE 'Last Object: %', last_name;

        total_scanned := total_scanned + coalesce(row_returned, 0);

        -- Commit per batch to avoid one long-running transaction.
        COMMIT;

        -- A short (or empty) batch means the table is exhausted.
        IF row_returned IS NULL OR row_returned < batch_size THEN
            RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0);
            EXIT;
        END IF;
    END LOOP;
END;
$$;
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
-- postgres-migrations disable-transaction
2+
-- Backfill the level column on storage.objects
3+
-- We run this with 10k batch size to avoid long running transaction
4+
-- Walks the rows of storage.objects whose level is still NULL, in
-- (name COLLATE "C", bucket_id) order using keyset pagination, and sets
-- level from storage.get_level(name). Each batch is committed on its own.
DO $$
DECLARE
    batch_size INTEGER := 10000;
    total_scanned INTEGER := 0;
    row_returned INTEGER := 0;
    -- Keyset cursor: the last (name, bucket_id) seen in the previous batch.
    last_name TEXT COLLATE "C" := NULL;
    last_bucket_id TEXT COLLATE "C" := NULL;
BEGIN
    LOOP
        -- Fetch a batch of objects ordered by name COLLATE "C"
        WITH batch AS (
            SELECT src.id, src.bucket_id, src.name, storage.get_level(src.name) AS new_level
            FROM storage.objects src
            WHERE src.level IS NULL
              AND (last_name IS NULL OR (src.name COLLATE "C", src.bucket_id) > (last_name, last_bucket_id))
            ORDER BY src.name COLLATE "C", src.bucket_id
            LIMIT batch_size
        ),
        batch_count AS (
            SELECT COUNT(*) AS n_rows FROM batch
        ),
        -- Last row of the batch; becomes the cursor for the next iteration.
        batch_tail AS (
            SELECT name AS tail_name, bucket_id AS tail_bucket
            FROM batch
            ORDER BY name COLLATE "C" DESC, bucket_id DESC
            LIMIT 1
        ),
        -- Data-modifying CTE: executed even though the final SELECT does
        -- not reference it.
        update_level AS (
            UPDATE storage.objects o
            SET level = b.new_level
            FROM batch b
            WHERE o.id = b.id
        )
        -- LEFT JOIN from the count: an empty batch still yields one row
        -- with a count of 0, so row_returned is never NULL and the running
        -- total is not nulled out on the final, empty batch.
        SELECT bc.n_rows, bt.tail_name, bt.tail_bucket
        INTO row_returned, last_name, last_bucket_id
        FROM batch_count bc
        LEFT JOIN batch_tail bt ON true;

        RAISE NOTICE 'Object Row returned: %', row_returned;
        RAISE NOTICE 'Last Object: %', last_name;

        total_scanned := total_scanned + coalesce(row_returned, 0);

        -- Commit per batch to avoid one long-running transaction.
        COMMIT;

        -- A short (or empty) batch means no unprocessed rows remain.
        IF row_returned IS NULL OR row_returned < batch_size THEN
            RAISE NOTICE 'Total Object scanned: %', coalesce(total_scanned, 0);
            EXIT;
        END IF;
    END LOOP;
END;
$$;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- postgres-migrations disable-transaction
2+
-- Unique on (bucket_id, level, name in "C" collation). Built
-- CONCURRENTLY, which cannot run inside a transaction block.
CREATE UNIQUE INDEX CONCURRENTLY IF NOT EXISTS "objects_bucket_id_level_idx"
    ON storage.objects (bucket_id, "level", "name" COLLATE "C");
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- postgres-migrations disable-transaction
2+
-- Expression index: the path token at the object's own level, then the
-- lower-cased name (text_pattern_ops), then bucket_id and level. Built
-- CONCURRENTLY, which cannot run inside a transaction block.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_objects_lower_name
    ON storage.objects (
        (path_tokens[level]),
        lower(name) text_pattern_ops,
        bucket_id,
        level
    );

0 commit comments

Comments
 (0)