Skip to content

Commit

Permalink
[resoto][fix] Remove type ignores for pypy 3.8 (#836)
Browse files Browse the repository at this point in the history
* [resoto][chore] Bump libs

* [resoto][feat] pypy 3.9 and bump minimal version

* [resoto][fix] Remove type ignores for pypy 3.8

* use latest version of resotoclient
  • Loading branch information
aquamatthias committed May 4, 2022
1 parent 60c92ce commit a4440ba
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 58 deletions.
2 changes: 1 addition & 1 deletion resotocore/requirements-test.txt
Expand Up @@ -5,4 +5,4 @@ pytest-runner==6.0.0
pytest-asyncio==0.18.3
deepdiff==5.8.0
hypothesis==6.46.2
resotoclient==0.1.8
resotoclient>=0.1.9
8 changes: 2 additions & 6 deletions resotocore/resotocore/cli/cli.py
Expand Up @@ -121,7 +121,7 @@ def __init__(
self.parts = {p.name: p for p in parts + [self] if not isinstance(p, InternalPart)}
self.alias_names = {a: n for a, n in alias_names.items() if n in self.parts and a not in self.parts}
self.reverse_alias_names: Dict[str, List[str]] = {
k: [e[0] for e in v] for k, v in group_by(lambda a: a[1], self.alias_names.items()).items() # type: ignore
k: [e[0] for e in v] for k, v in group_by(lambda a: a[1], self.alias_names.items()).items()
}
self.alias_templates = {a.name: a for a in sorted(alias_templates, key=attrgetter("name"))}

Expand Down Expand Up @@ -227,11 +227,7 @@ class CLI:
"""

def __init__(
self,
dependencies: CLIDependencies,
parts: List[CLICommand],
env: Dict[str, Any],
alias_names: Dict[str, str],
self, dependencies: CLIDependencies, parts: List[CLICommand], env: Dict[str, Any], alias_names: Dict[str, str]
):
dependencies.extend(cli=self)
alias_templates = [AliasTemplate.from_config(cmd) for cmd in dependencies.config.custom_commands.commands]
Expand Down
28 changes: 6 additions & 22 deletions resotocore/resotocore/db/graphdb.py
Expand Up @@ -17,12 +17,7 @@
from resotocore.analytics import CoreEvent, AnalyticsEventSender
from resotocore.db import arango_query, EstimatedSearchCost
from resotocore.db.arango_query import fulltext_delimiter
from resotocore.db.async_arangodb import (
AsyncArangoDB,
AsyncArangoTransactionDB,
AsyncArangoDBBase,
AsyncCursorContext,
)
from resotocore.db.async_arangodb import AsyncArangoDB, AsyncArangoTransactionDB, AsyncArangoDBBase, AsyncCursorContext
from resotocore.db.model import GraphUpdate, QueryModel
from resotocore.error import InvalidBatchUpdate, ConflictingChangeInProgress, NoSuchChangeError, OptimisticLockingFailed
from resotocore.model.adjust_node import AdjustNode
Expand Down Expand Up @@ -209,12 +204,7 @@ async def update_node_with(
# call adjuster on resulting node
ctime = value_in_path_get(node, NodePath.reported_ctime, utc_str())
adjusted = self.adjust_node(model, GraphAccess.dump_direct(node_id, updated, kind, recompute=True), ctime)
update = {
"_key": node["_key"],
"hash": adjusted["hash"],
"kinds": adjusted["kinds"],
"flat": adjusted["flat"],
}
update = {"_key": node["_key"], "hash": adjusted["hash"], "kinds": adjusted["kinds"], "flat": adjusted["flat"]}
# copy relevant sections into update node
for sec in [section] if section else Section.content_ordered:
if sec in adjusted:
Expand Down Expand Up @@ -473,9 +463,7 @@ async def move_temp_to_proper(self, change_id: str, temp_name: str) -> None:
]
+ edge_inserts
+ edge_deletes
+ [
f'remove {{_key: "{change_key}"}} in {self.in_progress}',
],
+ [f'remove {{_key: "{change_key}"}} in {self.in_progress}'],
)
)
await self.db.execute_transaction(
Expand Down Expand Up @@ -526,7 +514,7 @@ def adjust_node(self, model: Model, json: Json, created_at: Any) -> Json:
return self.node_adjuster.adjust(json)

def prepare_nodes(
self, access: GraphAccess, node_cursor: Iterable, model: Model # type: ignore # pypy
self, access: GraphAccess, node_cursor: Iterable[Json], model: Model
) -> Tuple[GraphUpdate, List[Json], List[Json], List[Json]]:
log.info(f"Prepare nodes for subgraph {access.root()}")
info = GraphUpdate()
Expand All @@ -538,11 +526,7 @@ def prepare_nodes(

def insert_node(node: Json) -> None:
elem = self.adjust_node(model, node, access.at_json)
js_doc: Json = {
"_key": elem["id"],
"created": access.at_json,
"updated": access.at_json,
}
js_doc: Json = {"_key": elem["id"], "created": access.at_json, "updated": access.at_json}
for prop in optional_properties:
value = node.get(prop, None)
if value:
Expand Down Expand Up @@ -577,7 +561,7 @@ def update_or_delete_node(node: Json) -> None:
return info, resource_inserts, resource_updates, resource_deletes

def prepare_edges(
self, access: GraphAccess, edge_cursor: Iterable, edge_type: str # type: ignore # pypy
self, access: GraphAccess, edge_cursor: Iterable[Json], edge_type: str
) -> Tuple[GraphUpdate, List[Json], List[Json]]:
log.info(f"Prepare edges of type {edge_type} for subgraph {access.root()}")
info = GraphUpdate()
Expand Down
8 changes: 4 additions & 4 deletions resotocore/resotocore/model/db_updater.py
Expand Up @@ -8,7 +8,7 @@
from datetime import timedelta
from multiprocessing import Process, Queue
from queue import Empty
from typing import Optional, Union, AsyncGenerator, Any, Generator, List, cast
from typing import Optional, Union, AsyncGenerator, Any, Generator, List

from aiostream import stream
from aiostream.core import Stream
Expand Down Expand Up @@ -190,7 +190,7 @@ async def send_to_child(pa: ProcessAction) -> bool:
await run_async(write.put, pa, True, stale)
return alive

def read_results() -> Task: # type: ignore # pypy
def read_results() -> Task[GraphUpdate]:
async def read_forever() -> GraphUpdate:
nonlocal deadline
nonlocal dead_adjusted
Expand All @@ -213,7 +213,7 @@ async def read_forever() -> GraphUpdate:

return asyncio.create_task(read_forever())

task: Optional[Task] = None # type: ignore # pypy
task: Optional[Task[GraphUpdate]] = None
result: Optional[GraphUpdate] = None
try:
reset_process_start_method() # other libraries might have tampered the value in the mean time
Expand All @@ -226,7 +226,7 @@ async def read_forever() -> GraphUpdate:
# in case the child is dead, we should stop
break
await send_to_child(MergeGraph(db.name, change_id, maybe_batch is not None))
result = cast(GraphUpdate, await task) # wait for final result
result = await task # wait for final result
return result
finally:
if task is not None and not task.done():
Expand Down
15 changes: 11 additions & 4 deletions resotocore/resotocore/query/model.py
Expand Up @@ -13,7 +13,7 @@
from resotocore.model.resolve_in_graph import GraphResolver
from resotocore.model.typed_model import to_js_str
from resotocore.types import Json, JsonElement
from resotocore.util import combine_optional, first
from resotocore.util import combine_optional

PathRoot = "/"

Expand Down Expand Up @@ -244,10 +244,18 @@ def find_term(self, fn: Callable[[Term], bool]) -> Optional[Term]:
elif isinstance(self, NotTerm):
return self.term.find_term(fn)
elif isinstance(self, MergeTerm):

def walk_merge_queries(mt: MergeTerm) -> Optional[Term]:
for mq in mt.merge:
for p in mq.query.parts:
if (term := p.term.find_term(fn)) is not None:
return term
return None

return (
self.pre_filter.find_term(fn)
or (self.post_filter.find_term(fn) if self.post_filter else None)
or first(lambda mq: first(lambda p: p.term.find_term(fn), mq.query.parts), self.merge)
or walk_merge_queries(self)
)
else:
return None
Expand Down Expand Up @@ -704,8 +712,7 @@ def __str__(self) -> str:

def change_variable(self, fn: Callable[[str], str]) -> Aggregate:
return Aggregate(
[a.change_variable(fn) for a in self.group_by],
[a.change_variable(fn) for a in self.group_func],
[a.change_variable(fn) for a in self.group_by], [a.change_variable(fn) for a in self.group_func]
)

def property_paths(self) -> Set[str]:
Expand Down
Expand Up @@ -17,7 +17,7 @@ def wait_and_start(
task_handler: TaskHandler,
message_bus: MessageBus,
wait_after_connect: timedelta = timedelta(seconds=10),
) -> Task: # type: ignore # pypy
) -> Task[None]:
"""
This function is used to trigger workflows automatically, when the related subscribing actor connects.
Such behaviour can be useful during startup, when we do not want to wait until the next scheduled time triggers.
Expand Down
4 changes: 2 additions & 2 deletions resotocore/resotocore/task/subscribers.py
Expand Up @@ -38,7 +38,7 @@ async def start(self) -> None:
async def stop(self) -> None:
await self.cleaner.stop()

async def all_subscribers(self) -> Iterable: # type: ignore # pypy
async def all_subscribers(self) -> Iterable[Subscriber]:
return self._subscribers_by_id.values()

async def get_subscriber(self, subscriber_id: str) -> Optional[Subscriber]:
Expand Down Expand Up @@ -95,7 +95,7 @@ def subscribers_by_event(self) -> Dict[str, List[Subscriber]]:
return self._subscribers_by_event

@staticmethod
def update_subscriber_by_event(subscribers: Iterable) -> Dict[str, List[Subscriber]]: # type: ignore # pypy
def update_subscriber_by_event(subscribers: Iterable[Subscriber]) -> Dict[str, List[Subscriber]]:
result: Dict[str, List[Subscriber]] = defaultdict(list)
for subscriber in subscribers:
for subscription in subscriber.subscriptions.values():
Expand Down
2 changes: 1 addition & 1 deletion resotocore/resotocore/task/task_description.py
Expand Up @@ -599,7 +599,7 @@ def __init__(
self.subscribers_by_event = subscribers_by_event
self.task_started_at = utc()
self.step_started_at = self.task_started_at
self.update_task: Optional[Task] = None # type: ignore # pypy
self.update_task: Optional[Task[None]] = None
self.descriptor_alive = True

steps = [StepState.from_step(step, self) for step in descriptor.steps]
Expand Down
6 changes: 3 additions & 3 deletions resotocore/resotocore/task/task_handler.py
Expand Up @@ -76,8 +76,8 @@ def __init__(
# Step1: define all workflows and jobs in code: later it will be persisted and read from database
self.task_descriptions: Sequence[TaskDescription] = [*self.known_workflows(config), *self.known_jobs()]
self.tasks: Dict[str, RunningTask] = {}
self.message_bus_watcher: Optional[Task] = None # type: ignore # pypy
self.initial_start_workflow_task: Optional[Task] = None # type: ignore # pypy
self.message_bus_watcher: Optional[Task[None]] = None
self.initial_start_workflow_task: Optional[Task[None]] = None
self.timeout_watcher = Periodic("task_timeout_watcher", self.check_overdue_tasks, timedelta(seconds=10))
self.registered_event_trigger: List[Tuple[EventTrigger, TaskDescription]] = []
self.registered_event_trigger_by_message_type: Dict[str, List[Tuple[EventTrigger, TaskDescription]]] = {}
Expand Down Expand Up @@ -437,7 +437,7 @@ async def execute_commands() -> None:
# if this was the last result the task was waiting for, delete the task
await self.store_running_task_state(wi, origin_message)

async def execute_in_order(task: Task) -> None: # type: ignore # pypy
async def execute_in_order(task: Task[Any]) -> None:
# make sure the last execution is finished, before the new execution starts
await task
await execute_commands()
Expand Down
14 changes: 7 additions & 7 deletions resotocore/resotocore/util.py
Expand Up @@ -92,7 +92,7 @@ def uuid_str(from_object: Optional[Any] = None) -> str:
return str(uuid.uuid1())


def group_by(f: Callable[[AnyT], AnyR], iterable: Iterable) -> Dict[AnyR, List[AnyT]]: # type: ignore # pypy
def group_by(f: Callable[[AnyT], AnyR], iterable: Iterable[AnyT]) -> Dict[AnyR, List[AnyT]]:
"""
Group iterable by key provided by given key function.
:param f: the function to be applied on every element that yields the key.
Expand All @@ -106,11 +106,11 @@ def group_by(f: Callable[[AnyT], AnyR], iterable: Iterable) -> Dict[AnyR, List[A
return v


def non_empty(el: Iterable) -> bool: # type: ignore # pypy
def non_empty(el: Iterable[Any]) -> bool:
return bool(el)


def empty(el: Iterable) -> bool: # type: ignore # pypy
def empty(el: Iterable[Any]) -> bool:
return not non_empty(el)


Expand All @@ -136,7 +136,7 @@ def interleave(elements: List[AnyT]) -> List[Tuple[AnyT, AnyT]]:

# we expect a callable that returns a truthy value.
# Due to limitations of lambda expressions we use Any here.
def exist(f: Callable[[Any], Any], iterable: Iterable) -> bool: # type: ignore # pypy
def exist(f: Callable[[Any], Any], iterable: Iterable[Any]) -> bool:
"""
Items are passed to the callable as long as it returns False.
Return True once the callable finds one True, otherwise return False.
Expand All @@ -152,7 +152,7 @@ def exist(f: Callable[[Any], Any], iterable: Iterable) -> bool: # type: ignore

# we expect a callable that returns a truthy value.
# Due to limitations of lambda expressions we use Any here.
def first(f: Callable[[Any], Any], iterable: Iterable) -> Optional[Any]: # type: ignore # pypy
def first(f: Callable[[Any], Any], iterable: Iterable[AnyT]) -> Optional[AnyT]:
for a in iterable:
if f(a):
return a
Expand Down Expand Up @@ -262,7 +262,7 @@ async def with_first(elem: AnyT) -> AsyncGenerator[AnyT, None]:
return gen


def set_future_result(future: Future, result: Any) -> None: # type: ignore # pypy
def set_future_result(future: Future[Any], result: Any) -> None:
if not future.done():
if isinstance(result, Exception):
future.set_exception(result)
Expand Down Expand Up @@ -302,7 +302,7 @@ def __init__(self, name: str, func: Callable[[], Any], frequency: timedelta, fir
self.func = func
self.frequency = frequency
self.first_run = first_run if first_run else frequency
self._task: Optional[Task] = None # type: ignore # pypy
self._task: Optional[Task[None]] = None

@property
def started(self) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion resotocore/resotocore/web/api.py
Expand Up @@ -138,7 +138,7 @@ def __init__(
self.app.on_response_prepare.append(on_response_prepare)
self.session: Optional[ClientSession] = None
self.in_shutdown = False
self.websocket_handler: Dict[str, Tuple[Future, web.WebSocketResponse]] = {} # type: ignore # pypy
self.websocket_handler: Dict[str, Tuple[Future[Any], web.WebSocketResponse]] = {}
path_part = config.api.web_path.strip().strip("/").strip()
web_path = "" if path_part == "" else f"/{path_part}"
self.__add_routes(web_path)
Expand Down
2 changes: 1 addition & 1 deletion resotocore/resotocore/worker_task_queue.py
Expand Up @@ -28,7 +28,7 @@ class WorkerTask:
name: str # the well known name of the task to perform: the worker attaches to this name
attrs: Dict[str, str] # all worker attributes need to match those attrs (but the task can define more)
data: Json
callback: Future # type: ignore # pypy # the callers callback. Notify the caller once the task is dones
callback: Future[Json] # the callers callback. Notify the caller once the task is dones
timeout: timedelta # timeout of this task to be performed

def __eq__(self, other: Any) -> bool:
Expand Down
9 changes: 4 additions & 5 deletions resotocore/tests/resotocore/worker_task_queue_test.py
Expand Up @@ -7,7 +7,7 @@

from resotocore.model.graph_access import Section
from resotocore.model.resolve_in_graph import GraphResolver, NodePath
from resotocore.util import group_by, identity, value_in_path
from resotocore.util import group_by, value_in_path
from resotocore.worker_task_queue import WorkerTaskDescription, WorkerTaskQueue, WorkerTask, WorkerTaskName


Expand Down Expand Up @@ -104,16 +104,15 @@ async def test_handle_work_successfully(
assert results == [{"result": "done!"} for _ in range(0, 20)]

# make sure the work is split equally between all workers: 20 work items by 4 workers: 5 work items each
by_worker = group_by(identity, (item for sublist in performed_by.values() for item in sublist))
by_worker = group_by(lambda x: x, (item for sublist in performed_by.values() for item in sublist))
assert len(by_worker) == 4
for work_done in by_worker.values():
assert len(work_done) == 5


@mark.asyncio
async def test_handle_failure(
task_queue: WorkerTaskQueue,
worker: Tuple[WorkerTaskDescription, WorkerTaskDescription, WorkerTaskDescription],
task_queue: WorkerTaskQueue, worker: Tuple[WorkerTaskDescription, WorkerTaskDescription, WorkerTaskDescription]
) -> None:
_, fail_task, _ = worker

Expand Down Expand Up @@ -155,7 +154,7 @@ async def test_handle_outdated(
assert isinstance(r, Exception)

# 20 work items by 4 workers: 5 work items each + retried 3 times (15) => 20
by_worker = group_by(identity, (item for sublist in performed_by.values() for item in sublist))
by_worker = group_by(lambda x: x, (item for sublist in performed_by.values() for item in sublist))
assert len(by_worker) == 4
for work_done in by_worker.values():
assert len(work_done) == 20
Expand Down

0 comments on commit a4440ba

Please sign in to comment.