Skip to content

Commit

Permalink
[SofaSimulationCore] FIX Task scheduler memory leak (#1927)
Browse files Browse the repository at this point in the history
* Minor cleaning + comments

* [SofaSimulationCore] shared pointer instead of raw pointer

The static raw pointer had two bad effects:
- Memory leak
- Task scheduler not being stopped

* [SofaSimulationCore] Add more unit tests for TaskScheduler

* [SofaSimulationCore] Use unique_ptr instead of shared_ptr

* Remove unit tests
  • Loading branch information
alxbilger committed Apr 14, 2021
1 parent 9f24a99 commit e6b15fa
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 41 deletions.
Expand Up @@ -67,6 +67,4 @@ namespace sofa

return MemoryAlloc::Stack;
}


} // namespace sofa
@@ -1,10 +1,9 @@
#include <sofa/simulation/Task.h>

#include <thread>

namespace sofa
{

// compute recursively the Fibonacci number for input N O(~1.6 exp(N))
// compute recursively the Fibonacci number for input N O(~1.6 exp(N))
// this is implemented to test the task scheduler generating super lightweight tasks and not for performance
class FibonacciTask : public simulation::CpuTask
{
Expand Down
Expand Up @@ -6,7 +6,6 @@

namespace sofa
{

// compute the Fibonacci number for input N
static int64_t Fibonacci(int64_t N, int nbThread = 0)
{
Expand Down
Expand Up @@ -55,9 +55,11 @@ namespace sofa
m_isClosing = false;

// init global static thread local var
workerThreadIndex = new WorkerThread(this, 0, "Main ");
_threads[std::this_thread::get_id()] = workerThreadIndex;// new WorkerThread(this, 0, "Main ");

if (_threads.find(std::this_thread::get_id()) == _threads.end())
{
workerThreadIndex = new WorkerThread(this, 0, "Main ");
_threads[std::this_thread::get_id()] = workerThreadIndex;// new WorkerThread(this, 0, "Main ");
}
}

DefaultTaskScheduler::~DefaultTaskScheduler()
Expand Down Expand Up @@ -112,7 +114,7 @@ namespace sofa
m_workerThreadsIdle = true;
m_mainTaskStatus = nullptr;

// default number of thread: only physicsal cores. no advantage from hyperthreading.
// default number of thread: only physical cores. no advantage from hyperthreading.
m_threadCount = GetHardwareThreadsCount();

if ( NbThread > 0 )//&& NbThread <= MAX_THREADS )
Expand All @@ -131,7 +133,6 @@ namespace sofa

m_workerThreadCount = m_threadCount;
m_isInitialized = true;
return;
}


Expand All @@ -140,7 +141,7 @@ namespace sofa
{
m_isClosing = true;

if ( m_isInitialized )
if ( m_isInitialized )
{
// wait for all
WaitForWorkersToBeReady();
Expand Down Expand Up @@ -307,7 +308,7 @@ namespace sofa
return;
}

const std::thread::id WorkerThread::getId()
const std::thread::id WorkerThread::getId() const
{
return m_stdThread.get_id();
}
Expand Down
Expand Up @@ -72,36 +72,30 @@ namespace sofa {
{
public:

WorkerThread(DefaultTaskScheduler* const& taskScheduler, const int index, const std::string& name = "Worker");
WorkerThread(DefaultTaskScheduler* const& taskScheduler, int index, const std::string& name = "Worker");

~WorkerThread();


/// Return the WorkerThread corresponding to the current thread
static WorkerThread* getCurrent();

// queue task if there is space, and run it otherwise
bool addTask(Task* pTask);

void workUntilDone(Task::Status* status);

const Task::Status* getCurrentStatus() const { return m_currentStatus; }

const char* getName() const { return m_name.c_str(); }

int getType() const { return m_type; }

const std::thread::id getId();
const std::thread::id getId() const;

const std::deque<Task*>* getTasksQueue() { return &m_tasks; }

std::uint64_t getTaskCount() { return m_tasks.size(); }

int GetWorkerIndex();

void* allocate();

void free(void* ptr);



private:

bool start(DefaultTaskScheduler* const& taskScheduler);
Expand Down Expand Up @@ -169,8 +163,16 @@ namespace sofa {
public:

// interface


/**
* Call stop() and start() if not already initialized
* @param nbThread
*/
virtual void init(const unsigned int nbThread = 0) final;

/**
* Wait and destroy worker threads
*/
virtual void stop(void) final;
virtual unsigned int getThreadCount(void) const final { return m_threadCount; }
virtual const char* getCurrentThreadName() override final;
Expand Down Expand Up @@ -199,7 +201,10 @@ namespace sofa {
void WaitForWorkersToBeReady();

void wakeUpWorkers();


/**
* Assuming 2 concurrent threads by CPU core, return the number of CPU core on the system
*/
static unsigned GetHardwareThreadsCount();

WorkerThread* getCurrentThread();
Expand Down Expand Up @@ -228,7 +233,14 @@ namespace sofa {
DefaultTaskScheduler(const DefaultTaskScheduler&) {}

~DefaultTaskScheduler() override;


/**
* Create worker threads
* If the number of required threads is 0, the number of threads will be equal to the
* result of GetHardwareThreadsCount()
*
* @param NbThread
*/
void start(unsigned int NbThread);

bool m_isInitialized;
Expand Down
Expand Up @@ -16,7 +16,7 @@ namespace sofa
// the TaskScheduler::_schedulers must be initialized before any call to TaskScheduler::registerScheduler
std::map<std::string, std::function<TaskScheduler*()> > TaskScheduler::_schedulers;
std::string TaskScheduler::_currentSchedulerName;
TaskScheduler* TaskScheduler::_currentScheduler = nullptr;
std::unique_ptr<TaskScheduler> TaskScheduler::_currentScheduler = nullptr;

// register default task scheduler
const bool DefaultTaskScheduler::isRegistered = TaskScheduler::registerScheduler(DefaultTaskScheduler::name(), &DefaultTaskScheduler::create);
Expand All @@ -27,7 +27,7 @@ namespace sofa
// is already the current scheduler
std::string nameStr(name);
if (!nameStr.empty() && _currentSchedulerName == name)
return _currentScheduler;
return _currentScheduler.get();

auto iter = _schedulers.find(name);
if (iter == _schedulers.end())
Expand All @@ -40,17 +40,17 @@ namespace sofa

if (_currentScheduler != nullptr)
{
delete _currentScheduler;
_currentScheduler.reset();
}

TaskSchedulerCreatorFunction& creatorFunc = iter->second;
_currentScheduler = creatorFunc();
_currentScheduler = std::unique_ptr<TaskScheduler>(creatorFunc());

_currentSchedulerName = iter->first;

Task::setAllocator(_currentScheduler->getTaskAllocator());

return _currentScheduler;
return _currentScheduler.get();
}


Expand All @@ -68,13 +68,12 @@ namespace sofa
_currentScheduler->init();
}

return _currentScheduler;
return _currentScheduler.get();
}


TaskScheduler::~TaskScheduler()
{

}

} // namespace simulation
Expand Down
Expand Up @@ -49,15 +49,40 @@ namespace sofa


virtual ~TaskScheduler();


/**
* Check if a TaskScheduler already exists with this name.
* If not, it creates and registers a new TaskScheduler of type DefaultTaskScheduler with
* name as a key
*
* @param name key to find or create a TaskScheduler
* @return A TaskScheduler
*/
static TaskScheduler* create(const char* name = "");

typedef std::function<TaskScheduler* ()> TaskSchedulerCreatorFunction;


/**
* Register a new scheduler in the factory
*
* @param name key in the factory
* @param creatorFunc function creating a new TaskScheduler or a derived class
* @return
*/
static bool registerScheduler(const char* name, std::function<TaskScheduler* ()> creatorFunc);


/**
* Get the current TaskScheduler instance.
*
* If not instance has been created yet, a new one with empty name is created.
* @return The current TaskScheduler instance
*/
static TaskScheduler* getInstance();


/**
* Get the name of the current TaskScheduler instance
* @return The name of the current TaskScheduler instance
*/
static const std::string& getCurrentName() { return _currentSchedulerName; }

// interface
Expand Down Expand Up @@ -86,7 +111,7 @@ namespace sofa

// current instantiated scheduler
static std::string _currentSchedulerName;
static TaskScheduler * _currentScheduler;
static std::unique_ptr<TaskScheduler> _currentScheduler;

friend class Task;
};
Expand Down

0 comments on commit e6b15fa

Please sign in to comment.