-
Notifications
You must be signed in to change notification settings - Fork 149
/
XrdEcStrmWriter.cc
297 lines (270 loc) · 14.2 KB
/
XrdEcStrmWriter.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
/*
* XrdEcStrmWriter.cc
*
* Created on: 5 May 2020
* Author: simonm
*/
#include "XrdEc/XrdEcStrmWriter.hh"
#include "XrdEc/XrdEcThreadPool.hh"
#include "XrdOuc/XrdOucCRC32C.hh"
#include <numeric>
#include <algorithm>
#include <future>
namespace XrdEc
{
//---------------------------------------------------------------------------
// Open the data object for writing
//---------------------------------------------------------------------------
void StrmWriter::Open( XrdCl::ResponseHandler *handler )
{
  const size_t nbsrv = objcfg.plgr.size();
  //-------------------------------------------------------------------------
  // Create one ZIP archive object per data server up front
  //-------------------------------------------------------------------------
  for( size_t i = 0; i < nbsrv; ++i )
    dataarchs.emplace_back( std::make_shared<XrdCl::ZipArchive>() );
  //-------------------------------------------------------------------------
  // Build an open operation for each of the archives
  //-------------------------------------------------------------------------
  std::vector<XrdCl::Pipeline> opens;
  opens.reserve( nbsrv );
  for( size_t i = 0; i < nbsrv; ++i )
  {
    XrdCl::Ctx<XrdCl::ZipArchive> zip( *dataarchs[i] );
    opens.emplace_back( XrdCl::OpenArchive( zip, objcfg.GetDataUrl( i ),
                                            XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write ) );
  }
  //-------------------------------------------------------------------------
  // Run all opens in parallel; at least nbchunks of them must succeed.
  // On failure the error is recorded in the global status before the
  // user handler is notified.
  //-------------------------------------------------------------------------
  XrdCl::Async( XrdCl::Parallel( opens ).AtLeast( objcfg.nbchunks ) >>
                [=]( XrdCl::XRootDStatus &st )
                {
                  if( !st.IsOK() ) global_status.report_open( st );
                  handler->HandleResponse( new XrdCl::XRootDStatus( st ), nullptr );
                } );
}
//---------------------------------------------------------------------------
// Write data to the data object
//---------------------------------------------------------------------------
void StrmWriter::Write( uint32_t size, const void *buff, XrdCl::ResponseHandler *handler )
{
  //-------------------------------------------------------------------------
  // Bail out early if a previous operation already put us into an error
  // state — just fail the request with that status.
  //-------------------------------------------------------------------------
  XrdCl::XRootDStatus gst = global_status.get();
  if( !gst.IsOK() ) return ScheduleHandler( handler, gst );
  //-------------------------------------------------------------------------
  // Account for the bytes that are about to be written
  //-------------------------------------------------------------------------
  global_status.issue_write( size );
  //-------------------------------------------------------------------------
  // Copy the user data into write buffers; every buffer that becomes
  // complete is enqueued for further processing.
  //-------------------------------------------------------------------------
  const char *src  = reinterpret_cast<const char*>( buff );
  uint32_t    left = size;
  while( left > 0 )
  {
    if( !wrtbuff ) wrtbuff.reset( new WrtBuff( objcfg ) );
    uint64_t nbwrt = wrtbuff->Write( left, src );
    src  += nbwrt;
    left -= nbwrt;
    if( wrtbuff->Complete() ) EnqueueBuff( std::move( wrtbuff ) );
  }
  //-------------------------------------------------------------------------
  // The data are cached in our buffers, so we can already tell the user
  // the request is done.
  //-------------------------------------------------------------------------
  ScheduleHandler( handler );
}
//---------------------------------------------------------------------------
// Close the data object
//---------------------------------------------------------------------------
void StrmWriter::Close( XrdCl::ResponseHandler *handler )
{
  //-------------------------------------------------------------------------
  // If we are already in an error state, fail the request right away.
  //-------------------------------------------------------------------------
  XrdCl::XRootDStatus current = global_status.get();
  if( !current.IsOK() )
  {
    ScheduleHandler( handler, current );
    return;
  }
  //-------------------------------------------------------------------------
  // Flush any partially filled buffer that is still pending.
  //-------------------------------------------------------------------------
  if( wrtbuff && !wrtbuff->Empty() )
    EnqueueBuff( std::move( wrtbuff ) );
  //-------------------------------------------------------------------------
  // Delegate the actual close to the global status object.
  //-------------------------------------------------------------------------
  global_status.issue_close( handler );
}
//---------------------------------------------------------------------------
// Issue the write requests for the given write buffer
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
// Issue the write requests for one complete data block.
//
// Takes ownership of the write buffer, erasure-codes nothing itself (the
// buffer already holds the stripes) and appends each of the nbchunks
// stripes to a ZIP archive on a randomly chosen data server. On a failed
// append the stripe is retried on the next available server
// (XrdCl::Pipeline::Repeat); when no servers remain the error is reported
// to the global status together with the full block size so the user
// handler can still be called.
//---------------------------------------------------------------------------
void StrmWriter::WriteBuff( std::unique_ptr<WrtBuff> buff )
{
//-------------------------------------------------------------------------
// Our buffer with the data block, will be shared between all pipelines
// writing to different servers. Each per-stripe lambda below captures a
// copy of this shared_ptr, so the buffer stays alive until the last
// handler has run.
//-------------------------------------------------------------------------
std::shared_ptr<WrtBuff> wrtbuff( std::move( buff ) );
//-------------------------------------------------------------------------
// Shuffle the servers so every block has a different placement.
// The shuffled server IDs go into a thread-safe queue shared with the
// retry lambdas, which dequeue a fresh server on failure.
//-------------------------------------------------------------------------
static std::default_random_engine random_engine( std::chrono::system_clock::now().time_since_epoch().count() );
std::shared_ptr<sync_queue<size_t>> servers = std::make_shared<sync_queue<size_t>>();
std::vector<size_t> zipid( dataarchs.size() );
std::iota( zipid.begin(), zipid.end(), 0 );
std::shuffle( zipid.begin(), zipid.end(), random_engine );
auto itr = zipid.begin();
for( ; itr != zipid.end() ; ++itr ) servers->enqueue( std::move( *itr ) );
//-------------------------------------------------------------------------
// Create the write pipelines for updating stripes
//-------------------------------------------------------------------------
const size_t nbchunks = objcfg.nbchunks;
std::vector<XrdCl::Pipeline> writes;
writes.reserve( nbchunks );
size_t blknb = next_blknb++;
// blksize accumulates the user-visible size of this block: only the
// data stripes (strpnb < nbdata) count, parity stripes do not.
uint64_t blksize = 0;
for( size_t strpnb = 0; strpnb < nbchunks; ++strpnb )
{
std::string fn = objcfg.GetFileName( blknb, strpnb );
uint32_t crc32c = wrtbuff->GetCrc32c( strpnb );
uint64_t strpsize = wrtbuff->GetStrpSize( strpnb );
char* strpbuff = wrtbuff->GetStrpBuff( strpnb );
if( strpnb < objcfg.nbdata ) blksize += strpsize;
//-----------------------------------------------------------------------
// Find a server where we can append the next data chunk
//-----------------------------------------------------------------------
XrdCl::Ctx<XrdCl::ZipArchive> zip;
size_t srvid;
if( !servers->dequeue( srvid ) )
{
XrdCl::XRootDStatus err( XrdCl::stError, XrdCl::errNoMoreReplicas,
0, "No more data servers to try." );
//---------------------------------------------------------------------
// calculate the full block size, otherwise the user handler
// will be never called (the remaining data stripes were not yet
// added to blksize by the loop above)
//---------------------------------------------------------------------
for( size_t i = strpnb + 1; i < objcfg.nbdata; ++i )
blksize += wrtbuff->GetStrpSize( i );
global_status.report_wrt( err, blksize );
return;
}
zip = *dataarchs[srvid];
//-----------------------------------------------------------------------
// Create the Write request
//-----------------------------------------------------------------------
XrdCl::Pipeline p = XrdCl::AppendFile( zip, fn, crc32c, strpsize, strpbuff ) >>
[=]( XrdCl::XRootDStatus &st ) mutable
{
//------------------------------------------------
// Try to recover from error
//------------------------------------------------
if( !st.IsOK() )
{
//----------------------------------------------
// Select another server
//----------------------------------------------
size_t srvid;
if( !servers->dequeue( srvid ) ) return; // if there are no more servers we simply fail
zip = *dataarchs[srvid];
//----------------------------------------------
// Retry this operation at different server
// (re-runs the AppendFile with the rebound
// zip context)
//----------------------------------------------
XrdCl::Pipeline::Repeat();
}
//------------------------------------------------
// Make sure the buffer is only deallocated
// after the handler is called (drop this
// lambda's share of the buffer explicitly)
//------------------------------------------------
wrtbuff.reset();
};
writes.emplace_back( std::move( p ) );
}
// Report the overall write status (and the block size, used for
// byte accounting) once all stripe pipelines have finished.
XrdCl::Async( XrdCl::Parallel( writes ) >> [=]( XrdCl::XRootDStatus &st ){ global_status.report_wrt( st, blksize ); } );
}
//---------------------------------------------------------------------------
// Get a buffer with metadata (CDFH and EOCD records)
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
// Get a buffer with metadata (CDFH and EOCD records).
//
// Builds an in-memory ZIP archive whose entries are the Central
// Directories of the individual data archives: for each data archive the
// raw CD bytes become one file (named by the archive's data URL) with an
// LFH record, followed by the combined central directory (CDFH records)
// and the EOCD record. Returns the serialized archive as a byte buffer.
//---------------------------------------------------------------------------
XrdZip::buffer_t StrmWriter::GetMetadataBuffer()
{
using namespace XrdZip;
const size_t cdcnt = objcfg.plgr.size();
// NOTE: the reserve() calls below are required for correctness, not just
// performance — each CDFH stores a raw pointer (&lfh) into `lfhs`, so
// `lfhs` must never reallocate while records are being appended.
std::vector<buffer_t> buffs; buffs.reserve( cdcnt ); // buffers with raw data
std::vector<LFH> lfhs; lfhs.reserve( cdcnt ); // LFH records
std::vector<CDFH> cdfhs; cdfhs.reserve( cdcnt ); // CDFH records
//-------------------------------------------------------------------------
// prepare data structures (LFH and CDFH records)
//-------------------------------------------------------------------------
uint64_t offset = 0; // running offset of the next LFH within the archive
uint64_t cdsize = 0; // accumulated size of all CDFH records
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; // rw-r--r-- entry permissions
for( size_t i = 0; i < cdcnt; ++i )
{
std::string fn = objcfg.GetDataUrl( i ); // file name (URL of the data archive)
buffer_t buff( dataarchs[i]->GetCD() ); // raw data buffer (central directory of the data archive)
uint32_t cksum = crc32c( 0, buff.data(), buff.size() ); // crc32c of the buffer
lfhs.emplace_back( fn, cksum, buff.size(), time( 0 ) ); // LFH record for the buffer
LFH &lfh = lfhs.back();
cdfhs.emplace_back( &lfh, mode, offset ); // CDFH record for the buffer
offset += LFH::lfhBaseSize + fn.size() + buff.size(); // shift the offset
cdsize += cdfhs.back().cdfhSize; // update central directory size
buffs.emplace_back( std::move( buff ) ); // keep the buffer for later
}
// total archive size: all (LFH + data) + central directory + EOCD
uint64_t zipsize = offset + cdsize + EOCD::eocdBaseSize;
buffer_t zipbuff; zipbuff.reserve( zipsize );
//-------------------------------------------------------------------------
// write into the final buffer LFH records + raw data
//-------------------------------------------------------------------------
for( size_t i = 0; i < cdcnt; ++i )
{
lfhs[i].Serialize( zipbuff );
std::copy( buffs[i].begin(), buffs[i].end(), std::back_inserter( zipbuff ) );
}
//-------------------------------------------------------------------------
// write into the final buffer CDFH records
//-------------------------------------------------------------------------
for( size_t i = 0; i < cdcnt; ++i )
cdfhs[i].Serialize( zipbuff );
//-------------------------------------------------------------------------
// prepare and write into the final buffer the EOCD record
// (offset = start of the central directory, cdcnt = number of entries)
//-------------------------------------------------------------------------
EOCD eocd( offset, cdcnt, cdsize );
eocd.Serialize( zipbuff );
return zipbuff;
}
//---------------------------------------------------------------------------
// Close the data object (implementation)
//---------------------------------------------------------------------------
//---------------------------------------------------------------------------
// Close the data object (implementation).
//
// Serializes the metadata (the Central Directory of every data ZIP) and
// then, in parallel: closes all data archives, and replicates the metadata
// buffer to every metadata location. The user handler is called once both
// parallel stages have completed.
//
// @param handler : user callback invoked with the combined close status
//---------------------------------------------------------------------------
void StrmWriter::CloseImpl( XrdCl::ResponseHandler *handler )
{
  const size_t size = objcfg.plgr.size();
  //-------------------------------------------------------------------------
  // prepare the metadata (the Central Directory of each data ZIP);
  // shared_ptr so the buffer outlives the asynchronous pipelines below
  //-------------------------------------------------------------------------
  auto zipbuff = std::make_shared<XrdZip::buffer_t>( GetMetadataBuffer() );
  //-------------------------------------------------------------------------
  // prepare the pipelines ...
  //-------------------------------------------------------------------------
  std::vector<XrdCl::Pipeline> closes;
  std::vector<XrdCl::Pipeline> save_metadata;
  closes.reserve( size );
  save_metadata.reserve( size ); // fix: was missing (closes was reserved but
                                 // save_metadata reallocated while growing)
  for( size_t i = 0; i < size; ++i )
  {
    //-----------------------------------------------------------------------
    // close ZIP archives with data
    //-----------------------------------------------------------------------
    closes.emplace_back( XrdCl::CloseArchive( *dataarchs[i] ) );
    //-----------------------------------------------------------------------
    // replicate the metadata: open-write-close a plain file at each
    // metadata URL; Final keeps zipbuff alive until the pipeline is done
    //-----------------------------------------------------------------------
    std::string url = objcfg.GetMetadataUrl( i );
    metadataarchs.emplace_back( std::make_shared<XrdCl::File>() );
    XrdCl::Pipeline p = XrdCl::Open( *metadataarchs[i], url, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write )
                      | XrdCl::Write( *metadataarchs[i], 0, zipbuff->size(), zipbuff->data() )
                      | XrdCl::Close( *metadataarchs[i] )
                      | XrdCl::Final( [zipbuff]( const XrdCl::XRootDStatus& ){ } );
    save_metadata.emplace_back( std::move( p ) );
  }
  //-------------------------------------------------------------------------
  // compose closes & save_metadata:
  // - closes must be successful at least for #data + #parity
  // - save_metadata must be successful at least for #parity + 1
  //-------------------------------------------------------------------------
  XrdCl::Pipeline p = XrdCl::Parallel(
      XrdCl::Parallel( closes ).AtLeast( objcfg.nbchunks ),
      XrdCl::Parallel( save_metadata ).AtLeast( objcfg.nbparity + 1 )
    ) >> handler;
  XrdCl::Async( std::move( p ) );
}
}