Skip to content

Commit

Permalink
[XrdCl] record/replay : report avg action duration.
Browse files Browse the repository at this point in the history
  • Loading branch information
simonmichal committed Feb 3, 2022
1 parent d3f9686 commit bf64501
Showing 1 changed file with 116 additions and 15 deletions.
131 changes: 116 additions & 15 deletions src/XrdApps/XrdClRecordPlugin/XrdClReplay.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,76 @@
#include <chrono>
#include <iostream>
#include <thread>
#include <chrono>

namespace XrdCl
{

inline uint64_t get_time()
{
return std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch() ).count();
}

class ActionStatistics
{
public:
static ActionStatistics& Instance()
{
static ActionStatistics instance;
return instance;
}

void UpdateOrg( const std::string &action, uint64_t duration )
{
std::unique_lock<std::mutex> lck( mtx );
auto &tpl = orgstats[action];
std::get<0>( tpl ) += duration;
++std::get<1>( tpl );
}

void UpdateAct( const std::string &action, uint64_t duration )
{
std::unique_lock<std::mutex> lck( mtx );
auto &tpl = actstats[action];
std::get<0>( tpl ) += duration;
++std::get<1>( tpl );
}

void Print()
{
std::cout << "Reference average durations per action:\n";
for( auto &p : orgstats )
{
auto &tpl = p.second;
double avg = double( std::get<0>( tpl ) ) / double( std::get<1>( tpl ) );
std::cout << '\t' << p.first << "\t: " << avg << "s\n";
}
std::cout << "Average durations per action:\n";
for( auto &p : actstats )
{
auto &tpl = p.second;
double avg = double( std::get<0>( tpl ) ) / double( std::get<1>( tpl ) );
std::cout << '\t' << p.first << "\t: " << avg << "s\n";
}
}

private:

ActionStatistics(){};

ActionStatistics( ActionStatistics&& ) = delete;
ActionStatistics( const ActionStatistics& ) = delete;

ActionStatistics& operator=( ActionStatistics&& ) = delete;
ActionStatistics& operator=( const ActionStatistics& ) = delete;

std::mutex mtx;
std::unordered_map<std::string, std::tuple<uint64_t, size_t>> orgstats;
std::unordered_map<std::string, std::tuple<uint64_t, size_t>> actstats;
};


//------------------------------------------------------------------------------
//! Timer helper class
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -118,11 +184,15 @@ class ActionExecutor
const std::string &action,
const std::string &args,
const std::string &orgststr,
const std::string &resp ) :
const std::string &resp,
uint64_t start,
uint64_t stop ) :
file( file ),
action( action ),
args( args ),
orgststr( orgststr )
orgststr( orgststr ),
orgstart( start ),
orgstop( stop )
{
}

Expand All @@ -133,6 +203,7 @@ class ActionExecutor
void Execute( std::shared_ptr<barrier_t> &ending,
std::shared_ptr<barrier_t> &closing )
{
uint64_t start = get_time();
if( action == "Open" ) // open action
{
std::string url;
Expand All @@ -142,8 +213,10 @@ class ActionExecutor
std::tie( url, flags, mode, timeout ) = GetOpenArgs();
std::string tmp( orgststr );
WaitFor( Open( file, url, flags, mode, timeout ) >>
[tmp, ending, closing]( XRootDStatus &s ) mutable
[tmp, ending, closing, start]( XRootDStatus &s ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "Open", duration );
HandleStatus( s, tmp );
ending.reset();
closing.reset();
Expand All @@ -161,8 +234,10 @@ class ActionExecutor
sem.Wait();
}
Async( Close( file, timeout ) >>
[tmp, ending]( XRootDStatus &s ) mutable
[tmp, ending, start]( XRootDStatus &s ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "Close", duration );
HandleStatus( s, tmp );
ending.reset();
} );
Expand All @@ -174,8 +249,10 @@ class ActionExecutor
std::tie( force, timeout ) = GetStatArgs();
std::string tmp( orgststr );
Async( Stat( file, force, timeout ) >>
[tmp, ending, closing]( XRootDStatus &s, StatInfo &r ) mutable
[tmp, ending, closing, start]( XRootDStatus &s, StatInfo &r ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "Stat", duration );
HandleStatus( s, tmp );
ending.reset();
closing.reset();
Expand All @@ -189,8 +266,10 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetReadArgs();
Async( Read( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s, ChunkInfo &r ) mutable
[buffer, orgststr{ orgststr }, ending, closing, start]( XRootDStatus &s, ChunkInfo &r ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "Read", duration );
HandleStatus( s, orgststr );
buffer.reset();
ending.reset();
Expand All @@ -204,8 +283,10 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetPgReadArgs();
Async( PgRead( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s, PageInfo &r ) mutable
[buffer, orgststr{ orgststr }, ending, closing, start]( XRootDStatus &s, PageInfo &r ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "PgRead", duration );
HandleStatus( s, orgststr );
buffer.reset();
ending.reset();
Expand All @@ -219,8 +300,10 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetWriteArgs();
Async( Write( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s ) mutable
[buffer, orgststr{ orgststr }, ending, closing, start]( XRootDStatus &s ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "Write", duration );
HandleStatus( s, orgststr );
buffer.reset();
ending.reset();
Expand All @@ -234,8 +317,10 @@ class ActionExecutor
uint16_t timeout;
std::tie( offset, buffer, timeout ) = GetPgWriteArgs();
Async( PgWrite( file, offset, buffer->size(), buffer->data(), timeout ) >>
[buffer, orgststr{ orgststr }, ending, closing]( XRootDStatus &s ) mutable
[buffer, orgststr{ orgststr }, ending, closing, start]( XRootDStatus &s ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "PgWrite", duration );
HandleStatus( s, orgststr );
buffer.reset();
ending.reset();
Expand All @@ -247,8 +332,10 @@ class ActionExecutor
uint16_t timeout = GetSyncArgs();
std::string tmp( orgststr );
Async( Sync( file, timeout ) >>
[tmp, ending, closing]( XRootDStatus &s ) mutable
[tmp, ending, closing, start]( XRootDStatus &s ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "Sync", duration );
HandleStatus( s, tmp );
ending.reset();
closing.reset();
Expand All @@ -261,8 +348,10 @@ class ActionExecutor
std::tie( size, timeout ) = GetTruncateArgs();
std::string tmp( orgststr );
Async( Truncate( file, size, timeout ) >>
[tmp, ending, closing]( XRootDStatus &s ) mutable
[tmp, ending, closing, start]( XRootDStatus &s ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "Truncate", duration );
HandleStatus( s, tmp );
ending.reset();
closing.reset();
Expand All @@ -275,8 +364,10 @@ class ActionExecutor
std::tie( chunks, timeout ) = GetVectorReadArgs();
std::string tmp( orgststr );
Async( VectorRead( file, chunks, timeout ) >>
[chunks, tmp, ending, closing]( XRootDStatus &s, VectorReadInfo &r ) mutable
[chunks, tmp, ending, closing, start]( XRootDStatus &s, VectorReadInfo &r ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "VectorRead", duration );
HandleStatus( s, tmp );
for( auto &ch : chunks )
delete[] (char*)ch.buffer;
Expand All @@ -292,8 +383,10 @@ class ActionExecutor
std::tie( chunks, timeout ) = GetVectorWriteArgs();
std::string tmp( orgststr );
Async( VectorWrite( file, chunks, timeout ) >>
[chunks, tmp, ending, closing]( XRootDStatus &s ) mutable
[chunks, tmp, ending, closing, start]( XRootDStatus &s ) mutable
{
uint64_t duration = get_time() - start;
ActionStatistics::Instance().UpdateAct( "VectorWrite", duration );
HandleStatus( s, tmp );
for( auto &ch : chunks )
delete[] (char*)ch.buffer;
Expand Down Expand Up @@ -451,6 +544,8 @@ class ActionExecutor
const std::string action; //< the action to be executed
const std::string args; //< arguments for the operation
std::string orgststr; //< the original response status of the action
uint64_t orgstart; //< original start time
uint64_t orgstop; //< original stop time
};

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -487,15 +582,19 @@ std::unordered_map<File*, action_list> ParseInput( const std::string &path )
std::string action = tokens[1]; // action name (e.g. Open)
uint64_t start = std::stoull( tokens[2] ); // start time
std::string args = tokens[3]; // operation arguments
// uint64_t stop = std::stoull( tokens[4] ); // stop time
uint64_t stop = std::stoull( tokens[4] ); // stop time
std::string status = tokens[5]; // operation status
std::string resp = tokens[6]; // server response

// update statistics
ActionStatistics::Instance().UpdateOrg( action, stop - start );

if( !files.count( id ) )
{
files[id] = new File( false );
files[id]->SetProperty( "BundledClose", "true" );
}
result[files[id]].emplace( start, ActionExecutor( *files[id], action, args, status, resp ) );
result[files[id]].emplace( start, ActionExecutor( *files[id], action, args, status, resp, start, stop ) );
}

return result;
Expand Down Expand Up @@ -559,6 +658,8 @@ int main( int argc, char **argv )
}
for( auto &t : threads ) // wait until we are done
t.join();

XrdCl::ActionStatistics::Instance().Print();
}
catch( const std::invalid_argument &ex )
{
Expand Down

0 comments on commit bf64501

Please sign in to comment.