- Строки
- Агрегатная функция конкатенации строк (аналог
group_concat()
в MySQL) - Как проверить email на валидность?
- Как транслитерировать русские буквы на английские?
- Как распарсить CSV строку в таблицу?
- Как определить пол по ФИО (фамилии, имени, отчеству) на русском языке?
- Как заквотировать строку для использования в регулярном выражении?
- Как заквотировать строку для использования в операторе LIKE?
- Агрегатная функция конкатенации строк (аналог
- JSON
- Массивы
- Поиск по фразе (точный и неточный)
- Деревья и графы
- Оптимизация выполнения запросов
- Как получить записи-дубликаты по значению полей?
- Как получить время выполнения запроса в его результате?
- Как разбить большую таблицу по N тысяч записей, получив диапазоны id?
- Как выполнить следующий SQL запрос, если предыдущий не вернул результат?
- Как развернуть запись в набор колонок?
- Как получить итоговую сумму для каждой записи в одном запросе?
- Как получить возраст по дате рождения?
- Как получить дату или дату и время в формате ISO-8601?
- Как вычислить дистанцию между 2-мя точками на Земле по её поверхности в километрах?
- Как найти ближайшие населённые пункты относительно заданных координат?
- Как вычислить приблизительный объём данных для результата SELECT запроса?
Модификация пользовательских данных (DML)
- Как добавить или обновить записи одним запросом (UPSERT)?
- Как модифицировать данные в нескольких таблицах и вернуть id затронутых записей в одном запросе?
- Как добавить запись с id, значение которого нужно сохранить ещё в другом поле в том же INSERT запросе?
- Как сделать несколько последующих запросов с полученным при вставке id из первого запроса?
- Как модифицировать данные в связанных таблицах одним запросом?
- Как обновить запись так, чтобы не затереть чужие изменения, уже сделанные кем-то?
- Как обновить несколько строк по разным условиям в одном запросе?
- Как обновить несколько миллионов записей в таблице не блокируя все записи и не нагружая БД?
Модификация схемы данных (DDL)
- Как добавить колонку в существующую таблицу без её блокирования?
- Как добавить ограничение таблицы, если оно ещё не существует?
- Как изменить ограничение внешнего ключа без блокирования таблицы?
- Как проверить, что при добавлении или обновлении записи заполнены N полей из M возможных?
- Индексы
- Как сделать ограничение уникальности на колонку в существующей таблице без её блокирования?
- Как сделать составной уникальный индекс, где одно из полей может быть null?
- Как починить сломаный уникальный индекс, имеющий дубликаты?
- Как временно отключить индекс?
- Как сделать компактный уникальный индекс на текстовое поле?
- Как получить список процессов (SQL запросов), выполняющихся сейчас?
- Как остановить или завершить работу процессов?
- Как получить список всех функций БД, включая триггерные процедуры?
- Как получить список всех зависимостей (внешних ключей) между таблицами БД?
- Как получить статистику использования индексов?
- Как пересоздать индекс без блокировок?
- Как получить список установленных расширений (extensions)?
- Как получить список таблиц с размером занимаемого места?
- Как получить и изменить значения параметров конфигурации выполнения?
- Как получить все активные в данный момент процессы автовакуумa и время их работы?
- Как узнать, почему время ответа от базы периодически падает?
- Как обезопасить приложение от тяжёлых миграций, приводящих к блокированию запросов?
- Simple index checking
- Как скопировать таблицы из одной базы данных в другую?
- Как проверить синтаксис SQL кода без его выполнения?
В PostgreSQL есть тип данных массив с богатым набором операторов и функций для работы с ним. Для ускорения выполнения запросов, которые используют условие по колонке с типом массив, используется GIN индекс.
При проектировании таблиц для хранения данных массивы позволяют избавиться от лишних таблиц. Например, необходимо хранить данные по пользователям, где у каждого пользователя может быть несколько адресов электронной почты. Вместо создания вспомогательной таблицы для хранения эл. адресов можно создать колонку emails с типом массив в основной таблице. Если необходимо, чтобы все email были уникальны в рамках всех записей по колонке emails, то можно сделать триггер с соответствующим ограничением. TODO - дать пример SQL кода.
Агрегатная функция конкатенации строк (аналог group_concat() в MySQL)
SELECT STRING_AGG(DISTINCT s, ', ' ORDER BY s) AS field_alias FROM (VALUES ('b'), ('a'), ('b')) AS t(s); -- a, b
SELECT ARRAY_TO_STRING(ARRAY_AGG(DISTINCT s ORDER BY s), ', ') AS field_alias FROM (VALUES ('b'), ('a'), ('b')) AS t(s); -- a, b
Регулярное выражение в файле is_email.sql
взято и адаптировано отсюда
Как транслитерировать русские буквы на английские?
См. slugify.sql
. Всё о транслитерации
Выполнить SQL или Выполнить SQL
-- EXPLAIN --ANALYSE
WITH
-- https://en.wikipedia.org/wiki/Comma-separated_values
-- https://postgrespro.ru/docs/postgresql/10/sql-copy
data AS (SELECT -- скопируйте сюда данные в формате CSV
' 501 ; 8300000000000 ; ";Автономный ;"";округ""
""Ненецкий"";";test1
751;8600800000000; " Автономный округ ""Ханты-Мансийский"", Район Советский" ;
1755;8700300000000;Автономный округ Чукотский, Район Билибинский
1725;7501900000000;Край Забайкальский, Район Петровск-Забайкальский
;;
711;2302100000000;Край Краснодарский, Район Лабинский
729;2401600000000;Край Красноярский, Район Иланский
765;2700700000000;Край Хабаровский, Район Вяземский' AS csv),
options AS (SELECT -- задайте символ, разделяющий столбцы в строках файла,
-- возможные вариаты: ';', ',', '\t' (табуляция)
';' AS delimiter),
prepared AS (SELECT REPLACE('(?: ([^"<delimiter>\r\n]*) #1
| \x20* ("(?:[^"]+|"")*") \x20* #2
) (<delimiter>|[\r\n]+)', '<delimiter>', options.delimiter) AS parse_pattern
FROM options),
parsed AS (
SELECT * FROM (
SELECT
(SELECT ARRAY_AGG(
CASE WHEN LENGTH(field) > 1 AND
LEFT(field, 1) = '"' AND
RIGHT(field, 1) = '"' THEN REPLACE(SUBSTRING(field, 2, LENGTH(field) - 2), '""', '"')
ELSE NULLIF(TRIM(field), '')
END
ORDER BY num)
FROM unnest(string_to_array(t.row, E'\x01;\x02')) WITH ORDINALITY AS q(field, num)
) AS row
FROM data, prepared,
regexp_split_to_table(
regexp_replace(data.csv || E'\n', prepared.parse_pattern, E'\\1\\2\x01\\3\x02', 'gx'),
'\x01[\r\n]+\x02'
) AS t(row)
) AS t
WHERE row IS NOT NULL AND array_to_string(row, '') != ''
)
SELECT
CASE WHEN row[1] ~ '^\d+$' THEN row[1]::integer ELSE NULL END AS id,
row[2] AS kladr_id,
row[3] AS ancestors
FROM parsed
Смотри quote_regexp.sql
Смотри quote_like.sql
SELECT * FROM (
VALUES ('[{"id" : 1, "created_at" : "2003-07-01", "name": "Sony"},
{"id" : 2, "created_at" : "2008-10-27", "name": "Samsung"}]'::jsonb),
('[{"id" : 3, "created_at" : "2010-03-30", "name": "LG"},
{"id" : 4, "created_at" : "2018-12-09", "name": "Apple"}]'::jsonb)
) AS t
WHERE EXISTS(
SELECT *
FROM jsonb_to_recordset(t.column1) AS x(id int, created_at timestamp, name text)
WHERE x.id IN (1, 3) AND x.created_at > '2000-01-01' AND name NOT LIKE 'P%'
)
Смотри jsonb_diff.sql
Смотри array_cat_agg.sql
-- для 2-х массивов
select array_agg(a)
from unnest(array[1, 2, 3, 4, 5]) a
where a = any(array[4, 5, 6, 7, 8]); -- {4,5}
-- для N массивов
select array_agg(a1)
from unnest(array[1, 2, 3, 4, 5]) a1
inner join unnest(array[3, 4, 5, 6, 7]) a2 on a1 = a2
inner join unnest(array[4, 5, 6, 7, 8]) a3 on a1 = a3; -- {4,5}
-- способ 1
SELECT ARRAY_AGG(DISTINCT a ORDER BY a) FROM UNNEST(ARRAY[1,2,3,2,1]) t(a); -- {1,2,3}
-- способ 2
SELECT ARRAY(SELECT DISTINCT UNNEST(ARRAY[1,2,3,2,1]) ORDER BY 1); -- {1,2,3}
-- готовая функция
CREATE FUNCTION array_unique(anyarray) RETURNS anyarray AS $$
SELECT array_agg(DISTINCT x ORDER BY x) FROM unnest($1) t(x);
$$ LANGUAGE SQL IMMUTABLE;
-- было
SELECT *
FROM (VALUES
('foo bar zap bing'),
('man foo'),
('bar zap'),
('bing'),
('foo bar')) AS t(words)
WHERE words LIKE '%bar%' OR words LIKE '%zap%' OR words LIKE '%fix%' OR words LIKE '%new%';
-- стало
SELECT *
FROM (VALUES
('foo bar zap bing'),
('man foo'),
('bar zap'),
('bing'),
('foo bar')) AS t(words)
WHERE words LIKE ANY (ARRAY['%bar%', '%zap%', '%fix%', '%new%']);
См. так же полнотекстовый поиск.
CREATE INDEX /*CONCURRENTLY*/ IF NOT EXISTS t_name_trigram_index ON t USING GIN (lower(name) gin_trgm_ops);
WITH
normalize AS (
SELECT ltrim(REGEXP_REPLACE(LOWER('бар '), '[^а-яёa-z0-9]+', ' ', 'gi')) AS query
),
vars AS (
SELECT CONCAT('%', REPLACE(quote_like(trim(normalize.query)), ' ', '_%'), '%') AS query_like,
CONCAT('(?<![а-яёa-z0-9])', REPLACE(quote_regexp(normalize.query), ' ', '(?:[^а-яёa-z0-9]+|$)')) AS query_regexp
FROM normalize
)
SELECT
t.name,
lower(t.name) LIKE RTRIM(normalize.query) AS is_leading
FROM t, vars, normalize
WHERE
length(rtrim(normalize.query)) > 0 -- для скорости
AND lower(t.name) LIKE vars.query_like -- для скорости
AND lower(t.name) ~* vars.query_regexp -- для точности
ORDER BY
is_leading DESC,
LENGTH(name),
name
LIMIT 100
Функции quote_like.sql
, quote_regexp.sql
Как для слова с опечаткой (ошибкой) получить наиболее подходящие варианты слов для замены (исправление опечаток)?
Смотри typos_correct.sql
Пример результата запроса
word_num | word_from | is_mistake | can_correct | words_to | words_details |
---|---|---|---|---|---|
1 | вадитль | true | true | ["водитель"] | NULL |
2 | дифектолог | true | false | ["дефектолог", "директолог", "диетолог"] | NULL |
3 | формовшица | true | true | ["формовщица"] | NULL |
4 | фрмовщица | true | true | ["формовщица"] | NULL |
5 | бхугалтер | true | true | ["бухгалтер"] | NULL |
6 | лктор | true | true | ["лектор"] | NULL |
7 | дикто | true | true | ["диктор"] | NULL |
8 | вра | true | true | ["врач"] | NULL |
9 | прагромист | true | true | ["программист"] | NULL |
10 | затейник | false | false | NULL | NULL |
11 | смотритель | false | false | NULL | NULL |
12 | unknown | true | false | NULL | NULL |
Описание запроса
- Запрос так же подходит для получения поисковых подсказок (с учётом начала слов), если его немного модернизировать. Практики пока нет, а в теории нужно убрать ограничение по дистанции Левенштейна.
- Запрос использует GIN индекс, поэтому работает быстро.
- Среди пользователей, которые делают опечатки, есть те, которые делают грамматические ошибки. Поэтому, при расчёте расстояния Левенштейна, цена вставки и удаления буквы должна быть выше, чем замены. Цены операций являеются целочисленными числами, которые нам не очень подходят, поэтому приходится делать дополнительный расчёт.
- Пороговые значения подобраны опытным путём.
- Если у нескольких кандидатов подряд рейтинг отличается незначительно, то это не точное исправление (автоисправлять нельзя, только предлагать варианты)
Алгоритм исправления ошибок и опечаток основан поиске совпадающих триграмм (учитывается "похожесть" слова) и на вычислении расстояния Левенштейна.
Точность исправления слова зависит в т.ч. от количества букв в слове.
Получать слова-кандидаты для исправления ошибок необходимо для R > 0.55
и D < 5
Длина слова, букв (L) | Максимальная дистанция Левенштейна (D) | Доля R = (1 − (D ÷ L)) |
---|---|---|
2 | 0 | 1 |
3 | 1 | 0.6666 |
4 | 1 | 0.75 |
5 | 2 | 0.6 |
6 | 2 | 0.6666 |
7 | 3 | 0.5714 |
8 | 3 | 0.625 |
9 | 3 | 0.6666 |
10 | 4 | 0.6 |
11 | 4 | 0.6363 |
12 | 4 | 0.6666 |
13 | 4 | 0.6923 |
14 и более | 4 | 0.7142 |
SELECT
id,
nlevel(ltree_path) AS level,
ltree_path AS id_path,
(SELECT array_agg(st.name ORDER BY nlevel(st.ltree_path)) FROM region AS st WHERE st.ltree_path @> t.ltree_path AND st.ltree_path != t.ltree_path) AS ancestors,
name AS self,
(SELECT array_agg(st.name ORDER BY nlevel(st.ltree_path)) FROM region AS st WHERE st.ltree_path <@ t.ltree_path AND st.ltree_path != t.ltree_path) AS descendants
--, t.*
FROM region AS t
WHERE nlevel(ltree_path) >= 2
ORDER BY nlevel(ltree_path) ASC, ancestors
LIMIT 1000;
WITH paths_with_cycle(depth, path) AS (
WITH RECURSIVE search_graph(parent_id, child_id, depth, path, cycle) AS (
SELECT g.parent_id, g.child_id, 1,
ARRAY[g.parent_id],
false
FROM custom_query_group_relationship AS g
UNION ALL
SELECT g.parent_id, g.child_id, sg.depth + 1,
path || g.parent_id,
g.parent_id = ANY(path)
FROM custom_query_group_relationship AS g, search_graph sg
WHERE g.parent_id = sg.child_id AND cycle IS FALSE
)
SELECT depth, path FROM search_graph WHERE cycle IS TRUE
ORDER BY depth
)
SELECT DISTINCT path FROM paths_with_cycle
WHERE depth = (SELECT MIN(depth) FROM paths_with_cycle)
SQL-запросы WITH RECURSIVE...
должны иметь защиту от зацикливания! Когда запрос зациклится, он будет выполняться очень долго, съедая ресурсы БД. А ещё таких запросов будет много. Повезёт, если сработает защита самого PostgreSQL.
SELECT ot1.name AS name_1, ot2.name as name_2, ot3.name as name_3, ot4.id as id
FROM offer_trade ot4
INNER JOIN offer_trade ot3 ON ot4.order_tree <@ ot3.order_tree AND nlevel(ot3.order_tree) = 3
INNER JOIN offer_trade ot2 ON ot4.order_tree <@ ot2.order_tree AND nlevel(ot2.order_tree) = 2
INNER JOIN offer_trade ot1 ON ot4.order_tree <@ ot1.order_tree AND nlevel(ot1.order_tree) = 1
- Postgres Explain Visualizer (Pev) is a tool I wrote to make EXPLAIN output easier to grok. It creates a graphical representation of the query plan
- PostgreSQL's explain analyze made readable
Смотри json_explain.sql
-- было
SELECT * FROM t WHERE id < 1000 AND val IN (1, ..., 10000);
-- стало (способ 1)
SELECT * FROM t WHERE id < 1000 AND val IN (VALUES (1), ..., (10000));
-- стало (способ 2)
SELECT * FROM t JOIN (VALUES (1), ..., (10000)) AS t(val) UGING(val) WHERE id < 1000;
-- стало (способ 3)
SELECT * FROM t JOIN UNNEST(ARRAY[1, ..., 10000]) AS t(val) UGING(val) WHERE id < 1000;
-- через подзапрос с EXISTS этой же таблицы
SELECT
ROW_NUMBER() OVER(PARTITION BY d.name ORDER BY d.id ASC) AS duplicate_num, -- номер дубля
d.*
FROM person AS d
WHERE EXISTS(SELECT 1
FROM person AS t
WHERE t.name = d.name -- в идеале на это поле должен стоять индекс
-- если нет первичного ключа, замените "id" на "ctid"
AND d.id != t.id -- оригинал и дубликаты
-- AND d.id > t.id -- только дубликаты
)
ORDER BY name, duplicate_num
-- получить ID записей, имеющих дубликаты по полю lower(name)
SELECT id_original, unnest(id_doubles) AS id_double
FROM (
SELECT min(id) AS id_original,
(array_agg(id order by id))[2:] AS id_doubles
FROM skill
GROUP BY lower(name)
HAVING count(*) > 1
ORDER BY count(*) DESC
) AS t;
-- получить ID записей, НЕ имеющих дубликаты по полю slugify(name)
SELECT max(id) AS id
FROM region
GROUP BY slugify(name)
HAVING count(*) = 1;
-- получить разные названия населённых пунктов с одинаковыми kladr_id
SELECT ROW_NUMBER() OVER(PARTITION BY kladr_id ORDER BY address ASC) AS duplicate_num, -- номер дубля
*
FROM (
SELECT kladr_id,
unnest(array_agg(address)) AS address
FROM d
GROUP BY kladr_id
HAVING count(*) > 1
) AS t
ORDER BY kladr_id, duplicate_num
SELECT extract(seconds FROM clock_timestamp() - now()) AS execution_time FROM pg_sleep(1.5);
Применение:
- индексирование данных в поисковых движках типа Sphinx, Solr, Elastic Search
- ускорение выполнения запросов в PostgreSQL через их распараллеливание
WITH
-- отфильтровываем лишние записи и оставляем только колонку id
result1 AS (
SELECT id
FROM resume
WHERE is_publish_status = TRUE
AND is_spam = FALSE
),
-- для каждого id получаем номер пачки
result2 AS (
SELECT
id,
((row_number() OVER (ORDER BY id) - 1) / 100000)::integer AS part
FROM result1
)
-- группируем по номеру пачки и получаем минимальное и максимальное значение id
SELECT
MIN(id) AS min_id,
MAX(id) AS max_id,
-- ARRAY_AGG(id) AS ids, -- список id в пачке, при необходимости
COUNT(id) AS total -- кол-во записей в пачке (для отладки, можно закомментировать эту строку)
FROM result2
GROUP BY part
ORDER BY 1;
Пример результата выполнения
№ | min_id | max_id | total |
---|---|---|---|
1 | 162655 | 6594323 | 100000 |
2 | 6594329 | 6974938 | 100000 |
3 | 6974949 | 7332884 | 100000 |
... | ... | ... | ... |
83 | 24276878 | 24427703 | 100000 |
84 | 24427705 | 24542589 | 77587 |
Далее можно последовательно или параллельно выполнять SQL запросы для каждого диапазона, например:
SELECT *
FROM resume
WHERE id BETWEEN 162655 AND 6594323
AND is_publish_status = TRUE
AND is_spam = FALSE;
Если условие в фильтрации данных тяжёлое, то лучше выбирать по спискам id для каждого диапазона, например:
SELECT *
FROM resume
WHERE id IN (/*список id через запятую*/);
-- how to execute a sql query only if another sql query has no results
WITH r AS (
SELECT * FROM film WHERE length = 120
)
SELECT * FROM r
UNION ALL
SELECT * FROM film
WHERE length = 130
AND NOT EXISTS (SELECT * FROM r)
SELECT (a).*, (b).* -- unnesting the records again
FROM (
SELECT
a, -- nesting the first table as record
b -- nesting the second table as record
FROM (VALUES (1, 'q'), (2, 'w')) AS a (id, name),
(VALUES (7, 'e'), (8, 'r')) AS b (id, name)
) AS t;
SELECT
array_agg(x) over () as frame,
x,
sum(x) over () as sum,
x :: float / sum(x) over () as part
FROM generate_series(1, 4) as t (x);
SELECT EXTRACT(YEAR FROM age('1977-09-10'::date))
-- format date or timestamp to ISO 8601
SELECT trim(both '"' from to_json(birthday)::text),
trim(both '"' from to_json(created_at)::text)
FROM (
SELECT '1977-10-09'::date AS birthday, now() AS created_at
) AS t
Пример результата выполнения:
birthday | created_at |
---|---|
1977-10-09 | 2019-10-24T13:34:38.858211+00:00 |
Если есть модуль earthdistance, то (point(lon1, lat1) <@> point(lon2, lat2)) * 1.609344 AS distance_km
.
Иначе gc_dist(lat1, lon1, lat2, lon2) AS distance_km
, смотри gc_dist.sql
-- select * from pg_available_extensions where installed_version is not null;
with t as (
SELECT 37.61556 AS msk_x, 55.75222 AS msk_y, -- координаты центра Москвы
30.26417 AS spb_x, 59.89444 AS spb_y, -- координаты центра Санкт-Петербурга
1.609344 AS mile_to_kilometre_ratio
)
select (point(msk_x, msk_y) <@> point(spb_x, spb_y)) * mile_to_kilometre_ratio AS dist1_km,
gc_dist(msk_y, msk_x, spb_y, spb_x) AS dist2_km
from t;
Пример результата выполнения:
dist1_km1 | dist2_km |
---|---|
633.045646835722 | 633.0469500660282 |
-- координаты (долготу, широту) лучше сразу хранить не в 2-х отдельных полях, а в одном поле с типом point
create index if not exists region_point_idx on region using gist(point(map_center_x, map_center_y));
--explain
with t as (
SELECT 37.61556 AS msk_x, 55.75222 AS msk_y, -- координаты центра Москвы
1.609344 AS mile_to_kilometre_ratio
)
select (point(msk_x, msk_y) <@> point(map_center_x, map_center_y)) * mile_to_kilometre_ratio AS dist_km,
name
from region, t
order by (select point(msk_x, msk_y) from t) <-> point(map_center_x, map_center_y)
limit 10;
См. так же https://tapoueh.org/blog/2018/05/postgresql-data-types-point/
SELECT pg_size_pretty(SUM(OCTET_LENGTH(t::text) + 1))
FROM (
-- сюда нужно поместить ваш запрос, например:
SELECT * FROM region LIMIT 50000
) AS t
См. INSERT... ON CONFLICT DO NOTHING/UPDATE и ROW LEVEL SECURITY (Habr)
WITH
updated AS (
UPDATE table1
SET x = 5, y = 6 WHERE z > 7
RETURNING id
),
inserted AS (
INSERT INTO table2 (x, y, z) VALUES (5, 7, 10)
RETURNING id
)
SELECT 'table1_updated' AS action, id
FROM updated
UNION ALL
SELECT 'table2_inserted' AS action, id
FROM inserted;
Как добавить запись с id, значение которого нужно сохранить ещё в другом поле в том же INSERT запросе?
По материалам Stackoverflow: Reference value of serial column in another column during same INSERT. А вообще это ошибка проектирования (нарушение нормальной формы)!
WITH t AS (
SELECT nextval(pg_get_serial_sequence('region', 'id')) AS id
)
INSERT INTO region (id, ltree_path, ?r)
SELECT id,
CAST(?s || '.' || id AS ltree) AS ltree_path,
?l
FROM t
RETURNING ltree_path
DO $$
DECLARE t1Id integer;
BEGIN
INSERT INTO t1 (a, b) VALUES ('a', 'b') RETURNING id INTO t1Id;
INSERT INTO t2 (c, d, t1_id) VALUES ('c', 'd', t1Id);
END $$;
При сохранении сущностей возникает задача сохранить данные не только в основную таблицу БД, но ещё в связанные. В запросе ниже "старые" связи будут удалены, "новые" — добавлены, а существующие останутся без изменений. Счётчики полей id serial зря не увеличатся. Приведён пример сохранения регионов вакансии.
WITH
-- у таблицы vacancy_region должен быть уникальный ключ vacancy_id+region_id
-- сначала удаляем все не переданные (несуществующие) регионы размещения для вакансии
-- для ?l в конец массива идентификаторов регионов нужно добавить 0, чтобы запросы не сломались
deleted AS (
DELETE FROM vacancy_region
WHERE vacancy_id = ?0
AND region_id NOT IN (?l1)
-- AND ROW(region_id, some_field) NOT IN (ROW(3, 'a'), ROW(8, 'b'), ...) -- пример для случая, если уникальный ключ состоит из нескольких полей
RETURNING id
),
-- потом добавляем все регионы размещения для вакансии
-- несуществующие id регионов и дубликаты будут проигнорированы, ошибки не будет
-- select нужен, чтобы запрос не сломался по ограничениям внешний ключей, если в списке region_id есть "леваки", они просто проигнорируются
inserted AS (
INSERT INTO vacancy_region (vacancy_id, region_id)
SELECT ?0 AS vacancy_id, id AS region_id FROM region WHERE id IN (?l1)
ON CONFLICT DO NOTHING
RETURNING id
)
SELECT
-- последовательность пречисления полей важна: сначала удаление, потом добавление, иначе будет ошибка
(SELECT COUNT(*) FROM deleted) AS deleted, -- количество удалённых записей
(SELECT COUNT(*) FROM inserted) AS updated -- количество добавленных записей
WITH u AS (
UPDATE t
SET description = 'new text'
WHERE id=123
AND updated_at = '2019-11-08 00:58:33'
--AND md5(t::text) = '1BC29B36F623BA82AAF6724FD3B16718' -- если нет колонки updated_at, вычисляем хеш от данных всей записи
RETURNING *
)
SELECT true AS is_updated, updated_at
FROM u
UNION ALL
SELECT false AS is_updated, updated_at
WHERE id = 123 AND NOT EXISTS (SELECT * FROM u)
См. Stackoverflow
UPDATE users as u SET
email = u2.email,
first_name = u2.first_name,
last_name = u2.last_name
FROM (VALUES
(1, 'hollis@weimann.biz', 'Hollis', 'O''Connell'),
(2, 'robert@duncan.info', 'Robert', 'Duncan')
) as u2(id, email, first_name, last_name)
WHERE u2.id = u.id;
Ниже размещён SQL-шаблон для добавления, обновления или удаления миллионов записей, который имеет следующие возможности (PostgreSQL >= 11):
- Cоздание резервной копии изменяемых данных в отдельную таблицу (для возможности отката)
- Автоматическая адаптация под нагрузку на БД
- Минимальные кратковременные блокировки на запись (чтобы не повлиять на стабильность работы БД)
- Отображение прогресса выполнения в процентах и оставшегося времени завершения
- На реплику данные передаются постепенно небольшими порциями, а не одним огромным куском.
Запросы в файле {JiraTaskId}.sql
обрабатывают большую таблицу пачками по несколько (десятки/сотни/тысячи) записей. В процесе работы размер пачки автоматические подстраивается под максимально установленное время работы для одной пачки (несколько секунд).
Для ускорения выполнения SQL шаблон можно распараллелить по нескольким ядрам процессора скриптом {JiraTaskId}.sh
. Запускать его лучше в Screen. Отслеживать прогресс выполнения каждого процесса можно командой:
$ tail -f {JiraTaskId}_job_{cpu_num}.log
Пример отчёта выполненного скрипта в файле {JiraTaskId}_job_{cpu_num}.log
.
См. Stackoverflow
Реализация команды ALTER TABLE ... ADD CONSTRAINT IF NOT EXISTS ...
, которая отсутствует в PostgreSQL 11.
-- Add constraint if it doesn't already exist
DO $$
DECLARE
exception_message text;
exception_context text;
BEGIN
BEGIN
ALTER TABLE v3_company_awards ADD CONSTRAINT v3_company_awards_year CHECK(year between 1900 and date_part('year', CURRENT_DATE));
EXCEPTION WHEN duplicate_object THEN
GET STACKED DIAGNOSTICS
exception_message = MESSAGE_TEXT,
exception_context = PG_EXCEPTION_CONTEXT;
RAISE NOTICE '%', exception_context;
RAISE NOTICE '%', exception_message;
END;
END $$;
ALTER TABLE group
DROP CONSTRAINT group_company_id_fk,
ADD CONSTRAINT group_company_id_fk
FOREIGN KEY (company_id)
REFERENCES company (id)
ON UPDATE CASCADE
ON DELETE CASCADE
NOT VALID; -- https://postgrespro.ru/docs/postgresql/12/sql-altertable#SQL-ALTERTABLE-NOTES
ALTER TABLE group
VALIDATE CONSTRAINT group_company_id_fk;
CREATE TABLE IF NOT EXISTS table1
(
field1 integer DEFAULT NULL,
field2 integer DEFAULT NULL,
field3 integer DEFAULT NULL,
-- check that any N fields is required on INSERT or UPDATE
CONSTRAINT table1 CHECK (
-- в массиве должны быть данные одного типа!
array_length(array_remove(ARRAY[field1, field2, field3], null), 1) = 1
)
);
Запускать эти запросы нужно НЕ в транзакции, а вручную последовательно. Если при выполнении любого запроса произойдёт ошибка, то нужно выполнить все запросы заново.
-- в случае неудачи построения индекса в конкурентном режиме создаётся "нерабочий" индекс и его нужно удалить
DROP INDEX CONCURRENTLY IF EXISTS person_uniq_auth_id;
-- takes a long time, but doesn’t block queries
CREATE UNIQUE INDEX CONCURRENTLY person_uniq_auth_id ON person (auth_id);
-- blocks queries, but only very briefly
-- ALTER TABLE person ADD CONSTRAINT person_uniq_auth_id UNIQUE USING INDEX person_uniq_auth_id;
create table test (
a varchar NOT NULL,
b varchar default null
);
-- решение 1 (Более предпочтительное решение. Если есть внешние ключи на колонки a и b, то дополнительных индексов делать уже не нужно)
create unique index on test (b, a) where b is not null;
create unique index on test (a) where b is null;
-- решение 2 (Менее предпочтительное решение, т.к. есть зависимость от типа данных, но один индекс компактнее двух)
create unique index on test(a, coalesce(b, '')) -- для чисел вместо '' напишите 0
Чиним битый уникальный индекс на поле skill.name
-- EXPLAIN
WITH
skill AS (
SELECT min(id) AS id_original,
(array_agg(id order by id))[2:] AS id_doubles
FROM skill
GROUP BY lower(name)
HAVING count(*) > 1
),
repair_resume_work_skill AS (
-- собираем связи с дублями в таблицу
SELECT t.resume_id, t.work_skill_id AS id_double, skill.id_original
FROM skill
INNER JOIN resume_work_skill AS t ON t.work_skill_id = ANY (skill.id_doubles)
),
deleted_resume_work_skill AS (
-- удаляем связи с дублями из таблицы
DELETE FROM resume_work_skill
USING repair_resume_work_skill AS t
WHERE resume_work_skill.resume_id = t.resume_id
AND resume_work_skill.work_skill_id = t.id_double
RETURNING id
),
inserted_resume_work_skill AS (
-- добавляем новые правильные связи, при этом такая связь уже может существовать
INSERT INTO resume_work_skill (resume_id, work_skill_id)
SELECT t.resume_id, t.id_original FROM repair_resume_work_skill AS t
ON CONFLICT DO NOTHING
RETURNING id
),
-- удаляем дубликаты
deleted_skill AS (
DELETE FROM skill
USING skill AS t
WHERE skill.id = ANY (t.id_doubles)
RETURNING id
)
SELECT 'resume_work_skill' AS table_name, 'deleted' AS action, COUNT(*) FROM deleted_resume_work_skill
UNION ALL SELECT 'resume_work_skill' AS table_name, 'inserted' AS action, COUNT(*) FROM inserted_resume_work_skill
UNION ALL SELECT 'skill' AS table_name, 'deleted' AS action, COUNT(*) FROM deleted_skill
;
-- удаляем битый индекс и создаём новый уникальный индекс
DROP INDEX IF EXISTS skill.uniq_skill_name;
CREATE UNIQUE INDEX IF NOT EXISTS uniq_skill_name ON skill (lower(name));
Способ 1
--Is it possible to temporarily disable an index in Postgres?
update pg_index set indisvalid = false where indexrelid = 'test_pkey'::regclass
Способ 2
begin;
drop index foo_ndx;
explain analyze select * from foo;
rollback;
-- для поля с типом TEXT
-- md5, приведённый к типу uuid занимает 16 байт вместо 36 (32+4) байт
create unique index on table_name (cast(md5(lower(column_name)) as uuid));
-- для поля с типом TEXT
-- в этом индексе будут учитываться слова (буквы, цифры, точка, дефис) и их позиции в тексте,
-- но не будет учитываться регистр слов и любые другие символы
create unique index on table_name (cast(md5(cast(to_tsvector('simple', column_name) as text)) as uuid));
-- для поля с типом JSONB
create unique index on partner__partners (cast(md5(lower(cast(column_name as text))) as uuid));
В PHPStorm есть возможность настроить для результата запроса значения в колонке application_name
и вписать туда ПО и свою фамилию для своих SQL запросов. Для этого нужно открыть окно "Data Sources and Drivers", выбрать нужное соединение с БД из секции "Project Data Sources", перейти на вкладку "Advanced", отсортировать таблицу по колонке "Name", для "Name" равному "Application Name", изменить значение в колонке "Value" на что-то типа"PhpStorm Petrov Ivan" (строго на английском языке).
SELECT pid, application_name, query, NOW() - query_start AS elapsed
FROM pg_stat_activity
ORDER BY elapsed DESC;
-- Остановить все процессы, работающие более 1 часа, сигналом SIGINT
SELECT pg_cancel_backend(pid), application_name, query, NOW() - query_start AS elapsed
FROM pg_stat_activity
WHERE NOW() - query_start > (60*60)::text::interval
ORDER BY elapsed DESC
-- Принудительно завершить работу всех процессов, работающих более 1 часа, сигналом SIGTERM, если не помогает SIGINT
SELECT pg_terminate_backend(pid), application_name, query, NOW() - query_start AS elapsed
FROM pg_stat_activity
WHERE NOW() - query_start > (60*60)::text::interval
ORDER BY elapsed DESC
SELECT n.nspname AS "Schema",
p.proname AS "Name",
CASE WHEN p.proretset THEN 'setof ' ELSE '' END
|| pg_catalog.format_type(p.prorettype, NULL) AS "Result data type",
CASE WHEN proallargtypes IS NOT NULL
THEN pg_catalog.array_to_string(
ARRAY(SELECT CASE
WHEN p.proargmodes[s.i] = 'i' THEN ''
WHEN p.proargmodes[s.i] = 'o' THEN 'OUT '
WHEN p.proargmodes[s.i] = 'b' THEN 'INOUT '
END || CASE
WHEN COALESCE(p.proargnames[s.i], '') = '' THEN ''
ELSE p.proargnames[s.i] || ' '
END || pg_catalog.format_type(p.proallargtypes[s.i], NULL)
FROM
pg_catalog.generate_series(1, pg_catalog.array_upper(p.proallargtypes, 1)) AS s(i)
), ', '
) ELSE pg_catalog.array_to_string(
ARRAY(SELECT CASE
WHEN COALESCE(p.proargnames[s.i+1], '') = '' THEN ''
ELSE p.proargnames[s.i+1] || ' '
END || pg_catalog.format_type(p.proargtypes[s.i], NULL)
FROM
pg_catalog.generate_series(0, pg_catalog.array_upper(p.proargtypes, 1)) AS s(i)
), ', '
)
END AS "Argument data types",
CASE WHEN p.provolatile = 'i' THEN 'immutable'
WHEN p.provolatile = 's' THEN 'stable'
WHEN p.provolatile = 'v' THEN 'volatile'
END AS "Volatility",
r.rolname AS "Owner",
l.lanname AS "Language",
p.prosrc AS "Source code",
pg_catalog.obj_description(p.oid, 'pg_proc') AS "Description"
FROM pg_catalog.pg_proc p
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
LEFT JOIN pg_catalog.pg_language l ON l.oid = p.prolang
JOIN pg_catalog.pg_roles r ON r.oid = p.proowner
WHERE p.prorettype <> 'pg_catalog.cstring'::pg_catalog.regtype
AND (p.proargtypes[0] IS NULL
OR p.proargtypes[0] <> 'pg_catalog.cstring'::pg_catalog.regtype)
AND NOT p.proisagg
AND pg_catalog.pg_function_is_visible(p.oid)
AND r.rolname <> 'pgsql';
Запрос возвращает колонки from_table
, from_cols
, to_table
, to_cols
и другие.
Для какой-либо таблицы можно получить:
- список исходящих связей (таблицы, которые зависят от текущей таблицы)
- список входящих связей (таблицы, от которых зависит текущая таблица)
SELECT
c.conname AS constraint_name,
(SELECT n.nspname FROM pg_namespace AS n WHERE n.oid=c.connamespace) AS constraint_schema,
tf.name AS from_table,
(
SELECT STRING_AGG(QUOTE_IDENT(a.attname), ', ' ORDER BY t.seq)
FROM
(
SELECT
ROW_NUMBER() OVER (ROWS UNBOUNDED PRECEDING) AS seq,
attnum
FROM UNNEST(c.conkey) AS t(attnum)
) AS t
INNER JOIN pg_attribute AS a ON a.attrelid=c.conrelid AND a.attnum=t.attnum
) AS from_cols,
tt.name AS to_table,
(
SELECT STRING_AGG(QUOTE_IDENT(a.attname), ', ' ORDER BY t.seq)
FROM
(
SELECT
ROW_NUMBER() OVER (ROWS UNBOUNDED PRECEDING) AS seq,
attnum
FROM UNNEST(c.confkey) AS t(attnum)
) AS t
INNER JOIN pg_attribute AS a ON a.attrelid=c.confrelid AND a.attnum=t.attnum
) AS to_cols,
CASE confupdtype WHEN 'r' THEN 'restrict' WHEN 'c' THEN 'cascade' WHEN 'n' THEN 'set null' WHEN 'd' THEN 'set default' WHEN 'a' THEN 'no action' ELSE NULL END AS on_update,
CASE confdeltype WHEN 'r' THEN 'restrict' WHEN 'c' THEN 'cascade' WHEN 'n' THEN 'set null' WHEN 'd' THEN 'set default' WHEN 'a' THEN 'no action' ELSE NULL END AS on_delete,
CASE confmatchtype::text WHEN 'f' THEN 'full' WHEN 'p' THEN 'partial' WHEN 'u' THEN 'simple' WHEN 's' THEN 'simple' ELSE NULL END AS match_type, -- In earlier postgres docs, simple was 'u'nspecified, but current versions use 's'imple. text cast is required.
pg_catalog.pg_get_constraintdef(c.oid, true) as condef
FROM
pg_catalog.pg_constraint AS c
INNER JOIN (
SELECT pg_class.oid, QUOTE_IDENT(pg_namespace.nspname) || '.' || QUOTE_IDENT(pg_class.relname) AS name
FROM pg_class INNER JOIN pg_namespace ON pg_class.relnamespace=pg_namespace.oid
) AS tf ON tf.oid=c.conrelid
INNER JOIN (
SELECT pg_class.oid, QUOTE_IDENT(pg_namespace.nspname) || '.' || QUOTE_IDENT(pg_class.relname) AS name
FROM pg_class INNER JOIN pg_namespace ON pg_class.relnamespace=pg_namespace.oid
) AS tt ON tt.oid=c.confrelid
WHERE c.contype = 'f' ORDER BY 1;
Запрос отображает использование индексов. Что позволяет увидеть наиболее часто использованные индексы, а также и наиболее редко (у которых будет index_scans_count = 0).
Учитываются только пользовательские индексы и не учитываются уникальные, т.к. они используются как ограничения (как часть логики хранения данных).
В начале отображаются наиболее часто используемые индексы (отсортированы по колонке index_scans_count)
SELECT
idstat.relname AS table_name, -- имя таблицы
indexrelname AS index_name, -- индекс
idstat.idx_scan AS index_scans_count, -- число сканирований по этому индексу
pg_size_pretty(pg_relation_size(indexrelid)) AS index_size, -- размер индекса
tabstat.idx_scan AS table_reads_index_count, -- индексных чтений по таблице
tabstat.seq_scan AS table_reads_seq_count, -- последовательных чтений по таблице
tabstat.seq_scan + tabstat.idx_scan AS table_reads_count, -- чтений по таблице
n_tup_upd + n_tup_ins + n_tup_del AS table_writes_count, -- операций записи
pg_size_pretty(pg_relation_size(idstat.relid)) AS table_size -- размер таблицы
FROM
pg_stat_user_indexes AS idstat
JOIN
pg_indexes
ON
indexrelname = indexname
AND
idstat.schemaname = pg_indexes.schemaname
JOIN
pg_stat_user_tables AS tabstat
ON
idstat.relid = tabstat.relid
WHERE
indexdef !~* 'unique'
ORDER BY
idstat.idx_scan DESC,
pg_relation_size(indexrelid) DESC
Цель перестроения индекса - уменьшить занимаемый размер из-за фрагментации. Команда REINDEX имеет опцию CONCURRENTLY, которая появилась только в PostgreSQL 12. В более ранних версиях можно сделать так:
-- Неблокирующая альтернатива команде REINDEX:
CREATE INDEX CONCURRENTLY new_index ON ...; -- делаем дубликат индекса old_index
DROP INDEX CONCURRENTLY old_index;
ALTER INDEX new_index RENAME TO old_index;
-- с PRIMARY KEY так просто не получится, тут подумать нужно, как из обезжиривать их без локов
select * from pg_available_extensions where installed_version is not null;
SELECT nspname || '.' || relname AS "relation",
pg_size_pretty(pg_total_relation_size(C.oid)) AS "total_size"
FROM pg_class C
LEFT JOIN pg_namespace N ON (N.oid = C.relnamespace)
WHERE nspname NOT IN ('pg_catalog', 'information_schema')
AND C.relkind <> 'i'
AND nspname !~ '^pg_toast'
ORDER BY pg_total_relation_size(C.oid) DESC
LIMIT 100
-- способ 1
SHOW pg_trgm.word_similarity_threshold;
SHOW pg_trgm.similarity_threshold;
-- способ 2
SELECT name, setting AS value
FROM pg_settings
WHERE name IN ('pg_trgm.word_similarity_threshold', 'pg_trgm.similarity_threshold');
-- способ 3
SELECT current_setting('pg_trgm.word_similarity_threshold'), current_setting('pg_trgm.similarity_threshold');
-- способ 1
SET pg_trgm.similarity_threshold = 0.3;
SET pg_trgm.word_similarity_threshold = 0.3;
-- способ 2
SELECT set_config('pg_trgm.word_similarity_threshold', 0.2::text, FALSE),
set_config('pg_trgm.similarity_threshold', 0.2::text, FALSE);
Вакуумирование — важная процедура поддержания хорошей работоспособности вашей базы. Если вы видите, что автовакуум процессы всё время работают, это значит что они не успевают за количеством изменений в системе. А это в свою очередь сигнал того что надо срочно принимать меры, иначе есть большой риск “распухания” таблиц и индексов — ситуация когда физический размер объектов в базе очень большой, а при этом полезной информации там в разы меньше.
SELECT (clock_timestamp() - xact_start) AS ts_age,
state, pid, query FROM pg_stat_activity
WHERE query ilike '%autovacuum%' AND NOT pid=pg_backend_pid()
При неправильно настроенных контрольных точках база будет генерировать избыточную дисковую нагрузку. Это будет происходить с высокой частотой, что будет замедлять общее время отклика системы и базы данных. Для того, чтобы понять правильность настройки, надо обратить внимание на следующие отклонения в поведении базы данных: в мониторинге приложения или базы будут видны пики во времени ответа, с хорошо прослеживаемой периодичностью; в моменты "пиков" в логе базы будет отслеживаться большое количество медленных запросов (в случае если логирование таких запросов настроено).
Запрос покажет статистику по контрольным точкам с момента, когда она в последний раз обнулялась. Важными показателями будут минуты между контрольными точками и объем записываемой информации. Большое кол-во данных за короткое время — это серьезная нагрузка на систему ввода-вывода. Если это ваш случай, то ситуацию нужно однозначно менять!
SELECT now()-pg_postmaster_start_time() "Uptime",
now()-stats_reset "Minutes since stats reset",
round(100.0*checkpoints_req/checkpoints,1) "Forced
checkpoint ratio (%)",
round(min_since_reset/checkpoints,2) "Minutes between
checkpoints",
round(checkpoint_write_time::numeric/(checkpoints*1000),2) "Average
write time per checkpoint (s)",
round(checkpoint_sync_time::numeric/(checkpoints*1000),2) "Average
sync time per checkpoint (s)",
round(total_buffers/pages_per_mb,1) "Total MB written",
round(buffers_checkpoint/(pages_per_mb*checkpoints),2) "MB per
checkpoint",
round(buffers_checkpoint/(pages_per_mb*min_since_reset*60),2)
"Checkpoint MBps"
FROM (
SELECT checkpoints_req,
checkpoints_timed + checkpoints_req checkpoints,
checkpoint_write_time,
checkpoint_sync_time,
buffers_checkpoint,
buffers_checkpoint + buffers_clean + buffers_backend total_buffers,
stats_reset,
round(extract('epoch' from now() - stats_reset)/60)::numeric
min_since_reset,
(1024.0 * 1024 / (current_setting('block_size')::numeric))pages_per_mb
FROM pg_stat_bgwriter
) bg
Вначале каждой миграции, которая выполняется внутри транзакции, нужно изменить настройки конфигурации lock_timeout
и statement_timeout
и idle_in_transaction_session_timeout
командой SET LOCAL.
Действие SET LOCAL продолжается только до конца текущей транзакции, независимо от того, фиксируется она или нет.
При выполнении такой команды вне блока транзакции выдаётся предупреждение и больше ничего не происходит.
/*
Здесь SQL команды для наката (или отката), которые не могут работать внутри транзакции
*/
BEGIN;
-- ВНИМАНИЕ! Замечено в PostgreSQL 10.5, что lock_timeout работает не надёжно и DML запросы могут встать в очередь!
DO $$
DECLARE
exception_message text;
exception_context text;
BEGIN
-- Максимальное время блокирования других SQL запросов (простоя веб-сайта) во время миграции. Если будет превышено, то транзакция откатится.
SET LOCAL lock_timeout TO '3s';
-- Максимальное время выполнения любого SQL запроса в этой транзакции. Если будет превышено, то транзакция откатится.
SET LOCAL statement_timeout TO '30min';
-- Максимальное время простаивания транзакции, PostgreSQL >= 10. Если будет превышено, то транзакция откатится.
SET LOCAL idle_in_transaction_session_timeout TO '10s';
EXCEPTION WHEN undefined_object THEN
GET STACKED DIAGNOSTICS
exception_message = MESSAGE_TEXT,
exception_context = PG_EXCEPTION_CONTEXT;
RAISE NOTICE '%', exception_context;
RAISE NOTICE '%', exception_message;
END $$;
/*
Здесь SQL команды для наката (или отката) внутри транзакции
*/
COMMIT;
/*
Здесь SQL команды для наката (или отката), которые не могут работать внутри транзакции
*/
Если какие-либо запросы в миграции приведут к долгому блокированию таблиц, то транзакция с миграцией откатится, а ваши пользователи не пострадают. Если транзакция откатится, то есть 2 варианта: запустить повторно во время меньших нагрузок или оптимизировать код миграции, чтобы свести к минимуму блокировки.
Возможно, есть смысл выставить настройку lock_timeout = '60s'
прямо в postgresql.conf
.
Если что-то пойдёт не так, то пользователи пострадают недолго, а проблема может решиться автоматически.
with table_stats as (
select psut.relname,
psut.n_live_tup,
1.0 * psut.idx_scan / greatest(1, psut.seq_scan + psut.idx_scan) as index_use_ratio
from pg_stat_user_tables psut
order by psut.n_live_tup desc
),
table_io as (
select psiut.relname,
sum(psiut.heap_blks_read) as table_page_read,
sum(psiut.heap_blks_hit) as table_page_hit,
sum(psiut.heap_blks_hit) / greatest(1, sum(psiut.heap_blks_hit) + sum(psiut.heap_blks_read)) as table_hit_ratio
from pg_statio_user_tables psiut
group by psiut.relname
order by table_page_read desc
),
index_io as (
select psiui.relname,
psiui.indexrelname,
sum(psiui.idx_blks_read) as idx_page_read,
sum(psiui.idx_blks_hit) as idx_page_hit,
1.0 * sum(psiui.idx_blks_hit) / greatest(1.0, sum(psiui.idx_blks_hit) + sum(psiui.idx_blks_read)) as idx_hit_ratio
from pg_statio_user_indexes psiui
group by psiui.relname, psiui.indexrelname
order by sum(psiui.idx_blks_read) desc
)
select ts.relname, ts.n_live_tup, ts.index_use_ratio,
ti.table_page_read, ti.table_page_hit, ti.table_hit_ratio,
ii.indexrelname, ii.idx_page_read, ii.idx_page_hit, ii.idx_hit_ratio
from table_stats ts
left outer join table_io ti
on ti.relname = ts.relname
left outer join index_io ii
on ii.relname = ts.relname
order by ti.table_page_read desc, ii.idx_page_read desc
pg_dump -U postgres -h 127.0.0.1 --exclude-table=_* --dbname={database_src} --schema=public --verbose | psql -U postgres -h 127.0.0.1 --dbname={database_dst} --single-transaction --set ON_ERROR_ROLLBACK=on 2> errors.txt
Готовая функция: is_sql.sql
-- PostgreSQL syntax check without running the query
DO $SYNTAX_CHECK$ BEGIN
RETURN;
-- insert your SQL code here
END; $SYNTAX_CHECK$;