This repository was archived by the owner on Mar 13, 2020. It is now read-only.
Merged
6 changes: 3 additions & 3 deletions rdl/data_load_tracking/DataLoadTrackerRepository.py
@@ -42,7 +42,7 @@ def complete_execution(self, execution_id, total_number_of_models,
.one()

execution_end_time = session.query(func.now()).scalar()
-total_execution_seconds = (execution_end_time - current_execution.started_on).total_seconds()
+total_execution_seconds = max((execution_end_time - current_execution.started_on).total_seconds(), 1)
Review comment (Collaborator): maybe add up milliseconds?

Reply (Collaborator, author): Hmm, I don't quite understand; do you mean rounding to milliseconds? In any case, the field this value is saved into is in seconds, so I'm trying to record at least 1 second and avoid recording 0 seconds when an execution finishes almost instantly.
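To illustrate the intent of the change, here is a minimal standalone sketch (not part of the PR; the helper name is hypothetical) showing how clamping with max(..., 1) keeps sub-second executions from being recorded as 0 seconds:

from datetime import datetime, timedelta

def clamped_execution_seconds(started_on, ended_on):
    # Clamp the elapsed time to at least 1 so near-instant runs are not stored as 0 seconds.
    return max((ended_on - started_on).total_seconds(), 1)

start = datetime(2020, 1, 1, 12, 0, 0)
assert clamped_execution_seconds(start, start + timedelta(milliseconds=250)) == 1
assert clamped_execution_seconds(start, start + timedelta(minutes=2)) == 120.0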

total_rows_processed = self.get_execution_rows(current_execution.execution_id)
total_batches_processed = self.get_execution_batches(current_execution.execution_id)

@@ -53,7 +53,7 @@ def complete_execution(self, execution_id, total_number_of_models,
current_execution.rows_processed = total_rows_processed
current_execution.batches_processed = total_batches_processed
session.commit()
-self.logger.info(current_execution)
+self.logger.info(f'Completed {current_execution}')
session.close()
return total_rows_processed

@@ -82,7 +82,7 @@ def save_execution_model(self, data_load_tracker):
.one()

execution_end_time = session.query(func.now()).scalar()
-total_execution_seconds = (execution_end_time - current_execution_model.started_on).total_seconds()
+total_execution_seconds = max((execution_end_time - current_execution_model.started_on).total_seconds(), 1)

current_execution_model.completed_on = execution_end_time
current_execution_model.execution_time_ms = int(total_execution_seconds * 1000)
2 changes: 1 addition & 1 deletion rdl/data_sources/DataSourceFactory.py
@@ -11,7 +11,7 @@ def __init__(self, logger=None):
def create_source(self, connection_string):
for source in self.sources:
if source.can_handle_connection_string(connection_string):
self.logger.debug(f"Found handler '{source}' for connection string.")
self.logger.info(f"Found handler '{source}' for given connection string.")
return source(connection_string)

raise RuntimeError('There are no data sources that can handle this connection string')
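For context, a hedged usage sketch of this factory (the import path and class name are inferred from the file layout, and the connection string is only a placeholder, not something the PR defines):

import logging

from rdl.data_sources.DataSourceFactory import DataSourceFactory

factory = DataSourceFactory(logger=logging.getLogger(__name__))
# Placeholder connection string; any string that a registered source accepts via
# can_handle_connection_string() would be resolved to that source.
source = factory.create_source('mssql+pyodbc://user:password@host/database')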
45 changes: 32 additions & 13 deletions rdl/entities/execution_entity.py
@@ -23,16 +23,35 @@ class ExecutionEntity(Base):
models_processed = Column(Integer, nullable=True)

def __str__(self):
-if self.status == Constants.ExecutionStatus.STARTED:
-return f"Started Execution ID: {self.execution_id} at {self.started_on}"
-
-total_execution_seconds = self.execution_time_s
-execution_hours = total_execution_seconds // 3600
-execution_minutes = (total_execution_seconds // 60) % 60
-execution_seconds = total_execution_seconds % 60
-
-return f"Completed Execution ID: {self.execution_id}" \
-f"; Models Processed: {self.models_processed:,}" \
-f"; Rows Processed: {self.rows_processed:,}" \
-f"; Execution Time: {execution_hours}h {execution_minutes}m {execution_seconds}s" \
-f"; Average rows processed per second: {(self.rows_processed//max(total_execution_seconds, 1)):,}."
+execution_time_str = None
+rows_per_second = None
+
+if self.execution_time_s:
+total_execution_seconds = self.execution_time_s
+
+execution_hours = total_execution_seconds // 3600
+execution_minutes = (total_execution_seconds // 60) % 60
+execution_seconds = total_execution_seconds % 60
+execution_time_str = f'{execution_hours}h {execution_minutes}m {execution_seconds}s'
+
+if self.rows_processed:
+rows_per_second = (self.rows_processed//max(total_execution_seconds, 1))
+
+return 'Execution ID: {exec_id}; ' \
+'Status: {status}; ' \
+'Started on: {started}; ' \
+'Completed on: {completed}; ' \
+'Execution time: {exec_time}; ' \
+'Models processed: {models}; ' \
+'Batches processed: {batches};' \
+'Rows processed: {rows}; ' \
+'Average rows processed per second: {rows_per_second};'.format(
+exec_id=self.execution_id,
+status=self.status,
+started=self.started_on.isoformat(),
+completed=self.completed_on.isoformat() if self.completed_on else 'n/a',
+exec_time=execution_time_str if execution_time_str else 'n/a',
+models=f'{self.models_processed:,}' if self.models_processed else 'n/a',
+batches=f'{self.batches_processed:,}' if self.batches_processed else 'n/a',
+rows=f'{self.rows_processed:,}' if self.rows_processed else 'n/a',
+rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else 'n/a')
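As an aside, the optional-field rendering used above reduces to this small pattern (a standalone sketch with a hypothetical function name, not code from the PR):

def format_count(value):
    # Render an optional numeric field with a thousands separator, or 'n/a' when it is missing.
    return f'{value:,}' if value else 'n/a'

print(format_count(1234567))  # 1,234,567
print(format_count(None))     # n/a

Note that, like the entity code, this treats 0 the same as None and renders it as 'n/a'.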
38 changes: 29 additions & 9 deletions rdl/entities/execution_model_entity.py
@@ -36,12 +36,32 @@ class ExecutionModelEntity(Base):
failure_reason = Column(String(1000), nullable=True)

def __str__(self):
-load_type = 'FULL' if self.is_full_refresh else f"INCREMENTAL from " \
-f"version '{self.last_sync_version}' " \
-f"to '{self.sync_version}'"
-
-execution_tims_s = max(self.execution_time_ms // 1000, 1)
-rows_per_second = self.rows_processed / execution_tims_s
-return f"Rows: {self.rows_processed}, " \
-f"Load type: {load_type}, " \
-f"Total Execution Time: {execution_tims_s}s @ {rows_per_second:.2f} rows per second "
+load_type = f'FULL ({self.full_refresh_reason})' if self.is_full_refresh else \
+f"INCREMENTAL from version '{self.last_sync_version}' to '{self.sync_version}'"
+execution_time_s = None
+rows_per_second = None
+
+if self.execution_time_ms:
+execution_time_s = max(self.execution_time_ms // 1000, 1)
+
+if self.rows_processed:
+rows_per_second = self.rows_processed / execution_time_s
+
+return 'Model: {model}; ' \
+'Load type: {load_type}; ' \
+'Status: {status}; ' \
+'Started on: {started}; ' \
+'Completed on: {completed}; ' \
+'Execution time: {exec_time}; ' \
+'Batches processed: {batches}; ' \
+'Rows processed: {rows}; ' \
+'Average rows processed per second: {rows_per_second};'.format(
+model=self.model_name,
+load_type=load_type,
+status=self.status,
+started=self.started_on.isoformat(),
+completed=self.completed_on.isoformat() if self.completed_on else 'n/a',
+exec_time=f'{execution_time_s}s' if execution_time_s else 'n/a',
+batches=f'{self.batches_processed:,}' if self.batches_processed else 'n/a',
+rows=f'{self.rows_processed:,}' if self.rows_processed else 'n/a',
+rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else 'n/a')