Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore: update realtime functions to use fully qualified names #862

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,325 @@
defmodule Realtime.Tenants.Migrations.UseFullyQualifiedNames do
@moduledoc false

use Ecto.Migration

def change do
execute("
-- build_prepared_statement_sql function

CREATE OR REPLACE FUNCTION realtime.build_prepared_statement_sql(prepared_statement_name text, entity regclass, columns realtime.wal_column[])
RETURNS text
LANGUAGE sql
SET search_path = ''
AS $function$
/*
Builds a sql string that, if executed, creates a prepared statement to
tests retrive a row from *entity* by its primary key columns.
Example
select realtime.build_prepared_statement_sql('public.notes', '{\"id\"}'::text[], '{\"bigint\"}'::text[])
*/
select
'prepare ' || prepared_statement_name || ' as
select
exists(
select
1
from
' || entity || '
where
' || pg_catalog.string_agg(pg_catalog.quote_ident(pkc.name) || '=' || pg_catalog.quote_nullable(pkc.value #>> '{}') , ' and ') || '
)'
from
pg_catalog.unnest(columns) pkc
where
pkc.is_pkey
group by
entity;
$function$;

-- cast function

CREATE OR REPLACE FUNCTION realtime.\"cast\"(val text, type_ regtype)
RETURNS jsonb
LANGUAGE plpgsql
SET search_path = ''
IMMUTABLE
AS $function$
declare
res jsonb;
begin
execute pg_catalog.format('select to_jsonb(%L::'|| type_::text || ')', val) into res;
return res;
end
$function$;

-- channel_name function

CREATE OR REPLACE FUNCTION realtime.channel_name()
RETURNS text
SET search_path = ''
LANGUAGE sql
STABLE
AS $function$
select nullif(pg_catalog.current_setting('realtime.channel_name', true), '')::text;
$function$;

-- check_equality_op function

CREATE OR REPLACE FUNCTION realtime.check_equality_op(op realtime.equality_op, type_ regtype, val_1 text, val_2 text)
RETURNS boolean
LANGUAGE plpgsql
SET search_path = ''
IMMUTABLE
AS $function$
/*
Casts *val_1* and *val_2* as type *type_* and check the *op* condition for truthiness
*/
declare
op_symbol text = (
case
when op = 'eq' then '='
when op = 'neq' then '!='
when op = 'lt' then '<'
when op = 'lte' then '<='
when op = 'gt' then '>'
when op = 'gte' then '>='
when op = 'in' then '= any'
else 'UNKNOWN OP'
end
);
res boolean;
begin
execute pg_catalog.format(
'select %L::'|| type_::text || ' ' || op_symbol
|| ' ( %L::'
|| (
case
when op = 'in' then type_::text || '[]'
else type_::text end
)
|| ')', val_1, val_2) into res;
return res;
end;
$function$;

-- is_visible_through_filters function

CREATE OR REPLACE FUNCTION realtime.is_visible_through_filters(columns realtime.wal_column[], filters realtime.user_defined_filter[])
RETURNS boolean
LANGUAGE sql
SET search_path = ''
IMMUTABLE
AS $function$
/*
Should the record be visible (true) or filtered out (false) after *filters* are applied
*/
select
-- Default to allowed when no filters present
$2 is null -- no filters. this should not happen because subscriptions has a default
or pg_catalog.array_length($2, 1) is null -- array length of an empty array is null
or bool_and(
coalesce(
realtime.check_equality_op(
op:=f.op,
type_:=coalesce(
col.type_oid::regtype, -- null when wal2json version <= 2.4
col.type_name::regtype
),
-- cast jsonb to text
val_1:=col.value #>> '{}',
val_2:=f.value
),
false -- if null, filter does not match
)
)
from
pg_catalog.unnest(filters) f
join pg_catalog.unnest(columns) col
on f.column_name = col.name;
$function$;

-- quote_wal2json function
CREATE OR REPLACE FUNCTION realtime.list_changes(publication name, slot_name name, max_changes integer, max_record_bytes integer)
RETURNS SETOF realtime.wal_rls
LANGUAGE sql
SET search_path = ''
SET log_min_messages TO 'fatal'
AS $function$
with pub as (
select
concat_ws(
',',
case when bool_or(pubinsert) then 'insert' else null end,
case when bool_or(pubupdate) then 'update' else null end,
case when bool_or(pubdelete) then 'delete' else null end
) as w2j_actions,
coalesce(
string_agg(
realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
','
) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
''
) w2j_add_tables
from
pg_publication pp
left join pg_publication_tables ppt
on pp.pubname = ppt.pubname
where
pp.pubname = publication
group by
pp.pubname
limit 1
),
w2j as (
select
x.*, pub.w2j_add_tables
from
pub,
pg_catalog.pg_logical_slot_get_changes(
slot_name, null, max_changes,
'include-pk', 'true',
'include-transaction', 'false',
'include-timestamp', 'true',
'include-type-oids', 'true',
'format-version', '2',
'actions', pub.w2j_actions,
'add-tables', pub.w2j_add_tables
) x
)
select
xyz.wal,
xyz.is_rls_enabled,
xyz.subscription_ids,
xyz.errors
from
w2j,
realtime.apply_rls(
wal := w2j.data::jsonb,
max_record_bytes := max_record_bytes
) xyz(wal, is_rls_enabled, subscription_ids, errors)
where
w2j.w2j_add_tables <> ''
and xyz.subscription_ids[1] is not null
$function$;

-- quote_wal2json function

CREATE OR REPLACE FUNCTION realtime.quote_wal2json(entity regclass)
RETURNS text
LANGUAGE sql
SET search_path = ''
IMMUTABLE STRICT
AS $function$
select
(
select pg_catalog.string_agg('' || ch,'')
from pg_catalog.unnest(pg_catalog.string_to_array(nsp.nspname::text, null)) with ordinality x(ch, idx)
where
not (x.idx = 1 and x.ch = '\"')
and not (
x.idx = pg_catalog.array_length(pg_catalog.string_to_array(nsp.nspname::text, null), 1)
and x.ch = '\"'
)
)
|| '.'
|| (
select string_agg('' || ch,'')
from pg_catalog.unnest(pg_catalog.string_to_array(pc.relname::text, null)) with ordinality x(ch, idx)
where
not (x.idx = 1 and x.ch = '\"')
and not (
x.idx = pg_catalog.array_length(pg_catalog.string_to_array(nsp.nspname::text, null), 1)
and x.ch = '\"'
)
)
from
pg_class pc
join pg_namespace nsp
on pc.relnamespace = nsp.oid
where
pc.oid = entity
$function$;

-- subscription_check_filters function

CREATE OR REPLACE FUNCTION realtime.subscription_check_filters()
RETURNS trigger
SET search_path = ''
LANGUAGE plpgsql
AS $function$
/*
Validates that the user defined filters for a subscription:
- refer to valid columns that the claimed role may access
- values are coercable to the correct column type
*/
declare
col_names text[] = coalesce(
pg_catalog.array_agg(c.column_name order by c.ordinal_position),
'{}'::text[]
)
from
information_schema.columns c
where
pg_catalog.format('%I.%I', c.table_schema, c.table_name)::regclass = new.entity
and pg_catalog.has_column_privilege(
(new.claims ->> 'role'),
pg_catalog.format('%I.%I', c.table_schema, c.table_name)::regclass,
c.column_name,
'SELECT'
);
filter realtime.user_defined_filter;
col_type regtype;

in_val jsonb;
begin
for filter in select * from pg_catalog.unnest(new.filters) loop
-- Filtered column is valid
if not filter.column_name = any(col_names) then
raise exception 'invalid column for filter %', filter.column_name;
end if;

-- Type is sanitized and safe for string interpolation
col_type = (
select atttypid::regtype
from pg_catalog.pg_attribute
where attrelid = new.entity
and attname = filter.column_name
);
if col_type is null then
raise exception 'failed to lookup type for column %', filter.column_name;
end if;

-- Set maximum number of entries for in filter
if filter.op = 'in'::realtime.equality_op then
in_val = realtime.cast(filter.value, (col_type::text || '[]')::regtype);
if coalesce(pg_catalog.jsonb_array_length(in_val), 0) > 100 then
raise exception 'too many values for `in` filter. Maximum 100';
end if;
else
-- raises an exception if value is not coercable to type
perform realtime.cast(filter.value, col_type);
end if;

end loop;

-- Apply consistent order to filters so the unique constraint on
-- (subscription_id, entity, filters) can't be tricked by a different filter order
new.filters = coalesce(
pg_catalog.array_agg(f order by f.column_name, f.op, f.value),
'{}'
) from pg_catalog.unnest(new.filters) f;

return new;
end;
$function$;

CREATE OR REPLACE FUNCTION realtime.to_regrole(role_name text)
RETURNS regrole
SET search_path = ''
LANGUAGE sql
IMMUTABLE
AS $function$ select role_name::regrole $function$;
")
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.29.14",
version: "2.29.15",
elixir: "~> 1.16.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading