Skip to content

Commit

Permalink
Merge pull request #53 from life4/decouple-extractors
Browse files Browse the repository at this point in the history
Decouple extractors
  • Loading branch information
orsinium committed May 1, 2020
2 parents 4d96acf + d0791b7 commit 09d4069
Show file tree
Hide file tree
Showing 9 changed files with 320 additions and 210 deletions.
35 changes: 34 additions & 1 deletion deal/linter/_extractors/common.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# built-in
import ast
from contextlib import suppress
from functools import partial
from types import SimpleNamespace
from typing import Iterator, List, NamedTuple, Optional, Tuple
from typing import Callable, Iterator, List, NamedTuple, Optional, Tuple

# external
import astroid
Expand Down Expand Up @@ -83,3 +84,35 @@ def infer(expr) -> Tuple:
with suppress(astroid.exceptions.InferenceError, RecursionError):
return tuple(g for g in expr.infer() if type(g) is not astroid.Uninferable)
return tuple()


class Extractor:
__slots__ = ('handlers', )

def __init__(self):
self.handlers = dict()

def _register(self, types: Tuple[type], handler: Callable) -> Callable:
for tp in types:
# it's here to have `getattr` to get nodes from `ast` module
# that are available only in some Python versions.
if tp is None:
continue # pragma: no coverage
self.handlers[tp] = handler
return handler

def register(self, *types):
return partial(self._register, types)

def __call__(self, body: List, **kwargs) -> Iterator[Token]:
for expr in traverse(body=body):
handler = self.handlers.get(type(expr))
if not handler:
continue
token = handler(expr=expr, **kwargs)
if token is None:
continue
if type(token) is Token:
yield token
continue
yield from token
129 changes: 70 additions & 59 deletions deal/linter/_extractors/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,84 @@
# built-in
import ast
import builtins
from typing import Iterator, List
from typing import Iterator, Optional, Union

# external
import astroid

# app
from .common import TOKENS, Token, get_name, infer, traverse
from .common import TOKENS, Extractor, Token, get_name, infer
from .contracts import get_contracts


def get_exceptions(body: List, *, dive: bool = True) -> Iterator[Token]:
for expr in traverse(body):
token_info = dict(line=expr.lineno, col=expr.col_offset)

# assert
if isinstance(expr, TOKENS.ASSERT):
yield Token(value=AssertionError, **token_info)
continue

# explicit raise
if isinstance(expr, TOKENS.RAISE):
name = get_name(expr.exc)
if not name:
# raised a value, too tricky
if not isinstance(expr.exc, TOKENS.CALL):
continue
# raised an instance of an exception
name = get_name(expr.exc.func)
if not name or name[0].islower():
continue
exc = getattr(builtins, name, name)
token_info['col'] = expr.exc.col_offset
yield Token(value=exc, **token_info)
continue

# division by zero
if isinstance(expr, TOKENS.BIN_OP):
if isinstance(expr.op, ast.Div) or expr.op == '/':
if isinstance(expr.right, astroid.node_classes.NodeNG):
guesses = infer(expr=expr.right)
token_info['col'] = expr.right.col_offset
for guess in guesses:
if type(guess) is not astroid.Const:
continue
yield Token(value=ZeroDivisionError, **token_info)
break
continue
if isinstance(expr.right, ast.Num) and expr.right.n == 0:
token_info['col'] = expr.right.col_offset
yield Token(value=ZeroDivisionError, **token_info)
continue

# exit()
if isinstance(expr, TOKENS.CALL):
name = get_name(expr.func)
if name and name == 'exit':
yield Token(value=SystemExit, **token_info)
continue
# sys.exit()
if isinstance(expr.func, TOKENS.ATTR):
name = get_name(expr.func)
if name and name == 'sys.exit':
yield Token(value=SystemExit, **token_info)
get_exceptions = Extractor()


@get_exceptions.register(*TOKENS.ASSERT)
def handle_assert(expr, **kwargs) -> Optional[Token]:
token_info = dict(line=expr.lineno, col=expr.col_offset)
return Token(value=AssertionError, **token_info)


# explicit raise
@get_exceptions.register(*TOKENS.RAISE)
def handle_raise(expr, **kwargs) -> Optional[Token]:
token_info = dict(line=expr.lineno, col=expr.col_offset)
name = get_name(expr.exc)
if not name:
# raised a value, too tricky
if not isinstance(expr.exc, TOKENS.CALL):
return None
# raised an instance of an exception
name = get_name(expr.exc.func)
if not name or name[0].islower():
return None
exc = getattr(builtins, name, name)
token_info['col'] = expr.exc.col_offset
return Token(value=exc, **token_info)


# division by zero
@get_exceptions.register(*TOKENS.BIN_OP)
def handle_bin_op(expr, **kwargs) -> Optional[Token]:
token_info = dict(line=expr.lineno, col=expr.col_offset)
if isinstance(expr.op, ast.Div) or expr.op == '/':
if isinstance(expr.right, astroid.node_classes.NodeNG):
guesses = infer(expr=expr.right)
token_info['col'] = expr.right.col_offset
for guess in guesses:
if type(guess) is not astroid.Const:
continue

# infer function call and check the function body for raises
if dive:
yield from _exceptions_from_funcs(expr=expr)
return Token(value=ZeroDivisionError, **token_info)
if isinstance(expr.right, ast.Num) and expr.right.n == 0:
token_info['col'] = expr.right.col_offset
return Token(value=ZeroDivisionError, **token_info)
return None


# exit()
@get_exceptions.register(*TOKENS.CALL)
def handle_call(expr, dive: bool = True) -> Optional[Union[Token, Iterator[Token]]]:
token_info = dict(line=expr.lineno, col=expr.col_offset)
name = get_name(expr.func)
if name and name == 'exit':
return Token(value=SystemExit, **token_info)
# sys.exit()
if isinstance(expr.func, TOKENS.ATTR):
name = get_name(expr.func)
if name and name == 'sys.exit':
return Token(value=SystemExit, **token_info)
# infer function call and check the function body for raises
if dive:
return _exceptions_from_funcs(expr=expr)
return None


@get_exceptions.register(astroid.Assign)
def handle_assign(expr: astroid.Assign, dive: bool = True) -> Iterator[Token]:
# infer function call and check the function body for raises
if dive:
yield from _exceptions_from_funcs(expr=expr)


def _exceptions_from_funcs(expr) -> Iterator[Token]:
Expand Down Expand Up @@ -100,6 +110,7 @@ def _exceptions_from_funcs(expr) -> Iterator[Token]:
if name is None:
continue
yield Token(value=name, **extra)
return None


def get_names(expr) -> Iterator:
Expand Down
48 changes: 25 additions & 23 deletions deal/linter/_extractors/exceptions_stubs.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,38 @@
# built-in
import builtins
from pathlib import Path
from typing import Iterator, List, Optional, Tuple
from typing import Optional, Tuple

# external
import astroid

# app
from .._contract import Category
from .._stub import EXTENSION, StubFile, StubsManager
from .common import Token, infer, traverse


def get_exceptions_stubs(body: List, *, dive: bool = True, stubs: StubsManager) -> Iterator[Token]:
for expr in traverse(body):
if type(expr) is not astroid.Call:
return
extra = dict(
line=expr.lineno,
col=expr.col_offset,
)
for value in infer(expr=expr.func):
if type(value) is not astroid.FunctionDef:
continue
module_name, func_name = _get_full_name(expr=value)
stub = _get_stub(module_name=module_name, expr=value, stubs=stubs)
if stub is None:
continue
names = stub.get(func=func_name, contract=Category.RAISES)
for name in names:
name = getattr(builtins, name, name)
yield Token(value=name, **extra)
from .common import Extractor, Token, infer


get_exceptions_stubs = Extractor()


@get_exceptions_stubs.register(astroid.Call)
def handle_astroid_call(expr: astroid.Call, *, dive: bool = True, stubs: StubsManager) -> Optional[Token]:
extra = dict(
line=expr.lineno,
col=expr.col_offset,
)
for value in infer(expr=expr.func):
if type(value) is not astroid.FunctionDef:
continue
module_name, func_name = _get_full_name(expr=value)
stub = _get_stub(module_name=module_name, expr=value, stubs=stubs)
if stub is None:
continue
names = stub.get(func=func_name, contract=Category.RAISES)
for name in names:
name = getattr(builtins, name, name)
return Token(value=name, **extra)
return None


def _get_stub(
Expand Down
50 changes: 28 additions & 22 deletions deal/linter/_extractors/globals.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,42 @@
# built-in
import ast
from typing import Iterator
from typing import Optional

# external
import astroid

# app
from .common import TOKENS, Token, traverse
from .common import TOKENS, Extractor, Token


def get_globals(body: list) -> Iterator[Token]:
for expr in traverse(body):
if isinstance(expr, TOKENS.GLOBAL):
yield Token(value='global', line=expr.lineno, col=expr.col_offset)
continue
get_globals = Extractor()

if isinstance(expr, TOKENS.NONLOCAL):
yield Token(value='nonlocal', line=expr.lineno, col=expr.col_offset)
continue

if type(expr) is ast.Import:
yield Token(value='import', line=expr.lineno, col=expr.col_offset)
continue
@get_globals.register(*TOKENS.GLOBAL)
def handle_global(expr) -> Optional[Token]:
return Token(value='global', line=expr.lineno, col=expr.col_offset)

if type(expr) is astroid.Import:
yield Token(value='import', line=expr.lineno, col=expr.col_offset)
continue

if type(expr) is ast.ImportFrom:
yield Token(value='import', line=expr.lineno, col=expr.col_offset)
continue
@get_globals.register(*TOKENS.NONLOCAL)
def handle_nonlocal(expr) -> Optional[Token]:
return Token(value='nonlocal', line=expr.lineno, col=expr.col_offset)

if type(expr) is astroid.ImportFrom:
yield Token(value='import', line=expr.lineno, col=expr.col_offset)
continue

@get_globals.register(ast.Import)
def handle_ast_import(expr: ast.Import) -> Optional[Token]:
return Token(value='import', line=expr.lineno, col=expr.col_offset)


@get_globals.register(astroid.Import)
def handle_astroid_import(expr: astroid.Import) -> Optional[Token]:
return Token(value='import', line=expr.lineno, col=expr.col_offset)


@get_globals.register(ast.ImportFrom)
def handle_ast_import_from(expr: ast.ImportFrom) -> Optional[Token]:
return Token(value='import', line=expr.lineno, col=expr.col_offset)


@get_globals.register(astroid.ImportFrom)
def handle_astroid_import_from(expr: astroid.ImportFrom) -> Optional[Token]:
return Token(value='import', line=expr.lineno, col=expr.col_offset)
35 changes: 20 additions & 15 deletions deal/linter/_extractors/imports.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
# built-in
import ast
from typing import Iterator

# external
import astroid

# app
from .common import Token, traverse


def get_imports(body: list) -> Iterator[Token]:
for expr in traverse(body):
token_info = dict(line=expr.lineno, col=expr.col_offset)
if isinstance(expr, astroid.ImportFrom):
dots = '.' * (expr.level or 0)
name = expr.modname or ''
yield Token(value=dots + name, **token_info)
if isinstance(expr, ast.ImportFrom):
dots = '.' * expr.level
name = expr.module or ''
yield Token(value=dots + name, **token_info)
from .common import Extractor, Token


get_imports = Extractor()


@get_imports.register(astroid.ImportFrom)
def handle_astroid(expr: astroid.ImportFrom) -> Token:
token_info = dict(line=expr.lineno, col=expr.col_offset)
dots = '.' * (expr.level or 0)
name = expr.modname or ''
return Token(value=dots + name, **token_info)


@get_imports.register(ast.ImportFrom)
def handle_ast(expr: ast.ImportFrom) -> Token:
token_info = dict(line=expr.lineno, col=expr.col_offset)
dots = '.' * expr.level
name = expr.module or ''
return Token(value=dots + name, **token_info)
Loading

0 comments on commit 09d4069

Please sign in to comment.