# 第十章 PostgreSQL服务器编程

In [1]:
%load_ext sql

  warn("IPython.utils.traitlets has moved to a top-level traitlets package.")


###  连接你所创建的数据库

通过pgAdmin III在PostgreSQL数据库中创建Ex6数据库，增加postgis扩展，并连接该数据库

In [2]:
%%sql postgresql://postgres:postgres@localhost:5432/Ex6

SET statement_timeout = 0;
SET lock_timeout = 0;
SET client_encoding = 'GBK';
SET standard_conforming_strings = on;
SET check_function_bodies = false;
SET client_min_messages = warning;

Done.
Done.
Done.
Done.
Done.
Done.


[]

建议查看PostgreSQL数据库的<a href="http://www.postgresql.org/docs/current/static/plpgsql.html" target="_blank">PL/pgSQL</a>帮助文档和<a href="http://www.postgresql.org/docs/current/static/plpgsql-trigger.html" target="_blank">Trigger</a>的帮助文档，学习PL/pgSQL语言、PostgreSQL的函数和触发器写法

### 10.1.1 PostgreSQL服务器

银行转账函数

In [3]:
%%sql
Drop table if exists accounts;
Create table accounts(owner text, balance numeric);
Insert into accounts values ('Bob', 100);
Insert into accounts values ('Mary', 200);
Update accounts set balance = balance-14 where owner = 'Bob';
Update accounts set balance = balance+14 where owner = 'Mary';

Done.
Done.
1 rows affected.
1 rows affected.
1 rows affected.
1 rows affected.


[]

In [4]:
%%sql 
CREATE OR REPLACE FUNCTION transfer (
                i_payer text,
                i_recipient text,
                i_amount numeric(15,2))
RETURNS text
AS
$$
DECLARE
    payer_bal numeric;
BEGIN
    SELECT balance INTO payer_bal
        FROM accounts
    WHERE owner = i_payer FOR UPDATE;
    IF NOT FOUND THEN
        RETURN 'Payer account not found';
    END IF;
    IF payer_bal < i_amount THEN
        RETURN 'Not enough founds';
    END IF;
UPDATE accounts
        SET balance = balance + i_amount
    WHERE owner = i_recipient;
    IF NOT FOUND THEN
        RETURN 'Recipient account not found';
    END IF;

    UPDATE accounts
        SET balance = balance - i_amount
    WHERE owner = i_payer;

    RETURN 'OK';
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [5]:
%sql SELECT  transfer('Bob', 'Mary', 14);

1 rows affected.


transfer
OK


In [6]:
%sql SELECT * FROM transfer('Bob', 'Mary', 1000);

1 rows affected.


transfer
Not enough founds


In [7]:
%sql select * from accounts;

2 rows affected.


owner,balance
Mary,228
Bob,72


### 10.1.3 自定义类型和操作符

Fruit_QTY类似和fruity_qty_larger_than操作符

定义类型fruit_qty来表示水果的数量，比较苹果和橘子的价值，假设一个橘子等于1.5个苹果的价值

思考PostGIS中Geometry类型及相关空间函数的实现？

In [8]:
%%sql
drop OPERATOR if exists > (FRUIT_QTY, FRUIT_QTY);
drop function if exists fruit_qty_larger_than (left_fruit FRUIT_QTY, right_fruit FRUIT_QTY);
drop type if exists fruit_qty;

CREATE TYPE FRUIT_QTY as (name text, qty int);

CREATE OR REPLACE FUNCTION fruit_qty_larger_than (left_fruit FRUIT_QTY,right_fruit FRUIT_QTY)
RETURNS boolean
AS $$
BEGIN
    IF (left_fruit.name = 'APPLE' AND right_fruit.name = 'ORANGE') THEN
        RETURN left_fruit.qty > (1.5 * right_fruit.qty);
    END IF;
    IF (left_fruit.name = 'ORANGE' AND right_fruit.name = 'APPLE') THEN
        RETURN (1.5 * left_fruit.qty) > right_fruit.qty;
    END IF;
    RETURN left_fruit.qty > right_fruit.qty;
END;  
$$ LANGUAGE plpgsql;

Done.
Done.
Done.
Done.
Done.


[]

In [9]:
%sql SELECT '("APPLE", 3)'::FRUIT_QTY

1 rows affected.


fruit_qty
"(APPLE,3)"


In [10]:
%sql SELECT fruit_qty_larger_than('("APPLE", 3)'::FRUIT_QTY, '("ORANGE", 2)'::FRUIT_QTY);

1 rows affected.


fruit_qty_larger_than
False


OPERATOR > 操作符

In [11]:
%%sql
CREATE OPERATOR > (
    leftarg = FRUIT_QTY,
    rightarg = FRUIT_QTY,
    procedure = fruit_qty_larger_than,
    commutator = >);

Done.


[]

In [12]:
%sql SELECT '("ORANGE", 2)'::FRUIT_QTY > '("APPLE", 3)'::FRUIT_QTY

1 rows affected.


?column?
False


### 10.2.1 函数结构

In [13]:
%%sql
CREATE OR REPLACE FUNCTION mid(varchar, integer, integer) RETURNS varchar
AS $$
BEGIN
    RETURN substring($1, $2, $3);
END
$$
LANGUAGE plpgsql;

Done.


[]

In [14]:
%sql select mid('adcdefg', 2, 5)

1 rows affected.


mid
dcdef


In [15]:
%%sql
CREATE OR REPLACE FUNCTION mid(keyfield varchar, starting_point integer)
     RETURNS varchar
AS $$
BEGIN
    RETURN substring(keyfield, starting_point);
END
$$
LANGUAGE plpgsql;

Done.


[]

In [16]:
%sql select mid('adcdefg', 2)

1 rows affected.


mid
dcdefg


In [17]:
%%sql
CREATE OR REPLACE FUNCTION mid(keyfield varchar, starting_point integer)
     RETURNS varchar
AS $$
DECLARE temp varchar;
BEGIN
    temp := substring(keyfield, starting_point);
    return temp;
END
$$ LANGUAGE plpgsql;

Done.


[]

In [18]:
%sql select mid('adcdefg', 2)

1 rows affected.


mid
dcdefg


### 10.2.2 条件表达式

通过计数器循环实现Fibonacci序列计算

In [19]:
%%sql
CREATE OR REPLACE FUNCTION fib(n integer)
    RETURNS decimal(1000, 0)
AS $$
    DECLARE counter integer := 0;
    DECLARE a decimal(1000, 0) := 0;
    DECLARE b decimal(1000, 0) := 1;
BEGIN
    IF (n < 1) THEN RETURN 0; END IF;
    LOOP
        EXIT WHEN counter = n;
        counter := counter + 1;
        SELECT b, a+b INTO a, b;
    END LOOP;
    RETURN a;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [20]:
r = %sql select fib(0)
print r

r = %sql select fib(1)
print r

r = %sql select fib(10)
print r

1 rows affected.
+-----+
| fib |
+-----+
|  0  |
+-----+
1 rows affected.
+-----+
| fib |
+-----+
|  1  |
+-----+
1 rows affected.
+-----+
| fib |
+-----+
|  55 |
+-----+


### 10.2.3 返回集合

返回Fibonacci序列整数集合

In [21]:
%%sql
CREATE OR REPLACE FUNCTION fib_seq(num integer)
    RETURNS SETOF integer AS $$
DECLARE a int := 0;
        b int := 1;
BEGIN
    IF (num < 1) THEN RETURN; END IF;
    RETURN NEXT a;
    LOOP
        EXIT WHEN num <= 1;
        RETURN NEXT b;
        num := num - 1;
        SELECT b, a+b INTO a, b;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [22]:
r = %sql SELECT fib_seq(4);
print r

r = %sql SELECT * FROM fib_seq(5);
print r

r = %sql SELECT * FROM fib_seq(6) WHERE 1 = ANY(SELECT fib_seq(6));
print r

4 rows affected.
+---------+
| fib_seq |
+---------+
|    0    |
|    1    |
|    1    |
|    2    |
+---------+
5 rows affected.
+---------+
| fib_seq |
+---------+
|    0    |
|    1    |
|    1    |
|    2    |
|    3    |
+---------+
6 rows affected.
+---------+
| fib_seq |
+---------+
|    0    |
|    1    |
|    1    |
|    2    |
|    3    |
|    5    |
+---------+


返回数据库Ex6已经安装的语言

In [23]:
%%sql
CREATE OR REPLACE FUNCTION installed_languages()
    RETURNS SETOF pg_language AS $$
BEGIN
    RETURN QUERY SELECT * FROM pg_language;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [24]:
%sql SELECT * FROM installed_languages();

4 rows affected.


lanname,lanowner,lanispl,lanpltrusted,lanplcallfoid,laninline,lanvalidator,lanacl
internal,10,False,False,0,0,2246,
c,10,False,False,0,0,2247,
sql,10,False,True,0,0,2248,
plpgsql,10,True,True,12356,12357,12358,


### 10.2.4 OUT参数与记录集

In [25]:
%%sql
CREATE OR REPLACE FUNCTION positive (INOUT a int, INOUT b int)
AS $$
BEGIN
    IF a < 0 THEN a = null; END IF;
    IF b < 0 THEN b = null; END IF;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [26]:
%sql select positive(10, -5)

1 rows affected.


positive
"(10,)"


In [27]:
%%sql
CREATE or replace FUNCTION sum_n_product(x int, y int, OUT sum int, OUT prod int)
AS $$
BEGIN
    sum := x + y;
    prod := x * y;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [28]:
%sql select sum_n_product(5, 7)

1 rows affected.


sum_n_product
"(12,35)"


In [29]:
%%sql 
CREATE OR REPLACE FUNCTION swap(INOUT a int, INOUT b int)
    RETURNS SETOF RECORD
AS $$
BEGIN
    RETURN NEXT;
    SELECT b, a INTO a, b; 
    RETURN NEXT;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [31]:
%sql select swap (5, 10)

2 rows affected.


swap
"(5,10)"
"(10,5)"


In [32]:
%%sql
Drop table if exists sales;
Create table sales(itemno int, quantity int, price numeric);
Insert into sales values (101, 100, 20.2);
Insert into sales values (101, 200, 30.3);

CREATE OR REPLACE FUNCTION extended_sales (p_itemno int)
    RETURNS TABLE(quantity int, total numeric)
AS $$
BEGIN
    RETURN QUERY SELECT s.quantity, s.quantity * s.price 
                                  FROM sales AS s
                                  WHERE s.itemno = p_itemno;
END;
$$ LANGUAGE plpgsql;

Done.
Done.
1 rows affected.
1 rows affected.
Done.


[]

In [33]:
%sql select extended_sales(101);

2 rows affected.


extended_sales
"(100,2020.0)"
"(200,6060.0)"


### 10.2.5 返回游标

In [34]:
%%sql
drop table if exists fiverows cascade;
Create table fiverows(id serial primary key, data text);
Insert into fiverows(data) values('one'), ('two'), ('three'), ('four'), ('five');

CREATE or replace FUNCTION curtest1(cur refcursor, tag text) 
    RETURNS refcursor 
AS $$
BEGIN
    OPEN cur FOR SELECT id, data || '+' || tag FROM fiverows;
    RETURN cur;
END;
$$ LANGUAGE plpgsql;

Done.
Done.
5 rows affected.
Done.


[]

In [35]:
%%sql
CREATE or replace FUNCTION curtest2(tag1 text, tag2 text) RETURNS SETOF fiverows AS $$
DECLARE  cur1 refcursor; cur2 refcursor; row record;
BEGIN
    cur1 = curtest1(NULL, tag1);
    cur2 = curtest1(NULL, tag2);
    LOOP
        FETCH cur1 INTO row;
        EXIT WHEN NOT FOUND;
        RETURN NEXT row;
        FETCH cur2 INTO row;
        EXIT WHEN NOT FOUND;
        RETURN NEXT row;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [36]:
%sql select curtest2('two', 'four');

10 rows affected.


curtest2
"(1,one+two)"
"(1,one+four)"
"(2,two+two)"
"(2,two+four)"
"(3,three+two)"
"(3,three+four)"
"(4,four+two)"
"(4,four+four)"
"(5,five+two)"
"(5,five+four)"


### 10.3.1 创建触发器
notify_trigger，RAISE NOTICE可在pgAdmin III的消息窗口查看

In [37]:
%%sql
CREATE OR REPLACE FUNCTION notify_trigger()
    RETURNS TRIGGER AS $$
BEGIN
    RAISE NOTICE 'Hi, I got % invoked for % % % on %', 
              TG_NAME, TG_LEVEL, TG_WHEN, TG_OP, TG_TABLE_NAME;
    RETURN NEW;
END
$$ LANGUAGE plpgsql;

drop table if exists notify_test;
CREATE TABLE notify_test(i int);

drop trigger if exists notify_insert_trigger on notify_test;
CREATE TRIGGER notify_insert_trigger
    AFTER INSERT ON notify_test
    FOR EACH ROW
    EXECUTE PROCEDURE notify_trigger();


Done.
Done.
Done.
Done.
Done.


[]

In [38]:
%sql INSERT INTO notify_test VALUES (1), (2);

2 rows affected.


[]

### 10.3.2 审核触发器
audit_trigger

In [39]:
%%sql
drop table if exists audit_log;
CREATE TABLE audit_log (
    username text,             -- who did the change
    event_time_utc timestamp,  -- when the event was recorded
    table_name text,           -- contains schema-qualified table name
    operation text,            -- INSERT, UPDATE, DELETE or TRUNCATE
    before_value json,         -- the OLD tuple value
    after_value json           -- the NEW tuple value
);

Done.
Done.


[]

In [40]:
%%sql
CREATE OR REPLACE FUNCTION audit_trigger()
    RETURNS TRIGGER 
AS $$
DECLARE old_row json := NULL;
        new_row json := NULL;
BEGIN
    IF TG_OP IN ('UPDATE', 'DELETE') THEN
        old_row = row_to_json(OLD);
    END IF;
    IF TG_OP IN ('INSERT', 'UPDATE') THEN
        new_row = row_to_json(NEW);
    END IF;
    INSERT INTO audit_log VALUES(session_user, current_timestamp AT TIME ZONE 'UTC', TG_TABLE_SCHEMA || '.' || TG_TABLE_NAME, TG_OP, old_row, new_row);
    RETURN NEW;
END; 
$$ LANGUAGE plpgsql;

Done.


[]

In [41]:
%%sql
drop trigger if exists audit_log on notify_test;
CREATE TRIGGER audit_log
    AFTER INSERT OR UPDATE OR DELETE
    ON notify_test
    FOR EACH ROW
    EXECUTE PROCEDURE audit_trigger();

Done.
Done.


[]

In [42]:
%sql delete from notify_test;
%sql delete from audit_log;
%sql insert into notify_test values(1), (2), (3), (4);
%sql update notify_test set i = i + 100 where i < 3;
%sql delete from notify_test where i < 100;
%sql select * from audit_log;

4 rows affected.
4 rows affected.
4 rows affected.
2 rows affected.
2 rows affected.
8 rows affected.


username,event_time_utc,table_name,operation,before_value,after_value
postgres,2016-05-10 04:01:16.789143,public.notify_test,INSERT,,{u'i': 1}
postgres,2016-05-10 04:01:16.789143,public.notify_test,INSERT,,{u'i': 2}
postgres,2016-05-10 04:01:16.789143,public.notify_test,INSERT,,{u'i': 3}
postgres,2016-05-10 04:01:16.789143,public.notify_test,INSERT,,{u'i': 4}
postgres,2016-05-10 04:01:16.797934,public.notify_test,UPDATE,{u'i': 1},{u'i': 101}
postgres,2016-05-10 04:01:16.797934,public.notify_test,UPDATE,{u'i': 2},{u'i': 102}
postgres,2016-05-10 04:01:16.803696,public.notify_test,DELETE,{u'i': 3},
postgres,2016-05-10 04:01:16.803696,public.notify_test,DELETE,{u'i': 4},


### 10.3.3 数据保护触发器
Before、After触发器

In [43]:
%%sql
CREATE OR REPLACE FUNCTION cancel_op()
    RETURNS TRIGGER 
AS $$
BEGIN
    IF TG_WHEN = 'AFTER' THEN
        RAISE EXCEPTION 'You are not allowed to % rows in %.%', TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME;
    END IF;
    RAISE NOTICE '% on rows in %.% will not happen', TG_OP, TG_TABLE_SCHEMA, TG_TABLE_NAME;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Done.


[]

In [45]:
%%sql
drop trigger if exists disallow_delete on notify_test;
CREATE TRIGGER disallow_delete
    AFTER DELETE ON notify_test
    FOR EACH STATEMENT
    EXECUTE PROCEDURE cancel_op();
    
drop trigger if exists disallow_truncate on notify_test;
CREATE TRIGGER disallow_truncate
    AFTER TRUNCATE ON notify_test
    FOR EACH STATEMENT
    EXECUTE PROCEDURE cancel_op();

Done.
Done.
Done.
Done.


[]

In [46]:
%sql delete from notify_test where i > 100;

InternalError: (psycopg2.InternalError) 错误:  You are not allowed to DELETE rows in public.notify_test
 [SQL: 'delete from notify_test where i > 100;']

### 10.3.3数据保护触发器
使用new、old

In [47]:
%%sql
drop table if exists modify_test;
CREATE TABLE modify_test (
    id serial PRIMARY KEY,
    data text,
    created_by text default SESSION_USER,
    created_at timestamp default CURRENT_TIMESTAMP,
    last_changed_by text default SESSION_USER,
    last_changed_at timestamp default CURRENT_TIMESTAMP);

CREATE OR REPLACE FUNCTION changestamp()
    RETURNS TRIGGER AS $$
BEGIN
        NEW.last_changed_by = SESSION_USER;
        NEW.last_changed_at  = CURRENT_TIMESTAMP;
        RETURN NEW;
END;
$$ LANGUAGE plpgsql;


drop trigger if exists changestamp on modify_test;
CREATE TRIGGER changestamp
    BEFORE UPDATE ON modify_test FOR EACH ROW
    EXECUTE PROCEDURE changestamp();


INSERT INTO modify_test(data) VALUES('something');
UPDATE modify_test SET data = 'something else' WHERE id = 1;
select * from modify_test;

Done.
Done.
Done.
Done.
Done.
1 rows affected.
1 rows affected.
1 rows affected.


id,data,created_by,created_at,last_changed_by,last_changed_at
1,something else,postgres,2016-05-10 12:13:16.424889,postgres,2016-05-10 12:13:16.433131


In [48]:
%%sql
CREATE OR REPLACE FUNCTION usagestamp()
    RETURNS TRIGGER AS $$
BEGIN
        IF TG_OP = 'INSERT' THEN
            NEW.created_by = SESSION_USER;
            NEW.created_at  = CURRENT_TIMESTAMP;
        ELSE
            NEW.created_by = OLD.created_by;
            NEW.created_at  = OLD.created_at;
        END IF;
        NEW.last_changed_by = SESSION_USER;
        NEW.last_changed_at  = CURRENT_TIMESTAMP;
        RETURN NEW;
END;
$$ LANGUAGE plpgsql;


CREATE TRIGGER usagestamp
    BEFORE INSERT OR UPDATE ON modify_test FOR EACH ROW
    EXECUTE PROCEDURE usagestamp();

DROP TRIGGER changestamp on modify_test;
UPDATE modify_test SET created_by = 'notpostgres', created_at = '2001-01-01';
SELECT * FROM modify_test;

Done.
Done.
Done.
1 rows affected.
1 rows affected.


id,data,created_by,created_at,last_changed_by,last_changed_at
1,something else,postgres,2016-05-10 12:13:16.424889,postgres,2016-05-10 12:15:13.066437


### 10.3.4 触发器效率与调试
使用when

In [49]:
%%sql
drop table if exists new_t;
CREATE TABLE new_t (i int);
    
CREATE OR REPLACE FUNCTION cancel_with_message()
    RETURNS TRIGGER 
AS $$
BEGIN
        RAISE EXCEPTION '%', TG_ARGV[0];
        RETURN NULL;
END;
$$ LANGUAGE plpgsql;

drop trigger if exists no_updates_on_friday_afternoon on new_t;
CREATE TRIGGER no_updates_on_friday_afternoon
    BEFORE INSERT OR UPDATE OR DELETE OR TRUNCATE ON new_t
    FOR EACH STATEMENT
    WHEN (CURRENT_TIME > '12:00' AND extract (DOW from CURRENT_TIMESTAMP) = 5)
    EXECUTE PROCEDURE cancel_with_message('Sorry, we have a "No task change on Friday afternoon" policy!');

insert into new_t values(1);

Done.
Done.
Done.
Done.
Done.
1 rows affected.


[]

## 《PostGIS in Action》书中空间函数与触发器案例

### 1. Creating Equal Areas by Sharding

In [None]:
%%sql
CREATE OR REPLACE FUNCTION 
    slicegeometry(
        ageom geometry, numsections integer, 
        OUT bucket integer, OUT geom geometry)
RETURNS SETOF record 
AS $$

WITH RECURSIVE
    
ref (geom,the_box,targ_area,x_mov,y_mov,  -- 1. efine constants
    x_length,y_length,xmin,ymin) AS ( 
    SELECT 
        geom, 
        ST_MakeEnvelope(
            xmin, ymin, 
            xmin + CAST(x_length/ngrid_xy AS integer), 
            ymin + CAST(y_length/ngrid_xy AS integer), 
            ST_SRID(s.geom)
        ) AS the_box, 
        ST_Area(geom)/$2 AS targ_area, 
        CAST(x_length/ngrid_xy AS integer) AS x_mov,  
        CAST(y_length/ngrid_xy AS integer) y_mov, 
        s.x_length, s.y_length, xmin, ymin        
    FROM (
        SELECT 
            $1 AS geom, ST_XMin($1) AS xmin, ST_YMin($1) AS ymin, 
            ST_XMax($1) - ST_XMin($1) AS x_length, 
            ST_YMax($1) - ST_YMin($1) AS y_length, 
            15*$2 AS ngrid_xy) AS s                   
    ),                                                         

X(x) AS ( -- 2. Start position of squares
    VALUES (CAST(0 AS float))
    UNION ALL                                         
    SELECT x + ref.x_mov FROM X CROSS JOIN ref WHERE x <  ref.x_length
),              
       
       
Y(y) AS ( 
    VALUES (CAST(0 AS float))       
    UNION ALL         
    SELECT y + ref.y_mov FROM Y CROSS JOIN ref WHERE y < ref.y_length
),        
   
diced AS (  -- 3. cut into shards
    SELECT ROW_NUMBER() OVER(ORDER BY x,y) AS row_num, g.x, g.y, g.geom
    FROM (
        SELECT 
            x, y, 
            ST_Intersection(ref.geom,
                ST_Translate(ref.the_box,x,y)) AS geom
        FROM x CROSS JOIN y CROSS JOIN ref        
        WHERE ST_Intersects(ref.geom, ST_Translate(ref.the_box,x,y))
    ) AS g                                    
),                                                    

T (bucket, row_num, geom, total_area, targ_area, 
 remaining_area) AS ( -- 4. bucket the shards
      SELECT 
        1 AS bucket, row_num, diced.geom, 
        ST_Area(diced.geom) AS total_area,  
        ref.targ_area, 
        ST_Area(ref.geom) - ST_Area(diced.geom) AS remaining_area
    FROM diced CROSS JOIN ref 
    WHERE diced.row_num = 1            
    UNION ALL    
    SELECT 
        CASE 
            WHEN 
                T2.total_area + ST_Area(diced.geom) < T2.targ_area 
                OR 
                T2.remaining_area < T2.targ_area/4 
            THEN 
                T2.bucket 
            ELSE T2.bucket + 1 END AS bucket, 
        diced.row_num, 
        diced.geom,                            
        CASE 
            WHEN T2.total_area + ST_Area(diced.geom) < T2.targ_area 
            THEN T2.total_area + ST_Area(diced.geom) 
            ELSE ST_Area(diced.geom) 
        END AS total_area, 
        T2.targ_area, 
        T2.remaining_area - ST_Area(diced.geom) AS remaining_area
    FROM 
        diced INNER JOIN 
        (SELECT * FROM T ORDER BY row_num DESC LIMIT 1) AS T2
    ON diced.row_num = T2.row_num + 1 
)
    
SELECT bucket, ST_Union(geom) AS geom  -- 5. union shards by bucket
    FROM T GROUP BY T.bucket, T.targ_area  

$$
LANGUAGE 'sql' IMMUTABLE;

### 2. Cut linestrings and multilinestrings at nearest point junctions

In [None]:
%%sql
CREATE OR REPLACE FUNCTION cutlineatpoints(
    param_mlgeom geometry, 
    param_mpgeom geometry, 
    param_tol double precision
)
RETURNS geometry AS
$$
DECLARE
    var_resultgeom geometry;
    var_sline geometry;
    var_eline geometry;
    var_perc_line double precision;
    var_refgeom geometry;
    var_pset geometry[] :=  -- 1. Convert geometries to array
        ARRAY(SELECT geom FROM ST_Dump(param_mpgeom));             
    var_lset geometry[] := 
        ARRAY(SELECT geom FROM ST_Dump(param_mlgeom));  
BEGIN

FOR i in 1 .. array_upper(var_pset,1) LOOP -- 2. Loop through each point
    FOR j in 1 .. array_upper(var_lset,1) LOOP -- 3. Loop throught each point
        IF 
            ST_DWithin(var_lset[j],var_pset[i],param_tol) AND -- If point within tolerance of line, make a cut
            NOT ST_Intersects(ST_Boundary(var_lset[j]),var_pset[i])
        THEN                                 -- Recurse if multilinestring
            IF ST_NumGeometries(ST_Multi(var_lset[j])) = 1 THEN 
                var_perc_line := 
                ST_Line_Locate_Point(var_lset[j],var_pset[i]);
                IF var_perc_line BETWEEN 0.0001 and 0.9999 THEN
                    var_sline := 
                        ST_Line_Substring(var_lset[j],0,var_perc_line);
                    var_eline := 
                        ST_Line_Substring(var_lset[j],var_perc_line,1);
                    var_eline := 
                        ST_SetPoint(var_eline,0,ST_EndPoint(var_sline));
                    var_lset[j] := ST_Collect(var_sline,var_eline);
                END IF;
            ELSE
                var_lset[j] :=   -- Convert geometries to array
                    cutlineatpoints(var_lset[j],var_pset[i]);
            END IF;
        END IF;
    END LOOP;
END LOOP;
  
RETURN ST_Union(var_lset);

END;
$$
LANGUAGE 'plpgsql' IMMUTABLE STRICT;

### 3. Creating an ST_SimplifyPreserveTopoloty wrapper for geography

In [None]:
%%sql
CREATE OR REPLACE FUNCTION 
    SimplifyPreserveTopology(geography, double precision)
RETURNS geography AS
$$
SELECT 
    geography(
        ST_Transform(
            ST_SimplifyPreserveTopology(
                ST_Transform(geometry($1),_ST_BestSRID($1,$1)), -- <co id="co_code_ugeog_simplifypreservetopology_1"/> 
                $2
            ),
        4326)
    )
$$
LANGUAGE sql IMMUTABLE STRICT
COST 300;

### 4. PL/pgSQL Before Insert trigger function to redirect inserts

In [None]:
%%sql
drop table if exists pairs;
drop table if exists paris_rejects;

CREATE TABLE paris (
    gid SERIAL PRIMARY KEY, 
    osm_id bigint, 
    ar_num integer, 
    feature_name varchar(200), 
    feature_type varchar(50), 
    geom geometry(geometry, 32631)
);

CREATE TABLE paris_rejects (
    gid integer NOT NULL PRIMARY KEY,
    osm_id integer,
    ar_num integer,
    feature_name varchar(200),
    feature_type varchar(50),
    geom geometry, tags hstore
);

CREATE OR REPLACE FUNCTION trigger_paris_insert() 
RETURNS trigger AS
$$
DECLARE 
    var_geomtype text;
BEGIN
    var_geomtype := geometrytype(NEW.geom); -- 1. Use temporary variables
    IF var_geomtype IN ('MULTIPOLYGON', 'POLYGON') THEN
        NEW.geom := ST_Multi(NEW.geom);
        INSERT INTO ch14.paris_polygons(
            gid,osm_id,ar_num,feature_name,feature_type,geom,tags
        )
        SELECT gid,osm_id,ar_num,feature_name,feature_type,geom,tags
        FROM (SELECT NEW.*) As foo; -- 2. NEW is alias for table that contains new record
    ELSIF var_geomtype = 'POINT' THEN
        INSERT INTO ch14.paris_points (
            gid,osm_id,ar_num,feature_name,feature_type,geom,tags
        )
        SELECT gid,osm_id,ar_num,feature_name,feature_type,geom,tags
        FROM (SELECT NEW.*) As foo;
    ELSIF var_geomtype = 'LINESTRING' THEN
        INSERT INTO ch14.paris_linestrings (
            gid,osm_id,ar_num,feature_name,feature_type,geom,tags
        )
        SELECT gid,osm_id,ar_num,feature_name,feature_type,geom,tags
        FROM (SELECT NEW.*) As foo;
    ELSE
        INSERT INTO ch14.paris_rejects (
            gid,osm_id,ar_num,feature_name,feature_type,geom,tags
        )
        SELECT gid,osm_id,ar_num,feature_name,feature_type,geom,tags 
        FROM (SELECT NEW.*) As foo; -- 3. Nonstandard geometry types go into rejects table                        
    END IF;
    RETURN NULL; -- 4. Cancel original insert
END;
$$
LANGUAGE 'plpgsql' VOLATILE;


CREATE TRIGGER trigger1_paris_insert BEFORE INSERT
ON paris FOR EACH ROW
EXECUTE PROCEDURE trigger_paris_insert();

### 5. Trigger that dynamically creates tables as needed

In [None]:
%%sql 
drop table if exists pairs_points;
CREATE TABLE paris_points(
    gid SERIAL PRIMARY KEY, 
    osm_id bigint,
    ar_num integer, 
    feature_name varchar(200),
    feature_type varchar(50), 
    geom geometry(Point, 32631)
); 

CREATE OR REPLACE FUNCTION trigger_paris_child_insert() 
RETURNS TRIGGER AS 
$$
DECLARE
    var_sql text;
    var_tbl text;
BEGIN
    var_tbl :=  
        TG_TABLE_NAME || '_ar' || lpad(NEW.ar_num::text,2,'0'); -- 1. Assign destination table name to variable
    IF NOT EXISTS (
        SELECT * 
        FROM information_schema.tables -- 2. Check if destination table exists
        WHERE table_schema = TG_TABLE_SCHEMA AND table_name = var_tbl) 
    THEN        
        var_sql := 
            'CREATE TABLE ' || TG_TABLE_SCHEMA || '.' || var_tbl || 
            '(CONSTRAINT pk_' || var_tbl || 
            ' PRIMARY KEY(gid)) INHERITS (' || TG_TABLE_SCHEMA || 
            '.' || TG_TABLE_NAME  || '); CREATE INDEX idx_' || 
            var_tbl || '_geom ON ' || TG_TABLE_SCHEMA || '.' || 
            var_tbl || ' USING gist(geom); ALTER TABLE ' || 
            TG_TABLE_SCHEMA || '.' || var_tbl || 
            ' ADD CONSTRAINT chk_ar_num CHECK (ar_num = ' || 
            NEW.ar_num::text || ');';
        EXECUTE var_sql; -- 3. Create destination table if absent
    END IF;
    var_sql := 
        'INSERT INTO ' || TG_TABLE_SCHEMA || '.' || var_tbl || 
        '(gid,osm_id,ar_num,feature_name,feature_type,geom,tags) ' || 
        'VALUES($1,$2,$3,$4,$5,$6,$7)'; -- 4. Prepare and execute insert SQL
    EXECUTE var_sql 
    USING 
        NEW.gid,NEW.osm_id,NEW.ar_num,NEW.feature_name,
        NEW.feature_type,NEW.geom,NEW.tags;                       
    RETURN NULL; -- Cancel original insert
END;
$$ language plpgsql;


CREATE TRIGGER trig01_paris_child_insert BEFORE INSERT
ON paris_points FOR EACH ROW
EXECUTE PROCEDURE trigger_paris_child_insert();

### 6. Create a PL/pgSQL stored function to output GeoJSON

In [None]:
%%sql
CREATE OR REPLACE FUNCTION get_features(
    param_geom json,
    param_table text,
    param_props text,
    param_limit integer DEFAULT 10
) 
RETURNS json AS 
$$
DECLARE 
    var_sql text; var_result json; var_srid integer; var_geo geometry; 
    var_table text; var_cols text; var_input_srid integer; 
    var_geom_col text;
BEGIN
    SELECT 
        f_geometry_column, 
        quote_ident(f_table_schema) || '.' || quote_ident(f_table_name) 
    FROM geometry_columns
    INTO var_geom_col, var_table -- 1. Verify table is a geometry table
    WHERE f_table_schema || '.' || f_table_name = param_table
    LIMIT 1;  
 
    IF var_geom_col IS NULL THEN
        RAISE EXCEPTION 'No such geometry table as %', param_table;
    END IF;
    var_geo := ST_GeomFromGeoJSON($1::text); -- 2. Convert location to geometry
    var_input_srid := ST_SRID(var_geo); -- 3. Get SRID of requested location
    If var_input_srid < 1 THEN 
        var_input_srid = 4326; 
        var_geo := ST_SetSRID( 
        ST_GeomFromGeoJSON($1::text),var_input_srid); 
    END IF; 
  
    var_sql := 'SELECT ST_SRID(geom) FROM ' || var_table || ' LIMIT 1'; -- 4. Get SRID of table

    EXECUTE var_sql INTO var_srid; -- <co id="co_code_get_features_4b"/>
  
    SELECT string_agg(quote_ident(trim(a)), ',') 
    INTO var_cols -- <co id="co_code_get_features_5a"/>
    FROM unnest(string_to_array(param_props, ',')) As a; -- 5.  Sanitize column names
     
    var_sql := 
        'SELECT row_to_json(fc) 
        FROM (
            SELECT 
                ''FeatureCollection'' As type, 
                array_to_json(array_agg(f)) As features
            FROM (
                SELECT 
                    ''Feature'' As type, 
                    ST_AsGeoJSON(ST_Transform(
                        lg.' || quote_ident(var_geom_col) || ', $4)
                    )::json As geometry,
                    row_to_json(
                        (SELECT l FROM (SELECT ' || var_cols || ') As l)
                    ) As properties 
                FROM ' || var_table || ' AA lg 
                WHERE ST_Intersects(lg.geom,ST_Transform($1,$2)) LIMIT $3
            ) As f
        ) As fc;'; -- 6. Build parameterized SQL

    EXECUTE var_sql INTO var_result 
    USING var_geo, var_srid, param_limit, var_input_srid; -- 7. Execute parameterized SQL using variables, output to var_result, and return
     
    RETURN var_result; 
END;
$$
LANGUAGE plpgsql;