-
Notifications
You must be signed in to change notification settings - Fork 736
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
redshift optimization #4775
base: master
Are you sure you want to change the base?
redshift optimization #4775
Changes from 3 commits
7f4075e
7c33120
8d85715
e6ddcd6
270985d
629b58e
e4d9a4c
fb9c902
247b9bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,8 @@ def build_create_table_commands( | |
they are not enforced by Amazon Redshift. | ||
https://docs.aws.amazon.com/redshift/latest/dg/t_Defining_constraints.html | ||
""" | ||
temp_table_name = self.full_table_name(schema_name, table_name, prefix='temp_') | ||
|
||
return [ | ||
build_create_table_command( | ||
column_type_mapping=self.column_type_mapping(schema), | ||
|
@@ -63,6 +65,14 @@ def build_create_table_commands( | |
unique_constraints=unique_constraints, | ||
use_lowercase=self.use_lowercase, | ||
), | ||
build_create_table_command( | ||
column_type_mapping=self.column_type_mapping(schema), | ||
columns=schema['properties'].keys(), | ||
full_table_name=temp_table_name, | ||
if_not_exists=True, | ||
unique_constraints=unique_constraints, | ||
use_lowercase=self.use_lowercase, | ||
) | ||
] | ||
|
||
def build_alter_table_commands( | ||
|
@@ -132,31 +142,40 @@ def build_insert_commands( | |
|
||
if unique_constraints and UNIQUE_CONFLICT_METHOD_UPDATE == unique_conflict_method: | ||
full_table_name_temp = self.full_table_name(schema_name, table_name, prefix='temp_') | ||
full_table_name_old = self.full_table_name(schema_name, table_name, prefix='old_') | ||
drop_temp_table_command = f'DROP TABLE IF EXISTS {full_table_name_temp}' | ||
drop_old_table_command = f'DROP TABLE IF EXISTS {full_table_name_old}' | ||
unique_constraints_clean = [ | ||
f'{self.clean_column_name(col)}' | ||
for col in unique_constraints | ||
] | ||
|
||
drop_duplicate_records_from_temp = (f'DELETE FROM {full_table_name_temp} ' | ||
f'WHERE ({", ".join(unique_constraints_clean)},_mage_created_at) ' | ||
f'IN ( SELECT {", ".join(unique_constraints_clean)}, _mage_created_at ' | ||
f'FROM ( SELECT {", ".join(unique_constraints_clean)}, _mage_created_at, ROW_NUMBER() ' | ||
f'OVER (PARTITION BY {", ".join(unique_constraints_clean)} ' | ||
f'ORDER BY _mage_created_at DESC) as row_num ' | ||
f'FROM {full_table_name_temp} ) AS subquery_alias WHERE row_num > 1);') | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you share an example of what this query looks like with the variable values filled in? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @dy46 Please find a sample query here: DELETE FROM temp_abc There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @dy46 Changes done as suggested. |
||
|
||
delete_records_from_full_table = (f'DELETE FROM {full_table_name} ' | ||
f'WHERE ({", ".join(unique_constraints_clean)}) ' | ||
f'IN (SELECT {", ".join(unique_constraints_clean)} ' | ||
f'FROM {full_table_name_temp})') | ||
|
||
insert_records_from_temp_table = f'INSERT INTO {full_table_name} SELECT * FROM {full_table_name_temp}' | ||
truncate_records_from_temp_table = f'TRUNCATE TABLE {full_table_name_temp}' | ||
|
||
commands = [ | ||
'\n'.join([ | ||
f'INSERT INTO {full_table_name_temp} ({insert_columns})', | ||
f'VALUES {insert_values}', | ||
]), | ||
] | ||
# This is temp as need to know the best way to create table programmatically i will change it properly | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you elaborate on this comment? I'm not sure what you mean here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, my bad, I will remove this comment. |
||
commands = commands + [ | ||
drop_temp_table_command, | ||
drop_old_table_command, | ||
] + ['\n'.join([ | ||
f'CREATE TABLE {full_table_name_temp} AS ' | ||
f'SELECT {insert_columns} FROM (' | ||
f' SELECT *,' | ||
f' ROW_NUMBER() OVER (' | ||
f' PARTITION BY {", ".join(unique_constraints_clean)} ORDER BY _mage_created_at DESC' # noqa: E501 | ||
f' ) as row_num' | ||
f' FROM {full_table_name})' | ||
f'WHERE row_num = 1' | ||
]) | ||
] + [ | ||
f'ALTER TABLE {full_table_name} rename to old_{table_name}', | ||
f'ALTER TABLE {full_table_name_temp} rename to {table_name}', | ||
drop_temp_table_command, | ||
drop_old_table_command, | ||
drop_duplicate_records_from_temp, | ||
delete_records_from_full_table, | ||
insert_records_from_temp_table, | ||
truncate_records_from_temp_table, | ||
] | ||
|
||
# Not query data from stl_insert table anymore since it's inefficient. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have any logic to delete this temporary table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, actually, I guess we don't. Should we add this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should if we are going to be creating it every sync
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, currently the temp table is created only if it does not already exist.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. I still think we should delete it at the end of the sync to avoid leaving a temp table in the database.