Skip to content

Commit

Permalink
Merge pull request #1801 from ccaffy/xrdhttptpc-pass-filesize-ofs-layer
Browse files Browse the repository at this point in the history
[XrdHttpTPC] TPC-Pull: passing the source file size information to the OFS layer via opaque parameter "oss.asize"
  • Loading branch information
simonmichal committed Oct 14, 2022
2 parents 24a75a3 + 725e070 commit 644ea31
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 53 deletions.
34 changes: 19 additions & 15 deletions src/XrdTpc/XrdTpcState.cc
Expand Up @@ -38,12 +38,14 @@ void State::Move(State &other)
m_headers = other.m_headers;
m_headers_copy = other.m_headers_copy;
m_resp_protocol = other.m_resp_protocol;

m_is_transfer_state = other.m_is_transfer_state;
curl_easy_setopt(m_curl, CURLOPT_HEADERDATA, this);
if (m_push) {
curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
} else {
curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
if (m_is_transfer_state) {
if (m_push) {
curl_easy_setopt(m_curl, CURLOPT_READDATA, this);
} else {
curl_easy_setopt(m_curl, CURLOPT_WRITEDATA, this);
}
}
other.m_headers_copy.clear();
other.m_curl = NULL;
Expand All @@ -56,17 +58,19 @@ bool State::InstallHandlers(CURL *curl) {
curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XrdVERSION);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &State::HeaderCB);
curl_easy_setopt(curl, CURLOPT_HEADERDATA, this);
if (m_push) {
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
curl_easy_setopt(curl, CURLOPT_READDATA, this);
struct stat buf;
if (SFS_OK == m_stream->Stat(&buf)) {
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
if(m_is_transfer_state) {
if (m_push) {
curl_easy_setopt(curl, CURLOPT_UPLOAD, 1);
curl_easy_setopt(curl, CURLOPT_READFUNCTION, &State::ReadCB);
curl_easy_setopt(curl, CURLOPT_READDATA, this);
struct stat buf;
if (SFS_OK == m_stream->Stat(&buf)) {
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
}
} else {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
}
} else {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &State::WriteCB);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
}
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);

Expand Down
28 changes: 26 additions & 2 deletions src/XrdTpc/XrdTpcState.hh
Expand Up @@ -31,9 +31,31 @@ public:
m_content_length(-1),
m_stream(NULL),
m_curl(NULL),
m_headers(NULL)
m_headers(NULL),
m_is_transfer_state(true)
{}

/**
* Don't use that constructor if you want to do some transfers.
* @param curl the curl handle
*/
State(CURL * curl):
m_push(true),
m_recv_status_line(false),
m_recv_all_headers(false),
m_offset(0),
m_start_offset(0),
m_status_code(-1),
m_error_code(0),
m_content_length(-1),
m_stream(NULL),
m_curl(curl),
m_headers(NULL),
m_is_transfer_state(false)
{
InstallHandlers(curl);
}

// Note that we are "borrowing" a reference to the curl handle;
// it is not owned / freed by the State object. However, we use it
// as if there's only one handle per State.
Expand All @@ -48,7 +70,8 @@ public:
m_content_length(-1),
m_stream(&stream),
m_curl(curl),
m_headers(NULL)
m_headers(NULL),
m_is_transfer_state(true)
{
InstallHandlers(curl);
}
Expand Down Expand Up @@ -143,6 +166,7 @@ private:
std::vector<std::string> m_headers_copy; // Copies of custom headers.
std::string m_resp_protocol; // Response protocol in the HTTP status line.
std::string m_error_buf; // Any error associated with a response.
bool m_is_transfer_state; // If set to true, this state will be used to perform some transfers
};

};
110 changes: 75 additions & 35 deletions src/XrdTpc/XrdTpcTPC.cc
Expand Up @@ -144,33 +144,41 @@ if (purpose == CURLSOCKTYPE_IPCXN && clientp)
// One special key is `authz`; this is always stripped out and copied to the Authorization
// header (which will later be used for XrdSecEntity). The latter copy is only done if
// the Authorization header is not already present.
static std::string prepareURL(XrdHttpExtReq &req) {
std::map<std::string, std::string>::const_iterator iter = req.headers.find("xrd-http-query");
if (iter == req.headers.end() || iter->second.empty()) {return req.resource;}

auto has_authz_header = req.headers.find("Authorization") != req.headers.end();

std::istringstream requestStream(iter->second);
std::string token;
std::stringstream result;
bool found_first_header = false;
while (std::getline(requestStream, token, '&')) {
if (token.empty()) {
continue;
} else if (!strncmp(token.c_str(), "authz=", 6)) {
if (!has_authz_header) {
req.headers["Authorization"] = token.substr(6);
has_authz_header = true;
}
} else if (!found_first_header) {
result << "?" << token;
found_first_header = true;
} else {
result << "&" << token;
//
// hasSetOpaque will be set to true if at least one opaque data has been set in the URL that is returned,
// false otherwise
static std::string prepareURL(XrdHttpExtReq &req, bool & hasSetOpaque) {
std::map<std::string, std::string>::const_iterator iter = req.headers.find("xrd-http-query");
if (iter == req.headers.end() || iter->second.empty()) {return req.resource;}

auto has_authz_header = req.headers.find("Authorization") != req.headers.end();

std::istringstream requestStream(iter->second);
std::string token;
std::stringstream result;
bool found_first_header = false;
while (std::getline(requestStream, token, '&')) {
if (token.empty()) {
continue;
} else if (!strncmp(token.c_str(), "authz=", 6)) {
if (!has_authz_header) {
req.headers["Authorization"] = token.substr(6);
has_authz_header = true;
}
} else if (!found_first_header) {
result << "?" << token;
found_first_header = true;
} else {
result << "&" << token;
}
}
}
hasSetOpaque = found_first_header;
return req.resource + result.str().c_str();
}

return req.resource + result.str().c_str();
static std::string prepareURL(XrdHttpExtReq &req) {
bool foundHeader;
return prepareURL(req,foundHeader);
}

/******************************************************************************/
Expand Down Expand Up @@ -401,9 +409,11 @@ int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
opaque = resource.substr(pos + 1);
}

// Append the authz information
opaque += (opaque.empty() ? "" : "&");
opaque += authz;
// Append the authz information, if any
if(!authz.empty()) {
opaque += (opaque.empty() ? "" : "&");
opaque += authz;
}
open_result = fh.open(path.c_str(), mode, openMode, &sec, opaque.c_str());

if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
Expand All @@ -422,11 +432,14 @@ int TPCHandler::OpenWaitStall(XrdSfsFile &fh, const std::string &resource,
/******************************************************************************/

#ifdef XRD_CHUNK_RESP



/**
* Determine size at remote end.
*/
int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
bool &success, TPCLogRecord &rec) {
bool &success, TPCLogRecord &rec, bool shouldReturnErrorToClient) {
success = false;
curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
CURLcode res;
Expand All @@ -436,20 +449,20 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
ss << "Remote server failed request: " << curl_easy_strerror(res);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
return req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(curl_easy_strerror(res)), 0);
return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(curl_easy_strerror(res)), 0) : -1;
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
return req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(ss.str().c_str()), 0);
return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, const_cast<char *>(ss.str().c_str()), 0): -1;
} else if (res) {
std::stringstream ss;
ss << "HTTP library failed: " << curl_easy_strerror(res);
rec.status = 500;
logTransferEvent(LogMask::Error, rec, "SIZE_FAIL", ss.str());
char msg[] = "Unknown internal transfer failure";
return req.SendSimpleResp(rec.status, NULL, NULL, msg, 0);
return shouldReturnErrorToClient ? req.SendSimpleResp(rec.status, NULL, NULL, msg, 0) : -1;
}
std::stringstream ss;
ss << "Successfully determined remote size for pull request: "
Expand All @@ -459,6 +472,17 @@ int TPCHandler::DetermineXferSize(CURL *curl, XrdHttpExtReq &req, State &state,
success = true;
return 0;
}

/**
 * Query the size of the remote source file of a TPC pull.
 *
 * Runs DetermineXferSize() on a non-transfer State built around `curl`, with
 * error reporting to the client disabled: a failure here is never sent back
 * to the client.
 *
 * @param curl          the curl handle used for the query
 * @param req           the client request
 * @param contentLength set to the remote size; written only when `success`
 *                      is true on return
 * @param success       true only if the remote size was actually obtained
 * @param rec           log record handed to DetermineXferSize()
 * @return the value returned by DetermineXferSize()
 */
int TPCHandler::GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t &contentLength, bool & success, TPCLogRecord &rec) {
    State state(curl);
    // In case we cannot get the content length, we don't return anything to the client
    const int result = DetermineXferSize(curl, req, state, success, rec, false);
    if (result || !success) {
        return result;
    }
    // State initializes its content length to -1. A negative value here means
    // no usable length was recorded (presumably no Content-Length header in
    // the reply - TODO confirm). Converting it to uint64_t would hand a bogus,
    // huge size to the caller, so report the size as unknown instead.
    const auto remoteLength = state.GetContentLength();
    if (remoteLength < 0) {
        success = false;
        return result;
    }
    contentLength = static_cast<uint64_t>(remoteLength);
    return result;
}

/******************************************************************************/
/* XRD_CHUNK_RESP: */
Expand Down Expand Up @@ -936,9 +960,26 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
}
}
rec.streams = streams;
std::string full_url = prepareURL(req);
bool hasSetOpaque;
std::string full_url = prepareURL(req, hasSetOpaque);
std::string authz = GetAuthz(req);

curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
#ifdef XRD_CHUNK_RESP
{
//Get the content-length of the source file and pass it to the OSS layer
//during the open
uint64_t sourceFileContentLength = 0;
bool success;
TPCLogRecord getContentLengthRec;
GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, getContentLengthRec);
if(success) {
//In the case we cannot get the information from the source server (offline or other error)
//we just don't add the size information to the opaque of the local file to open
full_url += hasSetOpaque ? "&" : "?";
full_url += "oss.asize=" + std::to_string(sourceFileContentLength);
}
}
#endif
int open_result = OpenWaitStall(*fh, full_url, mode|SFS_O_WRONLY, 0644,
req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_result) {
Expand All @@ -959,7 +1000,6 @@ int TPCHandler::ProcessPullReq(const std::string &resource, XrdHttpExtReq &req)
return resp_result;
}
ConfigureCurlCA(curl);
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
State state(0, stream, curl, false);
state.CopyHeaders(req);
Expand Down
4 changes: 3 additions & 1 deletion src/XrdTpc/XrdTpcTPC.hh
Expand Up @@ -96,7 +96,9 @@ private:

#ifdef XRD_CHUNK_RESP
int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, TPC::State &state,
bool &success, TPCLogRecord &);
bool &success, TPCLogRecord &, bool shouldReturnErrorToClient = true);

int GetContentLengthTPCPull(CURL *curl, XrdHttpExtReq &req, uint64_t & contentLength, bool & success, TPCLogRecord &rec);

// Send a 'performance marker' back to the TPC client, informing it of our
// progress. The TPC client will use this information to determine whether
Expand Down

0 comments on commit 644ea31

Please sign in to comment.