-
-
Notifications
You must be signed in to change notification settings - Fork 507
Implement support for <QuerySet>.as_manager()
#1025
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,8 @@ | |
TypeInfo, | ||
Var, | ||
) | ||
from mypy.plugin import AttributeContext, DynamicClassDefContext, SemanticAnalyzerPluginInterface | ||
from mypy.plugin import AttributeContext, DynamicClassDefContext | ||
from mypy.semanal import SemanticAnalyzer | ||
from mypy.semanal_shared import has_placeholder | ||
from mypy.types import AnyType, CallableType, Instance, ProperType | ||
from mypy.types import Type as MypyType | ||
|
@@ -150,7 +151,6 @@ def get_method_type_from_reverse_manager( | |
|
||
|
||
def resolve_manager_method_from_instance(instance: Instance, method_name: str, ctx: AttributeContext) -> MypyType: | ||
|
||
api = helpers.get_typechecker_api(ctx) | ||
method_type = get_method_type_from_dynamic_manager( | ||
api, method_name, instance | ||
|
@@ -164,9 +164,11 @@ def resolve_manager_method(ctx: AttributeContext) -> MypyType: | |
A 'get_attribute_hook' that is intended to be invoked whenever the TypeChecker encounters | ||
an attribute on a class that has 'django.db.models.BaseManager' as a base. | ||
""" | ||
# Skip (method) type that is currently something other than Any | ||
# Skip (method) type that is currently something other than Any of type `implementation_artifact` | ||
if not isinstance(ctx.default_attr_type, AnyType): | ||
return ctx.default_attr_type | ||
elif ctx.default_attr_type.type_of_any != TypeOfAny.implementation_artifact: | ||
return ctx.default_attr_type | ||
|
||
# (Current state is:) We wouldn't end up here when looking up a method from a custom _manager_. | ||
# That's why we only attempt to lookup the method for either a dynamically added or reverse manager. | ||
|
@@ -197,12 +199,12 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte | |
return | ||
|
||
# Don't redeclare the manager class if we've already defined it. | ||
manager_node = semanal_api.lookup_current_scope(ctx.name) | ||
if manager_node and isinstance(manager_node.node, TypeInfo): | ||
manager_sym = semanal_api.lookup_current_scope(ctx.name) | ||
if manager_sym and isinstance(manager_sym.node, TypeInfo): | ||
# This is just a deferral run where our work is already finished | ||
return | ||
|
||
new_manager_info = create_manager_info_from_from_queryset_call(ctx.api, ctx.call, ctx.name) | ||
new_manager_info = create_manager_info_from_from_queryset_call(semanal_api, ctx.call, ctx.name) | ||
if new_manager_info is None: | ||
if not ctx.api.final_iteration: | ||
ctx.api.defer() | ||
|
@@ -212,8 +214,17 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte | |
helpers.add_new_manager_base(semanal_api, new_manager_info.fullname) | ||
|
||
|
||
def register_dynamically_created_manager(fullname: str, manager_name: str, manager_base: TypeInfo) -> None: | ||
manager_base.metadata.setdefault("from_queryset_managers", {}) | ||
# The `__module__` value of the manager type created by Django's | ||
# `.from_queryset` is `django.db.models.manager`. But we put new type(s) in the | ||
# module currently being processed, so we'll map those together through metadata. | ||
runtime_fullname = ".".join(["django.db.models.manager", manager_name]) | ||
manager_base.metadata["from_queryset_managers"][runtime_fullname] = fullname | ||
Comment on lines
+218
to
+223
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't these be under the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, probably. They weren't previously so I kept them the same. Will probably enforce cache clear to work properly. But that might be completely fine too. |
||
|
||
|
||
def create_manager_info_from_from_queryset_call( | ||
api: SemanticAnalyzerPluginInterface, call_expr: CallExpr, name: Optional[str] = None | ||
api: SemanticAnalyzer, call_expr: CallExpr, name: Optional[str] = None | ||
) -> Optional[TypeInfo]: | ||
""" | ||
Extract manager and queryset TypeInfo from a from_queryset call. | ||
|
@@ -247,30 +258,48 @@ def create_manager_info_from_from_queryset_call( | |
else: | ||
manager_name = f"{base_manager_info.name}From{queryset_info.name}" | ||
|
||
try: | ||
new_manager_info = create_manager_class(api, base_manager_info, name or manager_name, call_expr.line) | ||
except helpers.IncompleteDefnException: | ||
return None | ||
|
||
popuplate_manager_from_queryset(new_manager_info, queryset_info) | ||
|
||
manager_fullname = ".".join(["django.db.models.manager", manager_name]) | ||
|
||
base_manager_info = new_manager_info.mro[1] | ||
base_manager_info.metadata.setdefault("from_queryset_managers", {}) | ||
base_manager_info.metadata["from_queryset_managers"][manager_fullname] = new_manager_info.fullname | ||
# Always look in global scope, as that's where we'll declare dynamic manager classes | ||
manager_sym = api.globals.get(manager_name) | ||
if ( | ||
manager_sym is not None | ||
and isinstance(manager_sym.node, TypeInfo) | ||
and manager_sym.node.has_base(base_manager_info.fullname) | ||
and manager_sym.node.metadata.get("django", {}).get("from_queryset_manager") == queryset_info.fullname | ||
): | ||
# Reuse an identical, already generated, manager | ||
new_manager_info = manager_sym.node | ||
else: | ||
# Create a new `TypeInfo` instance for the manager type | ||
try: | ||
new_manager_info = create_manager_class( | ||
api=api, | ||
base_manager_info=base_manager_info, | ||
name=manager_name, | ||
line=call_expr.line, | ||
with_unique_name=name is not None and name != manager_name, | ||
) | ||
except helpers.IncompleteDefnException: | ||
return None | ||
|
||
populate_manager_from_queryset(new_manager_info, queryset_info) | ||
register_dynamically_created_manager( | ||
fullname=new_manager_info.fullname, | ||
manager_name=manager_name, | ||
manager_base=base_manager_info, | ||
) | ||
|
||
# Add the new manager to the current module | ||
module = api.modules[api.cur_mod_id] | ||
module.names[name or manager_name] = SymbolTableNode( | ||
GDEF, new_manager_info, plugin_generated=True, no_serialize=False | ||
) | ||
if name is not None and name != new_manager_info.name: | ||
# Unless names are equal, there's 2 symbol names that needs the manager info | ||
module.names[name] = SymbolTableNode(GDEF, new_manager_info, plugin_generated=True) | ||
|
||
module.names[new_manager_info.name] = SymbolTableNode(GDEF, new_manager_info, plugin_generated=True) | ||
return new_manager_info | ||
|
||
|
||
def create_manager_class( | ||
api: SemanticAnalyzerPluginInterface, base_manager_info: TypeInfo, name: str, line: int | ||
api: SemanticAnalyzer, base_manager_info: TypeInfo, name: str, line: int, with_unique_name: bool | ||
) -> TypeInfo: | ||
|
||
base_manager_instance = fill_typevars(base_manager_info) | ||
|
@@ -280,17 +309,24 @@ def create_manager_class( | |
if any(has_placeholder(type_var) for type_var in base_manager_info.defn.type_vars): | ||
raise helpers.IncompleteDefnException | ||
|
||
manager_info = helpers.create_type_info(name, api.cur_mod_id, bases=[base_manager_instance]) | ||
if with_unique_name: | ||
manager_info = helpers.add_new_class_for_module( | ||
module=api.modules[api.cur_mod_id], | ||
name=name, | ||
bases=[base_manager_instance], | ||
) | ||
else: | ||
manager_info = helpers.create_type_info(name, api.cur_mod_id, bases=[base_manager_instance]) | ||
|
||
manager_info.line = line | ||
manager_info.type_vars = base_manager_info.type_vars | ||
manager_info.defn.type_vars = base_manager_info.defn.type_vars | ||
manager_info.defn.line = line | ||
manager_info.metaclass_type = manager_info.calculate_metaclass_type() | ||
|
||
return manager_info | ||
|
||
|
||
def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeInfo) -> None: | ||
def populate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeInfo) -> None: | ||
""" | ||
Add methods from the QuerySet class to the manager. | ||
""" | ||
|
@@ -318,7 +354,7 @@ def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeI | |
helpers.add_new_sym_for_info( | ||
manager_info, | ||
name=name, | ||
sym_type=AnyType(TypeOfAny.special_form), | ||
sym_type=AnyType(TypeOfAny.implementation_artifact), | ||
) | ||
|
||
# For methods on BaseManager that return a queryset we need to update | ||
|
@@ -330,5 +366,103 @@ def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeI | |
helpers.add_new_sym_for_info( | ||
manager_info, | ||
name=method_name, | ||
sym_type=AnyType(TypeOfAny.special_form), | ||
sym_type=AnyType(TypeOfAny.implementation_artifact), | ||
) | ||
|
||
|
||
def create_new_manager_class_from_as_manager_method(ctx: DynamicClassDefContext) -> None: | ||
""" | ||
Insert a new manager class node for a | ||
|
||
``` | ||
<manager name> = <QuerySet>.as_manager() | ||
``` | ||
""" | ||
semanal_api = helpers.get_semanal_api(ctx) | ||
# Don't redeclare the manager class if we've already defined it. | ||
manager_node = semanal_api.lookup_current_scope(ctx.name) | ||
if manager_node and manager_node.type is not None: | ||
# This is just a deferral run where our work is already finished | ||
return | ||
|
||
manager_sym = semanal_api.lookup_fully_qualified_or_none(fullnames.MANAGER_CLASS_FULLNAME) | ||
assert manager_sym is not None | ||
manager_base = manager_sym.node | ||
if manager_base is None: | ||
if not semanal_api.final_iteration: | ||
semanal_api.defer() | ||
return | ||
|
||
assert isinstance(manager_base, TypeInfo) | ||
|
||
callee = ctx.call.callee | ||
assert isinstance(callee, MemberExpr) | ||
assert isinstance(callee.expr, RefExpr) | ||
|
||
queryset_info = callee.expr.node | ||
if queryset_info is None: | ||
if not semanal_api.final_iteration: | ||
semanal_api.defer() | ||
return | ||
|
||
assert isinstance(queryset_info, TypeInfo) | ||
|
||
manager_class_name = manager_base.name + "From" + queryset_info.name | ||
current_module = semanal_api.modules[semanal_api.cur_mod_id] | ||
existing_sym = current_module.names.get(manager_class_name) | ||
if ( | ||
existing_sym is not None | ||
and isinstance(existing_sym.node, TypeInfo) | ||
and existing_sym.node.has_base(fullnames.MANAGER_CLASS_FULLNAME) | ||
and existing_sym.node.metadata.get("django", {}).get("from_queryset_manager") == queryset_info.fullname | ||
): | ||
# Reuse an identical, already generated, manager | ||
new_manager_info = existing_sym.node | ||
else: | ||
# Create a new `TypeInfo` instance for the manager type | ||
try: | ||
new_manager_info = create_manager_class( | ||
api=semanal_api, | ||
base_manager_info=manager_base, | ||
name=manager_class_name, | ||
line=ctx.call.line, | ||
with_unique_name=True, | ||
) | ||
except helpers.IncompleteDefnException: | ||
if not semanal_api.final_iteration: | ||
semanal_api.defer() | ||
return | ||
|
||
populate_manager_from_queryset(new_manager_info, queryset_info) | ||
register_dynamically_created_manager( | ||
fullname=new_manager_info.fullname, | ||
manager_name=manager_class_name, | ||
manager_base=manager_base, | ||
) | ||
|
||
# So that the plugin will reparameterize the manager when it is constructed inside of a Model definition | ||
helpers.add_new_manager_base(semanal_api, new_manager_info.fullname) | ||
|
||
# Whenever `<QuerySet>.as_manager()` isn't called at class level, we want to ensure | ||
# that the variable is an instance of our generated manager. Instead of the return | ||
# value of `.as_manager()`. Though model argument is populated as `Any`. | ||
# `transformers.models.AddManagers` will populate a model's manager(s), when it | ||
# finds it on class level. | ||
var = Var(name=ctx.name, type=Instance(new_manager_info, [AnyType(TypeOfAny.from_omitted_generics)])) | ||
var.info = new_manager_info | ||
var._fullname = f"{current_module.fullname}.{ctx.name}" | ||
var.is_inferred = True | ||
# Note: Order of `add_symbol_table_node` calls matters. Depending on what level | ||
# we've found the `.as_manager()` call. Point here being that we want to replace the | ||
# `.as_manager` return value with our newly created manager. | ||
assert semanal_api.add_symbol_table_node( | ||
ctx.name, SymbolTableNode(semanal_api.current_symbol_kind(), var, plugin_generated=True) | ||
) | ||
flaeppe marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
# Add the new manager to the current module | ||
assert semanal_api.add_symbol_table_node( | ||
# We'll use `new_manager_info.name` instead of `manager_class_name` here | ||
# to handle possible name collisions, as it's unique. | ||
new_manager_info.name, | ||
# Note that the generated manager type is always inserted at module level | ||
SymbolTableNode(GDEF, new_manager_info, plugin_generated=True), | ||
flaeppe marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
) |
Uh oh!
There was an error while loading. Please reload this page.