diff --git a/.gitignore b/.gitignore index 35641fad..02368776 100644 --- a/.gitignore +++ b/.gitignore @@ -68,4 +68,10 @@ instance/ # Virtualenv venv/ +venv36/ +venv37/ +venv38/ .venv/ + +# Datasets +datasets/ \ No newline at end of file diff --git a/README.md b/README.md index 325478cc..e2647b67 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ - [Status](#status) - [Overview](#overview) - [Installation](#installation) + - [Verify Local Environment](#verify-local-environment) - [Windows](#windows) - [macOS](#macos) - [About pip](#about-pip) @@ -20,6 +21,8 @@ This project is design to be completed on [Pluralsight](https://pluralsight.com) ## Installation +### Verify Local Environment + ### Windows Open a command prompt or powershell and run the following commands, replacing 'project-root' with the path to the root folder of the project. @@ -37,25 +40,29 @@ Open a terminal and run the following commands, replacing 'project-root' with th ```bash > cd 'project-root' -$ python3 -m venv venv -$ source venv/bin/activate -$ pip install -r requirements.txt +> python3 -m venv venv +> source venv/bin/activate +> pip install -r requirements.txt ``` *Note: If you've installed Python 3 using a method other than Homebrew, you might need to type `python` in the second command instead of `python3`.* ### About pip -Versions pip updates frequently, but versions greater than 10.x.x should work with this project. +`pip` updates frequently, but versions greater than 10.x.x should work with this project. ## Verify Setup In order to verify that everything is setup correctly, run the following command from the project root. ```bash -> pytest +pytest ``` -You should see that all the tests are failing. This is good! We’ll be fixing these tests once we jump into the build step. Every time you want to check your work locally you can type that command, and it will report the status of every task in the project. +You should see that all the tests are failing. This is good! We’ll be fixing these tests once we jump into the build step. + +Every time you want to check your work locally you can type that command, and it will report the status of every task in the project. ## Previewing Your Work + +You can preview your work by opening a terminal, changing to the project root, activating the virtual environment, and executing the appropriate python script. For example `python sensor/sensor.py`. diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..130f60be --- /dev/null +++ b/conftest.py @@ -0,0 +1,182 @@ +import ast +import json + +import parso +import pytest + +from collections import OrderedDict +from types import GeneratorType as generator +from itertools import chain +from pathlib import Path + +from objectpath import Tree +from mongoquery import Query + +from tests.nodes import convert_node, flatten +from tests.template import Template + + +class Parser: + def __init__(self, file_name, nodes): + + sensor = Path.cwd() / "sensor" + # ext = sensor / "extensions" + + self.data = { + "success": True, + "full_path": "", + "message": "", + "start_pos": 0, + "nodes": nodes, + } + + if file_name is not None: + path = lambda root, fn: root / "{}.py".format(fn) + # if file_name == "menu" or file_name == "stats": + # full_path = path(ext, file_name) + if file_name == "sensor": + full_path = path(Path.cwd(), file_name) + else: + full_path = path(sensor, file_name) + + grammar = parso.load_grammar() + module = grammar.parse(path=full_path) + self.data["success"] = len(grammar.iter_errors(module)) == 0 + + if self.data["success"]: + self.data["message"] = "Syntax: valid" + if file_name is not None: + self.data["nodes"] = convert_node(ast.parse(full_path.read_text())) + self.data["code"] = full_path.read_text() + + else: + self.data["message"] = grammar.iter_errors(module)[0].message + self.data["start_pos"] = grammar.iter_errors(module)[0].start_pos[0] + + @property + def nodes(self): + return self.data["nodes"] + + n = nodes + + @property + def success(self): + return self.data["success"] + + @property + def code(self): + return self.data["code"] + + def query(self, pattern): + nodes = Template(pattern).process(self.code) + if isinstance(nodes, list) and len(nodes) == 1: + nodes = nodes[0] + + return Parser(None, nodes) + + def query_raw(self, pattern): + nodes = Template(pattern).process(self.code, True) + if isinstance(nodes, list) and len(nodes) == 1: + nodes = [flatten(convert_node(node)) for node in nodes[0].body] + return Parser(None, nodes) + + def last_line(self): + return flatten(self.nodes["body"][-1]) + + @property + def message(self): + return "{} on or around line {} in `{}`.".format( + self.data["message"], self.data["start_pos"], self.data["full_path"] + ) + + def match(self, template): + return Parser(None, list(filter(Query(template).match, self.nodes))) + + def execute(self, expr): + result = Tree(self.nodes).execute(expr) + if isinstance(result, (generator, chain, map)): + process = list(result) + return ( + Parser(None, process[0]) if len(process) == 1 else Parser(None, process) + ) + else: + return Parser(None, result) + + ex = execute + + def exists(self): + return bool(self.nodes) + + def calls(self): + nodes = self.execute("$.body[@.type is 'Expr' and @.value.type is 'Call']").n + node_list = [nodes] if isinstance(nodes, dict) else nodes + + return Parser(None, [flatten(node) for node in node_list]) + + def assign_(self): + return Parser(None, [flatten(self.execute("$.body[@.type is 'Assign']").n)]) + + def assigns(self): + return Parser( + None, + [flatten(node) for node in self.execute("$.body[@.type is 'Assign']").n], + ) + + def globals(self, name): + return name in self.execute("$.body[@.type is 'Global'].names").n + + def defines(self, name): + return self.execute( + "$.body[@.type is 'FunctionDef' and @.name is '{}'].(name, args, body, decorator_list)".format( + name + ) + ) + + def class_(self, name): + return self.execute( + "$.body[@.type is 'ClassDef' and @.name is '{}'].(name, args, body)".format( + name + ) + ) + + def decorators(self): + return Parser(None, [flatten(self.execute("$.decorator_list").n)]) + + def returns(self, name): + return name == self.execute("$.body[@.type is 'Return'].value.id").n + + def returns_call(self): + return Parser(None, [flatten(self.execute("$.body[@.type is 'Return']").n)]) + + def method(self, name): + return self.execute( + "$..body[@.type is 'FunctionDef' and @.name is '{}']".format(name) + ) + + def has_arg(self, name, pos=0): + nodes = self.execute("$.args.args.arg").n + return nodes[pos] if isinstance(nodes, list) else nodes + + def imports(self, name): + return name in self.execute("$.body[@.type is 'Import'].names..name").n + + def for_(self): + for_body = self.execute("$.body[@.type is 'For'].body").n + iterators = self.execute("$.body[@.type is 'For'].(target, iter)").n + return Parser(None, [flatten(for_body), flatten(iterators)]) + + def from_imports(self, mod, alias): + nodes = self.execute( + "$.body[@.type is 'ImportFrom' and @.module is '{}'].names..name".format( + mod + ) + ).n + return alias in (nodes if isinstance(nodes, list) else [nodes]) + + +@pytest.fixture +def parse(): + def _parse(file_name): + return Parser(file_name, {}) + + return _parse \ No newline at end of file diff --git a/datasets/SENSOR_ROOM1.csv b/datasets/SENSOR_ROOM1.csv index 62fe229b..257cde3b 100644 --- a/datasets/SENSOR_ROOM1.csv +++ b/datasets/SENSOR_ROOM1.csv @@ -1,4 +1,4 @@ -id,date,time,temperature,humidity,energy_usage,particulate,room +id,date,time,temperature,humidity,energy_usage,particulate,area 1,5/7/2020,0:01,60,0.8,0xffe,6.2E+00,1 2,5/7/2020,0:01,60,0.8,0xff3,6.3E+00,1 3,5/7/2020,0:04,60,0.8,0xfef,6.5E+00,1 diff --git a/datasets/SENSOR_ROOM2.csv b/datasets/SENSOR_ROOM2.csv index 0d62f0b9..e4ce8048 100644 --- a/datasets/SENSOR_ROOM2.csv +++ b/datasets/SENSOR_ROOM2.csv @@ -1,4 +1,4 @@ -id,date,time,temperature,humidity,energy_usage,particulate,room +id,date,time,temperature,humidity,energy_usage,particulate,area 1,5/7/2020,0:01,50,0.8,0xffc,8.1E+00,2 2,5/7/2020,0:02,50,0.53,0xffb,8.1E+00,2 3,5/7/2020,0:02,50,0.82,0xff3,8.5E+00,2 diff --git a/pytest.ini b/pytest.ini index 68530888..ccfcc269 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,2 @@ [pytest] -addopts = --tb=short -p no:warnings \ No newline at end of file +addopts = -rN --tb=short -p no:warnings diff --git a/requirements.txt b/requirements.txt index 51faa6f2..46c045f2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,11 +1,9 @@ docutils==0.16 -Markdown==3.1.1 -parso==0.6.0 -pytest==5.3.5 +Markdown==3.2.1 +mongoquery==1.3.6 +objectpath==0.6.1 +parso==0.6.2 +pytest==5.4.1 pytest-json-report==1.2.1 -pytest-metadata==1.8.0 -pytest-sugar==0.9.2 -python-dateutil==2.7.5 -py==1.7.0 -pyparsing==2.3.0 -more-itertools==4.3.0 \ No newline at end of file +PyYAML==5.3.1 +typer==0.1.1 \ No newline at end of file diff --git a/sensor/load_data.py b/sensor/load_data.py new file mode 100644 index 00000000..f48df9f0 --- /dev/null +++ b/sensor/load_data.py @@ -0,0 +1,21 @@ +# Module 1: Load data from files +import os +import glob +import csv + +def load_sensor_data(): + sensor_data = [] + + sensor_files = glob.glob(os.path.join(os.getcwd(), 'datasets', '*.csv')) + + # Loop over list of files + for sensor_file in sensor_files: + with open(sensor_file ) as data_file: + # Create a csv.DictReader + data_reader = csv.DictReader(data_file, delimiter=',') + # Loop over each row dictionary + for row in data_reader: + # Create a list of dictionaries + sensor_data.append(row) + + return sensor_data \ No newline at end of file diff --git a/sensor/load_info.py b/sensor/load_info.py deleted file mode 100644 index f752b095..00000000 --- a/sensor/load_info.py +++ /dev/null @@ -1 +0,0 @@ -# Module 2: Load data from files \ No newline at end of file diff --git a/sensor/sensor_app.py b/sensor/sensor_app.py new file mode 100644 index 00000000..cec00482 --- /dev/null +++ b/sensor/sensor_app.py @@ -0,0 +1,6 @@ +# Runner script for all modules +from load_data import load_sensor_data # module 2 + +data = load_sensor_data() +# print(f"Loaded records {len(data)}") +print("Loaded records: [{}]".format(len(data))) \ No newline at end of file diff --git a/sensor/sensor_data.py b/sensor/sensor_data.py deleted file mode 100644 index c9b09313..00000000 --- a/sensor/sensor_data.py +++ /dev/null @@ -1 +0,0 @@ -# Runner script for all modules \ No newline at end of file diff --git a/tasks.md b/tasks.md index a7eb5d23..215574ff 100644 --- a/tasks.md +++ b/tasks.md @@ -1,33 +1,137 @@ -# Module 01 - The Sensor Class +# Module 1 - The Sensor Class -- [Module 01 - The Sensor Class](#module-01---the-sensor-class) +- [Module 1 - The Sensor Class](#module-1---the-sensor-class) - [Status](#status) - - [Module 1: System Setup](#module-1-system-setup) - - [Module 2: Load Data From Files](#module-2-load-data-from-files) - - [Module 3: Create a Class HomeData](#module-3-create-a-class-homedata) - - [Module 4: Analyze Temperature Data](#module-4-analyze-temperature-data) - - [Module 5: Analyze Humidity Data](#module-5-analyze-humidity-data) - - [Module 6: Analyze Air Quality Data](#module-6-analyze-air-quality-data) - - [Module 7: Analyze Energy Consumption Data](#module-7-analyze-energy-consumption-data) + - [Module 1: Load Sensor Data From Files](#module-1-load-sensor-data-from-files) + - [M1: Task 1: Import os, glob, and csv](#m1-task-1-import-os-glob-and-csv) + - [M1: Task 2: Create a Function to parse the data](#m1-task-2-create-a-function-to-parse-the-data) + - [M1: Task 3: Sensor Data File Management](#m1-task-3-sensor-data-file-management) + - [M1: Task 4: Read Data Files](#m1-task-4-read-data-files) + - [M1: Task 5: Get Sensor Data with sensor_app](#m1-task-5-get-sensor-data-with-sensorapp) + - [Module 2: Create a Class HomeData](#module-2-create-a-class-homedata) + - [Module 3: Analyze Temperature Data](#module-3-analyze-temperature-data) + - [Module 4: Analyze Humidity Data](#module-4-analyze-humidity-data) + - [Module 5: Analyze Air Quality Data](#module-5-analyze-air-quality-data) + - [Module 6: Analyze Energy Consumption Data](#module-6-analyze-energy-consumption-data) ## Status Draft. -## Module 1: System Setup +## Module 1: Load Sensor Data From Files -## Module 2: Load Data From Files +### M1: Task 1: Import os, glob, and csv + +[//]:# (@pytest.mark.test_load_data_import_module1) + +The dataset for this project is stored in several CSV files found in the `dataset` folder. It represents the data collected from a device with multiple sensors. The records include measurements of temperature, humidity, energy consumption, and particle count in the air over a given area. The data is collected over a period of 24 hours. + +To start, open the file called `load_data.py` in the `sensor` folder - the rest of the tasks in this module happen in this same file. + +At the top of the file create three import statements for `os`, `glob`, and `csv`. These libraries will allow us to work with a collection of files. + +--- +To test this module locally: + +- Open a terminal at the root of the project +- Run the command `pytest -k module1` + +### M1: Task 2: Create a Function to parse the data + +[//]:# (@pytest.mark.test_load_data_load_sensor_func_module1) + +Create a function called `load_sensor_data` that takes no arguments. +In the body of the `load_sensor_data` function, create variable called `sensor_data` and set it as an empty `list`. + +--- +To test this module locally: + +- Open a terminal at the root of the project +- Run the command `pytest -k module1` + +### M1: Task 3: Sensor Data File Management + +[//]:# (@pytest.mark.test_load_data_sensor_files_module1) + +Next, create a variable called `sensor_files` that is set to a call to the `glob.glob()` function. + +Pass the glob function a single argument, a call to the `os.path.join()` function. + +In turn pass `os.path.join()` three arguments: `os.getcwd()`, `"datasets"`, and `"*.csv"`. + +Your statement should look like this: ```python -from sensor_file import Sensor + sensor_files = glob.glob(os.path.join(os.getcwd(), 'datasets', '*.csv')) ``` -## Module 3: Create a Class HomeData +--- +To test this module locally: + +- Open a terminal at the root of the project +- Run the command `pytest -k module1` + +### M1: Task 4: Read Data Files + +[//]:# (@pytest.mark.test_load_data_read_files_module1) + +The `sensor_files` object contains a list of file names i.e. ['SENSOR_ROOM2', 'SENSOR_ROOM1'] + +To read the sensor data of these files, five steps are required: + +1) Create one `for` loop that loops through `sensor_files` using `sensor_file` as the iterator variable. + +2) In the body of this loop use a `with` statement to `open` the `sensor_file` and set the alias to `data_file`. + +3) In the `with` body, set a variable called `data_reader` equal to `csv.DictReader()`. Pass in the current `data_file` as the first argument, and set the `delimiter=','` as the second argument. The `data_reader` will contain a list of dictionaries with the sensor data. + +4) Create a second `for` loop to `data_file` to get access to each record. Use `row` as your iterator variable. + +5) Inside the body of the second `for` loop, append each `row` record to the `sensor_data` list created on `Task 2` + +Finally, your function should return `sensor_data` list containing a list of dictionaries. + +--- +To test this module locally: + +- Open a terminal at the root of the project +- Run the command `pytest -k module1` + +### M1: Task 5: Get Sensor Data with sensor_app + +[//]:# (@pytest.mark.test_sensor_app_load_data_return_module1) +oLet's set up the command line interface (CLI). Open the `sensor_app.py` file in the `sensor` directory of the project. + +At the top, from the `load_data` module, `import` the `load_sensor_data` function. + +Define variable called `data` and set it equal to `load_sensor_data()`. + +Print the length of the `data` list using `formatted` print. Your output should look like this: + +```bash +Loaded records: [2000] +``` + +--- +To test this task locally: + +- Open a terminal at the root of the project +- Run the command `python sensor/sensor_app.py` + +--- +To test this module locally: + +- Open a terminal at the root of the project +- Run the command `pytest -k module1` + +- + +## Module 2: Create a Class HomeData -## Module 4: Analyze Temperature Data +## Module 3: Analyze Temperature Data -## Module 5: Analyze Humidity Data +## Module 4: Analyze Humidity Data -## Module 6: Analyze Air Quality Data +## Module 5: Analyze Air Quality Data -## Module 7: Analyze Energy Consumption Data +## Module 6: Analyze Energy Consumption Data diff --git a/tests/nodes.py b/tests/nodes.py new file mode 100644 index 00000000..b4e2a4e6 --- /dev/null +++ b/tests/nodes.py @@ -0,0 +1,182 @@ +import os.path +import warnings +import ast + +from collections import OrderedDict + + +nodes = { + # mod + ast.Module: ["body"], + ast.Interactive: ["body"], + ast.Expression: ["body"], + ast.Suite: ["body"], + # stmt + ast.FunctionDef: ["name", "args", "body", "decorator_list", "returns"], + ast.AsyncFunctionDef: ["name", "args", "body", "decorator_list", "returns"], + ast.ClassDef: ["name", "bases", "keywords", "body", "decorator_list"], + ast.Return: ["value"], + ast.Delete: ["targets"], + ast.Assign: ["targets", "value"], + ast.AugAssign: ["target", "op", "value"], + ast.AnnAssign: ["target", "annotation", "value", "simple"], + ast.For: ["target", "iter", "body", "orelse"], + ast.AsyncFor: ["target", "iter", "body", "orelse"], + ast.While: ["test", "body", "orelse"], + ast.If: ["test", "body", "orelse"], + ast.With: ["items", "body"], + ast.AsyncWith: ["items", "body"], + ast.Raise: ["exc", "cause"], + ast.Try: ["body", "handlers", "orelse", "finalbody"], + ast.Assert: ["test", "msg"], + ast.Import: ["names"], + ast.ImportFrom: ["module", "names", "level"], + ast.Global: ["names"], + ast.Nonlocal: ["names"], + ast.Expr: ["value"], + ast.Pass: [], + ast.Break: [], + ast.Continue: [], + # expr + ast.BoolOp: ["op", "values"], + ast.BinOp: ["left", "op", "right"], + ast.UnaryOp: ["op", "operand"], + ast.Lambda: ["args", "body"], + ast.IfExp: ["test", "body", "orelse"], + ast.Dict: ["keys", "values"], + ast.Set: ["elts"], + ast.ListComp: ["elt", "generators"], + ast.SetComp: ["elt", "generators"], + ast.DictComp: ["key", "value", "generators"], + ast.GeneratorExp: ["elt", "generators"], + ast.Await: ["value"], + ast.Yield: ["value"], + ast.YieldFrom: ["value"], + ast.Compare: ["left", "ops", "comparators"], + ast.Call: ["func", "args", "keywords"], + ast.Num: ["n"], + ast.Str: ["s"], + ast.FormattedValue: ["value", "conversion", "format_spec"], + ast.JoinedStr: ["values"], + ast.Bytes: ["s"], + ast.NameConstant: ["value"], + ast.Ellipsis: [], + ast.Constant: ["value"], + ast.Attribute: ["value", "attr"], + ast.Subscript: ["value", "slice"], + ast.Starred: ["value"], + ast.Name: ["id"], + ast.List: ["elts"], + ast.Tuple: ["elts"], + # expr_context + ast.Load: [], + ast.Store: [], + ast.Del: [], + ast.AugLoad: [], + ast.AugStore: [], + ast.Param: [], + # slice + ast.Slice: ["lower", "upper", "step"], + ast.ExtSlice: ["dims"], + ast.Index: ["value"], + # boolop + ast.And: [], + ast.Or: [], + # operator + ast.Sub: [], + ast.Mult: [], + ast.MatMult: [], + ast.Div: [], + ast.Mod: [], + ast.Pow: [], + ast.LShift: [], + ast.RShift: [], + ast.BitOr: [], + ast.BitXor: [], + ast.BitAnd: [], + ast.FloorDiv: [], + # unaryop + ast.Invert: [], + ast.Not: [], + ast.UAdd: [], + ast.USub: [], + # cmpop + ast.Eq: [], + ast.NotEq: [], + ast.Lt: [], + ast.LtE: [], + ast.Gt: [], + ast.GtE: [], + ast.Is: [], + ast.IsNot: [], + ast.In: [], + ast.NotIn: [], + # comprehension + ast.comprehension: ["target", "iter", "ifs", "is_async"], + # excepthandler + ast.ExceptHandler: ["type", "name", "body"], + # arguments + ast.arguments: [ + "args", + "vararg", + "kwonlyargs", + "kw_defaults", + "kwarg", + "defaults", + ], + # arg + ast.arg: ["arg", "annotation"], + # keyword + ast.keyword: ["arg", "value"], + # alias + ast.alias: ["name", "asname"], + # withitem + ast.withitem: ["context_expr", "optional_vars"], +} + + +def convert_node(node): + t = type(node) + + if t is str or t is int: + return node + + if t is list: + return [convert_node(child) for child in node] + + if node is None: + return "nil" + + tname = t.__qualname__ + d = {"type": tname} + + if t not in nodes: + return f"#<{tname}>" + + for name in nodes[t]: + d[name] = convert_node(getattr(node, name)) + return d + + +def flatten(d, sep="_"): + + obj = OrderedDict() + + def recurse(t, parent_key=""): + + if isinstance(t, list): + for i in range(len(t)): + recurse(t[i], parent_key + sep + str(i) if parent_key else str(i)) + elif isinstance(t, dict): + for k, v in t.items(): + if k == "n" or k == "s": + k = "value" + if v == "Str" or v == "NameConstant" or v == "Num": + v = "Constant" + recurse(v, parent_key + sep + k if parent_key else k) + else: + obj[parent_key] = t + + recurse(d) + + return obj \ No newline at end of file diff --git a/tests/template.py b/tests/template.py new file mode 100644 index 00000000..24951cd2 --- /dev/null +++ b/tests/template.py @@ -0,0 +1,565 @@ +import os.path +import warnings +import ast + +from tests.nodes import convert_node + + +class Template(object): + def __init__(self, pattern): + self.pattern = TemplateTransformer.load(pattern) + + def process(self, code, raw=False): + tree = ast.parse(code) if isinstance(code, str) else code + + nodes = [] + for node in ast.walk(tree): + if isinstance(node, type(self.pattern)) and is_ast_equal( + node, self.pattern + ): + if not raw: + nodes.append(convert_node(node)) + else: + nodes.append(node) + + return nodes + + def process_file(self, filename): + if isinstance(filename, str): + with open(filename, "rb") as file: + tree = ast.parse(file.read()) + else: + tree = ast.parse(filename.read()) + yield from self.query(tree) + + def filter_subdirs(self, dirnames): + dirnames[:] = [d for d in dirnames if d != "build"] + + def process_directory(self, directory): + for dirpath, dirnames, filenames in os.walk(directory): + self.filter_subdirs(dirnames) + + for filename in filenames: + if filename.endswith((".py", ".pyw")): + filepath = os.path.join(dirpath, filename) + try: + for match in self.query_file(filepath): + yield filepath, match + except SyntaxError as e: + warnings.warn( + "Failed to parse {}:\n{}".format(filepath, e)) + + +class TemplateTransformer(ast.NodeTransformer): + + __WILDCARD_NAME = "__past_wildcard" + __MULTIWILDCARD_NAME = "__past_multiwildcard" + + @classmethod + def load(cls, pattern): + pattern = pattern.replace("??", cls.__MULTIWILDCARD_NAME).replace( + "?", cls.__WILDCARD_NAME + ) + transformed = ast.parse(pattern).body[0] + + if isinstance(transformed, ast.Expr): + transformed = transformed.value + if isinstance(transformed, (ast.Attribute, ast.Subscript)): + del transformed.ctx + + return cls().visit(transformed) + + def must_exist(self, node, path): + if (node is None) or (node == []): + raise TemplateMismatch(path, node, "non empty") + + def must_not_exist(self, node, path): + if (node is not None) and (node != []): + raise TemplateMismatch(path, node, "empty") + + def visit_Name(self, node): + if node.id == self.__WILDCARD_NAME: + return self.must_exist + elif node.id == self.__MULTIWILDCARD_NAME: + return self.must_exist + + return NameOrAttr(node.id) + + def transform_wildcard(self, node, attrname): + if getattr(node, attrname, None) in ( + self.__WILDCARD_NAME, + self.__MULTIWILDCARD_NAME, + ): + setattr(node, attrname, self.must_exist) + + def transform_wildcard_body(self, node, attrname): + body = getattr(node, attrname, []) + + def _is_multiwildcard(n): + return is_ast_equal( + n, ast.Expr(value=ast.Name(id=self.__MULTIWILDCARD_NAME)) + ) + + if len(body) == 1 and _is_multiwildcard(body[0]): + setattr(node, attrname, self.must_exist) + return + + for i, n in enumerate(body): + if _is_multiwildcard(n): + newbody = body[:i] + Middle() + body[i + 1:] + setattr(node, attrname, newbody) + + def visit_Attribute(self, node): + self.transform_wildcard(node, "attr") + return self.generic_visit(node) + + def visit_FunctionDef(self, node): + self.transform_wildcard(node, "name") + self.transform_wildcard_body(node, "body") + return self.generic_visit(node) + + visit_ClassDef = visit_FunctionDef + + def visit_arguments(self, node): + positional_final_wildcard = False + for i, a in enumerate(node.args): + if a.arg == self.__MULTIWILDCARD_NAME: + from_end = len(node.args) - (i + 1) + if from_end == 0: + positional_final_wildcard = True + + args = ( + self.visit_list(node.args[:i]) + + Middle() + + self.visit_list(node.args[i + 1:]) + ) + break + else: + if node.args: + args = self.visit_list(node.args) + else: + args = self.must_not_exist + + defaults = [ + (a.arg, self.visit(d)) + for a, d in zip(node.args[-len(node.defaults):], node.defaults) + if a.arg not in {self.__WILDCARD_NAME, self.__MULTIWILDCARD_NAME} + ] + + if node.vararg is None: + if positional_final_wildcard: + vararg = None + else: + vararg = self.must_not_exist + else: + vararg = self.visit(node.vararg) + + kwonly_args_dflts = [ + (self.visit(a), (d if d is None else self.visit(d))) + for a, d in zip(node.kwonlyargs, node.kw_defaults) + if a.arg != self.__MULTIWILDCARD_NAME + ] + + koa_subset = ( + positional_final_wildcard and vararg is None and ( + not node.kwonlyargs) + ) or any(a.arg == self.__MULTIWILDCARD_NAME for a in node.kwonlyargs) + + if node.kwarg is None: + if koa_subset: + kwarg = None + else: + kwarg = self.must_not_exist + else: + kwarg = self.visit(node.kwarg) + + return DefArgsCheck( + args=args, + defaults=defaults, + vararg=vararg, + kwonly_args_dflts=kwonly_args_dflts, + koa_subset=koa_subset, + kwarg=kwarg, + ) + + def visit_arg(self, node): + self.transform_wildcard(node, "arg") + return self.generic_visit(node) + + def visit_If(self, node): + self.transform_wildcard_body(node, "body") + self.transform_wildcard_body(node, "orelse") + return self.generic_visit(node) + + visit_For = visit_While = visit_If + + def visit_Try(self, node): + self.transform_wildcard_body(node, "body") + self.transform_wildcard_body(node, "orelse") + self.transform_wildcard_body(node, "finalbody") + return self.generic_visit(node) + + def visit_ExceptHandler(self, node): + self.transform_wildcard(node, "name") + self.transform_wildcard_body(node, "body") + return self.generic_visit(node) + + def visit_With(self, node): + self.transform_wildcard_body(node, "body") + return self.generic_visit(node) + + def visit_Call(self, node): + kwargs_are_subset = False + for i, n in enumerate(node.args): + if is_ast_equal(n, ast.Name(id=self.__MULTIWILDCARD_NAME)): + if i + 1 == len(node.args): + kwargs_are_subset = True + + node.args = ( + self.visit_list(node.args[:i]) + + Middle() + + self.visit_list(node.args[i + 1:]) + ) + + break + + if kwargs_are_subset or any( + k.arg == self.__MULTIWILDCARD_NAME for k in node.keywords + ): + template_keywords = [ + self.visit(k) + for k in node.keywords + if k.arg != self.__MULTIWILDCARD_NAME + ] + + def kwargs_checker(sample_keywords, path): + sample_kwargs = {k.arg: k.value for k in sample_keywords} + + for k in template_keywords: + if k.arg == self.__MULTIWILDCARD_NAME: + continue + if k.arg in sample_kwargs: + assert_ast_equal( + sample_kwargs[k.arg], k.value, path + [k.arg]) + else: + raise TemplateMismatch( + path, "(missing)", "keyword arg %s" % k.arg + ) + + if template_keywords: + node.keywords = kwargs_checker + else: + del node.keywords + + if node.args == []: + node.args = self.must_not_exist + if getattr(node, "keywords", None) == []: + node.keywords = self.must_not_exist + return self.generic_visit(node) + + def transform_import_names(self, node): + if len(node.names) == 1 and node.names[0].name == self.__MULTIWILDCARD_NAME: + del node.names + else: + for alias in node.names: + self.visit_alias(alias) + + def visit_Import(self, node): + self.transform_import_names(node) + return node + + def visit_ImportFrom(self, node): + self.transform_wildcard(node, "module") + self.transform_import_names(node) + if node.level == 0: + del node.level + return node + + def visit_alias(self, node): + self.transform_wildcard(node, "name") + if node.asname is None: + del node.asname + else: + self.transform_wildcard(node, "asname") + + def generic_visit(self, node): + for field, old_value in ast.iter_fields(node): + old_value = getattr(node, field, None) + if isinstance(old_value, list): + new_values = [] + for value in old_value: + if isinstance(value, ast.AST): + value = self.visit(value) + if value is None: + continue + elif isinstance(value, list): + new_values.extend(value) + continue + new_values.append(value) + + if not new_values: + delattr(node, field) + + old_value[:] = new_values + elif isinstance(old_value, ast.AST): + new_node = self.visit(old_value) + if new_node is None: + delattr(node, field) + else: + setattr(node, field, new_node) + return node + + def visit_list(self, l): + return [self.visit(n) for n in l] + + +class NameOrAttr(object): + def __init__(self, name): + self.name = name + + def __repr__(self): + return "past.NameOrAttr({%r})" % self.name + + def __call__(self, node, path): + if isinstance(node, ast.Name): + if node.id != self.name: + raise TemplatePlainObjMismatch( + path + ["id"], node.id, self.name) + elif isinstance(node, ast.Attribute): + if node.attr != self.name: + raise TemplatePlainObjMismatch( + path + ["attr"], node.attr, self.name) + else: + raise TemplateNodeTypeMismatch(path, node, "Name or Attribute") + + +class Middle(object): + def __init__(self, front=None, back=None): + super(Middle, self).__init__() + self.front = front or [] + self.back = back or [] + + def __radd__(self, other): + if not isinstance(other, list): + raise TypeError( + "Cannot add {} and Middle objects".format(type(other))) + return Middle(other + self.front, self.back) + + def __add__(self, other): + if not isinstance(other, list): + raise TypeError( + "Cannot add Middle and {} objects".format(type(other))) + return Middle(self.front, self.back + other) + + def __call__(self, sample_list, path): + if not isinstance(sample_list, list): + raise TemplateNodeTypeMismatch(path, sample_list, list) + + if self.front: + nfront = len(self.front) + if len(sample_list) < nfront: + raise TemplateNodeListMismatch( + path + [""], sample_list, self.front + ) + check_node_list(path, sample_list[:nfront], self.front) + if self.back: + nback = len(self.back) + if len(sample_list) < nback: + raise TemplateNodeListMismatch( + path + [""], sample_list, self.back + ) + check_node_list(path, sample_list[-nback:], self.back, -nback) + + +class TemplateMismatch(AssertionError): + def __init__(self, path, got, expected): + self.path = path + self.expected = expected + self.got = got + + def __str__(self): + return ("Mismatch at {}.\n" "Found : {}\n" "Expected: {}").format( + format_path(self.path), self.got, self.expected + ) + + +class TemplateNodeTypeMismatch(TemplateMismatch): + def __str__(self): + expected = ( + type(self.expected).__name__ + if isinstance(self.expected, ast.AST) + else self.expected + ) + return "At {}, found {} node instead of {}".format( + format_path(self.path), type(self.got).__name__, expected + ) + + +class TemplateNodeListMismatch(TemplateMismatch): + def __str__(self): + return "At {}, found {} node(s) instead of {}".format( + format_path(self.path), len(self.got), len(self.expected) + ) + + +class TemplatePlainListMismatch(TemplateMismatch): + def __str__(self): + return ("At {}, lists differ.\nFound : {}\nExpected: {}").format( + format_path(self.path), self.got, self.expected + ) + + +class TemplatePlainObjMismatch(TemplateMismatch): + def __str__(self): + return "At {}, found {!r} instead of {!r}".format( + format_path(self.path), self.got, self.expected + ) + + +class DefArgsCheck: + def __init__(self, args, defaults, vararg, kwonly_args_dflts, koa_subset, kwarg): + self.args = args + self.defaults = defaults + self.vararg = vararg + self.kwonly_args_dflts = kwonly_args_dflts + self.koa_subset = koa_subset + self.kwarg = kwarg + + def __repr__(self): + return ( + "past.DefArgsCheck(args={s.args}, defaults={s.defaults}, " + "vararg={s.vararg}, kwonly_args_dflts={s.kwonly_args_dflts}, " + "koa_subset={s.koa_subset}, kwarg={s.kwarg}" + ).format(s=self) + + def __call__(self, sample_node, path): + if self.args: + if isinstance(self.args, list): + check_node_list(path + ["args"], sample_node.args, self.args) + else: + assert_ast_equal(sample_node.args, self.args) + + if self.defaults: + sample_args_w_defaults = sample_node.args[-len( + sample_node.defaults):] + sample_arg_defaults = { + a.arg: d for a, d in zip(sample_args_w_defaults, sample_node.defaults) + } + for argname, dflt in self.defaults: + try: + sample_dflt = sample_arg_defaults[argname] + except KeyError: + raise TemplateMismatch( + path + ["defaults", argname], "(missing default)", dflt + ) + else: + assert_ast_equal(dflt, sample_dflt, + path + ["defaults", argname]) + + if self.vararg: + assert_ast_equal(sample_node.vararg, self.vararg) + + sample_kwonlyargs = { + k.arg: (k, d) + for k, d in zip(sample_node.kwonlyargs, sample_node.kw_defaults) + } + + for template_arg, template_dflt in self.kwonly_args_dflts: + argname = template_arg.arg + try: + sample_arg, sample_dflt = sample_kwonlyargs[argname] + except KeyError: + raise TemplateMismatch( + path + + ["kwonlyargs"], "(missing)", "keyword arg %s" % argname + ) + else: + assert_ast_equal( + sample_arg, template_arg, path + ["kwonlyargs", argname] + ) + if template_dflt is not None: + assert_ast_equal( + sample_dflt, template_dflt, path + + ["kw_defaults", argname] + ) + + if not self.koa_subset: + template_kwarg_names = {k.arg for k, d in self.kwonly_args_dflts} + excess_names = set(sample_kwonlyargs) - template_kwarg_names + if excess_names: + raise TemplateMismatch( + path + + ["kwonlyargs"], excess_names, "(not present in template)" + ) + + if self.kwarg: + assert_ast_equal(sample_node.kwarg, self.kwarg) + + +def format_path(path): + formed = path[:1] + for part in path[1:]: + if isinstance(part, int): + formed.append("[%d]" % part) + else: + formed.append("." + part) + return "".join(formed) + + +def check_node_list(path, sample, template, start_enumerate=0): + if len(sample) != len(template): + raise TemplateNodeListMismatch(path, sample, template) + + for i, (sample_node, template_node) in enumerate( + zip(sample, template), start=start_enumerate + ): + if callable(template_node): + template_node(sample_node, path + [i]) + else: + assert_ast_equal(sample_node, template_node, path + [i]) + + +def assert_ast_equal(sample, template, path=None): + if path is None: + path = ["tree"] + + if callable(template): + return template(sample, path) + + if not isinstance(sample, type(template)): + raise TemplateNodeTypeMismatch(path, sample, template) + + for name, template_field in ast.iter_fields(template): + sample_field = getattr(sample, name) + field_path = path + [name] + + if isinstance(template_field, list): + if template_field and ( + isinstance(template_field[0], ast.AST) or callable( + template_field[0]) + ): + check_node_list(field_path, sample_field, template_field) + else: + if sample_field != template_field: + raise TemplatePlainListMismatch( + field_path, sample_field, template_field + ) + + elif isinstance(template_field, ast.AST): + assert_ast_equal(sample_field, template_field, field_path) + + elif callable(template_field): + template_field(sample_field, field_path) + + else: + if sample_field != template_field: + raise TemplatePlainObjMismatch( + field_path, sample_field, template_field) + + +def is_ast_equal(sample, template): + try: + assert_ast_equal(sample, template) + return True + except TemplateMismatch: + return False diff --git a/tests/test_module1.py b/tests/test_module1.py index d41e177e..608935b5 100644 --- a/tests/test_module1.py +++ b/tests/test_module1.py @@ -1,2 +1,314 @@ -import re import pytest +import json + + +@pytest.mark.test_load_data_import_module1 +def test_load_data_import_module1(parse): + + # import os + # import glob + # import csv + + load_data = parse("load_data") + assert load_data.success, load_data.message + + os_import = load_data.imports("os") + assert os_import, "Are you importing `os`?" + + glob_import = load_data.imports("glob") + assert glob_import, "Are you importing `glob`?" + + csv_import = load_data.imports("csv") + assert csv_import, "Are you importing `csv`?" + + +@pytest.mark.test_load_data_load_sensor_func_module1 +def test_load_data_load_sensor_func_module1(parse): + + # def load_sensor_data(): + # sensor_data = [] + + load_data = parse("load_data") + assert load_data.success, load_data.message + + load_sensor_data = load_data.defines("load_sensor_data") + assert ( + load_sensor_data.exists() + ), "Are you defining a function called `load_sensor_data` with the correct arguments?" + + # print(json.dumps(load_sensor_data.assign_().n, indent=2)) + # assert False + + sensor_data = load_sensor_data.assign_().match( + { + "type": "Assign", + "targets_0_type": "Name", + "targets_0_id": "sensor_data", + "value_type": "List", + } + ) + assert ( + sensor_data + ), "Are you creating a variable called `sensor_data` set equal to an empty list?" + + +@pytest.mark.test_load_data_sensor_files_module1 +def test_load_data_sensor_files_module1(parse): + + # def load_sensor_data(): + # .... + # sensor_files = glob.glob(os.path.join(os.getcwd(), 'datasets', '*.csv')) + + load_data = parse("load_data") + assert load_data.success, load_data.message + + # load_sensor_data = load_data.query("def load_sensor_data(): ??") + load_sensor_data = load_data.defines("load_sensor_data") + load_sensor_data_exists = load_sensor_data.exists() + assert ( + load_sensor_data_exists + ), "Are you defining a function called `load_sensor_data` with the correct arguments?" + + sensor_files = ( + load_sensor_data.assign_() + .match( + { + "1_type": "Assign", + "1_targets_0_type": "Name", + "1_targets_0_id": "sensor_files", + "1_value_type": "Call", + + "1_value_func_type": "Attribute", + "1_value_func_value_type": "Name", + "1_value_func_value_id": "glob", + "1_value_func_attr": "glob", + + "1_value_args_0_type": "Call", + "1_value_args_0_func_type": "Attribute", + "1_value_args_0_func_value_type": "Attribute", + "1_value_args_0_func_value_value_type": "Name", + "1_value_args_0_func_value_value_id": "os", + "1_value_args_0_func_value_attr": "path", + "1_value_args_0_func_attr": "join", + + "1_value_args_0_args_0_type": "Call", + "1_value_args_0_args_0_func_type": "Attribute", + "1_value_args_0_args_0_func_value_type": "Name", + "1_value_args_0_args_0_func_value_id": "os", + "1_value_args_0_args_0_func_attr": "getcwd", + "1_value_args_0_args_1_type": "Constant", + "1_value_args_0_args_1_value": "datasets", + "1_value_args_0_args_2_type": "Constant", + "1_value_args_0_args_2_value": "*.csv" + } + ) + .exists() + ) + assert ( + sensor_files + ), "Are you creating a variable called `sensor_files` and assigning it glob.glob() and passing os.path.join()? Are you passing 3 values to os.path.join()?" + + +# @pytest.mark.test_load_data_sensor_files_query_module1 +# def test_load_data_sensor_files_query_module1(parse): + +# # def load_sensor_data(): +# # .... +# # sensor_files = glob.glob(os.path.join(os.getcwd(), 'datasets', '*.csv')) + +# load_data = parse("load_data") +# assert load_data.success, load_data.message + +# sensor_files = load_data.query( +# "glob.glob(os.path.join(os.getcwd(), 'datasets', '*.csv'))") +# sensor_files_exists = sensor_files.exists() +# assert ( +# sensor_files_exists +# ), "Are you including `glob.glob(os.path.join(os.getcwd(), 'datasets', '*.csv'))` in your code?" + + +@pytest.mark.test_load_data_read_files_module1 +def test_load_data_read_files_module1(parse): + + # def load_sensor_data(): + # .... + # for sensor_file in sensor_files: + # with open(sensor_file) as data_file: + # data_reader = csv.DictReader(data_file, delimiter=',') + # for row in data_reader: + # sensor_data.append(row) + # return sensor_data + + load_data = parse("load_data") + assert load_data.success, load_data.message + + # load_sensor_data = load_data.defines("load_sensor_data") + load_sensor_data = load_data.query("def load_sensor_data(): ??") + assert ( + load_sensor_data.exists() + ), "Are you defining a function called `load_sensor_data` with the correct arguments?" + + # print(json.dumps(load_sensor_data.for_().n, indent=2)) # TODO Remove + # print(json.dumps(load_sensor_data.assign_().n, indent=2)) + # print(json.dumps(load_sensor_data.returns_call().n, indent=2)) + # print(json.dumps(load_sensor_data.assigns().n, indent=2)) + # assert False + + first_for_exists = ( + load_sensor_data.for_() + .match( + { + "target_type": "Name", + "target_id": "sensor_file", + "iter_type": "Name", + "iter_id": "sensor_files", + } + ) + .exists() + ) + assert ( + first_for_exists + ), 'Do you have a `for` loop, looping through `sensor_files`? Is the current loop value called `sensor_file`?' + + with_exists = ( + load_sensor_data.for_() + .match( + { + "0_type": "With", + "0_items_0_type": "withitem", + "0_items_0_context_expr_type": "Call", + "0_items_0_context_expr_func_type": "Name", + "0_items_0_context_expr_func_id": "open", + "0_items_0_context_expr_args_0_type": "Name", + "0_items_0_context_expr_args_0_id": "sensor_file", + "0_items_0_optional_vars_type": "Name", + "0_items_0_optional_vars_id": "data_file" + } + ) + .exists() + ) + assert ( + with_exists + ), "Do you have a call to `open` in your `with` code and are you passing `open` the correct argument?" + + data_reader = ( + load_sensor_data.for_() + .match( + { + "0_body_0_type": "Assign", + "0_body_0_targets_0_type": "Name", + "0_body_0_targets_0_id": "data_reader", + "0_body_0_value_type": "Call", + "0_body_0_value_func_type": "Attribute", + "0_body_0_value_func_value_type": "Name", + "0_body_0_value_func_value_id": "csv", + "0_body_0_value_func_attr": "DictReader", + "0_body_0_value_args_0_type": "Name", + "0_body_0_value_args_0_id": "data_file", + "0_body_0_value_keywords_0_type": "keyword", + "0_body_0_value_keywords_0_arg": "delimiter", + "0_body_0_value_keywords_0_value_type": "Constant", + "0_body_0_value_keywords_0_value_value": ",", + } + ) + .exists() + ) + assert ( + data_reader + ), "Are you assigning `data_reader` the result of `csv.DictReader()` with the correct input argument and delimeter?" + + second_for_exist = ( + load_sensor_data.for_() + .match( + { + "0_body_1_type": "For", + "0_body_1_target_type": "Name", + "0_body_1_target_id": "row", + "0_body_1_iter_type": "Name", + "0_body_1_iter_id": "data_reader", + } + ) + .exists() + ) + assert ( + second_for_exist + ), 'Do you have a second `for` loop, looping through `data_reader`? Is the current loop value called `row`?' + + sensor_data_append = ( + load_sensor_data.for_() + .match( + { + "0_body_1_body_0_type": "Expr", + "0_body_1_body_0_value_type": "Call", + "0_body_1_body_0_value_func_type": "Attribute", + "0_body_1_body_0_value_func_value_type": "Name", + "0_body_1_body_0_value_func_value_id": "sensor_data", + "0_body_1_body_0_value_func_attr": "append", + "0_body_1_body_0_value_args_0_type": "Name", + "0_body_1_body_0_value_args_0_id": "row" + } + ) + .exists() + ) + assert ( + sensor_data_append + ), 'Are you appending the `row` records to the `sensor_data` list?' + + returns_load_sensor_data = load_sensor_data.returns("sensor_data") + assert ( + returns_load_sensor_data + ), 'Are you returning `sensor_data` from `load_sensor_data` function?' + + +@pytest.mark.test_sensor_app_load_data_return_module1 +def test_sensor_app_load_data_return_module1(parse): + # from load_data import load_sensor_data + # data = load_sensor_data() + # print("Loaded records {}".format(len(data))) + + sensor = parse("sensor_app") + assert sensor.success, sensor.message + + load_sensor_data_import = sensor.from_imports( + "load_data", "load_sensor_data") + assert load_sensor_data_import, "Are you importing `load_sensor_data` from load_data?" + + # print(json.dumps(sensor.assign_().n, indent=2)) + # print(json.dumps(sensor.calls().n, indent=2)) + # assert False + + data = sensor.assign_().match( + { + "type": "Assign", + "targets_0_type": "Name", + "targets_0_id": "data", + "value_type": "Call", + "value_func_type": "Name", + "value_func_id": "load_sensor_data", + } + ) + assert ( + data + ), "Are you creating a variable called `data` set equal to `load_sensor_data()` function?" + + print_data = sensor.calls().match( + { + "type": "Expr", + "value_type": "Call", + "value_func_type": "Name", + "value_func_id": "print", + "value_args_0_type": "Call", + "value_args_0_func_type": "Attribute", + "value_args_0_func_value_type": "Constant", + "value_args_0_func_value_value": "Loaded records {}", + "value_args_0_func_attr": "format", + "value_args_0_args_0_type": "Call", + "value_args_0_args_0_func_type": "Name", + "value_args_0_args_0_func_id": "len", + "value_args_0_args_0_args_0_type": "Name", + "value_args_0_args_0_args_0_id": "data" + } + ) + assert ( + print_data + ), "Are you calling `print()` and passing in `format()`? Are you passing 1 values to `format()` `len(data)`"