Skip to content

Commit

Permalink
[XrdCl] Update tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal authored and gganis committed Nov 23, 2021
1 parent ef767df commit 9a0c34c
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 77 deletions.
22 changes: 0 additions & 22 deletions src/XrdCl/XrdClPostMasterInterfaces.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,6 @@ namespace XrdCl
class URL;
class Socket;

//----------------------------------------------------------------------------
//! Message filter
//----------------------------------------------------------------------------
class MessageFilter
{
public:
virtual ~MessageFilter() {}

//------------------------------------------------------------------------
//! Examine the message and return true if the message should be picked
//! up (usually removed from the queue and to the caller)
//------------------------------------------------------------------------
virtual bool Filter( const std::shared_ptr<Message> msg ) = 0;

//------------------------------------------------------------------------
//! Get sid of the filter
//!
//! @return filter sid if exists, otherwise 0
//------------------------------------------------------------------------
virtual uint16_t GetSid() const = 0;
};

//----------------------------------------------------------------------------
//! Message handler
//----------------------------------------------------------------------------
Expand Down
228 changes: 173 additions & 55 deletions tests/XrdClTests/PostMasterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,18 @@ namespace
//------------------------------------------------------------------------------
// Message filter
//------------------------------------------------------------------------------
class XrdFilter: public XrdCl::MessageFilter
class XrdFilter
{
friend class SyncMsgHandler;

public:
XrdFilter( unsigned char id0 = 0, unsigned char id1 = 0 )
{
streamId[0] = id0;
streamId[1] = id1;
}

virtual bool Filter( const std::shared_ptr<XrdCl::Message> msg )
virtual bool Filter( const XrdCl::Message *msg )
{
ServerResponse *resp = (ServerResponse *)msg->GetBuffer();
if( resp->hdr.streamid[0] == streamId[0] &&
Expand All @@ -105,13 +107,121 @@ class XrdFilter: public XrdCl::MessageFilter
unsigned char streamId[2];
};

//------------------------------------------------------------------------------
// Synchronous Message Handler
//------------------------------------------------------------------------------
class SyncMsgHandler : public XrdCl::MsgHandler
{
public:
SyncMsgHandler() :
sem( 0 ), request( nullptr ), response( nullptr )
{
}

~SyncMsgHandler()
{
delete response;
}

private:

XrdFilter filter;
XrdSysSemaphore sem;
XrdCl::XRootDStatus status;
const XrdCl::Message *request;
XrdCl::Message *response;

public:

//------------------------------------------------------------------------
// Examine an incoming message, and decide on the action to be taken
//------------------------------------------------------------------------
virtual uint16_t Examine( std::shared_ptr<XrdCl::Message> &msg )
{
if( filter.Filter( msg.get() ) )
return RemoveHandler;
return Ignore;
}

//------------------------------------------------------------------------
// Reexamine the incoming message, and decide on the action to be taken
//------------------------------------------------------------------------
virtual uint16_t InspectStatusRsp()
{
return XrdCl::MsgHandler::Action::None;
}

//------------------------------------------------------------------------
// Get handler sid
//------------------------------------------------------------------------
virtual uint16_t GetSid() const
{
return filter.GetSid();
}

//------------------------------------------------------------------------
// Process the message if it was "taken" by the examine action
//------------------------------------------------------------------------
virtual void Process( XrdCl::Message *msg )
{
response = msg;
sem.Post();
};

//------------------------------------------------------------------------
// Handle an event other that a message arrival
//------------------------------------------------------------------------
virtual uint8_t OnStreamEvent( StreamEvent event,
XrdCl::XRootDStatus status )
{
if( event == Ready )
return 0;
this->status = status;
sem.Post();
return RemoveHandler;
};

//------------------------------------------------------------------------
// The requested action has been performed and the status is available
//------------------------------------------------------------------------
virtual void OnStatusReady( const XrdCl::Message *message,
XrdCl::XRootDStatus status )
{
request = message;
this->status = status;
if( !status.IsOK() )
sem.Post();
}

//------------------------------------------------------------------------
// Get a timestamp after which we give up
//------------------------------------------------------------------------
virtual time_t GetExpiration()
{
return 0;
}

XrdCl::XRootDStatus WaitFor( XrdCl::Message &rsp )
{
sem.Wait();
rsp = std::move( *response );
return status;
}

void SetFilter( unsigned char id0 = 0, unsigned char id1 = 0 )
{
filter.streamId[0] = id0;
filter.streamId[1] = id1;
}
};

//------------------------------------------------------------------------------
// Thread argument passing helper
//------------------------------------------------------------------------------
struct ArgHelper
{
XrdCl::PostMaster *pm;
int index;
int index;
};

//------------------------------------------------------------------------------
Expand All @@ -127,40 +237,39 @@ void *TestThreadFunc( void *arg )

ArgHelper *a = (ArgHelper*)arg;
URL host( address );
XrdFilter f( a->index, 0 );

//----------------------------------------------------------------------------
// Send the ping messages
//----------------------------------------------------------------------------
SyncMsgHandler msgHandlers[100];
Message msgs[100];
time_t expires = time(0)+1200;
Message m;
m.Allocate( sizeof( ClientPingRequest ) );
m.Zero();
m.SetDescription( "kXR_ping ()" );
ClientPingRequest *request = (ClientPingRequest *)m.GetBuffer();
request->streamid[0] = a->index;
request->requestid = kXR_ping;
request->dlen = 0;
XRootDTransport::MarshallRequest( &m );

for( int i = 0; i < 100; ++i )
{
msgs[i].Allocate( sizeof( ClientPingRequest ) );
msgs[i].Zero();
msgs[i].SetDescription( "kXR_ping ()" );
ClientPingRequest *request = (ClientPingRequest *)msgs[i].GetBuffer();
request->streamid[0] = a->index;
request->requestid = kXR_ping;
request->dlen = 0;
XRootDTransport::MarshallRequest( &msgs[i] );
request->streamid[1] = i;
CPPUNIT_ASSERT_XRDST( a->pm->Send( host, &m, false, expires ) );
msgHandlers[i].SetFilter( a->index, i );
CPPUNIT_ASSERT_XRDST( a->pm->Send( host, &msgs[i], &msgHandlers[i], false, expires ) );
}

//----------------------------------------------------------------------------
// Receive the answers
//----------------------------------------------------------------------------
for( int i = 0; i < 100; ++i )
{
std::shared_ptr<Message> m;
f.streamId[1] = i;
CPPUNIT_ASSERT_XRDST( a->pm->Receive( host, m, &f, expires ) );
ServerResponse *resp = (ServerResponse *)m->GetBuffer();
XrdCl::Message msg;
CPPUNIT_ASSERT_XRDST( msgHandlers[i].WaitFor( msg ) );
ServerResponse *resp = (ServerResponse *)msg.GetBuffer();
CPPUNIT_ASSERT( resp != 0 );
CPPUNIT_ASSERT( resp->hdr.status == kXR_ok );
CPPUNIT_ASSERT( m->GetSize() == 8 );
CPPUNIT_ASSERT( msg.GetSize() == 8 );
}
return 0;
}
Expand Down Expand Up @@ -217,11 +326,12 @@ void PostMasterTest::FunctionalTest()
// Send a message and wait for the answer
//----------------------------------------------------------------------------
time_t expires = ::time(0)+1200;
Message m1;
std::shared_ptr<Message> m2;
XrdFilter f1( 1, 2 );
Message m1, m2;
URL host( address );

SyncMsgHandler msgHandler1;
msgHandler1.SetFilter( 1, 2 );

m1.Allocate( sizeof( ClientPingRequest ) );
m1.Zero();

Expand All @@ -232,31 +342,30 @@ void PostMasterTest::FunctionalTest()
request->dlen = 0;
XRootDTransport::MarshallRequest( &m1 );

CPPUNIT_ASSERT_XRDST( postMaster.Send( host, &m1, false, expires ) );
CPPUNIT_ASSERT_XRDST( postMaster.Send( host, &m1, &msgHandler1, false, expires ) );

CPPUNIT_ASSERT_XRDST( postMaster.Receive( host, m2, &f1, expires ) );
ServerResponse *resp = (ServerResponse *)m2->GetBuffer();
CPPUNIT_ASSERT_XRDST( msgHandler1.WaitFor( m2 ) );
ServerResponse *resp = (ServerResponse *)m2.GetBuffer();
CPPUNIT_ASSERT( resp != 0 );
CPPUNIT_ASSERT( resp->hdr.status == kXR_ok );
CPPUNIT_ASSERT( m2->GetSize() == 8 );

//----------------------------------------------------------------------------
// Wait for an answer to a message that has not been sent - test the
// reception timeout
//----------------------------------------------------------------------------
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Receive( host, m2, &f1, 2 ),
errOperationExpired );
CPPUNIT_ASSERT( m2.GetSize() == 8 );

//----------------------------------------------------------------------------
// Send out some stuff to a location where nothing listens
//----------------------------------------------------------------------------
env->PutInt( "ConnectionWindow", 5 );
env->PutInt( "ConnectionRetry", 3 );
URL localhost1( "root://localhost:10101" );
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Send( localhost1, &m1, false,
::time(0)+3 ),
errOperationExpired );
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Send( localhost1, &m1, false,

SyncMsgHandler msgHandler2;
msgHandler2.SetFilter( 1, 2 );
CPPUNIT_ASSERT_XRDST( postMaster.Send( localhost1, &m1, &msgHandler2, false,
::time(0)+3 ) );
CPPUNIT_ASSERT_XRDST_NOTOK( msgHandler2.WaitFor( m2 ), errOperationExpired );

SyncMsgHandler msgHandler3;
msgHandler3.SetFilter( 1, 2 );
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Send( localhost1, &m1, &msgHandler3, false,
expires ),
errConnectionError );

Expand Down Expand Up @@ -285,7 +394,7 @@ void PostMasterTest::FunctionalTest()
postMaster.Initialize();
postMaster.Start();

m2.reset();
m2.Free();
m1.Zero();

request = (ClientPingRequest *)m1.GetBuffer();
Expand All @@ -295,26 +404,30 @@ void PostMasterTest::FunctionalTest()
request->dlen = 0;
XRootDTransport::MarshallRequest( &m1 );

CPPUNIT_ASSERT_XRDST( postMaster.Send( host, &m1, false, expires ) );
SyncMsgHandler msgHandler4;
msgHandler4.SetFilter( 1, 2 );
CPPUNIT_ASSERT_XRDST( postMaster.Send( host, &m1, &msgHandler4, false, expires ) );

CPPUNIT_ASSERT_XRDST( postMaster.Receive( host, m2, &f1, expires ) );
resp = (ServerResponse *)m2->GetBuffer();
CPPUNIT_ASSERT_XRDST( msgHandler4.WaitFor( m2 ) );
resp = (ServerResponse *)m2.GetBuffer();
CPPUNIT_ASSERT( resp != 0 );
CPPUNIT_ASSERT( resp->hdr.status == kXR_ok );
CPPUNIT_ASSERT( m2->GetSize() == 8 );
CPPUNIT_ASSERT( m2.GetSize() == 8 );

//----------------------------------------------------------------------------
// Sleep 10 secs waiting for iddle connection to be closed and see
// whether we can reconnect
//----------------------------------------------------------------------------
sleep( 10 );
CPPUNIT_ASSERT_XRDST( postMaster.Send( host, &m1, false, expires ) );
SyncMsgHandler msgHandler5;
msgHandler5.SetFilter( 1, 2 );
CPPUNIT_ASSERT_XRDST( postMaster.Send( host, &m1, &msgHandler5, false, expires ) );

CPPUNIT_ASSERT_XRDST( postMaster.Receive( host, m2, &f1, expires ) );
resp = (ServerResponse *)m2->GetBuffer();
CPPUNIT_ASSERT_XRDST( msgHandler5.WaitFor( m2 ) );
resp = (ServerResponse *)m2.GetBuffer();
CPPUNIT_ASSERT( resp != 0 );
CPPUNIT_ASSERT( resp->hdr.status == kXR_ok );
CPPUNIT_ASSERT( m2->GetSize() == 8 );
CPPUNIT_ASSERT( m2.GetSize() == 8 );
}


Expand Down Expand Up @@ -441,26 +554,31 @@ void PostMasterTest::MultiIPConnectionTest()
//----------------------------------------------------------------------------
// Sent ping to a nonexistent host
//----------------------------------------------------------------------------
SyncMsgHandler msgHandler1;
msgHandler1.SetFilter( 1, 2 );
Message *m = CreatePing( 1, 2 );
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Send( url1, m, false, expires ),
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Send( url1, m, &msgHandler1, false, expires ),
errInvalidAddr );

//----------------------------------------------------------------------------
// Try on the wrong port
//----------------------------------------------------------------------------
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Send( url2, m, false, expires ),
SyncMsgHandler msgHandler2;
msgHandler2.SetFilter( 1, 2 );
CPPUNIT_ASSERT_XRDST_NOTOK( postMaster.Send( url2, m, &msgHandler2, false, expires ),
errConnectionError );

//----------------------------------------------------------------------------
// Try on a good one
//----------------------------------------------------------------------------
std::shared_ptr<Message> m2 = 0;
XrdFilter f1( 1, 2 );
SyncMsgHandler msgHandler3;
msgHandler1.SetFilter( 1, 2 );
Message m2;

CPPUNIT_ASSERT_XRDST( postMaster.Send( url3, m, false, expires ) );
CPPUNIT_ASSERT_XRDST( postMaster.Receive( url3, m2, &f1, expires ) );
ServerResponse *resp = (ServerResponse *)m2->GetBuffer();
CPPUNIT_ASSERT_XRDST( postMaster.Send( url3, m, &msgHandler3, false, expires ) );
CPPUNIT_ASSERT_XRDST( msgHandler3.WaitFor( m2 ) );
ServerResponse *resp = (ServerResponse *)m2.GetBuffer();
CPPUNIT_ASSERT( resp != 0 );
CPPUNIT_ASSERT( resp->hdr.status == kXR_ok );
CPPUNIT_ASSERT( m2->GetSize() == 8 );
CPPUNIT_ASSERT( m2.GetSize() == 8 );
}

0 comments on commit 9a0c34c

Please sign in to comment.