
How to properly use a queue #76

Open
WebFreak001 opened this issue Mar 8, 2023 · 7 comments

Comments

@WebFreak001

I'm trying to understand the API by writing an example that processes a list of jobs known ahead of time, but I've got a few questions:

// uncomment this line:
//version = Alternative;

struct Job
{
	int workData;

	// implementation requirement: needed for MPSCQueue to work
	shared Job* next;
}

void main(string[] args) @safe
{
	// number of spawned OS threads
	size_t numWorkers = 16;

	import concurrency.thread : stdTaskPool;
	auto taskPool = stdTaskPool(numWorkers);

	import concurrency.data.queue.mpsc : MPSCQueue;
	auto queue = new MPSCQueue!Job;

	Job[] jobs = new Job[10000];
	foreach (i, ref job; jobs)
	{
		job.workData = cast(int)i;
		queue.push(&jobs[i]);
	}

	import concurrency.operations : then, via, on;
	import concurrency.sender : just;

	import std.array : array;
	import std.range : repeat;

	shared int i = -1;
	shared int[] calledTimes = new int[numWorkers];

	auto workerTasks =
		just(queue) // binding (i, queue) into the callback
		.then(
			// why isn't the queue forced to be shared? I think it should be, no?
			(MPSCQueue!Job queue) shared @safe {
				import core.atomic : atomicOp;
				int thisTask = i.atomicOp!"+="(1);

				// on each thread, pop queue until empty and run performJob
				while (auto job = queue.pop())
				{
					performJob(thisTask, *job);
					calledTimes[thisTask].atomicOp!"+="(1);
				}
			})
		.on(taskPool.getScheduler()) // when run, run on new thread
		.repeat(numWorkers)
		.array;

	import concurrency.operations : whenAll;
	import concurrency.syncwait : syncWait;
	whenAll(workerTasks).syncWait();

	import std.stdio;

	writeln("distribution: ", (() @trusted => cast(int[])calledTimes)());
}

void performJob(int runnerId, ref Job job) @safe
{
	version (Alternative)
	{
		import core.thread;

		Thread.sleep(1.msecs);
	}
	else
	{
		import std.stdio;

		writeln(runnerId, ": running job #", job.workData);
	}
}

MPSCQueue.pop does not seem to be thread-safe? It doesn't have shared on it, but the API neither forces me to make it shared nor forbids this kind of usage. In this code, when you uncomment the version at the top, you can see that if you increase the number of jobs enough / let it run long enough, it eventually deadlocks / races into a lock. (Using writeln has some weird internal locks, causing a non-uniform load distribution, which doesn't seem to deadlock.)

Q: What type should I use instead for a queue that I can call pop on in parallel? (Use-case: one thread generates jobs, lots of worker threads distribute the work between them.)

Q2: Calling whenAll with the same task in multiple arguments to run the task multiple times feels quite foreign to me, but I saw this usage in the unittests, so I adapted it here. Is this really intended behavior?

Q3: How do I properly pass the queue into my job processor? (I would expect something like making the queue argument shared ref in the callback, assuming the queue were a struct that can't be copied, but at least with the class version this doesn't compile.)

Overarching Q: should I even be writing my code like this in the first place? Is there a better way?

@WebFreak001 WebFreak001 changed the title How to properly How to properly use a queue Mar 8, 2023
@WebFreak001
Author

WebFreak001 commented Mar 8, 2023

I saw that streams exist as well, but this doesn't do what I expect:

struct Job
{
	int workData;
}

void main(string[] args) @safe
{
	// number of spawned OS threads
	size_t numWorkers = 16;

	import concurrency.thread : stdTaskPool;
	auto taskPool = stdTaskPool(numWorkers);

	Job[] jobs = new Job[8000];
	foreach (i, ref job; jobs)
		job.workData = cast(int)i;

	import concurrency.operations : then, via, on;
	import concurrency.stream : arrayStream;
	import concurrency.syncwait : syncWait;

	jobs
		.arrayStream
		.collect((Job job) shared @safe => performJob(0, job))
		.on(taskPool.getScheduler())
		.syncWait;

}

void performJob(int runnerId, ref Job job) @trusted
{
	import core.thread;

	Thread.sleep(1.msecs);
}

(it doesn't run them in parallel, since this takes 8000 msecs instead of 8000/16 msecs)

@skoppe
Collaborator

skoppe commented Mar 9, 2023

There is no need to do manual queuing:

#!/usr/bin/env dub
/+ dub.sdl:
 name "jobs"
 dependency "concurrency" version="*"
 dflags "-dip1000"
 +/

import concurrency;
import std.range : iota;
import std.algorithm : map;

struct Job
{
  int workData;
}

void main(string[] args) @safe
{
  import concurrency.thread : stdTaskPool;
  import concurrency.operations : then, on, whenAll;
  import std.array : array;

  // number of spawned OS threads
  size_t numWorkers = 16;

  auto taskPool = stdTaskPool(numWorkers);
  scope scheduler = taskPool.getScheduler();

  iota(10000)
    .map!(i => just(Job(i)).then(&performJob).on(scheduler))
    .array
    .whenAll
    .syncWait;
}

void performJob(ref Job job) @trusted
{
  import core.thread;

  Thread.sleep(1.msecs);
}

@skoppe
Collaborator

skoppe commented Mar 9, 2023

MPSCQueue.pop does not seem to be thread-safe? It doesn't have shared on it, but the API neither forces me to make it shared nor forbids this kind of usage. In this code, when you uncomment the version at the top, you can see that if you increase the number of jobs enough / let it run long enough, it eventually deadlocks / races into a lock.

Yes. just is breaking the law here. It should only accept non-aliased data. I have a branch with a fix in the works, but it needs to be backwards compatible so that users (myself included) can update.

Also, this is a multi-producer-single-consumer queue, and you were using it the complete opposite way (single-producer-multi-consumer). Yes, had just been implemented correctly, it would have screamed at you, forcing you to make the queue shared, at which point pop would stop working (it's non-shared).

In fact, the queue was designed to work correctly with shared, see the producer method, which returns a wrapper that is shared but can only push.
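Based on that description, the intended orientation looks roughly like the following. This is a sketch only, not verified against the library: the producer wrapper's exact shape is assumed from the comment above, and only push/pop are taken from the code earlier in this thread.

```d
import concurrency.data.queue.mpsc : MPSCQueue;

struct Job
{
    int workData;
    shared Job* next; // intrusive link required by MPSCQueue
}

void example() @safe
{
    auto queue = new MPSCQueue!Job;

    // Many threads: each holds the producer wrapper, which is shared
    // and can only push (assumed from "see the producer method" above).
    auto producer = queue.producer;
    // ... hand `producer` to any number of worker threads; they may
    // call push on it concurrently.

    // Exactly one thread: the consumer owns the non-shared queue and
    // is the only one allowed to call pop().
    while (auto job = queue.pop())
    {
        // process *job here
    }
}
```

In other words: shared fan-in for producers, a single unshared consumer, which is why pop deadlocking under parallel use is by design rather than a bug in the queue itself.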

@skoppe
Collaborator

skoppe commented Mar 9, 2023

it eventually dead-locks / races into a lock.

This is because you are popping from multiple threads. Again, I need to fix just so that this becomes impossible.

@WebFreak001
Author

Also, this is a multi-producer-single-consumer queue, and you were using it completely opposite (single-producer-multi-consumer). Yes, had just been implemented correctly, it would have screamed at you, forcing you to make it shared, at which point pop would stop working (its non shared).

Ah, that explains it. I was wondering what MPSCQueue would otherwise stand for, and would have otherwise just cast away the shared, knowing that it would probably break because of this. (A ddoc comment would be nice here.)

@WebFreak001
Author

I have also very roughly benchmarked overhead memory & time of concurrency here for your example:

(screenshot: benchmark of memory and time overhead)

Nice and constant per-job overhead (so RAM and time grow linearly with the number of jobs when queuing all at once).

Now it would also be quite interesting to see whether it's possible to queue the tasks dynamically (e.g. calling whenAll with a range instead of an array), and whether that makes memory use constant instead of linear.

@skoppe
Collaborator

skoppe commented Mar 10, 2023

Now it would also be quite interesting to see whether it's possible to queue the tasks dynamically (e.g. calling whenAll with a range instead of an array), and whether that makes memory use constant instead of linear.

whenAll currently doesn't accept a range. Supporting ranges with a known length would be easy, but others are more difficult.

You can switch to using a Nursery, which is intended for dynamic job sets. There are allocations involved though, likely more than with a static list.
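For comparison with the whenAll example earlier in the thread, a rough sketch of the Nursery approach could look like this. The API here (new shared Nursery(), run(), syncWait()) is assumed from the library's examples and may not match the current signatures exactly; it reuses the Job struct and performJob from skoppe's snippet above.

```d
import concurrency.nursery : Nursery;
import concurrency.sender : just;
import concurrency.operations : then, on;
import concurrency.syncwait : syncWait;
import concurrency.thread : stdTaskPool;

void runDynamic() @safe
{
    auto taskPool = stdTaskPool(16);
    auto scheduler = taskPool.getScheduler();

    auto nursery = new shared Nursery();

    // Jobs can be submitted as they are discovered, instead of
    // materializing one sender per job up front as whenAll requires.
    foreach (i; 0 .. 10_000)
        nursery.run(just(Job(i)).then(&performJob).on(scheduler));

    // Completes once every submitted sender has finished.
    nursery.syncWait();
}
```

The trade-off mentioned above applies: each run() call involves allocations, so for a fixed, fully known job list the static whenAll array is likely cheaper.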
