Skip to content

Commit

Permalink
Allow TPC code to determine remote size when pulling.
Browse files Browse the repository at this point in the history
Knowing the size of the remote resource is the first step in doing
a multi-stream HTTP GET transfer.
  • Loading branch information
bbockelm committed Jan 4, 2018
1 parent 1a4e67b commit 200170c
Showing 1 changed file with 78 additions and 7 deletions.
85 changes: 78 additions & 7 deletions src/tpc.cpp
Expand Up @@ -12,6 +12,7 @@
#include <dlfcn.h>
#include <fcntl.h>

#include <algorithm>
#include <atomic>
#include <memory>
#include <sstream>
Expand Down Expand Up @@ -156,13 +157,17 @@ class XrdHttpTPCState {
}
}

/**
* Perform the curl-based transfer, responding periodically with transfer
* markers.
*/
off_t GetContentLength() const {return m_content_length;}

int GetStatusCode() const {return m_status_code;}

void ResetAfterSize() {
m_status_code = -1;
m_content_length = -1;
m_recv_all_headers = false;
m_recv_status_line = false;
}

private:
static size_t HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata) {
XrdHttpTPCState *obj = static_cast<XrdHttpTPCState*>(userdata);
Expand All @@ -189,8 +194,33 @@ class XrdHttpTPCState {
return 0;
}
m_recv_status_line = true;
} else if (header.size() == 0 || header == "\n") {
m_recv_all_headers = true;
}
else if (header != "\r\n") {
// Parse the header
std::size_t found = header.find(":");
if (found != std::string::npos) {
std::string header_name = header.substr(0, found);
std::transform(header_name.begin(), header_name.end(), header_name.begin(), ::tolower);
std::string header_value = header.substr(found+1);
if (header_name == "content-length")
{
try {
m_content_length = std::stoll(header_value);
} catch (...) {
// Header unparseable -- not a great sign, fail request.
//printf("Content-length header unparseable\n");
return 0;
}
}
} else {
// Non-empty header that isn't the status line, but no ':' present --
// malformed request?
//printf("Malformed header: %s\n", header.c_str());
return 0;
}
}
if (header.size() == 0 || header == "\n") {m_recv_all_headers = true;}
return header.size();
}

Expand Down Expand Up @@ -233,6 +263,7 @@ class XrdHttpTPCState {
bool m_recv_all_headers{false};
off_t m_offset{0};
int m_status_code{-1};
off_t m_content_length{-1};
std::unique_ptr<XrdSfsFile> m_fh;
CURL *m_curl{nullptr};
struct curl_slist *m_headers{nullptr};
Expand Down Expand Up @@ -339,6 +370,36 @@ class XrdHttpTPC : public XrdHttpExtHandler {
}

#ifdef XRD_CHUNK_RESP
/**
* Determine size at remote end.
*/
int DetermineXferSize(CURL *curl, XrdHttpExtReq &req, XrdHttpTPCState &state,
bool &success) {
success = false;
curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
CURLcode res;
res = curl_easy_perform(curl);
if (res == CURLE_HTTP_RETURNED_ERROR) {
m_log.Emsg("DetermineXferSize", "Remote server failed request", curl_easy_strerror(res));
curl_easy_cleanup(curl);
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg("DetermineXferSize", "Remote server failed request", ss.str().c_str());
curl_easy_cleanup(curl);
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
m_log.Emsg("DetermineXferSize", "Curl failed", curl_easy_strerror(res));
char msg[] = "Unknown internal transfer failure";
curl_easy_cleanup(curl);
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}
curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
success = true;
return 0;
}

int SendPerfMarker(XrdHttpExtReq &req, XrdHttpTPCState &state) {
std::stringstream ss;
const std::string crlf = "\n";
Expand Down Expand Up @@ -599,9 +660,19 @@ class XrdHttpTPC : public XrdHttpExtHandler {
state.CopyHeaders(req);

#ifdef XRD_CHUNK_RESP
return RunCurlWithUpdates(curl, req, state, "ProcessPushReq");
int result;
bool success;
if ((result = DetermineXferSize(curl, req, state, success)) || !success) {
return result;
}
std::stringstream ss;
ss << "Successfully determined remote size for pull request: " << state.GetContentLength();
m_log.Emsg("ProcessPullReq", ss.str().c_str());
state.ResetAfterSize();

return RunCurlWithUpdates(curl, req, state, "ProcessPullReq");
#else
return RunCurlBasic(curl, req, state, "ProcessPushReq");
return RunCurlBasic(curl, req, state, "ProcessPullReq");
#endif
}

Expand Down

0 comments on commit 200170c

Please sign in to comment.