Skip to content

Commit

Permalink
Add support for redirections.
Browse files Browse the repository at this point in the history
With this, both push and pull mode can involve redirections.
  • Loading branch information
bbockelm committed Nov 13, 2017
1 parent 4bd2126 commit 2afed06
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 37 deletions.
132 changes: 107 additions & 25 deletions src/tpc.cpp
Expand Up @@ -94,9 +94,9 @@ static XrdSfsFileSystem *load_sfs(void *handle, bool alt, XrdSysError &log, cons

class XrdHttpTPCState {
public:
XrdHttpTPCState (XrdSfsFile &fh, CURL *curl, bool push) :
XrdHttpTPCState (std::unique_ptr<XrdSfsFile> fh, CURL *curl, bool push) :
m_push(push),
m_fh(fh),
m_fh(std::move(fh)),
m_curl(curl)
{
InstallHandlers(curl);
Expand All @@ -108,6 +108,7 @@ class XrdHttpTPCState {
m_headers = nullptr;
curl_easy_setopt(m_curl, CURLOPT_HTTPHEADER, m_headers);
}
m_fh->close();
}

bool InstallHandlers(CURL *curl) {
Expand All @@ -119,14 +120,14 @@ class XrdHttpTPCState {
curl_easy_setopt(curl, CURLOPT_READFUNCTION, &XrdHttpTPCState::ReadCB);
curl_easy_setopt(curl, CURLOPT_READDATA, this);
struct stat buf;
if (SFS_OK == m_fh.stat(&buf)) {
if (SFS_OK == m_fh->stat(&buf)) {
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, buf.st_size);
}
} else {
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, &XrdHttpTPCState::WriteCB);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, this);
}
curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
//curl_easy_setopt(curl, CURLOPT_FAILONERROR, 1L);
return true;
}

Expand All @@ -146,7 +147,7 @@ class XrdHttpTPCState {
}
}

int GetStatus() const {return m_status_code;}
int GetStatusCode() const {return m_status_code;}

private:
static size_t HeaderCB(char *buffer, size_t size, size_t nitems, void *userdata) {
Expand All @@ -158,16 +159,37 @@ class XrdHttpTPCState {
int Header(const std::string &header) {
// TODO: Handle status codes appropriately.
printf("Recieved remote header: %s\n", header.c_str());
if (m_recv_all_headers) { // This is the second request -- maybe processed a redirect?
m_recv_all_headers = false;
m_recv_status_line = false;
}
if (!m_recv_status_line) {
std::stringstream ss(header);
std::string item;
if (!std::getline(ss, item, ' ')) return 0;
m_resp_protocol = item;
printf("Response protocol: %s\n", m_resp_protocol.c_str());
if (!std::getline(ss, item, ' ')) return 0;
try {
m_status_code = std::stol(item);
} catch (...) {
return 0;
}
m_recv_status_line = true;
}
if (header.size() == 0) {m_recv_all_headers = true;}
return header.size();
}

static size_t WriteCB(void *buffer, size_t size, size_t nitems, void *userdata) {
XrdHttpTPCState *obj = static_cast<XrdHttpTPCState*>(userdata);
if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
return obj->Write(static_cast<char*>(buffer), size*nitems);
}

int Write(char *buffer, size_t size) {
int retval = m_fh.write(m_offset, buffer, size);
int retval = m_fh->write(m_offset, buffer, size);
if (retval == SFS_ERROR) {
return -1;
}
Expand All @@ -177,11 +199,13 @@ class XrdHttpTPCState {

static size_t ReadCB(void *buffer, size_t size, size_t nitems, void *userdata) {
XrdHttpTPCState *obj = static_cast<XrdHttpTPCState*>(userdata);
if (obj->GetStatusCode() < 0) {return 0;} // malformed request - got body before headers.
if (obj->GetStatusCode() >= 400) {return 0;} // Status indicates failure.
return obj->Read(static_cast<char*>(buffer), size*nitems);
}

int Read(char *buffer, size_t size) {
int retval = m_fh.read(m_offset, buffer, size);
int retval = m_fh->read(m_offset, buffer, size);
if (retval == SFS_ERROR) {
return -1;
}
Expand All @@ -190,11 +214,14 @@ class XrdHttpTPCState {
}

bool m_push{true};
bool m_recv_status_line{false};
bool m_recv_all_headers{false};
XrdSfsXferSize m_offset{0};
int m_status_code{-1};
XrdSfsFile &m_fh;
std::unique_ptr<XrdSfsFile> m_fh;
CURL *m_curl{nullptr};
struct curl_slist *m_headers{nullptr};
std::string m_resp_protocol;
};


Expand Down Expand Up @@ -268,43 +295,76 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return authz;
}

int RedirectTransfer(XrdHttpExtReq &req, XrdOucErrInfo &error) {
int port;
const char *host = error.getErrText(port);
if ((host == nullptr) || (*host == '\0') || (port == 0)) {
char msg[] = "Internal error: redirect without hostname";
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}
std::stringstream ss;
ss << "Location: http" << (m_desthttps ? "s" : "") << "://" << host << ":" << port << "/" << req.resource;
return req.SendSimpleResp(307, nullptr, const_cast<char *>(ss.str().c_str()), nullptr, 0);
}

int OpenWaitStall(XrdSfsFile &fh, const std::string &resource, int mode, int openMode, const XrdSecEntity &sec,
const std::string &authz) {
int open_result;
while (1) {
open_result = fh.open(resource.c_str(), mode, openMode, &sec, authz.empty() ? nullptr : authz.c_str());
if ((open_result == SFS_STALL) || (open_result == SFS_STARTED)) {
int secs_to_stall = fh.error.getErrInfo();
if (open_result == SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
sleep(secs_to_stall);
}
break;
}
return open_result;
}

int ProcessPushReq(const std::string & resource, XrdHttpExtReq &req) {
CURL *curl = curl_easy_init();
if (!curl) {
char msg[] = "Failed to initialize internal transfer resources";
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}
char *name = req.GetSecEntity().name;
XrdSfsFile *fh = m_sfs->newFile(name, m_monid++);
if (!fh) {
std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
if (!fh.get()) {
char msg[] = "Failed to initialize internal transfer file handle";
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}
std::string authz = GetAuthz(req);

if (SFS_OK != fh->open(req.resource.c_str(), SFS_O_RDONLY, 0600, &(req.GetSecEntity()), authz.empty() ? nullptr : authz.c_str())) {
int open_results = OpenWaitStall(*fh, req.resource, SFS_O_RDONLY, 0644, req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_results) {
return RedirectTransfer(req, fh->error);
} else if (SFS_OK != open_results) {
int code;
char msg_generic[] = "Failed to open local resource";
const char *msg = fh->error.getErrText(code);
if (msg == nullptr) msg = msg_generic;
int status_code = 400;
if (code == EACCES) status_code = 401;
int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, const_cast<char *>(msg), 0);
fh->close();
delete fh;
return req.SendSimpleResp(status_code, nullptr, nullptr, const_cast<char *>(msg), 0);
return resp_result;
}

CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());

XrdHttpTPCState state(*fh, curl, true);
XrdHttpTPCState state(std::move(fh), curl, true);
state.CopyHeaders(req);
res = curl_easy_perform(curl);
fh->close();
delete fh;
if (res == CURLE_HTTP_RETURNED_ERROR) {
m_log.Emsg("ProcessPushReq", "Remote server failed request", curl_easy_strerror(res));
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg("ProcessPushReq", "Remote server failed request", ss.str().c_str());
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
m_log.Emsg("ProcessPushReq", "Curl failed", curl_easy_strerror(res));
char msg[] = "Unknown internal transfer failure";
Expand All @@ -322,8 +382,8 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}
char *name = req.GetSecEntity().name;
XrdSfsFile *fh = m_sfs->newFile(name, m_monid++);
if (!fh) {
std::unique_ptr<XrdSfsFile> fh(m_sfs->newFile(name, m_monid++));
if (!fh.get()) {
char msg[] = "Failed to initialize internal transfer file handle";
return req.SendSimpleResp(500, nullptr, nullptr, msg, 0);
}
Expand All @@ -334,30 +394,36 @@ class XrdHttpTPC : public XrdHttpExtHandler {
mode = SFS_O_TRUNC|SFS_O_POSC;
}

if (SFS_OK != fh->open(req.resource.c_str(), mode, 0600, &(req.GetSecEntity()), authz.empty() ? nullptr : authz.c_str())) {
int open_result = OpenWaitStall(*fh, req.resource, mode|SFS_O_WRONLY, 0644, req.GetSecEntity(), authz);
if (SFS_REDIRECT == open_result) {
return RedirectTransfer(req, fh->error);
} else if (SFS_OK != open_result) {
int code;
char msg_generic[] = "Failed to open local resource";
const char *msg = fh->error.getErrText(code);
if (msg == nullptr) msg = msg_generic;
if ((msg == nullptr) || (*msg == '\0')) msg = msg_generic;
int status_code = 400;
if (code == EACCES) status_code = 401;
if (code == EEXIST) status_code = 412;
int resp_result = req.SendSimpleResp(status_code, nullptr, nullptr, const_cast<char *>(msg), 0);
fh->close();
delete fh;
return req.SendSimpleResp(status_code, nullptr, nullptr, const_cast<char *>(msg), 0);
return resp_result;
}

CURLcode res;
curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());

XrdHttpTPCState state(*fh, curl, false);
XrdHttpTPCState state(std::move(fh), curl, false);
state.CopyHeaders(req);
res = curl_easy_perform(curl);
fh->close();
delete fh;
if (res == CURLE_HTTP_RETURNED_ERROR) {
m_log.Emsg("ProcessPullReq", "Remote server failed request", curl_easy_strerror(res));
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(curl_easy_strerror(res)), 0);
} else if (state.GetStatusCode() >= 400) {
std::stringstream ss;
ss << "Remote side failed with status code " << state.GetStatusCode();
m_log.Emsg("ProcessPushReq", "Remote server failed request", ss.str().c_str());
return req.SendSimpleResp(500, nullptr, nullptr, const_cast<char *>(ss.str().c_str()), 0);
} else if (res) {
m_log.Emsg("ProcessPullReq", "Curl failed", curl_easy_strerror(res));
char msg[] = "Unknown internal transfer failure";
Expand Down Expand Up @@ -436,6 +502,21 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return false;
}
m_log.Emsg("Config", "xrootd.fslib line successfully processed by XrdHttpTPC");
} else if (!strcmp("http.desthttps", val)) {
if (!(val = Config.GetWord())) {
Config.Close();
m_log.Emsg("Config", "http.desthttps value not specified");
return false;
}
if (!strcmp("1", val) || !strcasecmp("yes", val) || !strcasecmp("true", val)) {
m_desthttps = true;
} else if (!strcmp("1", val) || !strcasecmp("yes", val) || !strcasecmp("true", val)) {
m_desthttps = false;
} else {
Config.Close();
m_log.Emsg("Config", "https.dests value is invalid", val);
return false;
}
}
}
Config.Close();
Expand Down Expand Up @@ -483,6 +564,7 @@ class XrdHttpTPC : public XrdHttpExtHandler {
return true;
}

bool m_desthttps{false};
static std::atomic<uint64_t> m_monid;
XrdSysError &m_log;
std::unique_ptr<XrdSfsFileSystem> m_sfs;
Expand Down
31 changes: 19 additions & 12 deletions tools/xrootd-test-tpc
Expand Up @@ -67,19 +67,26 @@ def main():
if mode == "auto":
mode = determine_mode(args)
print "Auto detect determined %s mode" % mode
if mode == 'pull':
headers['Authorization'] = 'Bearer %s' % dest_token
headers['Source'] = args.src
headers['Copy-Header'] = 'Authorization: Bearer %s' % src_token
resp = requests.request('COPY', args.dest, headers=headers)
else: # Push
headers['Authorization'] = 'Bearer %s' % src_token
headers['Destination'] = args.dest
headers['Copy-Header'] = 'Authorization: Bearer %s' % dest_token
resp = requests.request('COPY', args.src, headers=headers)

print resp.status_code
print resp.text
with requests.Session() as session:
session.verify = '/etc/grid-security/certificates'
if mode == 'pull':
headers['Authorization'] = 'Bearer %s' % dest_token
headers['Source'] = args.src
headers['Copy-Header'] = 'Authorization: Bearer %s' % src_token
url = args.dest
else: # Push
headers['Authorization'] = 'Bearer %s' % src_token
headers['Destination'] = args.dest
headers['Copy-Header'] = 'Authorization: Bearer %s' % dest_token
url = args.src
try_again = True
while try_again:
resp = session.request('COPY', url, headers=headers, allow_redirects = True)
print resp.status_code
print resp.headers
print resp.text
break

if __name__ == '__main__':
main()

0 comments on commit 2afed06

Please sign in to comment.