Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SofaSimulationCore] FIX Task scheduler memory leak #1927

Merged
merged 5 commits into from Apr 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -67,6 +67,4 @@ namespace sofa

return MemoryAlloc::Stack;
}


} // namespace sofa
@@ -1,10 +1,9 @@
#include <sofa/simulation/Task.h>

#include <thread>

namespace sofa
{

// compute recursively the Fibonacci number for input N O(~1.6 exp(N))
// compute recursively the Fibonacci number for input N O(~1.6 exp(N))
// this is implemented to test the task scheduler generating super lightweight tasks and not for performance
class FibonacciTask : public simulation::CpuTask
{
Expand Down
Expand Up @@ -6,7 +6,6 @@

namespace sofa
{

// compute the Fibonacci number for input N
static int64_t Fibonacci(int64_t N, int nbThread = 0)
{
Expand Down
Expand Up @@ -55,9 +55,11 @@ namespace sofa
m_isClosing = false;

// init global static thread local var
workerThreadIndex = new WorkerThread(this, 0, "Main ");
_threads[std::this_thread::get_id()] = workerThreadIndex;// new WorkerThread(this, 0, "Main ");

if (_threads.find(std::this_thread::get_id()) == _threads.end())
{
workerThreadIndex = new WorkerThread(this, 0, "Main ");
_threads[std::this_thread::get_id()] = workerThreadIndex;// new WorkerThread(this, 0, "Main ");
}
}

DefaultTaskScheduler::~DefaultTaskScheduler()
Expand Down Expand Up @@ -112,7 +114,7 @@ namespace sofa
m_workerThreadsIdle = true;
m_mainTaskStatus = nullptr;

// default number of thread: only physicsal cores. no advantage from hyperthreading.
// default number of thread: only physical cores. no advantage from hyperthreading.
m_threadCount = GetHardwareThreadsCount();

if ( NbThread > 0 )//&& NbThread <= MAX_THREADS )
Expand All @@ -131,7 +133,6 @@ namespace sofa

m_workerThreadCount = m_threadCount;
m_isInitialized = true;
return;
}


Expand All @@ -140,7 +141,7 @@ namespace sofa
{
m_isClosing = true;

if ( m_isInitialized )
if ( m_isInitialized )
{
// wait for all
WaitForWorkersToBeReady();
Expand Down Expand Up @@ -307,7 +308,7 @@ namespace sofa
return;
}

const std::thread::id WorkerThread::getId()
const std::thread::id WorkerThread::getId() const
{
return m_stdThread.get_id();
}
Expand Down
Expand Up @@ -72,36 +72,30 @@ namespace sofa {
{
public:

WorkerThread(DefaultTaskScheduler* const& taskScheduler, const int index, const std::string& name = "Worker");
WorkerThread(DefaultTaskScheduler* const& taskScheduler, int index, const std::string& name = "Worker");

~WorkerThread();


/// Return the WorkerThread corresponding to the current thread
static WorkerThread* getCurrent();

// queue task if there is space, and run it otherwise
bool addTask(Task* pTask);

void workUntilDone(Task::Status* status);

const Task::Status* getCurrentStatus() const { return m_currentStatus; }

const char* getName() const { return m_name.c_str(); }

int getType() const { return m_type; }

const std::thread::id getId();
const std::thread::id getId() const;

const std::deque<Task*>* getTasksQueue() { return &m_tasks; }

std::uint64_t getTaskCount() { return m_tasks.size(); }

int GetWorkerIndex();

void* allocate();

void free(void* ptr);



private:

bool start(DefaultTaskScheduler* const& taskScheduler);
Expand Down Expand Up @@ -169,8 +163,16 @@ namespace sofa {
public:

// interface


/**
* Call stop() and start() if not already initialized
* @param nbThread
*/
virtual void init(const unsigned int nbThread = 0) final;

/**
* Wait and destroy worker threads
*/
virtual void stop(void) final;
virtual unsigned int getThreadCount(void) const final { return m_threadCount; }
virtual const char* getCurrentThreadName() override final;
Expand Down Expand Up @@ -199,7 +201,10 @@ namespace sofa {
void WaitForWorkersToBeReady();

void wakeUpWorkers();


/**
* Assuming 2 concurrent threads by CPU core, return the number of CPU core on the system
*/
static unsigned GetHardwareThreadsCount();

WorkerThread* getCurrentThread();
Expand Down Expand Up @@ -228,7 +233,14 @@ namespace sofa {
DefaultTaskScheduler(const DefaultTaskScheduler&) {}

~DefaultTaskScheduler() override;


/**
* Create worker threads
* If the number of required threads is 0, the number of threads will be equal to the
* result of GetHardwareThreadsCount()
*
* @param NbThread
*/
void start(unsigned int NbThread);

bool m_isInitialized;
Expand Down
Expand Up @@ -16,7 +16,7 @@ namespace sofa
// the TaskScheduler::_schedulers must be initialized before any call to TaskScheduler::registerScheduler
std::map<std::string, std::function<TaskScheduler*()> > TaskScheduler::_schedulers;
std::string TaskScheduler::_currentSchedulerName;
TaskScheduler* TaskScheduler::_currentScheduler = nullptr;
std::unique_ptr<TaskScheduler> TaskScheduler::_currentScheduler = nullptr;

// register default task scheduler
const bool DefaultTaskScheduler::isRegistered = TaskScheduler::registerScheduler(DefaultTaskScheduler::name(), &DefaultTaskScheduler::create);
Expand All @@ -27,7 +27,7 @@ namespace sofa
// is already the current scheduler
std::string nameStr(name);
if (!nameStr.empty() && _currentSchedulerName == name)
return _currentScheduler;
return _currentScheduler.get();

auto iter = _schedulers.find(name);
if (iter == _schedulers.end())
Expand All @@ -40,17 +40,17 @@ namespace sofa

if (_currentScheduler != nullptr)
{
delete _currentScheduler;
_currentScheduler.reset();
}

TaskSchedulerCreatorFunction& creatorFunc = iter->second;
_currentScheduler = creatorFunc();
_currentScheduler = std::unique_ptr<TaskScheduler>(creatorFunc());

_currentSchedulerName = iter->first;

Task::setAllocator(_currentScheduler->getTaskAllocator());

return _currentScheduler;
return _currentScheduler.get();
}


Expand All @@ -68,13 +68,12 @@ namespace sofa
_currentScheduler->init();
}

return _currentScheduler;
return _currentScheduler.get();
}


TaskScheduler::~TaskScheduler()
{

}

} // namespace simulation
Expand Down
Expand Up @@ -49,15 +49,40 @@ namespace sofa


virtual ~TaskScheduler();


/**
* Check if a TaskScheduler already exists with this name.
* If not, it creates and registers a new TaskScheduler of type DefaultTaskScheduler with
* name as a key
*
* @param name key to find or create a TaskScheduler
* @return A TaskScheduler
*/
static TaskScheduler* create(const char* name = "");

typedef std::function<TaskScheduler* ()> TaskSchedulerCreatorFunction;


/**
* Register a new scheduler in the factory
*
* @param name key in the factory
* @param creatorFunc function creating a new TaskScheduler or a derived class
* @return
*/
static bool registerScheduler(const char* name, std::function<TaskScheduler* ()> creatorFunc);


/**
* Get the current TaskScheduler instance.
*
* If not instance has been created yet, a new one with empty name is created.
* @return The current TaskScheduler instance
*/
static TaskScheduler* getInstance();


/**
* Get the name of the current TaskScheduler instance
* @return The name of the current TaskScheduler instance
*/
static const std::string& getCurrentName() { return _currentSchedulerName; }

// interface
Expand Down Expand Up @@ -86,7 +111,7 @@ namespace sofa

// current instantiated scheduler
static std::string _currentSchedulerName;
static TaskScheduler * _currentScheduler;
static std::unique_ptr<TaskScheduler> _currentScheduler;

friend class Task;
};
Expand Down