1+
2+ CREATE OR REPLACE FUNCTION refresh_agg_payout (
3+ bucket_part text , -- 'hour' | 'day' | 'month'
4+ p_from timestamptz DEFAULT now() - interval ' 2 hours' ,
5+ p_to timestamptz DEFAULT now()
6+ ) RETURNS void
7+ LANGUAGE plpgsql
8+ AS $$
9+ DECLARE
10+ v_from timestamp ;
11+ v_to timestamp ;
12+ v_step interval;
13+ v_gran " AggGranularity" ;
14+ v_deleted_count int ;
15+ v_inserted_count int ;
16+ BEGIN
17+ IF bucket_part NOT IN (' hour' ,' day' ,' month' ) THEN
18+ RAISE EXCEPTION ' bucket_part must be hour|day|month' ;
19+ END IF;
20+
21+ v_step := CASE bucket_part
22+ WHEN ' hour' THEN interval ' 1 hour'
23+ WHEN ' day' THEN interval ' 1 day'
24+ WHEN ' month' THEN interval ' 1 month'
25+ END;
26+
27+ v_gran := CASE bucket_part
28+ WHEN ' hour' THEN ' HOUR' ::" AggGranularity"
29+ WHEN ' day' THEN ' DAY' ::" AggGranularity"
30+ ELSE ' MONTH' ::" AggGranularity"
31+ END;
32+
33+ v_from := date_trunc(bucket_part, p_from, ' America/Chicago' );
34+ v_to := date_trunc(bucket_part, p_to, ' America/Chicago' ) + v_step;
35+
36+ RAISE NOTICE ' refresh_agg_payout(%): Processing % to %' , bucket_part, v_from, v_to;
37+
38+ -- Idempotent: clear the window for this granularity
39+ DELETE FROM " AggPayOut"
40+ WHERE granularity = v_gran
41+ AND " timeBucket" >= v_from
42+ AND " timeBucket" < v_to;
43+
44+ GET DIAGNOSTICS v_deleted_count = ROW_COUNT;
45+ RAISE NOTICE ' Deleted % existing rows' , v_deleted_count;
46+
47+ -- One pass: all 8 slices
48+ INSERT INTO " AggPayOut"
49+ (granularity," timeBucket" ," payOutType" ," payInType" ," subName" ," userId" ,
50+ " sumMtokens" ," countUsers" ," countGroup" ," slice" )
51+ WITH payouts_paid_facts AS (
52+ WITH paid AS (
53+ SELECT id,
54+ " payInStateChangedAt" AS changed_at,
55+ " payInType" AS payin_type
56+ FROM " PayIn"
57+ WHERE " payInState" = ' PAID'
58+ AND " payInStateChangedAt" >= (v_from AT TIME ZONE ' UTC' ) AND " payInStateChangedAt" < (v_to AT TIME ZONE ' UTC' )
59+ ),
60+ -- Aggregate territory mtokens per (payin, sub)
61+ territory_mtokens AS (
62+ SELECT
63+ pot." payInId" AS payin_id,
64+ spc." subName" AS sub_name,
65+ SUM (pot." mtokens" ) AS mtokens_sub
66+ FROM " PayOutCustodialToken" pot
67+ JOIN " SubPayOutCustodialToken" spc
68+ ON spc." payOutCustodialTokenId" = pot." id"
69+ JOIN paid p
70+ ON p .id = pot." payInId"
71+ GROUP BY pot." payInId" , spc." subName"
72+ ),
73+ -- Total mtokens per payin for proportion denominator
74+ territory_totals AS (
75+ SELECT payin_id, SUM (mtokens_sub) AS mtokens_total
76+ FROM territory_mtokens
77+ GROUP BY payin_id
78+ ),
79+ -- Proportion per (payin, sub): preserve your special-case (mtokens_sub=0 -> 1)
80+ territory_prop AS (
81+ SELECT
82+ tm .payin_id ,
83+ tm .sub_name ,
84+ CASE
85+ WHEN tm .mtokens_sub = 0 THEN 1 .0
86+ ELSE tm .mtokens_sub ::numeric / NULLIF(tt .mtokens_total , 0 )
87+ END AS mtokens_proportion
88+ FROM territory_mtokens tm
89+ JOIN territory_totals tt USING (payin_id)
90+ ),
91+ -- Non-territory payouts (both sources), to be split across subs via the proportion
92+ non_territory AS (
93+ SELECT pot." payInId" AS payin_id,
94+ pot." payOutType" AS payout_type,
95+ pot." userId" AS user_id,
96+ pot." mtokens" ::numeric AS amount
97+ FROM " PayOutCustodialToken" pot
98+ JOIN paid p ON p .id = pot." payInId"
99+ WHERE pot." payOutType" <> ' TERRITORY_REVENUE'
100+ UNION ALL
101+ SELECT pob." payInId" AS payin_id,
102+ pob." payOutType" AS payout_type,
103+ pob." userId" AS user_id,
104+ pob." msats" ::numeric AS amount
105+ FROM " PayOutBolt11" pob
106+ JOIN paid p ON p .id = pob." payInId"
107+ WHERE pob." payOutType" <> ' TERRITORY_REVENUE'
108+ ),
109+ -- Apply proportions to non-territory payouts (one row per sub)
110+ split_non_territory AS (
111+ SELECT
112+ nt .payin_id ,
113+ tp .sub_name ,
114+ nt .payout_type ,
115+ nt .user_id ,
116+ (nt .amount * COALESCE(tp .mtokens_proportion , 1 .0 ))::bigint AS mtokens
117+ FROM non_territory nt
118+ LEFT JOIN territory_prop tp
119+ ON tp .payin_id = nt .payin_id
120+ ),
121+ -- Territory revenue: attributed directly to the specific sub
122+ direct_territory AS (
123+ SELECT
124+ pot." payInId" AS payin_id,
125+ spc." subName" AS sub_name,
126+ pot." payOutType" AS payout_type,
127+ pot." userId" AS user_id,
128+ pot." mtokens" ::bigint AS mtokens
129+ FROM " PayOutCustodialToken" pot
130+ JOIN " SubPayOutCustodialToken" spc
131+ ON spc." payOutCustodialTokenId" = pot." id"
132+ JOIN paid p ON p .id = pot." payInId"
133+ WHERE pot." payOutType" = ' TERRITORY_REVENUE'
134+ )
135+ SELECT
136+ p .id AS payin_id,
137+ p .changed_at ,
138+ x .payout_type ,
139+ p .payin_type ,
140+ x .sub_name ,
141+ x .user_id ,
142+ x .mtokens
143+ FROM paid p
144+ JOIN (
145+ SELECT * FROM split_non_territory
146+ UNION ALL
147+ SELECT * FROM direct_territory
148+ ) x ON x .payin_id = p .id
149+ WHERE x .payout_type IS NOT NULL
150+ AND p .payin_type IS NOT NULL
151+ ),
152+ facts AS (
153+ SELECT
154+ date_trunc(bucket_part, changed_at, ' America/Chicago' ) AS bucket,
155+ payout_type AS " payOutType" ,
156+ payin_type AS " payInType" ,
157+ sub_name AS " subName" , -- may be NULL; kept
158+ user_id AS " userId" , -- may be NULL; kept
159+ mtokens
160+ FROM payouts_paid_facts
161+ ),
162+ rolled AS (
163+ SELECT
164+ bucket, " payOutType" , " payInType" , " subName" , " userId" ,
165+ SUM (mtokens) AS " sumMtokens" ,
166+ COUNT (DISTINCT " userId" ) AS " countUsers" ,
167+ COUNT (* ) AS " countGroup" ,
168+ /* bit mask: payOutType=8, payInType=4, subName=2, userId=1 (1 = grand-totalled) */
169+ GROUPING(" payOutType" ," payInType" ," subName" ," userId" ) AS gmask
170+ FROM facts
171+ GROUP BY GROUPING SETS (
172+ -- GLOBAL family
173+ (bucket), -- gmask = 15 (1111)
174+ (bucket, " payOutType" ), -- gmask = 7 (0111)
175+ (bucket, " payOutType" ," payInType" ), -- gmask = 3 (0011)
176+
177+ -- SUB family
178+ (bucket, " subName" ), -- gmask = 13 (1101)
179+ (bucket, " subName" ," payOutType" ), -- gmask = 5 (0101)
180+ (bucket, " subName" ," payOutType" ," payInType" ), -- gmask = 1 (0001)
181+
182+ -- USER family
183+ (bucket, " userId" ), -- gmask = 14 (1110)
184+ (bucket, " userId" ," payOutType" ), -- gmask = 6 (0110)
185+ (bucket, " userId" ," payOutType" ," payInType" ), -- gmask = 2 (0010)
186+
187+ -- SUB × USER family
188+ (bucket, " subName" ," userId" ), -- gmask = 12 (1100)
189+ (bucket, " userId" ," subName" ," payOutType" ), -- gmask = 4 (0100)
190+ (bucket, " userId" ," subName" ," payOutType" ," payInType" ) -- gmask = 0 (0000)
191+ )
192+ )
193+ SELECT
194+ v_gran AS granularity,
195+ bucket AS " timeBucket" ,
196+ " payOutType" ," payInType" ," subName" ," userId" ,
197+ " sumMtokens" ," countUsers" ," countGroup" ,
198+ CASE gmask
199+ WHEN 15 THEN ' GLOBAL' ::" AggSlice"
200+ WHEN 7 THEN ' GLOBAL_BY_TYPE' ::" AggSlice" -- (bucket, payOutType)
201+ WHEN 3 THEN ' GLOBAL_BY_TYPE' ::" AggSlice" -- (bucket, payOutType, payInType)
202+ WHEN 13 THEN ' SUB_TOTAL' ::" AggSlice"
203+ WHEN 5 THEN ' SUB_BY_TYPE' ::" AggSlice" -- (bucket, subName, payOutType)
204+ WHEN 1 THEN ' SUB_BY_TYPE' ::" AggSlice" -- (bucket, subName, payOutType, payInType)
205+ WHEN 14 THEN ' USER_TOTAL' ::" AggSlice"
206+ WHEN 6 THEN ' USER_BY_TYPE' ::" AggSlice" -- (bucket, userId, payOutType)
207+ WHEN 2 THEN ' USER_BY_TYPE' ::" AggSlice" -- (bucket, userId, payOutType, payInType)
208+ WHEN 12 THEN ' SUB_BY_USER' ::" AggSlice"
209+ WHEN 4 THEN ' USER_SUB_BY_TYPE' ::" AggSlice" -- (bucket, userId, subName, payOutType)
210+ WHEN 0 THEN ' USER_SUB_BY_TYPE' ::" AggSlice" -- (bucket, userId, subName, payOutType, payInType)
211+ END AS slice
212+ FROM rolled;
213+
214+ GET DIAGNOSTICS v_inserted_count = ROW_COUNT;
215+ RAISE NOTICE ' Inserted % aggregate rows' , v_inserted_count;
216+ END;
217+ $$;
218+
219+ DROP VIEW IF EXISTS payouts_paid_fact;
220+
221+ -- bucket_part: 'hour' | 'day' | 'month'
222+ CREATE OR REPLACE FUNCTION refresh_agg_payin (
223+ bucket_part text ,
224+ p_from timestamptz DEFAULT now() - interval ' 2 hours' ,
225+ p_to timestamptz DEFAULT now()
226+ ) RETURNS void
227+ LANGUAGE plpgsql
228+ AS $$
229+ DECLARE
230+ v_from timestamptz ;
231+ v_to timestamptz ;
232+ v_step interval;
233+ v_gran " AggGranularity" ;
234+ v_deleted_count int ;
235+ v_inserted_count int ;
236+ BEGIN
237+ IF bucket_part NOT IN (' hour' ,' day' ,' month' ) THEN
238+ RAISE EXCEPTION ' bucket_part must be "hour" or "day" or "month"' ;
239+ END IF;
240+
241+ v_step := CASE bucket_part WHEN ' hour' THEN interval ' 1 hour' WHEN ' day' THEN interval ' 1 day' WHEN ' month' THEN interval ' 1 month' END;
242+ v_gran := CASE bucket_part WHEN ' hour' THEN ' HOUR' WHEN ' day' THEN ' DAY' WHEN ' month' THEN ' MONTH' END;
243+ v_from := date_trunc(bucket_part, p_from, ' America/Chicago' );
244+ v_to := date_trunc(bucket_part, p_to, ' America/Chicago' ) + v_step;
245+
246+ RAISE NOTICE ' refresh_agg_payin(%): Processing % to %' , bucket_part, v_from, v_to;
247+
248+ -- idempotent: clear current window for this granularity
249+ DELETE FROM " AggPayIn"
250+ WHERE granularity = v_gran
251+ AND " timeBucket" >= v_from
252+ AND " timeBucket" < v_to;
253+
254+ GET DIAGNOSTICS v_deleted_count = ROW_COUNT;
255+ RAISE NOTICE ' Deleted % existing rows' , v_deleted_count;
256+
257+ INSERT INTO " AggPayIn"
258+ (granularity," timeBucket" ," payInType" ," subName" ," userId" ,
259+ " sumMcost" ," countUsers" ," countGroup" ," slice" )
260+ WITH payins_paid_facts AS (
261+ WITH paid AS (
262+ SELECT id, " payInStateChangedAt" AS changed_at,
263+ " payInType" AS payin_type, " userId" AS user_id, " mcost"
264+ FROM " PayIn"
265+ WHERE " payInState" = ' PAID'
266+ AND " payInStateChangedAt" >= (v_from AT TIME ZONE ' UTC' ) AND " payInStateChangedAt" < (v_to AT TIME ZONE ' UTC' )
267+ ),
268+ per_split AS (
269+ SELECT
270+ pot." payInId" AS payin_id,
271+ spc." subName" AS sub_name,
272+ pot." mtokens" ,
273+ SUM (pot." mtokens" ) OVER (PARTITION BY pot." payInId" ) AS mtokens_total
274+ FROM " PayOutCustodialToken" pot
275+ JOIN " SubPayOutCustodialToken" spc
276+ ON spc." payOutCustodialTokenId" = pot." id"
277+ -- only rows that can actually match PAID payins
278+ JOIN paid p ON p .id = pot." payInId"
279+ )
280+ SELECT
281+ p .id AS payin_id,
282+ p .changed_at ,
283+ p .payin_type ,
284+ ps .sub_name ,
285+ p .user_id ,
286+ /* use numeric division; avoid /0; keep your special-case when mtokens=0 */
287+ (p." mcost" * COALESCE(
288+ CASE
289+ WHEN ps." mtokens_total" = 0 OR ps." mtokens_total" IS NULL THEN 1 .0
290+ ELSE ps." mtokens" ::numeric / ps." mtokens_total"
291+ END
292+ ))::bigint AS mcost
293+ FROM paid p
294+ LEFT JOIN per_split ps ON ps .payin_id = p .id
295+ ),
296+ facts AS (
297+ SELECT
298+ date_trunc(bucket_part, changed_at, ' America/Chicago' ) AS " timeBucket" ,
299+ payin_type AS " payInType" ,
300+ sub_name AS " subName" , -- may be NULL; kept
301+ user_id AS " userId" ,
302+ mcost
303+ FROM payins_paid_facts
304+ ),
305+ rolled AS (
306+ SELECT
307+ " timeBucket" , " payInType" , " subName" , " userId" ,
308+ SUM (mcost) AS " sumMcost" ,
309+ COUNT (DISTINCT " userId" ) AS " countUsers" ,
310+ COUNT (* ) AS " countGroup" ,
311+ /* bit mask: payInType=4, subName=2, userId=1 (1 = grand-totalled) */
312+ GROUPING(" payInType" ," subName" ," userId" ) AS " groupingId"
313+ FROM facts
314+ GROUP BY GROUPING SETS (
315+ (" timeBucket" ), -- 7 GLOBAL
316+ (" timeBucket" , " payInType" ), -- 3 GLOBAL_BY_TYPE
317+ (" timeBucket" , " subName" ), -- 5 SUB_TOTAL
318+ (" timeBucket" , " subName" ," payInType" ), -- 1 SUB_BY_TYPE
319+ (" timeBucket" , " userId" ), -- 6 USER_TOTAL
320+ (" timeBucket" , " userId" ," payInType" ), -- 2 USER_BY_TYPE
321+ (" timeBucket" , " subName" ," userId" ), -- 4 SUB_BY_USER
322+ (" timeBucket" , " userId" ," subName" ," payInType" ) -- 0 USER_SUB_BY_TYPE
323+ )
324+ )
325+ SELECT
326+ v_gran AS granularity,
327+ " timeBucket" ,
328+ " payInType" ," subName" ," userId" ,
329+ " sumMcost" ," countUsers" ," countGroup" ,
330+ CASE " groupingId"
331+ WHEN 7 THEN ' GLOBAL' ::" AggSlice"
332+ WHEN 3 THEN ' GLOBAL_BY_TYPE' ::" AggSlice"
333+ WHEN 5 THEN ' SUB_TOTAL' ::" AggSlice"
334+ WHEN 1 THEN ' SUB_BY_TYPE' ::" AggSlice"
335+ WHEN 6 THEN ' USER_TOTAL' ::" AggSlice"
336+ WHEN 2 THEN ' USER_BY_TYPE' ::" AggSlice"
337+ WHEN 4 THEN ' SUB_BY_USER' ::" AggSlice"
338+ WHEN 0 THEN ' USER_SUB_BY_TYPE' ::" AggSlice"
339+ END AS slice
340+ FROM rolled;
341+
342+ GET DIAGNOSTICS v_inserted_count = ROW_COUNT;
343+ RAISE NOTICE ' Inserted % aggregate rows' , v_inserted_count;
344+ END;
345+ $$;
346+
347+ DROP VIEW IF EXISTS payins_paid_fact;
0 commit comments