Skip to content
This repository has been archived by the owner on May 14, 2024. It is now read-only.

Generate test specification from a reference program's runs #13

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 111 additions & 1 deletion calico/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,16 @@

import logging
import os
import pty
import sys
import textwrap
import termios
import tty
from collections import OrderedDict
from enum import Enum
from itertools import count
from json import dumps
from re import escape

import pexpect

Expand Down Expand Up @@ -64,10 +71,18 @@ def __init__(self, type_, data, timeout=-1):

def __iter__(self):
"""Get components of this action as a sequence."""
yield self.type_.value[0]
yield self.type_.value[1]
yield self.data if self.data != pexpect.EOF else "_EOF_"
yield self.timeout

def __str__(self):
"""Get an str which produces this action when parsed."""
action_type, action_data, timeout = self
timeout = "" if timeout == -1 else f"# timeout: {timeout}"
if action_data != "_EOF_":
action_data = dumps(action_data)
Copy link
Contributor Author

@sahinakkaya sahinakkaya Jul 20, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried ruamel.yaml.dump but it's not doing what I want while dumping strings:

>>> yaml.dump("6")
"'6'\n"
>>> yaml.dump("str")
'str\n...\n'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer a problem. I'm able to generate the yaml file with ruamel.yaml. I can work on it if this is OK to merge.

return f"- {action_type}: {action_data} {timeout}\n"


def run_script(command, script, defs=None, g_timeout=None):
"""Run a command and check whether it follows a script.
Expand Down Expand Up @@ -192,6 +207,22 @@ def add_action(self, action):
"""
self.script.append(action)

def __str__(self):
"""Get an str which produces this test case when parsed."""
script = "\n"
for action in self.script:
script += str(action)
script = textwrap.indent(script, " " * 16)

spec = f"""
- {self.name}:
run: {self.command}
script: {script}
return: {self.exits}
points: {self.points}
"""
return textwrap.dedent(spec)

def run(self, defs=None, jailed=False, g_timeout=None):
"""Run this test and produce a report.

Expand Down Expand Up @@ -255,6 +286,10 @@ def add_case(self, case):
super().__setitem__(case.name, case)
self.points += case.points if case.points is not None else 0

def __str__(self):
"""Get an str which produces this test suite when parsed."""
return "\n".join(str(test_case) for test_case in self.values())

def run(self, tests=None, quiet=False, g_timeout=None):
"""Run this test suite.

Expand Down Expand Up @@ -299,3 +334,78 @@ def run(self, tests=None, quiet=False, g_timeout=None):

report["points"] = earned_points
return report


class Clioc:
"""A class that is able to generate test specifications from a reference program's runs."""

def __init__(self, argv):
"""Initialize this test specification generator.

:sig: (List[str]) -> None
:param argv: List of command line arguments to run the reference program.
"""
self.argv = argv
""""""
self.__current_test_case = None

def generate_test_spec(self):
"""
Run the reference program and return the generated test specification.

:sig: () -> str
:return: The specification that is generated from reference program's run.
"""
test_suite = self.__create_test_suite()
return str(test_suite)

def __create_test_suite(self):
calico = Calico()
case_number = count(1)
while True:
os.system("clear")
calico.add_case(self.__create_test_case(next(case_number)))
if input("Do you want to continue? [Y/n] ").lower() not in ("y", ""):
print("Abort.")
break
return calico

def __create_test_case(self, case_num):
case_name = f"case_{case_num}"
print(f"Running for {case_name}...")
self.__current_test_case = TestCase(case_name, " ".join(self.argv), None)
exit_code = self.__spawn()
self.__current_test_case.add_action(Action(ActionType.EXPECT, "_EOF_"))
print(f"{case_name} ended")
points = int(input("Assign points for this run: "))
self.__current_test_case.points = points
self.__current_test_case.exits = exit_code
return self.__current_test_case

def __spawn(self):
# see https://github.com/itublg/calico/pull/13#discussion_r464052578
pid, master_fd = pty.fork()
if pid == pty.CHILD:
os.execlp(self.argv[0], *self.argv)

mode = tty.tcgetattr(master_fd)
mode[3] &= ~termios.ECHO # 3 corresponds to lflag, disable echoing
tty.tcsetattr(master_fd, termios.TCSANOW, mode)
try:
pty._copy(master_fd, self.__read_write_handler, self.__read_write_handler)
except OSError:
pass
os.close(master_fd)
return os.waitpid(pid, 0)[1] >> 8

def __read_write_handler(self, fd):
data = os.read(fd, 1024).decode("utf8")
if fd == 0: # read from stdin
action = Action(ActionType.SEND, data[:-1]) # omit the end line character
self.__current_test_case.add_action(action)
else: # write to stdout
for line in data.splitlines(keepends=True):
line = escape(line).replace("\\ ", " ") # escape metacharacters, reformat string
action = Action(ActionType.EXPECT, line)
self.__current_test_case.add_action(action)
return data.encode("utf8")
4 changes: 4 additions & 0 deletions calico/base.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,7 @@ class Calico(OrderedDict):
quiet: Optional[List[str]] = ...,
g_timeout: Optional[int] = ...,
) -> Mapping[str, Any]: ...

class Clioc:
def __init__(self, argv: List[str]) -> None: ...
def generate_test_spec(self) -> str: ...
12 changes: 9 additions & 3 deletions calico/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import os
import sys
from argparse import ArgumentParser

from calico import __version__
from calico.parse import parse_spec

from calico.base import Clioc

_logger = logging.getLogger("calico")

Expand All @@ -40,7 +39,9 @@ def make_parser(prog):
"""
parser = ArgumentParser(prog=prog)
parser.add_argument("--version", action="version", version="%(prog)s " + __version__)

parser.add_argument(
"-g", "--generate-test-file", nargs="+",
help="generate a test file from run(s)")
parser.add_argument("spec", help="test specifications file")
parser.add_argument("-d", "--directory", help="change to directory before doing anything")
parser.add_argument(
Expand Down Expand Up @@ -91,6 +92,11 @@ def main(argv=None):
arguments = parser.parse_args(argv[1:])
try:
spec_filename = os.path.abspath(arguments.spec)
if arguments.generate_test_file:
test_suite_generator = Clioc(arguments.generate_test_file)
with open(spec_filename, "w") as f:
f.write(test_suite_generator.generate_test_spec())
print("A test file named '%s' has been generated." % arguments.spec)
with open(spec_filename) as f:
content = f.read()

Expand Down
20 changes: 10 additions & 10 deletions tests/test_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def test_case_with_no_script_should_expect_eof():
run: echo 1
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "_EOF_", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "_EOF_", -1)]


def test_case_run_with_timeout_should_generate_expect_eof_with_timeout():
Expand All @@ -296,7 +296,7 @@ def test_case_run_with_timeout_should_generate_expect_eof_with_timeout():
run: echo 1 # timeout: 5
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "_EOF_", 5)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "_EOF_", 5)]


def test_case_run_with_non_numeric_timeout_value_should_raise_error():
Expand Down Expand Up @@ -329,7 +329,7 @@ def test_case_script_with_string_action_data_should_be_ok():
- expect: "1"
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "1", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "1", -1)]


def test_case_script_with_numeric_action_data_should_raise_error():
Expand All @@ -352,7 +352,7 @@ def test_case_script_with_action_data_eof_should_be_ok():
- expect: _EOF_
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "_EOF_", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "_EOF_", -1)]


def test_case_script_with_multiple_action_data_should_raise_error():
Expand All @@ -377,7 +377,7 @@ def test_case_script_with_expect_shortcut_should_be_ok():
- e: "1"
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "1", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "1", -1)]


def test_case_script_with_send_shortcut_should_be_ok():
Expand All @@ -388,7 +388,7 @@ def test_case_script_with_send_shortcut_should_be_ok():
- s: "1"
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("s", "1", -1)]
assert [tuple(s) for s in runner["c1"].script] == [("send", "1", -1)]


def test_case_script_order_should_be_preserved():
Expand All @@ -402,9 +402,9 @@ def test_case_script_order_should_be_preserved():
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [
("e", "foo", -1),
("s", "1", -1),
("e", "_EOF_", -1),
("expect", "foo", -1),
("send", "1", -1),
("expect", "_EOF_", -1),
]


Expand All @@ -416,7 +416,7 @@ def test_case_script_action_with_integer_timeout_value_should_be_ok():
- expect: "foo" # timeout: 5
"""
runner = parse_spec(source)
assert [tuple(s) for s in runner["c1"].script] == [("e", "foo", 5)]
assert [tuple(s) for s in runner["c1"].script] == [("expect", "foo", 5)]


def test_case_script_action_with_fractional_timeout_value_should_raise_error():
Expand Down