Skip to content

Commit

Permalink
run_tests: use vnc expectScreen for screen compares
Browse files Browse the repository at this point in the history
vncdotool expectScreen allows us to wait until screen matches,
with a timeout.  Unfortunately it has some bug around capture
so it was a massive pain to get working.

Future work can simplify some stuff around timeouts and retries now.
  • Loading branch information
reticulatedpines committed Feb 25, 2023
1 parent b135593 commit 43bd315
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 19 deletions.
18 changes: 12 additions & 6 deletions magiclantern/ml_qemu/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def __init__(self, build_dir, rom_dir, source_dir,
# FIXME make this a class property, can't remember syntax right now
self.screen_cap_prefix = "test_"
self.screen_cap_counter = 0
self.screen_cap_name = None
if monitor_socket_path:
self.monitor_socket_path = monitor_socket_path
else:
Expand Down Expand Up @@ -198,6 +199,8 @@ def __enter__(self):
"stdin":subprocess.PIPE}
if self.verbose:
# don't redirect stdout stderr, just spam console
self.stdout = None
self.stderr = None
pass
else:
# capture stdout and stderr, this makes it quiet,
Expand All @@ -222,6 +225,8 @@ def __enter__(self):
if self.vnc_display:
try:
self.vnc_client = vncdotool.api.connect(self.vnc_display)
self.vnc_client.timeout = 6 # for use with e.g. expectScreen(),
# throws TimeoutError
except Exception as e:
self._cleanup()
raise(e)
Expand Down Expand Up @@ -262,6 +267,7 @@ def _cleanup(self):
self.stderr.close()
self.stderr = None
if self.vnc_client:
self.vnc_client.timeout = None
self.vnc_client.disconnect()
try:
os.remove(self.monitor_socket_path)
Expand Down Expand Up @@ -318,15 +324,15 @@ def capture_screen(self, delay):
sleep(delay)
n = self.screen_cap_counter
self.screen_cap_counter += 1
capture_name = self.screen_cap_prefix + str(n).zfill(2) + ".png"
self.screen_cap_name = self.screen_cap_prefix + str(n).zfill(2) + ".png"

if self.unreliable_screencaps:
# take screencaps until two match, or we hit the max
max_attempts = 5
screencap_hashes = []
match_found = False
for i in range(max_attempts):
name = str(i) + capture_name
name = str(i) + self.screen_cap_name
self.vnc_client.captureScreen(name)
sleep(0.2) # too fast and we can capture the same glitchy screen and false match
with open(name, "rb") as f:
Expand All @@ -340,15 +346,15 @@ def capture_screen(self, delay):
# The last capture will either be a match,
# or there were no matches. Make it have the "real" name,
# comparison occurs against this file.
shutil.copy(name, capture_name)
shutil.copy(name, self.screen_cap_name)

# delete temp files if we have a match,
# otherwise keep for inspection
if match_found:
for j in range(i + 1):
os.remove(str(j) + capture_name)
os.remove(str(j) + self.screen_cap_name)
else:
self.vnc_client.captureScreen(capture_name)
self.vnc_client.captureScreen(self.screen_cap_name)
sleep(0.1)
return capture_name
return self.screen_cap_name

45 changes: 33 additions & 12 deletions magiclantern/ml_tests/menu_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def run(self, lock):

key_sequence = self.qemu_key_sequences[self.cam.code_rom_md5]


# invoke qemu and control it to run the test
with QemuRunner(self.qemu_dir, self.cam.rom_dir, self.cam.source_dir,
self.cam.model,
Expand All @@ -101,6 +102,23 @@ def run(self, lock):
vnc_display=self.vnc_display,
verbose=self.verbose) as self.qemu_runner:
q = self.qemu_runner

# Let's try some filthy hacking. For unknown reasons,
# framebufferUpdateRequest(incremental=1), called internally as
# part of expectScreen(), causes the screen compare to always fail.
# That param is not exposed as part of expectScreen().
# Monkey patch the function with a wrapper that forces incremental=0.
#
# Getting to the actual function is its own special joy, it
# is quite indirect.
def _fbReplacer(obj, x=0, y=0, width=None, height=None, incremental=0):
# the following will get called as a method, therefore passing self implicitly
obj._framebufferUpdateRequest(x=x, y=y, width=width, height=height, incremental=0)

parent = q.vnc_client.factory.protocol
parent._framebufferUpdateRequest = parent.framebufferUpdateRequest
parent.framebufferUpdateRequest = _fbReplacer

q.screen_cap_prefix = "menu_test_"
for k in key_sequence:
delay = 0.3
Expand All @@ -110,28 +128,31 @@ def run(self, lock):
# menu transitions are much slower than others
delay = 5
k = k.split()[-1]
capture_filename = q.key_press(k, delay=delay)
capture_filepath = os.path.join(self.output_dir, capture_filename)
expected_hash = 0
with open(capture_filepath, "rb") as f:
test_hash = hashlib.md5(f.read()).hexdigest()
q.key_press(k, delay=delay)
expected_output_path = os.path.join(self.expected_output_dir,
q.screen_cap_name)
try:
expected_output_path = os.path.join(self.expected_output_dir,
capture_filename)
with open(expected_output_path, "rb") as f:
expected_hash = hashlib.md5(f.read()).hexdigest()
q.vnc_client.expectScreen(expected_output_path, maxrms=0.0)
except FileNotFoundError:
if self.force_continue:
pass
else:
return self.return_failure("Missing expected output file: %s"
% expected_output_path)
if test_hash != expected_hash:
except TimeoutError:
# vncdotool api object can throw this if its timeout property is set,
# we do this in QemuRunner.
#
# This means we never saw the right screen, the best we can do to help
# debug is save the last known content.
fail_name = "fail_" + q.screen_cap_name
q.vnc_client.screen.save(fail_name)
if self.force_continue:
pass
else:
return self.return_failure("Mismatched hash for file '%s', expected %s, got %s"
% (capture_filename, expected_hash, test_hash))
return self.return_failure("Qemu screen never matched against "
"expected result file '%s'\n, check '%s'"
% (expected_output_path, fail_name))

# attempt clean shutdown via Qemu monitor socket
q.shutdown()
Expand Down
7 changes: 6 additions & 1 deletion magiclantern/ml_tests/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ def test_worker(tests, q, lock, verbose=False):
lock)

with test as t:
t.run(lock)
try:
t.run(lock)
except TimeoutError:
locking_print("FAIL: timeout during test, %s, %s"
% (test.cam.model, test.__class__.__name__),
lock)
# update the shared list so it has the result from the run
tests[i] = test

Expand Down

0 comments on commit 43bd315

Please sign in to comment.