
Scheduler code #7561 (Draft)

wants to merge 18 commits into base: master

Conversation

@wouterdb (Contributor) commented Apr 24, 2024

Review

I would like to get general approval on what is here, before we decide how to proceed.

Description

Potential implementation for the queue scheduler

  1. Built a scheduler toolbox
  2. Integrated it with the agent
  3. Added deploy, dryrun and getfact

Todo

  • add exception handling to the scheduler; right now it stops working on an exception
  • the version-based cache is a mess now. I would propose to make it a timed cache that keeps every version open for a fixed amount of time (a rough sketch follows below)
  • resume after suspend has to be replaced with priorities in the queue
  • clean up the full queue
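
As a rough sketch of the timed-cache idea (the class name, TTL handling and method names here are hypothetical, not code from this PR):

import time
import typing

T = typing.TypeVar("T")


class TimedVersionCache(typing.Generic[T]):
    """Keep every model version open for a fixed amount of time after its last use."""

    def __init__(self, ttl_seconds: float = 60.0) -> None:
        self._ttl = ttl_seconds
        # version -> (cached value, expiry timestamp)
        self._entries: dict[int, tuple[T, float]] = {}

    def put(self, version: int, value: T) -> None:
        self._entries[version] = (value, time.monotonic() + self._ttl)

    def get(self, version: int) -> typing.Optional[T]:
        entry = self._entries.get(version)
        if entry is None:
            return None
        value, _ = entry
        # Every hit extends the lifetime of that version
        self._entries[version] = (value, time.monotonic() + self._ttl)
        return value

    def expire(self) -> None:
        """Drop every version whose timer ran out; meant to be called periodically."""
        now = time.monotonic()
        self._entries = {v: e for v, e in self._entries.items() if e[1] > now}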

Self Check:

Strike through any lines that are not applicable (~~line~~) then check the box

  • Attached issue to pull request
  • Changelog entry
  • Type annotations are present
  • Code is clear and sufficiently documented
  • No (preventable) type errors (check using make mypy or make mypy-diff)
  • Sufficient test cases (reproduces the bug/tests the requested feature)
  • Correct, in line with design
  • End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )
  • If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branch(es) (see test-fixes for more info)

@wouterdb requested review from sanderr and arnaudsjs and removed the review request for arnaudsjs and sanderr on May 7, 2024 10:59
"""Queue to schedule tasks with inter-dependencies and provide an overview of the queue"""

def __init__(self) -> None:
self.full_queue: PriorityQueue[typing.Tuple[int, int, BaseTask]] = PriorityQueue()

Contributor:

It would improve readability if we converted the tuple into a proper object.

Contributor Author:

For now I like the simplicity of it (i.e. the way it is compared is very clear, which is the most relevant property); I'll add a todo for later.
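
For reference, a sketch of what such an object could look like while keeping the comparison behaviour of the current (priority, insertion number, task) tuple (names are illustrative, not from this PR):

import dataclasses
import typing


@dataclasses.dataclass(frozen=True, order=True)
class QueueEntry:
    """Compares exactly like the (priority, insertion number, task) tuple it would replace."""

    priority: int
    insertion_order: int
    # The task itself (BaseTask in this PR) never takes part in the comparison
    task: typing.Any = dataclasses.field(compare=False)

Because order=True only considers the fields with compare=True, a PriorityQueue[QueueEntry] would order entries the same way the tuple does today.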

src/inmanta/scheduler/scheduler.py (outdated, resolved)


class SentinelTask(BaseTask):
    """Task to unblock the queue on shutdown"""

Contributor:

This needs more documentation.
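
For illustration, the docstring could spell out why the sentinel exists; this wording is mine and is only inferred from how the runner loop in this PR blocks on queue.get():

class SentinelTask(BaseTask):
    """
    No-op task used to unblock the queue on shutdown.

    An idle TaskRunner sits in `await self.queue.get()` and would never notice that
    it was asked to stop. Putting a SentinelTask on the queue wakes it up, and the
    runner can then re-check its stop flag instead of executing real work.
    """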

src/inmanta/agent/agent.py (outdated, resolved)
Comment on lines 400 to 416
        if new_request.is_periodic:
            # periodic
            if self.active_repair is not None and not self.active_repair.run.finished():
                # active one
                if self.active_repair.origin.is_full_deploy:
                    self.logger.info(
                        "Ignoring new run '%s' in favor of current '%s'", new_request.reason, self.active_repair.origin.reason
                    )
                    return
                else:
                    self.logger.info("Upgrading run '%s' to '%s'", self.active_repair.origin.reason, new_request.reason)
                    self.active_repair.run.cancel()
            self.active_repair = build_run(PRIO_PERIODIC)
        else:
            # Just do as you are told
            if self.active_deploy is not None and not self.active_deploy.run.finished():
                self.logger.info("Canceling run '%s' in favor of '%s'", self.active_repair.origin.reason, new_request.reason)

@arnaudsjs (Contributor) commented on May 16, 2024:

This part I don't understand. What about periodic deploys?

@@ -524,6 +476,9 @@ def __init__(self, process: "Agent", name: str, uri: str, *, ensure_deploy_on_st

# init
self._nq = ResourceScheduler(self, self._env_id, name)
self.work_queue = scheduler.TaskQueue()
self.work_queue_drainer = scheduler.TaskRunner(self.work_queue)

Contributor:

Why have two different terms for the same thing (queue_drainer vs. TaskRunner)?

Contributor Author:

changed

@@ -754,52 +713,49 @@ async def dryrun(self, dry_run_id: uuid.UUID, version: int) -> Apireturn:
        return 200

    async def do_run_dryrun(self, version: int, dry_run_id: uuid.UUID) -> None:
        async with self.dryrunlock:
            async with self.ratelimiter:

Contributor:

Why do we remove the ratelimiter here?

Contributor Author:

the queue does that now

Contributor:

How?

Contributor Author:

We know that we can run only one task at a time, because only a single logical thread drains the queue.
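
In other words, the single TaskRunner acts as the rate limiter: the next task is only taken from the queue once the previous one has finished. A self-contained sketch of that argument (hypothetical names, not the PR's code):

import asyncio
import typing

Work = typing.Callable[[], typing.Awaitable[None]]


async def drain(queue: "asyncio.Queue[Work]") -> None:
    # A single consumer: the next item is only fetched after the previous one finished,
    # so at most one task runs at a time without an explicit rate limiter.
    while True:
        work = await queue.get()
        await work()
        queue.task_done()


async def main() -> None:
    queue: "asyncio.Queue[Work]" = asyncio.Queue()

    async def dryrun() -> None:
        await asyncio.sleep(0.1)

    for _ in range(3):
        queue.put_nowait(dryrun)

    runner = asyncio.create_task(drain(queue))
    await queue.join()  # the three dryruns ran strictly one after another
    runner.cancel()


asyncio.run(main())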

Co-authored-by: arnaudsjs <2684622+arnaudsjs@users.noreply.github.com>

@sanderr (Contributor) left a comment:

Reviewed scheduler.py. Have yet to review the rest.

# Link to the parent queue, required to jump into run queue
self._queue: typing.Optional["TaskQueue"] = None
# priority / insertion number / self
self._entry: typing.Optional[typing.Tuple[int, int, "BaseTask"]] = None

Contributor:

Suggested change:
- self._entry: typing.Optional[typing.Tuple[int, int, "BaseTask"]] = None
+ self._entry: typing.Optional[tuple[int, int, "BaseTask"]] = None

self.done = False

# Link to the parent queue, required to jump into run queue
self._queue: typing.Optional["TaskQueue"] = None

Contributor:

Because you recently asked about import practice opinions, I'll comment on this. I personally feel like qualifiers are clutter when they're obvious. Especially for common types like Optional, Sequence, TypeVar, Callable, ... we use them so often and so widely that I feel like they're almost native.

This remains very subjective of course.

Contributor:

For abc and typing.TYPE_CHECKING I'm on the fence, and for all other modules used in this file I like the qualified approach you used.

    def is_done(self) -> bool:
        return self.done

    def cancel(self) -> None:

Contributor:

Shouldn't we inform provides as well in this case? Otherwise we potentially create infinite waiters, i.e. a memory leak?

Contributor Author:

I'm going to leave this as a todo for a later stage, I think.

I'm not entirely clear on what we expect from the cancel.

e.g.

  • canceling a deploy for a resource that has already reached its desired state is equivalent to the deploy being done, but if it previously failed, it is equivalent to a failure.
  • if we store the future of the execution in the task, we can even cancel pre-emptively.

It is not unlikely this entire dependency mechanism will change.
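
For the record, the "inform provides" idea could look roughly like this; the provides/requires bookkeeping below is my own sketch, not necessarily how this PR models dependencies:

class Task:
    def __init__(self, name: str) -> None:
        self.name = name
        self.cancelled = False
        # Tasks that declared they wait for this one (reverse edge of "requires")
        self.provides: set["Task"] = set()
        # Tasks this one is still waiting for
        self.requires: set["Task"] = set()

    def wait_for(self, other: "Task") -> None:
        self.requires.add(other)
        other.provides.add(self)

    def cancel(self) -> None:
        self.cancelled = True
        # Inform every waiter so nobody blocks forever on a task that will never run
        for waiter in self.provides:
            waiter.requires.discard(self)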

"""
Declare we wait for some other task

Adding the same task twice will make this one wait forever!

Contributor:

So will adding a cancelled task. Is that prohibited?

Contributor Author:

good catch

Comment on lines 181 to 182
    def __repr__(self) -> str:
        return self._name

Contributor:

Typically, repr is meant to be an unambiguous representation of this object. I think we should at least make it f"Task({self._name})", and preferably include the priority as well.
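
Something along these lines, assuming the task keeps its _name and exposes priority() as elsewhere in this PR:

    def __repr__(self) -> str:
        return f"Task(name={self._name!r}, priority={self.priority()})"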

        return task

    def view(self) -> typing.Sequence[Task]:
        """This leaks a reference to the internal queue: don't touch it"""

Contributor:

This comment is too cryptic imo, what does "don't touch it" mean?

self.queue = queue
self.running = False
self.should_run = True
self.finished: typing.Optional[asyncio.Task[None]] = None

Contributor:

finished is a strange name for a task object.

Contributor Author:

to me, it is the point in the Future where it is finished.

    async def run(self) -> None:
        self.running = True
        while self.should_run:
            task = await self.queue.get()

Contributor:

Doesn't this raise an IndexError when the queue is exhausted?

Contributor Author:

nope, it waits (it is a blocking queue)


Remove and return an item from the queue.
If queue is empty, wait until an item is available.
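
A small self-contained check of that behaviour with asyncio.Queue, which the TaskQueue appears to mirror here (an assumption on my part):

import asyncio


async def main() -> None:
    queue: "asyncio.Queue[int]" = asyncio.Queue()

    async def consumer() -> None:
        item = await queue.get()  # suspends here; no IndexError on an empty queue
        print("got", item)

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0.1)  # the consumer is parked inside get()
    queue.put_nowait(42)      # this wakes it up
    await task


asyncio.run(main())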

        self.should_run = True
        self.finished = asyncio.create_task(self.run())

    def stop(self) -> None:

Contributor:

This stops the runner asynchronously. I think it could use a docstring on what exactly it achieves.
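
A possible docstring for it, plus the sentinel-based wake-up it hints at; the put_nowait call and the exact wording are assumptions on my part, not the PR's code:

    def stop(self) -> None:
        """
        Ask the runner to stop, asynchronously.

        Sets should_run to False so the run() loop exits before picking up new work,
        and wakes up a runner that is parked in `await self.queue.get()`. The
        `finished` task completes once the task currently being executed (if any) is done.
        """
        self.should_run = False
        # Hypothetical: unblock an idle runner; SentinelTask exists for exactly this purpose
        self.queue.put_nowait(SentinelTask())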

    def long_string(self) -> str:
        return "{} awaits {}".format(self.resource_id.resource_str(), " ".join([str(aw) for aw in self.dependencies]))

    def priority(self) -> int:
        return 100  # 90 # 110

Contributor:

What does the comment mean?

self.resource_id = resource_id
self.group_id: uuid.UUID = group_id
self.reason: str = reason
name = f"Deploy {self.resource_id} as part of {self.group_id} because {self.reason}"

Contributor:

This is more a description than a name, isn't it?

    def __init__(self, scheduler: "ResourceScheduler", resource_id: Id, gid: uuid.UUID, reason: str) -> None:
        super().__init__(scheduler, resource_id, gid, reason)

    async def run(self) -> None:
        raise Exception("This task should never be scheduler")

Contributor:

Suggested change:
- raise Exception("This task should never be scheduler")
+ raise Exception("This task should never be scheduled")

Comment on lines +50 to +51
self.cancelled = False
# Indicates if a cancel is requested

Contributor:

Suggested change:
- self.cancelled = False
- # Indicates if a cancel is requested
+ # Indicates if a cancel is requested
+ self.cancelled = False

Comment on lines +31 to +33
resource_container.Provider.set("agent1", "key1", "value1")
resource_container.Provider.set("agent1", "key1", "value1")
resource_container.Provider.set("agent1", "key1", "value1")

Contributor:

Three times the same line?

@@ -2137,6 +2137,7 @@ async def test_reload(
assert dep_state.index == resource_container.Provider.reloadcount("agent1", "key2")


@pytest.mark.skip("Skipped for agent refactor")

Contributor:

We have to be more descriptive in this message. Why do we need to skip this test?

@sanderr (Contributor) left a comment:

On hold until further notice.
