From 375b9e9572b17121d5cd42c7ca595f2e835a28ae Mon Sep 17 00:00:00 2001
From: Ben Edwards
Date: Tue, 26 Nov 2019 08:47:56 +1100
Subject: [PATCH] Add indexes to execution model

Indexes should make frequently run queries faster. Most queries filter on
execution_id + model_name or on model_name + completed_on, so composite
indexes now cover those two patterns, along with single-column indexes on
status and completed_on.
---
 ...c837f5f2_add_indexes_to_execution_model.py |  34 +++
 rdl/entities/execution_model_entity.py        | 209 +++++++++---------
 2 files changed, 141 insertions(+), 102 deletions(-)
 create mode 100644 rdl/alembic/versions/3834c837f5f2_add_indexes_to_execution_model.py

diff --git a/rdl/alembic/versions/3834c837f5f2_add_indexes_to_execution_model.py b/rdl/alembic/versions/3834c837f5f2_add_indexes_to_execution_model.py
new file mode 100644
index 0000000..44303e6
--- /dev/null
+++ b/rdl/alembic/versions/3834c837f5f2_add_indexes_to_execution_model.py
@@ -0,0 +1,34 @@
+"""add indexes to execution_model
+
+Revision ID: 3834c837f5f2
+Revises: bb0c5e8d05e2
+Create Date: 2019-11-26 08:42:13.575198
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '3834c837f5f2'
+down_revision = 'bb0c5e8d05e2'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_index('execution_model__index_on_execution_id_model_name', 'execution_model', ['execution_id', 'model_name'], unique=True, schema='rdl')
+    op.create_index('execution_model__index_on_model_name_completed_on', 'execution_model', ['model_name', 'completed_on'], unique=False, schema='rdl')
+    op.create_index(op.f('ix_rdl_execution_model_completed_on'), 'execution_model', ['completed_on'], unique=False, schema='rdl')
+    op.create_index(op.f('ix_rdl_execution_model_status'), 'execution_model', ['status'], unique=False, schema='rdl')
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index(op.f('ix_rdl_execution_model_status'), table_name='execution_model', schema='rdl')
+    op.drop_index(op.f('ix_rdl_execution_model_completed_on'), table_name='execution_model', schema='rdl')
+    op.drop_index('execution_model__index_on_model_name_completed_on', table_name='execution_model', schema='rdl')
+    op.drop_index('execution_model__index_on_execution_id_model_name', table_name='execution_model', schema='rdl')
+    # ### end Alembic commands ###

diff --git a/rdl/entities/execution_model_entity.py b/rdl/entities/execution_model_entity.py
index 531d0f2..63ac72e 100644
--- a/rdl/entities/execution_model_entity.py
+++ b/rdl/entities/execution_model_entity.py
@@ -1,102 +1,107 @@
-import uuid
-
-from sqlalchemy import (
-    Column,
-    DateTime,
-    Integer,
-    String,
-    Boolean,
-    BigInteger,
-    ForeignKey,
-)
-from sqlalchemy.sql import func
-from sqlalchemy.dialects.postgresql import UUID
-from sqlalchemy.inspection import inspect
-
-from rdl.entities import Base
-from rdl.entities import ExecutionEntity
-from rdl.shared import Constants
-
-
-class ExecutionModelEntity(Base):
-    __tablename__ = "execution_model"
-    __table_args__ = {"schema": Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}
-    execution_model_id = Column(
-        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
-    )
-    created_on = Column(
-        DateTime(timezone=True), nullable=False, server_default=func.now()
-    )
-    updated_on = Column(
-        DateTime(timezone=True),
-        nullable=False,
-        server_default=func.now(),
-        onupdate=func.now(),
-    )
-    execution_id = Column(
-        UUID(as_uuid=True),
-        ForeignKey(
-            f"{Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}."
-            f"{inspect(ExecutionEntity).tables[0].name}."
-            f"{inspect(ExecutionEntity).primary_key[0].name}"
-        ),
-        nullable=False,
-    )
-    model_name = Column(String(250), nullable=False)
-    status = Column(
-        String(50),
-        nullable=False,
-        server_default=str(Constants.ExecutionModelStatus.STARTED),
-    )
-    last_sync_version = Column(BigInteger, nullable=False)
-    sync_version = Column(BigInteger, nullable=False)
-    is_full_refresh = Column(Boolean, nullable=False)
-    full_refresh_reason = Column(String(100), nullable=False)
-    started_on = Column(
-        DateTime(timezone=True), server_default=func.now(), nullable=False
-    )
-    completed_on = Column(DateTime(timezone=True), nullable=True)
-    execution_time_ms = Column(BigInteger, nullable=True)
-    rows_processed = Column(BigInteger, nullable=True)
-    batches_processed = Column(Integer, nullable=True)
-    model_checksum = Column(String(100), nullable=False)
-    failure_reason = Column(String(1000), nullable=True)
-
-    def __str__(self):
-        load_type = (
-            f"FULL ({self.full_refresh_reason})"
-            if self.is_full_refresh
-            else f"INCREMENTAL from version '{self.last_sync_version}' to '{self.sync_version}'"
-        )
-        execution_time_s = None
-        rows_per_second = None
-
-        if self.execution_time_ms:
-            execution_time_s = max(self.execution_time_ms // 1000, 1)
-
-        if self.rows_processed:
-            rows_per_second = self.rows_processed / execution_time_s
-
-        return (
-            "Model: {model}; "
-            "Load type: {load_type}; "
-            "Status: {status}; "
-            "Started on: {started}; "
-            "Completed on: {completed}; "
-            "Execution time: {exec_time}; "
-            "Batches processed: {batches}; "
-            "Rows processed: {rows}; "
-            "Average rows processed per second: {rows_per_second};".format(
-                model=self.model_name,
-                load_type=load_type,
-                status=self.status,
-                started=self.started_on.isoformat(),
-                completed=self.completed_on.isoformat() if self.completed_on else "n/a",
-                exec_time=f"{execution_time_s}s" if execution_time_s else "n/a",
-                batches=f"{self.batches_processed:,}"
-                if self.batches_processed
-                else "n/a",
-                rows=f"{self.rows_processed:,}" if self.rows_processed else "n/a",
-                rows_per_second=f"{rows_per_second:,.2f}" if rows_per_second else "n/a",
-            )
-        )
+import uuid
+
+from sqlalchemy import (
+    Column,
+    DateTime,
+    Integer,
+    String,
+    Boolean,
+    BigInteger,
+    ForeignKey,
+    Index,
+)
+from sqlalchemy.sql import func
+from sqlalchemy.dialects.postgresql import UUID
+from sqlalchemy.inspection import inspect
+
+from rdl.entities import Base
+from rdl.entities import ExecutionEntity
+from rdl.shared import Constants
+
+
+class ExecutionModelEntity(Base):
+    __tablename__ = "execution_model"
+    __table_args__ = {"schema": Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}
+    execution_model_id = Column(
+        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
+    )
+    created_on = Column(
+        DateTime(timezone=True), nullable=False, server_default=func.now()
+    )
+    updated_on = Column(
+        DateTime(timezone=True),
+        nullable=False,
+        server_default=func.now(),
+        onupdate=func.now(),
+    )
+    execution_id = Column(
+        UUID(as_uuid=True),
+        ForeignKey(
+            f"{Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}."
+            f"{inspect(ExecutionEntity).tables[0].name}."
+            f"{inspect(ExecutionEntity).primary_key[0].name}"
+        ),
+        nullable=False,
+    )
+    model_name = Column(String(250), nullable=False)
+    status = Column(
+        String(50),
+        nullable=False,
+        server_default=str(Constants.ExecutionModelStatus.STARTED),
+        index=True,
+    )
+    last_sync_version = Column(BigInteger, nullable=False)
+    sync_version = Column(BigInteger, nullable=False)
+    is_full_refresh = Column(Boolean, nullable=False)
+    full_refresh_reason = Column(String(100), nullable=False)
+    started_on = Column(
+        DateTime(timezone=True), server_default=func.now(), nullable=False
+    )
+    completed_on = Column(DateTime(timezone=True), nullable=True, index=True)
+    execution_time_ms = Column(BigInteger, nullable=True)
+    rows_processed = Column(BigInteger, nullable=True)
+    batches_processed = Column(Integer, nullable=True)
+    model_checksum = Column(String(100), nullable=False)
+    failure_reason = Column(String(1000), nullable=True)
+
+    index_on_execution_id_model_name = Index("execution_model__index_on_execution_id_model_name", execution_id, model_name, unique=True)
+    index_on_model_name_completed_on = Index("execution_model__index_on_model_name_completed_on", model_name, completed_on)
+
+    def __str__(self):
+        load_type = (
+            f"FULL ({self.full_refresh_reason})"
+            if self.is_full_refresh
+            else f"INCREMENTAL from version '{self.last_sync_version}' to '{self.sync_version}'"
+        )
+        execution_time_s = None
+        rows_per_second = None
+
+        if self.execution_time_ms:
+            execution_time_s = max(self.execution_time_ms // 1000, 1)
+
+        if self.rows_processed and execution_time_s:
+            rows_per_second = self.rows_processed / execution_time_s
+
+        return (
+            "Model: {model}; "
+            "Load type: {load_type}; "
+            "Status: {status}; "
+            "Started on: {started}; "
+            "Completed on: {completed}; "
+            "Execution time: {exec_time}; "
+            "Batches processed: {batches}; "
+            "Rows processed: {rows}; "
+            "Average rows processed per second: {rows_per_second};".format(
+                model=self.model_name,
+                load_type=load_type,
+                status=self.status,
+                started=self.started_on.isoformat(),
+                completed=self.completed_on.isoformat() if self.completed_on else "n/a",
+                exec_time=f"{execution_time_s}s" if execution_time_s else "n/a",
+                batches=f"{self.batches_processed:,}"
+                if self.batches_processed
+                else "n/a",
+                rows=f"{self.rows_processed:,}" if self.rows_processed else "n/a",
+                rows_per_second=f"{rows_per_second:,.2f}" if rows_per_second else "n/a",
+            )
+        )
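
For context, a minimal sketch of the query shape the new (model_name, completed_on)
composite index is meant to serve. This is illustrative only, not code from this
repo: the engine URL and model name are placeholders, and only the module path
rdl/entities/execution_model_entity.py is taken from the patch.

    # Illustrative sketch -- not part of this patch.
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker

    from rdl.entities.execution_model_entity import ExecutionModelEntity

    # Placeholder connection URL; substitute a real one.
    engine = create_engine("postgresql://user:password@localhost/rdl")
    session = sessionmaker(bind=engine)()

    # Latest completed load for one model: the filter on model_name and the
    # ORDER BY on completed_on can both be answered from the composite index,
    # avoiding a full scan of rdl.execution_model.
    latest = (
        session.query(ExecutionModelEntity)
        .filter(ExecutionModelEntity.model_name == "some_model")  # placeholder
        .filter(ExecutionModelEntity.completed_on.isnot(None))
        .order_by(ExecutionModelEntity.completed_on.desc())
        .first()
    )
    print(latest)

The unique index on (execution_id, model_name) serves the other common lookup
named in the commit message, and additionally enforces that a model appears at
most once per execution.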