Skip to content

Commit

Permalink
Replace SimpleThread by JThread now implementing same features
Browse files Browse the repository at this point in the history
  • Loading branch information
sapier authored and sapier committed Dec 15, 2013
1 parent 9772322 commit e9e9fd7
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 88 deletions.
7 changes: 3 additions & 4 deletions src/client.cpp
Expand Up @@ -177,7 +177,7 @@ void * MeshUpdateThread::Thread()


BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER


while(getRun()) while(!StopRequested())
{ {
/*// Wait for output queue to flush. /*// Wait for output queue to flush.
// Allow 2 in queue, this makes less frametime jitter. // Allow 2 in queue, this makes less frametime jitter.
Expand Down Expand Up @@ -302,9 +302,8 @@ Client::~Client()
m_con.Disconnect(); m_con.Disconnect();
} }


m_mesh_update_thread.setRun(false); m_mesh_update_thread.Stop();
while(m_mesh_update_thread.IsRunning()) m_mesh_update_thread.Wait();
sleep_ms(100);
while(!m_mesh_update_thread.m_queue_out.empty()) { while(!m_mesh_update_thread.m_queue_out.empty()) {
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front(); MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
delete r.mesh; delete r.mesh;
Expand Down
2 changes: 1 addition & 1 deletion src/client.h
Expand Up @@ -103,7 +103,7 @@ struct MeshUpdateResult
} }
}; };


class MeshUpdateThread : public SimpleThread class MeshUpdateThread : public JThread
{ {
public: public:


Expand Down
4 changes: 2 additions & 2 deletions src/connection.cpp
Expand Up @@ -556,7 +556,7 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,


Connection::~Connection() Connection::~Connection()
{ {
stop(); Stop();
// Delete peers // Delete peers
for(std::map<u16, Peer*>::iterator for(std::map<u16, Peer*>::iterator
j = m_peers.begin(); j = m_peers.begin();
Expand All @@ -578,7 +578,7 @@ void * Connection::Thread()
u32 curtime = porting::getTimeMs(); u32 curtime = porting::getTimeMs();
u32 lasttime = curtime; u32 lasttime = curtime;


while(getRun()) while(!StopRequested())
{ {
BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER


Expand Down
8 changes: 4 additions & 4 deletions src/connection.h
Expand Up @@ -450,11 +450,11 @@ struct ConnectionEvent
return "CONNEVENT_NONE"; return "CONNEVENT_NONE";
case CONNEVENT_DATA_RECEIVED: case CONNEVENT_DATA_RECEIVED:
return "CONNEVENT_DATA_RECEIVED"; return "CONNEVENT_DATA_RECEIVED";
case CONNEVENT_PEER_ADDED: case CONNEVENT_PEER_ADDED:
return "CONNEVENT_PEER_ADDED"; return "CONNEVENT_PEER_ADDED";
case CONNEVENT_PEER_REMOVED: case CONNEVENT_PEER_REMOVED:
return "CONNEVENT_PEER_REMOVED"; return "CONNEVENT_PEER_REMOVED";
case CONNEVENT_BIND_FAILED: case CONNEVENT_BIND_FAILED:
return "CONNEVENT_BIND_FAILED"; return "CONNEVENT_BIND_FAILED";
} }
return "Invalid ConnectionEvent"; return "Invalid ConnectionEvent";
Expand Down Expand Up @@ -544,7 +544,7 @@ struct ConnectionCommand
} }
}; };


class Connection: public SimpleThread class Connection: public JThread
{ {
public: public:
Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6); Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
Expand Down
25 changes: 8 additions & 17 deletions src/emerge.cpp
Expand Up @@ -47,7 +47,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "mapgen_math.h" #include "mapgen_math.h"




class EmergeThread : public SimpleThread class EmergeThread : public JThread
{ {
public: public:
Server *m_server; Server *m_server;
Expand All @@ -61,26 +61,17 @@ class EmergeThread : public SimpleThread
std::queue<v3s16> blockqueue; std::queue<v3s16> blockqueue;


EmergeThread(Server *server, int ethreadid): EmergeThread(Server *server, int ethreadid):
SimpleThread(), JThread(),
m_server(server), m_server(server),
map(NULL), map(NULL),
emerge(NULL), emerge(NULL),
mapgen(NULL), mapgen(NULL),
enable_mapgen_debug_info(false),
id(ethreadid) id(ethreadid)
{ {
} }


void *Thread(); void *Thread();

void trigger()
{
setRun(true);
if(IsRunning() == false)
{
Start();
}
}

bool popBlockEmerge(v3s16 *pos, u8 *flags); bool popBlockEmerge(v3s16 *pos, u8 *flags);
bool getBlockOrStartGen(v3s16 p, MapBlock **b, bool getBlockOrStartGen(v3s16 p, MapBlock **b,
BlockMakeData *data, bool allow_generate); BlockMakeData *data, bool allow_generate);
Expand Down Expand Up @@ -137,9 +128,9 @@ EmergeManager::EmergeManager(IGameDef *gamedef) {


EmergeManager::~EmergeManager() { EmergeManager::~EmergeManager() {
for (unsigned int i = 0; i != emergethread.size(); i++) { for (unsigned int i = 0; i != emergethread.size(); i++) {
emergethread[i]->setRun(false); emergethread[i]->Stop();
emergethread[i]->qevent.signal(); emergethread[i]->qevent.signal();
emergethread[i]->stop(); emergethread[i]->Wait();
delete emergethread[i]; delete emergethread[i];
delete mapgen[i]; delete mapgen[i];
} }
Expand Down Expand Up @@ -261,9 +252,9 @@ Mapgen *EmergeManager::getCurrentMapgen() {
} }




void EmergeManager::triggerAllThreads() { void EmergeManager::startAllThreads() {
for (unsigned int i = 0; i != emergethread.size(); i++) for (unsigned int i = 0; i != emergethread.size(); i++)
emergethread[i]->trigger(); emergethread[i]->Start();
} }




Expand Down Expand Up @@ -499,7 +490,7 @@ void *EmergeThread::Thread() {
mapgen = emerge->mapgen[id]; mapgen = emerge->mapgen[id];
enable_mapgen_debug_info = emerge->mapgen_debug_info; enable_mapgen_debug_info = emerge->mapgen_debug_info;


while (getRun()) while (!StopRequested())
try { try {
if (!popBlockEmerge(&p, &flags)) { if (!popBlockEmerge(&p, &flags)) {
qevent.wait(); qevent.wait();
Expand Down
2 changes: 1 addition & 1 deletion src/emerge.h
Expand Up @@ -119,7 +119,7 @@ class EmergeManager : public IBackgroundBlockEmerger {
Mapgen *createMapgen(std::string mgname, int mgid, Mapgen *createMapgen(std::string mgname, int mgid,
MapgenParams *mgparams); MapgenParams *mgparams);
MapgenParams *createMapgenParams(std::string mgname); MapgenParams *createMapgenParams(std::string mgname);
void triggerAllThreads(); void startAllThreads();
bool enqueueBlockEmerge(u16 peer_id, v3s16 p, bool allow_generate); bool enqueueBlockEmerge(u16 peer_id, v3s16 p, bool allow_generate);


void registerMapgen(std::string name, MapgenFactory *mgfactory); void registerMapgen(std::string name, MapgenFactory *mgfactory);
Expand Down
11 changes: 4 additions & 7 deletions src/httpfetch.cpp
Expand Up @@ -319,7 +319,7 @@ struct HTTPFetchOngoing
} }
}; };


class CurlFetchThread : public SimpleThread class CurlFetchThread : public JThread
{ {
protected: protected:
enum RequestType { enum RequestType {
Expand Down Expand Up @@ -539,7 +539,6 @@ class CurlFetchThread : public SimpleThread


void * Thread() void * Thread()
{ {
ThreadStarted();
log_register_thread("CurlFetchThread"); log_register_thread("CurlFetchThread");
DSTACK(__FUNCTION_NAME); DSTACK(__FUNCTION_NAME);


Expand All @@ -553,7 +552,7 @@ class CurlFetchThread : public SimpleThread


assert(m_all_ongoing.empty()); assert(m_all_ongoing.empty());


while (getRun()) { while (!StopRequested()) {
BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER


/* /*
Expand Down Expand Up @@ -641,9 +640,9 @@ void httpfetch_cleanup()
{ {
verbosestream<<"httpfetch_cleanup: cleaning up"<<std::endl; verbosestream<<"httpfetch_cleanup: cleaning up"<<std::endl;


g_httpfetch_thread->setRun(false); g_httpfetch_thread->Stop();
g_httpfetch_thread->requestWakeUp(); g_httpfetch_thread->requestWakeUp();
g_httpfetch_thread->stop(); g_httpfetch_thread->Wait();
delete g_httpfetch_thread; delete g_httpfetch_thread;


curl_global_cleanup(); curl_global_cleanup();
Expand All @@ -652,8 +651,6 @@ void httpfetch_cleanup()
void httpfetch_async(const HTTPFetchRequest &fetchrequest) void httpfetch_async(const HTTPFetchRequest &fetchrequest)
{ {
g_httpfetch_thread->requestFetch(fetchrequest); g_httpfetch_thread->requestFetch(fetchrequest);
if (!g_httpfetch_thread->IsRunning())
g_httpfetch_thread->Start();
} }


static void httpfetch_request_clear(unsigned long caller) static void httpfetch_request_clear(unsigned long caller)
Expand Down
15 changes: 7 additions & 8 deletions src/server.cpp
Expand Up @@ -73,14 +73,14 @@ class ClientNotFoundException : public BaseException
{} {}
}; };


class ServerThread : public SimpleThread class ServerThread : public JThread
{ {
Server *m_server; Server *m_server;


public: public:


ServerThread(Server *server): ServerThread(Server *server):
SimpleThread(), JThread(),
m_server(server) m_server(server)
{ {
} }
Expand All @@ -98,7 +98,7 @@ void * ServerThread::Thread()


BEGIN_DEBUG_EXCEPTION_HANDLER BEGIN_DEBUG_EXCEPTION_HANDLER


while(getRun()) while(!StopRequested())
{ {
try{ try{
//TimeTaker timer("AsyncRunStep() + Receive()"); //TimeTaker timer("AsyncRunStep() + Receive()");
Expand Down Expand Up @@ -963,14 +963,13 @@ void Server::start(unsigned short port)
infostream<<"Starting server on port "<<port<<"..."<<std::endl; infostream<<"Starting server on port "<<port<<"..."<<std::endl;


// Stop thread if already running // Stop thread if already running
m_thread->stop(); m_thread->Stop();


// Initialize connection // Initialize connection
m_con.SetTimeoutMs(30); m_con.SetTimeoutMs(30);
m_con.Serve(port); m_con.Serve(port);


// Start thread // Start thread
m_thread->setRun(true);
m_thread->Start(); m_thread->Start();


// ASCII art for the win! // ASCII art for the win!
Expand All @@ -993,9 +992,9 @@ void Server::stop()
infostream<<"Server: Stopping and waiting threads"<<std::endl; infostream<<"Server: Stopping and waiting threads"<<std::endl;


// Stop threads (set run=false first so both start stopping) // Stop threads (set run=false first so both start stopping)
m_thread->setRun(false); m_thread->Stop();
//m_emergethread.setRun(false); //m_emergethread.setRun(false);
m_thread->stop(); m_thread->Wait();
//m_emergethread.stop(); //m_emergethread.stop();


infostream<<"Server: Threads stopped"<<std::endl; infostream<<"Server: Threads stopped"<<std::endl;
Expand Down Expand Up @@ -1682,7 +1681,7 @@ void Server::AsyncRunStep()
{ {
counter = 0.0; counter = 0.0;


m_emerge->triggerAllThreads(); m_emerge->startAllThreads();


// Update m_enable_rollback_recording here too // Update m_enable_rollback_recording here too
m_enable_rollback_recording = m_enable_rollback_recording =
Expand Down
44 changes: 0 additions & 44 deletions src/util/thread.h
Expand Up @@ -59,53 +59,9 @@ class MutexedVariable
JMutex m_mutex; JMutex m_mutex;
}; };


/*
A base class for simple background thread implementation
*/

class SimpleThread : public JThread
{
bool run;
JMutex run_mutex;

public:

SimpleThread():
JThread(),
run(true)
{
}

virtual ~SimpleThread()
{}

virtual void * Thread() = 0;

bool getRun()
{
JMutexAutoLock lock(run_mutex);
return run;
}
void setRun(bool a_run)
{
JMutexAutoLock lock(run_mutex);
run = a_run;
}

void stop()
{
setRun(false);
while(IsRunning())
sleep_ms(100);
}
};

/* /*
A single worker thread - multiple client threads queue framework. A single worker thread - multiple client threads queue framework.
*/ */



template<typename Key, typename T, typename Caller, typename CallerData> template<typename Key, typename T, typename Caller, typename CallerData>
class GetResult class GetResult
{ {
Expand Down

0 comments on commit e9e9fd7

Please sign in to comment.