Skip to content
This repository has been archived by the owner on May 14, 2024. It is now read-only.

Generate test specification from a reference program's runs #13

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 115 additions & 1 deletion calico/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@

import logging
import os
import pty
import sys
import textwrap
import termios
import tty
from collections import OrderedDict
from enum import Enum
from itertools import count
from json import dumps
from re import escape

import pexpect

Expand Down Expand Up @@ -64,10 +71,18 @@ def __init__(self, type_, data, timeout=-1):

def __iter__(self):
"""Get components of this action as a sequence."""
yield self.type_.value[0]
yield self.type_.value[1]
yield self.data if self.data != pexpect.EOF else "_EOF_"
yield self.timeout

def __str__(self):
"""Get an str which produces this action when parsed."""
action_type, action_data, timeout = self
timeout = "" if timeout == -1 else f"# timeout: {timeout}"
if action_data != "_EOF_":
action_data = dumps(action_data)
Copy link
Contributor Author

@sahinakkaya sahinakkaya Jul 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried ruamel.yaml.dump but it's not doing what I want while dumping strings:

>>> yaml.dump("6")
"'6'\n"
>>> yaml.dump("str")
'str\n...\n'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer a problem. I'm able to generate the yaml file with ruamel.yaml. I can work on it if this is OK to merge.

return f"- {action_type}: {action_data} {timeout}\n"


def run_script(command, script, defs=None, g_timeout=None):
"""Run a command and check whether it follows a script.
Expand Down Expand Up @@ -192,6 +207,22 @@ def add_action(self, action):
"""
self.script.append(action)

def __str__(self):
"""Get an str which produces this test case when parsed."""
script = "\n"
for action in self.script:
script += str(action)
script = textwrap.indent(script, " " * 16)

spec = f"""
- {self.name}:
run: {self.command}
script: {script}
return: {self.exits}
points: {self.points}
"""
return textwrap.dedent(spec)

def run(self, defs=None, jailed=False, g_timeout=None):
"""Run this test and produce a report.

Expand Down Expand Up @@ -255,6 +286,10 @@ def add_case(self, case):
super().__setitem__(case.name, case)
self.points += case.points if case.points is not None else 0

def __str__(self):
"""Get an str which produces this test suite when parsed."""
return "\n".join(str(test_case) for test_case in self.values())

def run(self, tests=None, quiet=False, g_timeout=None):
"""Run this test suite.

Expand Down Expand Up @@ -299,3 +334,82 @@ def run(self, tests=None, quiet=False, g_timeout=None):

report["points"] = earned_points
return report


class Clioc:
"""A class that is able to generate test specifications from a reference program's runs."""

def __init__(self, argv):
"""Initialize this test specification generator.

:sig: (List[str]) -> None
:param argv: List of command line arguments to run the reference program.
"""
self.argv = argv
""""""
self.__current_test_case = None

def generate_test_spec(self):
"""
Run the reference program and return the generated test specification.

:sig: () -> str
:return: The specification that is generated from reference program's run.
"""
test_suite = self.__create_test_suite()
return str(test_suite)

def __create_test_suite(self):
calico = Calico()
case_number = count(1)
while True:
os.system("clear")
calico.add_case(self.__create_test_case(next(case_number)))
if input("Do you want to continue? [Y/n] ").lower() not in ("y", ""):
print("Abort.")
break
return calico

def __create_test_case(self, case_num):
case_name = f"case_{case_num}"
print(f"Running for {case_name}...")
self.__current_test_case = TestCase(case_name, " ".join(self.argv), None)
exit_code = self.__spawn()
self.__current_test_case.add_action(Action(ActionType.EXPECT, "_EOF_"))
print(f"{case_name} ended")
points = int(input("Assign points for this run: "))
self.__current_test_case.points = points
self.__current_test_case.exits = exit_code
return self.__current_test_case

def __spawn(self):
pid, master_fd = pty.fork()
if pid == pty.CHILD:
os.execlp(self.argv[0], *self.argv)
try:
mode = tty.tcgetattr(pty.STDIN_FILENO)
tty.setraw(master_fd, termios.TCSANOW)
except tty.error:
restore = False
else:
restore = True

try:
pty._copy(master_fd, self.__read_write_handler, self.__read_write_handler)
except OSError:
if restore:
tty.tcsetattr(pty.STDIN_FILENO, tty.TCSAFLUSH, mode)

os.close(master_fd)
return os.waitpid(pid, 0)[1] >> 8
Copy link
Contributor Author

@sahinakkaya sahinakkaya Aug 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is almost exact copy of pty.spawn with a difference on one line: tty.setraw(master_fd, termios.TCSANOW). It puts the newly opened pseudo-terminal into raw mode and I had no idea why this solves the problem I mentioned here. tty.setraw sets some flags for the given file descriptor and only few of them are related with our problem. These are the lines that I found necessary for us after a bit of debugging:

mode[OFLAG] = mode[OFLAG] & ~(OPOST)
mode[LFLAG] = mode[LFLAG] & ~(ECHO | ICANON | IEXTEN | ISIG) # and only `~ECHO` is enough here actually

OPOST: Enable implementation-defined output processing.

This, for example when enabled puts a carriage return, \r, before newline character\n. Since it's negated in the above code, it treats newline as a newline and I had to add it manually afterwards (413th line in this file). So I think we don't need this line. (OPOST will be set, we don't need to add \r manually)

ECHO: Echo input characters.

This will make the pseudo-terminal device to echo it's stdin when enabled. We already capture it through our stdin, so we should disable echoing of the same characters. This line is needed.

The other flags, (ICANON, IEXTEN, ISIG), have no effect on the result as far as I have observed.

I also realized that try ... except ... thing for restoring has no effect because we don't change anything for the file descriptor, pty.STDIN_FILENO. We can also delete these lines.

The purpose of this review was to mention pty.spawn to prove the correctness of my function with a suggestion to delete try ... except ... statement but it ended up explaining all the shady parts. I'll update the code with the changes. Hopefully, it will become much simpler.


def __read_write_handler(self, fd):
data = os.read(fd, 1024).decode("utf8")
if fd == 0: # read from stdin
action = Action(ActionType.SEND, data[:-1]) # omit the end line character
self.__current_test_case.add_action(action)
else: # write to stdout
for line in data.splitlines(keepends=True):
action = Action(ActionType.EXPECT, escape(line).replace("\n", "\r\n")) # escape metacharacters
self.__current_test_case.add_action(action)
return data.encode("utf8")
4 changes: 4 additions & 0 deletions calico/base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ class Calico(OrderedDict):
quiet: Optional[List[str]] = ...,
g_timeout: Optional[int] = ...,
) -> Mapping[str, Any]: ...

class Clioc:
def __init__(self, argv: List[str]) -> None: ...
def generate_test_spec(self) -> str: ...
12 changes: 9 additions & 3 deletions calico/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import os
import sys
from argparse import ArgumentParser

from calico import __version__
from calico.parse import parse_spec

from calico.base import Clioc

_logger = logging.getLogger("calico")

Expand All @@ -40,7 +39,9 @@ def make_parser(prog):
"""
parser = ArgumentParser(prog=prog)
parser.add_argument("--version", action="version", version="%(prog)s " + __version__)

parser.add_argument(
"-g", "--generate-test-file", nargs="+",
help="generate a test file from run(s)")
parser.add_argument("spec", help="test specifications file")
parser.add_argument("-d", "--directory", help="change to directory before doing anything")
parser.add_argument(
Expand Down Expand Up @@ -91,6 +92,11 @@ def main(argv=None):
arguments = parser.parse_args(argv[1:])
try:
spec_filename = os.path.abspath(arguments.spec)
if arguments.generate_test_file:
test_suite_generator = Clioc(arguments.generate_test_file)
with open(spec_filename, "w") as f:
f.write(test_suite_generator.generate_test_spec())
print("A test file named '%s' has been generated." % arguments.spec)
with open(spec_filename) as f:
content = f.read()

Expand Down
20 changes: 10 additions & 10 deletions tests/test_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def test_case_with_no_script_should_expect_eof():
run: echo 1
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "_EOF_", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "_EOF_", -1)]


def test_case_run_with_timeout_should_generate_expect_eof_with_timeout():
Expand All @@ -296,7 +296,7 @@ def test_case_run_with_timeout_should_generate_expect_eof_with_timeout():
run: echo 1 # timeout: 5
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "_EOF_", 5)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "_EOF_", 5)]


def test_case_run_with_non_numeric_timeout_value_should_raise_error():
Expand Down Expand Up @@ -329,7 +329,7 @@ def test_case_script_with_string_action_data_should_be_ok():
- expect: "1"
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "1", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "1", -1)]


def test_case_script_with_numeric_action_data_should_raise_error():
Expand All @@ -352,7 +352,7 @@ def test_case_script_with_action_data_eof_should_be_ok():
- expect: _EOF_
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "_EOF_", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "_EOF_", -1)]


def test_case_script_with_multiple_action_data_should_raise_error():
Expand All @@ -377,7 +377,7 @@ def test_case_script_with_expect_shortcut_should_be_ok():
- e: "1"
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "1", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "1", -1)]


def test_case_script_with_send_shortcut_should_be_ok():
Expand All @@ -388,7 +388,7 @@ def test_case_script_with_send_shortcut_should_be_ok():
- s: "1"
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("s", "1", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("send", "1", -1)]


def test_case_script_order_should_be_preserved():
Expand All @@ -402,9 +402,9 @@ def test_case_script_order_should_be_preserved():
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [
("e", "foo", -1),
("s", "1", -1),
("e", "_EOF_", -1),
("expect", "foo", -1),
("send", "1", -1),
("expect", "_EOF_", -1),
]


Expand All @@ -416,7 +416,7 @@ def test_case_script_action_with_integer_timeout_value_should_be_ok():
- expect: "foo" # timeout: 5
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "foo", 5)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "foo", 5)]


def test_case_script_action_with_fractional_timeout_value_should_raise_error():
Expand Down