diff --git a/packaging/common/xrootd-clustered.cfg b/packaging/common/xrootd-clustered.cfg index d3c0d84e4e7..baffe806b11 100644 --- a/packaging/common/xrootd-clustered.cfg +++ b/packaging/common/xrootd-clustered.cfg @@ -51,3 +51,9 @@ frm.xfr.copycmd /bin/cp /dev/null $PFN # all.adminpath /var/spool/xrootd all.pidpath /var/run/xrootd + +# More configuration files can be added in /etc/xrootd/config.d/ +# For example /etc/xrootd/config.d/10-mygrid.cfg and +# /etc/xrootd/config.d/98-mysite-specifics.cfg +# +continue /etc/xrootd/config.d/ \ No newline at end of file diff --git a/packaging/common/xrootd-filecache-clustered.cfg b/packaging/common/xrootd-filecache-clustered.cfg index 22609fd803c..4ff5a3adb6b 100644 --- a/packaging/common/xrootd-filecache-clustered.cfg +++ b/packaging/common/xrootd-filecache-clustered.cfg @@ -88,3 +88,8 @@ pfc.ram 100g fi +# More configuration files can be added in /etc/xrootd/config.d/ +# For example /etc/xrootd/config.d/10-mygrid.cfg and +# /etc/xrootd/config.d/98-mysite-specifics.cfg +# +continue /etc/xrootd/config.d/ \ No newline at end of file diff --git a/packaging/common/xrootd-filecache-standalone.cfg b/packaging/common/xrootd-filecache-standalone.cfg index 79f7a6923c1..d2a87ffd4e1 100644 --- a/packaging/common/xrootd-filecache-standalone.cfg +++ b/packaging/common/xrootd-filecache-standalone.cfg @@ -35,3 +35,8 @@ oss.localroot /data/xrd pfc.ram 16g +# More configuration files can be added in /etc/xrootd/config.d/ +# For example /etc/xrootd/config.d/10-mygrid.cfg and +# /etc/xrootd/config.d/98-mysite-specifics.cfg +# +continue /etc/xrootd/config.d/ \ No newline at end of file diff --git a/packaging/common/xrootd-http.cfg b/packaging/common/xrootd-http.cfg index 087a7ee99c7..d883f901460 100644 --- a/packaging/common/xrootd-http.cfg +++ b/packaging/common/xrootd-http.cfg @@ -34,3 +34,8 @@ all.pidpath /var/run/xrootd # systemd start xrdhttp.socket # xrd.protocol XrdHttp:80 /usr/lib64/libXrdHttp-4.so +# More configuration files can be added in /etc/xrootd/config.d/ +# For example /etc/xrootd/config.d/10-mygrid.cfg and +# /etc/xrootd/config.d/98-mysite-specifics.cfg +# +continue /etc/xrootd/config.d/ \ No newline at end of file diff --git a/packaging/common/xrootd-standalone.cfg b/packaging/common/xrootd-standalone.cfg index 7837b22b9ab..33d1093690e 100644 --- a/packaging/common/xrootd-standalone.cfg +++ b/packaging/common/xrootd-standalone.cfg @@ -23,3 +23,9 @@ all.export /tmp # all.adminpath /var/spool/xrootd all.pidpath /var/run/xrootd + +# More configuration files can be added in /etc/xrootd/config.d/ +# For example /etc/xrootd/config.d/10-mygrid.cfg and +# /etc/xrootd/config.d/98-mysite-specifics.cfg +# +continue /etc/xrootd/config.d/ \ No newline at end of file diff --git a/packaging/rhel/xrootd.spec.in b/packaging/rhel/xrootd.spec.in index 5a92f8bc99e..a858f86159e 100644 --- a/packaging/rhel/xrootd.spec.in +++ b/packaging/rhel/xrootd.spec.in @@ -499,6 +499,9 @@ rm -rf $RPM_BUILD_ROOT%{_sysconfdir}/xrootd/* # ceph posix unversioned so rm -f $RPM_BUILD_ROOT%{_libdir}/libXrdCephPosix.so +# config paths +mkdir -p $RPM_BUILD_ROOT%{_sysconfdir}/%{name}/config.d/ + # var paths mkdir -p $RPM_BUILD_ROOT%{_var}/log/xrootd mkdir -p $RPM_BUILD_ROOT%{_var}/run/xrootd @@ -720,6 +723,7 @@ fi %attr(-,xrootd,xrootd) %dir %{_var}/log/xrootd %attr(-,xrootd,xrootd) %dir %{_var}/run/xrootd %attr(-,xrootd,xrootd) %dir %{_var}/spool/xrootd +%attr(-,xrootd,xrootd) %dir %{_sysconfdir}/%{name}/config.d %config(noreplace) %{_sysconfdir}/logrotate.d/xrootd %if %{use_systemd} @@ -932,6 +936,9 @@ fi # Changelog #------------------------------------------------------------------------------- %changelog +* Tue Jan 08 2019 Edgar Fajardo +- Create config dir /etc/xrootd/config.d + * Tue May 08 2018 Michal Simon - Make python3 sub-package optional diff --git a/src/XrdHttp/XrdHttpProtocol.cc b/src/XrdHttp/XrdHttpProtocol.cc index 273b38312f9..8c2696c8992 100644 --- a/src/XrdHttp/XrdHttpProtocol.cc +++ b/src/XrdHttp/XrdHttpProtocol.cc @@ -1359,7 +1359,7 @@ int XrdHttpProtocol::BuffgetData(int blen, char **data, bool wait) { if (wait && (blen > BuffUsed())) { TRACE(REQ, "BuffgetData: need to read " << blen - BuffUsed() << " bytes"); - if (getDataOneShot(blen - BuffUsed(), true) < 0) return 0; + if ( getDataOneShot(blen - BuffUsed(), true) ) return 0; } if (myBuffStart < myBuffEnd) { diff --git a/src/XrdMacaroons/XrdMacaroonsHandler.cc b/src/XrdMacaroons/XrdMacaroonsHandler.cc index 095693a6717..ce81bac4f72 100644 --- a/src/XrdMacaroons/XrdMacaroonsHandler.cc +++ b/src/XrdMacaroons/XrdMacaroonsHandler.cc @@ -17,6 +17,41 @@ using namespace Macaroons; + +char *unquote(const char *str) { + int l = strlen(str); + char *r = (char *) malloc(l + 1); + r[0] = '\0'; + int i, j = 0; + + for (i = 0; i < l; i++) { + + if (str[i] == '%') { + char savec[3]; + if (l <= i + 3) { + free(r); + return NULL; + } + savec[0] = str[i + 1]; + savec[1] = str[i + 2]; + savec[2] = '\0'; + + r[j] = strtol(savec, 0, 16); + + i += 2; + } else if (str[i] == '+') r[j] = ' '; + else r[j] = str[i]; + + j++; + } + + r[j] = '\0'; + + return r; + +} + + static ssize_t determine_validity(const std::string& input) { @@ -95,6 +130,7 @@ Handler::GenerateID(const XrdSecEntity &entity, const std::string &activities, return result; } + std::string Handler::GenerateActivities(const XrdHttpExtReq & req) const { @@ -109,19 +145,174 @@ Handler::GenerateActivities(const XrdHttpExtReq & req) const return result; } + // See if the macaroon handler is interested in this request. // We intercept all POST requests as we will be looking for a particular // header. bool Handler::MatchesPath(const char *verb, const char *path) { - return !strcmp(verb, "POST"); + return !strcmp(verb, "POST") || !strncmp(path, "/.well-known/", 13) || + !strncmp(path, "/.oauth2/", 9); +} + + +int Handler::ProcessOAuthConfig(XrdHttpExtReq &req) { + if (req.verb != "GET") + { + return req.SendSimpleResp(405, NULL, NULL, "Only GET is valid for oauth config.", 0); + } + auto header = req.headers.find("Host"); + if (header == req.headers.end()) + { + return req.SendSimpleResp(400, NULL, NULL, "Host header is required.", 0); + } + + json_object *response_obj = json_object_new_object(); + if (!response_obj) + { + return req.SendSimpleResp(500, NULL, NULL, "Unable to create new JSON response object.", 0); + } + std::string token_endpoint = "https://" + header->second + "/.oauth2/token"; + json_object *endpoint_obj = + json_object_new_string_len(token_endpoint.c_str(), token_endpoint.size()); + if (!endpoint_obj) + { + return req.SendSimpleResp(500, NULL, NULL, "Unable to create a new JSON macaroon string.", 0); + } + json_object_object_add(response_obj, "token_endpoint", endpoint_obj); + + const char *response_result = json_object_to_json_string_ext(response_obj, JSON_C_TO_STRING_PRETTY); + int retval = req.SendSimpleResp(200, NULL, NULL, response_result, 0); + json_object_put(response_obj); + return retval; +} + + +int Handler::ProcessTokenRequest(XrdHttpExtReq &req) +{ + if (req.verb != "POST") + { + return req.SendSimpleResp(405, NULL, NULL, "Only POST is valid for token request.", 0); + } + auto header = req.headers.find("Content-Type"); + if (header == req.headers.end()) + { + return req.SendSimpleResp(400, NULL, NULL, "Content-Type missing; not a valid macaroon request?", 0); + } + if (header->second != "application/x-www-form-urlencoded") + { + return req.SendSimpleResp(400, NULL, NULL, "Content-Type must be set to `application/macaroon-request' to request a macaroon", 0); + } + char *request_data_raw; + // Note: this does not null-terminate the buffer contents. + if (req.BuffgetData(req.length, &request_data_raw, true) != req.length) + { + return req.SendSimpleResp(400, NULL, NULL, "Missing or invalid body of request.", 0); + } + std::string request_data(request_data_raw, req.length); + bool found_grant_type = false; + ssize_t validity = -1; + std::string scope; + std::string token; + std::istringstream token_stream(request_data); + while (std::getline(token_stream, token, '&')) + { + std::string::size_type eq = token.find("="); + if (eq == std::string::npos) + { + return req.SendSimpleResp(400, NULL, NULL, "Invalid format for form-encoding", 0); + } + std::string key = token.substr(0, eq); + std::string value = token.substr(eq + 1); + //std::cout << "Found key " << key << ", value " << value << std::endl; + if (key == "grant_type") + { + found_grant_type = true; + if (value != "client_credentials") + { + return req.SendSimpleResp(400, NULL, NULL, "Invalid grant type specified.", 0); + } + } + else if (key == "expire_in") + { + try + { + validity = std::stoll(value); + } + catch (...) + { + return req.SendSimpleResp(400, NULL, NULL, "Expiration request not parseable.", 0); + } + if (validity <= 0) + { + return req.SendSimpleResp(400, NULL, NULL, "Expiration request has invalid value.", 0); + } + } + else if (key == "scope") + { + char *value_raw = unquote(value.c_str()); + if (value_raw == NULL) + { + return req.SendSimpleResp(400, NULL, NULL, "Unable to unquote scope.", 0); + } + scope = value_raw; + free(value_raw); + } + } + if (!found_grant_type) + { + return req.SendSimpleResp(400, NULL, NULL, "Grant type not specified.", 0); + } + if (scope.empty()) + { + return req.SendSimpleResp(400, NULL, NULL, "Scope was not specified.", 0); + } + std::istringstream token_stream_scope(scope); + std::string path; + std::vector other_caveats; + while (std::getline(token_stream_scope, token, ' ')) + { + std::string::size_type col = token.find(":"); + if (col == std::string::npos) + { + return req.SendSimpleResp(400, NULL, NULL, "Invalid format for requested scope", 0); + } + std::string key = token.substr(0, col); + std::string value = token.substr(col + 1); + //std::cout << "Found activity " << key << ", path " << value << std::endl; + if (path.empty()) + { + path = value; + } + else if (value != path) + { + std::stringstream ss; + ss << "Encountered requested scope request for authorization " << key + << " with resource path " << value << "; however, prior request had path " + << path; + m_log->Emsg("MacaroonRequest", ss.str().c_str()); + return req.SendSimpleResp(500, NULL, NULL, "Server only supports all scopes having the same path", 0); + } + other_caveats.push_back(key); + } + if (path.empty()) + { + path = "/"; + } + return GenerateMacaroonResponse(req, path, other_caveats, validity, true); } // Process a macaroon request. int Handler::ProcessReq(XrdHttpExtReq &req) { + if (req.resource == "/.well-known/oauth-authorization-server") { + return ProcessOAuthConfig(req); + } else if (req.resource == "/.oauth2/token") { + return ProcessTokenRequest(req); + } + auto header = req.headers.find("Content-Type"); if (header == req.headers.end()) { @@ -176,46 +367,53 @@ int Handler::ProcessReq(XrdHttpExtReq &req) { return req.SendSimpleResp(400, NULL, NULL, "Invalid ISO 8601 duration for validity key", 0); } - time_t now; - time(&now); - if (m_max_duration > 0) - { - now += (validity > m_max_duration) ? m_max_duration : validity; - } - else - { - now += validity; - } - char utc_time_buf[21]; - if (!strftime(utc_time_buf, 21, "%FT%TZ", gmtime(&now))) - { - return req.SendSimpleResp(500, NULL, NULL, "Internal error constructing UTC time", 0); - } - std::string utc_time_str(utc_time_buf); - std::stringstream ss; - ss << "before:" << utc_time_str; - std::string utc_time_caveat = ss.str(); - json_object *caveats_obj; std::vector other_caveats; if (json_object_object_get_ex(macaroon_req, "caveats", &caveats_obj)) - { + { if (json_object_is_type(caveats_obj, json_type_array)) { // Caveats were provided. Let's record them. // TODO - could just add these in-situ. No need for the other_caveats vector. int array_length = json_object_array_length(caveats_obj); other_caveats.reserve(array_length); for (int idx=0; idx &other_caveats, ssize_t validity, bool oauth_response) +{ + time_t now; + time(&now); + if (m_max_duration > 0) + { + validity = (validity > m_max_duration) ? m_max_duration : validity; + } + now += validity; + + char utc_time_buf[21]; + if (!strftime(utc_time_buf, 21, "%FT%TZ", gmtime(&now))) + { + return req.SendSimpleResp(500, NULL, NULL, "Internal error constructing UTC time", 0); + } + std::string utc_time_str(utc_time_buf); + std::stringstream ss; + ss << "before:" << utc_time_str; + std::string utc_time_caveat = ss.str(); std::string activities = GenerateActivities(req); std::string macaroon_id = GenerateID(req.GetSecEntity(), activities, utc_time_str); @@ -276,7 +474,7 @@ int Handler::ProcessReq(XrdHttpExtReq &req) } } - std::string path_caveat = "path:" + req.resource; + std::string path_caveat = "path:" + resource; struct macaroon *mac_with_path = macaroon_add_first_party_caveat(mac_with_activities, reinterpret_cast(path_caveat.c_str()), path_caveat.size(), @@ -314,11 +512,17 @@ int Handler::ProcessReq(XrdHttpExtReq &req) { return req.SendSimpleResp(500, NULL, NULL, "Unable to create a new JSON macaroon string.", 0); } - json_object_object_add(response_obj, "macaroon", macaroon_obj); + json_object_object_add(response_obj, oauth_response ? "access_token" : "macaroon", macaroon_obj); + + json_object *expire_in_obj = json_object_new_int64(validity); + if (!expire_in_obj) + { + return req.SendSimpleResp(500, NULL, NULL, "Unable to create a new JSON validity object.", 0); + } + json_object_object_add(response_obj, "expires_in", expire_in_obj); const char *macaroon_result = json_object_to_json_string_ext(response_obj, JSON_C_TO_STRING_PRETTY); int retval = req.SendSimpleResp(200, NULL, NULL, macaroon_result, 0); json_object_put(response_obj); return retval; } - diff --git a/src/XrdMacaroons/XrdMacaroonsHandler.hh b/src/XrdMacaroons/XrdMacaroonsHandler.hh index 3cfb22f230c..446a38d8d8c 100644 --- a/src/XrdMacaroons/XrdMacaroonsHandler.hh +++ b/src/XrdMacaroons/XrdMacaroonsHandler.hh @@ -2,6 +2,7 @@ #include #include #include +#include #include "XrdHttp/XrdHttpExtHandler.hh" @@ -50,6 +51,10 @@ private: std::string GenerateID(const XrdSecEntity &, const std::string &, const std::string &); std::string GenerateActivities(const XrdHttpExtReq &) const; + int ProcessOAuthConfig(XrdHttpExtReq &req); + int ProcessTokenRequest(XrdHttpExtReq& req); + int GenerateMacaroonResponse(XrdHttpExtReq& req, const std::string &response, const std::vector &, ssize_t validity, bool oauth_response); + static bool xsecretkey(XrdOucStream &Config, XrdSysError *log, std::string &secret); static bool xsitename(XrdOucStream &Config, XrdSysError *log, std::string &location); static bool xtrace(XrdOucStream &Config, XrdSysError *log); diff --git a/src/XrdMacaroons/macaroon-init b/src/XrdMacaroons/macaroon-init new file mode 100755 index 00000000000..1c5c8e70161 --- /dev/null +++ b/src/XrdMacaroons/macaroon-init @@ -0,0 +1,154 @@ +#!/usr/bin/python + +""" +Given an X509 proxy, generate a dCache-style macaroon. +""" + +from __future__ import print_function + +import os +import sys +import json +import urlparse +import argparse + +import requests + + +class NoTokenEndpoint(Exception): + pass + + +def parse_args(): + """ + Parse command line arguments to this tool + """ + parser = argparse.ArgumentParser( + description="Generate a macaroon for authorized transfers") + parser.add_argument("url", metavar="URL", + help="URL to generate macaroon for.") + parser.add_argument("--activity", nargs="+", help="Activity for authorization (LIST," + "DOWNLOAD,UPLOAD, etc)") + parser.add_argument("--validity", type=int, default=10, help="Time," + "in minutes, the resulting macaroon should be valid.", + required=False) + return parser.parse_args() + + +def configure_authenticated_session(): + """ + Generate a new session object for use with requests to the issuer. + + Configures TLS appropriately to work with a GSI environment. + """ + euid = os.geteuid() + if euid == 0: + cert = '/etc/grid-security/hostcert.pem' + key = '/etc/grid-security/hostkey.pem' + else: + cert = '/tmp/x509up_u%d' % euid + key = '/tmp/x509up_u%d' % euid + + cert = os.environ.get('X509_USER_PROXY', cert) + key = os.environ.get('X509_USER_PROXY', key) + + session = requests.Session() + + if os.path.exists(cert): + session.cert = cert + if os.path.exists(key): + session.cert = (cert, key) + #session.verify = '/etc/grid-security/certificates' + + return session + + +def get_token_endpoint(issuer): + """ + From the provided issuer, use OAuth auto-discovery to bootstrap the token endpoint. + """ + parse_result = urlparse.urlparse(issuer) + norm_path = os.path.normpath(parse_result.path) + new_path = norm_path if norm_path != "/" else "" + new_path = "/.well-known/oauth-authorization-server" + new_path + config_url = urlparse.urlunparse(urlparse.ParseResult( + scheme = "https", + netloc = parse_result.netloc, + path = new_path, + fragment = "", + query = "", + params = "")) + response = requests.get(config_url) + endpoint_info = json.loads(response.text) + if response.status_code != requests.codes.ok: + print >> sys.stderr, "Failed to access the auto-discovery URL (%s) for issuer %s (status=%d): %s" % (config_url, issuer, response.status_code, response.text[:2048]) + raise NoTokenEndpoint() + elif 'token_endpoint' not in endpoint_info: + print >> sys.stderr, "Token endpoint not available for issuer %s" % issuer + raise NoTokenEndpoint() + return endpoint_info['token_endpoint'] + + +def generate_token(url, validity, activity): + """ + Call out to the macaroon issuer, using the specified validity and activity, + and receive a resulting token. + """ + print("Querying %s for new token." % url, file=sys.stderr) + validity = "PT%dM" % validity + print("Validity: %s, activities: %s." % (validity, ", ".join(activity)), + file=sys.stderr) + data_json = {"caveats": ["activity:%s" % ",".join(activity)], + "validity": validity} + with configure_authenticated_session() as session: + response = session.post(url, + headers={"Content-Type": "application/macaroon-request"}, + data=json.dumps(data_json) + ) + + if response.status_code == requests.codes.ok: + print("Successfully generated a new token:", file=sys.stderr) + return response.text + else: + print("Issuer failed request (status %d): %s" % (response.status_code, response.text[:2048]), file=sys.stderr) + sys.exit(1) + + +def generate_token_oauth(url, validity, activity): + parse_result = urlparse.urlparse(url) + token_issuer = urlparse.urlunparse(urlparse.ParseResult( + scheme = "https", + netloc = parse_result.netloc, + path = "/", + fragment = "", + query = "", + params = "")) + token_endpoint = get_token_endpoint(token_issuer) + print("Querying %s for new token." % token_endpoint) + scope = " ".join(["{}:{}".format(i, parse_result.path) for i in activity]) + with configure_authenticated_session() as session: + response = session.post(token_endpoint, headers={"Accept": "application/json"}, + data={"grant_type": "client_credentials", + "scope": scope, + "expire_in": "{}".format(validity*60)}) + + if response.status_code == requests.codes.ok: + print("Successfully generated a new token:") + return response.text + else: + print("Issuer failed request (status %d): %s" % (response.status_code, response.text[:2048]), + file=sys.stderr) + sys.exit(1) + + +def main(): + args = parse_args() + try: + token = generate_token_oauth(args.url, args.validity, args.activity) + except NoTokenEndpoint: + token = generate_token(args.url, args.validity, args.activity) + print(token) + + +if __name__ == '__main__': + main() diff --git a/src/XrdTpc/XrdTpcMultistream.cc b/src/XrdTpc/XrdTpcMultistream.cc index 6f7542c13e5..942bb3e92e6 100644 --- a/src/XrdTpc/XrdTpcMultistream.cc +++ b/src/XrdTpc/XrdTpcMultistream.cc @@ -372,7 +372,12 @@ int TPCHandler::RunCurlWithStreamsImpl(XrdHttpExtReq &req, State &state, ss << "failure: Remote side failed with status code " << state.GetStatusCode(); m_log.Emsg(log_prefix, "Remote server failed request", ss.str().c_str()); } else { - ss << "success: Created"; + if (!handles[0]->Finalize()) { + ss << "failure: Failed to finalize and close file handle."; + m_log.Emsg(log_prefix, "Failed to finalize file handle"); + } else { + ss << "success: Created"; + } } if ((retval = req.ChunkResp(ss.str().c_str(), 0))) { diff --git a/src/XrdTpc/XrdTpcState.cc b/src/XrdTpc/XrdTpcState.cc index d9b749d9159..7d7ed5cc0c2 100644 --- a/src/XrdTpc/XrdTpcState.cc +++ b/src/XrdTpc/XrdTpcState.cc @@ -243,3 +243,9 @@ void State::DumpBuffers() const { m_stream->DumpBuffers(); } + +bool State::Finalize() +{ + return m_stream->Finalize(); +} + diff --git a/src/XrdTpc/XrdTpcState.hh b/src/XrdTpc/XrdTpcState.hh index fdbab41255d..d710e5150e4 100644 --- a/src/XrdTpc/XrdTpcState.hh +++ b/src/XrdTpc/XrdTpcState.hh @@ -81,6 +81,15 @@ public: // constructor once C++11 is allowed in XRootD. void Move (State &other); + // Flush and finalize a transfer state. Eventually calls close() on the underlying + // file handle, which should hopefully synchronize the file metadata across + // all readers (even other load-balanced servers on the same distributed file + // system). + // + // Returns true on success; false otherwise. Failures can happen, for example, if + // not all buffers have been reordered by the underlying stream. + bool Finalize(); + private: bool InstallHandlers(CURL *curl); diff --git a/src/XrdTpc/XrdTpcStream.cc b/src/XrdTpc/XrdTpcStream.cc index 0bf52656b3c..c16d7071f4c 100644 --- a/src/XrdTpc/XrdTpcStream.cc +++ b/src/XrdTpc/XrdTpcStream.cc @@ -20,6 +20,26 @@ Stream::~Stream() } +bool +Stream::Finalize() +{ + // Do not close twice + if (!m_open_for_write) { + return false; + } + for (std::vector::iterator buffer_iter = m_buffers.begin(); + buffer_iter != m_buffers.end(); + buffer_iter++) { + delete *buffer_iter; + *buffer_iter = NULL; + } + m_fh->close(); + m_open_for_write = false; + // If there are outstanding buffers to reorder, finalization failed + return m_avail_count == m_buffers.size(); +} + + int Stream::Stat(struct stat* buf) { @@ -29,6 +49,7 @@ Stream::Stat(struct stat* buf) int Stream::Write(off_t offset, const char *buf, size_t size) { + if (!m_open_for_write) return SFS_ERROR; bool buffer_accepted = false; int retval = size; if (offset < m_offset) { diff --git a/src/XrdTpc/XrdTpcStream.hh b/src/XrdTpc/XrdTpcStream.hh index f6c559308ce..ef5688a4817 100644 --- a/src/XrdTpc/XrdTpcStream.hh +++ b/src/XrdTpc/XrdTpcStream.hh @@ -21,7 +21,8 @@ namespace TPC { class Stream { public: Stream(std::unique_ptr fh, size_t max_blocks, size_t buffer_size, XrdSysError &log) - : m_avail_count(max_blocks), + : m_open_for_write(false), + m_avail_count(max_blocks), m_fh(std::move(fh)), m_offset(0), m_log(log) @@ -30,6 +31,7 @@ public: for (size_t idx=0; idx < max_blocks; idx++) { m_buffers.push_back(new Entry(buffer_size)); } + m_open_for_write = true; } ~Stream(); @@ -44,6 +46,15 @@ public: void DumpBuffers() const; + // Flush and finalize the stream. If all data has been sent to the underlying + // file handle, close() will be invoked on the file handle. + // + // Further write operations on this stream will result in an error. + // If any memory buffers remain, an error occurs. + // + // Returns true on success; false otherwise. + bool Finalize(); + private: class Entry { @@ -124,6 +135,7 @@ private: std::vector m_buffer; }; + bool m_open_for_write; size_t m_avail_count; std::unique_ptr m_fh; off_t m_offset;