From f5754d36a70fb9c76be5d82a29bbd396023594d5 Mon Sep 17 00:00:00 2001 From: Jeremy Geras Date: Thu, 9 May 2013 21:21:48 +0000 Subject: [PATCH 1/2] name change for connect(..) param in reTurn; is_v6 -> allowV6 git-svn-id: https://svn.resiprocate.org/rep/resiprocate/branches/b-counterpath-recon-20130424@10204 ddefafc4-47db-0310-ae44-fa13212b10f2 --- reTurn/AsyncSocketBase.hxx | 2 +- reTurn/AsyncTcpSocketBase.cxx | 2 +- reTurn/AsyncTcpSocketBase.hxx | 2 +- reTurn/AsyncTlsSocketBase.cxx | 2 +- reTurn/AsyncTlsSocketBase.hxx | 2 +- reTurn/AsyncUdpSocketBase.cxx | 4 ++-- reTurn/AsyncUdpSocketBase.hxx | 2 +- reTurn/client/TurnAsyncSocket.cxx | 4 ++-- reTurn/client/TurnAsyncSocket.hxx | 2 +- reTurn/client/test/TestAsyncClient.cxx | 2 +- reTurn/client/test/TestRtpLoad.cxx | 2 +- 11 files changed, 13 insertions(+), 13 deletions(-) diff --git a/reTurn/AsyncSocketBase.hxx b/reTurn/AsyncSocketBase.hxx index c563bf0595..7d4dd11ff1 100644 --- a/reTurn/AsyncSocketBase.hxx +++ b/reTurn/AsyncSocketBase.hxx @@ -30,7 +30,7 @@ public: /// Note: The following API's are thread safe and queue the request to be handled by the ioService thread virtual asio::error_code bind(const asio::ip::address& address, unsigned short port) = 0; - virtual void connect(const std::string& address, unsigned short port, bool is_v6) = 0; + virtual void connect(const std::string& address, unsigned short port, bool allowV6) = 0; /// Note: destination is ignored for TCP and TLS connections virtual void send(const StunTuple& destination, boost::shared_ptr& data); // Send unframed data virtual void send(const StunTuple& destination, unsigned short channel, boost::shared_ptr& data); // send with turn framing diff --git a/reTurn/AsyncTcpSocketBase.cxx b/reTurn/AsyncTcpSocketBase.cxx index 6a4b1e418f..7cc94ade94 100644 --- a/reTurn/AsyncTcpSocketBase.cxx +++ b/reTurn/AsyncTcpSocketBase.cxx @@ -47,7 +47,7 @@ AsyncTcpSocketBase::bind(const asio::ip::address& address, unsigned short port) } void -AsyncTcpSocketBase::connect(const std::string& address, unsigned short port, bool is_v6) +AsyncTcpSocketBase::connect(const std::string& address, unsigned short port, bool allowV6) { // Start an asynchronous resolve to translate the address // into a list of endpoints. diff --git a/reTurn/AsyncTcpSocketBase.hxx b/reTurn/AsyncTcpSocketBase.hxx index 619a3f8c91..02608b7dbe 100644 --- a/reTurn/AsyncTcpSocketBase.hxx +++ b/reTurn/AsyncTcpSocketBase.hxx @@ -17,7 +17,7 @@ public: virtual unsigned int getSocketDescriptor(); virtual asio::error_code bind(const asio::ip::address& address, unsigned short port); - virtual void connect(const std::string& address, unsigned short port, bool is_v6); + virtual void connect(const std::string& address, unsigned short port, bool allowV6); virtual void transportReceive(); virtual void transportFramedReceive(); diff --git a/reTurn/AsyncTlsSocketBase.cxx b/reTurn/AsyncTlsSocketBase.cxx index b7e5bf5e46..93d740fe3c 100644 --- a/reTurn/AsyncTlsSocketBase.cxx +++ b/reTurn/AsyncTlsSocketBase.cxx @@ -54,7 +54,7 @@ AsyncTlsSocketBase::bind(const asio::ip::address& address, unsigned short port) } void -AsyncTlsSocketBase::connect(const std::string& address, unsigned short port, bool is_v6) +AsyncTlsSocketBase::connect(const std::string& address, unsigned short port, bool allowV6) { mHostname = address; diff --git a/reTurn/AsyncTlsSocketBase.hxx b/reTurn/AsyncTlsSocketBase.hxx index 0f0444253f..3e71be7633 100644 --- a/reTurn/AsyncTlsSocketBase.hxx +++ b/reTurn/AsyncTlsSocketBase.hxx @@ -24,7 +24,7 @@ public: virtual unsigned int getSocketDescriptor(); virtual asio::error_code bind(const asio::ip::address& address, unsigned short port); - virtual void connect(const std::string& address, unsigned short port, bool is_v6); + virtual void connect(const std::string& address, unsigned short port, bool allowV6); void doHandshake(); diff --git a/reTurn/AsyncUdpSocketBase.cxx b/reTurn/AsyncUdpSocketBase.cxx index 4fac39572c..135b2245f4 100644 --- a/reTurn/AsyncUdpSocketBase.cxx +++ b/reTurn/AsyncUdpSocketBase.cxx @@ -44,13 +44,13 @@ AsyncUdpSocketBase::bind(const asio::ip::address& address, unsigned short port) } void -AsyncUdpSocketBase::connect(const std::string& address, unsigned short port, bool is_v6) +AsyncUdpSocketBase::connect(const std::string& address, unsigned short port, bool allowV6) { // Start an asynchronous resolve to translate the address // into a list of endpoints. resip::Data service(port); #ifdef USE_IPV6 - asio::ip::udp::resolver::query query((is_v6 ? asio::ip::udp::v6() : asio::ip::udp::v4()), address, service.c_str()); + asio::ip::udp::resolver::query query((allowV6 ? asio::ip::udp::v6() : asio::ip::udp::v4()), address, service.c_str()); #else asio::ip::udp::resolver::query query(asio::ip::udp::v4(), address, service.c_str()); #endif diff --git a/reTurn/AsyncUdpSocketBase.hxx b/reTurn/AsyncUdpSocketBase.hxx index 88f9810fa7..2f094d0126 100644 --- a/reTurn/AsyncUdpSocketBase.hxx +++ b/reTurn/AsyncUdpSocketBase.hxx @@ -17,7 +17,7 @@ public: virtual unsigned int getSocketDescriptor(); virtual asio::error_code bind(const asio::ip::address& address, unsigned short port); - virtual void connect(const std::string& address, unsigned short port, bool is_v6); + virtual void connect(const std::string& address, unsigned short port, bool allowV6); virtual void transportReceive(); virtual void transportFramedReceive(); diff --git a/reTurn/client/TurnAsyncSocket.cxx b/reTurn/client/TurnAsyncSocket.cxx index 0219b91382..b1291b3e2e 100644 --- a/reTurn/client/TurnAsyncSocket.cxx +++ b/reTurn/client/TurnAsyncSocket.cxx @@ -1100,9 +1100,9 @@ TurnAsyncSocket::sendToRemotePeer(RemotePeer& remotePeer, boost::shared_ptrconnect(argv[1], port); + turnSocket->connect(argv[1], port, false); // Set the username and password turnSocket->setUsernameAndPassword(username, password); diff --git a/reTurn/client/test/TestRtpLoad.cxx b/reTurn/client/test/TestRtpLoad.cxx index be46f44397..ad0c4e5413 100644 --- a/reTurn/client/test/TestRtpLoad.cxx +++ b/reTurn/client/test/TestRtpLoad.cxx @@ -368,7 +368,7 @@ int main(int argc, char* argv[]) handler.setTurnAsyncSocket(turnSocket.get()); // Connect to Stun/Turn Server - turnSocket->connect(turnAddress.c_str(), port); + turnSocket->connect(turnAddress.c_str(), port, false); // Set the username and password turnSocket->setUsernameAndPassword(username, password); From ffeaec3207fe66d445743ccc9492ca75a85e50ee Mon Sep 17 00:00:00 2001 From: Jeremy Geras Date: Mon, 13 May 2013 20:03:29 +0000 Subject: [PATCH 2/2] reflow merge - made public interfaces thread safe to simplify state management - introduced FlowHandler as "the" way to get incoming packets (it is no longer necessary to queue) - introduced USE_DTLS defines so that it is possible to USE_SSL without necessarily using DTLS (which requires the 1.0.1 stream of openssl) - added ICE support for reflow (connectivity check scheduling, handling) git-svn-id: https://svn.resiprocate.org/rep/resiprocate/branches/b-counterpath-recon-20130424@10210 ddefafc4-47db-0310-ae44-fa13212b10f2 --- reTurn/client/TurnAsyncSocket.cxx | 5 +- reTurn/client/TurnAsyncSocketHandler.hxx | 4 +- reflow/FakeSelectSocketDescriptor.cxx | 1 + reflow/Flow.cxx | 863 ++++++++++++++---- reflow/Flow.hxx | 138 ++- reflow/FlowDtlsSocketContext.cxx | 8 +- reflow/FlowDtlsSocketContext.hxx | 4 - reflow/FlowDtlsTimerContext.cxx | 3 + reflow/FlowHandler.hxx | 59 ++ reflow/FlowManager.cxx | 74 +- reflow/FlowManager.hxx | 10 +- reflow/IceCandidate.hxx | 183 ++++ reflow/MediaStream.cxx | 334 +++++-- reflow/MediaStream.hxx | 69 +- reflow/{ErrorCode.hxx => ReflowErrorCode.hxx} | 114 +-- reflow/dtls_wrapper/DtlsFactory.cxx | 2 + reflow/dtls_wrapper/DtlsSocket.cxx | 2 + reflow/dtls_wrapper/DtlsSocket.hxx | 4 - reflow/dtls_wrapper/DtlsTimer.cxx | 2 + 19 files changed, 1489 insertions(+), 390 deletions(-) create mode 100644 reflow/FlowHandler.hxx create mode 100644 reflow/IceCandidate.hxx rename reflow/{ErrorCode.hxx => ReflowErrorCode.hxx} (97%) diff --git a/reTurn/client/TurnAsyncSocket.cxx b/reTurn/client/TurnAsyncSocket.cxx index b1291b3e2e..c3e3ad3548 100644 --- a/reTurn/client/TurnAsyncSocket.cxx +++ b/reTurn/client/TurnAsyncSocket.cxx @@ -13,6 +13,7 @@ using namespace resip; //#define TURN_CHANNEL_BINDING_REFRESH_SECONDS 20 // TESTING only #define TURN_CHANNEL_BINDING_REFRESH_SECONDS 240 // 4 minuntes - this is one minute before the permission will expire, Note: ChannelBinding refreshes also refresh permissions + #define SOFTWARE_STRING "reTURN Async Client 0.3 - RFC5389/turn-12 " // Note padding size to a multiple of 4, to help compatibility with older clients namespace reTurn { @@ -836,7 +837,7 @@ TurnAsyncSocket::handleBindRequest(StunMessage& stunMessage) DebugLog(<< "Sending response to BIND to " << stunMessage.mRemoteTuple); sendStunMessage(response, false, UDP_MAX_RETRANSMITS, DEFAULT_RETRANS_INTERVAL_MS, &(stunMessage.mRemoteTuple)); - if(mTurnAsyncSocketHandler) mTurnAsyncSocketHandler->onIncomingBindRequestProcessed(getSocketDescriptor(), stunMessage.mRemoteTuple); + if(mTurnAsyncSocketHandler) mTurnAsyncSocketHandler->onIncomingBindRequestProcessed(getSocketDescriptor(), stunMessage.mRemoteTuple, stunMessage); return asio::error_code(); } @@ -1071,8 +1072,6 @@ TurnAsyncSocket::doSendToFramed(const asio::ip::address& address, unsigned short return sendToRemotePeer(*remotePeer, data); } - - void TurnAsyncSocket::sendToRemotePeer(RemotePeer& remotePeer, boost::shared_ptr& data) { diff --git a/reTurn/client/TurnAsyncSocketHandler.hxx b/reTurn/client/TurnAsyncSocketHandler.hxx index 8874f3196d..b0d7e6f07e 100644 --- a/reTurn/client/TurnAsyncSocketHandler.hxx +++ b/reTurn/client/TurnAsyncSocketHandler.hxx @@ -9,6 +9,8 @@ namespace reTurn { +class StunMessage; + class TurnAsyncSocketHandler { public: @@ -46,7 +48,7 @@ public: virtual void onSendSuccess(unsigned int socketDesc) = 0; virtual void onSendFailure(unsigned int socketDesc, const asio::error_code& e) = 0; - virtual void onIncomingBindRequestProcessed(unsigned int socketDesc, const StunTuple& sourceTuple) = 0; + virtual void onIncomingBindRequestProcessed(unsigned int socketDesc, const StunTuple& sourceTuple, const reTurn::StunMessage& bindRequest) = 0; private: }; diff --git a/reflow/FakeSelectSocketDescriptor.cxx b/reflow/FakeSelectSocketDescriptor.cxx index 20e44adfc0..5d295553ba 100644 --- a/reflow/FakeSelectSocketDescriptor.cxx +++ b/reflow/FakeSelectSocketDescriptor.cxx @@ -72,6 +72,7 @@ FakeSelectSocketDescriptor::send() #else size_t res = ::write(mPipe[1], fakeData, 1); assert(res == 1); + ((void)res); // elimiante warning GCC #endif } diff --git a/reflow/Flow.cxx b/reflow/Flow.cxx index 4126fa0abb..008e0c3e70 100644 --- a/reflow/Flow.cxx +++ b/reflow/Flow.cxx @@ -10,22 +10,26 @@ #include #include "FlowManagerSubsystem.hxx" -#include "ErrorCode.hxx" +#include "ReflowErrorCode.hxx" #include "Flow.hxx" #include "MediaStream.hxx" +#include "FlowHandler.hxx" +#ifdef USE_SSL +#ifdef USE_DTLS #include "FlowDtlsSocketContext.hxx" +#endif +#endif using namespace flowmanager; using namespace resip; - -#ifdef USE_SSL using namespace dtls; -#endif - using namespace std; #define MAX_RECEIVE_FIFO_DURATION 10 // seconds #define MAX_RECEIVE_FIFO_SIZE (100 * MAX_RECEIVE_FIFO_DURATION) // 1000 = 1 message every 10 ms for 10 seconds - appropriate for RTP +#define ICMP_GRACE_PERIOD_SECONDS 5 // allow ICMP errors for this many seconds before onReceiveFailure +#define CONNECTIVITY_CHECK_MAX_RETRANSMITS 20 // 7 is the default for Rc spec'd in RFC 5389 +#define CONNECTIVITY_CHECK_RETRANS_INTERVAL_MS 100 // http://tools.ietf.org/html/rfc5245#section-16 #define RESIPROCATE_SUBSYSTEM FlowManagerSubsystem::FLOWMANAGER @@ -121,6 +125,7 @@ Flow::Flow(asio::io_service& ioService, const StunTuple& localBinding, MediaStream& mediaStream) : mIOService(ioService), + mConnectivityCheckTimer(mIOService), #ifdef USE_SSL mSslContext(sslContext), #endif @@ -130,7 +135,23 @@ Flow::Flow(asio::io_service& ioService, mAllocationProps(StunMessage::PropsNone), mReservationToken(0), mFlowState(Unconnected), - mReceivedDataFifo(MAX_RECEIVE_FIFO_DURATION,MAX_RECEIVE_FIFO_SIZE) + mReceivedDataFifo(MAX_RECEIVE_FIFO_DURATION,MAX_RECEIVE_FIFO_SIZE), + mHandler(NULL), + mActiveDestinationSet(false), + mConnectivityChecksPending(false), + mIceComplete(false), + mIceRole(Flow::IceRole_Unknown), + mPeerRflxCandidatePriority(0) +{ +} + +Flow::~Flow() +{ +} + +// this is only called from MediaStream::initializeImpl(..) +void +Flow::initialize() { InfoLog(<< "Flow: flow created for " << mLocalBinding << " ComponentId=" << mComponentId); @@ -166,29 +187,47 @@ Flow::Flow(asio::io_service& ioService, } } -Flow::~Flow() -{ - InfoLog(<< "Flow: flow destroyed for " << mLocalBinding << " ComponentId=" << mComponentId); - +// this is only called from MediaStream::shutdownImpl(..) +void +Flow::shutdown() + { + Lock lock(mMutex); + if (mTurnSocket.get()) + { + mTurnSocket->disableTurnAsyncHandler(); + mTurnSocket->close(); + } + mConnectivityCheckTimer.cancel(); #ifdef USE_SSL +#ifdef USE_DTLS // Cleanup DtlsSockets - { - Lock lock(mMutex); std::map::iterator it; for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++) { delete it->second; } - } - #endif //USE_SSL +#endif //USE_DTLS +#endif //USE_SSL - // Cleanup TurnSocket - if(mTurnSocket.get()) - { - mTurnSocket->disableTurnAsyncHandler(); - mTurnSocket->close(); + InfoLog(<< "Flow: flow destroyed for " << mLocalBinding << " ComponentId=" << mComponentId); +} + +void +Flow::setHandlerImpl(FlowHandler* handler, resip::Condition& cv) +{ + Lock lock(mMutex); + mHandler = handler; + cv.signal(); } + +void +Flow::setHandler(FlowHandler* handler) +{ + Lock lock(mMutex); + Condition cv; + mIOService.post(boost::bind(&Flow::setHandlerImpl, this, handler, boost::ref(cv))); + cv.wait(mMutex); } void @@ -210,7 +249,16 @@ Flow::activateFlow(UInt8 allocationProps) { changeFlowState(ConnectingServer); mTurnSocket->connect(mMediaStream.mNatTraversalServerHostname.c_str(), - mMediaStream.mNatTraversalServerPort); + mMediaStream.mNatTraversalServerPort, + mLocalBinding.getAddress().is_v6()); + } + else if (mMediaStream.mNatTraversalMode != MediaStream::NoNatTraversal && + mMediaStream.mNatTraversalServerHostname.empty()) + { + // pretend we're connected and got our binding + mReflexiveTuple = mLocalBinding; + changeFlowState(Ready); + mMediaStream.onFlowReady(mComponentId); } else { @@ -241,35 +289,34 @@ Flow::getSocketDescriptor() // Turn Send Methods void -Flow::send(char* buffer, unsigned int size) +Flow::send(boost::shared_ptr& buffer) { - assert(mTurnSocket.get()); - if(isReady()) - { - if(processSendData(buffer, size, mTurnSocket->getConnectedAddress(), mTurnSocket->getConnectedPort())) - { - mTurnSocket->send(buffer, size); - } - } - else - { - onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category)); - } + mIOService.post(boost::bind(&Flow::sendImpl, this, buffer)); } void -Flow::sendTo(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int size) +Flow::sendImpl(boost::shared_ptr& buffer) { assert(mTurnSocket.get()); - if(isReady()) + if(isReady() && mActiveDestinationSet) { - if(processSendData(buffer, size, address, port)) + if(processSendData(buffer, mTurnSocket->getConnectedAddress(), mTurnSocket->getConnectedPort())) { - mTurnSocket->sendTo(address, port, buffer, size); + // !jjg! this if-else will need to be reworked (along with the one in setActiveDestinationImpl) + // before we can possibly support relay candidates for ICE + if (mMediaStream.mNatTraversalMode == MediaStream::TurnAllocation) + { + mTurnSocket->sendFramed(buffer); + } + else + { + mTurnSocket->sendUnframed(buffer); +} } } else { + WarningLog(<< "Flow::send(..) failed in state " << mFlowState); onSendFailure(mTurnSocket->getSocketDescriptor(), asio::error_code(flowmanager::InvalidState, asio::error::misc_category)); } } @@ -284,11 +331,14 @@ Flow::rawSendTo(const asio::ip::address& address, unsigned short port, const cha bool -Flow::processSendData(char* buffer, unsigned int& size, const asio::ip::address& address, unsigned short port) +Flow::processSendData(boost::shared_ptr& buffer, const asio::ip::address& address, unsigned short port) { if(mMediaStream.mSRTPSessionOutCreated) { - err_status_t status = mMediaStream.srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID); + char* data = buffer->mutableData(); + int size = buffer->size(); + err_status_t status = mMediaStream.srtpProtect((void*)data, (int*)&size, mComponentId == RTCP_COMPONENT_ID); + buffer->mutableSize() = size; if(status != err_status_ok) { ErrLog(<< "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ") ComponentId=" << mComponentId); @@ -297,15 +347,17 @@ Flow::processSendData(char* buffer, unsigned int& size, const asio::ip::address& } } #ifdef USE_SSL +#ifdef USE_DTLS else { - Lock lock(mMutex); DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), address, port)); if(dtlsSocket) { if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized()) { - err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpProtect((void*)buffer, (int*)&size, mComponentId == RTCP_COMPONENT_ID); + int size = buffer->size(); + err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpProtect((char*)buffer->mutableData(), (int*)&size, mComponentId == RTCP_COMPONENT_ID); + buffer->mutableSize() = size; if(status != err_status_ok) { ErrLog(<< "Unable to SRTP protect the packet, error code=" << status << "(" << srtp_error_string(status) << ") ComponentId=" << mComponentId); @@ -320,96 +372,29 @@ Flow::processSendData(char* buffer, unsigned int& size, const asio::ip::address& return false; } } + else if (mMediaStream.mSRTPEnabled) + { + return false; + } } +#endif //USE_DTLS #endif //USE_SSL return true; } - - -// Receive Methods -asio::error_code -Flow::receiveFrom(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int& size, unsigned int timeout) +void +Flow::asyncReceive() { - bool done = false; - asio::error_code errorCode; - - UInt64 startTime = Timer::getTimeMs(); - unsigned int recvTimeout; - while(!done) - { - // We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all - if(timeout == 0 && mReceivedDataFifo.empty()) - { - // timeout - return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category); - } - - recvTimeout = timeout ? (unsigned int)(timeout - (Timer::getTimeMs() - startTime)) : 0; - if(timeout != 0 && recvTimeout <= 0) - { - // timeout - return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category); - } - ReceivedData* receivedData = mReceivedDataFifo.getNext(recvTimeout); - if(receivedData) - { - mFakeSelectSocketDescriptor.receive(); - - // discard any data not from address/port requested - if(address == receivedData->mAddress && port == receivedData->mPort) - { - errorCode = processReceivedData(buffer, size, receivedData); - done = true; - } - delete receivedData; - } - else - { - // timeout - errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category); - done = true; - } - } - return errorCode; + mIOService.post(boost::bind(&Flow::asyncReceiveImpl, this)); } -asio::error_code -Flow::receive(char* buffer, unsigned int& size, unsigned int timeout, asio::ip::address* sourceAddress, unsigned short* sourcePort) +void +Flow::asyncReceiveImpl() { - asio::error_code errorCode; - - //InfoLog(<< "Flow::receive called with buffer size=" << size << ", timeout=" << timeout); - // We define timeout of 0 differently then TimeLimitFifo - we want 0 to mean no-block at all - if(timeout == 0 && mReceivedDataFifo.empty()) - { - // timeout - InfoLog(<< "Receive timeout (timeout==0 and fifo empty)!"); - return asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category); - } - if(mReceivedDataFifo.empty()) - { - WarningLog(<< "Receive called when there is no data available! ComponentId=" << mComponentId); - } - - ReceivedData* receivedData = mReceivedDataFifo.getNext(timeout); - if(receivedData) - { - mFakeSelectSocketDescriptor.receive(); - errorCode = processReceivedData(buffer, size, receivedData, sourceAddress, sourcePort); - delete receivedData; - } - else - { - // timeout - InfoLog(<< "Receive timeout! ComponentId=" << mComponentId); - errorCode = asio::error_code(flowmanager::ReceiveTimeout, asio::error::misc_category); - } - return errorCode; + mTurnSocket->turnReceive(); } - asio::error_code Flow::processReceivedData(char* buffer, unsigned int& size, ReceivedData* receivedData, asio::ip::address* sourceAddress, unsigned short* sourcePort) { @@ -423,22 +408,24 @@ Flow::processReceivedData(char* buffer, unsigned int& size, ReceivedData* receiv if(status != err_status_ok) { ErrLog(<< "Unable to SRTP unprotect the packet (componentid=" << mComponentId << "), error code=" << status << "(" << srtp_error_string(status) << ")"); + memset((void*)receivedData->mData->data(), 0, receivedData->mData->size()); //errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category); } } #ifdef USE_SSL +#ifdef USE_DTLS else { - Lock lock(mMutex); DtlsSocket* dtlsSocket = getDtlsSocket(StunTuple(mLocalBinding.getTransportType(), receivedData->mAddress, receivedData->mPort)); if(dtlsSocket) { if(((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->isSrtpInitialized()) { - err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpUnprotect((void*)receivedData->mData->data(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID); + err_status_t status = ((FlowDtlsSocketContext*)dtlsSocket->getSocketContext())->srtpUnprotect((void*)receivedData->mData->mutableData(), (int*)&receivedsize, mComponentId == RTCP_COMPONENT_ID); if(status != err_status_ok) { ErrLog(<< "Unable to SRTP unprotect the packet (componentid=" << mComponentId << "), error code=" << status << "(" << srtp_error_string(status) << ")"); + memset((void*)receivedData->mData->data(), 0, receivedData->mData->size()); //errorCode = asio::error_code(flowmanager::SRTPError, asio::error::misc_category); } } @@ -449,10 +436,11 @@ Flow::processReceivedData(char* buffer, unsigned int& size, ReceivedData* receiv } } } +#endif //USE_DTLS #endif //USE_SSL if(!errorCode) { - if(size > receivedsize) + if(size >= receivedsize) { size = receivedsize; memcpy(buffer, receivedData->mData->data(), size); @@ -477,41 +465,216 @@ Flow::processReceivedData(char* buffer, unsigned int& size, ReceivedData* receiv } void -Flow::setActiveDestination(const char* address, unsigned short port) +Flow::setIceRole(bool controlling) +{ + mIOService.post(boost::bind(&Flow::setIceRoleImpl, this, controlling)); +} + +void +Flow::setIceRoleImpl(bool controlling) +{ + mIceRole = controlling ? Flow::IceRole_Controlling : Flow::IceRole_Controlled; +} + +void Flow::setPeerReflexiveCandidatePriority(UInt32 priority) +{ + mIOService.post(boost::bind(&Flow::setPeerReflexiveCandidatePriorityImpl, this, priority)); +} + +void +Flow::setActiveDestination(const resip::Data& address, unsigned short port, const std::vector& candidates) +{ + mIOService.post(boost::bind(&Flow::setActiveDestinationImpl, this, address, port, candidates)); +} + +void +Flow::setActiveDestinationImpl(const resip::Data& address, unsigned short port, const std::vector& candidates) { if(mTurnSocket.get()) { if(mMediaStream.mNatTraversalMode != MediaStream::TurnAllocation) { + if (mMediaStream.mNatTraversalMode == MediaStream::Ice && candidates.size() > 0) + { + if (!mIceComplete) + { + bool isRtpFlow = (mMediaStream.getRtpFlow() == this); + + std::vector::const_iterator candIt = candidates.begin(); + for (; candIt != candidates.end(); ++candIt) + { + // first attempt to find and update any candidates that we may have inserted due to receiving + // BIND requests before receiving the SDP answer + bool alreadyInCheckList = false; + std::vector::iterator candPairIt = mIceCheckList.begin(); + for (; candPairIt != mIceCheckList.end(); ++candPairIt) + { + if (candPairIt->mRemoteCandidate.getTransportAddr() == candIt->getTransportAddr()) + { + alreadyInCheckList = true; + candPairIt->mRemoteCandidate = *candIt; + candPairIt->mState = IceCandidatePair::Waiting; + break; + } + } + + if (!alreadyInCheckList) + { + DebugLog(<< "adding ice candidate pair for remote candidate " << candIt->getTransportAddr() << " to ICE check list"); + IceCandidatePair candidatePair; + candidatePair.mLocalCandidate = IceCandidate(mLocalBinding, IceCandidate::CandidateType_Host, 0, Data::Empty, mComponentId, StunTuple()); + candidatePair.mRemoteCandidate = *candIt; + candidatePair.mState = (isRtpFlow? IceCandidatePair::Waiting : IceCandidatePair::Frozen); + mIceCheckList.push_back(candidatePair); + } + } + + if (mFlowState == Ready) + { + if (isRtpFlow) + { + DebugLog(<< "scheduling initial ICE connectivity checks for RTP flow"); + changeFlowState(CheckingConnectivity); + scheduleConnectivityChecks(); + } + else + { + DebugLog(<< "setting RTCP flow to CheckingConnectivity state"); + changeFlowState(CheckingConnectivity); + } + } + else + { + DebugLog(<< "delaying ICE connectivity checks until flow state is Ready; current state is " << flowStateToString(mFlowState)); + mConnectivityChecksPending = true; + } + } + } + else + { changeFlowState(Connecting); - mTurnSocket->connect(address, port); + mTurnSocket->connect(address.c_str(), port, mLocalBinding.getAddress().is_v6()); + } } else { - mTurnSocket->setActiveDestination(asio::ip::address::from_string(address), port); + mTurnSocket->setActiveDestination(asio::ip::address::from_string(address.c_str()), port); + } + mActiveDestinationSet = true; + } +} + +void +Flow::scheduleConnectivityChecks() +{ + InfoLog(<< "Schedule ICE connectivity checks for " << (mComponentId == RTP_COMPONENT_ID ? "RTP" : "RTCP") << " Flow"); + if(mTurnSocket.get() && mMediaStream.mNatTraversalMode == MediaStream::Ice) + { + mConnectivityCheckTimer.cancel(); + // !jjg! todo: follow the formula for Ta in http://tools.ietf.org/html/draft-ietf-mmusic-ice-19#section-16 + mConnectivityCheckTimer.expires_from_now(boost::posix_time::milliseconds(20)); + mConnectivityCheckTimer.async_wait(boost::bind(&Flow::onConnectivityCheckTimer, this, asio::placeholders::error)); + } +} + +void +Flow::setOutgoingIceUsernameAndPassword(const resip::Data& username, const resip::Data& password) +{ + if (!username.empty() && !password.empty()) + { + if (username != mOutgoingIceUsername || password != mOutgoingIcePassword) + { + mIceComplete = false; + } + mOutgoingIceUsername = username; + mOutgoingIcePassword = password; + } +} +void +Flow::setLocalIcePassword(const resip::Data& password) +{ + if (!password.empty()) + { + mTurnSocket->setLocalPassword(password.c_str()); + } +} + +void +Flow::onConnectivityCheckTimer(const asio::error_code& error) +{ + if (error == asio::error::operation_aborted) + { + return; } + + if (mFlowState == CheckingConnectivity) + { + DebugLog(<< "set username/password for ICE: " << mOutgoingIceUsername << ", " << mOutgoingIcePassword); + mTurnSocket->setUsernameAndPassword(mOutgoingIceUsername.c_str(), mOutgoingIcePassword.c_str(), true); + + // the RTP flow is the only one that does a connectivity check at this point, + // since it and the RTCP flow share the same foundation + std::vector::iterator candPairIt = mIceCheckList.begin(); + for (; candPairIt != mIceCheckList.end(); ++candPairIt) + { + const reTurn::IceCandidate& c = candPairIt->mRemoteCandidate; + if (candPairIt->mState == IceCandidatePair::Waiting) + { + candPairIt->mState = IceCandidatePair::InProgress; + if (c.getTransportAddr() == mLocalBinding) // .jjg. it is possible that one of the remote candidiates + // has the same IP address and port as us; just skip the check if so + { + DebugLog(<< "remote candidate has the same IP:port as our local binding; skip this one " << c.getTransportAddr().getAddress().to_string() << ":" << c.getTransportAddr().getPort()); + candPairIt->mState = IceCandidatePair::Failed; } else - WarningLog(<<"No TURN Socket, can't send media to destination"); + { + // !jjg! todo: also pass in something to indicate the retransmission interval RTO + mTurnSocket->connectivityCheck( + c.getTransportAddr(), + mPeerRflxCandidatePriority, + mIceRole == Flow::IceRole_Controlling, + mIceRole == Flow::IceRole_Controlled, + mIceRole == Flow::IceRole_Controlled ? CONNECTIVITY_CHECK_MAX_RETRANSMITS*2 : CONNECTIVITY_CHECK_MAX_RETRANSMITS, + CONNECTIVITY_CHECK_RETRANS_INTERVAL_MS); + DebugLog(<< "checking connectivity to remote candidate " << c.getTransportAddr().getAddress().to_string() << ":" << c.getTransportAddr().getPort()); + break; + } + } + } + + // make the timer fire Ta seconds from now to do a check for the next + // candidate in the Waiting state (if any) + scheduleConnectivityChecks(); + } } #ifdef USE_SSL +#ifdef USE_DTLS void Flow::startDtlsClient(const char* address, unsigned short port) { - Lock lock(mMutex); + mIOService.post(boost::bind(&Flow::startDtlsClientImpl, this, std::string(address), port)); +} + +void +Flow::startDtlsClientImpl(const std::string& address, unsigned short port) +{ createDtlsSocketClient(StunTuple(mLocalBinding.getTransportType(), asio::ip::address::from_string(address), port)); } -#endif void Flow::setRemoteSDPFingerprint(const resip::Data& fingerprint) { - Lock lock(mMutex); + mIOService.post(boost::bind(&Flow::setRemoteSDPFingerprintImpl, this, fingerprint)); +} + +void +Flow::setRemoteSDPFingerprintImpl(const resip::Data& fingerprint) +{ mRemoteSDPFingerprint = fingerprint; -#ifdef USE_SSL // Check all existing DtlsSockets and tear down those that don't match std::map::iterator it; for(it = mDtlsSockets.begin(); it != mDtlsSockets.end(); it++) @@ -523,15 +686,27 @@ Flow::setRemoteSDPFingerprint(const resip::Data& fingerprint) ((FlowDtlsSocketContext*)it->second->getSocketContext())->fingerprintMismatch(); } } -#endif //USE_SSL } const resip::Data Flow::getRemoteSDPFingerprint() { - Lock lock(mMutex); return mRemoteSDPFingerprint; } +#endif // USE_DTLS +#endif // USE_SSL + + +// !jjg! needs to be re-implemented with a thread-safe interface +//void Flow::setOnBeforeSocketClosedFp(boost::function fp) +//{ +// Lock lock(mMutex); +// +// if(!mTurnSocket.get()) +// return; +// +// mTurnSocket->setOnBeforeSocketClosedFp(fp); +//} const StunTuple& Flow::getLocalTuple() @@ -542,25 +717,68 @@ Flow::getLocalTuple() StunTuple Flow::getSessionTuple() { - assert(mFlowState == Ready); - Lock lock(mMutex); - if(mMediaStream.mNatTraversalMode == MediaStream::TurnAllocation) { + assert(mFlowState == Ready); return mRelayTuple; } + else if(mMediaStream.mNatTraversalMode == MediaStream::Ice) + { + assert(mFlowState == Ready || mFlowState == Connected); + std::vector::iterator candIt = mIceCheckList.begin(); + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mState == IceCandidatePair::Succeeded) + { + return candIt->mLocalCandidate.getTransportAddr(); + } + } + if (mReflexiveTuple.getTransportType() != StunTuple::None) + { + return mReflexiveTuple; // should not get here + } + return mLocalBinding; + } else if(mMediaStream.mNatTraversalMode == MediaStream::StunBindDiscovery) { + assert(mFlowState == Ready); return mReflexiveTuple; } return mLocalBinding; } +reTurn::IceCandidate +Flow::getLocalNominatedIceCandidate() const +{ + std::vector::const_iterator candIt = mIceCheckList.begin(); + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mState == IceCandidatePair::Succeeded) + { + return candIt->mLocalCandidate; + } + } + return reTurn::IceCandidate(); +} + +reTurn::IceCandidate +Flow::getRemoteNominatedIceCandidate() const +{ + std::vector::const_iterator candIt = mIceCheckList.begin(); + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mState == IceCandidatePair::Succeeded) + { + return candIt->mRemoteCandidate; + } + } + return reTurn::IceCandidate(); +} + StunTuple Flow::getRelayTuple() { assert(mFlowState == Ready); - Lock lock(mMutex); return mRelayTuple; } @@ -568,7 +786,6 @@ StunTuple Flow::getReflexiveTuple() { assert(mFlowState == Ready); - Lock lock(mMutex); return mReflexiveTuple; } @@ -576,7 +793,6 @@ UInt64 Flow::getReservationToken() { assert(mFlowState == Ready); - Lock lock(mMutex); return mReservationToken; } @@ -589,6 +805,7 @@ Flow::onConnectSuccess(unsigned int socketDesc, const asio::ip::address& address switch(mMediaStream.mNatTraversalMode) { case MediaStream::StunBindDiscovery: + case MediaStream::Ice: if(mFlowState == ConnectingServer) { changeFlowState(Binding); @@ -597,7 +814,19 @@ Flow::onConnectSuccess(unsigned int socketDesc, const asio::ip::address& address else { changeFlowState(Ready); - mMediaStream.onFlowReady(mComponentId); + if (mMediaStream.mNatTraversalMode == MediaStream::Ice && mIceComplete) + { + std::vector::iterator candIt = mIceCheckList.begin(); + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mState == IceCandidatePair::Succeeded) + { + mMediaStream.onFlowIceComplete(mComponentId, (mIceRole == IceRole_Controlling)); + break; + } + } + mIceRole = IceRole_Unknown; + } } break; case MediaStream::TurnAllocation: @@ -640,21 +869,274 @@ Flow::onSharedSecretFailure(unsigned int socketDesc, const asio::error_code& e) void Flow::onBindSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& stunServerTuple) { - InfoLog(<< "Flow::onBindingSuccess: socketDesc=" << socketDesc << ", reflexive=" << reflexiveTuple << ", componentId=" << mComponentId); + InfoLog(<< "Flow::onBindingSuccess: socketDesc=" << socketDesc << ", reflexive=" << reflexiveTuple << ", componentId=" << mComponentId << ", stunServer=" << stunServerTuple); + + if (mFlowState == CheckingConnectivity && mMediaStream.getRtpFlow() == this) { - Lock lock(mMutex); - mReflexiveTuple = reflexiveTuple; + // this candidate is a good one! use it + mConnectivityCheckTimer.cancel(); + std::vector::iterator candIt = mIceCheckList.begin(); + std::auto_ptr nominatedCandidatePair; + bool newPeerReflexiveCandidateDiscovered = false; + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mRemoteCandidate.getTransportAddr() == stunServerTuple) + { + if (candIt->mLocalCandidate.getTransportAddr() == reflexiveTuple) + { + nominatedCandidatePair.reset(new IceCandidatePair(*candIt)); + candIt->mState = IceCandidatePair::Succeeded; + } + else + { + newPeerReflexiveCandidateDiscovered = true; + } + break; + } + } + if (nominatedCandidatePair.get() == NULL && newPeerReflexiveCandidateDiscovered) + { + IceCandidatePair newPeerRflxCandidate; + newPeerRflxCandidate.mRemoteCandidate = candIt->mRemoteCandidate; + newPeerRflxCandidate.mLocalCandidate = IceCandidate(reflexiveTuple, IceCandidate::CandidateType_Prflx, 0, resip::Data::Empty, mComponentId, mLocalBinding); + newPeerRflxCandidate.mState = IceCandidatePair::Succeeded; + mIceCheckList.push_back(newPeerRflxCandidate); + nominatedCandidatePair.reset(new IceCandidatePair(newPeerRflxCandidate)); + } + + if (nominatedCandidatePair.get() != NULL) + { + changeFlowState(Connecting); + mTurnSocket->connect(stunServerTuple.getAddress().to_string(), stunServerTuple.getPort(), mLocalBinding.getAddress().is_v6()); + DebugLog(<< "connecting RTP flow to " << stunServerTuple.getAddress().to_string() << ":" << stunServerTuple.getPort()); + //mIceRole = IceRole_Unknown; + mIceComplete = true; + + // time to make the RTCP flow do its connectivity check, using the candidate with the same foundation + // as that of the RTP candidate that has just succeeded + candIt = mMediaStream.getRtcpFlow()->mIceCheckList.begin(); + for (; candIt != mMediaStream.getRtcpFlow()->mIceCheckList.end(); ++candIt) + { + if (candIt->mRemoteCandidate.getFoundation() == nominatedCandidatePair->mRemoteCandidate.getFoundation()) + { + mMediaStream.getRtcpFlow()->changeFlowState(CheckingConnectivity); + + DebugLog(<< "set username/password for ICE (RTCP flow): " << mOutgoingIceUsername << ", " << mOutgoingIcePassword); + mMediaStream.getRtcpFlow()->mTurnSocket->setUsernameAndPassword(mOutgoingIceUsername.c_str(), mOutgoingIcePassword.c_str(), true); + + // .jjg. do the connectivity check here directly rather than + // going through the onConnectivityCheckTimer(), since we are just trying + // this single candidate which is pretty much guaranteed to work... + reTurn::IceCandidate& c = candIt->mRemoteCandidate; + candIt->mState = IceCandidatePair::InProgress; + mMediaStream.getRtcpFlow()->mTurnSocket->connectivityCheck( + c.getTransportAddr(), + mMediaStream.getRtcpFlow()->mPeerRflxCandidatePriority, + mMediaStream.getRtcpFlow()->mIceRole == Flow::IceRole_Controlling, + mMediaStream.getRtcpFlow()->mIceRole == Flow::IceRole_Controlled, + CONNECTIVITY_CHECK_MAX_RETRANSMITS, + CONNECTIVITY_CHECK_RETRANS_INTERVAL_MS); + DebugLog(<< "(RTCP flow) checking connectivity to remote candidate " << c.getTransportAddr().getAddress().to_string() << ":" << c.getTransportAddr().getPort()); + } + } + } } + else if (mFlowState == CheckingConnectivity && mMediaStream.getRtcpFlow() == this) + { + // this candidate is a good one! use it + std::vector::iterator candIt = mIceCheckList.begin(); + std::auto_ptr nominatedCandidatePair; + bool newPeerReflexiveCandidateDiscovered = false; + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mRemoteCandidate.getTransportAddr() == stunServerTuple) + { + if (candIt->mLocalCandidate.getTransportAddr() == reflexiveTuple) + { + nominatedCandidatePair.reset(new IceCandidatePair(*candIt)); + candIt->mState = IceCandidatePair::Succeeded; + } + else + { + newPeerReflexiveCandidateDiscovered = true; + } + break; + } + } + if (nominatedCandidatePair.get() == NULL && newPeerReflexiveCandidateDiscovered) + { + IceCandidatePair newPeerRflxCandidate; + newPeerRflxCandidate.mRemoteCandidate = candIt->mRemoteCandidate; + newPeerRflxCandidate.mLocalCandidate = IceCandidate(reflexiveTuple, IceCandidate::CandidateType_Prflx, 0, resip::Data::Empty, mComponentId, mLocalBinding); + newPeerRflxCandidate.mState = IceCandidatePair::Succeeded; + mIceCheckList.push_back(newPeerRflxCandidate); + nominatedCandidatePair.reset(new IceCandidatePair(newPeerRflxCandidate)); + } + + if (nominatedCandidatePair.get() != NULL) + { + changeFlowState(Connecting); + mTurnSocket->connect(stunServerTuple.getAddress().to_string(), stunServerTuple.getPort(), mLocalBinding.getAddress().is_v6()); + DebugLog(<< "connecting RTCP flow to " << stunServerTuple.getAddress().to_string() << ":" << stunServerTuple.getPort()); + //mIceRole = IceRole_Unknown; + mIceComplete = true; + } + } + else if (mMediaStream.mNatTraversalMode == MediaStream::Ice && + (mFlowState == Connecting || mFlowState == Connected || mFlowState == Ready)) + { + // ignore; we probably had multiple ICE candidates that succeeded, and we've already picked + // the first successful one + DebugLog(<< "ignoring onBindSuccess() in state " << flowStateToString(mFlowState)); + } + else + { + mReflexiveTuple = reflexiveTuple; changeFlowState(Ready); mMediaStream.onFlowReady(mComponentId); + + // this block is necessary because setActiveDestination(..) is called BEFORE our server reflexive + // candidate is disconvered (i.e. before we get to the previous three lines of code) ... + // when we are UAS for the transaction, onFlowReady results in our answer going out HOWEVER + // setActiveDestination is NOT called again -- so we need to manually kick-start ICE connectivity checks + if (mConnectivityChecksPending) + { + if (mMediaStream.getRtpFlow() == this && mMediaStream.getRtcpFlow() == NULL) + { + changeFlowState(CheckingConnectivity); + scheduleConnectivityChecks(); + mConnectivityChecksPending = false; +} + else if (mMediaStream.getRtcpFlow() == this && mMediaStream.getRtpFlow()->isReady()) + { + mMediaStream.getRtpFlow()->changeFlowState(CheckingConnectivity); + mMediaStream.getRtpFlow()->scheduleConnectivityChecks(); + changeFlowState(CheckingConnectivity); + mMediaStream.getRtpFlow()->mConnectivityChecksPending = false; + mConnectivityChecksPending = false; + } + else if (mMediaStream.getRtpFlow() == this && mMediaStream.getRtcpFlow()->isReady()) + { + changeFlowState(CheckingConnectivity); + scheduleConnectivityChecks(); + mConnectivityChecksPending = false; + mMediaStream.getRtcpFlow()->changeFlowState(CheckingConnectivity); + mMediaStream.getRtcpFlow()->mConnectivityChecksPending = false; + } + } + } } + void Flow::onBindFailure(unsigned int socketDesc, const asio::error_code& e, const StunTuple& stunServerTuple) { WarningLog(<< "Flow::onBindingFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId ); + if (mMediaStream.mNatTraversalMode == MediaStream::Ice && mFlowState == CheckingConnectivity) + { + unsigned int numFailed = 0; + unsigned int numSucceeded = 0; + std::vector::iterator candIt = mIceCheckList.begin(); + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mRemoteCandidate.getTransportAddr() == stunServerTuple) + { + candIt->mState = IceCandidatePair::Failed; + } + if (candIt->mState == IceCandidatePair::Failed) + { + numFailed++; + } + else if (candIt->mState == IceCandidatePair::Succeeded) + { + numSucceeded++; + } + } + + if ((numSucceeded == 0) && (numFailed > 0) && (numFailed == mIceCheckList.size())) + { + InfoLog(<< (mComponentId == RTP_COMPONENT_ID ? "RTP" : "RTCP") << ": all " << numFailed << " remote candidates have failed; no more remote candidates to try"); + mConnectivityCheckTimer.cancel(); + changeFlowState(Connected); // ICE failed, but we can still use the flow; + // use Connected state for now so that we don't stream media + // until setActiveDestination is called + mMediaStream.onFlowIceFailed(mComponentId, (mIceRole == IceRole_Controlling)); + if (mMediaStream.getRtcpFlow()) + { + mMediaStream.getRtcpFlow()->changeFlowState(Connected); + mMediaStream.onFlowIceFailed(mMediaStream.getRtcpFlow()->mComponentId, (mIceRole == IceRole_Controlling)); + } + } + } + else if (mMediaStream.mNatTraversalMode == MediaStream::Ice && + (mFlowState == Connecting || mFlowState == Connected || mFlowState == Ready)) + { + // we are good to go, but we get a bind failure? + // just ignore this condition, since RTP is probably flowing at this point + DebugLog(<< "ignoring onBindFailure() in state " << flowStateToString(mFlowState)); + } + else + { changeFlowState(Connected); mMediaStream.onFlowError(mComponentId, e.value()); // TODO define different error code? } +} + +void +Flow::onIncomingBindRequestProcessed(unsigned int socketDest, const StunTuple& sourceTuple, const reTurn::StunMessage& bindRequest) +{ + InfoLog(<< "Flow::onIncomingBindRequestProcessed, socketDest=" << socketDest << ", sourceTuple=" << sourceTuple); + // this is essentially a 'triggered check'; the other side sent a connectivity check to us + // from sourceTuple, so we want to immediately send one back to it (if we haven't already) + // so that we can speed up convergeance + if (mMediaStream.mNatTraversalMode == MediaStream::Ice && !mOutgoingIceUsername.empty()) + { + std::vector::iterator candIt = mIceCheckList.begin(); + bool candidateExists = false; + for (; candIt != mIceCheckList.end(); ++candIt) + { + if (candIt->mRemoteCandidate.getTransportAddr() == sourceTuple) + { + candidateExists = true; + if (candIt->mState == IceCandidatePair::Waiting || candIt->mState == IceCandidatePair::Failed) + { + DebugLog(<< "set username/password for ICE: " << mOutgoingIceUsername << ", " << mOutgoingIcePassword); + mTurnSocket->setUsernameAndPassword(mOutgoingIceUsername.c_str(), mOutgoingIcePassword.c_str(), true); + + candIt->mState = IceCandidatePair::InProgress; + const reTurn::IceCandidate& c = candIt->mRemoteCandidate; + mTurnSocket->connectivityCheck( // !jjg! FIXME: race condition here, ICE might already be complete! + c.getTransportAddr(), + mPeerRflxCandidatePriority, + mIceRole == Flow::IceRole_Controlling || !bindRequest.mHasIceControlling, + mIceRole == Flow::IceRole_Controlled || bindRequest.mHasIceControlling, + CONNECTIVITY_CHECK_MAX_RETRANSMITS, + CONNECTIVITY_CHECK_RETRANS_INTERVAL_MS); + DebugLog(<< "checking connectivity to remote candidate " << c.getTransportAddr().getAddress().to_string() << ":" << c.getTransportAddr().getPort()); + } + break; + } + } + + // handle the case where we get a connectivity check from the remote end before + // setActiveDestination(..) is called + if (!candidateExists) + { + InfoLog(<< "creating placeholder candidate for sourceTuple " << sourceTuple); + // create a 'placeholder' candidate at the top of the check list; + // this should get updated once we get the remote candidates in the SDP answer + // @see Flow::setActiveDestination(..) + IceCandidatePair candidatePair; + candidatePair.mLocalCandidate = IceCandidate(mLocalBinding, IceCandidate::CandidateType_Host, 0, Data::Empty, mComponentId, StunTuple()); + candidatePair.mRemoteCandidate = IceCandidate(sourceTuple, IceCandidate::CandidateType_Unknown, 0, Data::Empty, 0, StunTuple()); + candidatePair.mState = IceCandidatePair::Waiting; + mIceCheckList.push_back(candidatePair); + } + } + else + { + DebugLog(<< "can't do triggered check; mMediaStream.mNatTraversalMode == " << mMediaStream.mNatTraversalMode << ", mOutgoingIceUsername == " << mOutgoingIceUsername); + } +} void Flow::onAllocationSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& relayTuple, unsigned int lifetime, unsigned int bandwidth, UInt64 reservationToken) @@ -667,7 +1149,6 @@ Flow::onAllocationSuccess(unsigned int socketDesc, const StunTuple& reflexiveTup ", reservationToken=" << reservationToken << ", componentId=" << mComponentId); { - Lock lock(mMutex); mReflexiveTuple = reflexiveTuple; mRelayTuple = relayTuple; mReservationToken = reservationToken; @@ -724,24 +1205,6 @@ Flow::onClearActiveDestinationFailure(unsigned int socketDesc, const asio::error WarningLog(<< "Flow::onClearActiveDestinationFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId ); } -void -Flow::onChannelBindRequestSent(unsigned int socketDesc, unsigned short channelNumber) -{ - InfoLog(<< "Flow::onChannelBindRequestSent: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber << ", componentId=" << mComponentId); -} - -void -Flow::onChannelBindSuccess(unsigned int socketDesc, unsigned short channelNumber) -{ - InfoLog(<< "Flow::onChannelBindSuccess: socketDesc=" << socketDesc << ", channelNumber=" << channelNumber << ", componentId=" << mComponentId); -} - -void -Flow::onChannelBindFailure(unsigned int socketDesc, const asio::error_code& e) -{ - WarningLog(<< "Flow::onChannelBindFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId ); -} - void Flow::onSendSuccess(unsigned int socketDesc) { @@ -766,15 +1229,14 @@ Flow::onSendFailure(unsigned int socketDesc, const asio::error_code& e) void Flow::onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port, boost::shared_ptr& data) { - DebugLog(<< "Flow::onReceiveSuccess: socketDesc=" << socketDesc << ", fromAddress=" << address.to_string() << ", fromPort=" << port << ", size=" << data->size() << ", componentId=" << mComponentId); + StackLog(<< "Flow::onReceiveSuccess: socketDesc=" << socketDesc << ", fromAddress=" << address.to_string() << ", fromPort=" << port << ", size=" << data->size() << ", componentId=" << mComponentId); #ifdef USE_SSL +#ifdef USE_DTLS // Check if packet is a dtls packet - if so then process it // Note: Stun messaging should be picked off by the reTurn library - so we only need to tell the difference between DTLS and SRTP here if(DtlsFactory::demuxPacket((const unsigned char*) data->data(), data->size()) == DtlsFactory::dtls) { - Lock lock(mMutex); - StunTuple endpoint(mLocalBinding.getTransportType(), address, port); DtlsSocket* dtlsSocket = getDtlsSocket(endpoint); if(!dtlsSocket) @@ -791,7 +1253,53 @@ Flow::onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address return; } #endif +#endif + if (mHandler) + { + bool usingSRTP = +#ifdef USE_SSL +#ifdef USE_DTLS + mMediaStream.mDtlsFactory || +#endif +#endif + mMediaStream.mSRTPSessionInCreated; + asio::error_code errorCode; + if (usingSRTP) // only do this if we're using SRTP (saves on a memcpy) + { + unsigned int ncbuff_size = data->size() * 2; // a guesstimate + char* ncbuff = new char[ncbuff_size]; + memset(ncbuff, 0, ncbuff_size); + ReceivedData recvData(address, port, data); + errorCode = processReceivedData(ncbuff, ncbuff_size, &recvData); + if (!errorCode) + { + // The size of the data may have changed. Make sure we create a + // new DataBuffer with the new contents after decoding. Use this + // to forward to the system. + boost::shared_ptr newBuf(reTurn::DataBuffer::own(ncbuff, ncbuff_size)); + data.swap(newBuf); + } + } + else if (mMediaStream.mSRTPEnabled && !mMediaStream.mSRTPSessionInCreated) + { + // we haven't processed the remote party's answer yet, so we haven't + // setup our inbound SRTP session and can't decode this data; toss it + WarningLog(<< "discarding incoming SRTP since we don't have an inbound SRTP session yet"); + return; + } + + if (!errorCode) + { + mHandler->onReceiveSuccess(this, socketDesc, address, port, data); + } + else + { + mHandler->onReceiveFailure(this, socketDesc, errorCode); + } + } + else + { if(!mReceivedDataFifo.add(new ReceivedData(address, port, data), ReceivedDataFifo::EnforceTimeDepth)) { WarningLog(<< "Flow::onReceiveSuccess: TimeLimitFifo is full - discarding data! componentId=" << mComponentId); @@ -801,25 +1309,28 @@ Flow::onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address mFakeSelectSocketDescriptor.send(); } } +} void Flow::onReceiveFailure(unsigned int socketDesc, const asio::error_code& e) { WarningLog(<< "Flow::onReceiveFailure: socketDesc=" << socketDesc << " error=" << e.value() << "(" << e.message() << "), componentId=" << mComponentId); - // Make sure we keep receiving if we get an ICMP error on a UDP socket - if(e.value() == asio::error::connection_reset && mLocalBinding.getTransportType() == StunTuple::UDP) + if ((e.value() == asio::error::connection_refused || e.value() == asio::error::connection_reset) + && mLocalBinding.getTransportType() == StunTuple::UDP) { - assert(mTurnSocket.get()); + // .jjg. keep receiving -- after endless support tickets where ICMP errors were leading to Bria + // hanging up (even with a grace period), I've decided it's better to just let 'er keep trucking along + // as though nothing is wrong ... BUT we'll still call the onReceiveFailure callback so that apps can + // customize their behaviour in this regard mTurnSocket->turnReceive(); - } + } -void -Flow::onIncomingBindRequestProcessed(unsigned int socketDesc, const StunTuple& sourceTuple) + if (mHandler) { - InfoLog(<< "Flow::onIncomingBindRequestProcessed: socketDesc=" << socketDesc << ", sourceTuple=" << sourceTuple ); - // TODO - handle + mHandler->onReceiveFailure(this, socketDesc, e); + } } void @@ -848,6 +1359,8 @@ Flow::flowStateToString(FlowState state) return "Connected"; case Ready: return "Ready"; + case CheckingConnectivity: + return "CheckingConnectivity"; default: assert(false); return "Unknown"; @@ -855,6 +1368,7 @@ Flow::flowStateToString(FlowState state) } #ifdef USE_SSL +#ifdef USE_DTLS DtlsSocket* Flow::getDtlsSocket(const StunTuple& endpoint) { @@ -878,7 +1392,6 @@ Flow::createDtlsSocketClient(const StunTuple& endpoint) dtlsSocket->startClient(); mDtlsSockets[endpoint] = dtlsSocket; } - return dtlsSocket; } @@ -896,8 +1409,8 @@ Flow::createDtlsSocketServer(const StunTuple& endpoint) return dtlsSocket; } - -#endif +#endif // USE_DTLS +#endif // USE_SSL /* ==================================================================== diff --git a/reflow/Flow.hxx b/reflow/Flow.hxx index 58485baf54..8ce238a8a9 100644 --- a/reflow/Flow.hxx +++ b/reflow/Flow.hxx @@ -9,11 +9,7 @@ #include #include -#ifdef WIN32 #include -#else -#include -#endif #include #include "reTurn/client/TurnAsyncUdpSocket.hxx" @@ -21,8 +17,15 @@ #include "reTurn/client/TurnAsyncTlsSocket.hxx" #include "reTurn/client/TurnAsyncSocketHandler.hxx" #include "reTurn/StunMessage.hxx" +#include "reTurn/DataBuffer.hxx" +#include "reTurn/StunMessage.hxx" +#include "IceCandidate.hxx" #include "FakeSelectSocketDescriptor.hxx" +#ifdef USE_SSL +#ifdef USE_DTLS #include "dtls_wrapper/DtlsSocket.hxx" +#endif +#endif using namespace reTurn; @@ -38,6 +41,7 @@ namespace flowmanager Author: Scott Godin (sgodin AT SipSpectrum DOT com) */ class MediaStream; +class FlowHandler; class Flow; class Flow : public TurnAsyncSocketHandler @@ -49,6 +53,7 @@ public: Unconnected, ConnectingServer, Connecting, + CheckingConnectivity, Binding, Allocating, Connected, @@ -64,34 +69,29 @@ public: MediaStream& mediaStream); ~Flow(); - void activateFlow(UInt8 allocationProps = StunMessage::PropsNone); - void activateFlow(UInt64 reservationToken); - - bool isReady() { return mFlowState == Ready; } - - /// Returns a socket descriptor that can be used in a select call - /// WARNING - this descriptor should not be used for any other purpose - /// - do NOT set socket options, or send, receive from this descriptor, - /// instead use the Flow api's - unsigned int getSelectSocketDescriptor(); + void setHandler(FlowHandler* handler); - unsigned int getSocketDescriptor(); // returns the real socket descriptor - used to correlate callbacks + // this can also be obtained from the FlowManager, but is here as well for convenience + // for apps that need to do operations on the socket (setting options, setting Qos options, etc.) + asio::io_service& getIOService() { return mIOService; } /// Turn Send Methods /// WARNING - if using Secure media, then there must be room at the /// end of the passed in buffer for the SRTP HMAC code to be appended /// ***It would be good to make this safer*** - void send(char* buffer, unsigned int size); - void sendTo(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int size); + void send(boost::shared_ptr& buffer); void rawSendTo(const asio::ip::address& address, unsigned short port, const char* buffer, unsigned int size); - /// Receive Methods - asio::error_code receive(char* buffer, unsigned int& size, unsigned int timeout, asio::ip::address* sourceAddress=0, unsigned short* sourcePort=0); - asio::error_code receiveFrom(const asio::ip::address& address, unsigned short port, char* buffer, unsigned int& size, unsigned int timeout); + void asyncReceive(); /// Used to set where this flow should be sending to - void setActiveDestination(const char* address, unsigned short port); + void setActiveDestination(const resip::Data& address, unsigned short port, const std::vector& candidates); + + void setIceRole(bool controlling); + void setPeerReflexiveCandidatePriority(UInt32 priority); +#ifdef USE_SSL +#ifdef USE_DTLS /// Dtls-Srtp Methods /// Starts the dtls client handshake process - (must call setActiveDestination first) @@ -105,24 +105,73 @@ public: /// Retrieves the stored remote SDP Fingerprint. const resip::Data getRemoteSDPFingerprint(); +#endif +#endif + + //void setOnBeforeSocketClosedFp(boost::function fp); const StunTuple& getLocalTuple(); + unsigned int getComponentId() const { return mComponentId; } + +private: + friend class MediaStream; + + void initialize(); + void shutdown(); + + void activateFlow(UInt8 allocationProps = reTurn::StunMessage::PropsNone); + void activateFlow(UInt64 reservationToken); + + bool isReady() const { return mFlowState == Ready; } StunTuple getSessionTuple(); // returns either local, reflexive, or relay tuple depending on NatTraversalMode StunTuple getRelayTuple(); StunTuple getReflexiveTuple(); UInt64 getReservationToken(); - unsigned int getComponentId() { return mComponentId; } + bool iceComplete() const { return mIceComplete; } + reTurn::IceCandidate getLocalNominatedIceCandidate() const; + reTurn::IceCandidate getRemoteNominatedIceCandidate() const; + void setOutgoingIceUsernameAndPassword(const resip::Data& username, const resip::Data& password); + void setLocalIcePassword(const resip::Data& password); + + /// Returns a socket descriptor that can be used in a select call + /// WARNING - this descriptor should not be used for any other purpose + /// - do NOT set socket options, or send, receive from this descriptor, + /// instead use the Flow api's + unsigned int getSelectSocketDescriptor(); + unsigned int getSocketDescriptor(); // returns the real socket descriptor - used to correlate callbacks + + void setHandlerImpl(FlowHandler* handler, resip::Condition& cv); + void sendImpl(boost::shared_ptr& buffer); + void asyncReceiveImpl(); + void setActiveDestinationImpl(const resip::Data& address, unsigned short port, const std::vector& candidates); + void setIceRoleImpl(bool controlling); + void setPeerReflexiveCandidatePriorityImpl(UInt32 priority) { mPeerRflxCandidatePriority = priority; } + void startDtlsClientImpl(const std::string& address, unsigned short port); + void setRemoteSDPFingerprintImpl(const resip::Data& fingerprint); + + /// Used only when ICE is enabled; should be called once the offer/answer exchange has been completed + void scheduleConnectivityChecks(); + void onConnectivityCheckTimer(const asio::error_code& error); -private: asio::io_service& mIOService; + asio::deadline_timer mConnectivityCheckTimer; #ifdef USE_SSL asio::ssl::context& mSslContext; #endif + enum IceRole + { + IceRole_Controlled, + IceRole_Controlling, + IceRole_Unknown + }; + IceRole mIceRole; + UInt32 mPeerRflxCandidatePriority; + // Note: these member variables are set at creation time and never changed, thus // they do not require mutex protection - unsigned int mComponentId; - StunTuple mLocalBinding; + const unsigned int mComponentId; + const StunTuple mLocalBinding; // MediaStream that this Flow belongs too MediaStream& mMediaStream; @@ -133,20 +182,45 @@ private: // These are only set once, then accessed - thus they do not require mutex protection UInt8 mAllocationProps; UInt64 mReservationToken; + bool mActiveDestinationSet; + bool mConnectivityChecksPending; // Mutex to protect the following members that may be get/set from multiple threads resip::Mutex mMutex; StunTuple mReflexiveTuple; StunTuple mRelayTuple; + FlowHandler* mHandler; + resip::Data mRemoteSDPFingerprint; + bool mIceComplete; + resip::Data mOutgoingIceUsername; + resip::Data mOutgoingIcePassword; + + struct IceCandidatePair + { + enum State + { + InProgress, + Frozen, + Waiting, + Failed, + Succeeded + }; + reTurn::IceCandidate mLocalCandidate; + reTurn::IceCandidate mRemoteCandidate; + State mState; + }; + std::vector mIceCheckList; #ifdef USE_SSL +#ifdef USE_DTLS // Map to store all DtlsSockets - in forking cases there can be more than one std::map mDtlsSockets; dtls::DtlsSocket* getDtlsSocket(const reTurn::StunTuple& endpoint); dtls::DtlsSocket* createDtlsSocketClient(const StunTuple& endpoint); dtls::DtlsSocket* createDtlsSocketServer(const StunTuple& endpoint); #endif +#endif volatile FlowState mFlowState; void changeFlowState(FlowState newState); @@ -168,8 +242,9 @@ private: ReceivedDataFifo mReceivedDataFifo; // Helpers to perform SRTP protection/unprotection - bool processSendData(char* buffer, unsigned int& size, const asio::ip::address& address, unsigned short port); + bool processSendData(boost::shared_ptr& buffer, const asio::ip::address& address, unsigned short port); asio::error_code processReceivedData(char* buffer, unsigned int& size, ReceivedData* receivedData, asio::ip::address* sourceAddress=0, unsigned short* sourcePort=0); + FakeSelectSocketDescriptor mFakeSelectSocketDescriptor; virtual void onConnectSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port); @@ -181,6 +256,8 @@ private: virtual void onBindSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& stunServerTuple); virtual void onBindFailure(unsigned int socketDesc, const asio::error_code& e, const StunTuple& stunServerTuple); + virtual void onIncomingBindRequestProcessed(unsigned int socketDest, const StunTuple& sourceTuple, const reTurn::StunMessage& bindRequest); + virtual void onAllocationSuccess(unsigned int socketDesc, const StunTuple& reflexiveTuple, const StunTuple& relayTuple, unsigned int lifetime, unsigned int bandwidth, UInt64 reservationToken); virtual void onAllocationFailure(unsigned int socketDesc, const asio::error_code& e); @@ -192,18 +269,15 @@ private: virtual void onClearActiveDestinationSuccess(unsigned int socketDesc); virtual void onClearActiveDestinationFailure(unsigned int socketDesc, const asio::error_code &e); - virtual void onChannelBindRequestSent(unsigned int socketDesc, unsigned short channelNumber); - virtual void onChannelBindSuccess(unsigned int socketDesc, unsigned short channelNumber); - virtual void onChannelBindFailure(unsigned int socketDesc, const asio::error_code& e); + virtual void onChannelBindRequestSent(unsigned int socketDesc, unsigned short channelNumber) {}; + virtual void onChannelBindSuccess(unsigned int socketDesc, unsigned short channelNumber) {}; + virtual void onChannelBindFailure(unsigned int socketDesc, const asio::error_code& e) {}; - //virtual void onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port, const char* buffer, unsigned int size); virtual void onReceiveSuccess(unsigned int socketDesc, const asio::ip::address& address, unsigned short port, boost::shared_ptr& data); virtual void onReceiveFailure(unsigned int socketDesc, const asio::error_code& e); virtual void onSendSuccess(unsigned int socketDesc); virtual void onSendFailure(unsigned int socketDesc, const asio::error_code& e); - - virtual void onIncomingBindRequestProcessed(unsigned int socketDesc, const StunTuple& sourceTuple); }; } diff --git a/reflow/FlowDtlsSocketContext.cxx b/reflow/FlowDtlsSocketContext.cxx index 9e731b7905..70545285cf 100644 --- a/reflow/FlowDtlsSocketContext.cxx +++ b/reflow/FlowDtlsSocketContext.cxx @@ -3,14 +3,15 @@ #endif #ifdef USE_SSL -#include -#include -#include +#ifdef USE_DTLS #include #include #include +#include +#include + #include "FlowDtlsSocketContext.hxx" #include "FlowManagerSubsystem.hxx" @@ -150,6 +151,7 @@ FlowDtlsSocketContext::srtpUnprotect(void* data, int* size, bool rtcp) } #endif +#endif /* ==================================================================== Copyright (c) 2007-2008, Plantronics, Inc. diff --git a/reflow/FlowDtlsSocketContext.hxx b/reflow/FlowDtlsSocketContext.hxx index d69d65fb01..6548448c5b 100644 --- a/reflow/FlowDtlsSocketContext.hxx +++ b/reflow/FlowDtlsSocketContext.hxx @@ -8,11 +8,7 @@ #define FlowDtlsSocketContext_hxx #include -#ifdef WIN32 #include -#else -#include -#endif #include "dtls_wrapper/DtlsSocket.hxx" #include "Flow.hxx" diff --git a/reflow/FlowDtlsTimerContext.cxx b/reflow/FlowDtlsTimerContext.cxx index 0659ab6f92..9d5d6e9a6e 100644 --- a/reflow/FlowDtlsTimerContext.cxx +++ b/reflow/FlowDtlsTimerContext.cxx @@ -3,6 +3,8 @@ #endif #ifdef USE_SSL +#ifdef USE_DTLS + #include @@ -49,6 +51,7 @@ FlowDtlsTimerContext::handleTimeout(dtls::DtlsTimer *timer, const asio::error_co mDeadlineTimers.erase(timer); } +#endif #endif /* ==================================================================== diff --git a/reflow/FlowHandler.hxx b/reflow/FlowHandler.hxx new file mode 100644 index 0000000000..a965ec8b59 --- /dev/null +++ b/reflow/FlowHandler.hxx @@ -0,0 +1,59 @@ +#if !defined(FlowHandler_hxx) +#define FlowHandler_hxx + +#include "reTurn/DataBuffer.hxx" + +namespace flowmanager +{ +class Flow; + +class FlowHandler +{ +public: + + virtual ~FlowHandler() {} + + virtual void onReceiveSuccess(Flow* flow, unsigned int socketDesc, const asio::ip::address& address, unsigned short port, boost::shared_ptr& data) = 0; + + virtual void onReceiveFailure(Flow* flow, unsigned int socketDesc, const asio::error_code& e) = 0; + + virtual void onFlowHandlerCanBeDeleted() = 0; +}; + +} + +#endif + +/* ==================================================================== + + Copyright (c) 2007-2008, Plantronics, Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of Plantronics nor the names of its contributors + may be used to endorse or promote products derived from this + software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + ==================================================================== */ diff --git a/reflow/FlowManager.cxx b/reflow/FlowManager.cxx index c7351ffb42..c02bac55b9 100644 --- a/reflow/FlowManager.cxx +++ b/reflow/FlowManager.cxx @@ -2,27 +2,23 @@ #include "config.h" #endif +#include +#include +#include #include #include #include #include #include -#include -#include -#include -#include - -#ifdef WIN32 #include -#else -#include -#endif #ifdef USE_SSL #include #include +#ifdef USE_DTLS #include "FlowDtlsTimerContext.hxx" +#endif //USE_DTLS #endif //USE_SSL #include "FlowManagerSubsystem.hxx" @@ -31,8 +27,10 @@ using namespace flowmanager; using namespace resip; #ifdef USE_SSL +#ifdef USE_DTLS using namespace dtls; #endif +#endif using namespace std; #define RESIPROCATE_SUBSYSTEM FlowManagerSubsystem::FLOWMANAGER @@ -46,6 +44,14 @@ class IOServiceThread : public ThreadIf virtual ~IOServiceThread() {} +#ifdef WIN32 + virtual void run() + { + ThreadIf::run(); + SetThreadPriority(mThread, THREAD_PRIORITY_HIGHEST); + } +#endif + virtual void thread() { mIOService.run(); @@ -59,9 +65,12 @@ FlowManager::FlowManager() #ifdef USE_SSL : mSslContext(mIOService, asio::ssl::context::tlsv1), +#ifdef USE_DTLS + mDtlsFactory(0), +#endif mClientCert(0), - mClientKey(0), - mDtlsFactory(0) + mClientKey(0) + #endif { mIOServiceWork = new asio::io_service::work(mIOService); @@ -91,21 +100,29 @@ FlowManager::FlowManager() status = srtp_install_event_handler(FlowManager::srtpEventHandler); } - FlowManager::~FlowManager() { + if( mIOServiceWork != NULL ) + { + mIOServiceWork->get_io_service().stop(); delete mIOServiceWork; + mIOServiceWork = NULL; + } + mIOServiceThread->join(); delete mIOServiceThread; #ifdef USE_SSL + #ifdef USE_DTLS if(mDtlsFactory) delete mDtlsFactory; + #endif if(mClientCert) X509_free(mClientCert); if(mClientKey) EVP_PKEY_free(mClientKey); #endif } #ifdef USE_SSL +#ifdef USE_DTLS void FlowManager::initializeDtlsFactory(const char* certAor) { @@ -128,6 +145,7 @@ FlowManager::initializeDtlsFactory(const char* certAor) } } #endif +#endif void FlowManager::srtpEventHandler(srtp_event_data_t *data) @@ -161,43 +179,27 @@ FlowManager::createMediaStream(MediaStreamHandler& mediaStreamHandler, const char* stunPassword) { MediaStream* newMediaStream = 0; - if(rtcpEnabled) { - StunTuple localRtcpBinding(localBinding.getTransportType(), localBinding.getAddress(), localBinding.getPort() + 1); - newMediaStream = new MediaStream(mIOService, -#ifdef USE_SSL - mSslContext, -#endif + StunTuple localRtcpBinding = (rtcpEnabled ? StunTuple(localBinding.getTransportType(), localBinding.getAddress(), localBinding.getPort() + 1) : StunTuple()); + newMediaStream = new MediaStream( + mIOService, mediaStreamHandler, - localBinding, - localRtcpBinding, #ifdef USE_SSL +#ifdef USE_DTLS mDtlsFactory, #endif +#endif natTraversalMode, natTraversalServerHostname, natTraversalServerPort, stunUsername, stunPassword); - } - else - { - StunTuple rtcpDisabled; // Default constructor sets transport type to None - this signals Rtcp is disabled - newMediaStream = new MediaStream(mIOService, + newMediaStream->initialize( #ifdef USE_SSL - mSslContext, + &mSslContext, #endif - mediaStreamHandler, localBinding, - rtcpDisabled, -#ifdef USE_SSL - mDtlsFactory, -#endif - natTraversalMode, - natTraversalServerHostname, - natTraversalServerPort, - stunUsername, - stunPassword); + localRtcpBinding); } return newMediaStream; } diff --git a/reflow/FlowManager.hxx b/reflow/FlowManager.hxx index 4975eeafaa..86acfe8aeb 100644 --- a/reflow/FlowManager.hxx +++ b/reflow/FlowManager.hxx @@ -9,7 +9,9 @@ #include "FlowManagerException.hxx" #ifdef USE_SSL +#ifdef USE_DTLS #include "dtls_wrapper/DtlsFactory.hxx" +#endif //USE_DTLS #include #include #endif //USE_SSL @@ -32,6 +34,7 @@ namespace flowmanager Author: Scott Godin (sgodin AT SipSpectrum DOT com) */ class IOServiceThread; +class MediaStreamHandler; class FlowManager { @@ -51,10 +54,13 @@ public: const char* stunPassword = 0); #ifdef USE_SSL +#ifdef USE_DTLS void initializeDtlsFactory(const char* certAor); dtls::DtlsFactory* getDtlsFactory() { return mDtlsFactory; } +#endif //USE_DTLS #endif //USE_SSL + asio::io_service& getIOService() { return mIOService; } protected: private: @@ -72,8 +78,10 @@ private: #ifdef USE_SSL X509* mClientCert; EVP_PKEY* mClientKey; +#ifdef USE_DTLS dtls::DtlsFactory* mDtlsFactory; -#endif +#endif //USE_DTLS +#endif //USE_SSL }; } diff --git a/reflow/IceCandidate.hxx b/reflow/IceCandidate.hxx new file mode 100644 index 0000000000..295650a507 --- /dev/null +++ b/reflow/IceCandidate.hxx @@ -0,0 +1,183 @@ +#if !defined(IceCandidate_hxx) +#define IceCandidate_hxx + +#include "rutil/ParseBuffer.hxx" + +namespace reTurn +{ +class IceCandidate +{ +public: + enum CandidateType + { + CandidateType_Host, + CandidateType_Srflx, + CandidateType_Prflx, + CandidateType_Relay, + CandidateType_Unknown + }; + + IceCandidate() {} + IceCandidate( + const StunTuple& transportAddr, + CandidateType candidateType, + unsigned int priority, + const resip::Data& foundation, + unsigned int componentId, + const StunTuple& relatedAddr) + : mTransportAddr(transportAddr), + mCandidateType(candidateType), + mPriority(priority), + mFoundation(foundation), + mComponentId(componentId), + mRelatedAddr(relatedAddr) + { + } + IceCandidate(const IceCandidate& rhs) + : mTransportAddr(rhs.mTransportAddr), + mCandidateType(rhs.mCandidateType), + mPriority(rhs.mPriority), + mFoundation(rhs.mFoundation), + mComponentId(rhs.mComponentId), + mRelatedAddr(rhs.mRelatedAddr) + { + } + + /* + .jjg. untested, and currently unneeded + IceCandidate(const resip::Data& sdpAttribute) + { + resip::ParseBuffer pb(sdpAttribute); + + pb.skipWhitespace(); + const char* start = pb.position(); + + pb.skipNonWhitespace(); + pb.data(mFoundation, start); + + pb.skipWhitespace(); + resip::Data componentId; + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(componentId, start); + mComponentId = (unsigned int)componentId.convertInt(); + + pb.skipWhitespace(); + resip::Data transport; + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(transport, start); + if (resip::isEqualNoCase(transport, "udp")) { mTransportAddr.setTransportType(StunTuple::UDP); } + else if (resip::isEqualNoCase(transport, "tcp")) { mTransportAddr.setTransportType(StunTuple::TCP); } + else { assert(0); } + + pb.skipWhitespace(); + resip::Data priority; + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(priority, start); + mPriority = (unsigned int)priority.convertInt(); + + pb.skipWhitespace(); + resip::Data connectionAddr; + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(connectionAddr, start); + mTransportAddr.setAddress(asio::ip::address::from_string(connectionAddr.c_str())); + + pb.skipWhitespace(); + resip::Data port; + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(port, start); + mTransportAddr.setPort(port.convertInt()); + + pb.skipWhitespace(); + resip::Data candidateType; + pb.skipChars("typ "); + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(candidateType, start); + if (resip::isEqualNoCase(candidateType, "host")) { mCandidateType = CandidateType_Host; } + else if (resip::isEqualNoCase(candidateType, "srflx")) { mCandidateType = CandidateType_Srflx; } + else if (resip::isEqualNoCase(candidateType, "prflx")) { mCandidateType = CandidateType_Prflx; } + else if (resip::isEqualNoCase(candidateType, "relay")) { mCandidateType = CandidateType_Relay; } + else { mCandidateType = CandidateType_Unknown; } + + pb.skipWhitespace(); + if (!pb.eof()) + { + resip::Data relatedAddr; + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(relatedAddr, start); + mRelatedAddr.setTransportType(mTransportAddr.getTransportType()); + mRelatedAddr.setAddress(asio::ip::address::from_string(relatedAddr.c_str())); + } + + pb.skipWhitespace(); + if (!pb.eof()) + { + resip::Data relatedPort; + start = pb.position(); + pb.skipNonWhitespace(); + pb.data(relatedPort, start); + mRelatedAddr.setPort(relatedPort.convertInt()); + } + } + */ + + virtual ~IceCandidate() {} + + const StunTuple& getTransportAddr() const { return mTransportAddr; } + const resip::Data& getFoundation() const { return mFoundation; } + unsigned int getComponentId() const { return mComponentId; } + CandidateType getCandidateType() const { return mCandidateType; } + +private: + StunTuple mTransportAddr; + CandidateType mCandidateType; + unsigned int mPriority; + resip::Data mFoundation; + unsigned int mComponentId; + StunTuple mRelatedAddr; + +}; +} + +#endif // IceCandidate_hxx + + +/* ==================================================================== + + Copyright (c) 2009, CounterPath, Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of CounterPath nor the names of its contributors + may be used to endorse or promote products derived from this + software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + ==================================================================== */ diff --git a/reflow/MediaStream.cxx b/reflow/MediaStream.cxx index 0a4c2ac38a..f6c3b48bc0 100644 --- a/reflow/MediaStream.cxx +++ b/reflow/MediaStream.cxx @@ -4,7 +4,6 @@ #include #include -#include #include "FlowManagerSubsystem.hxx" #include "FlowManager.hxx" @@ -12,22 +11,21 @@ using namespace flowmanager; #ifdef USE_SSL +#ifdef USE_DTLS using namespace dtls; #endif +#endif using namespace resip; using namespace std; #define RESIPROCATE_SUBSYSTEM FlowManagerSubsystem::FLOWMANAGER MediaStream::MediaStream(asio::io_service& ioService, -#ifdef USE_SSL - asio::ssl::context& sslContext, -#endif MediaStreamHandler& mediaStreamHandler, - const StunTuple& localRtpBinding, - const StunTuple& localRtcpBinding, #ifdef USE_SSL +#ifdef USE_DTLS DtlsFactory* dtlsFactory, +#endif #endif NatTraversalMode natTraversalMode, const char* natTraversalServerHostname, @@ -35,90 +33,153 @@ MediaStream::MediaStream(asio::io_service& ioService, const char* stunUsername, const char* stunPassword) : #ifdef USE_SSL +#ifdef USE_DTLS mDtlsFactory(dtlsFactory), +#endif #endif mSRTPSessionInCreated(false), mSRTPSessionOutCreated(false), + mSRTPEnabled(false), mNatTraversalMode(natTraversalMode), + mIceAttempted(natTraversalMode == Ice), mNatTraversalServerHostname(natTraversalServerHostname), mNatTraversalServerPort(natTraversalServerPort), mStunUsername(stunUsername), mStunPassword(stunPassword), - mMediaStreamHandler(mediaStreamHandler) + mMediaStreamHandler(mediaStreamHandler), + mRtcpEnabled(false), + mRtpFlow(NULL), + mRtcpFlow(NULL), + mIOService(ioService) { - // Rtcp is enabled if localRtcpBinding transport type != None - mRtcpEnabled = localRtcpBinding.getTransportType() != StunTuple::None; +} +MediaStream::~MediaStream() +{ + shutdown(); +} + +void +MediaStream::initialize( +#ifdef USE_SSL + asio::ssl::context* sslContext, +#endif + const StunTuple& localRtpBinding, + const StunTuple& localRtcpBinding +) +{ + Lock lock(mMutex); + Condition cv; + mIOService.post(boost::bind(&MediaStream::initializeImpl, this, sslContext, localRtpBinding, localRtcpBinding, boost::ref(cv))); + cv.wait(mMutex); +} + +void +MediaStream::initializeImpl( +#ifdef USE_SSL + asio::ssl::context* sslContext, +#endif + const StunTuple& localRtpBinding, + const StunTuple& localRtcpBinding, + resip::Condition& cv +) +{ + Lock lock(mMutex); + mRtcpEnabled = (localRtcpBinding.getTransportType() != StunTuple::None); if(mRtcpEnabled) { - mRtpFlow = new Flow(ioService, + mRtpFlow = new Flow(mIOService, #ifdef USE_SSL - sslContext, + *sslContext, #endif RTP_COMPONENT_ID, localRtpBinding, *this); + mRtpFlow->initialize(); - mRtcpFlow = new Flow(ioService, + mRtcpFlow = new Flow(mIOService, #ifdef USE_SSL - sslContext, + *sslContext, #endif RTCP_COMPONENT_ID, localRtcpBinding, *this); + mRtcpFlow->initialize(); mRtpFlow->activateFlow(StunMessage::PropsPortPair); // If doing an allocation then wait until RTP flow is allocated, then activate RTCP flow - if(natTraversalMode != TurnAllocation) + if(mNatTraversalMode != TurnAllocation) { mRtcpFlow->activateFlow(); } } else { - mRtpFlow = new Flow(ioService, + mRtpFlow = new Flow(mIOService, #ifdef USE_SSL - sslContext, + *sslContext, #endif RTP_COMPONENT_ID, localRtpBinding, *this); + mRtpFlow->initialize(); mRtpFlow->activateFlow(StunMessage::PropsPortEven); mRtcpFlow = 0; } + cv.signal(); } -MediaStream::~MediaStream() +void +MediaStream::shutdown() { - { - Lock lock(mMutex); - - if(mSRTPSessionOutCreated) - { - mSRTPSessionOutCreated = false; - srtp_dealloc(mSRTPSessionOut); - } - if(mSRTPSessionInCreated) - { - mSRTPSessionInCreated = false; - srtp_dealloc(mSRTPSessionIn); - } + Lock lock(mMutex); + Condition cv; + mIOService.dispatch(boost::bind(&MediaStream::shutdownImpl, this, boost::ref(cv))); + cv.wait(mMutex); +} + +void +MediaStream::shutdownImpl(resip::Condition& cv) +{ + Lock lock(mMutex); + + if(mSRTPSessionOutCreated) + { + mSRTPSessionOutCreated = false; + srtp_dealloc(mSRTPSessionOut); + } + if(mSRTPSessionInCreated) + { + mSRTPSessionInCreated = false; + srtp_dealloc(mSRTPSessionIn); } + + mRtpFlow->shutdown(); delete mRtpFlow; + if(mRtcpEnabled) { + mRtcpFlow->shutdown(); delete mRtcpFlow; } + + cv.signal(); } -bool -MediaStream::createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* key, unsigned int keyLen) +void +MediaStream::createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const resip::Data& key) { - if(keyLen != SRTP_MASTER_KEY_LEN) + mIOService.post(boost::bind(&MediaStream::createOutboundSRTPSessionImpl, this, cryptoSuite, key)); +} + +void +MediaStream::createOutboundSRTPSessionImpl(SrtpCryptoSuite cryptoSuite, const resip::Data& key) +{ + if(key.size() != SRTP_MASTER_KEY_LEN) { - ErrLog(<< "Unable to create outbound SRTP session, invalid keyLen=" << keyLen); - return false; + ErrLog(<< "Unable to create outbound SRTP session, invalid keyLen=" << key.size()); + return; } err_status_t status; @@ -126,10 +187,10 @@ MediaStream::createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* if(mSRTPSessionOutCreated) { // Check if settings are the same - if so just return true - if(cryptoSuite == mCryptoSuiteOut && memcmp(mSRTPMasterKeyOut, key, keyLen) == 0) + if(cryptoSuite == mCryptoSuiteOut && memcmp(mSRTPMasterKeyOut, key.data(), key.size()) == 0) { InfoLog(<< "Outbound SRTP session settings unchanged."); - return true; + return; } else { @@ -138,12 +199,13 @@ MediaStream::createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* srtp_dealloc(mSRTPSessionOut); } } - memset(&mSRTPPolicyOut, 0, sizeof(mSRTPPolicyOut)); // Copy key locally - memcpy(mSRTPMasterKeyOut, key, SRTP_MASTER_KEY_LEN); + memcpy(mSRTPMasterKeyOut, key.data(), SRTP_MASTER_KEY_LEN); // load default srtp/srtcp policy settings + memset(&mSRTPPolicyOut, 0, sizeof(srtp_policy_t)); + mCryptoSuiteOut = cryptoSuite; switch(cryptoSuite) { @@ -157,12 +219,13 @@ MediaStream::createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* break; default: ErrLog(<< "Unable to create outbound SRTP session, invalid crypto suite=" << cryptoSuite); - return false; + return; } // set remaining policy settings mSRTPPolicyOut.ssrc.type = ssrc_any_outbound; mSRTPPolicyOut.key = mSRTPMasterKeyOut; + mSRTPPolicyOut.next = 0; mSRTPPolicyOut.window_size = 64; // Allocate and initailize the SRTP sessions @@ -170,20 +233,24 @@ MediaStream::createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* if(status) { ErrLog(<< "Unable to create srtp out session, error code=" << status); - return false; + return; } mSRTPSessionOutCreated = true; +} - return true; +void +MediaStream::createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const resip::Data& key) +{ + mIOService.post(boost::bind(&MediaStream::createInboundSRTPSessionImpl, this, cryptoSuite, key)); } -bool -MediaStream::createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* key, unsigned int keyLen) +void +MediaStream::createInboundSRTPSessionImpl(SrtpCryptoSuite cryptoSuite, const resip::Data& key) { - if(keyLen != SRTP_MASTER_KEY_LEN) + if(key.size() != SRTP_MASTER_KEY_LEN) { - ErrLog(<< "Unable to create inbound SRTP session, invalid keyLen=" << keyLen); - return false; + ErrLog(<< "Unable to create inbound SRTP session, invalid keyLen=" << key.size()); + return; } err_status_t status; @@ -191,10 +258,10 @@ MediaStream::createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* k if(mSRTPSessionInCreated) { // Check if settings are the same - if so just return true - if(cryptoSuite == mCryptoSuiteIn && memcmp(mSRTPMasterKeyIn, key, keyLen) == 0) + if(cryptoSuite == mCryptoSuiteIn && memcmp(mSRTPMasterKeyIn, key.data(), key.size()) == 0) { InfoLog(<< "Inbound SRTP session settings unchanged."); - return true; + return; } else { @@ -203,10 +270,11 @@ MediaStream::createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* k srtp_dealloc(mSRTPSessionIn); } } - memset(&mSRTPPolicyIn, 0, sizeof(mSRTPPolicyIn)); // Copy key locally - memcpy(mSRTPMasterKeyIn, key, SRTP_MASTER_KEY_LEN); + memcpy(mSRTPMasterKeyIn, key.data(), SRTP_MASTER_KEY_LEN); + + memset(&mSRTPPolicyIn, 0, sizeof(srtp_policy_t)); // load default srtp/srtcp policy settings mCryptoSuiteIn = cryptoSuite; @@ -222,12 +290,13 @@ MediaStream::createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* k break; default: ErrLog(<< "Unable to create inbound SRTP session, invalid crypto suite=" << cryptoSuite); - return false; + return; } // set remaining policy settings mSRTPPolicyIn.ssrc.type = ssrc_any_inbound; mSRTPPolicyIn.key = mSRTPMasterKeyIn; + mSRTPPolicyIn.next = 0; mSRTPPolicyIn.window_size = 64; // Allocate and initailize the SRTP sessions @@ -235,17 +304,14 @@ MediaStream::createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* k if(status) { ErrLog(<< "Unable to create srtp in session, error code=" << status); - return false; + return; } mSRTPSessionInCreated = true; - - return true; } err_status_t MediaStream::srtpProtect(void* data, int* size, bool rtcp) { - Lock lock(mMutex); err_status_t status = err_status_no_ctx; if(mSRTPSessionOutCreated) { @@ -264,7 +330,6 @@ MediaStream::srtpProtect(void* data, int* size, bool rtcp) err_status_t MediaStream::srtpUnprotect(void* data, int* size, bool rtcp) { - Lock lock(mMutex); err_status_t status = err_status_no_ctx; if(mSRTPSessionInCreated) { @@ -280,6 +345,46 @@ MediaStream::srtpUnprotect(void* data, int* size, bool rtcp) return status; } +void +MediaStream::setOutgoingIceUsernameAndPassword(const resip::Data& username, const resip::Data& password) +{ + mIOService.post(boost::bind(&MediaStream::setOutgoingIceUsernameAndPasswordImpl, this, username, password)); +} + +void +MediaStream::setOutgoingIceUsernameAndPasswordImpl(const resip::Data& username, const resip::Data& password) +{ + Lock lock(mMutex); + if (mRtpFlow) + { + mRtpFlow->setOutgoingIceUsernameAndPassword(username, password); + } + if (mRtcpFlow) + { + mRtcpFlow->setOutgoingIceUsernameAndPassword(username, password); + } +} + +void +MediaStream::setLocalIcePassword(const resip::Data& password) +{ + mIOService.post(boost::bind(&MediaStream::setLocalIcePasswordImpl, this, password)); +} + +void +MediaStream::setLocalIcePasswordImpl(const resip::Data& password) +{ + Lock lock(mMutex); + if (mRtpFlow) + { + mRtpFlow->setLocalIcePassword(password); + } + if (mRtcpFlow) + { + mRtcpFlow->setLocalIcePassword(password); + } +} + void MediaStream::onFlowReady(unsigned int componentId) { @@ -294,12 +399,12 @@ MediaStream::onFlowReady(unsigned int componentId) { if(mRtpFlow->isReady() && mRtcpFlow->isReady()) { - mMediaStreamHandler.onMediaStreamReady(mRtpFlow->getSessionTuple(), mRtcpFlow->getSessionTuple()); + mMediaStreamHandler.onMediaStreamReady(this, mRtpFlow->getSessionTuple(), mRtcpFlow->getSessionTuple()); } } else if(mRtpFlow && mRtpFlow->isReady()) { - mMediaStreamHandler.onMediaStreamReady(mRtpFlow->getSessionTuple(), StunTuple()); + mMediaStreamHandler.onMediaStreamReady(this, mRtpFlow->getSessionTuple(), StunTuple()); } } } @@ -307,7 +412,116 @@ MediaStream::onFlowReady(unsigned int componentId) void MediaStream::onFlowError(unsigned int componentId, unsigned int errorCode) { - mMediaStreamHandler.onMediaStreamError(errorCode); // TODO assign real error code + if ((errorCode == asio::error::host_not_found || errorCode == 8008) && mIceAttempted) + { + // .jjg. ignore this error if ICE is in use, since there's a chance media will still flow + if (mRtpFlow && mRtcpFlow) + { + if (componentId == RTCP_COMPONENT_ID) + { + mNatTraversalServerHostname = resip::Data::Empty; + mRtpFlow->activateFlow(); + mRtcpFlow->activateFlow(); + } + } + else if (mRtpFlow) + { + if (componentId == RTP_COMPONENT_ID) + { + mNatTraversalServerHostname = resip::Data::Empty; + mRtpFlow->activateFlow(); + } + } + } + else + { + mMediaStreamHandler.onMediaStreamError(this, errorCode); // TODO assign real error code + } +} + +void +MediaStream::onFlowIceComplete(unsigned int componentId, bool iAmIceControlling) +{ + if(mNatTraversalMode == Ice) + { + if(mRtpFlow && mRtcpFlow) + { + if(mRtpFlow->isReady() && mRtcpFlow->isReady()) + { + mMediaStreamHandler.onIceComplete( + this, + mRtpFlow->getLocalNominatedIceCandidate(), + mRtcpFlow->getLocalNominatedIceCandidate(), + mRtpFlow->getRemoteNominatedIceCandidate(), + mRtcpFlow->getRemoteNominatedIceCandidate(), + iAmIceControlling); + } + } + else if(mRtpFlow && mRtpFlow->isReady()) + { + mMediaStreamHandler.onIceComplete( + this, + mRtpFlow->getLocalNominatedIceCandidate(), + reTurn::IceCandidate(), + mRtpFlow->getRemoteNominatedIceCandidate(), + reTurn::IceCandidate(), + iAmIceControlling); + } + } +} + +void +MediaStream::onFlowIceFailed(unsigned int componentId, bool iAmIceControlling) +{ + if(mNatTraversalMode == Ice) + { + if(mRtpFlow && mRtcpFlow) + { + if(componentId == RTCP_COMPONENT_ID) + { + StunTuple fallbackRtpTuple = mRtpFlow->getSessionTuple(); + StunTuple fallbackRtcpTuple = mRtcpFlow->getSessionTuple(); + mNatTraversalMode = StunBindDiscovery; + mMediaStreamHandler.onIceFailed( + this, + fallbackRtpTuple, + fallbackRtcpTuple, + iAmIceControlling); + } + } + else if(mRtpFlow) + { + StunTuple fallbackRtpTuple = mRtpFlow->getSessionTuple(); + mNatTraversalMode = StunBindDiscovery; + mMediaStreamHandler.onIceFailed( + this, + fallbackRtpTuple, + StunTuple(), + iAmIceControlling); + } + } +} + +void +MediaStream::setSRTPEnabled(bool enabled) +{ + mIOService.post(boost::bind(&MediaStream::setSRTPEnabledImpl, this, enabled)); +} + +void +MediaStream::setIceDisabled() +{ + mIOService.post(boost::bind(&MediaStream::setIceDisabledImpl, this)); +} + +void +MediaStream::setIceDisabledImpl() +{ + if(mNatTraversalMode == Ice) + { + // fall back to STUN, since we will still want our public IP (if we have one) to show up + mNatTraversalMode = StunBindDiscovery; + } } diff --git a/reflow/MediaStream.hxx b/reflow/MediaStream.hxx index 49564dcd8e..4242c15cef 100644 --- a/reflow/MediaStream.hxx +++ b/reflow/MediaStream.hxx @@ -9,11 +9,7 @@ #ifdef USE_SSL #include #endif -#ifdef WIN32 #include -#else -#include -#endif #include "dtls_wrapper/DtlsFactory.hxx" #include "Flow.hxx" @@ -36,8 +32,11 @@ public: MediaStreamHandler() {} virtual ~MediaStreamHandler() {} - virtual void onMediaStreamReady(const StunTuple& rtpTuple, const StunTuple& rtcpTuple) = 0; - virtual void onMediaStreamError(unsigned int errorCode) = 0; + virtual void onMediaStreamReady(MediaStream* ms, const StunTuple& rtpTuple, const StunTuple& rtcpTuple) = 0; + virtual void onMediaStreamError(MediaStream* ms, unsigned int errorCode) = 0; + + virtual void onIceComplete(MediaStream* ms, const reTurn::IceCandidate& nominatedLocalRtpTuple, const reTurn::IceCandidate& nominatedLocalRtcpTuple, const reTurn::IceCandidate& nominatedRemoteRtpTuple, const reTurn::IceCandidate& nominatedRemoteRtcpTuple, bool iAmIceControlling) = 0; + virtual void onIceFailed(MediaStream* ms, const reTurn::StunTuple& rtpTuple, const reTurn::StunTuple& rtcpTuple, bool iAmIceControlling) = 0; }; #define RTP_COMPONENT_ID 1 @@ -50,7 +49,8 @@ public: { NoNatTraversal, StunBindDiscovery, - TurnAllocation + TurnAllocation, + Ice }; enum SrtpCryptoSuite @@ -60,14 +60,11 @@ public: }; MediaStream(asio::io_service& ioService, -#ifdef USE_SSL - asio::ssl::context& sslContext, -#endif MediaStreamHandler& mediaStreamHandler, - const StunTuple& localRtpBinding, - const StunTuple& localRtcpBinding, // pass in transport type = None to disable RTCP #ifdef USE_SSL + #ifdef USE_DTLS dtls::DtlsFactory* dtlsFactory = 0, + #endif #endif NatTraversalMode natTraversalMode = NoNatTraversal, const char* natTraversalServerHostname = 0, @@ -76,22 +73,39 @@ public: const char* stunPassword = 0); virtual ~MediaStream(); + void initialize( +#ifdef USE_SSL + asio::ssl::context* sslContext, +#endif + const StunTuple& localRtpBinding, + const StunTuple& localRtcpBinding + ); + void shutdown(); + Flow* getRtpFlow() { return mRtpFlow; } Flow* getRtcpFlow() { return mRtcpFlow; } // SRTP methods - should be called before sending or receiving on RTP or RTCP flows - bool createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* key, unsigned int keyLen); - bool createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const char* key, unsigned int keyLen); + void setSRTPEnabled(bool enabled); + void createOutboundSRTPSession(SrtpCryptoSuite cryptoSuite, const resip::Data& key); + void createInboundSRTPSession(SrtpCryptoSuite cryptoSuite, const resip::Data& key); + + void setOutgoingIceUsernameAndPassword(const resip::Data& username, const resip::Data& password); + void setLocalIcePassword(const resip::Data& password); + void setIceDisabled(); protected: friend class Flow; // SRTP members #ifdef USE_SSL +#ifdef USE_DTLS dtls::DtlsFactory* mDtlsFactory; +#endif #endif volatile bool mSRTPSessionInCreated; volatile bool mSRTPSessionOutCreated; + volatile bool mSRTPEnabled; resip::Mutex mMutex; SrtpCryptoSuite mCryptoSuiteIn; SrtpCryptoSuite mCryptoSuiteOut; @@ -107,6 +121,7 @@ protected: // Nat Traversal Members NatTraversalMode mNatTraversalMode; + bool mIceAttempted; resip::Data mNatTraversalServerHostname; unsigned short mNatTraversalServerPort; resip::Data mStunUsername; @@ -118,11 +133,37 @@ private: MediaStreamHandler& mMediaStreamHandler; bool mRtcpEnabled; + asio::io_service& mIOService; + Flow* mRtpFlow; Flow* mRtcpFlow; virtual void onFlowReady(unsigned int componentId); virtual void onFlowError(unsigned int componentId, unsigned int errorCode); + virtual void onFlowIceComplete(unsigned int componentId, bool iAmIceControlling); + virtual void onFlowIceFailed(unsigned int componentId, bool iAmIceControlling); + + void initializeImpl( +#ifdef USE_SSL + asio::ssl::context* sslContext, +#endif + const StunTuple& localRtpBinding, + const StunTuple& localRtcpBinding, + resip::Condition& cv + ); + void shutdownImpl( + resip::Condition& cv); + + // SRTP methods - should be called before sending or receiving on RTP or RTCP flows + void setSRTPEnabledImpl(bool enabled) { mSRTPEnabled = enabled; } + void createOutboundSRTPSessionImpl(SrtpCryptoSuite cryptoSuite, const resip::Data& key); + void createInboundSRTPSessionImpl(SrtpCryptoSuite cryptoSuite, const resip::Data& key); + + void setOutgoingIceUsernameAndPasswordImpl(const resip::Data& username, const resip::Data& password); + void setLocalIcePasswordImpl(const resip::Data& password); + void setIceDisabledImpl(); + + }; } diff --git a/reflow/ErrorCode.hxx b/reflow/ReflowErrorCode.hxx similarity index 97% rename from reflow/ErrorCode.hxx rename to reflow/ReflowErrorCode.hxx index 90f3670319..83e2c0ab14 100644 --- a/reflow/ErrorCode.hxx +++ b/reflow/ReflowErrorCode.hxx @@ -1,57 +1,57 @@ -#ifndef FM_ERRORCODE_HXX -#define FM_ERRORCODE_HXX - -#include - -namespace flowmanager { - -typedef asio::error_code::value_type ErrorType; - -static const ErrorType Success = 0; -static const ErrorType GeneralError = -1; - -static const ErrorType ErrorBase = 9000; - -static const ErrorType BufferTooSmall = ErrorBase + 1; -static const ErrorType ReceiveTimeout = ErrorBase + 2; -static const ErrorType InvalidState = ErrorBase + 3; -static const ErrorType SRTPError = ErrorBase + 4; -static const ErrorType DtlsPacket = ErrorBase + 5; -} - -#endif - - -/* ==================================================================== - - Copyright (c) 2007-2008, Plantronics, Inc. - All rights reserved. - - Redistribution and use in source and binary forms, with or without - modification, are permitted provided that the following conditions are - met: - - 1. Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - - 2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - - 3. Neither the name of Plantronics nor the names of its contributors - may be used to endorse or promote products derived from this - software without specific prior written permission. - - THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - ==================================================================== */ +#ifndef FM_ERRORCODE_HXX +#define FM_ERRORCODE_HXX + +#include + +namespace flowmanager { + +typedef asio::error_code::value_type ErrorType; + +static const ErrorType Success = 0; +static const ErrorType GeneralError = -1; + +static const ErrorType ErrorBase = 9000; + +static const ErrorType BufferTooSmall = ErrorBase + 1; +static const ErrorType ReceiveTimeout = ErrorBase + 2; +static const ErrorType InvalidState = ErrorBase + 3; +static const ErrorType SRTPError = ErrorBase + 4; +static const ErrorType DtlsPacket = ErrorBase + 5; +} + +#endif + + +/* ==================================================================== + + Copyright (c) 2007-2008, Plantronics, Inc. + All rights reserved. + + Redistribution and use in source and binary forms, with or without + modification, are permitted provided that the following conditions are + met: + + 1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + 2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + + 3. Neither the name of Plantronics nor the names of its contributors + may be used to endorse or promote products derived from this + software without specific prior written permission. + + THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + ==================================================================== */ diff --git a/reflow/dtls_wrapper/DtlsFactory.cxx b/reflow/dtls_wrapper/DtlsFactory.cxx index 2ec2130a30..e8c5dadfa6 100644 --- a/reflow/dtls_wrapper/DtlsFactory.cxx +++ b/reflow/dtls_wrapper/DtlsFactory.cxx @@ -3,6 +3,7 @@ #endif #ifdef USE_SSL +#ifdef USE_DTLS #include #include @@ -98,6 +99,7 @@ DtlsFactory::demuxPacket(const unsigned char *data, unsigned int len) return unknown; } +#endif //USE_DTLS #endif //USE_SSL diff --git a/reflow/dtls_wrapper/DtlsSocket.cxx b/reflow/dtls_wrapper/DtlsSocket.cxx index a40e4fcc67..32489cd964 100644 --- a/reflow/dtls_wrapper/DtlsSocket.cxx +++ b/reflow/dtls_wrapper/DtlsSocket.cxx @@ -3,6 +3,7 @@ #endif #ifdef USE_SSL +#ifdef USE_DTLS #include #include @@ -420,6 +421,7 @@ DtlsSocket::getReadTimeout() return 500; } +#endif #endif /* ==================================================================== diff --git a/reflow/dtls_wrapper/DtlsSocket.hxx b/reflow/dtls_wrapper/DtlsSocket.hxx index 300512c1e6..fe43e11acf 100644 --- a/reflow/dtls_wrapper/DtlsSocket.hxx +++ b/reflow/dtls_wrapper/DtlsSocket.hxx @@ -10,11 +10,7 @@ #include extern "C" { -#ifdef WIN32 #include -#else -#include -#endif } #include diff --git a/reflow/dtls_wrapper/DtlsTimer.cxx b/reflow/dtls_wrapper/DtlsTimer.cxx index a6ee2ba0f4..be2096ae1c 100644 --- a/reflow/dtls_wrapper/DtlsTimer.cxx +++ b/reflow/dtls_wrapper/DtlsTimer.cxx @@ -3,6 +3,7 @@ #endif #ifdef USE_SSL +#ifdef USE_DTLS #include "DtlsTimer.hxx" @@ -38,6 +39,7 @@ DtlsTimerContext::fire(DtlsTimer *timer) timer->fire(); } +#endif //USE_DTLS #endif //USE_SSL /* ====================================================================