Skip to content

Commit

Permalink
Make MutexQueue use jsemaphore for signaling
Browse files Browse the repository at this point in the history
  • Loading branch information
sapier authored and sapier committed Jan 10, 2014
1 parent 10fdbf7 commit 8b0b857
Show file tree
Hide file tree
Showing 13 changed files with 250 additions and 101 deletions.
18 changes: 16 additions & 2 deletions src/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,20 @@ Client::Client(
}
}

void Client::Stop()
{
//request all client managed threads to stop
m_mesh_update_thread.Stop();
}

bool Client::isShutdown()
{

if (!m_mesh_update_thread.IsRunning()) return true;

return false;
}

Client::~Client()
{
{
Expand All @@ -296,7 +310,7 @@ Client::~Client()
m_mesh_update_thread.Stop();
m_mesh_update_thread.Wait();
while(!m_mesh_update_thread.m_queue_out.empty()) {
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
delete r.mesh;
}

Expand Down Expand Up @@ -692,7 +706,7 @@ void Client::step(float dtime)
while(!m_mesh_update_thread.m_queue_out.empty())
{
num_processed_meshes++;
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
MapBlock *block = m_env.getMap().getBlockNoCreateNoEx(r.p);
if(block)
{
Expand Down
8 changes: 8 additions & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ class Client : public con::PeerHandler, public InventoryManager, public IGameDef
);

~Client();

/*
request all threads managed by client to be stopped
*/
void Stop();


bool isShutdown();
/*
The name of the local player should already be set when
calling this, as it is sent in the initialization.
Expand Down
5 changes: 3 additions & 2 deletions src/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,8 +592,9 @@ void * Connection::Thread()

runTimeouts(dtime);

//NOTE this is only thread safe for ONE consumer thread!
while(!m_command_queue.empty()){
ConnectionCommand c = m_command_queue.pop_front();
ConnectionCommand c = m_command_queue.pop_frontNoEx();
processCommand(c);
}

Expand Down Expand Up @@ -1556,7 +1557,7 @@ ConnectionEvent Connection::getEvent()
e.type = CONNEVENT_NONE;
return e;
}
return m_event_queue.pop_front();
return m_event_queue.pop_frontNoEx();
}

ConnectionEvent Connection::waitEvent(u32 timeout_ms)
Expand Down
32 changes: 21 additions & 11 deletions src/game.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ class GameGlobalShaderConstantSetter : public IShaderConstantSetter
services->setVertexShaderConstant("animationTimer", &animation_timer_f, 1);

LocalPlayer* player = m_client->getEnv().getLocalPlayer();
v3f eye_position = player->getEyePosition();
v3f eye_position = player->getEyePosition();
services->setPixelShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);
services->setVertexShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);

Expand Down Expand Up @@ -1876,12 +1876,12 @@ void the_game(
}
else if(input->wasKeyDown(getKeySetting("keymap_screenshot")))
{
irr::video::IImage* const image = driver->createScreenShot();
if (image) {
irr::c8 filename[256];
snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png",
irr::video::IImage* const image = driver->createScreenShot();
if (image) {
irr::c8 filename[256];
snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png",
g_settings->get("screenshot_path").c_str(),
device->getTimer()->getRealTime());
device->getTimer()->getRealTime());
if (driver->writeImageToFile(image, filename)) {
std::wstringstream sstr;
sstr<<"Saved screenshot to '"<<filename<<"'";
Expand All @@ -1891,8 +1891,8 @@ void the_game(
} else{
infostream<<"Failed to save screenshot '"<<filename<<"'"<<std::endl;
}
image->drop();
}
image->drop();
}
}
else if(input->wasKeyDown(getKeySetting("keymap_toggle_hud")))
{
Expand Down Expand Up @@ -2263,7 +2263,7 @@ void the_game(
new MainRespawnInitiator(
&respawn_menu_active, &client);
GUIDeathScreen *menu =
new GUIDeathScreen(guienv, guiroot, -1,
new GUIDeathScreen(guienv, guiroot, -1,
&g_menumgr, respawner);
menu->drop();

Expand Down Expand Up @@ -2755,7 +2755,7 @@ void the_game(

// Sign special case, at least until formspec is properly implemented.
// Deprecated?
if(meta && meta->getString("formspec") == "hack:sign_text_input"
if(meta && meta->getString("formspec") == "hack:sign_text_input"
&& !random_input
&& !input->isKeyDown(getKeySetting("keymap_sneak")))
{
Expand Down Expand Up @@ -3222,7 +3222,7 @@ void the_game(

driver->getOverrideMaterial().Material.ColorMask = irr::video::ECP_RED;
driver->getOverrideMaterial().EnableFlags = irr::video::EMF_COLOR_MASK;
driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX +
driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX +
irr::scene::ESNRP_SOLID +
irr::scene::ESNRP_TRANSPARENT +
irr::scene::ESNRP_TRANSPARENT_EFFECT +
Expand Down Expand Up @@ -3433,6 +3433,16 @@ void the_game(
chat_backend.addMessage(L"", L"# Disconnected.");
chat_backend.addMessage(L"", L"");

client.Stop();

//force answer all texture and shader jobs (TODO return empty values)

while(!client.isShutdown()) {
tsrc->processQueue();
shsrc->processQueue();
sleep_ms(100);
}

// Client scope (client is destructed before destructing *def and tsrc)
}while(0);
} // try-catch
Expand Down
2 changes: 1 addition & 1 deletion src/httpfetch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ class CurlFetchThread : public JThread
*/

while (!m_requests.empty()) {
Request req = m_requests.pop_front();
Request req = m_requests.pop_frontNoEx();
processRequest(req);
}
processQueued(&pool);
Expand Down
1 change: 1 addition & 0 deletions src/itemdef.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ class CItemDefManager: public IWritableItemDefManager
void processQueue(IGameDef *gamedef)
{
#ifndef SERVER
//NOTE this is only thread safe for ONE consumer thread!
while(!m_get_clientcached_queue.empty())
{
GetRequest<std::string, ClientCached*, u8, u8>
Expand Down
1 change: 1 addition & 0 deletions src/jthread/jsemaphore.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class JSemaphore {

void Post();
void Wait();
bool Wait(unsigned int time_ms);

int GetValue();

Expand Down
31 changes: 31 additions & 0 deletions src/jthread/pthread/jsemaphore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include <assert.h>
#include <errno.h>
#include <sys/time.h>
#include "jthread/jsemaphore.h"

#define UNUSED(expr) do { (void)(expr); } while (0)

JSemaphore::JSemaphore() {
int sem_init_retval = sem_init(&m_semaphore,0,0);
assert(sem_init_retval == 0);
Expand Down Expand Up @@ -49,6 +53,33 @@ void JSemaphore::Wait() {
UNUSED(sem_wait_retval);
}

bool JSemaphore::Wait(unsigned int time_ms) {
struct timespec waittime;
struct timeval now;

if (gettimeofday(&now, NULL) == -1) {
assert("Unable to get time by clock_gettime!" == 0);
return false;
}

waittime.tv_nsec = ((time_ms % 1000) * 1000 * 1000) + (now.tv_usec * 1000);
waittime.tv_sec = (time_ms / 1000) + (waittime.tv_nsec / (1000*1000*1000)) + now.tv_sec;
waittime.tv_nsec %= 1000*1000*1000;

errno = 0;
int sem_wait_retval = sem_timedwait(&m_semaphore,&waittime);

if (sem_wait_retval == 0)
{
return true;
}
else {
assert((errno == ETIMEDOUT) || (errno == EINTR));
return false;
}
return sem_wait_retval == 0 ? true : false;
}

int JSemaphore::GetValue() {

int retval = 0;
Expand Down
15 changes: 15 additions & 0 deletions src/jthread/win32/jsemaphore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,21 @@ void JSemaphore::Wait() {
INFINITE);
}

bool JSemaphore::Wait(unsigned int time_ms) {
unsigned int retval = WaitForSingleObject(
m_hSemaphore,
time_ms);

if (retval == WAIT_OBJECT_0)
{
return true;
}
else {
assert(retval == WAIT_TIMEOUT);
return false;
}
}

int JSemaphore::GetValue() {

long int retval = 0;
Expand Down
24 changes: 11 additions & 13 deletions src/shader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -427,21 +427,18 @@ u32 ShaderSource::getShaderId(const std::string &name)
/* infostream<<"Waiting for shader from main thread, name=\""
<<name<<"\""<<std::endl;*/

try{
while(true) {
// Wait result for a second
GetResult<std::string, u32, u8, u8>
result = result_queue.pop_front(1000);

if (result.key == name) {
return result.item;
}
while(true) {
GetResult<std::string, u32, u8, u8>
result = result_queue.pop_frontNoEx();

if (result.key == name) {
return result.item;
}
else {
errorstream << "Got shader with invalid name: " << result.key << std::endl;
}
}
catch(ItemNotFoundException &e){
errorstream<<"Waiting for shader " << name << " timed out."<<std::endl;
return 0;
}

}

infostream<<"getShaderId(): Failed"<<std::endl;
Expand Down Expand Up @@ -537,6 +534,7 @@ void ShaderSource::processQueue()
/*
Fetch shaders
*/
//NOTE this is only thread safe for ONE consumer thread!
if(!m_get_shader_queue.empty()){
GetRequest<std::string, u32, u8, u8>
request = m_get_shader_queue.pop();
Expand Down
1 change: 1 addition & 0 deletions src/tile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ void TextureSource::processQueue()
/*
Fetch textures
*/
//NOTE this is only thread safe for ONE consumer thread!
if(!m_get_texture_queue.empty())
{
GetRequest<std::string, u32, u8, u8>
Expand Down
Loading

0 comments on commit 8b0b857

Please sign in to comment.