Immediately scheduling task on a different thread #1354

Closed · yonik opened this issue Apr 20, 2024 · 9 comments

@yonik commented Apr 20, 2024

I need to launch tasks from a thread that won't participate in executing them (it's running a different event loop and will block). It seems to take on the order of ~50 µs for TBB to start the task on a different thread. Is there a way to get this work scheduled faster?

Below is some test code and results.

Results from calling task_group.run() from a non-arena thread:

[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:604] Time until first submit=30
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=56497
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=38618
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=40086
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=41376
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=44238
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=60127
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=60355
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=64096
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=63806
[2024-04-20 12:10:59.628] [info] [TBBTest.cpp:609] latency=64776

Results from calling task_group.run() from within a task:

[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:605] Time until first submit=39873
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=5609
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=990
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=870
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=780
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=690
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=600
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=520
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=420
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=360
[2024-04-20 12:04:18.297] [info] [TBBTest.cpp:610] latency=290

The test code:

// try_put to a multifunction_node in a graph takes about ~50us for the first task to be scheduled
// on a different thread.  Same for task group.
TEST_F(TBBTest, testLatency) {


  // test the time it takes for another thread to work-steal a task from a blocked thread.
  class Msg {
  public:
    Msg() = default;
    Msg(const Msg&) = delete;
    Msg& operator=(const Msg&) = delete;

    std::chrono::time_point<std::chrono::high_resolution_clock> submitTime;
    std::chrono::time_point<std::chrono::high_resolution_clock> processTime;

    void start() {
      submitTime = std::chrono::high_resolution_clock::now();
    }

    void submitted() {
      processTime = std::chrono::high_resolution_clock::now();
    }
  };

  int n = 10;
  std::vector<Msg> messages(n);

  oneapi::tbb::task_group tg;

  // Let's try warming up the arena / task group.  Nope... didn't help.
  int res=0;
  for (int i=0; i<10; i++) {
    tg.run([&]{res++;});
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(1));  // give the warmup tasks a chance to run


  auto start = std::chrono::high_resolution_clock::now();


  for (auto& msg : messages) {
    auto* mptr = &msg;
    mptr->start();
    tg.run([mptr]{
      mptr->submitted();
    });
  }

  /*
 // submit tasks from within the task group.  Doesn't solve the problem of latency of *this* task though.
  tg.run([&]{
    for (auto& msg : messages) {
      auto* mptr = &msg;
      mptr->start();
      tg.run([mptr]{
        mptr->submitted();
      });
    }
  });
   */

  // sleep for a bit to give time for tasks to execute *not* in this thread.
  // comment this out to see the difference of scheduling on *this* thread.
  std::this_thread::sleep_for(std::chrono::milliseconds(1000));

  tg.wait();


  // print out the time between our start and when the first task was submitted
  auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(messages[0].submitTime - start).count();
  LOG_INFO("Time until first submit={}", elapsed);

  for (auto &msg : messages) {
    auto submitTime = std::chrono::time_point_cast<std::chrono::nanoseconds>(msg.submitTime).time_since_epoch().count();
    auto processTime = std::chrono::time_point_cast<std::chrono::nanoseconds>(msg.processTime).time_since_epoch().count();
    LOG_INFO("latency={}", processTime - submitTime);
  }
}

@pavelkumbrasev (Contributor)

Hi @yonik, I'm not sure I 100% understood your example, but as far as I can see you want to run a subset of tasks from a thread that is not going to participate in this work.
Serial submission can actually take a significant amount of time, since a single thread submits all the tasks.
Our parallel algorithms use a recursive decomposition pattern in which threads submit no more than log2(N) tasks per tree branch.
So the quickest way to submit a bunch of parallel tasks is to use something like tg.run([&] { tbb::parallel_for(); }).
If there is no point where a thread will call tg.wait(), then task_arena::enqueue should be used.
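
For example, a minimal sketch of that pattern (submit_batch is a hypothetical helper; the range and per-item work are placeholders, not anything from your test):

#include <oneapi/tbb/parallel_for.h>
#include <oneapi/tbb/task_group.h>

void submit_batch(oneapi::tbb::task_group& tg, int n) {
  // A single task enters the arena; parallel_for then splits the range
  // recursively, so no thread submits more than ~log2(N) tasks.
  tg.run([n] {
    oneapi::tbb::parallel_for(0, n, [](int i) {
      // per-item work goes here
    });
  });
}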

@yonik (Author) commented Apr 22, 2024

> Hi @yonik, I'm not sure I 100% understood your example, but as far as I can see you want to run a subset of tasks from a thread that is not going to participate in this work.

Yeah, I think it really comes down to the time for default work-stealing to occur. The fact that I launched multiple tasks in my example code isn't actually very relevant. I'm looking for a way to tell TBB "I just created a task, but this thread won't be executing it, so please move/steal this task ASAP". It's almost like work-requesting rather than work-stealing.

> If there is no point where a thread will call tg.wait(), then task_arena::enqueue should be used.

I already tried a bunch of variants I didn't include here: flow graphs, task groups, and arenas all showed about the same latency for work to be stolen and started on a different thread.

@pavelkumbrasev (Contributor)

I believe the latency in this scenario will depend directly on whether the internal arena is already populated with threads.
If async submissions to the arena are frequent enough, they will keep the threads inside the arena, so the latency should not be too high.
If the gaps between submissions are less than 1 ms, you can try the patch with increased block time in #1352.
With threads in place and task_arena::enqueue, I would expect the average latency to be around 5-10 µs, not 50.
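
For reference, a fire-and-forget submission through an explicit arena could look roughly like this (a sketch; submit_async is a hypothetical helper reusing the Msg type from the test above):

#include <oneapi/tbb/task_arena.h>

oneapi::tbb::task_arena arena;  // default concurrency

void submit_async(Msg* mptr) {  // hypothetical helper
  // enqueue() does not require any thread to call wait();
  // the task lands where idle arena threads can pick it up.
  arena.enqueue([mptr] { mptr->submitted(); });
}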

@yonik (Author) commented Apr 24, 2024

> With threads in place and task_arena::enqueue, I would expect the average latency to be around 5-10 µs, not 50.

Yes, that looks to be the case.
I changed the warmup code in the test code above to this:

  // create enough tasks that multiple other threads will be spun up.
  int res=0;
  for (int i=0; i<100000; i++) {
    tg.run([&]{res++;});
  }
  tg.wait();

Results of task_group::run() (I also started recording which thread did the processing):

Time until first submit=880 this thread is 140097750809856
latency=2440 threadId=140096200492608
latency=3615 threadId=140096183412288
latency=3400 threadId=140097667135040
latency=2480 threadId=140096018372160
latency=3665 threadId=140097651365440
latency=3840 threadId=140096200492608
latency=3740 threadId=140096018372160
latency=3610 threadId=140097667135040
latency=4620 threadId=140096192095808
latency=4140 threadId=140097667135040

Results using arena.enqueue() (it does look faster than task_group now):

Time until first submit=910 this thread is 140192042380544
latency=9270 threadId=140190294070848
latency=2530 threadId=140190410532416
latency=3120 threadId=140191946663488
latency=4147 threadId=140190470891072
latency=2760 threadId=140190475146816
latency=2020 threadId=140190311130688
latency=2480 threadId=140190338565696
latency=1410 threadId=140191982396992
latency=2230 threadId=140190442260032
latency=1700 threadId=140190208636480

From an implementation POV, what makes arena.enqueue() that much faster than task_group.run()?
Thanks for helping me understand how this all works!

@pavelkumbrasev (Contributor)

task_group::run will place the task into the thread's local queue, so other threads can get it only through stealing (which is still efficient).
task_arena::enqueue will place the task into a shared queue (which is still scalable), so other threads will find it faster.
That is why it helps to reduce latency in your case.
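
In code, the two submission paths look the same but land in different queues (a sketch, assuming the tg and arena objects from the snippets above):

// Local queue: the submitting thread owns this task, so a worker
// must steal it, which adds latency when submitted from outside the arena.
tg.run([] { /* work */ });

// Shared queue: any worker in the arena can pop this task directly,
// so it is discovered faster when submitted from a non-arena thread.
arena.enqueue([] { /* work */ });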

@pavelkumbrasev (Contributor)

Is there anything else I can help you with, or can we close this issue?

@yonik (Author) commented Apr 25, 2024

> Is there anything else I can help you with, or can we close this issue?

I've been experimenting a little more and noticed some interesting things about task_groups. Once tasks in a task_group exist, they are very quick to start running (presumably because of the local queue you mentioned). But some combination of the task_group creation and the first task submission is slower (not horrible, but around 10 µs).

So the question is: assuming one has a root task launched via task_arena::enqueue() and that task runs multiple tasks via a task_group, is there a best practice or way to lower the task_group creation time? Assuming the same arena will always be used, would caching task_group objects speed things up?
EDIT: see the last message! It seems it's just the first time a task_group is used on a thread.

I'll attach some test code once I clean it up.

@yonik (Author) commented Apr 25, 2024

Test code:

#include <barrier>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <oneapi/tbb/task_arena.h>
#include <oneapi/tbb/task_group.h>

tbb::task_arena arena;
int method = 0;  // 0=arena enqueue, 1=new task group

class Msg {
public:
  std::vector<Msg> children;
  std::chrono::time_point<std::chrono::high_resolution_clock> preSubmitTime;  // time when we want to submit a new task
  std::chrono::time_point<std::chrono::high_resolution_clock> postSubmitTime; // time right after we submitted a task
  std::chrono::time_point<std::chrono::high_resolution_clock> processTime;    // time when process was finally called
  std::thread::id threadId;

  void startTimer() {
    preSubmitTime = std::chrono::high_resolution_clock::now();
  }

  void submitted() {
    postSubmitTime = std::chrono::high_resolution_clock::now();
  }

  void process() {
    processTime = std::chrono::high_resolution_clock::now();
    threadId = std::this_thread::get_id();
    switch(method) {
      case 0:
        submitChildrenArena();
        break;
      case 1:
        submitChildrenNewTaskGroup();
        break;
      default:
        std::cerr << "Invalid method " << method << std::endl;
        exit(1);
    }

  }

  void submitChildrenArena() {
    // record the time before submission so the enqueue cost is included.
    for (auto& child: children) {
      child.startTimer();
    }

    for (auto& child : children) {
      arena.enqueue([&child] {
        child.process();
      });
      child.submitted();
    }
  }

  void submitChildrenNewTaskGroup() {
    // record the time before task_group creation so its cost is included.
    for (auto& child: children) {
      child.startTimer();
    }
    {
      tbb::task_group tg;
      for (auto& child: children) {
        tg.run([&child] {
          child.process();
        });
        child.submitted();
      }
      tg.wait();
    }
  }

  // recursively fill out subtasks
  void addSubTasks(int numSubTasks, int depthLeft) {
    if (depthLeft == 0 || numSubTasks == 0) {
      return;
    }
    children.resize(numSubTasks);
    for (auto& child : children) {
      child.addSubTasks(numSubTasks, depthLeft-1);
    }
  }

  void print(int level=0) {
    for (int i=0; i<level; i++) {
      std::cout << "  ";
    }
    auto timeToSubmit = std::chrono::duration_cast<std::chrono::nanoseconds>(postSubmitTime - preSubmitTime).count();
    auto timeToSchedule = std::chrono::duration_cast<std::chrono::nanoseconds>(processTime - postSubmitTime).count();
    std::cout << "Thread " << threadId << " timeToSubmit=" << timeToSubmit << " timeToSchedule=" << timeToSchedule << " total=" << timeToSubmit+timeToSchedule << std::endl;
    for (auto& child : children) {
      child.print(level+1);
    }
  }

};

int main(int argc, char** argv) {
  int numRootTasks = 4;
  int numChildTasks = 4;  // per root task
  int depth = 2;

  if (argc <= 1) {
    std::cout << "Usage: " << argv[0] << " numRootTasks numChildTasks depth childSubmitMethod" << std::endl;
    std::cout << "\t\tchildSubmitMethod: 0=arena enqueue, 1=new task group" << std::endl;
    return 1;
  }
  if (argc > 1) {
    numRootTasks = std::stoi(argv[1]);
  }
  if (argc > 2) {
    numChildTasks = std::stoi(argv[2]);
  }
  if (argc > 3) {
    depth = std::stoi(argv[3]);
  }
  if (argc > 4) {
    method = std::stoi(argv[4]);
  }

  std::vector<Msg> rootTasks(numRootTasks);
  for (auto& rootTask : rootTasks) {
    rootTask.addSubTasks(numChildTasks, depth - 1);
  }

  // Try to get all threads to spin up before we continue.  This seems to work better than the commented-out code below.
  std::barrier start_barrier(arena.max_concurrency());  // not +1, I think, because one slot is reserved for a master thread.
  for (int i=0; i<arena.max_concurrency(); i++) {
    arena.enqueue([&]{start_barrier.arrive_and_wait();});
  }
  start_barrier.arrive_and_wait();

  /*
  // create enough tasks that multiple other threads will be spun-up.
  int res=0;
  arena.execute([&]{
    oneapi::tbb::task_group tg;
    for (int i=0; i<100000; i++) {
      tg.run([&]{res++;});
    }
    tg.wait();
  });
*/

  // Independent root tasks submitted to the arena.
  for (auto& rootTask : rootTasks) {
    rootTask.startTimer();
    arena.enqueue([&rootTask] {
      rootTask.process();
    });
    rootTask.submitted();
  }

  // Sleep for a bit to allow the tasks to finish.
  std::this_thread::sleep_for(std::chrono::seconds(1));

  for (auto& rootTask : rootTasks) {
    rootTask.print();
  }


  return 0;
}

Timings for using arena.enqueue() for sub-tasks:

~/tbb_sched/cmake-build-release$ ./tbb_sched 4 4 2 0
Thread 140207543887424 timeToSubmit=7730 timeToSchedule=-1525 total=6205
  Thread 140207164720704 timeToSubmit=4180 timeToSchedule=898 total=5078
  Thread 140207164720704 timeToSubmit=5800 timeToSchedule=618 total=6418
  Thread 140207539689024 timeToSubmit=6500 timeToSchedule=490 total=6990
  Thread 140207273748032 timeToSubmit=7220 timeToSchedule=950 total=8170
Thread 140207160522304 timeToSubmit=1490 timeToSchedule=1275 total=2765
  Thread 140207294740032 timeToSubmit=1580 timeToSchedule=965 total=2545
  Thread 140207277946432 timeToSubmit=2560 timeToSchedule=1410 total=3970
  Thread 140207315732032 timeToSubmit=3370 timeToSchedule=910 total=4280
  Thread 140207290541632 timeToSubmit=4820 timeToSchedule=920 total=5740
Thread 140207177315904 timeToSubmit=2070 timeToSchedule=1055 total=3125
  Thread 140207539689024 timeToSubmit=1760 timeToSchedule=710 total=2470
  Thread 140207298938432 timeToSubmit=2500 timeToSchedule=1280 total=3780
  Thread 140207164720704 timeToSubmit=3080 timeToSchedule=868 total=3948
  Thread 140207194109504 timeToSubmit=3860 timeToSchedule=640 total=4500
Thread 140207324128832 timeToSubmit=360 timeToSchedule=1105 total=1465
  Thread 140207535490624 timeToSubmit=2110 timeToSchedule=1250 total=3360
  Thread 140207311533632 timeToSubmit=3520 timeToSchedule=510 total=4030
  Thread 140207315732032 timeToSubmit=4100 timeToSchedule=350 total=4450
  Thread 140207273748032 timeToSubmit=4370 timeToSchedule=510 total=4880

Timings for using task_group::run() for sub-tasks:

~/tbb_sched/cmake-build-release$ ./tbb_sched 4 4 2 1
Thread 139911623673408 timeToSubmit=930 timeToSchedule=1912 total=2842
  Thread 139910962988608 timeToSubmit=12420 timeToSchedule=1720 total=14140
  Thread 139911585887808 timeToSubmit=12750 timeToSchedule=1779 total=14529
  Thread 139911623673408 timeToSubmit=12860 timeToSchedule=1210 total=14070
  Thread 139911623673408 timeToSubmit=12920 timeToSchedule=630 total=13550
Thread 139911598483008 timeToSubmit=1860 timeToSchedule=1092 total=2952
  Thread 139911644665408 timeToSubmit=7290 timeToSchedule=1375 total=8665
  Thread 139911611078208 timeToSubmit=7550 timeToSchedule=1370 total=8920
  Thread 139910954591808 timeToSubmit=7660 timeToSchedule=1940 total=9600
  Thread 139911598483008 timeToSubmit=7690 timeToSchedule=520 total=8210
Thread 139910950393408 timeToSubmit=1080 timeToSchedule=882 total=1962
  Thread 139910841366080 timeToSubmit=7630 timeToSchedule=1630 total=9260
  Thread 139910967187008 timeToSubmit=7810 timeToSchedule=1685 total=9495
  Thread 139910950393408 timeToSubmit=7860 timeToSchedule=1290 total=9150
  Thread 139910950393408 timeToSubmit=7900 timeToSchedule=630 total=8530
Thread 139911606879808 timeToSubmit=570 timeToSchedule=1422 total=1992
  Thread 139911627871808 timeToSubmit=10180 timeToSchedule=1430 total=11610
  Thread 139911606879808 timeToSubmit=10350 timeToSchedule=1470 total=11820
  Thread 139911606879808 timeToSubmit=10530 timeToSchedule=1120 total=11650
  Thread 139911606879808 timeToSubmit=10590 timeToSchedule=510 total=11100

Nested task groups:

~/tbb_sched/cmake-build-release$ ./tbb_sched 4 1 4 1
Thread 140009002194496 timeToSubmit=2160 timeToSchedule=820 total=2980
  Thread 140008977004096 timeToSubmit=6810 timeToSchedule=1185 total=7995
    Thread 140008981202496 timeToSubmit=7720 timeToSchedule=1275 total=8995
      Thread 140008526681664 timeToSubmit=7400 timeToSchedule=940 total=8340
Thread 140008547673664 timeToSubmit=1200 timeToSchedule=490 total=1690
  Thread 140008547673664 timeToSubmit=7320 timeToSchedule=250 total=7570
    Thread 140009002194496 timeToSubmit=390 timeToSchedule=170 total=560
      Thread 140009002194496 timeToSubmit=1760 timeToSchedule=170 total=1930
Thread 140008985400896 timeToSubmit=460 timeToSchedule=820 total=1280
  Thread 140008539276864 timeToSubmit=6800 timeToSchedule=1670 total=8470
    Thread 140008539276864 timeToSubmit=9010 timeToSchedule=740 total=9750
      Thread 140008985400896 timeToSubmit=350 timeToSchedule=500 total=850
Thread 140008964408896 timeToSubmit=1450 timeToSchedule=1210 total=2660
  Thread 140008964408896 timeToSubmit=7200 timeToSchedule=350 total=7550
    Thread 140008964408896 timeToSubmit=380 timeToSchedule=140 total=520
      Thread 140008964408896 timeToSubmit=210 timeToSchedule=60 total=270

@yonik (Author) commented Apr 26, 2024

OK, things are now looking great! I changed the warmup code to use a barrier to ensure that all of the threads are running, and lowered the concurrency to the number of physical cores. Then I noticed that the longer task_group time seems to occur only on an arena thread that hasn't had a task_group used on it before. Once a thread is re-used, it gets fast. It seems to take less than 200 ns to create a task_group, add 4 sub-tasks, and have one of the sub-tasks run (including bookkeeping overhead).

Here is some sample output. Note that the latency of the root task is high, but that's fine since we launched a bunch of them and only have so many physical cores. The latency of the sub-tasks, once the root task is started, settles into fast numbers. These seem like great results now, so I'm closing this issue. Thank you for all the help!

~/tbb_sched/cmake-build-release$ ./tbb_sched 100 4 2 1 | tail -20
Thread 140541798905408 timeToSubmit=90 timeToSchedule=31850 total=31940
  Thread 140541798905408 timeToSubmit=110 timeToSchedule=260 total=370
  Thread 140541798905408 timeToSubmit=100 timeToSchedule=190 total=290
  Thread 140541798905408 timeToSubmit=110 timeToSchedule=110 total=220
  Thread 140541798905408 timeToSubmit=120 timeToSchedule=30 total=150
Thread 140541777913408 timeToSubmit=30 timeToSchedule=33920 total=33950
  Thread 140541777913408 timeToSubmit=130 timeToSchedule=320 total=450
  Thread 140541777913408 timeToSubmit=140 timeToSchedule=230 total=370
  Thread 140541777913408 timeToSubmit=150 timeToSchedule=140 total=290
  Thread 140541777913408 timeToSubmit=160 timeToSchedule=40 total=200
Thread 140541756921408 timeToSubmit=40 timeToSchedule=33889 total=33929
  Thread 140541756921408 timeToSubmit=100 timeToSchedule=290 total=390
  Thread 140541756921408 timeToSubmit=120 timeToSchedule=200 total=320
  Thread 140541756921408 timeToSubmit=140 timeToSchedule=110 total=250
  Thread 140541756921408 timeToSubmit=150 timeToSchedule=30 total=180
Thread 140541874198080 timeToSubmit=80 timeToSchedule=30960 total=31040
  Thread 140541874198080 timeToSubmit=130 timeToSchedule=320 total=450
  Thread 140541874198080 timeToSubmit=120 timeToSchedule=230 total=350
  Thread 140541874198080 timeToSubmit=140 timeToSchedule=130 total=270
  Thread 140541874198080 timeToSubmit=140 timeToSchedule=40 total=180 
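
For reference, lowering the arena concurrency to one worker per physical core can be expressed with task_arena::constraints (a sketch; max_threads_per_core only takes effect when oneTBB is built with HWLOC/tbbbind support):

#include <oneapi/tbb/task_arena.h>

// Sketch: one worker per physical core instead of one per hardware thread.
oneapi::tbb::task_arena::constraints c;
c.set_max_threads_per_core(1);  // needs tbbbind (HWLOC) to take effect
oneapi::tbb::task_arena arena(c);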

yonik closed this as completed Apr 26, 2024