Nathan/scheduler #29

Closed · wants to merge 3 commits into from
Conversation

@dijkstracula (Collaborator) commented Mar 22, 2021

Here's the Prioritizer. As I mentioned on Slack, the interface diverged slightly, but I think modifying your Tethys code to use it will be a straightforward refactoring.

Though there are unit tests, I forgot to write some example code; I'll push an example up, but I wanted to get the code up ASAP.

@dijkstracula marked this pull request as draft March 22, 2021 15:52
@stellentus (Owner) commented Mar 23, 2021

Running in Parallel

  • I think this doesn't support running many tasks in parallel? It relies upon p.runOne() being a blocking call, since it's timing it, and it calls p.runOne() serially in a loop, so there's no way for a user to make it parallel.
  • I don't think we will want a p.sleepInterval. It assumes a pause between requests, but we want multiple in-flight requests. So I'm thinking more of a paradigm of loading tasks into a work queue and adjusting how many tasks we throw into that queue (based on load), rather than delaying between individual tasks.
  • I think the easiest way to make this run in parallel would be to create a channel of tasks, buffered to size N. There are M worker goroutines (equal to or greater than the number in Pooled) which act on that channel. The goroutine in Start fills the channel: first it puts all of the priority-0 tasks into the channel (if there are more than N tasks, this will block once the channel buffer is full). Then, each time the channel has space for a new task, we check whether it's time to add the priority-0 tasks again (i.e. ~1s has elapsed since we last added them); otherwise we add priority 1+. This idea needs a bit of work because it doesn't time the priority-0 tasks at exactly 1 Hz, but the general point is that a channel of tasks would allow multiple in flight at once; see the sketch after this list. A different solution is fine with me, as long as it supports tasks in parallel.
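
Roughly what I have in mind, as a minimal sketch only: the task type, the field names, and the scheduling loop below are made up for illustration and don't match the code in this PR.

```go
package prioritizer

import "time"

// task stands in for the PR's private task type.
type task func()

// Prioritizer here is illustrative; the real fields differ.
type Prioritizer struct {
	tasks chan task // buffered to size N
	p0    []task    // priority-0 tasks, re-enqueued roughly every second
	rest  []task    // priority 1+ tasks
	quit  chan struct{}
}

// start launches m worker goroutines (>= the number in Pooled) plus one
// filler goroutine that keeps the channel topped up.
func (p *Prioritizer) start(m int) {
	for i := 0; i < m; i++ {
		go func() {
			for t := range p.tasks {
				t() // blocks inside Pooled; m of these run concurrently
			}
		}()
	}

	go func() {
		defer close(p.tasks)
		var lastP0 time.Time
		i := 0
		for {
			select {
			case <-p.quit:
				return
			default:
			}
			switch {
			case time.Since(lastP0) >= time.Second:
				for _, t := range p.p0 {
					p.tasks <- t // blocks once the buffer of N is full
				}
				lastP0 = time.Now()
			case len(p.rest) > 0:
				p.tasks <- p.rest[i%len(p.rest)]
				i++
			default:
				time.Sleep(10 * time.Millisecond) // nothing low-priority to feed
			}
		}
	}()
}
```

The priority-0 timing drifts a little here, as noted above, but the workers are what give us parallelism.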

API

  • Pooled makes use of a private task and tasked to manage ReadWriter requests. It looks like Prioritizer is on its way to following a similar model. Right now it relies on the same private task, which makes sense, but the current Enqueue function should be private as well. If it then gets a public function that allows the reading of a tag, then I think it's going to work.
  • Simply creating a public Task would make for a generic prioritizer, but then it doesn't really fit this project. I think for this project we should go ahead and make it specific to reading tags. (I mean, that code has to be implemented somewhere; it may as well be here.) It's easiest if we just reuse the value interface{} from the call to ReadTag; then I think there's no need to reflect, though I'm not sure whether that will cause GC issues. (Then it might need to reflect like Refresher to create a new instance of the same type.) Something like the sketch after this list is the shape I have in mind.
  • I think it would be ok to require that Enqueue is only called when the Prioritizer is not running. Then no mutex would be needed.
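
A standalone sketch of that shape; the task type, the field names, AddTag, and the reader interface here are all illustrative assumptions, not the existing code in this PR or in Pooled.

```go
package prioritizer

// task stands in for the private task type.
type task func()

// reader is the minimal slice of the downstream ReadWriter needed here.
type reader interface {
	ReadTag(name string, value interface{}) error
}

type Prioritizer struct {
	rw        reader
	runqueues [][]task // one queue per priority level
}

// enqueue stays private, mirroring how Pooled keeps task internal.
func (p *Prioritizer) enqueue(priority int, t task) {
	for len(p.runqueues) <= priority {
		p.runqueues = append(p.runqueues, nil)
	}
	p.runqueues[priority] = append(p.runqueues[priority], t)
}

// AddTag is the kind of public, tag-specific entry point I mean: it hands
// the caller's value straight back to ReadTag on every run, so no reflection
// is needed at this layer.
func (p *Prioritizer) AddTag(name string, value interface{}, priority int) {
	p.enqueue(priority, func() {
		_ = p.rw.ReadTag(name, value)
	})
}
```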

Minor

  • NewPrioritizer uses p.runqueues without initializing it?
  • I think defer func() {p.mtx.Unlock() }() can simply be defer p.mtx.Unlock()
  • Start could directly use p.running instead of a local copy, couldn't it? (Though the idiomatic Go way would be to use a cancel channel instead of a bool and a mutex.)
  • popBack is documented as pushFront

@dijkstracula (Collaborator, Author)

I think this doesn't support running many tasks in parallel? It relies upon p.runOne() being a blocking call, since it's timing it, and it calls p.runOne() serially in a loop, so there's no way for a user to make it parallel.

The intention here was that we'd use the Pooled pool to designate concurrency, but you're right: since Pooled blocks, this will block too. Argh. (Prior to the weekend the runqueue was indeed a channel of tasks, but I changed it to avoid having a bounded buffer there. Details to follow.)

I don't think we will want a p.sleepinterval. It assumes a pause between requests, but we want multiple in-flight requests

Of course, agreed that we want multiple in-flight requests. The intention here was to avoid thrashing by hammering the downstream ReadWriters, but I have no evidence to suggest that this is an actual problem, so I'm happy to remove it until we see otherwise.

I think the easiest way to make this run in parallel would be to create a channel of tasks, buffered to size N.

Yep, this is what I had as of Friday. The problem as I saw it: what should N be? The job of coordinating parallelism falls to Pooled, not this, so the goal was to avoid making that decision in the Prioritizer. It would be awesome if we could vary N at runtime, since that's exactly the number of in-flight requests we want to scale up and down, but that appears to be set in stone once the channel's created. Maybe the right solution here is just to make the depth of the channel "large enough"? I hate that, though :)
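
For what it's worth, here's the kind of thing I was playing with (none of these names exist in the PR; purely a sketch): if the channel is only a handoff queue, the knob we'd actually scale at runtime is the number of workers draining it, not the buffer size.

```go
package prioritizer

type workerPool struct {
	tasks chan func()
	stops []chan struct{} // one stop channel per live worker
}

// setWorkers grows or shrinks the number of in-flight request slots. Callers
// would need to serialize calls to this; no locking is shown here.
func (wp *workerPool) setWorkers(n int) {
	for len(wp.stops) < n { // grow: spawn another worker
		stop := make(chan struct{})
		wp.stops = append(wp.stops, stop)
		go func() {
			for {
				select {
				case <-stop:
					return
				case t, ok := <-wp.tasks:
					if !ok {
						return // queue closed
					}
					t()
				}
			}
		}()
	}
	for len(wp.stops) > n { // shrink: retire the most recent worker
		last := len(wp.stops) - 1
		close(wp.stops[last])
		wp.stops = wp.stops[:last]
	}
}
```

But whether that knob belongs here or in Pooled is exactly the question.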

API (summarizing all your points)

I disagree that the Prioritizer should be overfit to deal specifically with tags. Here's why: otherwise we need to carry specific state along, as you do with the Info structure, in order to thread the reflection through correctly. (At least, that's my understanding of why it's there.) By passing a function to execute later, any state that is needed to perform the operation is already there: it's closed over by the function. That way, I don't think we need anything like the Info structure threaded through; from the programmer's perspective we can write, for a given tag and priority, "read the tag from the downstream Reader and put it in the cache", both of which are in scope inside the anonymous function.

That said: if I'm wrong on all this, then I'm very happy to make "enqueue" private and have a tag-specific interface.
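
To make that concrete, here's a rough usage sketch; device, cache, the tag name, and the Enqueue signature are hypothetical stand-ins rather than the actual API.

```go
package example

// The Prioritizer only ever sees an opaque func to run, so no Info struct or
// reflection is threaded through it; the closure carries the state.

type readWriter interface {
	ReadTag(name string, value interface{}) error
}

type tagCache interface {
	Store(name string, value interface{})
}

type enqueuer interface {
	Enqueue(priority int, f func())
}

func scheduleRead(p enqueuer, device readWriter, cache tagCache) {
	var speed float64 // concrete type is known at the call site, no reflection
	p.Enqueue(0, func() {
		// Everything the task needs is closed over: device, cache, &speed.
		if err := device.ReadTag("motor_speed", &speed); err == nil {
			cache.Store("motor_speed", speed)
		}
	})
}
```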

@dijkstracula (Collaborator, Author)

Minor

NewPrioritizer uses p.runqueues without initializing it?

What the heck, that must have been a last-minute mess-up on my part; otherwise, how would this work at all...?

I think defer func() {p.mtx.Unlock() }() can simply be defer p.mtx.Unlock()

👍

Start could directly use p.running instead of a local copy, couldn't it?

I need to hold the mutex to read it out.

Though the idiomatic go way would be to use a cancel channel instead of a bool&mutex.

Ah, this is worth looking into.
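
For reference, the idiom would look roughly like this (names are illustrative; runOne stands in for whatever the loop body ends up being). Closing the channel is observed by every goroutine selecting on it, so no running bool or mutex is needed for shutdown.

```go
package prioritizer

type Prioritizer struct {
	quit chan struct{} // replaces the running bool + mutex
}

func (p *Prioritizer) runOne() {
	// ... run the next task; stand-in for the PR's runOne ...
}

func (p *Prioritizer) Start() {
	p.quit = make(chan struct{})
	go func() {
		for {
			select {
			case <-p.quit:
				return // Stop was called
			default:
				p.runOne()
			}
		}
	}()
}

func (p *Prioritizer) Stop() {
	close(p.quit) // broadcasts cancellation to every reader of p.quit
}
```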

popBack is documented as pushFront

Whoops!

@dijkstracula (Collaborator, Author)

Moved to tethys-plc-bridge.
