From e020fefc5020efcb4de64547d152634bb55d89cf Mon Sep 17 00:00:00 2001 From: Waket Zheng Date: Sat, 11 May 2024 22:40:58 +0800 Subject: [PATCH 1/2] refactor: optimize query utils --- tortoise/query_utils.py | 69 +++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 37 deletions(-) diff --git a/tortoise/query_utils.py b/tortoise/query_utils.py index b0f8873b9..f83f2665b 100644 --- a/tortoise/query_utils.py +++ b/tortoise/query_utils.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from copy import copy -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from pypika import Table from pypika.terms import Criterion @@ -15,9 +17,12 @@ from tortoise.queryset import QuerySet +TableCriterionTuple = Tuple[Table, Criterion] + + def _get_joins_for_related_field( table: Table, related_field: RelationalField, related_field_name: str -) -> List[Tuple[Table, Criterion]]: +) -> List[TableCriterionTuple]: required_joins = [] related_table: Table = related_field.related_model._meta.basetable @@ -70,7 +75,7 @@ def _get_joins_for_related_field( return required_joins -class EmptyCriterion(Criterion): # type: ignore +class EmptyCriterion(Criterion): # type:ignore[misc] def __or__(self, other: Criterion) -> Criterion: return other @@ -101,54 +106,44 @@ class QueryModifier: def __init__( self, where_criterion: Optional[Criterion] = None, - joins: Optional[List[Tuple[Table, Criterion]]] = None, + joins: Optional[List[TableCriterionTuple]] = None, having_criterion: Optional[Criterion] = None, ) -> None: self.where_criterion: Criterion = where_criterion or EmptyCriterion() - self.joins = joins if joins else [] + self.joins = joins or [] self.having_criterion: Criterion = having_criterion or EmptyCriterion() - def __and__(self, other: "QueryModifier") -> "QueryModifier": - return QueryModifier( + def __and__(self, other: QueryModifier) -> QueryModifier: + return self.__class__( where_criterion=_and(self.where_criterion, other.where_criterion), joins=self.joins + other.joins, having_criterion=_and(self.having_criterion, other.having_criterion), ) - def __or__(self, other: "QueryModifier") -> "QueryModifier": - if self.having_criterion or other.having_criterion: - # TODO: This could be optimized? - result_having_criterion = _or( - _and(self.where_criterion, self.having_criterion), - _and(other.where_criterion, other.having_criterion), - ) - return QueryModifier( - joins=self.joins + other.joins, having_criterion=result_having_criterion - ) + def _and_criterion(self) -> Criterion: + return _and(self.where_criterion, self.having_criterion) - if self.where_criterion and other.where_criterion: - return QueryModifier( - where_criterion=self.where_criterion | other.where_criterion, - joins=self.joins + other.joins, + def __or__(self, other: QueryModifier) -> QueryModifier: + kw: Dict[str, Criterion] = {} + if self.having_criterion or other.having_criterion: + kw["having_criterion"] = _or(self._and_criterion(), other._and_criterion()) + else: + kw["where_criterion"] = ( + (self.where_criterion | other.where_criterion) + if self.where_criterion and other.where_criterion + else (self.where_criterion or other.where_criterion) ) + return self.__class__(joins=self.joins + other.joins, **kw) - return QueryModifier( - where_criterion=self.where_criterion or other.where_criterion, - joins=self.joins + other.joins, - ) - - def __invert__(self) -> "QueryModifier": - if not self.where_criterion and not self.having_criterion: - return QueryModifier(joins=self.joins) + def __invert__(self) -> QueryModifier: + kw: Dict[str, Criterion] = {} if self.having_criterion: - # TODO: This could be optimized? - return QueryModifier( - joins=self.joins, - having_criterion=_and(self.where_criterion, self.having_criterion).negate(), - ) - return QueryModifier(where_criterion=self.where_criterion.negate(), joins=self.joins) + kw["having_criterion"] = (self.where_criterion & self.having_criterion).negate() + elif self.where_criterion: + kw["where_criterion"] = self.where_criterion.negate() + return self.__class__(joins=self.joins, **kw) - def get_query_modifiers(self) -> Tuple[Criterion, List[Tuple[Table, Criterion]], Criterion]: + def get_query_modifiers(self) -> Tuple[Criterion, List[TableCriterionTuple], Criterion]: """ Returns a tuple of the query criterion. """ @@ -188,7 +183,7 @@ def resolve_for_queryset(self, queryset: "QuerySet") -> None: ) if forwarded_prefetch: - if first_level_field not in queryset._prefetch_map.keys(): + if first_level_field not in queryset._prefetch_map: queryset._prefetch_map[first_level_field] = set() queryset._prefetch_map[first_level_field].add( Prefetch(forwarded_prefetch, self.queryset, to_attr=self.to_attr) From 49cf8f1828c0ca8787ab2e8d61e924e9ea62d516 Mon Sep 17 00:00:00 2001 From: Waket Zheng Date: Sun, 12 May 2024 00:25:58 +0800 Subject: [PATCH 2/2] Use position argument instead of kwargs --- tortoise/query_utils.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tortoise/query_utils.py b/tortoise/query_utils.py index f83f2665b..68d4084bd 100644 --- a/tortoise/query_utils.py +++ b/tortoise/query_utils.py @@ -1,7 +1,7 @@ from __future__ import annotations from copy import copy -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple from pypika import Table from pypika.terms import Criterion @@ -124,24 +124,24 @@ def _and_criterion(self) -> Criterion: return _and(self.where_criterion, self.having_criterion) def __or__(self, other: QueryModifier) -> QueryModifier: - kw: Dict[str, Criterion] = {} + where_criterion = having_criterion = None if self.having_criterion or other.having_criterion: - kw["having_criterion"] = _or(self._and_criterion(), other._and_criterion()) + having_criterion = _or(self._and_criterion(), other._and_criterion()) else: - kw["where_criterion"] = ( + where_criterion = ( (self.where_criterion | other.where_criterion) if self.where_criterion and other.where_criterion else (self.where_criterion or other.where_criterion) ) - return self.__class__(joins=self.joins + other.joins, **kw) + return self.__class__(where_criterion, self.joins + other.joins, having_criterion) def __invert__(self) -> QueryModifier: - kw: Dict[str, Criterion] = {} + where_criterion = having_criterion = None if self.having_criterion: - kw["having_criterion"] = (self.where_criterion & self.having_criterion).negate() + having_criterion = (self.where_criterion & self.having_criterion).negate() elif self.where_criterion: - kw["where_criterion"] = self.where_criterion.negate() - return self.__class__(joins=self.joins, **kw) + where_criterion = self.where_criterion.negate() + return self.__class__(where_criterion, self.joins, having_criterion) def get_query_modifiers(self) -> Tuple[Criterion, List[TableCriterionTuple], Criterion]: """