Skip to content

Commit

Permalink
Add initial performance marker support.
Browse files Browse the repository at this point in the history
If Xrootd supports chunked transfer encoding, then send performance
markers along with the COPY request response.
  • Loading branch information
bbockelm committed Dec 21, 2017
1 parent 6ea5749 commit 7da01d3
Showing 1 changed file with 184 additions and 42 deletions.
226 changes: 184 additions & 42 deletions src/tpc.cpp
Expand Up @@ -111,6 +111,8 @@ class XrdHttpTPCState {
m_fh->close();
}

off_t BytesTransferred() const {return m_offset;}

bool InstallHandlers(CURL *curl) {
curl_easy_setopt(curl, CURLOPT_USERAGENT, "xrootd-tpc/" XRDTPC_VERSION);
curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, &XrdHttpTPCState::HeaderCB);
Expand All @@ -127,6 +129,7 @@ class XrdHttpTPCState {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &XrdHttpTPCState::WriteCB);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
}
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
//curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
return true;
}
Expand All @@ -147,6 +150,11 @@ class XrdHttpTPCState {
}
}

/**
* Perform the curl-based transfer, responding periodically with transfer
* markers.
*/

int GetStatusCode() const {return m_status_code;}

private:
Expand All @@ -157,8 +165,7 @@ class XrdHttpTPCState {
}

int Header(const std::string &header) {
// TODO: Handle status codes appropriately.
printf("Recieved remote header: %s\n", header.c_str());
printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
m_recv_all_headers = false;
m_recv_status_line = false;
Expand All @@ -168,7 +175,7 @@ class XrdHttpTPCState {
std::string item;
if (!std::getline(ss, item, ' ')) return 0;
m_resp_protocol = item;
printf("Response protocol: %s\n", m_resp_protocol.c_str());
printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
if (!std::getline(ss, item, ' ')) return 0;
try {
m_status_code = std::stol(item);
Expand All @@ -177,7 +184,7 @@ class XrdHttpTPCState {
}
m_recv_status_line = true;
}
if (header.size() == 0) {m_recv_all_headers = true;}
if (header.size() == 0 || header == "\n") {m_recv_all_headers = true;}
return header.size();
}

Expand Down Expand Up @@ -322,6 +329,166 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return open_result;
}

#ifdef XRD_CHUNK_RESP
int SendPerfMarker(XrdHttpExtReq &req, XrdHttpTPCState &state) {
std::stringstream ss;
const std::string crlf = "\r\n";
ss << "Perf Marker" << crlf;
ss << " Timestamp: " << time(NULL) << crlf;
ss << " Stripe Index: 0" << crlf;
ss << " Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
ss << " Total Stripe Count: 1" << crlf;
ss << "End" << crlf;

return req.ChunkResp(ss.str().c_str(), 0);
}

int RunCurlWithUpdates(CURL *curl, XrdHttpExtReq &req, XrdHttpTPCState &state,
const char *log_prefix) {

// Create the multi-handle and add in the current transfer to it.
CURLM *multi_handle = curl_multi_init();
if (!multi_handle) {
m_log.Emsg(log_prefix, "Failed to initialize a libcurl multi-handle");
char msg[] = "Failed to initialize internal server memory";
curl_easy_cleanup(curl);
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}

CURLMcode mres;
mres = curl_multi_add_handle(multi_handle, curl);
if (mres) {
m_log.Emsg(log_prefix, "Failed to add transfer to libcurl multi-handle",
curl_multi_strerror(mres));
char msg[] = "Failed to initialize internal server handle";
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}

// Start response to client prior to the first call to curl_multi_perform
int retval = req.StartChunkedResp(201, "Created", "Content-Type: text/plain");
if (retval) {
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
return retval;
}

// Transfer loop: use curl to actually run the transfer, but periodically
// interrupt things to send back performance updates to the client.
int running_handles = 1;
time_t last_marker = 0;
do {
time_t now = time(NULL);
time_t next_marker = last_marker + m_marker_period;
if (now > next_marker) {
if (SendPerfMarker(req, state)) {
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
return -1;
}
last_marker = now;
}
mres = curl_multi_perform(multi_handle, &running_handles);
if (mres == CURLM_CALL_MULTI_PERFORM) {
// curl_multi_perform should be called again immediately. On newer
// versions of curl, this is no longer used.
continue;
} else if (mres != CURLM_OK) {
break;
} else if (running_handles == 0) {
break;
}

int64_t max_sleep_time = next_marker - time(NULL);
if (max_sleep_time < 0) {
continue;
}
int fd_count;
mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
if (mres != CURLM_OK) {
break;
}
} while (running_handles);

if (mres != CURLM_OK) {
m_log.Emsg(log_prefix, "Internal libcurl multi-handle error",
curl_multi_strerror(mres));
char msg[] = "Internal server error due to libcurl";
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);

curl_multi_cleanup(multi_handle);
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}

// Harvest any messages, looking for CURLMSG_DONE.
CURLcode res = static_cast<CURLcode>(-1);
CURLMsg *msg;
do {
int msgq = 0;
msg = curl_multi_info_read(multi_handle, &msgq);
if (msg && (msg->msg == CURLMSG_DONE)) {
CURL *easy_handle = msg->easy_handle;
res = msg->data.result;
curl_multi_remove_handle(multi_handle, easy_handle);
curl_easy_cleanup(easy_handle);
}
} while (msg);

if (res == -1) { // No transfers returned?!?
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
curl_multi_cleanup(multi_handle);
char msg[] = "Internal state error in libcurl";
m_log.Emsg(log_prefix, msg);
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}
curl_multi_cleanup(multi_handle);

// Generate the final response back to the client.
std::stringstream ss;
if (res != CURLE_OK) {
m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res));
ss << "failure: " << curl_easy_strerror(res);
} else if (state.GetStatusCode() >= 400) {
ss << "failure: Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str());
} else {
ss << "success: Created";
}

if ((retval = req.ChunkResp(ss.str().c_str(), 0))) {
return retval;
}
return req.ChunkResp(nullptr, 0);
}
#else
int RunCurlBasic(CURL *curl, XrdHttpExtReq &req, XrdHttpTPCState &state,
const char *log_prefix) {
CURLcode res;
res = curl_easy_perform(curl);
curl_easy_cleanup(curl);
if (res == CURLE_HTTP_RETURNED_ERROR) {
m_log.Emsg(log_prefix, "Remote server failed request", curl_easy_strerror(res));
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str());
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
m_log.Emsg(log_prefix, "Curl failed", curl_easy_strerror(res));
char msg[] = "Unknown internal transfer failure";
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
} else {
char msg[] = "Created";
return req.SendSimpleResp(201, nullptr, nullptr, msg, 0);
}
}
#endif

int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
CURL *curl = curl_easy_init();
if (!curl) {
Expand Down Expand Up @@ -351,28 +518,16 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return resp_result;
}

CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());

XrdHttpTPCState state(std::move(fh), curl, true);
state.CopyHeaders(req);
res = curl_easy_perform(curl);
if (res == CURLE_HTTP_RETURNED_ERROR) {
m_log.Emsg("ProcessPushReq", "Remote server failed request", curl_easy_strerror(res));
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg("ProcessPushReq", "Remote server failed request", ss.str().c_str());
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
m_log.Emsg("ProcessPushReq", "Curl failed", curl_easy_strerror(res));
char msg[] = "Unknown internal transfer failure";
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
} else {
char msg[] = "Created";
return req.SendSimpleResp(201, nullptr, nullptr, msg, 0);
}

#ifdef XRD_CHUNK_RESP
return RunCurlWithUpdates(curl, req, state, "ProcessPushReq");
#else
return RunCurlBasic(curl, req, state, "ProcessPushReq");
#endif
}

int ProcessPullReq(const std::string &resource, XrdHttpExtReq &req) {
Expand Down Expand Up @@ -409,29 +564,15 @@ class XrdHttpTPC : public XrdHttpExtHandler {
fh->close();
return resp_result;
}

CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());

XrdHttpTPCState state(std::move(fh), curl, false);
state.CopyHeaders(req);
res = curl_easy_perform(curl);
if (res == CURLE_HTTP_RETURNED_ERROR) {
m_log.Emsg("ProcessPullReq", "Remote server failed request", curl_easy_strerror(res));
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg("ProcessPushReq", "Remote server failed request", ss.str().c_str());
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
m_log.Emsg("ProcessPullReq", "Curl failed", curl_easy_strerror(res));
char msg[] = "Unknown internal transfer failure";
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
} else {
char msg[] = "Created";
return req.SendSimpleResp(201, nullptr, nullptr, msg, 0);
}

#ifdef XRD_CHUNK_RESP
return RunCurlWithUpdates(curl, req, state, "ProcessPushReq");
#else
return RunCurlBasic(curl, req, state, "ProcessPushReq");
#endif
}

bool ConfigureFSLib(XrdOucStream &Config, std::string &path1, bool &path1_alt, std::string &path2, bool &path2_alt) {
Expand Down Expand Up @@ -564,6 +705,7 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return true;
}

static constexpr int m_marker_period = 20;
bool m_desthttps{false};
static std::atomic<uint64_t> m_monid;
XrdSysError &m_log;
Expand Down

0 comments on commit 7da01d3

Please sign in to comment.