Deprecate CoreContext thread pools #848

Merged
merged 4 commits on Feb 10, 2016
52 changes: 0 additions & 52 deletions autowiring/CoreContext.h
@@ -22,7 +22,6 @@
#include "MemoEntry.h"
#include "once.h"
#include "result_or_default.h"
#include "ThreadPool.h"
#include "TypeRegistry.h"
#include "TypeUnifier.h"

@@ -49,10 +48,6 @@ class CoreContextT;
template<typename T>
class JunctionBox;

namespace autowiring {
class ThreadPool;
}

/// \file
/// CoreContext definitions.

@@ -231,10 +226,6 @@ class CoreContext:
// Actual core threads:
std::list<CoreRunnable*> m_threads;

// The thread pool used by this context. By default, a context inherits the thread pool of
// its parent, and the global context gets the system thread pool.
std::shared_ptr<autowiring::ThreadPool> m_threadPool;

// The start token for the thread pool, if one exists
std::shared_ptr<void> m_startToken;

@@ -1066,49 +1057,6 @@ class CoreContext:
);
}

/// <summary>
/// Assigns the thread pool handler for this context
/// </summary>
/// <remarks>
/// If the context is currently running, the thread pool will automatically be started. The pool's
/// start token and shared pointer is reset automatically when the context is torn down. If the
/// context has already been shut down (IE, IsShutdown returns true), this method has no effect.
///
/// Dispatchers that have been attached to the current thread pool will not be transitioned to the
/// new pool. Changing the thread pool may cause the previously assigned thread pool to be stopped.
/// This will cause it to complete all work assigned to it and release resources associated with
/// processing. If there are no other handles to the pool, it may potentially destroy itself.
///
/// It is an error to pass nullptr to this method.
/// </remarks>
void SetThreadPool(const std::shared_ptr<autowiring::ThreadPool>& threadPool);

/// <summary>
/// Returns the current thread pool
/// </summary>
/// <remarks>
/// If the context has been shut down, (IE, IsShutdown returns true), this method returns nullptr. Calling
/// ThreadPool::Start on the returned shared pointer will not cause dispatchers pended to this context to
/// be executed. To do this, invoke CoreContext::Initiate
/// </remarks>
std::shared_ptr<autowiring::ThreadPool> GetThreadPool(void) const;

/// <summary>
/// Submits the specified lambda to this context's ThreadPool for processing
/// </summary>
/// <returns>True if the job has been submitted for execution</returns>
/// <remarks>
/// The passed thunk will not be executed if the current context has already stopped.
/// </remarks>
template<class Fx>
bool operator+=(Fx&& fx) {
auto pool = GetThreadPool();
return
pool ?
pool->Submit(std::make_unique<DispatchThunk<Fx>>(std::forward<Fx&&>(fx))) :
false;
}

/// <summary>
/// Adds a post-attachment listener in this context for a particular autowired member.
/// There is no guarantee for the context in which the listener will be called.
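For reference, a minimal sketch of how the members removed above were used before this change. This is illustrative code, not taken from the repository; the include paths are assumed, and only the signatures visible in the diff (SetThreadPool, GetThreadPool, operator+=, Initiate) are relied on.

```cpp
// Illustrative only: rough shape of the CoreContext-level API removed above.
// Assumes the caller already holds a context and a pool; include paths are assumed.
#include <autowiring/CoreContext.h>
#include <autowiring/ThreadPool.h>
#include <memory>

void legacy_usage(const std::shared_ptr<CoreContext>& ctx,
                  const std::shared_ptr<autowiring::ThreadPool>& pool) {
  // Removed by this PR: hand the context a pool; it was started with the context.
  ctx->SetThreadPool(pool);

  // Start the context; formerly this also started the assigned (or inherited) pool.
  ctx->Initiate();

  // Removed by this PR: CoreContext::operator+= forwarded the lambda to the pool.
  *ctx += [] {
    // work ran on the context's thread pool
  };

  // Removed by this PR: read back the pool; nullptr once the context had shut down.
  std::shared_ptr<autowiring::ThreadPool> current = ctx->GetThreadPool();
  (void)current;
}
```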
2 changes: 1 addition & 1 deletion autowiring/SystemThreadPoolStl.h
@@ -2,7 +2,7 @@
#pragma once
#include "DispatchQueue.h"
#include "SystemThreadPool.h"
#include <thread>
#include <thread>
#include <vector>

namespace autowiring {
15 changes: 14 additions & 1 deletion autowiring/ThreadPool.h
@@ -1,8 +1,9 @@
// Copyright (C) 2012-2015 Leap Motion, Inc. All rights reserved.
#pragma once
#include "DispatchThunk.h"
#include <atomic>
#include <memory>
#include <mutex>
#include MEMORY_HEADER

class DispatchQueue;
class DispatchThunkBase;
@@ -89,6 +90,18 @@ class ThreadPool:
/// be submitted for execution.
/// </remarks>
virtual bool Submit(std::unique_ptr<DispatchThunkBase>&& thunk) = 0;

/// <summary>
/// Submits the specified lambda to this context's ThreadPool for processing
/// </summary>
/// <returns>True if the job has been submitted for execution</returns>
/// <remarks>
/// The passed thunk will not be executed if the current context has already stopped.
/// </remarks>
template<class Fx>
bool operator+=(Fx&& fx) {
return Submit(std::make_unique<DispatchThunk<Fx>>(std::forward<Fx&&>(fx)));
}
};

}
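With this change, lambda submission lives on ThreadPool itself rather than on CoreContext. A minimal sketch of driving a pool directly, under the assumptions that SystemThreadPool sits in the autowiring namespace alongside ThreadPool and that the installed include path mirrors the repository layout; Start() and the operator+= shown in this hunk are the only members relied on.

```cpp
#include <autowiring/SystemThreadPool.h>  // assumed installed include path
#include <iostream>
#include <memory>

int main() {
  // SystemThreadPool::New() is the same factory Initiate() falls back to for the
  // global context; namespace placement is assumed here.
  std::shared_ptr<autowiring::ThreadPool> pool = autowiring::SystemThreadPool::New();

  // Start() hands back an opaque token; the pool keeps running while it is held.
  std::shared_ptr<void> token = pool->Start();

  // The operator+= added in this diff wraps the lambda in a DispatchThunk and
  // forwards it to Submit(); it returns false if the pool no longer accepts work.
  bool accepted = *pool += [] {
    std::cout << "ran on the pool" << std::endl;
  };

  // A real program would synchronize before exiting; omitted here for brevity.
  return accepted ? 0 : 1;
}
```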
87 changes: 1 addition & 86 deletions src/autowiring/CoreContext.cpp
@@ -12,7 +12,6 @@
#include "NullPool.h"
#include "SystemThreadPool.h"
#include "thread_specific_ptr.h"
#include "ThreadPool.h"
#include <sstream>
#include <stdexcept>

@@ -58,8 +57,7 @@ CoreContext::CoreContext(const std::shared_ptr<CoreContext>& pParent, t_childLis
m_backReference(backReference),
m_sigilType(sigilType),
m_stateBlock(std::make_shared<CoreContextStateBlock>(pParent ? pParent->m_stateBlock : nullptr)),
m_junctionBoxManager(new JunctionBoxManager),
m_threadPool(std::make_shared<NullPool>())
m_junctionBoxManager(new JunctionBoxManager)
{}

CoreContext::~CoreContext(void) {
@@ -450,47 +448,9 @@ void CoreContext::Initiate(void) {

// Now we can recover the first thread that will need to be started
auto beginning = m_threads.begin();

// Start our threads before starting any child contexts:
std::shared_ptr<ThreadPool> threadPool;
auto nullPool = std::dynamic_pointer_cast<NullPool>(m_threadPool);
if (nullPool) {
// Decide which pool will become our current thread pool. Global context is the final case,
// which defaults to the system thread pool
if (!nullPool->GetSuccessor())
nullPool->SetSuccessor(m_pParent ? m_pParent->GetThreadPool() : SystemThreadPool::New());

// Trigger null pool destruction at this point:
m_threadPool = nullPool->MoveDispatchersToSuccessor();
}

// The default case should not generally occur, but if it were the case that the null pool were
// updated before the context was initiated, then we would have no work to do as no successors
// exist to be moved. In that case, simply take a record of the current thread pool for the
// call to Start that follows the unlock.
threadPool = m_threadPool;
lk.unlock();
onInitiated();

// Start the thread pool out of the lock, and then update our start token if our thread pool
// reference has not changed. The next pool could potentially be nullptr if the parent is going
// down while we are going up.
if (threadPool) {
// Initiate
auto startToken = threadPool->Start();

// Transfer all dispatchers from the null pool to the new thread pool:
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

// If the thread pool was updated while we were trying to start the pool we observed earlier,
// then allow our token to expire and do not do any other work. Whomever caused the thread
// pool pointer to be updated would also have seen that the context is currently started,
// and would have updated both the thread pool pointer and the start token at the same time.
if (m_threadPool == threadPool)
// Swap, not assign; we don't want teardown to happen while synchronized
std::swap(m_startToken, startToken);
}

if (beginning != m_threads.end()) {
auto outstanding = m_stateBlock->IncrementOutstandingThreadCount(shared_from_this());
for (auto q = beginning; q != m_threads.end(); ++q)
@@ -554,16 +514,13 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) {

// Thread pool token and pool pointer
std::shared_ptr<void> startToken;
std::shared_ptr<ThreadPool> threadPool;

// Tear down all the children, evict thread pool:
{
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

startToken = std::move(m_startToken);
m_startToken.reset();
threadPool = std::move(m_threadPool);
m_threadPool.reset();

// Fill strong lock series in order to ensure proper teardown interleave:
childrenInterleave.reserve(m_children.size());
@@ -719,48 +676,6 @@ void CoreContext::BuildCurrentState(void) {
}
}

void CoreContext::SetThreadPool(const std::shared_ptr<ThreadPool>& threadPool) {
if (!threadPool)
throw std::invalid_argument("A context cannot be given a null thread pool");

std::shared_ptr<ThreadPool> priorThreadPool;
{
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);
if (IsShutdown())
// Nothing to do, context already down
return;

if (!IsRunning()) {
// Just set up the forwarding thread pool
auto nullPool = std::dynamic_pointer_cast<NullPool>(m_threadPool);
if (!nullPool)
throw autowiring_error("Internal error, null pool was deassigned even though the context has not been started");
priorThreadPool = nullPool->GetSuccessor();
nullPool->SetSuccessor(threadPool);
return;
}

priorThreadPool = m_threadPool;
m_threadPool = threadPool;
}

// We are presently running. We need to start the pool, and then attempt to
// update our token
auto startToken = threadPool->Start();
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);
if (m_threadPool != threadPool)
// Thread pool was updated by someone else, let them complete their operation
return;

// Update our start token and return. Swap, not move; we don't want to risk
// calling destructors while synchronized.
std::swap(m_startToken, startToken);
}

std::shared_ptr<ThreadPool> CoreContext::GetThreadPool(void) const {
return (std::lock_guard<std::mutex>)m_stateBlock->m_lock, m_threadPool;
}

void CoreContext::Dump(std::ostream& os) const {
std::lock_guard<std::mutex> lk(m_stateBlock->m_lock);

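Both removed functions carry the comment "Swap, not assign; we don't want teardown to happen while synchronized." The concern is that assigning over a shared_ptr while holding m_stateBlock->m_lock could run an arbitrary destructor under that lock; swapping into a local defers any destruction until the lock is released. A generic sketch of the idiom, not tied to the autowiring types:

```cpp
#include <memory>
#include <mutex>
#include <utility>

// Generic illustration of the swap-not-assign pattern; names are hypothetical.
struct Holder {
  std::mutex m_lock;
  std::shared_ptr<void> m_startToken;

  void ReplaceToken(std::shared_ptr<void> startToken) {
    {
      std::lock_guard<std::mutex> lk(m_lock);
      // Swap, not assign: after the swap the *old* token lives in the local
      // parameter, so any destructor it triggers runs after lk is released.
      std::swap(m_startToken, startToken);
    }
    // startToken (now holding the previous value) is destroyed here, outside the lock.
  }
};
```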