Skip to content

Commit

Permalink
Regression test for #56273
Browse files Browse the repository at this point in the history
  • Loading branch information
dwoz authored and Ch3LL committed Jun 8, 2021
1 parent 5a0dba1 commit 5e190cf
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 6 deletions.
5 changes: 3 additions & 2 deletions salt/cli/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ def __init__(self, opts, eauth=None, quiet=False, parser=None):
self.pub_kwargs = eauth if eauth else {}
self.quiet = quiet
self.local = salt.client.get_local_client(opts["conf_file"])
self.minions, self.ping_gen, self.down_minions = self.__gather_minions()
# self.minions, self.ping_gen, self.down_minions = self.__gather_minions()
self.options = parser

def __gather_minions(self):
def gather_minions(self):
"""
Return a list of minions to use for the batch run
"""
Expand Down Expand Up @@ -106,6 +106,7 @@ def run(self):
"""
Execute the batch run
"""
self.minions, self.ping_gen, self.down_minions = self.gather_minions()
args = [
[],
self.opts["fun"],
Expand Down
8 changes: 4 additions & 4 deletions salt/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,9 @@ def get_returns_no_block(self, tag, match_type=None):
)
yield raw

def returns_for_job(self, jid):
return self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)

def get_iter_returns(
self,
jid,
Expand Down Expand Up @@ -1088,10 +1091,7 @@ def get_iter_returns(
missing = set()
# Check to see if the jid is real, if not return the empty dict
try:
if (
self.returners["{}.get_load".format(self.opts["master_job_cache"])](jid)
== {}
):
if self.returns_for_job(jid) == {}:
log.warning("jid does not exist")
yield {}
# stop the iteration, since the jid is invalid
Expand Down
182 changes: 182 additions & 0 deletions tests/pytests/functional/cli/test_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
"""
tests.pytests.functional.cli.test_batch
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
import pytest
import salt.cli.batch
import salt.config
import salt.utils.jid
from mock import Mock, patch


class PubRetSide:
"""
Mock salt.client.LocalClient.pub method
"""

calls = 0
initial_ping = False
batch1_jid = None
batch1_tgt = None
batch2_jid = None
batch2_tgt = None
batch3_jid = None
batch3_tgt = None

def __call__(self, tgt, fun, *args, **kwargs):
if tgt == "minion*" and fun == "test.ping":
PubRetSide.calls += 1
PubRetSide.initial_ping = salt.utils.jid.gen_jid({})
pub_ret = {
"jid": PubRetSide.initial_ping,
"minions": ["minion0", "minion1", "minion2", "minion3"],
}
elif fun == "state.sls":
if PubRetSide.calls == 1:
PubRetSide.calls += 1
PubRetSide.batch1_tgt = list(tgt)
PubRetSide.batch1_jid = jid = salt.utils.jid.gen_jid({})
pub_ret = {"jid": jid, "minions": tgt}
elif PubRetSide.calls == 2:
PubRetSide.calls += 1
PubRetSide.batch2_tgt = tgt
PubRetSide.batch2_jid = jid = salt.utils.jid.gen_jid({})
pub_ret = {"jid": jid, "minions": tgt}
elif PubRetSide.calls == 3:
PubRetSide.calls += 1
PubRetSide.batch3_tgt = tgt
PubRetSide.batch3_jid = jid = salt.utils.jid.gen_jid({})
pub_ret = {"jid": jid, "minions": tgt}
elif fun == "saltutil.find_job":
jid = salt.utils.jid.gen_jid({})
pub_ret = {"jid": jid, "minions": tgt}
return pub_ret


class MockSubscriber:
"""
Mock salt.transport.ipc IPCMessageSubscriber in order to inject events into
salt.utils.Event
"""

calls = 0
pubret = None

def __init__(self, *args, **kwargs):
return

def read(self, timeout=None):
"""
Mock IPCMessageSubcriber read method.
- Return events for initial ping
- Returns event for a minion in first batch to cause second batch to get sent.
- Returns 5 null events on first iteration of second batch to go back to first batch.
- On second iteration of first batch, send an event from second batch which will get cached.
- Return events for the rest of the batches.
"""
if MockSubscriber.pubret.initial_ping:
# Send ping responses for 4 minions
jid = MockSubscriber.pubret.initial_ping
if MockSubscriber.calls == 0:
MockSubscriber.calls += 1
return self._ret(jid, minion_id="minion0", fun="test.ping")
elif MockSubscriber.calls == 1:
MockSubscriber.calls += 1
return self._ret(jid, minion_id="minion1", fun="test.ping")
elif MockSubscriber.calls == 2:
MockSubscriber.calls += 1
return self._ret(jid, minion_id="minion2", fun="test.ping")
elif MockSubscriber.calls == 3:
MockSubscriber.calls += 1
return self._ret(jid, minion_id="minion3", fun="test.ping")
if MockSubscriber.pubret.batch1_jid:
jid = MockSubscriber.pubret.batch1_jid
tgt = MockSubscriber.pubret.batch1_tgt
if MockSubscriber.calls == 4:
# Send a return for first minion in first batch. This causes the
# second batch to get sent.
MockSubscriber.calls += 1
return self._ret(jid, minion_id=tgt[0], fun="state.sls")
if MockSubscriber.pubret.batch2_jid:
if MockSubscriber.calls <= 10:
# Skip the first iteration of the second batch; this will cause
# batch logic to go back to iterating over the first batch.
MockSubscriber.calls += 1
return
elif MockSubscriber.calls == 11:
# Send the minion from the second batch, This event will get cached.
jid = MockSubscriber.pubret.batch2_jid
tgt = MockSubscriber.pubret.batch2_tgt
MockSubscriber.calls += 1
return self._ret(jid, minion_id=tgt[0], fun="state.sls")
if MockSubscriber.calls == 12:
jid = MockSubscriber.pubret.batch1_jid
tgt = MockSubscriber.pubret.batch1_tgt
MockSubscriber.calls += 1
return self._ret(jid, minion_id=tgt[1], fun="state.sls")
if MockSubscriber.pubret.batch3_jid:
jid = MockSubscriber.pubret.batch3_jid
tgt = MockSubscriber.pubret.batch3_tgt
if MockSubscriber.calls == 13:
MockSubscriber.calls += 1
return self._ret(jid, minion_id=tgt[0], fun="state.sls")
return

def _ret(self, jid, minion_id, fun, _return=True, _retcode=0):
"""
Create a mock return from a jid, minion, and fun
"""
serial = salt.payload.Serial({"serial": "msgpack"})
dumped = serial.dumps(
{
"fun_args": [],
"jid": jid,
"return": _return,
"retcode": 0,
"success": True,
"cmd": "_return",
"fun": fun,
"id": minion_id,
"_stamp": "2021-05-24T01:23:25.373194",
},
use_bin_type=True,
)
tag = "salt/job/{}/ret".format(jid).encode()
return b"".join([tag, b"\n\n", dumped])

def connect(self, timeout=None):
pass


def test_batch():

pub_side_effect = PubRetSide()
MockSubscriber.pubret = pub_side_effect

def returns_for_job(jid):
return True

opts = {
"conf_file": "",
"tgt": "minion*",
"fun": "state.sls",
"arg": ["foo"],
"timeout": 1,
"gather_job_timeout": 1,
"batch": 2,
"extension_modules": "",
"failhard": True,
}
with patch("salt.transport.ipc.IPCMessageSubscriber", MockSubscriber):
batch = salt.cli.batch.Batch(opts, quiet=True)
with patch.object(batch.local, "pub", Mock(side_effect=pub_side_effect)):
with patch.object(
batch.local, "returns_for_job", Mock(side_effect=returns_for_job)
):
ret = list(batch.run())
assert len(ret) == 4
for val in ret:
values = list(val.values())
assert len(values) == 1
assert values[0] is True

0 comments on commit 5e190cf

Please sign in to comment.