Skip to content
Permalink
Browse files

Introduce Commands, Tasks, Scheduler

This provides a basic infrastructure to run arbitrary Command
implementations on the core without blocking the coresession.

- Commands are created inside coresessions (always bound to a user ID)
- Tasks wrap commands for metadata (permissions, timings,..)
- Scheduler uses a QThreadPool to run the tasks (which are QRunnables)

By default, the _maximum_ amount of threads in the pool is the number of active
users at core start. Thereby every user _could_ be able to get a free
thread. There is no limitation on threads-per-user yet though.
Unused threads time out after 30 seconds.

The maximum threads can be overridden at compile time by defining CORE_MAX_TASK_THREADS.
Note: This can not be used to set the threads to zero. The minimum
number of threads is 1.

To enable the scheduler and thereby any kind of threads, use the core
command line switch --enable-tasks.
  • Loading branch information
seezer committed Aug 23, 2014
1 parent e236a6e commit 8e4c1d8c20b04bd1b2e7a98ab3ad2899e5735724
@@ -128,6 +128,7 @@ int main(int argc, char **argv)
cliParser->addOption("port <port>", 'p', "The port quasselcore will listen at", QString("4242"));
cliParser->addSwitch("norestore", 'n', "Don't restore last core's state");
cliParser->addOption("loglevel <level>", 'L', "Loglevel Debug|Info|Warning|Error", "Info");
cliParser->addSwitch("enable-tasks", 0, "Enable task machinery");
#ifdef HAVE_SYSLOG
cliParser->addSwitch("syslog", 0, "Log to syslog");
#endif
@@ -36,6 +36,10 @@ set(SOURCES
sqlitestorage.cpp
storage.cpp

task.cpp
command.cpp
scheduler.cpp

# needed for automoc
coreeventmanager.h
)
@@ -0,0 +1,29 @@
#include "command.h"
#include "corenetwork.h"

Command::Command(UserId userId, CoreNetwork *network, QObject *parent)
: QObject(parent),
_userId(userId),
_network(network)
{
}

Command::~Command()
{
}

void Command::setupOutput()
{
connect(this, SIGNAL(msg(Message::Type,BufferInfo::Type,QString,QString)),
_network, SLOT(displayMsg(Message::Type,BufferInfo::Type,QString,QString)), Qt::QueuedConnection);
}

void Command::sendMessage(Message::Type msgType, BufferInfo::Type bufferType, const QString &target, const QString &text)
{
emit msg(msgType, bufferType, target, text);
}

UserId Command::userId()
{
return _userId;
}
@@ -0,0 +1,31 @@
#ifndef COMMAND_H
#define COMMAND_H

#include "types.h"
#include "message.h"

class CoreNetwork;

class Command : public QObject
{
Q_OBJECT

public:
Command(UserId userId, CoreNetwork *network, QObject *parent = 0);
virtual ~Command();
virtual void exec() = 0;
virtual void setupOutput();
virtual void sendMessage(Message::Type msgType, BufferInfo::Type bufferType, const QString &target, const QString &text);

signals:
void msg(Message::Type, BufferInfo::Type ,const QString &target, const QString &msg);

protected:
virtual UserId userId();

private:
UserId _userId;
CoreNetwork *_network;
};

#endif // COMMAND_H
@@ -21,6 +21,8 @@
#include <QCoreApplication>

#include "core.h"

#include "command.h"
#include "coreauthhandler.h"
#include "coresession.h"
#include "coresettings.h"
@@ -29,6 +31,7 @@
#include "network.h"
#include "postgresqlstorage.h"
#include "quassel.h"
#include "scheduler.h"
#include "sqlitestorage.h"
#include "util.h"

@@ -83,7 +86,8 @@ void Core::destroy()

Core::Core()
: QObject(),
_storage(0)
_storage(0),
_scheduler(0)
{
#ifdef HAVE_UMASK
umask(S_IRWXG | S_IRWXO);
@@ -207,6 +211,17 @@ void Core::init()
exit(0);
}

if(Quassel::isOptionSet("enable-tasks"))
{
int maxThreads;
#ifdef CORE_MAX_TASK_THREADS
maxThreads = CORE_MAX_TASK_THREADS;
#else
// default is "1 per active user" - requires restart to adapt to new users
maxThreads = cs.coreState().toMap()["ActiveSessions"].toList().size();
#endif
_scheduler = new Scheduler(maxThreads, this);
}
connect(&_server, SIGNAL(newConnection()), this, SLOT(incomingConnection()));
connect(&_v6server, SIGNAL(newConnection()), this, SLOT(incomingConnection()));
if (!startListening()) exit(1); // TODO make this less brutal
@@ -222,6 +237,10 @@ Core::~Core()
foreach(CoreAuthHandler *handler, _connectingClients) {
handler->deleteLater(); // disconnect non authed clients
}
if(_scheduler)
{
delete _scheduler;
}
qDeleteAll(sessions);
qDeleteAll(_storageBackends);
}
@@ -416,6 +435,16 @@ bool Core::createNetwork(UserId user, NetworkInfo &info)
return true;
}

void Core::scheduleCommand(Command *command)
{
Scheduler *scheduler = instance()->_scheduler;
if(scheduler)
{
Task *task = scheduler->createTask(command);
scheduler->runTask(task);
}
}


/*** Network Management ***/

@@ -46,6 +46,8 @@ class CoreSession;
struct NetworkInfo;
class SessionThread;
class SignalProxy;
class Scheduler;
class Command;

class AbstractSqlMigrationReader;
class AbstractSqlMigrationWriter;
@@ -481,6 +483,7 @@ class Core : public QObject
return instance()->_storage->bufferMarkerLineMsgIds(user);
}

static void scheduleCommand(Command *command);

static inline QDateTime startTime() { return instance()->_startTime; }
static inline bool isConfigured() { return instance()->_configured; }
@@ -565,6 +568,8 @@ private slots:

bool _configured;

Scheduler *_scheduler;

static AbstractSqlMigrationReader *getMigrationReader(Storage *storage);
static AbstractSqlMigrationWriter *getMigrationWriter(Storage *storage);
static void stdInEcho(bool on);
@@ -196,6 +196,11 @@ void CoreSession::saveSessionState() const
_networkConfig->save();
}

void CoreSession::scheduleCommand(Command *cmd)
{
Core::scheduleCommand(cmd);
}


void CoreSession::restoreSessionState()
{
@@ -30,6 +30,7 @@
#include "protocol.h"
#include "message.h"
#include "storage.h"
#include "command.h"

class CoreBacklogManager;
class CoreBufferSyncer;
@@ -132,6 +133,8 @@ public slots:
//! Marks us away (or unaway) on all networks
void globalAway(const QString &msg = QString());

void scheduleCommand(Command *cmd);

signals:
void initialized();
void sessionState(const Protocol::SessionState &sessionState);
@@ -0,0 +1,31 @@
#include "scheduler.h"

#include "task.h"
#include "command.h"

#include <QThreadPool>

Scheduler::Scheduler(int maxThreads, QObject *parent) :
QObject(parent)
{
// even if maxThreads is < 1, one thread is created
QThreadPool::globalInstance()->setMaxThreadCount(maxThreads);
}

Scheduler::~Scheduler()
{
QThreadPool::globalInstance()->waitForDone();
}


Task *Scheduler::createTask(Command *command)
{
return new Task(command);
}

void Scheduler::runTask(Task *task)
{
// TODO use an own pool if core ever has other usecases for the global one
QThreadPool *pool = QThreadPool::globalInstance();
pool->start(task);
}
@@ -0,0 +1,31 @@
#ifndef SCHEDULER_H
#define SCHEDULER_H

#include <QObject>

class Command;
class Task;

class Scheduler : public QObject
{
Q_OBJECT
public:
explicit Scheduler(int maxThreads, QObject *parent = 0);
~Scheduler();

Task *createTask(Command *command);
/**
* @brief Schedule a task for immediate execution on next free worker
* @param task
*/
void runTask(Task *task);

// TODO provide runAt
signals:
void start();

public slots:
private:
};

#endif // SCHEDULER_H
@@ -0,0 +1,24 @@
#include "task.h"
#include "command.h"
Task::Task(Command *command, QObject *parent)
: QObject(parent),
_command(command)
{
}

QDateTime Task::runAt() const
{
return _runAt;
}

void Task::setRunAt(const QDateTime &runAt)
{
_runAt = runAt;
}

void Task::run()
{
_command->setupOutput();
_command->exec();
}

@@ -0,0 +1,28 @@
#ifndef TASK_H
#define TASK_H

#include "command.h"

#include <QDateTime>
#include <QScopedPointer>
#include <QObject>
#include <QRunnable>

class Task : public QObject, public QRunnable
{
Q_OBJECT
public:
Task(Command *command, QObject *parent = 0);

QDateTime runAt() const;
void setRunAt(const QDateTime &runAt);

virtual void run();

private:
QDateTime _runAt;

QScopedPointer<Command> _command;
};

#endif // TASK_H

0 comments on commit 8e4c1d8

Please sign in to comment.
You can’t perform that action at this time.