From ebe58cee332a3a76af8e3f4ac7701b5b698f2451 Mon Sep 17 00:00:00 2001 From: Vikas Goyal Date: Sun, 14 Jan 2024 22:01:44 +0530 Subject: [PATCH 1/5] redshift optimization --- .../destinations/redshift/__init__.py | 54 +++++++++++-------- 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/mage_integrations/mage_integrations/destinations/redshift/__init__.py b/mage_integrations/mage_integrations/destinations/redshift/__init__.py index f328da22322..131b4fd254f 100644 --- a/mage_integrations/mage_integrations/destinations/redshift/__init__.py +++ b/mage_integrations/mage_integrations/destinations/redshift/__init__.py @@ -131,39 +131,51 @@ def build_insert_commands( ] if unique_constraints and UNIQUE_CONFLICT_METHOD_UPDATE == unique_conflict_method: + self.logger.info(f'Test Vikas Code for UNIQUE_CONFLICT_METHOD_UPDATE') full_table_name_temp = self.full_table_name(schema_name, table_name, prefix='temp_') - full_table_name_old = self.full_table_name(schema_name, table_name, prefix='old_') drop_temp_table_command = f'DROP TABLE IF EXISTS {full_table_name_temp}' - drop_old_table_command = f'DROP TABLE IF EXISTS {full_table_name_old}' unique_constraints_clean = [ f'{self.clean_column_name(col)}' for col in unique_constraints ] - commands = commands + [ - drop_temp_table_command, - drop_old_table_command, - ] + ['\n'.join([ - f'CREATE TABLE {full_table_name_temp} AS ' - f'SELECT {insert_columns} FROM (' - f' SELECT *,' - f' ROW_NUMBER() OVER (' - f' PARTITION BY {", ".join(unique_constraints_clean)} ORDER BY _mage_created_at DESC' # noqa: E501 - f' ) as row_num' - f' FROM {full_table_name})' - f'WHERE row_num = 1' - ]) - ] + [ - f'ALTER TABLE {full_table_name} rename to old_{table_name}', - f'ALTER TABLE {full_table_name_temp} rename to {table_name}', - drop_temp_table_command, - drop_old_table_command, + + create_temp_table_command = (f'CREATE TABLE {full_table_name_temp} AS ' + f'SELECT * FROM ( SELECT * FROM {full_table_name}) WHERE 1=0') + + drop_duplicate_records_from_temp = f'DELETE FROM {full_table_name_temp} ' + f'where ({", ".join(unique_constraints_clean)},_mage_created_at) ' + f'IN ( SELECT {", ".join(unique_constraints_clean)}, _mage_created_at ' + f'FROM ( SELECT {", ".join(unique_constraints_clean)}, _mage_created_at, ROW_NUMBER() ' + f'OVER (PARTITION BY {", ".join(unique_constraints_clean)} ' + f'ORDER BY _mage_created_at DESC) as row_num ' + f'FROM {full_table_name_temp} ) AS subquery_alias WHERE row_num > 1); ' + + delete_records_from_full_table = f'DELETE FROM {full_table_name} where ({", ".join(unique_constraints_clean)}) ' + f'in (SELECT {", ".join(unique_constraints_clean)} from {full_table_name_temp})' + + insert_records_from_temp_table = f'INSERT INTO {full_table_name} SELECT * FROM {full_table_name_temp}' + + drop_temp_table_command = f'DROP TABLE IF EXISTS {full_table_name_temp}' + + commands = [ + '\n'.join([ + f'INSERT INTO {full_table_name_temp} ({insert_columns})', + f'VALUES {insert_values}', + ]), + ] + # This is temp as need to know the best way to create table programmatically */ + commands = [create_temp_table_command] + commands + [ + drop_duplicate_records_from_temp, + delete_records_from_full_table, + insert_records_from_temp_table, + drop_temp_table_command ] if not self.is_redshift_serverless: commands.append( '\n'.join([ 'WITH last_queryid_for_table AS (', - ' SELECT query, MAX(si.starttime) OVER () as last_q_stime, si.starttime as stime', # noqa: E501 + ' SELECT query, MAX(si.starttime) OVER () as last_q_stime, si.starttime as stime', # noqa: E501 ' FROM stl_insert si, SVV_TABLE_INFO sti', f' WHERE sti.table_id=si.tbl AND sti."table"=\'{table_name}\'', ')', From 620af1d1a95e7ebbaf6cfd2ae0eb0cf804528097 Mon Sep 17 00:00:00 2001 From: Vikas Goyal Date: Sun, 14 Jan 2024 22:03:05 +0530 Subject: [PATCH 2/5] redshift optimization --- .../mage_integrations/destinations/redshift/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/mage_integrations/mage_integrations/destinations/redshift/__init__.py b/mage_integrations/mage_integrations/destinations/redshift/__init__.py index 131b4fd254f..6cb1d051806 100644 --- a/mage_integrations/mage_integrations/destinations/redshift/__init__.py +++ b/mage_integrations/mage_integrations/destinations/redshift/__init__.py @@ -131,7 +131,6 @@ def build_insert_commands( ] if unique_constraints and UNIQUE_CONFLICT_METHOD_UPDATE == unique_conflict_method: - self.logger.info(f'Test Vikas Code for UNIQUE_CONFLICT_METHOD_UPDATE') full_table_name_temp = self.full_table_name(schema_name, table_name, prefix='temp_') drop_temp_table_command = f'DROP TABLE IF EXISTS {full_table_name_temp}' unique_constraints_clean = [ From ef91cbdc365c10fb5cb5fc4e40dde9be8fc3ad2f Mon Sep 17 00:00:00 2001 From: Vikas Goyal Date: Sun, 14 Jan 2024 22:59:45 +0530 Subject: [PATCH 3/5] batch size increase --- mage_integrations/mage_integrations/destinations/sql/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mage_integrations/mage_integrations/destinations/sql/base.py b/mage_integrations/mage_integrations/destinations/sql/base.py index 9ad20901ccf..00b81af8411 100644 --- a/mage_integrations/mage_integrations/destinations/sql/base.py +++ b/mage_integrations/mage_integrations/destinations/sql/base.py @@ -20,7 +20,7 @@ class Destination(BaseDestination): SCHEMA_CONFIG_KEY = 'schema' TABLE_CONFIG_KEY = 'table' - BATCH_SIZE = 1000 + BATCH_SIZE = 50000 def __init__(self, **kwargs): super().__init__(**kwargs) From c47244e74fe95fb22b46c0ae82d9091bae00f80a Mon Sep 17 00:00:00 2001 From: Vikas Goyal Date: Wed, 17 Jan 2024 00:59:12 +0530 Subject: [PATCH 4/5] redshift optimization changes --- .../destinations/redshift/__init__.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/mage_integrations/mage_integrations/destinations/redshift/__init__.py b/mage_integrations/mage_integrations/destinations/redshift/__init__.py index 6cb1d051806..b70301d47f6 100644 --- a/mage_integrations/mage_integrations/destinations/redshift/__init__.py +++ b/mage_integrations/mage_integrations/destinations/redshift/__init__.py @@ -131,6 +131,7 @@ def build_insert_commands( ] if unique_constraints and UNIQUE_CONFLICT_METHOD_UPDATE == unique_conflict_method: + self.logger.info(f'Test Vikas Code for UNIQUE_CONFLICT_METHOD_UPDATE') full_table_name_temp = self.full_table_name(schema_name, table_name, prefix='temp_') drop_temp_table_command = f'DROP TABLE IF EXISTS {full_table_name_temp}' unique_constraints_clean = [ @@ -138,8 +139,8 @@ def build_insert_commands( for col in unique_constraints ] - create_temp_table_command = (f'CREATE TABLE {full_table_name_temp} AS ' - f'SELECT * FROM ( SELECT * FROM {full_table_name}) WHERE 1=0') + create_temp_table_command = (f'CREATE TABLE {full_table_name_temp} ' + f'AS SELECT * FROM {full_table_name} where 1=0') drop_duplicate_records_from_temp = f'DELETE FROM {full_table_name_temp} ' f'where ({", ".join(unique_constraints_clean)},_mage_created_at) ' @@ -149,12 +150,14 @@ def build_insert_commands( f'ORDER BY _mage_created_at DESC) as row_num ' f'FROM {full_table_name_temp} ) AS subquery_alias WHERE row_num > 1); ' - delete_records_from_full_table = f'DELETE FROM {full_table_name} where ({", ".join(unique_constraints_clean)}) ' - f'in (SELECT {", ".join(unique_constraints_clean)} from {full_table_name_temp})' + delete_records_from_full_table = (f'DELETE FROM {full_table_name} ' + f'where ({", ".join(unique_constraints_clean)}) ' + f'in (SELECT {", ".join(unique_constraints_clean)} ' + f'from {full_table_name_temp})') insert_records_from_temp_table = f'INSERT INTO {full_table_name} SELECT * FROM {full_table_name_temp}' - drop_temp_table_command = f'DROP TABLE IF EXISTS {full_table_name_temp}' + truncate_records_from_temp_table = f'TRUNCATE table {full_table_name_temp}' commands = [ '\n'.join([ @@ -162,12 +165,11 @@ def build_insert_commands( f'VALUES {insert_values}', ]), ] - # This is temp as need to know the best way to create table programmatically */ - commands = [create_temp_table_command] + commands + [ - drop_duplicate_records_from_temp, + # This is temp as need to know the best way to create table programmatically i will change it properly */ + commands = commands + [ delete_records_from_full_table, insert_records_from_temp_table, - drop_temp_table_command + truncate_records_from_temp_table ] if not self.is_redshift_serverless: From 6cc14ff8db6c74a2500fa148ffd2638e1d075d10 Mon Sep 17 00:00:00 2001 From: Vikas Goyal Date: Thu, 18 Jan 2024 12:12:34 +0530 Subject: [PATCH 5/5] change batch size --- mage_integrations/mage_integrations/destinations/sql/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mage_integrations/mage_integrations/destinations/sql/base.py b/mage_integrations/mage_integrations/destinations/sql/base.py index 00b81af8411..da6d98e5da8 100644 --- a/mage_integrations/mage_integrations/destinations/sql/base.py +++ b/mage_integrations/mage_integrations/destinations/sql/base.py @@ -20,7 +20,7 @@ class Destination(BaseDestination): SCHEMA_CONFIG_KEY = 'schema' TABLE_CONFIG_KEY = 'table' - BATCH_SIZE = 50000 + BATCH_SIZE = 25000 def __init__(self, **kwargs): super().__init__(**kwargs)