Skip to content

Commit

Permalink
Rework server stepping and dtime calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
Desour authored and sfan5 committed Dec 25, 2023
1 parent b6c7c5a commit 322c4a5
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 80 deletions.
35 changes: 21 additions & 14 deletions src/client/game.cpp
Expand Up @@ -1304,8 +1304,8 @@ void Game::run()
updatePauseState();
if (m_is_paused)
dtime = 0.0f;
else
step(dtime);

step(dtime);

processClientEvents(&cam_view_target);
updateDebugState();
Expand Down Expand Up @@ -1454,7 +1454,7 @@ bool Game::createSingleplayerServer(const std::string &map_dir,
} else {
bind_str = g_settings->get("bind_address");
}

Address bind_addr(0, 0, 0, 0, port);

if (g_settings->getBool("ipv6_server"))
Expand Down Expand Up @@ -1682,10 +1682,7 @@ bool Game::connectToServer(const GameStartData &start_data,
fps_control.limit(device, &dtime);

// Update client and server
client->step(dtime);

if (server != NULL)
server->step(dtime);
step(dtime);

// End condition
if (client->getState() == LC_Init) {
Expand Down Expand Up @@ -1744,10 +1741,7 @@ bool Game::getServerContent(bool *aborted)
fps_control.limit(device, &dtime);

// Update client and server
client->step(dtime);

if (server != NULL)
server->step(dtime);
step(dtime);

// End condition
if (client->mediaReceived() && client->itemdefReceived() &&
Expand Down Expand Up @@ -2765,10 +2759,23 @@ void Game::updatePauseState()

inline void Game::step(f32 dtime)
{
if (server)
server->step(dtime);
if (server) {
float fps_max = (!device->isWindowFocused() || g_menumgr.pausesGame()) ?
g_settings->getFloat("fps_max_unfocused") :
g_settings->getFloat("fps_max");
fps_max = std::max(fps_max, 1.0f);
float steplen = 1.0f / fps_max;

server->setStepSettings(Server::StepSettings{
steplen,
m_is_paused
});

client->step(dtime);
server->step();
}

if (!m_is_paused)
client->step(dtime);
}

static void pauseNodeAnimation(PausedNodesList &paused, scene::ISceneNode *node) {
Expand Down
8 changes: 4 additions & 4 deletions src/network/connection.cpp
Expand Up @@ -1418,15 +1418,15 @@ void Connection::Disconnect()
putCommand(ConnectionCommand::disconnect());
}

bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
bool Connection::ReceiveTimeoutMs(NetworkPacket *pkt, u32 timeout_ms)
{
/*
Note that this function can potentially wait infinitely if non-data
events keep happening before the timeout expires.
This is not considered to be a problem (is it?)
*/
for(;;) {
ConnectionEventPtr e_ptr = waitEvent(timeout);
ConnectionEventPtr e_ptr = waitEvent(timeout_ms);
const ConnectionEvent &e = *e_ptr;

if (e.type != CONNEVENT_NONE) {
Expand Down Expand Up @@ -1467,14 +1467,14 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)

void Connection::Receive(NetworkPacket *pkt)
{
bool any = Receive(pkt, m_bc_receive_timeout);
bool any = ReceiveTimeoutMs(pkt, m_bc_receive_timeout);
if (!any)
throw NoIncomingDataException("No incoming data");
}

bool Connection::TryReceive(NetworkPacket *pkt)
{
return Receive(pkt, 0);
return ReceiveTimeoutMs(pkt, 0);
}

void Connection::Send(session_t peer_id, u8 channelnum,
Expand Down
5 changes: 2 additions & 3 deletions src/network/connection.h
Expand Up @@ -714,7 +714,8 @@ class Connection
void Connect(Address address);
bool Connected();
void Disconnect();
void Receive(NetworkPacket* pkt);
bool ReceiveTimeoutMs(NetworkPacket *pkt, u32 timeout_ms);
void Receive(NetworkPacket *pkt);
bool TryReceive(NetworkPacket *pkt);
void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
session_t GetPeerID() const { return m_peer_id; }
Expand Down Expand Up @@ -747,8 +748,6 @@ class Connection
// Command queue: user -> SendThread
MutexedQueue<ConnectionCommandPtr> m_command_queue;

bool Receive(NetworkPacket *pkt, u32 timeout);

void putEvent(ConnectionEventPtr e);

void TriggerSend();
Expand Down
95 changes: 44 additions & 51 deletions src/server.cpp
Expand Up @@ -106,26 +106,33 @@ void *ServerThread::run()
/*
* The real business of the server happens on the ServerThread.
* How this works:
* AsyncRunStep() runs an actual server step as soon as enough time has
* passed (dedicated_server_loop keeps track of that).
* Receive() blocks at least(!) 30ms waiting for a packet (so this loop
* doesn't busy wait) and will process any remaining packets.
* AsyncRunStep() (which runs the actual server step) is called at the
* server-step frequency. Receive() is used for waiting between the steps.
*/

try {
m_server->AsyncRunStep(true);
m_server->AsyncRunStep(0.0f, true);
} catch (con::ConnectionBindFailed &e) {
m_server->setAsyncFatalError(e.what());
} catch (LuaError &e) {
m_server->setAsyncFatalError(e);
}

float dtime = 0.0f;

while (!stopRequested()) {
ScopeProfiler spm(g_profiler, "Server::RunStep() (max)", SPT_MAX);

u64 t0 = porting::getTimeUs();

const Server::StepSettings step_settings = m_server->getStepSettings();

try {
m_server->AsyncRunStep();
m_server->AsyncRunStep(step_settings.pause ? 0.0f : dtime);

m_server->Receive();
const float remaining_time = step_settings.steplen
- 1e-6f * (porting::getTimeUs() - t0);
m_server->Receive(remaining_time);

} catch (con::PeerNotFoundException &e) {
infostream<<"Server: PeerNotFoundException"<<std::endl;
Expand All @@ -135,6 +142,8 @@ void *ServerThread::run()
} catch (LuaError &e) {
m_server->setAsyncFatalError(e);
}

dtime = 1e-6f * (porting::getTimeUs() - t0);
}

END_DEBUG_EXCEPTION_HANDLER
Expand Down Expand Up @@ -574,15 +583,8 @@ void Server::stop()
infostream<<"Server: Threads stopped"<<std::endl;
}

void Server::step(float dtime)
void Server::step()
{
// Limit a bit
if (dtime > DTIME_LIMIT)
dtime = DTIME_LIMIT;
{
MutexAutoLock lock(m_step_dtime_mutex);
m_step_dtime += dtime;
}
// Throw if fatal error occurred in thread
std::string async_err = m_async_fatal_error.get();
if (!async_err.empty()) {
Expand All @@ -595,30 +597,18 @@ void Server::step(float dtime)
}
}

void Server::AsyncRunStep(bool initial_step)
void Server::AsyncRunStep(float dtime, bool initial_step)
{

float dtime;
{
MutexAutoLock lock1(m_step_dtime_mutex);
dtime = m_step_dtime;
}

{
// Send blocks to clients
SendBlocks(dtime);
}

if((dtime < 0.001) && !initial_step)
if ((dtime < 0.001f) && !initial_step)
return;

ScopeProfiler sp(g_profiler, "Server::AsyncRunStep()", SPT_AVG);

{
MutexAutoLock lock1(m_step_dtime_mutex);
m_step_dtime -= dtime;
}

/*
Update uptime
*/
Expand Down Expand Up @@ -1048,25 +1038,27 @@ void Server::AsyncRunStep(bool initial_step)
m_shutdown_state.tick(dtime, this);
}

void Server::Receive()
void Server::Receive(float timeout)
{
const u64 t0 = porting::getTimeUs();
const float timeout_us = timeout * 1e6f;
auto remaining_time_us = [&]() -> float {
return std::max(0.0f, timeout_us - (porting::getTimeUs() - t0));
};

NetworkPacket pkt;
session_t peer_id;
bool first = true;
for (;;) {
pkt.clear();
peer_id = 0;
try {
/*
In the first iteration *wait* for a packet, afterwards process
all packets that are immediately available (no waiting).
*/
if (first) {
m_con->Receive(&pkt);
first = false;
} else {
if (!m_con->TryReceive(&pkt))
return;
if (!m_con->ReceiveTimeoutMs(&pkt,
(u32)remaining_time_us() / 1000)) {
// No incoming data.
// Already break if there's 1ms left, as ReceiveTimeoutMs is too coarse
// and a faster server-step is better than busy waiting.
if (remaining_time_us() < 1000.0f)
break;
}

peer_id = pkt.getPeerId();
Expand All @@ -1085,8 +1077,6 @@ void Server::Receive()
DenyAccess(peer_id, SERVER_ACCESSDENIED_UNEXPECTED_DATA);
} catch (const con::PeerNotFoundException &e) {
// Do nothing
} catch (const con::NoIncomingDataException &e) {
return;
}
}
}
Expand Down Expand Up @@ -3953,21 +3943,24 @@ void dedicated_server_loop(Server &server, bool &kill)

IntervalLimiter m_profiler_interval;

static thread_local const float steplen =
g_settings->getFloat("dedicated_server_step");
static thread_local const float profiler_print_interval =
g_settings->getFloat("profiler_print_interval");
constexpr float steplen = 0.05f; // always 50 ms
const float profiler_print_interval = g_settings->getFloat("profiler_print_interval");

server.setStepSettings(Server::StepSettings{
g_settings->getFloat("dedicated_server_step"),
false
});

/*
* The dedicated server loop only does time-keeping (in Server::step) and
* provides a way to main.cpp to kill the server externally (bool &kill).
* The dedicated server loop only provides a way to main.cpp to kill the
* server externally (bool &kill).
*/

for(;;) {
// This is kind of a hack but can be done like this
// because server.step() is very light
sleep_ms((int)(steplen*1000.0));
server.step(steplen);
sleep_ms((int)(steplen*1000.0f));
server.step();

if (server.isShutdownRequested() || kill)
break;
Expand Down
23 changes: 15 additions & 8 deletions src/server.h
Expand Up @@ -38,6 +38,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "chatmessage.h"
#include "sound.h"
#include "translation.h"
#include <atomic>
#include <string>
#include <list>
#include <map>
Expand Down Expand Up @@ -157,12 +158,12 @@ class Server : public con::PeerHandler, public MapEventReceiver,

void start();
void stop();
// This is mainly a way to pass the time to the server.
// Actual processing is done in another thread.
void step(float dtime);
// This just checks if there was an error in that thread.
void step();
// This is run by ServerThread and does the actual processing
void AsyncRunStep(bool initial_step=false);
void Receive();
void AsyncRunStep(float dtime, bool initial_step = false);
void Receive(float timeout);
PlayerSAO* StageTwoClientInit(session_t peer_id);

/*
Expand Down Expand Up @@ -293,6 +294,14 @@ class Server : public con::PeerHandler, public MapEventReceiver,
inline bool isSingleplayer() const
{ return m_simple_singleplayer_mode; }

struct StepSettings {
float steplen;
bool pause;
};

void setStepSettings(StepSettings spdata) { m_step_settings.store(spdata); }
StepSettings getStepSettings() { return m_step_settings.load(); }

inline void setAsyncFatalError(const std::string &error)
{ m_async_fatal_error.set(error); }
inline void setAsyncFatalError(const LuaError &e)
Expand Down Expand Up @@ -624,10 +633,8 @@ class Server : public con::PeerHandler, public MapEventReceiver,
/*
Threads
*/
// A buffer for time steps
// step() increments and AsyncRunStep() run by m_thread reads it.
float m_step_dtime = 0.0f;
std::mutex m_step_dtime_mutex;
// Set by Game
std::atomic<StepSettings> m_step_settings{{0.1f, false}};

// The server mainly operates in this thread
ServerThread *m_thread = nullptr;
Expand Down

0 comments on commit 322c4a5

Please sign in to comment.