-
Notifications
You must be signed in to change notification settings - Fork 0
/
sp_debezium_log_merge.sql
103 lines (86 loc) · 3.75 KB
/
sp_debezium_log_merge.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
CREATE OR REPLACE PROCEDURE `project-id.curated.sp_debezium_log_merge` (dbtbname STRING, colname STRING,pk STRING,region STRING)
-- Merges Debezium change events from the log table `<dbtbname>_log` into the
-- table `<dbtbname>`.
--
-- Steps:
--   1. (Re)creates the view `<dbtbname>_log_v` over `<dbtbname>_log`, exposing
--      the operation code, the caller-supplied columns and event metadata.
--   2. Compares the view's columns (all but `op`) with the target's columns
--      using <region>.INFORMATION_SCHEMA.COLUMNS.
--   3. If the view has a column name/type pair the target lacks (which is also
--      the case when INFORMATION_SCHEMA has no rows for the target), rebuilds
--      the target from the latest non-delete event per key.
--   4. Otherwise, inside one transaction, deletes every target row whose key
--      appears in the view within the last 2 days and re-inserts the latest
--      non-delete event per key from that same window.
--
-- Parameters:
--   dbtbname  Target table path. It is compared as a plain string against
--             table_catalog.table_schema.table_name, so it must be written as
--             project.dataset.table without backticks.
--   colname   Select-list fragment spliced into the view definition.
--   pk        Comma-separated key column list; used for CLUSTER BY, for the
--             ROW_NUMBER() partition and for matching rows to delete.
--   region    Qualifier placed in front of `.INFORMATION_SCHEMA.COLUMNS`.
--
-- Raises:
--   Any error from the statements below. A failure inside the incremental
--   transaction is rolled back and then re-raised to the caller.
--
-- NOTE(review): dbtbname, colname, pk and region are identifiers/fragments and
-- are concatenated into dynamic SQL because they cannot be bound as query
-- parameters. Only call this procedure with trusted values.
BEGIN
  DECLARE schema_changed BOOL;
  DECLARE cols STRING;
  DECLARE key_match STRING;
  DECLARE log_view STRING DEFAULT dbtbname || '_log_v';
  -- Evaluated once so the DELETE and the INSERT see exactly the same window.
  -- Evaluating CURRENT_TIMESTAMP in each statement lets the cutoff drift, so a
  -- key at the edge of the window could be deleted and never re-inserted.
  DECLARE window_start TIMESTAMP DEFAULT TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY);

  -- One equality per key column, e.g. "tgt.a = src.a AND tgt.b = src.b".
  -- Column-wise matching avoids the false matches ('a'+'bc' vs 'ab'+'c') and
  -- the STRING-only restriction of concatenating the key columns with ||.
  SET key_match = (
    SELECT STRING_AGG(FORMAT('tgt.%s = src.%s', TRIM(key_col), TRIM(key_col)), ' AND ')
    FROM UNNEST(SPLIT(pk, ',')) AS key_col
  );

  -- Flattened view over the raw log: one row per event that carries an op.
  EXECUTE IMMEDIATE """
    CREATE OR REPLACE VIEW """ || log_view || """ AS (
      SELECT
        message_id,
        JSON_VALUE(data.payload.op) AS op,
        """ || colname || """,
        TIMESTAMP_MILLIS(CAST(JSON_VALUE(data.payload.source.ts_ms) AS INT64)) AS source_ts,
        publish_time,
        bq_load_ts,
        JSON_VALUE(data.payload.source.db) || '.' || JSON_VALUE(data.payload.source.table) AS source_db_table,
        subscription_name,
        CAST(JSON_VALUE(data.payload.source.pos) AS INT64) AS pos
      FROM
        """ || dbtbname || """_log
      WHERE
        JSON_VALUE(data.payload.op) IS NOT NULL
      ORDER BY
        source_ts DESC, pos DESC
    ); """;

  -- TRUE when the view exposes a (column_name, data_type) pair that the target
  -- table does not have. Names are bound as parameters, not spliced as literals.
  EXECUTE IMMEDIATE """
    SELECT EXISTS (
      SELECT 1
      FROM (
        SELECT column_name, data_type
        FROM """ || region || """.INFORMATION_SCHEMA.COLUMNS
        WHERE TRIM(table_catalog) || '.' || TRIM(table_schema) || '.' || TRIM(table_name) = @view_name
          AND column_name <> 'op'
        EXCEPT DISTINCT
        SELECT column_name, data_type
        FROM """ || region || """.INFORMATION_SCHEMA.COLUMNS
        WHERE TRIM(table_catalog) || '.' || TRIM(table_schema) || '.' || TRIM(table_name) = @target_name
      )
    )
  """ INTO schema_changed USING log_view AS view_name, dbtbname AS target_name;

  IF schema_changed THEN
    -- Full rebuild: keep the newest event per key and drop keys whose newest
    -- event is a delete ('d').
    -- NOTE(review): DATETIME_TRUNC is applied to publish_time, which is
    -- compared with a TIMESTAMP elsewhere in this procedure; confirm the
    -- column type and whether TIMESTAMP_TRUNC is the intended function.
    EXECUTE IMMEDIATE """
      CREATE OR REPLACE TABLE """ || dbtbname || """
      PARTITION BY DATETIME_TRUNC(publish_time,DAY)
      CLUSTER BY """ || pk || """ AS (
        SELECT * EXCEPT(row_num,op)
        FROM (
          SELECT *, ROW_NUMBER() OVER (PARTITION BY """ || pk || """ ORDER BY source_ts DESC, pos DESC) AS row_num
          FROM """ || log_view || """
        ) log_latest
        WHERE row_num = 1
          AND op <> 'd'
      );
    """;
  ELSE
    -- Explicit, backtick-quoted column list shared by INSERT and SELECT so the
    -- two always line up. Read before the transaction starts so that a
    -- metadata problem cannot leave a transaction open.
    EXECUTE IMMEDIATE """
      SELECT STRING_AGG(CONCAT('`', column_name, '`') ORDER BY ordinal_position)
      FROM """ || region || """.INFORMATION_SCHEMA.COLUMNS
      WHERE TRIM(table_catalog) || '.' || TRIM(table_schema) || '.' || TRIM(table_name) = @view_name
        AND column_name <> 'op'
    """ INTO cols USING log_view AS view_name;

    IF cols IS NULL THEN
      RAISE USING MESSAGE = FORMAT(
        'No columns found for %s in %s.INFORMATION_SCHEMA.COLUMNS; nothing was merged.',
        log_view, region);
    END IF;

    BEGIN
      BEGIN TRANSACTION;

      -- Remove every target row whose key has an event in the window; rows
      -- whose latest event is not a delete are re-inserted below.
      EXECUTE IMMEDIATE """
        DELETE FROM """ || dbtbname || """ tgt
        WHERE EXISTS (
          SELECT 1
          FROM """ || log_view || """ src
          WHERE src.publish_time >= @window_start
            AND """ || key_match || """
        );
      """ USING window_start AS window_start;

      EXECUTE IMMEDIATE """
        INSERT INTO """ || dbtbname || """ (""" || cols || """)
        SELECT """ || cols || """
        FROM (
          SELECT *, ROW_NUMBER() OVER (PARTITION BY """ || pk || """ ORDER BY source_ts DESC, pos DESC) AS row_num
          FROM """ || log_view || """
          WHERE publish_time >= @window_start
        ) log_latest
        WHERE row_num = 1
          AND op <> 'd';
      """ USING window_start AS window_start;

      COMMIT TRANSACTION;
    EXCEPTION WHEN ERROR THEN
      -- Undo the partial delete/insert, then surface the original error so
      -- the caller does not mistake a failed merge for a successful one.
      ROLLBACK TRANSACTION;
      RAISE;
    END;
  END IF;
END;