Skip to content

Commit

Permalink
[XrdCl] xrdreplay: call close only on idle file objects.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Jan 26, 2022
1 parent 88badb3 commit 1048bce
Showing 1 changed file with 79 additions and 51 deletions.
130 changes: 79 additions & 51 deletions src/XrdApps/XrdClRecordPlugin/XrdClReplay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,36 @@ class mytimer_t
std::chrono::time_point<clock_t> start; //< registered start time
};

//------------------------------------------------------------------------------
//! Barrier for synchronizing the asynchronous execution of actions
//! It is actually a wrapper around semaphore.
//------------------------------------------------------------------------------
class barrier_t
{
public:
//------------------------------------------------------------------------
//! Constructor
//! @param sem : the semaphore
//------------------------------------------------------------------------
barrier_t( XrdSysSemaphore &sem ) : sem( sem ) { }

//------------------------------------------------------------------------
//! Destructor
//------------------------------------------------------------------------
~barrier_t()
{
sem.Post();
}

inline XrdSysSemaphore& get()
{
return sem;
}

private:
XrdSysSemaphore &sem; //< the semaphore to be posted
};

//------------------------------------------------------------------------------
//! Executes an action registered in the csv file
//------------------------------------------------------------------------------
Expand All @@ -75,28 +105,6 @@ class ActionExecutor

public:

//--------------------------------------------------------------------------
//! Semaphore wrapper that posts the semaphore on destruction
//--------------------------------------------------------------------------
struct action_sync
{
//------------------------------------------------------------------------
//! Constructor
//! @param sem : the semaphore
//------------------------------------------------------------------------
action_sync( XrdSysSemaphore &sem ) : sem( sem ) { }

//------------------------------------------------------------------------
//! Destructor
//------------------------------------------------------------------------
~action_sync()
{
sem.Post();
}

XrdSysSemaphore &sem; //< the semaphore to be posted
};

//--------------------------------------------------------------------------
//! Constructor
//! @param file : the file that should be the context of the action
Expand All @@ -119,9 +127,10 @@ class ActionExecutor

//--------------------------------------------------------------------------
//! Execute the action
//! @param sync : synchronization object
//! @param ending : synchronization object for ending the execution
//--------------------------------------------------------------------------
void Execute( std::shared_ptr<action_sync> sync )
void Execute( std::shared_ptr<barrier_t> &ending,
std::shared_ptr<barrier_t> &closing )
{
if( action == "Open" ) // open action
{
Expand All @@ -132,22 +141,29 @@ class ActionExecutor
std::tie( url, flags, mode, timeout ) = GetOpenArgs();
std::string tmp( orgststr );
WaitFor( Open( file, url, flags, mode, timeout ) >>
[tmp, sync]( XRootDStatus &s ) mutable
[tmp, ending, closing]( XRootDStatus &s ) mutable
{
HandleStatus( s, tmp );
sync.reset();
ending.reset();
closing.reset();
} );

}
else if( action == "Close" ) // close action
{
uint16_t timeout = GetCloseArgs();
std::string tmp( orgststr );
if( closing )
{
auto &sem = closing->get();
closing.reset();
sem.Wait();
}
Async( Close( file, timeout ) >>
[tmp, sync]( XRootDStatus &s ) mutable
[tmp, ending]( XRootDStatus &s ) mutable
{
HandleStatus( s, tmp );
sync.reset();
ending.reset();
} );
}
else if( action == "Stat" ) // stat action
Expand All @@ -157,10 +173,11 @@ class ActionExecutor
std::tie( force, timeout ) = GetStatArgs();
std::string tmp( orgststr );
Async( Stat( file, force, timeout ) >>
[tmp, sync]( XRootDStatus &s, StatInfo &r ) mutable
[tmp, ending, closing]( XRootDStatus &s, StatInfo &r ) mutable
{
HandleStatus( s, tmp );
sync.reset();
ending.reset();
closing.reset();
} );

}
Expand All @@ -171,11 +188,12 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetReadArgs();
Async( Read( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, sync]( XRootDStatus &s, ChunkInfo &r ) mutable
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s, ChunkInfo &r ) mutable
{
HandleStatus( s, orgststr );
buffer.reset();
sync.reset();
ending.reset();
closing.reset();
} );
}
else if( action == "PgRead" ) // pgread action
Expand All @@ -185,11 +203,12 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetPgReadArgs();
Async( PgRead( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, sync]( XRootDStatus &s, PageInfo &r ) mutable
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s, PageInfo &r ) mutable
{
HandleStatus( s, orgststr );
buffer.reset();
sync.reset();
ending.reset();
closing.reset();
} );
}
else if( action == "Write" ) // write action
Expand All @@ -199,11 +218,12 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetWriteArgs();
Async( Write( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, sync]( XRootDStatus &s ) mutable
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s ) mutable
{
HandleStatus( s, orgststr );
buffer.reset();
sync.reset();
ending.reset();
closing.reset();
} );
}
else if( action == "PgWrite" ) // pgwrite action
Expand All @@ -213,22 +233,24 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetPgWriteArgs();
Async( PgWrite( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, sync]( XRootDStatus &s ) mutable
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s ) mutable
{
HandleStatus( s, orgststr );
buffer.reset();
sync.reset();
ending.reset();
closing.reset();
} );
}
else if( action == "Sync" ) // sync action
{
uint16_t timeout = GetSyncArgs();
std::string tmp( orgststr );
Async( Sync( file, timeout ) >>
[tmp, sync]( XRootDStatus &s ) mutable
[tmp, ending, closing]( XRootDStatus &s ) mutable
{
HandleStatus( s, tmp );
sync.reset();
ending.reset();
closing.reset();
} );
}
else if( action == "Truncate" ) // truncate action
Expand All @@ -238,10 +260,11 @@ class ActionExecutor
std::tie( size, timeout ) = GetTruncateArgs();
std::string tmp( orgststr );
Async( Truncate( file, size, timeout ) >>
[tmp, sync]( XRootDStatus &s ) mutable
[tmp, ending, closing]( XRootDStatus &s ) mutable
{
HandleStatus( s, tmp );
sync.reset();
ending.reset();
closing.reset();
} );
}
else if( action == "VectorRead" ) // vector read action
Expand All @@ -251,12 +274,13 @@ class ActionExecutor
std::tie( chunks, timeout ) = GetVectorReadArgs();
std::string tmp( orgststr );
Async( VectorRead( file, chunks, timeout ) >>
[chunks, tmp, sync]( XRootDStatus &s, VectorReadInfo &r ) mutable
[chunks, tmp, ending, closing]( XRootDStatus &s, VectorReadInfo &r ) mutable
{
HandleStatus( s, tmp );
for( auto &ch : chunks )
delete[] (char*)ch.buffer;
sync.reset();
ending.reset();
closing.reset();
} );

}
Expand All @@ -267,12 +291,13 @@ class ActionExecutor
std::tie( chunks, timeout ) = GetVectorWriteArgs();
std::string tmp( orgststr );
Async( VectorWrite( file, chunks, timeout ) >>
[chunks, tmp, sync]( XRootDStatus &s ) mutable
[chunks, tmp, ending, closing]( XRootDStatus &s ) mutable
{
HandleStatus( s, tmp );
for( auto &ch : chunks )
delete[] (char*)ch.buffer;
sync.reset();
ending.reset();
closing.reset();
} );
}
else
Expand Down Expand Up @@ -485,8 +510,10 @@ std::thread ExecuteActions( std::unique_ptr<File> file, action_list &&actions )
{
std::thread t( [file{ std::move( file ) }, actions{ std::move( actions ) }]() mutable
{
XrdSysSemaphore sem( 0 );
auto sync = std::make_shared<ActionExecutor::action_sync>( sem );
XrdSysSemaphore endsem( 0 );
XrdSysSemaphore closesem( 0 );
auto ending = std::make_shared<barrier_t>( endsem );
auto closing = std::make_shared<barrier_t>( closesem );
auto prevstop = actions.begin()->first;
for( auto &p : actions )
{
Expand All @@ -495,12 +522,13 @@ std::thread ExecuteActions( std::unique_ptr<File> file, action_list &&actions )
prevstop = p.first;
auto &action = p.second;
mytimer_t timer;
action.Execute( sync );
action.Execute( ending, closing );
uint64_t duration = timer.elapsed();
prevstop += duration;
}
sync.reset();
sem.Wait();
ending.reset();
closing.reset();
endsem.Wait();
file.reset();
} );
return t;
Expand Down

0 comments on commit 1048bce

Please sign in to comment.