Skip to content

Commit

Permalink
[COST-4440] Warn when a partition already exists for OCP processing (#…
Browse files Browse the repository at this point in the history
…4838)

* Warn when a partition already exists for OCP processing

This appears to be a problem only with OCP processing and only when two workers
are running the same workload practiaclly at the same exact time. This may
indicate a problem with our worker caching not preventing simultaneous
execution, or a problem with celery allowing duplicate tasks to run. We don’t
have enough information to determine that currently.

Since this is isolated to OCP processing, I fixed it only in the OPC processing
instead of making changes to update_summary_tables, which would affect all
provider types.

* Add tests
  • Loading branch information
samdoran authored Dec 19, 2023
1 parent 5cc2278 commit e01b5ce
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 1 deletion.
17 changes: 16 additions & 1 deletion koku/masu/database/ocp_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from api.metrics.constants import DEFAULT_DISTRIBUTION_TYPE
from api.provider.models import Provider
from koku.database import SQLScriptAtomicExecutorMixin
from koku.trino_database import TrinoStatementExecError
from masu.database import OCP_REPORT_TABLE_MAP
from masu.database.report_db_accessor_base import ReportDBAccessorBase
from masu.util.common import filter_dictionary
Expand Down Expand Up @@ -368,7 +369,21 @@ def populate_line_item_daily_summary_table_trino(
"storage_exists": storage_exists,
}

self._execute_trino_multipart_sql_query(sql, bind_params=sql_params)
try:
self._execute_trino_multipart_sql_query(sql, bind_params=sql_params)
except TrinoStatementExecError as trino_exc:
if "one or more partitions already exist" in str(trino_exc).lower():
LOG.warning(
log_json(
ctx=self.extract_context_from_sql_params(sql_params),
msg=getattr(trino_exc.__cause__, "message", None),
error_type=getattr(trino_exc.__cause__, "error_type", None),
error_name=getattr(trino_exc.__cause__, "error_name", None),
query_id=getattr(trino_exc.__cause__, "query_id", None),
)
)
else:
raise

def populate_pod_label_summary_table(self, report_period_ids, start_date, end_date):
"""Populate the line item aggregated totals data table."""
Expand Down
90 changes: 90 additions & 0 deletions koku/masu/test/database/test_ocp_report_db_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from api.iam.test.iam_test_case import FakeTrinoConn
from api.provider.models import Provider
from koku import trino_database as trino_db
from koku.trino_database import TrinoStatementExecError
from masu.database import OCP_REPORT_TABLE_MAP
from masu.database.ocp_report_db_accessor import OCPReportDBAccessor
from masu.test import MasuTestCase
Expand Down Expand Up @@ -84,6 +85,95 @@ def test_populate_line_item_daily_summary_table_trino(self, mock_execute, *args)
)
mock_execute.assert_called()

@patch("masu.database.ocp_report_db_accessor.trino_table_exists", return_value=True)
def test_populate_line_item_daily_summary_table_trino_exception_warn(self, mock_table_exists):
"""
Test that a warning is logged when a TrinoStatementExecError is raised because
a partion already exists.
"""

start_date = self.dh.this_month_start
end_date = self.dh.next_month_start
cluster_id = "ocp-cluster"
cluster_alias = "OCP FTW"
report_period_id = 1
source = self.provider_uuid
message = "One or more Partitions Already exist"
with (
patch.object(self.accessor, "delete_ocp_hive_partition_by_day"),
patch.object(self.accessor, "_execute_trino_multipart_sql_query") as mock_sql_query,
self.accessor as acc,
self.assertLogs("masu.database.ocp_report_db_accessor", level="WARN") as logger,
):
mock_sql_query.side_effect = TrinoStatementExecError(message)
mock_sql_query.side_effect.__cause__ = TrinoExternalError({"message": message})

acc.populate_line_item_daily_summary_table_trino(
start_date, end_date, report_period_id, cluster_id, cluster_alias, source
)

self.assertIn(
f"WARNING:masu.database.ocp_report_db_accessor:{{'message': '{message}'",
logger.output[0],
)

@patch("masu.database.ocp_report_db_accessor.trino_table_exists", return_value=True)
def test_populate_line_item_daily_summary_table_trino_exception(self, mock_table_exists):
"""
Test that a TrinoStatementExecError is raised for errors that are not partition related.
"""

start_date = self.dh.this_month_start
end_date = self.dh.next_month_start
cluster_id = "ocp-cluster"
cluster_alias = "OCP FTW"
report_period_id = 1
source = self.provider_uuid
with (
patch.object(self.accessor, "delete_ocp_hive_partition_by_day"),
patch.object(self.accessor, "_execute_trino_multipart_sql_query") as mock_sql_query,
self.accessor as acc,
self.assertRaisesRegex(TrinoStatementExecError, "Some other reason"),
):
mock_sql_query.side_effect = TrinoStatementExecError("Some other reason")

acc.populate_line_item_daily_summary_table_trino(
start_date, end_date, report_period_id, cluster_id, cluster_alias, source
)

@patch("masu.database.ocp_report_db_accessor.trino_table_exists", return_value=True)
def test_populate_line_item_daily_summary_table_trino_exception_other(self, mock_table_exists):
"""
Test that a warning is logged when a TrinoStatementExecError is raised because
a partion already exists and that no errors are encountered if an exception other
than a TrinoQueryError is the cause.
"""

start_date = self.dh.this_month_start
end_date = self.dh.next_month_start
cluster_id = "ocp-cluster"
cluster_alias = "OCP FTW"
report_period_id = 1
source = self.provider_uuid
message = "One or more Partitions Already exist"
with (
patch.object(self.accessor, "delete_ocp_hive_partition_by_day"),
patch.object(self.accessor, "_execute_trino_multipart_sql_query") as mock_sql_query,
self.accessor as acc,
self.assertLogs("masu.database.ocp_report_db_accessor", level="WARN") as logger,
):
mock_sql_query.side_effect = TrinoStatementExecError(message)
mock_sql_query.side_effect.__cause__ = ValueError("Some other exception type")

acc.populate_line_item_daily_summary_table_trino(
start_date, end_date, report_period_id, cluster_id, cluster_alias, source
)

self.assertIn(
"WARNING:masu.database.ocp_report_db_accessor:{'message': None",
logger.output[0],
)

@patch("masu.database.ocp_report_db_accessor.trino_table_exists")
@patch("masu.database.ocp_report_db_accessor.pkgutil.get_data")
@patch("masu.database.report_db_accessor_base.trino_db.connect")
Expand Down

0 comments on commit e01b5ce

Please sign in to comment.