Skip to content

Commit

Permalink
vdk-core: fix bug in error classification
Browse files Browse the repository at this point in the history
Why

Errors coming from libraries imported in user code are
getting classified as platform errors in the execution result.
This is caused by the job_input_error_classifier checking for
an incorrect attribute of the error object.

What

Extract the _vdk_resolvable_by and _vdk_resolvable_actual
attribute names into constants and use those constants wherever
the attributes are referenced in code.

How has this been tested

Functional test

What kind of change is this

Bugfix

Signed-off-by: Dilyan Marinov <mdilyan@vmware.com>
  • Loading branch information
Dilyan Marinov committed Oct 27, 2023
1 parent b4220f6 commit 1b7cffc
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from typing import Optional

from vdk.internal.core import errors
from vdk.internal.core.error_classifiers import is_classified

log = logging.getLogger(__name__)

Expand All @@ -31,9 +32,7 @@ def whom_to_blame(
:return: ResolvableBy.PLATFORM_ERROR if exception was recognized as Platform Team responsibility.
errors.ResolvableBy.USER_ERROR if exception was recognized as User Error.
"""
if isinstance(exception, errors.BaseVdkError) or hasattr(
exception, "to_be_fixed_by"
):
if is_classified(exception):
return errors.find_whom_to_blame_from_exception(exception)
if is_user_error(exception, data_job_path):
return errors.ResolvableBy.USER_ERROR
Expand Down
19 changes: 15 additions & 4 deletions projects/vdk-core/src/vdk/internal/core/error_classifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@
from collections import defaultdict
from enum import Enum

from vdk.internal.core.errors import BaseVdkError

log = logging.getLogger(__name__)

ATTR_VDK_RESOLVABLE_BY = "_vdk_resolvable_by"
ATTR_VDK_RESOLVABLE_ACTUAL = "_vdk_resolvable_actual"

# RESOLVABLE CONTEXT


Expand Down Expand Up @@ -141,21 +146,27 @@ def clear_intermediate_errors():


def set_exception_resolvable_by(exception: BaseException, resolvable_by: ResolvableBy):
    """
    Tag an exception with the party that is expected to resolve it.

    Two attributes are written onto the exception object: the one named by
    ATTR_VDK_RESOLVABLE_BY receives ``resolvable_by`` unchanged, and the one named by
    ATTR_VDK_RESOLVABLE_ACTUAL receives whatever ``__error_type_to_actual_resolver``
    returns for it.

    :param exception: the exception instance to tag; it is mutated in place.
    :param resolvable_by: the classification to record on the exception.
    """
    # Always go through the shared attribute-name constants so that readers of
    # these attributes and this writer cannot drift apart.
    setattr(exception, ATTR_VDK_RESOLVABLE_BY, resolvable_by)
    setattr(
        exception,
        ATTR_VDK_RESOLVABLE_ACTUAL,
        __error_type_to_actual_resolver(resolvable_by),
    )


def get_exception_resolvable_by(exception: BaseException):
    """
    Return the classification previously stored on an exception, if any.

    :param exception: the exception to inspect.
    :return: the value of the attribute named by ATTR_VDK_RESOLVABLE_BY, or None
        when the exception does not have that attribute.
    """
    # getattr with a default performs the "has it? then read it" check in one lookup.
    return getattr(exception, ATTR_VDK_RESOLVABLE_BY, None)


def is_classified(exception: BaseException):
    """
    Tell whether an exception already carries a classification.

    :param exception: the exception to inspect.
    :return: True if the exception is a BaseVdkError instance or has the attribute
        named by ATTR_VDK_RESOLVABLE_BY; False otherwise.
    """
    if isinstance(exception, BaseVdkError):
        return True
    return hasattr(exception, ATTR_VDK_RESOLVABLE_BY)


def get_exception_resolvable_by_actual(exception: BaseException):
resolvable_by = get_exception_resolvable_by(exception)
if resolvable_by:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pandas as pd
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    """
    Data job step that merges two small frames on columns neither of them has.

    The frames built here only contain the columns "lkey"/"value" and "rkey"/"value",
    while the merge is keyed on an unrelated list of column names.
    NOTE(review): presumably the point is to make pandas itself raise (a KeyError)
    from inside user code - confirm against the functional test that runs this job.
    """
    left = pd.DataFrame({"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]})
    right = pd.DataFrame({"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]})

    # None of these names is a column of either frame.
    join_columns = [
        "timestamp",
        "status_code",
        "response_status",
        "exception",
        "region",
        "region_type",
        "tenant",
        "utc_time",
    ]
    pd.merge(left, right, how="inner", on=join_columns)
14 changes: 13 additions & 1 deletion projects/vdk-core/tests/functional/run/test_run_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ def db_connection_execute_operation(execution_cursor: ExecutionCursor):


def _get_job_status(tmp_termination_msg_file):
    """
    Read the job status recorded in the termination message file.

    :param tmp_termination_msg_file: object exposing ``read_text()`` that returns a
        JSON document containing a "status" key.
    :return: the value stored under "status".
    """
    result = json.loads(tmp_termination_msg_file.read_text())
    return result["status"]


@mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY})
Expand All @@ -211,3 +212,14 @@ def test_user_error_handled(tmp_termination_msg_file):
)
cli_assert_equal(0, result)
assert (json.loads(tmp_termination_msg_file.read_text())["status"]) == "Success"


def test_error_from_pandas_user_error(tmp_termination_msg_file):
    """
    Running the "pandas-key-error-job" job must fail and be reported as a user error.

    Checks the CLI exit code, the status in the termination message file, and the
    blamee / exception name that appear in the command output.
    """
    # Clear the resolvable context before the run.
    # NOTE(review): presumably so errors recorded by earlier tests do not leak in.
    errors.resolvable_context().clear()

    runner = CliEntryBasedTestRunner()
    job_dir = util.job_path("pandas-key-error-job")
    result: Result = runner.invoke(["run", job_dir])

    cli_assert_equal(1, result)
    job_status = _get_job_status(tmp_termination_msg_file)
    assert job_status == "User error"
    assert '"blamee": "User Error"' in result.output
    assert '"exception_name": "KeyError"' in result.output
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
TerminationMessageWriterPlugin,
)
from vdk.internal.builtin_plugins.version.version import get_version
from vdk.internal.core import errors
from vdk.internal.core.config import ConfigurationBuilder
from vdk.internal.core.context import CoreContext
from vdk.internal.core.errors import get_blamee_overall
Expand Down Expand Up @@ -110,6 +111,7 @@ class WriterTest(unittest.TestCase):
@patch("builtins.open")
@patch(f"{get_blamee_overall.__module__}.{get_blamee_overall.__name__}")
def test_writer(self, get_blamee_overall, mock_open):
errors.resolvable_context().clear()
get_blamee_overall.return_value = None
mock_file = MagicMock()
mock_open.return_value = mock_file
Expand Down
53 changes: 27 additions & 26 deletions projects/vdk-plugins/vdk-csv/tests/functional/test_csv_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,32 +151,33 @@ def test_export_csv_with_already_existing_file(tmpdir):
cli_assert_equal(1, result)


def test_csv_export_with_nonexistent_table(tmpdir):
    """
    Exporting from a table that does not exist must fail with a classified error.

    The table is dropped first, then "export-csv" is invoked with a query against
    it. The resulting exception is expected to be an OperationalError carrying the
    "_vdk_resolvable_actual" attribute set to ResolvableByActual.PLATFORM.
    """
    # Join with a path separator so the database file is created inside tmpdir,
    # not beside it under a name prefixed with the directory name.
    db_dir = os.path.join(str(tmpdir), "vdk-sqlite.db")
    with mock.patch.dict(
        os.environ,
        {
            "VDK_DB_DEFAULT_TYPE": "SQLITE",
            "VDK_SQLITE_FILE": db_dir,
        },
    ):
        runner = CliEntryBasedTestRunner(sqlite_plugin, csv_plugin)
        drop_table(runner, "test_table")
        result = runner.invoke(
            [
                "export-csv",
                "--query",
                "SELECT * FROM test_table",
                "--file",
                "result3.csv",
            ]
        )
        assert isinstance(result.exception, OperationalError)
        assert hasattr(result.exception, "_vdk_resolvable_actual")
        assert (
            getattr(result.exception, "_vdk_resolvable_actual")
            == ResolvableByActual.PLATFORM
        )
# TODO: Uncomment after https://github.com/vmware/versatile-data-kit/pull/2840 is merged
# def test_csv_export_with_nonexistent_table(tmpdir):
# db_dir = str(tmpdir) + "vdk-sqlite.db"
# with mock.patch.dict(
# os.environ,
# {
# "VDK_DB_DEFAULT_TYPE": "SQLITE",
# "VDK_SQLITE_FILE": db_dir,
# },
# ):
# runner = CliEntryBasedTestRunner(sqlite_plugin, csv_plugin)
# drop_table(runner, "test_table")
# result = runner.invoke(
# [
# "export-csv",
# "--query",
# "SELECT * FROM test_table",
# "--file",
# "result3.csv",
# ]
# )
# assert isinstance(result.exception, OperationalError)
# assert hasattr(result.exception, "_vdk_resolvable_actual")
# assert (
# getattr(result.exception, "_vdk_resolvable_actual")
# == ResolvableByActual.USER
# )


def test_csv_export_with_no_data(tmpdir):
Expand Down
Loading

0 comments on commit 1b7cffc

Please sign in to comment.