Skip to content

Commit

Permalink
- moved mRun.store(true) to top of start function in UDP thread, othe…
Browse files Browse the repository at this point in the history
…rwise, thread could potentially exit before mRun is set to true

- in UDP Client changed mRemoteIP to mEndpoint which can contain an address to an mDNS local address which will be resolved by ASIO
- changed process to onProcess to more nicely align with onStart and onStart virtual methods in UDPAdapter
- remove adapter before calling onStop, fixing a potentiol error where adapter could get called while its internal implementation is already stopped or discarded
  • Loading branch information
TimGroeneboom committed Oct 6, 2022
1 parent 647322d commit 44f31f8
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 33 deletions.
10 changes: 7 additions & 3 deletions modules/napudp/src/udpadapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,21 @@ namespace nap
return false;

mThread->registerAdapter(this);

return true;
}


void UDPAdapter::stop()
{
mThread->removeAdapter(this);
onStop();
}

mThread->removeAdapter(this);
};

void UDPAdapter::process()
{
onProcess();
}


bool UDPAdapter::handleAsioError(const std::error_code& errorCode, utility::ErrorState& errorState, bool& success)
Expand Down
5 changes: 3 additions & 2 deletions modules/napudp/src/udpadapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ namespace nap
// Properties
ResourcePtr<UDPThread> mThread; ///< Property: 'Thread' the udp thread the adapter registers itself to
bool mAllowFailure = false; ///< Property: 'AllowFailure' if binding to socket is allowed to fail on initialization

protected:
/**
* Called by start method and needs to be implemented by derived class
Expand All @@ -87,7 +86,9 @@ namespace nap
/**
* called by a UDPThread
*/
virtual void process() = 0;
virtual void onProcess() = 0;

void process();

bool handleAsioError(const std::error_code& errorCode, utility::ErrorState& errorState, bool& success);

Expand Down
15 changes: 11 additions & 4 deletions modules/napudp/src/udpclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#include <thread>

RTTI_BEGIN_CLASS(nap::UDPClient)
RTTI_PROPERTY("Endpoint", &nap::UDPClient::mRemoteIp, nap::rtti::EPropertyMetaData::Default)
RTTI_PROPERTY("Endpoint", &nap::UDPClient::mEndpoint, nap::rtti::EPropertyMetaData::Default)
RTTI_PROPERTY("Broadcast", &nap::UDPClient::mBroadcast, nap::rtti::EPropertyMetaData::Default)
RTTI_PROPERTY("Port", &nap::UDPClient::mPort, nap::rtti::EPropertyMetaData::Default)
RTTI_PROPERTY("MaxQueueSize", &nap::UDPClient::mMaxPacketQueueSize, nap::rtti::EPropertyMetaData::Default)
Expand Down Expand Up @@ -73,8 +73,15 @@ namespace nap
if(handleAsioError(asio_error_code, errorState, init_success))
return init_success;

// create address from string
auto address = address::from_string(mRemoteIp, asio_error_code);
// resolve ip address from endpoint
asio::ip::tcp::resolver resolver(getIOContext());
asio::ip::tcp::resolver::query query(mEndpoint, "80");
asio::ip::tcp::resolver::iterator iter = resolver.resolve(query, asio_error_code);
if(handleAsioError(asio_error_code, errorState, init_success))
return init_success;

asio::ip::tcp::endpoint endpoint = iter->endpoint();
auto address = address::from_string(endpoint.address().to_string(), asio_error_code);
if(handleAsioError(asio_error_code, errorState, init_success))
return init_success;

Expand Down Expand Up @@ -135,7 +142,7 @@ namespace nap
}


void UDPClient::process()
void UDPClient::onProcess()
{
// let the socket send queued packets
UDPPacket packet_to_send;
Expand Down
5 changes: 2 additions & 3 deletions modules/napudp/src/udpclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ namespace nap
void send(UDPPacket&& packet);

int mPort = 13251; ///< Property: 'Port' the port the client socket binds to
std::string mRemoteIp = "10.8.0.3"; ///< Property: 'Endpoint' the ip address the client socket binds to
std::string mEndpoint = "10.8.0.3"; ///< Property: 'Endpoint' the ip address the client socket binds to
int mMaxPacketQueueSize = 1000; ///< Property: 'MaxQueueSize' maximum of queued packets
bool mStopOnMaxQueueSizeExceeded = true; ///< Property: 'StopOnMaxQueueSizeExceeded' stop adding packets when queue size is exceed
bool mBroadcast = false; ///< Property: 'Broadcast' set option to broadcast
Expand All @@ -73,8 +73,7 @@ namespace nap
/**
* The process function
*/
void process() override final;

void onProcess() override final;
private:
// Client specific ASIO implementation
class Impl;
Expand Down
2 changes: 1 addition & 1 deletion modules/napudp/src/udpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ namespace nap
}


void UDPServer::process()
void UDPServer::onProcess()
{
asio::error_code asio_error;
size_t available_bytes = mImpl->mSocket.available(asio_error);
Expand Down
5 changes: 2 additions & 3 deletions modules/napudp/src/udpserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ namespace nap

int mPort = 13251; ///< Property: 'Port' the port the server socket binds to
std::string mIPAddress = ""; ///< Property: 'IP Address' local ip address to bind to, if left empty will bind to any local address
std::vector<std::string> mMulticastGroups; ///< Property: 'Multicast Groups' multicast groups to join

std::vector<std::string> mMulticastGroups; ///< Property: 'Multicast Groups' multicast groups to join
protected:
/**
* Called when server socket needs to be created
Expand All @@ -64,8 +64,7 @@ namespace nap
/**
* The process function
*/
void process() override final;

void onProcess() override final;
private:
// Server specific ASIO implementation
class Impl;
Expand Down
20 changes: 10 additions & 10 deletions modules/napudp/src/udpthread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ using asio::ip::address;
using asio::ip::udp;

RTTI_BEGIN_ENUM(nap::EUDPThreadUpdateMethod)
RTTI_ENUM_VALUE(nap::EUDPThreadUpdateMethod::MAIN_THREAD, "Main Thread"),
RTTI_ENUM_VALUE(nap::EUDPThreadUpdateMethod::SPAWN_OWN_THREAD, "Spawn Own Thread"),
RTTI_ENUM_VALUE(nap::EUDPThreadUpdateMethod::MANUAL, "Manual")
RTTI_ENUM_VALUE(nap::EUDPThreadUpdateMethod::UDP_MAIN_THREAD, "Main Thread"),
RTTI_ENUM_VALUE(nap::EUDPThreadUpdateMethod::UDP_SPAWN_OWN_THREAD, "Spawn Own Thread"),
RTTI_ENUM_VALUE(nap::EUDPThreadUpdateMethod::UDP_MANUAL, "Manual")
RTTI_END_ENUM

RTTI_BEGIN_CLASS_NO_DEFAULT_CONSTRUCTOR(nap::UDPThread)
Expand Down Expand Up @@ -60,24 +60,24 @@ namespace nap

bool UDPThread::start(utility::ErrorState& errorState)
{
mRun.store(true);

switch (mUpdateMethod)
{
case EUDPThreadUpdateMethod::SPAWN_OWN_THREAD:
case EUDPThreadUpdateMethod::UDP_SPAWN_OWN_THREAD:
mThread = std::thread([this] { thread(); });
break;
case EUDPThreadUpdateMethod::MAIN_THREAD:
case EUDPThreadUpdateMethod::UDP_MAIN_THREAD:
mService.registerUdpThread(this);
break;
case EUDPThreadUpdateMethod::MANUAL:
case EUDPThreadUpdateMethod::UDP_MANUAL:
mManualProcessFunc = [this]() { process(); };
break;
default:
errorState.fail("Unknown UDP thread update method");
return false;
}

mRun.store(true);

return true;
}

Expand All @@ -90,10 +90,10 @@ namespace nap

switch (mUpdateMethod)
{
case EUDPThreadUpdateMethod::SPAWN_OWN_THREAD:
case EUDPThreadUpdateMethod::UDP_SPAWN_OWN_THREAD:
mThread.join();
break;
case EUDPThreadUpdateMethod::MAIN_THREAD:
case EUDPThreadUpdateMethod::UDP_MAIN_THREAD:
mService.removeUdpThread(this);
break;
default:
Expand Down
13 changes: 6 additions & 7 deletions modules/napudp/src/udpthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ namespace nap
*/
enum EUDPThreadUpdateMethod : int
{
MAIN_THREAD = 0, ///< process UDPAdapters on main thread
SPAWN_OWN_THREAD = 1, ///< process UDPAdapters in newly spawned thread
MANUAL = 2 ///< only process UDPAdapters when the user explicitly calls manualProcess on the UDPThread
UDP_MAIN_THREAD = 0, ///< process UDPAdapters on main thread
UDP_SPAWN_OWN_THREAD = 1, ///< process UDPAdapters in newly spawned thread
UDP_MANUAL = 2 ///< only process UDPAdapters when the user explicitly calls manualProcess on the UDPThread
};

// forward declares
Expand Down Expand Up @@ -80,19 +80,18 @@ namespace nap
virtual void stop() override final;

/**
*
* @return asio IO context
*/
asio::io_context& getIOContext();

EUDPThreadUpdateMethod mUpdateMethod = EUDPThreadUpdateMethod::MAIN_THREAD; ///< Property: 'Update Method' the way the UDPThread should process adapters

/**
* Call this when update method is set to manual.
* If the update method is MAIN_THREAD or SPAWN_OWN_THREAD, this function will not do anything.
*/
void manualProcess();

// properties
EUDPThreadUpdateMethod mUpdateMethod = EUDPThreadUpdateMethod::UDP_MAIN_THREAD; ///< Property: 'Update Method' the way the UDPThread should process adapters
private:
/**
* the threaded function
Expand Down Expand Up @@ -126,7 +125,7 @@ namespace nap
UDPService& mService;

// adapters
std::vector<UDPAdapter*> mAdapters;
std::vector<UDPAdapter*> mAdapters;

struct Impl;
std::unique_ptr<Impl> mImpl;
Expand Down

0 comments on commit 44f31f8

Please sign in to comment.