Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-core: implement config option for logging execution result #2831

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 28 additions & 17 deletions projects/vdk-core/src/vdk/internal/builtin_plugins/run/cli_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from vdk.internal.builtin_plugins.config.job_config import JobConfig
from vdk.internal.builtin_plugins.run import job_input_error_classifier
from vdk.internal.builtin_plugins.run.data_job import DataJobFactory
from vdk.internal.builtin_plugins.run.execution_results import ExecutionResult
from vdk.internal.builtin_plugins.run.execution_tracking import (
ExecutionTrackingPlugin,
)
Expand Down Expand Up @@ -120,6 +121,26 @@ def __warn_on_python_version_disparity(
"""
)

def __log_exec_result(self, execution_result: ExecutionResult) -> None:
# On some platforms, if the size of a string is too large, the
# logging module starts throwing OSError: [Errno 40] Message too long,
# so it is safer if we split large strings into smaller chunks.
string_exec_result = str(execution_result)
DeltaMichael marked this conversation as resolved.
Show resolved Hide resolved
if len(string_exec_result) > 5000:
temp_exec_result = json.loads(string_exec_result)
DeltaMichael marked this conversation as resolved.
Show resolved Hide resolved
steps = temp_exec_result.pop("steps_list")
DeltaMichael marked this conversation as resolved.
Show resolved Hide resolved

log.info(
f"Data Job execution summary: {json.dumps(temp_exec_result, indent=2)}"
)

chunks = math.ceil(len(string_exec_result) / 5000)
for i in self.__split_into_chunks(exec_steps=steps, chunks=chunks):
log.info(f"Execution Steps: {json.dumps(i, indent=2)}")

else:
log.info(f"Data Job execution summary: {execution_result}")

def create_and_run_data_job(
self,
context: CoreContext,
Expand All @@ -141,24 +162,14 @@ def create_and_run_data_job(
execution_result = None
try:
execution_result = job.run(args)
# On some platforms, if the size of a string is too large, the
# logging module starts throwing OSError: [Errno 40] Message too long,
# so it is safer if we split large strings into smaller chunks.
string_exec_result = str(execution_result)
if len(string_exec_result) > 5000:
temp_exec_result = json.loads(string_exec_result)
steps = temp_exec_result.pop("steps_list")

log.info(
f"Data Job execution summary: {json.dumps(temp_exec_result, indent=2)}"
)

chunks = math.ceil(len(string_exec_result) / 5000)
for i in self.__split_into_chunks(exec_steps=steps, chunks=chunks):
log.info(f"Execution Steps: {json.dumps(i, indent=2)}")

if context.configuration.get_value("LOG_EXECUTION_RESULT"):
DeltaMichael marked this conversation as resolved.
Show resolved Hide resolved
self.__log_exec_result(execution_result)
else:
log.info(f"Data Job execution summary: {execution_result}")
if execution_result.is_success():
log.info("Job execution result: SUCCESS")
if execution_result.is_failed():
log.info("Job execution result: FAILED")

except BaseException as e:
log.error(
"\n".join(
Expand Down
22 changes: 14 additions & 8 deletions projects/vdk-core/tests/functional/run/test_run_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,17 @@ def test_user_error_handled(tmp_termination_msg_file):


def test_error_from_pandas_user_error(tmp_termination_msg_file):
errors.resolvable_context().clear()
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("pandas-key-error-job")])
cli_assert_equal(1, result)
assert _get_job_status(tmp_termination_msg_file) == "User error"
assert '"blamee": "User Error"' in result.output
assert '"exception_name": "KeyError"' in result.output
with mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
errors.resolvable_context().clear()
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("pandas-key-error-job")])
cli_assert_equal(1, result)
assert _get_job_status(tmp_termination_msg_file) == "User error"
assert '"blamee": "User Error"' in result.output
assert '"exception_name": "KeyError"' in result.output
41 changes: 24 additions & 17 deletions projects/vdk-core/tests/functional/run/test_run_job_cancel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
import unittest.mock
from unittest.mock import MagicMock
from unittest.mock import patch
Expand Down Expand Up @@ -27,23 +28,29 @@ def test_run(self, patched_writer: MagicMock):
proper message is logged confirming job was cancelled from cancel_job_execution() method.
:return:
"""
test_runner = CliEntryBasedTestRunner()
with unittest.mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
test_runner = CliEntryBasedTestRunner()

# clear recorded errors before invoking run since other tests might leave a dirty state
errors.clear_intermediate_errors()
# clear recorded errors before invoking run since other tests might leave a dirty state
errors.clear_intermediate_errors()

result: Result = test_runner.invoke(["run", util.job_path("cancel-job")])
result: Result = test_runner.invoke(["run", util.job_path("cancel-job")])

# Check the writer plugin wasn't called with user or platform errors.
# 1st param is overall blamee, should be None.
# 2nd parm is overall user error in this case it's empty string which is a falsy value.
# 3rd param is context.configuration and can be ignored for this test.
patched_writer.assert_called_once_with(None, "", unittest.mock.ANY)
assert "Step Cancel 1." in result.output
assert "Step Cancel 2." in result.output
assert "Step Cancel 3." not in result.output
assert (
"Job/template execution was skipped from job/template step code."
in result.output
)
cli_assert_equal(0, result)
# Check the writer plugin wasn't called with user or platform errors.
# 1st param is overall blamee, should be None.
# 2nd parm is overall user error in this case it's empty string which is a falsy value.
# 3rd param is context.configuration and can be ignored for this test.
patched_writer.assert_called_once_with(None, "", unittest.mock.ANY)
assert "Step Cancel 1." in result.output
assert "Step Cancel 2." in result.output
assert "Step Cancel 3." not in result.output
assert (
"Job/template execution was skipped from job/template step code."
in result.output
)
cli_assert_equal(0, result)
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
import unittest.mock

from click.testing import Result
from functional.run import util
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner


def test_run_log_execution_result_enabled():
with unittest.mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("simple-job")])

cli_assert_equal(0, result)
assert "Data Job execution summary" in result.output
assert "steps_list" in result.output


def test_run_log_execution_result_enabled_on_fail():
with unittest.mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("fail-job")])

cli_assert_equal(1, result)
assert "Data Job execution summary" in result.output
assert "steps_list" in result.output


def test_run_log_execution_result_disabled():
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("simple-job")])

cli_assert_equal(0, result)
assert "Data Job execution summary" not in result.output
assert "steps_list" not in result.output
assert "Job execution result: SUCCESS" in result.output


def test_run_log_execution_result_disabled_on_fail():
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("fail-job")])

cli_assert_equal(1, result)
assert "Data Job execution summary" not in result.output
assert "steps_list" not in result.output
assert "Job execution result: FAILED" in result.output
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
log = logging.getLogger(__name__)

VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE"
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"


class ValidatedSqLite3MemoryDbPlugin:
Expand Down Expand Up @@ -79,7 +80,10 @@ def test_run_dbapi_connection_no_such_db_type():
logging.getLogger("vdk").setLevel(logging.INFO)
runner = CliEntryBasedTestRunner()

with mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}):
with mock.patch.dict(
os.environ,
{VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY, VDK_LOG_EXECUTION_RESULT: "True"},
):
result: Result = runner.invoke(
[
"run",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def setUp(self) -> None:
{
"VDK_INGEST_METHOD_DEFAULT": "memory",
"VDK_INGEST_PAYLOAD_PREPROCESS_SEQUENCE": "vdk-gdp-execution-id",
"VDK_LOG_EXECUTION_RESULT": "True",
},
)
def test_ingested_payload_expansion(self) -> None:
Expand All @@ -46,6 +47,7 @@ def test_ingested_payload_expansion(self) -> None:
"VDK_INGEST_METHOD_DEFAULT": "memory",
"VDK_INGEST_PAYLOAD_PREPROCESS_SEQUENCE": "vdk-gdp-execution-id",
"VDK_GDP_EXECUTION_ID_MICRO_DIMENSION_NAME": "micro_dimension_name_customized",
"VDK_LOG_EXECUTION_RESULT": "True",
},
)
def test_micro_dimension_name_configurable(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-trino/tests/test_ingest_to_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
VDK_TRINO_PORT = "VDK_TRINO_PORT"
VDK_TRINO_USE_SSL = "VDK_TRINO_USE_SSL"
VDK_INGEST_METHOD_DEFAULT = "VDK_INGEST_METHOD_DEFAULT"
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"


@pytest.mark.usefixtures("trino_service")
Expand All @@ -29,6 +30,7 @@
VDK_TRINO_PORT: "8080",
VDK_TRINO_USE_SSL: "False",
VDK_INGEST_METHOD_DEFAULT: "TRINO",
VDK_LOG_EXECUTION_RESULT: "True",
},
)
class IngestToTrinoTests(TestCase):
Expand Down
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY = (
"VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY"
)
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"

org_move_data_to_table = TrinoTemplateQueries.move_data_to_table

Expand Down Expand Up @@ -60,6 +61,7 @@ def mock_os_environ():
VDK_TRINO_PORT: "8080",
VDK_TRINO_USE_SSL: "False",
VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "INSERT_SELECT",
VDK_LOG_EXECUTION_RESULT: "True",
},
):
yield
Expand Down
Loading