Skip to content

Commit

Permalink
First working version with gfal-copy.
Browse files Browse the repository at this point in the history
  • Loading branch information
bbockelm committed Dec 21, 2017
1 parent 7da01d3 commit 6d8e54a
Showing 1 changed file with 36 additions and 16 deletions.
52 changes: 36 additions & 16 deletions src/tpc.cpp
Expand Up @@ -165,7 +165,7 @@ class XrdHttpTPCState {
}

int Header(const std::string &header) {
printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
//printf("Recieved remote header (%d, %d): %s", m_recv_all_headers, m_recv_status_line, header.c_str());
if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
m_recv_all_headers = false;
m_recv_status_line = false;
Expand All @@ -175,7 +175,7 @@ class XrdHttpTPCState {
std::string item;
if (!std::getline(ss, item, ' ')) return 0;
m_resp_protocol = item;
printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
//printf("\n\nResponse protocol: %s\n", m_resp_protocol.c_str());
if (!std::getline(ss, item, ' ')) return 0;
try {
m_status_code = std::stol(item);
Expand All @@ -201,6 +201,7 @@ class XrdHttpTPCState {
return -1;
}
m_offset += retval;
//printf("Wrote a total of %ld bytes.\n", m_offset);
return retval;
}

Expand All @@ -217,13 +218,14 @@ class XrdHttpTPCState {
return -1;
}
m_offset += retval;
//printf("Read a total of %ld bytes.\n", m_offset);
return retval;
}

bool m_push{true};
bool m_recv_status_line{false};
bool m_recv_all_headers{false};
XrdSfsXferSize m_offset{0};
off_t m_offset{0};
int m_status_code{-1};
std::unique_ptr<XrdSfsFile> m_fh;
CURL *m_curl{nullptr};
Expand All @@ -250,7 +252,8 @@ class XrdHttpTPC : public XrdHttpExtHandler {
if (header != req.headers.end()) {
return ProcessPushReq(header->second, req);
}
return req.SendSimpleResp(400, NULL, NULL, (char *)"No Source or Destination specified", 0);
m_log.Emsg("ProcessReq", "COPY verb requested but no source or destination specified.");
return req.SendSimpleResp(400, NULL, NULL, "No Source or Destination specified", 0);
}

/**
Expand Down Expand Up @@ -332,12 +335,14 @@ class XrdHttpTPC : public XrdHttpExtHandler {
#ifdef XRD_CHUNK_RESP
int SendPerfMarker(XrdHttpExtReq &req, XrdHttpTPCState &state) {
std::stringstream ss;
const std::string crlf = "\r\n";
const std::string crlf = "\n";
ss << "Perf Marker" << crlf;
ss << " Timestamp: " << time(NULL) << crlf;
ss << " Stripe Index: 0" << crlf;
ss << " Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
ss << " Total Stripe Count: 1" << crlf;
ss << "Timestamp: " << time(NULL) << crlf;
ss << "Stripe Index: 0" << crlf;
for (int i=0; i<30; i++)
ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
ss << "Stripe Bytes Transferred: " << state.BytesTransferred() << crlf;
ss << "Total Stripe Count: 1" << crlf;
ss << "End" << crlf;

return req.ChunkResp(ss.str().c_str(), 0);
Expand Down Expand Up @@ -378,10 +383,11 @@ class XrdHttpTPC : public XrdHttpExtHandler {
// interrupt things to send back performance updates to the client.
int running_handles = 1;
time_t last_marker = 0;
CURLcode res = static_cast<CURLcode>(-1);
do {
time_t now = time(NULL);
time_t next_marker = last_marker + m_marker_period;
if (now > next_marker) {
if (now >= next_marker) {
if (SendPerfMarker(req, state)) {
curl_multi_remove_handle(multi_handle, curl);
curl_easy_cleanup(curl);
Expand All @@ -400,9 +406,23 @@ class XrdHttpTPC : public XrdHttpExtHandler {
} else if (running_handles == 0) {
break;
}
//printf("There are %d running handles\n", running_handles);

// Harvest any messages, looking for CURLMSG_DONE.
CURLMsg *msg;
do {
int msgq = 0;
msg = curl_multi_info_read(multi_handle, &msgq);
if (msg && (msg->msg == CURLMSG_DONE)) {
CURL *easy_handle = msg->easy_handle;
res = msg->data.result;
curl_multi_remove_handle(multi_handle, easy_handle);
curl_easy_cleanup(easy_handle);
}
} while (msg);

int64_t max_sleep_time = next_marker - time(NULL);
if (max_sleep_time < 0) {
if (max_sleep_time <= 0) {
continue;
}
int fd_count;
Expand All @@ -424,7 +444,6 @@ class XrdHttpTPC : public XrdHttpExtHandler {
}

// Harvest any messages, looking for CURLMSG_DONE.
CURLcode res = static_cast<CURLcode>(-1);
CURLMsg *msg;
do {
int msgq = 0;
Expand Down Expand Up @@ -490,6 +509,7 @@ class XrdHttpTPC : public XrdHttpExtHandler {
#endif

int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
m_log.Emsg("ProcessPushReq", "Starting a push request for resource", resource.c_str());
CURL *curl = curl_easy_init();
if (!curl) {
char msg[] = "Failed to initialize internal transfer resources";
Expand Down Expand Up @@ -544,10 +564,10 @@ class XrdHttpTPC : public XrdHttpExtHandler {
}
std::string authz = GetAuthz(req);
XrdSfsFileOpenMode mode = SFS_O_CREAT;
auto overwrite_header = req.headers.find("Overwrite");
/*auto overwrite_header = req.headers.find("Overwrite");
if ((overwrite_header == req.headers.end()) || (overwrite_header->second == "T")) {
mode = SFS_O_TRUNC|SFS_O_POSC;
}
}*/

int open_result = OpenWaitStall(*fh, req.resource, mode|SFS_O_WRONLY, 0644, req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_result) {
Expand Down Expand Up @@ -705,7 +725,7 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return true;
}

static constexpr int m_marker_period = 20;
static constexpr int m_marker_period = 5;
bool m_desthttps{false};
static std::atomic<uint64_t> m_monid;
XrdSysError &m_log;
Expand Down Expand Up @@ -734,7 +754,7 @@ XrdHttpExtHandler *XrdHttpGetExtHandler(XrdSysError *log, const char * config, c
retval = new XrdHttpTPC(log, config, myEnv);
} catch (std::runtime_error &re) {
log->Emsg("Initialize", "Encountered a runtime failure when loading ", re.what());
printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
//printf("Provided env vars: %p, XrdInet*: %p\n", myEnv, myEnv->GetPtr("XrdInet*"));
}
return retval;
}
Expand Down

0 comments on commit 6d8e54a

Please sign in to comment.