Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3006.x] Only process events that are job returns #65402

Merged
merged 3 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/65400.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Client only process events which tag conforms to an event return.
62 changes: 34 additions & 28 deletions salt/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def gather_job_info(self, jid, tgt, tgt_type, listen=True, **kwargs):
tgt_type=tgt_type,
timeout=timeout,
listen=listen,
**kwargs
**kwargs,
)

if "jid" in pub_data:
Expand Down Expand Up @@ -365,7 +365,7 @@ def run_job(
jid="",
kwarg=None,
listen=False,
**kwargs
**kwargs,
):
"""
Asynchronously send a command to connected minions
Expand Down Expand Up @@ -393,7 +393,7 @@ def run_job(
jid=jid,
timeout=self._get_timeout(timeout),
listen=listen,
**kwargs
**kwargs,
)
except SaltClientError:
# Re-raise error with specific message
Expand Down Expand Up @@ -429,7 +429,7 @@ def run_job_async(
kwarg=None,
listen=True,
io_loop=None,
**kwargs
**kwargs,
):
"""
Asynchronously send a command to connected minions
Expand Down Expand Up @@ -458,7 +458,7 @@ def run_job_async(
timeout=self._get_timeout(timeout),
io_loop=io_loop,
listen=listen,
**kwargs
**kwargs,
)
except SaltClientError:
# Re-raise error with specific message
Expand Down Expand Up @@ -511,7 +511,7 @@ def cmd_subset(
cli=False,
progress=False,
full_return=False,
**kwargs
**kwargs,
):
"""
Execute a command on a random subset of the targeted systems
Expand Down Expand Up @@ -553,7 +553,7 @@ def cmd_subset(
kwarg=kwarg,
progress=progress,
full_return=full_return,
**kwargs
**kwargs,
)

def cmd_batch(
Expand All @@ -565,7 +565,7 @@ def cmd_batch(
ret="",
kwarg=None,
batch="10%",
**kwargs
**kwargs,
):
"""
Iteratively execute a command on subsets of minions at a time
Expand Down Expand Up @@ -641,7 +641,7 @@ def cmd(
jid="",
full_return=False,
kwarg=None,
**kwargs
**kwargs,
):
"""
Synchronously execute a command on targeted minions
Expand Down Expand Up @@ -759,7 +759,7 @@ def cmd(
jid,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)

if not pub_data:
Expand All @@ -772,7 +772,7 @@ def cmd(
self._get_timeout(timeout),
tgt,
tgt_type,
**kwargs
**kwargs,
):

if fn_ret:
Expand All @@ -797,7 +797,7 @@ def cmd_cli(
verbose=False,
kwarg=None,
progress=False,
**kwargs
**kwargs,
):
"""
Used by the :command:`salt` CLI. This method returns minion returns as
Expand All @@ -821,7 +821,7 @@ def cmd_cli(
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)
if not self.pub_data:
yield self.pub_data
Expand All @@ -835,7 +835,7 @@ def cmd_cli(
tgt_type,
verbose,
progress,
**kwargs
**kwargs,
):

if not fn_ret:
Expand Down Expand Up @@ -866,7 +866,7 @@ def cmd_iter(
tgt_type="glob",
ret="",
kwarg=None,
**kwargs
**kwargs,
):
"""
Yields the individual minion returns as they come in
Expand Down Expand Up @@ -901,7 +901,7 @@ def cmd_iter(
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)

if not pub_data:
Expand All @@ -915,7 +915,7 @@ def cmd_iter(
timeout=self._get_timeout(timeout),
tgt=tgt,
tgt_type=tgt_type,
**kwargs
**kwargs,
):
if not fn_ret:
continue
Expand All @@ -936,7 +936,7 @@ def cmd_iter_no_block(
kwarg=None,
show_jid=False,
verbose=False,
**kwargs
**kwargs,
):
"""
Yields the individual minion returns as they come in, or None
Expand Down Expand Up @@ -972,7 +972,7 @@ def cmd_iter_no_block(
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)

if not pub_data:
Expand All @@ -985,7 +985,7 @@ def cmd_iter_no_block(
tgt=tgt,
tgt_type=tgt_type,
block=False,
**kwargs
**kwargs,
):
if fn_ret and any([show_jid, verbose]):
for minion in fn_ret:
Expand All @@ -1007,7 +1007,7 @@ def cmd_full_return(
ret="",
verbose=False,
kwarg=None,
**kwargs
**kwargs,
):
"""
Execute a salt command and return
Expand All @@ -1024,7 +1024,7 @@ def cmd_full_return(
timeout,
kwarg=kwarg,
listen=True,
**kwargs
**kwargs,
)

if not pub_data:
Expand All @@ -1046,7 +1046,7 @@ def get_cli_returns(
tgt_type="glob",
verbose=False,
show_jid=False,
**kwargs
**kwargs,
):
"""
Starts a watcher looking at the return data for a specified JID
Expand Down Expand Up @@ -1123,7 +1123,7 @@ def get_iter_returns(
tgt_type="glob",
expect_minions=False,
block=True,
**kwargs
**kwargs,
):
"""
Watch the event system and return job data as it comes in
Expand Down Expand Up @@ -1202,7 +1202,13 @@ def get_iter_returns(
if "missing" in raw.get("data", {}):
missing.update(raw["data"]["missing"])
continue

# Anything below this point is expected to be a job return event.
if not raw["tag"].startswith(f"salt/job/{jid}/ret"):
log.debug("Skipping non return event: %s", raw["tag"])
continue
if "return" not in raw["data"]:
log.warning("Malformed event return: %s", raw["tag"])
continue
if kwargs.get("raw", False):
found.add(raw["data"]["id"])
Expand Down Expand Up @@ -1628,7 +1634,7 @@ def get_cli_event_returns(
progress=False,
show_timeout=False,
show_jid=False,
**kwargs
**kwargs,
):
"""
Get the returns for the command line interface via the event system
Expand Down Expand Up @@ -1658,7 +1664,7 @@ def get_cli_event_returns(
expect_minions=(
kwargs.pop("expect_minions", False) or verbose or show_timeout
),
**kwargs
**kwargs,
):
log.debug("return event: %s", ret)
return_count = return_count + 1
Expand Down Expand Up @@ -1851,7 +1857,7 @@ def pub(
jid="",
timeout=5,
listen=False,
**kwargs
**kwargs,
):
"""
Take the required arguments and publish the given command.
Expand Down Expand Up @@ -1953,7 +1959,7 @@ def pub_async(
timeout=5,
io_loop=None,
listen=True,
**kwargs
**kwargs,
):
"""
Take the required arguments and publish the given command.
Expand Down
35 changes: 35 additions & 0 deletions tests/pytests/integration/cli/test_salt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,19 @@
]


@pytest.fixture
def salt_minion_2(salt_master):
"""
A running salt-minion fixture
"""
factory = salt_master.salt_minion_daemon(
"minion-2",
extra_cli_arguments_after_first_start_failure=["--log-level=info"],
)
with factory.started(start_timeout=120):
yield factory


def test_context_retcode_salt(salt_cli, salt_minion):
"""
Test that a nonzero retcode set in the context dunder will cause the
Expand Down Expand Up @@ -234,3 +247,25 @@ def test_interrupt_on_long_running_job(salt_cli, salt_master, salt_minion):
assert "Exiting gracefully on Ctrl-c" in ret.stderr
assert "Exception ignored in" not in ret.stderr
assert "This job's jid is" in ret.stderr


def test_minion_65400(salt_cli, salt_minion, salt_minion_2, salt_master):
"""
Ensure correct exit status when salt CLI starts correctly.

"""
state = f"""
custom_test_state:
test.configurable_test_state:
- name: example
- changes: True
- result: False
- comment: 65400 regression test
"""
with salt_master.state_tree.base.temp_file("test_65400.sls", state):
ret = salt_cli.run("state.sls", "test_65400", minion_tgt="*")
assert isinstance(ret.data, dict)
assert len(ret.data.keys()) == 2
for minion_id in ret.data:
assert ret.data[minion_id] != "Error: test.configurable_test_state"
assert isinstance(ret.data[minion_id], dict)