Skip to content

Commit

Permalink
Merge pull request #294 from metasfresh/FRESH-592-gh293
Browse files Browse the repository at this point in the history
FRESH-592 #293 changes & additions:
  • Loading branch information
metas-ts committed Aug 19, 2016
2 parents 7cdd0b8 + c290611 commit e50cbaf
Show file tree
Hide file tree
Showing 26 changed files with 524 additions and 3 deletions.
2 changes: 1 addition & 1 deletion de.metas.async/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<artifactId>de.metas.async</artifactId>

<properties>
<migration-sql-basedir>42-de.metas.async</migration-sql-basedir>
<migration-sql-basedir></migration-sql-basedir>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@


DROP FUNCTION IF EXISTS dlm.Archive_C_Queue_Data(integer, integer);
CREATE OR REPLACE FUNCTION dlm.Archive_C_Queue_Data(daysBack integer, limitRecords integer)
RETURNS TABLE(
Count_Workpackages_ToArchive int,
Count_Queue_Element_Archived int,
Count_Queue_WorkPackage_Param_Archived int,
Count_Queue_WorkPackage_Log_Archived int,
Count_Queue_Workpackage_Archived int,
Count_Queue_Block_Archived int,
Remaining_ToArchive int)
AS $$
DECLARE
Count_Workpackages_ToArchive int;
Count_Queue_Element_Archived int;
Count_Queue_Element_Archived_Inserted int;
Count_Queue_WorkPackage_Param_Archived int;
Count_Queue_WorkPackage_Param_Archived_Inserted int;
Count_Queue_WorkPackage_Log_Archived int;
Count_Queue_WorkPackage_Log_Archived_Inserted int;
Count_Queue_Workpackage_Archived int;
Count_Queue_Workpackage_Archived_Inserted int;
Count_Queue_Block_Archived int;
Count_Queue_Block_Archived_Inserted int;
Remaining_C_Queue_Workpackage_ToArchive_All int;
C_Queue_Workpackage_ToArchive_All_Initial_Count int;
C_Queue_Workpackage_ToArchive_All_Inserted_Count int;
BEGIN
-- Prepare "to delete" list (ALL)
select count(1) from dlm.C_Queue_Workpackage_ToArchive_All INTO C_Queue_Workpackage_ToArchive_All_Initial_Count;

IF (C_Queue_Workpackage_ToArchive_All_Initial_Count=0)
THEN
RAISE NOTICE 'Table dlm.C_Queue_Workpackage_ToArchive_All is empty; we recreate and fill it using parameter daysBack=%',daysBack;
drop table if exists dlm.C_Queue_Workpackage_ToArchive_All;

create table dlm.C_Queue_Workpackage_ToArchive_All as
select C_Queue_Workpackage_ID, C_Queue_Block_ID, C_Async_Batch_ID
from C_Queue_Workpackage wp
where wp.Processed='Y'
and wp.Updated <= now() - $1 -- older than $1 days
;

GET DIAGNOSTICS C_Queue_Workpackage_ToArchive_All_Inserted_Count = ROW_COUNT;

-- Index it
create index on dlm.C_Queue_Workpackage_ToArchive_All(C_Queue_Workpackage_ID);
create index on dlm.C_Queue_Workpackage_ToArchive_All(C_Queue_Block_ID);
create index on dlm.C_Queue_Workpackage_ToArchive_All(C_Async_Batch_ID);

RAISE NOTICE 'Inserted % records into dlm.C_Queue_Workpackage_ToArchive_All.',C_Queue_Workpackage_ToArchive_All_Inserted_Count;
ELSE
RAISE NOTICE 'Table dlm.C_Queue_Workpackage_ToArchive_All still contains % records from a previous run, so we ignore parameter daysBack=%',C_Queue_Workpackage_ToArchive_All_Initial_Count, daysBack;
END IF;


--
-- Prepare "to delete" list (Chunk)
drop table if exists TMP_C_Queue_Workpackage_ToArchive;
create temporary table TMP_C_Queue_Workpackage_ToArchive as select * from dlm.C_Queue_Workpackage_ToArchive_All limit 0;

RAISE NOTICE 'Inserting records into TMP_C_Queue_Workpackage_ToArchive.';

-- move 'limitRecords' records from dlm.C_Queue_Workpackage_ToArchive_All to TMP_C_Queue_Workpackage_ToArchive
with items as (
delete from dlm.C_Queue_Workpackage_ToArchive_All t
where t.C_Queue_Workpackage_ID in (
select C_Queue_Workpackage_ID
from dlm.C_Queue_Workpackage_ToArchive_All
-- order by t.C_Queue_Workpackage_ID -- oldest first; commented out, because it's too expensive
limit $2 -- Chunk size
)
returning *
)
insert into TMP_C_Queue_Workpackage_ToArchive
select * from items
;

-- Index it and update the table statistics. the index is going to help us in moving additional records
create index on TMP_C_Queue_Workpackage_ToArchive(C_Queue_Workpackage_ID);
create index on TMP_C_Queue_Workpackage_ToArchive(C_Queue_Block_ID);
create index on TMP_C_Queue_Workpackage_ToArchive(C_Async_Batch_ID);
ANALYZE TMP_C_Queue_Workpackage_ToArchive;

-- also move such records from dlm.C_Queue_Workpackage_ToArchive_All to TMP_C_Queue_Workpackage_ToArchive that share a C_Queue_Block_ID with an already moved record
RAISE NOTICE 'Inserting additional records into TMP_C_Queue_Workpackage_ToArchive that share a C_Queue_Block_ID with an already indested record.';
with items as (
delete from dlm.C_Queue_Workpackage_ToArchive_All t
where t.C_Queue_Block_ID in (
select C_Queue_Block_ID
from TMP_C_Queue_Workpackage_ToArchive
)
returning *
)
insert into TMP_C_Queue_Workpackage_ToArchive
select * from items
;

-- also move such records from dlm.C_Queue_Workpackage_ToArchive_All to TMP_C_Queue_Workpackage_ToArchive that share a C_ASync_Batch_ID with an already moved record
RAISE NOTICE 'Inserting additional records into TMP_C_Queue_Workpackage_ToArchive that share a C_ASync_Batch_ID with an already indested record.';
with items as (
delete from dlm.C_Queue_Workpackage_ToArchive_All t
where t.C_ASync_Batch_ID in (
select C_ASync_Batch_ID
from TMP_C_Queue_Workpackage_ToArchive
)
returning *
)
insert into TMP_C_Queue_Workpackage_ToArchive
select * from items
;

-- update the table statistics again
ANALYZE TMP_C_Queue_Workpackage_ToArchive;

select count(1) from TMP_C_Queue_Workpackage_ToArchive INTO Count_Workpackages_ToArchive;
RAISE NOTICE 'Workpackages to archive: % (param limitRecords=%) ', Count_Workpackages_ToArchive, limitRecords;

-- Archive C_Queue_Elements
with deleted_rows as (
delete from C_Queue_Element t
where exists (select 1 from TMP_C_Queue_Workpackage_ToArchive s where s.C_Queue_Workpackage_ID=t.C_Queue_Workpackage_ID)
returning *
)
insert into dlm.C_Queue_Element_Archived
select * from deleted_rows;

GET DIAGNOSTICS Count_Queue_Element_Archived_Inserted = ROW_COUNT;
SELECT count(1) from dlm.C_Queue_Element_Archived INTO Count_Queue_Element_Archived;
RAISE NOTICE 'Moved % records to table dlm.C_Queue_Element_Archived, it now has % records', Count_Queue_Element_Archived_Inserted, Count_Queue_Element_Archived;

-- Archive C_Queue_WorkPackage_Param_Archived
with deleted_rows as (
delete from C_Queue_Workpackage_Param t
where exists (select 1 from TMP_C_Queue_Workpackage_ToArchive s where s.C_Queue_Workpackage_ID=t.C_Queue_Workpackage_ID)
returning *
)
insert into dlm.C_Queue_Workpackage_Param_Archived
select * from deleted_rows;

GET DIAGNOSTICS Count_Queue_WorkPackage_Param_Archived_Inserted = ROW_COUNT;
SELECT count(1) from dlm.C_Queue_Workpackage_Param_Archived INTO Count_Queue_WorkPackage_Param_Archived;
RAISE NOTICE 'Moved % records to table dlm.C_Queue_Workpackage_Param_Archived, it now has % records', Count_Queue_WorkPackage_Param_Archived_Inserted, Count_Queue_WorkPackage_Param_Archived;

-- Archive C_Queue_WorkPackage_Log_Archived
with deleted_rows as (
delete from C_Queue_Workpackage_Log t
where exists (select 1 from TMP_C_Queue_Workpackage_ToArchive s where s.C_Queue_Workpackage_ID=t.C_Queue_Workpackage_ID)
returning *
)
insert into dlm.C_Queue_Workpackage_Log_Archived
select * from deleted_rows;

GET DIAGNOSTICS Count_Queue_WorkPackage_Log_Archived_Inserted = ROW_COUNT;
SELECT count(1) from dlm.C_Queue_Workpackage_Log_Archived INTO Count_Queue_WorkPackage_Log_Archived;
RAISE NOTICE 'Moved % records to table dlm.C_Queue_Workpackage_Log_Archived, it now has % records', Count_Queue_WorkPackage_Log_Archived_Inserted, Count_Queue_WorkPackage_Log_Archived;

-- Archive C_Queue_Workpackages
with deleted_rows as (
delete from C_Queue_Workpackage t
where exists (select 1 from TMP_C_Queue_Workpackage_ToArchive s where s.C_Queue_Workpackage_ID=t.C_Queue_Workpackage_ID)
returning *
)
insert into dlm.C_Queue_Workpackage_Archived
select * from deleted_rows;

GET DIAGNOSTICS Count_Queue_Workpackage_Archived_Inserted = ROW_COUNT;
SELECT count(1) from dlm.C_Queue_Workpackage_Archived INTO Count_Queue_Workpackage_Archived;
RAISE NOTICE 'Moved % records to table dlm.C_Queue_Workpackage_Archived, it now has % records', Count_Queue_Workpackage_Archived_Inserted, Count_Queue_Workpackage_Archived;

-- Archive C_Queue_Blocks
with deleted_rows as (
delete from C_Queue_Block t
where true
and exists (select 1 from TMP_C_Queue_Workpackage_ToArchive s where s.C_Queue_Block_ID=t.C_Queue_Block_ID)
and not exists (select 1 from C_Queue_Workpackage wp where wp.C_Queue_Block_ID=t.C_Queue_Block_ID) -- there are no other workpackages
returning *
)
insert into dlm.C_Queue_Block_Archived
select * from deleted_rows;

GET DIAGNOSTICS Count_Queue_Block_Archived_Inserted = ROW_COUNT;
SELECT count(1) from dlm.C_Queue_Block_Archived INTO Count_Queue_Block_Archived;
RAISE NOTICE 'Moved % records to table dlm.C_Queue_Block_Archived, it now has % records', Count_Queue_Block_Archived_Inserted, Count_Queue_Block_Archived;

-- Count remaining things to delete
select count(1) from dlm.C_Queue_Workpackage_ToArchive_All INTO Remaining_C_Queue_Workpackage_ToArchive_All;
RAISE NOTICE 'Remaining records in dlm.C_Queue_Workpackage_ToArchive_All: % ', Remaining_C_Queue_Workpackage_ToArchive_All;

RAISE NOTICE 'Updating the stats of our production tables (analyze)';
ANALYZE c_queue_block;
ANALYZE c_queue_element;
ANALYZE c_queue_workpackage;
ANALYZE c_queue_workpackage_log;
ANALYZE c_queue_workpackage_param;

RETURN QUERY select Count_Workpackages_ToArchive ,
Count_Queue_Element_Archived ,
Count_Queue_WorkPackage_Param_Archived ,
Count_Queue_WorkPackage_Log_Archived,
Count_Queue_Workpackage_Archived ,
Count_Queue_Block_Archived,
Remaining_C_Queue_Workpackage_ToArchive_All;
END;
$$ LANGUAGE plpgsql;

COMMENT ON FUNCTION dlm.Archive_C_Queue_Data(integer, integer) IS 'Moves async wrokpackage data do and "archive"-table in the dlm schema;
Parameters:
daysBack: work packages older than the given number of days are moved
limitRecords: the chunk-size, i.e. the number of workpackages that are moved per invokaction.
Tip: to see more about what the fucntion does, do
BEGIN; SELECT * FROM dlm.Archive_C_Queue_Data(3,30000); ROLLBACK;
and follow the function''s output (notice level)
see https://github.com/metasfresh/metasfresh/issues/293';
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
-- this is crude but i spend already the whole day with this task and need to move on.
-- we only want to create the tables if they don't exists yet
-- CREATE TABLE IF NOT EXISTS ... AS SELECT ... doesn'T work, at least not with pg-9.1

CREATE OR REPLACE FUNCTION create_tables_if_not_exist ()
RETURNS void AS
$func$
BEGIN
IF EXISTS (SELECT 1 FROM pg_catalog.pg_tables
WHERE schemaname = 'dlm' AND tablename = 'c_queue_workpackage_archived') THEN
RAISE NOTICE 'Table dlm.c_queue_workpackage_archived already exists. Assuming that all the tables exist';
ELSE

CREATE TABLE dlm.c_queue_workpackage_archived AS SELECT * FROM c_queue_workpackage LIMIT 0;
COMMENT ON TABLE dlm.c_queue_workpackage_archived IS 'Contains old records from the c_queue_workpackage table; see https://github.com/metasfresh/metasfresh/issues/293';

CREATE TABLE dlm.c_queue_workpackage_log_archived AS SELECT * FROM c_queue_workpackage_log LIMIT 0;
COMMENT ON TABLE dlm.c_queue_workpackage_log_archived IS 'Contains old records from the c_queue_workpackage_log table; see https://github.com/metasfresh/metasfresh/issues/293';

CREATE TABLE dlm.c_queue_workpackage_param_archived AS SELECT * FROM c_queue_workpackage_param LIMIT 0;
COMMENT ON TABLE dlm.c_queue_workpackage_param_archived IS 'Contains old records from the c_queue_workpackage_param table; see https://github.com/metasfresh/metasfresh/issues/293';

CREATE TABLE dlm.c_queue_element_archived AS SELECT * FROM c_queue_element LIMIT 0;
COMMENT ON TABLE dlm.c_queue_element_archived IS 'Contains old records from the c_queue_element table; see https://github.com/metasfresh/metasfresh/issues/293';

CREATE TABLE dlm.c_queue_block_archived AS SELECT * FROM c_queue_block LIMIT 0;
COMMENT ON TABLE dlm.c_queue_block_archived IS 'Contains old records from the C_Queue_Block table; see https://github.com/metasfresh/metasfresh/issues/293';


CREATE TABLE dlm.C_Queue_Workpackage_ToArchive_All AS SELECT C_Queue_Workpackage_ID, C_Queue_Block_ID FROM C_Queue_Workpackage LIMIT 0;
COMMENT ON TABLE dlm.C_Queue_Workpackage_ToArchive_All IS 'Contains C_queue_Workpackage records that will be moved for archive tables by future invocation(s) of the dlm.Archive_C_Queue_Data() function; see https://github.com/metasfresh/metasfresh/issues/293';

DROP INDEX IF EXISTS dlm.c_queue_workpackage_toarchive_all_c_queue_block_id_idx;
CREATE INDEX c_queue_workpackage_toarchive_all_c_queue_block_id_idx
ON dlm.c_queue_workpackage_toarchive_all
USING btree
(c_queue_block_id);
DROP INDEX IF EXISTS dlm.c_queue_workpackage_toarchive_all_c_queue_workpackage_id_idx;
CREATE INDEX c_queue_workpackage_toarchive_all_c_queue_workpackage_id_idx
ON dlm.c_queue_workpackage_toarchive_all
USING btree
(c_queue_workpackage_id);

END IF;
END
$func$ LANGUAGE plpgsql;


select create_tables_if_not_exist();
drop function create_tables_if_not_exist();
Loading

0 comments on commit e50cbaf

Please sign in to comment.