Skip to content

Commit

Permalink
vdk-core: add config option for logging execution result
Browse files Browse the repository at this point in the history
Why?

User research indicates that the execution result is too verbose
for local runs. Users don't expect a lot of output for successful
jobs and expect error output for failing jobs.

What?

Introduce the LOG_EXECUTION_RESULT config option that enables/disables displaying the
end result.

How was this tested?

Ran locally with successful and failing jobs
Functional tests

What kind of change is this?

Feature/non-breaking

Signed-off-by: Dilyan Marinov <mdilyan@vmware.com>
  • Loading branch information
Dilyan Marinov committed Nov 14, 2023
1 parent 16793ad commit 76c87dd
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 43 deletions.
45 changes: 28 additions & 17 deletions projects/vdk-core/src/vdk/internal/builtin_plugins/run/cli_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from vdk.internal.builtin_plugins.config.job_config import JobConfig
from vdk.internal.builtin_plugins.run import job_input_error_classifier
from vdk.internal.builtin_plugins.run.data_job import DataJobFactory
from vdk.internal.builtin_plugins.run.execution_results import ExecutionResult
from vdk.internal.builtin_plugins.run.execution_tracking import (
ExecutionTrackingPlugin,
)
Expand Down Expand Up @@ -120,6 +121,26 @@ def __warn_on_python_version_disparity(
"""
)

def __log_exec_result(self, execution_result: ExecutionResult) -> None:
    """
    Log the execution result, breaking it into several messages when it is long.

    If the string form of the result is at most 5000 characters, it is logged
    as a single message. Otherwise the string form is parsed as JSON, the
    "steps_list" entry is removed from it, the remaining summary is logged
    first and the steps are then logged chunk by chunk.

    :param execution_result: the result whose string form gets logged
    """
    # On some platforms the logging module starts throwing
    # OSError: [Errno 40] Message too long when a single message is too
    # large, so long results are emitted as several smaller messages.
    size_limit = 5000
    serialized = str(execution_result)

    if len(serialized) <= size_limit:
        log.info(f"Data Job execution summary: {execution_result}")
        return

    summary = json.loads(serialized)
    step_entries = summary.pop("steps_list")
    log.info(f"Data Job execution summary: {json.dumps(summary, indent=2)}")

    # The number of chunks grows with the overall length of the string form.
    chunk_count = math.ceil(len(serialized) / size_limit)
    step_chunks = self.__split_into_chunks(
        exec_steps=step_entries, chunks=chunk_count
    )
    for step_chunk in step_chunks:
        log.info(f"Execution Steps: {json.dumps(step_chunk, indent=2)}")

def create_and_run_data_job(
self,
context: CoreContext,
Expand All @@ -141,24 +162,14 @@ def create_and_run_data_job(
execution_result = None
try:
execution_result = job.run(args)
# On some platforms, if the size of a string is too large, the
# logging module starts throwing OSError: [Errno 40] Message too long,
# so it is safer if we split large strings into smaller chunks.
string_exec_result = str(execution_result)
if len(string_exec_result) > 5000:
temp_exec_result = json.loads(string_exec_result)
steps = temp_exec_result.pop("steps_list")

log.info(
f"Data Job execution summary: {json.dumps(temp_exec_result, indent=2)}"
)

chunks = math.ceil(len(string_exec_result) / 5000)
for i in self.__split_into_chunks(exec_steps=steps, chunks=chunks):
log.info(f"Execution Steps: {json.dumps(i, indent=2)}")

if context.configuration.get_value("LOG_EXECUTION_RESULT"):
self.__log_exec_result(execution_result)
else:
log.info(f"Data Job execution summary: {execution_result}")
if execution_result.is_success():
log.info("Job execution result: SUCCESS")
if execution_result.is_failed():
log.info("Job execution result: FAILED")

except BaseException as e:
log.error(
"\n".join(
Expand Down
22 changes: 14 additions & 8 deletions projects/vdk-core/tests/functional/run/test_run_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,17 @@ def test_user_error_handled(tmp_termination_msg_file):


def test_error_from_pandas_user_error(tmp_termination_msg_file):
errors.resolvable_context().clear()
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("pandas-key-error-job")])
cli_assert_equal(1, result)
assert _get_job_status(tmp_termination_msg_file) == "User error"
assert '"blamee": "User Error"' in result.output
assert '"exception_name": "KeyError"' in result.output
with mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
errors.resolvable_context().clear()
runner = CliEntryBasedTestRunner()

result: Result = runner.invoke(["run", util.job_path("pandas-key-error-job")])
cli_assert_equal(1, result)
assert _get_job_status(tmp_termination_msg_file) == "User error"
assert '"blamee": "User Error"' in result.output
assert '"exception_name": "KeyError"' in result.output
41 changes: 24 additions & 17 deletions projects/vdk-core/tests/functional/run/test_run_job_cancel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
import unittest.mock
from unittest.mock import MagicMock
from unittest.mock import patch
Expand Down Expand Up @@ -27,23 +28,29 @@ def test_run(self, patched_writer: MagicMock):
proper message is logged confirming job was cancelled from cancel_job_execution() method.
:return:
"""
test_runner = CliEntryBasedTestRunner()
with unittest.mock.patch.dict(
os.environ,
{
"VDK_LOG_EXECUTION_RESULT": "True",
},
):
test_runner = CliEntryBasedTestRunner()

# clear recorded errors before invoking run since other tests might leave a dirty state
errors.clear_intermediate_errors()
# clear recorded errors before invoking run since other tests might leave a dirty state
errors.clear_intermediate_errors()

result: Result = test_runner.invoke(["run", util.job_path("cancel-job")])
result: Result = test_runner.invoke(["run", util.job_path("cancel-job")])

# Check the writer plugin wasn't called with user or platform errors.
# 1st param is overall blamee, should be None.
# 2nd parm is overall user error in this case it's empty string which is a falsy value.
# 3rd param is context.configuration and can be ignored for this test.
patched_writer.assert_called_once_with(None, "", unittest.mock.ANY)
assert "Step Cancel 1." in result.output
assert "Step Cancel 2." in result.output
assert "Step Cancel 3." not in result.output
assert (
"Job/template execution was skipped from job/template step code."
in result.output
)
cli_assert_equal(0, result)
# Check the writer plugin wasn't called with user or platform errors.
# 1st param is overall blamee, should be None.
# 2nd parm is overall user error in this case it's empty string which is a falsy value.
# 3rd param is context.configuration and can be ignored for this test.
patched_writer.assert_called_once_with(None, "", unittest.mock.ANY)
assert "Step Cancel 1." in result.output
assert "Step Cancel 2." in result.output
assert "Step Cancel 3." not in result.output
assert (
"Job/template execution was skipped from job/template step code."
in result.output
)
cli_assert_equal(0, result)
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import os
import unittest.mock

from click.testing import Result
from functional.run import util
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner


def test_run_log_execution_result_enabled():
    """
    Run "simple-job" with VDK_LOG_EXECUTION_RESULT set to "True" and check
    that the output contains the execution summary and the steps list.
    """
    env_overrides = {"VDK_LOG_EXECUTION_RESULT": "True"}
    with unittest.mock.patch.dict(os.environ, env_overrides):
        cli_runner = CliEntryBasedTestRunner()
        job_dir = util.job_path("simple-job")

        outcome: Result = cli_runner.invoke(["run", job_dir])

        cli_assert_equal(0, outcome)
        output = outcome.output
        assert "Data Job execution summary" in output
        assert "steps_list" in output


def test_run_log_execution_result_enabled_on_fail():
    """
    Run "fail-job" with VDK_LOG_EXECUTION_RESULT set to "True" and check
    that the output still contains the execution summary and the steps list.
    """
    env_overrides = {"VDK_LOG_EXECUTION_RESULT": "True"}
    with unittest.mock.patch.dict(os.environ, env_overrides):
        cli_runner = CliEntryBasedTestRunner()
        job_dir = util.job_path("fail-job")

        outcome: Result = cli_runner.invoke(["run", job_dir])

        cli_assert_equal(1, outcome)
        output = outcome.output
        assert "Data Job execution summary" in output
        assert "steps_list" in output


def test_run_log_execution_result_disabled():
    """
    Run "simple-job" without setting VDK_LOG_EXECUTION_RESULT and check that
    only the short success line is present in the output.
    """
    cli_runner = CliEntryBasedTestRunner()
    job_dir = util.job_path("simple-job")

    outcome: Result = cli_runner.invoke(["run", job_dir])

    cli_assert_equal(0, outcome)
    output = outcome.output
    # The verbose summary must be absent; the one-line status must be there.
    assert "Data Job execution summary" not in output
    assert "steps_list" not in output
    assert "Job execution result: SUCCESS" in output


def test_run_log_execution_result_disabled_on_fail():
    """
    Run "fail-job" without setting VDK_LOG_EXECUTION_RESULT and check that
    only the short failure line is present in the output.
    """
    cli_runner = CliEntryBasedTestRunner()
    job_dir = util.job_path("fail-job")

    outcome: Result = cli_runner.invoke(["run", job_dir])

    cli_assert_equal(1, outcome)
    output = outcome.output
    # The verbose summary must be absent; the one-line status must be there.
    assert "Data Job execution summary" not in output
    assert "steps_list" not in output
    assert "Job execution result: FAILED" in output
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
log = logging.getLogger(__name__)

VDK_DB_DEFAULT_TYPE = "VDK_DB_DEFAULT_TYPE"
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"


class ValidatedSqLite3MemoryDbPlugin:
Expand Down Expand Up @@ -79,7 +80,10 @@ def test_run_dbapi_connection_no_such_db_type():
logging.getLogger("vdk").setLevel(logging.INFO)
runner = CliEntryBasedTestRunner()

with mock.patch.dict(os.environ, {VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY}):
with mock.patch.dict(
os.environ,
{VDK_DB_DEFAULT_TYPE: DB_TYPE_SQLITE_MEMORY, VDK_LOG_EXECUTION_RESULT: "True"},
):
result: Result = runner.invoke(
[
"run",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def setUp(self) -> None:
{
"VDK_INGEST_METHOD_DEFAULT": "memory",
"VDK_INGEST_PAYLOAD_PREPROCESS_SEQUENCE": "vdk-gdp-execution-id",
"VDK_LOG_EXECUTION_RESULT": "True",
},
)
def test_ingested_payload_expansion(self) -> None:
Expand All @@ -46,6 +47,7 @@ def test_ingested_payload_expansion(self) -> None:
"VDK_INGEST_METHOD_DEFAULT": "memory",
"VDK_INGEST_PAYLOAD_PREPROCESS_SEQUENCE": "vdk-gdp-execution-id",
"VDK_GDP_EXECUTION_ID_MICRO_DIMENSION_NAME": "micro_dimension_name_customized",
"VDK_LOG_EXECUTION_RESULT": "True",
},
)
def test_micro_dimension_name_configurable(self) -> None:
Expand Down
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-trino/tests/test_ingest_to_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
VDK_TRINO_PORT = "VDK_TRINO_PORT"
VDK_TRINO_USE_SSL = "VDK_TRINO_USE_SSL"
VDK_INGEST_METHOD_DEFAULT = "VDK_INGEST_METHOD_DEFAULT"
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"


@pytest.mark.usefixtures("trino_service")
Expand All @@ -29,6 +30,7 @@
VDK_TRINO_PORT: "8080",
VDK_TRINO_USE_SSL: "False",
VDK_INGEST_METHOD_DEFAULT: "TRINO",
VDK_LOG_EXECUTION_RESULT: "True",
},
)
class IngestToTrinoTests(TestCase):
Expand Down
2 changes: 2 additions & 0 deletions projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY = (
"VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY"
)
VDK_LOG_EXECUTION_RESULT = "VDK_LOG_EXECUTION_RESULT"

org_move_data_to_table = TrinoTemplateQueries.move_data_to_table

Expand Down Expand Up @@ -60,6 +61,7 @@ def mock_os_environ():
VDK_TRINO_PORT: "8080",
VDK_TRINO_USE_SSL: "False",
VDK_TRINO_TEMPLATES_DATA_TO_TARGET_STRATEGY: "INSERT_SELECT",
VDK_LOG_EXECUTION_RESULT: "True",
},
):
yield
Expand Down

0 comments on commit 76c87dd

Please sign in to comment.