Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SofaSimulationCore] FIX Task scheduler memory leak #1927

Merged
merged 5 commits into from Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -67,6 +67,22 @@ namespace sofa

return MemoryAlloc::Stack;
}



Task::MemoryAlloc SleepTask::run()
{
using namespace std::chrono_literals;
std::this_thread::sleep_for(20ms);
m_isTaskDone = true;
return Task::Stack;
}

Task::MemoryAlloc ThreadIdTask::run()
{
m_isTaskDone = true;
using namespace std::chrono_literals;
std::this_thread::sleep_for(20ms); //add sleep period to simulate the worker thread is busy
*m_threadId = std::this_thread::get_id();
return Task::Stack;
}
} // namespace sofa
@@ -1,9 +1,32 @@
#include <sofa/simulation/Task.h>

#include <thread>

namespace sofa
{

class SleepTask : public simulation::CpuTask
{
public:
explicit SleepTask(simulation::CpuTask::Status* status) : CpuTask(status) {}
~SleepTask() override = default;

MemoryAlloc run() final;

bool m_isTaskDone{ false };
};

class ThreadIdTask : public simulation::CpuTask
{
public:
ThreadIdTask(std::thread::id* id, simulation::CpuTask::Status* status) : CpuTask(status), m_threadId{id} {}
~ThreadIdTask() override = default;

MemoryAlloc run() final;

bool m_isTaskDone{ false };
std::thread::id* m_threadId { nullptr };
};

// compute recursively the Fibonacci number for input N O(~1.6 exp(N))
// this is implemented to test the task scheduler generating super lightweight tasks and not for performance
class FibonacciTask : public simulation::CpuTask
Expand Down
Expand Up @@ -6,6 +6,85 @@

namespace sofa
{
TEST(TaskSchedulerTests, DefaultTaskScheduler)
{
simulation::TaskScheduler* scheduler = simulation::TaskScheduler::create(simulation::DefaultTaskScheduler::name());
EXPECT_NE(scheduler, nullptr);
//Make sure that the created TaskScheduler is of derived class DefaultTaskScheduler
EXPECT_NE(dynamic_cast<simulation::DefaultTaskScheduler*>(scheduler), nullptr);

//The default name is "Main 0"
EXPECT_EQ(std::string(scheduler->getCurrentThreadName()), "Main 0");

// scheduler has not been initialized yet
EXPECT_EQ(scheduler->getThreadCount(), 0);

scheduler->init(0);
EXPECT_EQ(scheduler->getThreadCount(), std::thread::hardware_concurrency() / 2);

//Create a scheduler with another name. It has consequences on the previously created task scheduler
simulation::TaskScheduler* scheduler_2 = simulation::TaskScheduler::create("notRegisteredName");
EXPECT_NE(scheduler_2, nullptr);
EXPECT_EQ(scheduler, scheduler_2);
EXPECT_NE(dynamic_cast<simulation::DefaultTaskScheduler*>(scheduler_2), nullptr);

//creating a "new" DefaultTaskScheduler uninitializes the previous instance
EXPECT_EQ(scheduler->getThreadCount(), 0);
EXPECT_EQ(scheduler_2->getThreadCount(), 0);

EXPECT_EQ(simulation::TaskScheduler::getInstance(), scheduler_2);

simulation::TaskScheduler* scheduler_3 = simulation::TaskScheduler::create(simulation::DefaultTaskScheduler::name());
EXPECT_NE(scheduler_3, nullptr);
EXPECT_EQ(scheduler, scheduler_3);
EXPECT_NE(dynamic_cast<simulation::DefaultTaskScheduler*>(scheduler_3), nullptr);

EXPECT_EQ(scheduler->getThreadCount(), 0);
EXPECT_EQ(scheduler_2->getThreadCount(), 0);
EXPECT_EQ(scheduler_3->getThreadCount(), 0);

scheduler->init(0);

simulation::CpuTask::Status status;
SleepTask sleepTask(&status);
EXPECT_FALSE(sleepTask.m_isTaskDone);
scheduler->addTask(&sleepTask);
EXPECT_FALSE(sleepTask.m_isTaskDone);

simulation::TaskScheduler* scheduler_4 = simulation::TaskScheduler::create("anotherName");
EXPECT_NE(scheduler_4, nullptr);

scheduler->workUntilDone(&status);
EXPECT_TRUE(sleepTask.m_isTaskDone);

EXPECT_NE(scheduler_4, nullptr);

scheduler->init(0);
EXPECT_EQ(scheduler->getThreadCount(), std::thread::hardware_concurrency() / 2);

simulation::CpuTask::Status threadIdTaskStatus;

const unsigned int nbTasks = scheduler->getThreadCount() * 5;
std::vector<std::thread::id> threadIds(nbTasks);
std::vector<ThreadIdTask> tasks;
for (unsigned int i = 0; i < nbTasks; ++i)
{
tasks.emplace_back(&threadIds[i], &threadIdTaskStatus);
auto& lastTask = tasks.back();
EXPECT_FALSE(lastTask.m_isTaskDone);
}
for (auto& task : tasks)
{
scheduler->addTask(&task);
}
scheduler->workUntilDone(&threadIdTaskStatus);
EXPECT_FALSE(threadIdTaskStatus.isBusy());

std::set<std::thread::id> uniqueThreadIds(std::cbegin(threadIds), std::cend(threadIds));
EXPECT_EQ(uniqueThreadIds.size(), std::thread::hardware_concurrency() / 2);

scheduler->stop();
}

// compute the Fibonacci number for input N
static int64_t Fibonacci(int64_t N, int nbThread = 0)
Expand Down
Expand Up @@ -55,9 +55,11 @@ namespace sofa
m_isClosing = false;

// init global static thread local var
workerThreadIndex = new WorkerThread(this, 0, "Main ");
_threads[std::this_thread::get_id()] = workerThreadIndex;// new WorkerThread(this, 0, "Main ");

if (_threads.find(std::this_thread::get_id()) == _threads.end())
{
workerThreadIndex = new WorkerThread(this, 0, "Main ");
_threads[std::this_thread::get_id()] = workerThreadIndex;// new WorkerThread(this, 0, "Main ");
}
}

DefaultTaskScheduler::~DefaultTaskScheduler()
Expand Down Expand Up @@ -112,7 +114,7 @@ namespace sofa
m_workerThreadsIdle = true;
m_mainTaskStatus = nullptr;

// default number of thread: only physicsal cores. no advantage from hyperthreading.
// default number of thread: only physical cores. no advantage from hyperthreading.
m_threadCount = GetHardwareThreadsCount();

if ( NbThread > 0 )//&& NbThread <= MAX_THREADS )
Expand All @@ -131,7 +133,6 @@ namespace sofa

m_workerThreadCount = m_threadCount;
m_isInitialized = true;
return;
}


Expand All @@ -140,7 +141,7 @@ namespace sofa
{
m_isClosing = true;

if ( m_isInitialized )
if ( m_isInitialized )
{
// wait for all
WaitForWorkersToBeReady();
Expand Down Expand Up @@ -307,7 +308,7 @@ namespace sofa
return;
}

const std::thread::id WorkerThread::getId()
const std::thread::id WorkerThread::getId() const
{
return m_stdThread.get_id();
}
Expand Down
Expand Up @@ -72,36 +72,30 @@ namespace sofa {
{
public:

WorkerThread(DefaultTaskScheduler* const& taskScheduler, const int index, const std::string& name = "Worker");
WorkerThread(DefaultTaskScheduler* const& taskScheduler, int index, const std::string& name = "Worker");

~WorkerThread();


/// Return the WorkerThread corresponding to the current thread
static WorkerThread* getCurrent();

// queue task if there is space, and run it otherwise
bool addTask(Task* pTask);

void workUntilDone(Task::Status* status);

const Task::Status* getCurrentStatus() const { return m_currentStatus; }

const char* getName() const { return m_name.c_str(); }

int getType() const { return m_type; }

const std::thread::id getId();
const std::thread::id getId() const;

const std::deque<Task*>* getTasksQueue() { return &m_tasks; }

std::uint64_t getTaskCount() { return m_tasks.size(); }

int GetWorkerIndex();

void* allocate();

void free(void* ptr);



private:

bool start(DefaultTaskScheduler* const& taskScheduler);
Expand Down Expand Up @@ -169,8 +163,16 @@ namespace sofa {
public:

// interface


/**
* Call stop() and start() if not already initialized
* @param nbThread
*/
virtual void init(const unsigned int nbThread = 0) final;

/**
* Wait and destroy worker threads
*/
virtual void stop(void) final;
virtual unsigned int getThreadCount(void) const final { return m_threadCount; }
virtual const char* getCurrentThreadName() override final;
Expand Down Expand Up @@ -199,7 +201,10 @@ namespace sofa {
void WaitForWorkersToBeReady();

void wakeUpWorkers();


/**
* Assuming 2 concurrent threads by CPU core, return the number of CPU core on the system
*/
static unsigned GetHardwareThreadsCount();

WorkerThread* getCurrentThread();
Expand Down Expand Up @@ -228,7 +233,14 @@ namespace sofa {
DefaultTaskScheduler(const DefaultTaskScheduler&) {}

~DefaultTaskScheduler() override;


/**
* Create worker threads
* If the number of required threads is 0, the number of threads will be equal to the
* result of GetHardwareThreadsCount()
*
* @param NbThread
*/
void start(unsigned int NbThread);

bool m_isInitialized;
Expand Down
Expand Up @@ -16,7 +16,7 @@ namespace sofa
// the TaskScheduler::_schedulers must be initialized before any call to TaskScheduler::registerScheduler
std::map<std::string, std::function<TaskScheduler*()> > TaskScheduler::_schedulers;
std::string TaskScheduler::_currentSchedulerName;
TaskScheduler* TaskScheduler::_currentScheduler = nullptr;
std::shared_ptr<TaskScheduler> TaskScheduler::_currentScheduler = nullptr;

// register default task scheduler
const bool DefaultTaskScheduler::isRegistered = TaskScheduler::registerScheduler(DefaultTaskScheduler::name(), &DefaultTaskScheduler::create);
Expand All @@ -27,7 +27,7 @@ namespace sofa
// is already the current scheduler
std::string nameStr(name);
if (!nameStr.empty() && _currentSchedulerName == name)
return _currentScheduler;
return _currentScheduler.get();

auto iter = _schedulers.find(name);
if (iter == _schedulers.end())
Expand All @@ -40,17 +40,17 @@ namespace sofa

if (_currentScheduler != nullptr)
{
delete _currentScheduler;
_currentScheduler.reset();
}

TaskSchedulerCreatorFunction& creatorFunc = iter->second;
_currentScheduler = creatorFunc();
_currentScheduler = std::shared_ptr<TaskScheduler>(creatorFunc());

_currentSchedulerName = iter->first;

Task::setAllocator(_currentScheduler->getTaskAllocator());

return _currentScheduler;
return _currentScheduler.get();
}


Expand All @@ -68,13 +68,12 @@ namespace sofa
_currentScheduler->init();
}

return _currentScheduler;
return _currentScheduler.get();
}


TaskScheduler::~TaskScheduler()
{

}

} // namespace simulation
Expand Down
Expand Up @@ -49,15 +49,40 @@ namespace sofa


virtual ~TaskScheduler();


/**
* Check if a TaskScheduler already exists with this name.
* If not, it creates and registers a new TaskScheduler of type DefaultTaskScheduler with
* name as a key
*
* @param name key to find or create a TaskScheduler
* @return A TaskScheduler
*/
static TaskScheduler* create(const char* name = "");

typedef std::function<TaskScheduler* ()> TaskSchedulerCreatorFunction;


/**
* Register a new scheduler in the factory
*
* @param name key in the factory
* @param creatorFunc function creating a new TaskScheduler or a derived class
* @return
*/
static bool registerScheduler(const char* name, std::function<TaskScheduler* ()> creatorFunc);


/**
* Get the current TaskScheduler instance.
*
* If not instance has been created yet, a new one with empty name is created.
* @return The current TaskScheduler instance
*/
static TaskScheduler* getInstance();


/**
* Get the name of the current TaskScheduler instance
* @return The name of the current TaskScheduler instance
*/
static const std::string& getCurrentName() { return _currentSchedulerName; }

// interface
Expand Down Expand Up @@ -86,7 +111,7 @@ namespace sofa

// current instantiated scheduler
static std::string _currentSchedulerName;
static TaskScheduler * _currentScheduler;
static std::shared_ptr<TaskScheduler> _currentScheduler;

friend class Task;
};
Expand Down