Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion dev/flake8/checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
"over `sqlalchemy.orm.relationship(backref=...)`"
)
WH003_msg = "WH003 `@view_config.renderer` configured template file not found"
# TODO: This would be better served by mypy, no types in Pyramid makes this harder.
# Support https://github.com/Pylons/pyramid/issues/2638 for general Pyramid type info.
WH004_msg = "WH004 metrics tags must be a list of strings"


class WarehouseVisitor(ast.NodeVisitor):
Expand Down Expand Up @@ -101,6 +104,46 @@ def visit_AnnAssign(self, node: ast.AnnAssign) -> None: # noqa: N802
self.check_for_backref(node)
self.generic_visit(node)

def is_metrics_method_call(self, node: ast.Call) -> bool:
"""Check if this is a call to a metrics method."""
if not isinstance(node.func, ast.Attribute):
return False

# Check for metrics.<method>()
if isinstance(node.func.value, ast.Name) and node.func.value.id == "metrics":
return True

# Check for request.metrics.<method>() or any_obj.metrics.<method>()
if (
isinstance(node.func.value, ast.Attribute)
and node.func.value.attr == "metrics"
):
return True

return False

def check_metrics_tags(self, node: ast.Call) -> None:
"""Check that tags parameter in metrics calls is a list."""
if not self.is_metrics_method_call(node):
return

# Check keyword arguments for tags=
for kw in node.keywords:
if kw.arg == "tags":
# tags should be None, a variable (Name), or a List
# Flag if it's a literal non-list type (string, tuple, dict, set, etc.)
if isinstance(kw.value, (ast.Constant, ast.Tuple, ast.Dict, ast.Set)):
# Allow None
if isinstance(kw.value, ast.Constant) and kw.value.value is None:
continue
self.errors.append(
(kw.value.lineno, kw.value.col_offset, WH004_msg)
)

def visit_Call(self, node: ast.Call) -> None: # noqa: N802
self.check_metrics_tags(node)
self.generic_visit(node)

def visit_FunctionDef(self, node: ast.FunctionDef) -> None: # noqa: N802
for decorator in node.decorator_list:
if (
Expand Down Expand Up @@ -173,7 +216,45 @@ def my_view(request):
assert len(visitor.errors) == 0


def test_wh004_metrics_tags_invalid_types():
# Test case: Invalid tag types (should error)
code = dedent(
"""
metrics.increment("counter", tags="string")
request.metrics.gauge("gauge", tags=("tuple",))
metrics.histogram("hist", tags={"dict": "value"})
"""
)
tree = ast.parse(code)
visitor = WarehouseVisitor(filename="test_file.py")
visitor.visit(tree)

# Assert that all 3 errors are raised
assert len(visitor.errors) == 3
assert all(error[2] == WH004_msg for error in visitor.errors)


def test_wh004_metrics_tags_valid_types():
# Test case: Valid tag types (should not error)
code = dedent(
"""
metrics.increment("counter", tags=["tag1", "tag2"])
request.metrics.gauge("gauge", tags=None)
tag_list = ["tag1"]
metrics.histogram("hist", tags=tag_list)
"""
)
tree = ast.parse(code)
visitor = WarehouseVisitor(filename="test_file.py")
visitor.visit(tree)

# Assert that no errors are raised
assert len(visitor.errors) == 0


if __name__ == "__main__":
test_wh003_renderer_template_not_found()
test_wh003_renderer_template_in_package_path()
print("Test passed!")
test_wh004_metrics_tags_invalid_types()
test_wh004_metrics_tags_valid_types()
print("All tests passed!")