Skip to content

Commit

Permalink
[Server] Implement gStream to monitor all http and xroot TPC events.
Browse files Browse the repository at this point in the history
  • Loading branch information
abh3 committed Apr 6, 2022
1 parent d0ad178 commit 0fadbee
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 11 deletions.
8 changes: 7 additions & 1 deletion src/XrdHttp/XrdHttpExtHandler.cc
Expand Up @@ -21,7 +21,7 @@
//------------------------------------------------------------------------------



#include "Xrd/XrdLink.hh"
#include "XrdHttpExtHandler.hh"
#include "XrdHttpReq.hh"
#include "XrdHttpProtocol.hh"
Expand Down Expand Up @@ -70,6 +70,12 @@ int XrdHttpExtReq::BuffgetData(int blen, char **data, bool wait) {
return nb;
}

void XrdHttpExtReq::GetClientID(std::string &clid)
{
char buff[512];
prot->Link->Client(buff, sizeof(buff));
clid = buff;
}

const XrdSecEntity &XrdHttpExtReq::GetSecEntity() const
{
Expand Down
3 changes: 3 additions & 0 deletions src/XrdHttp/XrdHttpExtHandler.hh
Expand Up @@ -54,6 +54,9 @@ public:

std::string clientdn, clienthost, clientgroups;
long long length;

// Get full client identifier
void GetClientID(std::string &clid);

// A view of the XrdSecEntity associated with the request.
const XrdSecEntity &GetSecEntity() const;
Expand Down
25 changes: 20 additions & 5 deletions src/XrdOfs/XrdOfsTPCProg.cc
Expand Up @@ -33,6 +33,7 @@
#include <unistd.h>
#include <sys/stat.h>

#include "XrdNet/XrdNetIdentity.hh"
#include "XrdOfs/XrdOfsTPC.hh"
#include "XrdOfs/XrdOfsTPCConfig.hh"
#include "XrdOfs/XrdOfsTPCJob.hh"
Expand Down Expand Up @@ -188,9 +189,11 @@ void XrdOfsTPCProg::Run()
{
XrdXrootdTpcMon::TpcInfo monInfo;
struct stat Stat;
const char *clID, *at;
char *questSrc, *questLfn, *questDst;
int rc;
bool doMon = Cfg.tpcMon != 0;
bool isIPv4, doMon = Cfg.tpcMon != 0;
char clBuff[592];

// Run the current job and indicate it's ending status and possibly getting a
// another job to run. Note "Job" will always be valid.
Expand All @@ -200,7 +203,7 @@ do{if (doMon)
gettimeofday(&monInfo.begT, 0);
}

rc = Xeq();
rc = Xeq(isIPv4);

if (doMon)
{gettimeofday(&monInfo.endT, 0);
Expand All @@ -210,7 +213,17 @@ do{if (doMon)
monInfo.dstURL = Job->Info.Lfn;
monInfo.endRC = rc;
if (Job->Info.Str) monInfo.strm = Job->Info.Str;
//??? Need to set ipv4/6 bit
if (isIPv4) monInfo.opts |= XrdXrootdTpcMon::TpcInfo::isIPv4;

clID = Job->Info.Org;
if (clID && (at = index(clID, '@')) && !index(at+1, '.'))
{const char *dName = XrdNetIdentity::Domain();
if (dName)
{snprintf(clBuff, sizeof(clBuff), "%s%s", clID, dName);
clID = clBuff;
}
}
monInfo.clID = clID;

if ((questDst = index(Job->Info.Dst, '?'))) *questDst = 0;
if (!XrdOfsOss->Stat(Job->Info.Dst, &Stat)) monInfo.fSize = Stat.st_size;
Expand Down Expand Up @@ -264,7 +277,7 @@ XrdOfsTPCProg *XrdOfsTPCProg::Start(XrdOfsTPCJob *jP, int &rc)
/* X e q */
/******************************************************************************/

int XrdOfsTPCProg::Xeq()
int XrdOfsTPCProg::Xeq(bool &isIPv4)
{
EPNAME("Xeq");
credFile cFile(Job);
Expand Down Expand Up @@ -358,8 +371,10 @@ int XrdOfsTPCProg::Xeq()
// be printed as an error message should the copy fail.
//
*eRec = 0;
isIPv4 = false;
while((lP = JobStream.GetLine()))
{if ((Colon = index(lP, ':')) && *(Colon+1) == ' ')
{if (!strcmp(lP, "!-!IPv4")) isIPv4 = true;
if ((Colon = index(lP, ':')) && *(Colon+1) == ' ')
{strncpy(eRec, Colon+2, sizeof(eRec)-1);
eRec[sizeof(eRec)-1] = 0;
}
Expand Down
2 changes: 1 addition & 1 deletion src/XrdOfs/XrdOfsTPCProg.hh
Expand Up @@ -50,7 +50,7 @@ static int Init();
static
XrdOfsTPCProg *Start(XrdOfsTPCJob *jP, int &rc);

int Xeq();
int Xeq(bool &isIPv4);

XrdOfsTPCProg(XrdOfsTPCProg *Prev, int num, int errMon);

Expand Down
3 changes: 3 additions & 0 deletions src/XrdTpc/XrdTpcTPC.cc
Expand Up @@ -49,6 +49,7 @@ TPCHandler::TPCLogRecord::~TPCLogRecord()
if (tpcMonitor)
{XrdXrootdTpcMon::TpcInfo monInfo;

monInfo.clID = clID.c_str();
monInfo.begT = begT;
gettimeofday(&monInfo.endT, 0);

Expand Down Expand Up @@ -803,6 +804,7 @@ int TPCHandler::ProcessPushReq(const std::string & resource, XrdHttpExtReq &req)
rec.local = req.resource;
rec.remote = resource;
char *name = req.GetSecEntity().name;
req.GetClientID(rec.clID);
if (name) rec.name = name;
logTransferEvent(LogMask::Info, rec, "PUSH_START", "Starting a push request");

Expand Down Expand Up @@ -882,6 +884,7 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
rec.local = req.resource;
rec.remote = resource;
char *name = req.GetSecEntity().name;
req.GetClientID(rec.clID);
if (name) rec.name = name;
logTransferEvent(LogMask::Info, rec, "PULL_START", "Starting a pull request");

Expand Down
1 change: 1 addition & 0 deletions src/XrdTpc/XrdTpcTPC.hh
Expand Up @@ -68,6 +68,7 @@ private:
std::string local;
std::string remote;
std::string name;
std::string clID;
static XrdXrootdTpcMon* tpcMonitor;
timeval begT;
off_t bytes_transferred;
Expand Down
6 changes: 3 additions & 3 deletions src/XrdXrootd/XrdXrootdTpcMon.cc
Expand Up @@ -41,9 +41,9 @@

namespace
{
const char *json_fmt = "{\"TPC\":\"%s\","
const char *json_fmt = "{\"TPC\":\"%s\",\"Client\":\"%s\","
"\"Xeq\":{\"Beg\":\"%s\",\"End\":\"%s\",\"RC\":%d,\"Strm\":%u,\"Type\":\"%s\","
"\"IPv\":%c},},"
"\"IPv\":%c},"
"\"Src\":\"%s\",\"Dst\":\"%s\",\"Size\":%d}";

const char *urlFMT = "";
Expand Down Expand Up @@ -133,7 +133,7 @@ void XrdXrootdTpcMon::Report(XrdXrootdTpcMon::TpcInfo &info)

// Format the line
//
int n = snprintf(buff, sizeof(buff), json_fmt, protocol,
int n = snprintf(buff, sizeof(buff), json_fmt, protocol, info.clID,
getUTC(info.begT, bt_buff, sizeof(bt_buff)),
getUTC(info.endT, et_buff, sizeof(et_buff)),
info.endRC, static_cast<unsigned int>(info.strm),
Expand Down
4 changes: 3 additions & 1 deletion src/XrdXrootd/XrdXrootdTpcMon.hh
Expand Up @@ -43,6 +43,7 @@ public:

struct TpcInfo
{
const char* clID; // Client ID
struct timeval begT; // gettimeofday copy started
struct timeval endT; // gettimeofday copy ended
const char* srcURL; // The source URL used
Expand All @@ -56,7 +57,8 @@ unsigned char rsvd; // Reserved
static const int isaPush = 0x0001; // opts: Push request otherwise a pull
static const int isIPv4 = 0x0002; // opts: Used IPv4 for xfr else IPv6.

void Init() {begT.tv_sec = 0; begT.tv_usec = 0;
void Init() {clID = "";
begT.tv_sec = 0; begT.tv_usec = 0;
endT.tv_sec = 0; endT.tv_usec = 0;
srcURL = ""; dstURL = "";
fSize = 0; endRC = 0,
Expand Down

0 comments on commit 0fadbee

Please sign in to comment.