Skip to content

Commit

Permalink
fix: enable filter listeners to receive delete events
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 committed Sep 19, 2022
1 parent 49686fe commit c3156d5
Showing 1 changed file with 293 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
defmodule Realtime.RLS.Repo.Migrations.UpdateApplyRlsFunctionToPassThroughDeleteEventsOnFilter do
use Ecto.Migration

def change do
execute("
create or replace function realtime.apply_rls(wal jsonb, max_record_bytes int = 1024 * 1024)
returns setof realtime.wal_rls
language plpgsql
volatile
as $$
declare
-- Regclass of the table e.g. public.notes
entity_ regclass = (quote_ident(wal ->> 'schema') || '.' || quote_ident(wal ->> 'table'))::regclass;
-- I, U, D, T: insert, update ...
action realtime.action = (
case wal ->> 'action'
when 'I' then 'INSERT'
when 'U' then 'UPDATE'
when 'D' then 'DELETE'
else 'ERROR'
end
);
-- Is row level security enabled for the table
is_rls_enabled bool = relrowsecurity from pg_class where oid = entity_;
subscriptions realtime.subscription[] = array_agg(subs)
from
realtime.subscription subs
where
subs.entity = entity_;
-- Subscription vars
roles regrole[] = array_agg(distinct us.claims_role)
from
unnest(subscriptions) us;
working_role regrole;
claimed_role regrole;
claims jsonb;
subscription_id uuid;
subscription_has_access bool;
visible_to_subscription_ids uuid[] = '{}';
-- structured info for wal's columns
columns realtime.wal_column[];
-- previous identity values for update/delete
old_columns realtime.wal_column[];
error_record_exceeds_max_size boolean = octet_length(wal::text) > max_record_bytes;
-- Primary jsonb output for record
output jsonb;
begin
perform set_config('role', null, true);
columns =
array_agg(
(
x->>'name',
x->>'type',
x->>'typeoid',
realtime.cast(
(x->'value') #>> '{}',
coalesce(
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
(x->>'type')::regtype
)
),
(pks ->> 'name') is not null,
true
)::realtime.wal_column
)
from
jsonb_array_elements(wal -> 'columns') x
left join jsonb_array_elements(wal -> 'pk') pks
on (x ->> 'name') = (pks ->> 'name');
old_columns =
array_agg(
(
x->>'name',
x->>'type',
x->>'typeoid',
realtime.cast(
(x->'value') #>> '{}',
coalesce(
(x->>'typeoid')::regtype, -- null when wal2json version <= 2.4
(x->>'type')::regtype
)
),
(pks ->> 'name') is not null,
true
)::realtime.wal_column
)
from
jsonb_array_elements(wal -> 'identity') x
left join jsonb_array_elements(wal -> 'pk') pks
on (x ->> 'name') = (pks ->> 'name');
for working_role in select * from unnest(roles) loop
-- Update `is_selectable` for columns and old_columns
columns =
array_agg(
(
c.name,
c.type_name,
c.type_oid,
c.value,
c.is_pkey,
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
)::realtime.wal_column
)
from
unnest(columns) c;
old_columns =
array_agg(
(
c.name,
c.type_name,
c.type_oid,
c.value,
c.is_pkey,
pg_catalog.has_column_privilege(working_role, entity_, c.name, 'SELECT')
)::realtime.wal_column
)
from
unnest(old_columns) c;
if action <> 'DELETE' and count(1) = 0 from unnest(columns) c where c.is_pkey then
return next (
jsonb_build_object(
'schema', wal ->> 'schema',
'table', wal ->> 'table',
'type', action
),
is_rls_enabled,
-- subscriptions is already filtered by entity
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
array['Error 400: Bad Request, no primary key']
)::realtime.wal_rls;
-- The claims role does not have SELECT permission to the primary key of entity
elsif action <> 'DELETE' and sum(c.is_selectable::int) <> count(1) from unnest(columns) c where c.is_pkey then
return next (
jsonb_build_object(
'schema', wal ->> 'schema',
'table', wal ->> 'table',
'type', action
),
is_rls_enabled,
(select array_agg(s.subscription_id) from unnest(subscriptions) as s where claims_role = working_role),
array['Error 401: Unauthorized']
)::realtime.wal_rls;
else
output = jsonb_build_object(
'schema', wal ->> 'schema',
'table', wal ->> 'table',
'type', action,
'commit_timestamp', to_char(
(wal ->> 'timestamp')::timestamptz,
'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"'
),
'columns', (
select
jsonb_agg(
jsonb_build_object(
'name', pa.attname,
'type', pt.typname
)
order by pa.attnum asc
)
from
pg_attribute pa
join pg_type pt
on pa.atttypid = pt.oid
where
attrelid = entity_
and attnum > 0
and pg_catalog.has_column_privilege(working_role, entity_, pa.attname, 'SELECT')
)
)
-- Add \"record\" key for insert and update
|| case
when action in ('INSERT', 'UPDATE') then
case
when error_record_exceeds_max_size then
jsonb_build_object(
'record',
(
select jsonb_object_agg((c).name, (c).value)
from unnest(columns) c
where (c).is_selectable and (octet_length((c).value::text) <= 64)
)
)
else
jsonb_build_object(
'record',
(select jsonb_object_agg((c).name, (c).value) from unnest(columns) c where (c).is_selectable)
)
end
else '{}'::jsonb
end
-- Add \"old_record\" key for update and delete
|| case
when action in ('UPDATE', 'DELETE') then
case
when error_record_exceeds_max_size then
jsonb_build_object(
'old_record',
(
select jsonb_object_agg((c).name, (c).value)
from unnest(old_columns) c
where (c).is_selectable and (octet_length((c).value::text) <= 64)
)
)
else
jsonb_build_object(
'old_record',
(select jsonb_object_agg((c).name, (c).value) from unnest(old_columns) c where (c).is_selectable)
)
end
else '{}'::jsonb
end;
-- Create the prepared statement
if is_rls_enabled and action <> 'DELETE' then
if (select 1 from pg_prepared_statements where name = 'walrus_rls_stmt' limit 1) > 0 then
deallocate walrus_rls_stmt;
end if;
execute realtime.build_prepared_statement_sql('walrus_rls_stmt', entity_, columns);
end if;
visible_to_subscription_ids = '{}';
for subscription_id, claims in (
select
subs.subscription_id,
subs.claims
from
unnest(subscriptions) subs
where
subs.entity = entity_
and subs.claims_role = working_role
and (
realtime.is_visible_through_filters(columns, subs.filters)
or action = 'DELETE'
)
) loop
if not is_rls_enabled or action = 'DELETE' then
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
else
-- Check if RLS allows the role to see the record
perform
set_config('role', working_role::text, true),
set_config('request.jwt.claims', claims::text, true);
execute 'execute walrus_rls_stmt' into subscription_has_access;
if subscription_has_access then
visible_to_subscription_ids = visible_to_subscription_ids || subscription_id;
end if;
end if;
end loop;
perform set_config('role', null, true);
return next (
output,
is_rls_enabled,
visible_to_subscription_ids,
case
when error_record_exceeds_max_size then array['Error 413: Payload Too Large']
else '{}'
end
)::realtime.wal_rls;
end if;
end loop;
perform set_config('role', null, true);
end;
$$;
")
end
end

0 comments on commit c3156d5

Please sign in to comment.