-
Notifications
You must be signed in to change notification settings - Fork 149
/
XrdTpcStream.cc
211 lines (189 loc) · 7.01 KB
/
XrdTpcStream.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#include <sstream>
#include "XrdTpcStream.hh"
#include "XrdSfs/XrdSfsInterface.hh"
#include "XrdSys/XrdSysError.hh"
using namespace TPC;
/**
 * Tear down the stream: free every reordering buffer still referenced from
 * m_buffers, then close the underlying file handle.
 *
 * Slots are reset to NULL after deletion; deleting a slot that is already
 * NULL (e.g. because Finalize() ran earlier) is a harmless no-op.
 */
Stream::~Stream()
{
    for (size_t slot = 0; slot < m_buffers.size(); ++slot) {
        delete m_buffers[slot];
        m_buffers[slot] = NULL;
    }
    // The close is unconditional; its return value is intentionally not
    // inspected here since a destructor has no way to report it.
    m_fh->close();
}
bool
Stream::Finalize()
{
// Do not close twice
if (!m_open_for_write) {
return false;
}
m_open_for_write = false;
for (std::vector<Entry*>::iterator buffer_iter = m_buffers.begin();
buffer_iter != m_buffers.end();
buffer_iter++) {
delete *buffer_iter;
*buffer_iter = NULL;
}
if (m_fh->close() == SFS_ERROR) {
std::stringstream ss;
const char *msg = m_fh->error.getErrText();
if (!msg || (*msg == '\0')) {msg = "(no error message provided)";}
ss << "Failure when closing file handle: " << msg << " (code=" << m_fh->error.getErrInfo() << ")";
m_error_buf = ss.str();
return false;
}
// If there are outstanding buffers to reorder, finalization failed
return m_avail_count == m_buffers.size();
}
/**
 * Query file metadata through the underlying file handle.
 *
 * @param buf  Destination structure handed straight to the file handle.
 * @return Whatever the file handle's stat() call returns.
 */
int
Stream::Stat(struct stat* buf)
{
    const int rc = m_fh->stat(buf);
    return rc;
}
/**
 * Accept `size` bytes of data destined for file position `offset`.
 *
 * Data that continues the stream exactly at the current offset and is either
 * forced or a non-zero multiple of 1 MiB is written straight to the file via
 * WriteImpl().  Anything else (or any remainder of a short direct write while
 * buffers are in use) is handed to the reordering buffers in m_buffers.  On
 * every call the buffers are also given a chance to flush to disk.
 *
 * @param offset  File offset of the first byte in `buf`; must not be less
 *                than the current stream offset.
 * @param buf     Data to write.
 * @param size    Number of bytes in `buf`.  A size of 0 asks the buffers to
 *                flush even when their contents are not MB-aligned.
 * @param force   Write directly even if `size` is not MB-aligned (only
 *                honoured when `offset` equals the current stream offset).
 * @return SFS_ERROR on failure (m_error_buf describes the problem, unless it
 *         was already set).  Otherwise the result of the direct write when
 *         one was performed, or `size` when the data was only buffered.
 */
ssize_t
Stream::Write(off_t offset, const char *buf, size_t size, bool force)
{
    /*
     * NOTE: these lines are useful for debugging the state of the buffer
     * management code; too expensive to compile in and have a runtime switch.
    std::stringstream ss;
    ss << "Offset=" << offset << ", Size=" << size << ", force=" << force;
    m_log.Emsg("Stream::Write", ss.str().c_str());
    DumpBuffers();
    */
    if (!m_open_for_write) {
        if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a buffer not opened for write";}
        return SFS_ERROR;
    }
    size_t bytes_accepted = 0;
    // Keep the result in the function's own return type; storing it in an
    // `int` would truncate large sizes / write counts.
    ssize_t retval = static_cast<ssize_t>(size);
    if (offset < m_offset) {
        if (!m_error_buf.size()) {m_error_buf = "Logic error: writing to a prior offset";}
        return SFS_ERROR;
    }
    // If this write is appending to the stream and is
    // MB-aligned (or forced), then we write it to disk; otherwise,
    // the data will be buffered.
    if (offset == m_offset && (force || (size && !(size % (1024*1024))))) {
        retval = WriteImpl(offset, buf, size);
        // On failure, we don't care about flushing buffers from memory --
        // the stream is now invalid.
        if (retval < 0) {
            return retval;
        }
        // Safe to convert: retval is known to be non-negative here.
        bytes_accepted = static_cast<size_t>(retval);
        // If there are no in-use buffers, then we don't need to
        // do any accounting.
        if (m_avail_count == m_buffers.size()) {
            return retval;
        }
    }
    // Even if we already accepted the current data, always
    // iterate through available buffers and try to write as
    // much out to disk as possible.
    Entry *avail_entry;
    bool buffer_was_written;
    size_t avail_count = 0;
    do {
        // Each pass recomputes the set of empty buffers from scratch; we
        // keep looping while a pass managed to flush something, since that
        // may have advanced the stream offset and unblocked other buffers.
        avail_count = 0;
        avail_entry = NULL;
        buffer_was_written = false;
        for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
             entry_iter != m_buffers.end();
             entry_iter++) {
            // Always try to dump from memory; when size == 0, then we are
            // going to force a flush even if things are not MB-aligned.
            int retval2 = (*entry_iter)->Write(*this, size == 0);
            if (retval2 == SFS_ERROR) {
                if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
                return retval2;
            }
            buffer_was_written |= retval2 > 0;
            if ((*entry_iter)->Available()) { // Empty buffer
                if (!avail_entry) {avail_entry = *entry_iter;}
                avail_count ++;
            }
            else if (bytes_accepted != size && size) {
                // In-use buffer: offer it whatever part of the caller's data
                // has not been placed anywhere yet.
                size_t new_accept = (*entry_iter)->Accept(offset + bytes_accepted, buf, size - bytes_accepted);
                // Partial accept; buffer should be writable which means we should free it up
                // for next iteration
                if (new_accept && new_accept != size - bytes_accepted) {
                    int retval3 = (*entry_iter)->Write(*this, false);
                    if (retval3 == SFS_ERROR) {
                        if (!m_error_buf.size()) {m_error_buf = "Unknown filesystem write failure.";}
                        return SFS_ERROR;
                    }
                    buffer_was_written = true;
                }
                bytes_accepted += new_accept;
            }
        }
    } while ((avail_count != m_buffers.size()) && buffer_was_written);
    m_avail_count = avail_count;
    if (bytes_accepted != size && size) { // No place for this data in allocated buffers
        if (!avail_entry) { // No available buffers to allocate; logic error, should not happen.
            DumpBuffers();
            m_error_buf = "No empty buffers available to place unordered data.";
            return SFS_ERROR;
        }
        if (avail_entry->Accept(offset + bytes_accepted, buf, size - bytes_accepted) != size - bytes_accepted) { // Empty buffer cannot accept?!?
            m_error_buf = "Empty re-ordering buffer was unable to to accept data; internal logic error.";
            return SFS_ERROR;
        }
        // The previously empty buffer is now in use.
        m_avail_count --;
    }
    // If we have low buffer occupancy, then release memory.
    if ((m_buffers.size() > 2) && (m_avail_count * 2 > m_buffers.size())) {
        for (std::vector<Entry*>::iterator entry_iter = m_buffers.begin();
             entry_iter != m_buffers.end();
             entry_iter++) {
            (*entry_iter)->ShrinkIfUnused();
        }
    }
    return retval;
}
/**
 * Write `size` bytes from `buf` to the file handle at `offset`.
 *
 * On success the stream offset (m_offset) is advanced by the number of bytes
 * the file handle reports as written.  On SFS_ERROR the offset is left alone
 * and m_error_buf is set from the file handle's error text and code.
 *
 * @return 0 for an empty write (the file handle is not called), otherwise
 *         the value returned by the file handle's write().
 */
ssize_t Stream::WriteImpl(off_t offset, const char *buf, size_t size)
{
    if (!size) {return 0;}

    const ssize_t written = m_fh->write(offset, buf, size);
    if (written == SFS_ERROR) {
        // Fall back to a placeholder when the file handle supplies no text.
        const char *reason = m_fh->error.getErrText();
        if (!reason || !*reason) {reason = "(no error message provided)";}
        std::stringstream text;
        text << reason << " (code=" << m_fh->error.getErrInfo() << ")";
        m_error_buf = text.str();
        return written;
    }

    m_offset += written;
    return written;
}
void
Stream::DumpBuffers() const
{
m_log.Emsg("Stream::DumpBuffers", "Beginning dump of stream buffers.");
{
std::stringstream ss;
ss << "Stream offset: " << m_offset;
m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
}
size_t idx = 0;
for (std::vector<Entry*>::const_iterator entry_iter = m_buffers.begin();
entry_iter!= m_buffers.end();
entry_iter++) {
std::stringstream ss;
ss << "Buffer " << idx << ": Offset=" << (*entry_iter)->GetOffset() << ", Size="
<< (*entry_iter)->GetSize() << ", Capacity=" << (*entry_iter)->GetCapacity();
m_log.Emsg("Stream::DumpBuffers", ss.str().c_str());
idx ++;
}
m_log.Emsg("Stream::DumpBuffers", "Finish dump of stream buffers.");
}
/**
 * Read from the underlying file handle.
 *
 * @param offset  File offset to read from.
 * @param buf     Destination buffer.
 * @param size    Number of bytes requested.
 * @return Whatever the file handle's read() call returns.
 */
int
Stream::Read(off_t offset, char *buf, size_t size)
{
    const int rc = m_fh->read(offset, buf, size);
    return rc;
}