Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: 178f1d79cf
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 273 lines (248 sloc) 9.106 kb

/*
* OXT - OS eXtensions for boosT
* Provides important functionality necessary for writing robust server software.
*
* Copyright (c) 2010 Phusion
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef _OXT_THREAD_HPP_
#define _OXT_THREAD_HPP_

#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include "system_calls.hpp"
#include "detail/context.hpp"
#include <string>
#include <list>
#include <unistd.h>
#include <limits.h> // for PTHREAD_STACK_MIN

namespace oxt {

/**
* Enhanced thread class with support for:
* - user-defined stack size.
* - system call interruption.
* - backtraces.
*/
class thread: public boost::thread {
private:
thread_local_context_ptr context;

static std::string make_thread_name(const std::string &given_name);
static void thread_main(const boost::function<void ()> func, thread_local_context_ptr ctx);

public:
/**
* Create a new thread.
*
* @param func A function object which will be called as the thread's
* main function. This object must be copyable. <tt>func</tt> is
* copied into storage managed internally by the thread library,
* and that copy is invoked on a newly-created thread of execution.
* @param name A name for this thread. If an empty string is given, then
* a name will be automatically chosen.
* @param stack_size The stack size, in bytes, that the thread should
* have. If 0 is specified, the operating system's default stack
* size is used. If non-zero is specified, and the size is smaller
* than the operating system's minimum stack size, then the operating
* system's minimum stack size will be used.
* @pre func must be copyable.
* @throws boost::thread_resource_error Something went wrong during
* creation of the thread.
*/
explicit thread(const boost::function<void ()> func,
const std::string &name = std::string(),
unsigned int stack_size = 0)
: boost::thread()
{
context = thread_local_context::make_shared_ptr();
context->thread_name = make_thread_name(name);
thread_info = make_thread_info(boost::bind(thread_main, func, context));

unsigned long min_stack_size;
bool stack_min_size_defined;
bool round_stack_size;
#ifdef PTHREAD_STACK_MIN
// PTHREAD_STACK_MIN may not be a constant macro so we need
// to evaluate it dynamically.
min_stack_size = PTHREAD_STACK_MIN;
stack_min_size_defined = true;
#else
// Assume minimum stack size is 128 KB.
min_stack_size = 128 * 1024;
stack_min_size_defined = false;
#endif
if (stack_size != 0 && stack_size < min_stack_size) {
stack_size = min_stack_size;
round_stack_size = !stack_min_size_defined;
} else {
round_stack_size = true;
}

if (round_stack_size) {
// Round stack size up to page boundary.
long page_size;
#if defined(_SC_PAGESIZE)
page_size = sysconf(_SC_PAGESIZE);
#elif defined(_SC_PAGE_SIZE)
page_size = sysconf(_SC_PAGE_SIZE);
#elif defined(PAGESIZE)
page_size = sysconf(PAGESIZE);
#elif defined(PAGE_SIZE)
page_size = sysconf(PAGE_SIZE);
#else
page_size = getpagesize();
#endif
if (stack_size % page_size != 0) {
stack_size = stack_size - (stack_size % page_size) + page_size;
}
}

attributes attrs;
attrs.set_stack_size(stack_size);
start_thread(attrs);
}

/**
* Return this thread's name. The name was set during construction.
*/
std::string name() const throw();

/**
* Return the current backtrace of the thread of execution, as a string.
*/
std::string backtrace() const throw();

/**
* Return the backtraces of all oxt::thread threads, as well as that of the
* main thread, in a nicely formatted string.
*/
static std::string all_backtraces() throw();

/**
* Interrupt the thread. This method behaves just like
* boost::thread::interrupt(), but if <em>interruptSyscalls</em> is true
* then it will also respect the interruption points defined in
* oxt::syscalls.
*
* Note that an interruption request may get lost, depending on the
* current execution point of the thread. Thus, one should call this
* method in a loop, until a certain goal condition has been fulfilled.
* interrupt_and_join() is a convenience method that implements this
* pattern.
*/
void interrupt(bool interruptSyscalls = true);

/**
* Keep interrupting the thread until it's done, then join it.
*
* @param interruptSyscalls Whether oxt::syscalls calls should also
* be eligable for interruption.
* @throws boost::thread_interrupted The calling thread has been
* interrupted before we could join this thread.
*/
void interrupt_and_join(bool interruptSyscalls = true) {
bool done = false;
while (!done) {
interrupt(interruptSyscalls);
done = timed_join(boost::posix_time::millisec(10));
}
}

/**
* Keep interrupting the thread until it's done, then join it.
* This method will keep trying for at most <em>timeout</em> milliseconds.
*
* @param timeout The maximum number of milliseconds that this method
* should keep trying.
* @param interruptSyscalls Whether oxt::syscalls calls should also
* be eligable for interruption.
* @return True if the thread was successfully joined, false if the
* timeout has been reached.
* @throws boost::thread_interrupted The calling thread has been
* interrupted before we could join this thread.
*/
bool interrupt_and_join(unsigned int timeout, bool interruptSyscalls = true) {
bool joined = false, timed_out = false;
boost::posix_time::ptime deadline =
boost::posix_time::microsec_clock::local_time() +
boost::posix_time::millisec(timeout);
while (!joined && !timed_out) {
interrupt(interruptSyscalls);
joined = timed_join(boost::posix_time::millisec(10));
timed_out = !joined && boost::posix_time::microsec_clock::local_time() > deadline;
}
return joined;
}

/**
* Interrupt and join multiple threads in a way that's more efficient than calling
* interrupt_and_join() on each thread individually. It iterates over all threads,
* interrupts each one without joining it, then waits until at least one thread
* is joinable. This is repeated until all threads are joined.
*
* @param threads An array of threads to join.
* @param size The number of elements in <em>threads</em>.
* @param interruptSyscalls Whether oxt::syscalls calls should also
* be eligable for interruption.
* @throws boost::thread_interrupted The calling thread has been
* interrupted before all threads have been joined. Some threads
* may have been successfully joined while others haven't.
*/
static void interrupt_and_join_multiple(oxt::thread **threads, unsigned int size,
bool interruptSyscalls = true)
{
std::list<oxt::thread *> remaining_threads;
std::list<oxt::thread *>::iterator it, current;
oxt::thread *thread;
unsigned int i;

for (i = 0; i < size; i++) {
remaining_threads.push_back(threads[i]);
}

while (!remaining_threads.empty()) {
for (it = remaining_threads.begin(); it != remaining_threads.end(); it++) {
thread = *it;
thread->interrupt(interruptSyscalls);
}
for (it = remaining_threads.begin(); it != remaining_threads.end(); it++) {
thread = *it;
if (thread->timed_join(boost::posix_time::millisec(0))) {
current = it;
it--;
remaining_threads.erase(current);
}
}
if (!remaining_threads.empty()) {
syscalls::usleep(10000);
}
}
}
};

/**
* Like boost::lock_guard, but is interruptable.
*/
template<typename TimedLockable>
class interruptable_lock_guard {
private:
TimedLockable &mutex;
public:
interruptable_lock_guard(TimedLockable &m): mutex(m) {
bool locked = false;

while (!locked) {
locked = m.timed_lock(boost::posix_time::milliseconds(20));
if (!locked) {
boost::this_thread::interruption_point();
}
}
}

~interruptable_lock_guard() {
mutex.unlock();
}
};

} // namespace oxt

#endif /* _OXT_THREAD_HPP_ */

Something went wrong with that request. Please try again.