-
Notifications
You must be signed in to change notification settings - Fork 35
/
Archive.pm
389 lines (282 loc) · 13.7 KB
/
Archive.pm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
package OmniPITR::Program::Archive;
use strict;
use warnings;
our $VERSION = '0.1.0';
use base qw( OmniPITR::Program );
use Carp;
use OmniPITR::Tools qw( :all );
use English qw( -no_match_vars );
use File::Basename;
use File::Spec;
use File::Path qw( mkpath rmtree );
use File::Copy;
use Storable;
use Getopt::Long;
=head1 run()

Main entry point, called by the actual script in bin/. It wraps all the work
done by the script, with the sole exception of reading and validating command
line arguments.

Those two tasks (reading and validating arguments) are implemented in this
module, but they are called from L<OmniPITR::Program::new()>.

Names of the called methods should be self explanatory; if you need further
information, check the documentation of the method in question.

=cut

sub run {
    my ( $self ) = @_;

    $self->read_state();

    # Only the file name part of the segment is handed over for temp directory preparation.
    my $segment_name = basename( $self->{ 'segment' } );
    $self->prepare_temp_directory( $segment_name );

    $self->make_all_necessary_compressions();
    $self->send_to_destinations();
    $self->cleanup();

    $self->log->log( 'Segment %s successfully sent to all destinations.', $self->{ 'segment' } );

    return;
}
=head1 send_to_destinations()

Does all the actual sending of the segment to local and remote destinations.

It keeps its state (via save_state()) after every destination, to be able to
continue in case of error.

Since both local and remote destinations are handled in the same way, there
is no point in duplicating the code into 2 methods.

Important notice - for every destination this function chooses which file to
send: destinations with compression C<none> get the original segment, all
other destinations get the compressed temp file, as named by
get_temp_filename_for().

An error while sending to the destination given as dst-backup is only logged;
an error for any other destination is reported with the C<fatal> log call.

=cut

sub send_to_destinations {
    my ( $self ) = @_;

    TYPE:
    for my $destination_type ( qw( local remote ) ) {
        my $destinations = $self->{ 'destination' }->{ $destination_type };
        next TYPE unless $destinations;

        DESTINATION:
        for my $dst ( @{ $destinations } ) {
            next DESTINATION if $self->segment_already_sent( $destination_type, $dst );

            # Uncompressed destinations get the segment itself, others get the compressed copy.
            my $local_file;
            if ( $dst->{ 'compression' } eq 'none' ) {
                $local_file = $self->{ 'segment' };
            }
            else {
                $local_file = $self->get_temp_filename_for( $dst->{ 'compression' } );
            }

            my $is_backup = $self->{ 'dst-backup' } && ( $dst->{ 'path' } eq $self->{ 'dst-backup' } );

            # Normalize trailing slashes, then append file name to destination directory.
            ( my $target = $dst->{ 'path' } ) =~ s{/*\z}{};
            $target .= '/' . basename( $local_file );

            my $comment = "Sending $local_file to $target";

            if ( $self->verbose ) {
                $self->log->time_start( $comment );
            }
            my $response = run_command( $self->{ 'temp-dir' }, $self->{ 'rsync-path' }, $local_file, $target );
            if ( $self->verbose ) {
                $self->log->time_finish( $comment );
            }

            my $failed = $response->{ 'error_code' };
            if ( $failed && $is_backup ) {
                $self->log->error( "Sending segment %s to backup destination %s generated (ignored) error: %s", $local_file, $target, $response );
            }
            elsif ( $failed ) {
                $self->log->fatal( "Cannot send segment %s to %s : %s", $local_file, $target, $response );
            }

            # Remember that this destination is done, so a re-run can skip it.
            $self->{ 'state' }->{ 'sent' }->{ $destination_type }->{ $dst->{ 'path' } } = 1;
            $self->save_state();
        }
    }

    return;
}
=head1 segment_already_sent()

Simple function that checks (in the state data) whether the segment has
already been sent to the given destination, and if yes - logs that fact.

Takes destination type (C<local> or C<remote>) and destination hashref (its
C<path> key is used). Returns 1 if the segment was already sent, and
nothing otherwise.

=cut

sub segment_already_sent {
    my ( $self, $type, $dst ) = @_;

    my $sent_for_type = $self->{ 'state' }->{ 'sent' }->{ $type };
    return unless $sent_for_type;

    my $path = $dst->{ 'path' };
    return unless $sent_for_type->{ $path };

    $self->log->log( 'Segment already sent to %s. Skipping.', $path );
    return 1;
}
=head1 cleanup()

Called (from run()) only after the segment has been successfully compressed
and sent to all destinations.

It removes the temp-dir tree (which holds compressed copies of the segment),
and the state file for the given segment, if there is one.

=cut

sub cleanup {
    my ( $self ) = @_;

    rmtree( $self->{ 'temp-dir' } );

    # State file is only set when state-dir is in use.
    my $state_file = $self->{ 'state-file' };
    if ( $state_file ) {
        unlink $state_file;
    }

    return;
}
=head1 make_all_necessary_compressions()

Wraps all work required to compress the segment to all necessary formats.

The call to the actual compressor has to be done via "bash -c" to be able to
easily use the run_command() function, which has the side benefits of getting
stdout, stderr, and proper fetching of error codes.

Overhead of the additional fork+exec for bash should be negligible.

After every successful compression, md5sum of the compressed file is stored
in state, and the state is saved.

=cut

sub make_all_necessary_compressions {
    my ( $self ) = @_;

    $self->get_list_of_all_necessary_compressions();

    COMPRESSION:
    for my $compression ( @{ $self->{ 'compressions' } } ) {
        next COMPRESSION if $compression eq 'none';
        next COMPRESSION if $self->segment_already_compressed( $compression );

        my $output_file = $self->get_temp_filename_for( $compression );

        # Configured path (e.g. gzip-path), falling back to bare program name.
        # Note that it goes into the shell command as-is, unlike file names and nice-path.
        my $binary = $self->{ $compression . '-path' } || $compression;

        my @command_parts = ( $binary, '--stdout', quotemeta( $self->{ 'segment' } ), '>', quotemeta( $output_file ) );
        unshift @command_parts, quotemeta( $self->{ 'nice-path' } ) unless $self->{ 'not-nice' };
        my $shell_command = join ' ', @command_parts;

        my $timer_label = 'Compressing with ' . $compression;

        if ( $self->verbose ) {
            $self->log->time_start( $timer_label );
        }
        my $response = run_command( $self->{ 'temp-dir' }, 'bash', '-c', $shell_command );
        if ( $self->verbose ) {
            $self->log->time_finish( $timer_label );
        }

        if ( $response->{ 'error_code' } ) {
            $self->log->fatal( 'Error while compressing with %s : %s', $compression, $response );
        }

        $self->{ 'state' }->{ 'compressed' }->{ $compression } = file_md5sum( $output_file );
        $self->save_state();
    }

    return;
}
=head1 segment_already_compressed()

Helper function which checks if the segment has already been compressed with
the given compression type.

It uses state data, and compares md5sum of the compressed file with the one
stored in state, to be sure that the file wasn't damaged between prior run
and now. A compressed file with wrong md5sum is removed, so that it will get
recompressed.

Returns 1 if valid compressed file exists, and nothing otherwise.

=cut

sub segment_already_compressed {
    my ( $self, $type ) = @_;

    my $expected_md5 = $self->{ 'state' }->{ 'compressed' }->{ $type };
    return unless $expected_md5;

    my $compressed_file = $self->get_temp_filename_for( $type );
    return unless -e $compressed_file;

    my $actual_md5 = file_md5sum( $compressed_file );

    if ( $actual_md5 ne $expected_md5 ) {
        unlink $compressed_file;
        $self->log->error( 'Segment already compressed to %s, but with bad MD5 (file: %s, state: %s), recompressing.', $type, $actual_md5, $expected_md5 );
        return;
    }

    $self->log->log( 'Segment has been already compressed with %s.', $type );
    return 1;
}
=head1 get_temp_filename_for()

Helper function to build the full (with path) file name for the compressed
segment, for the given compression type.

The file is placed in temp-dir, and is named like the segment, with the
extension returned by ext_for_compression() appended.

=cut

sub get_temp_filename_for {
    my ( $self, $type ) = @_;

    my $file_name = basename( $self->{ 'segment' } ) . ext_for_compression( $type );

    return File::Spec->catfile( $self->{ 'temp-dir' }, $file_name );
}
=head1 read_state()

Helper function to read state from the state file.

Name of the state file is the same as the file name of the WAL segment being
archived, but it is located in state-dir.

State always starts as an empty hash. It stays empty when state-dir is not
set, or when the state file does not exist.

If the state file exists but cannot be loaded (Storable's retrieve() throws
an exception on damaged files, and returns undef on I/O errors), or if it
does not contain a hash, the problem is logged as an error and the empty
state is kept. Thanks to this, a state file damaged by an interrupted earlier
run does not make every subsequent attempt to archive the segment fail - the
work is simply redone from scratch.

=cut

sub read_state {
    my ( $self ) = @_;

    $self->{ 'state' } = {};

    return unless $self->{ 'state-dir' };

    $self->{ 'state-file' } = File::Spec->catfile( $self->{ 'state-dir' }, basename( $self->{ 'segment' } ) );

    return unless -f $self->{ 'state-file' };

    # retrieve() can die (damaged file) or return undef (I/O error) - handle both.
    my $loaded  = eval { retrieve( $self->{ 'state-file' } ) };
    my $problem = $@;

    if ( ref( $loaded ) eq 'HASH' ) {
        $self->{ 'state' } = $loaded;
        return;
    }

    $problem = 'file is unreadable or does not contain state data' unless $problem;
    $problem =~ s/\s+\z//;
    $self->log->error( 'Cannot load state from %s (%s). Starting with empty state.', $self->{ 'state-file' }, $problem );

    return;
}
=head1 save_state()

Helper function to save current state to the state file.

Does nothing if there is no state file name set (it is set by read_state(),
and only when state-dir is in use).

=cut

sub save_state {
    my ( $self ) = @_;

    my $state_file = $self->{ 'state-file' };
    if ( $state_file ) {
        store( $self->{ 'state' }, $state_file );
    }

    return;
}
=head1 read_args()

Function which does all the parsing and transformation of command line
arguments.

It also verifies base facts about the passed WAL segment name (that exactly
one was given), but all other validations are done in a separate function:
L<validate_args()>.

Every --dst-local / --dst-remote value can be prefixed with C<gzip=>,
C<bzip2=> or C<lzma=>, which sets compression for that destination;
duplicated values are used only once.

In the value of --log every C<^> character is changed to C<%>.

=cut

sub read_args {
    my ( $self ) = @_;

    # Keep copy of original arguments, for logging, before option parsing consumes them.
    my @original_argv = @ARGV;

    my %args = (
        'data-dir'   => '.',
        'temp-dir'   => $ENV{ 'TMPDIR' } || '/tmp',
        'gzip-path'  => 'gzip',
        'bzip2-path' => 'bzip2',
        'lzma-path'  => 'lzma',
        'rsync-path' => 'rsync',
        'nice-path'  => 'nice',
    );

    my @option_specs = (
        'bzip2-path|bp=s',
        'data-dir|D=s',
        'dst-backup|db=s',
        'dst-local|dl=s@',
        'dst-remote|dr=s@',
        'force-data-dir|f',
        'gzip-path|gp=s',
        'log|l=s',
        'lzma-path|lp=s',
        'rsync-path|rp=s',
        'pid-file=s',
        'state-dir|s=s',
        'temp-dir|t=s',
        'nice-path|np=s',
        'verbose|v',
        'not-nice|nn',
    );

    my $parsed_ok = GetOptions( \%args, @option_specs );
    croak( 'Error while reading command line arguments. Please check documentation in doc/omnipitr-archive.pod' ) unless $parsed_ok;

    croak( '--log was not provided - cannot continue.' ) unless $args{ 'log' };
    $args{ 'log' } =~ tr/^/%/;

    # Options that are copied to object without any transformation.
    my @plain_options = qw( data-dir dst-backup temp-dir state-dir pid-file verbose gzip-path bzip2-path lzma-path nice-path force-data-dir rsync-path not-nice );
    @{ $self }{ @plain_options } = @args{ @plain_options };

    for my $type ( qw( local remote ) ) {
        my @destinations = ();
        $self->{ 'destination' }->{ $type } = \@destinations;

        my $given = $args{ 'dst-' . $type };
        next unless defined $given;

        my %seen = ();
        for my $item ( @{ $given } ) {
            next if $seen{ $item }++;

            my $path        = $item;
            my $compression = 'none';
            if ( $path =~ s/\A(gzip|bzip2|lzma)=// ) {
                $compression = $1;
            }

            push @destinations, { 'compression' => $compression, 'path' => $path, };
        }
    }

    # We do it here so it will actually work for reporting problems in validation
    $self->{ 'log_template' } = $args{ 'log' };
    $self->{ 'log' }          = OmniPITR::Log->new( $self->{ 'log_template' } );

    # These could theoretically go into validation, but we need to check if we can get anything to {'segment'}
    my $segment_count = scalar @ARGV;
    $self->log->fatal( 'WAL segment file name has not been given' )         if 0 == $segment_count;
    $self->log->fatal( 'More than 1 WAL segment file name has been given' ) if 1 < $segment_count;

    $self->{ 'segment' } = shift @ARGV;

    if ( $self->verbose ) {
        $self->log->log( 'Called with parameters: %s', join( ' ', @original_argv ) );
    }

    return;
}
=head1 validate_args()

Does all necessary validation of given command line arguments.

One exception is for compression programs paths - technically, it could be
validated in here, but benefit would be pretty limited, and code to do so
relatively complex, as compression program path might, but doesn't have to be
actual file path - it might be just program name (without path), which is the
default.

Checks being done:

=over

=item * data-dir is a directory containing PG_VERSION file (unless --force-data-dir was given)

=item * dst-backup is an uncompressed, absolute path; if it exists, it is added to local destinations

=item * at least one destination is given, and state-dir is a writable directory if there is more than one

=item * segment name looks like WAL segment, backup label or history file

=item * segment file exists, is a readable file, and (for segment names given without path) has the expected size

=back

All problems are reported using the C<fatal> log call. On success,
C<segment> is replaced with the path to the segment file (relative names are
prefixed with data-dir).

=cut

sub validate_args {
    my $self = shift;

    unless ( $self->{ 'force-data-dir' } ) {
        $self->log->fatal( "Given data-dir (%s) is not valid", $self->{ 'data-dir' } )
            unless -d $self->{ 'data-dir' } && -f File::Spec->catfile( $self->{ 'data-dir' }, 'PG_VERSION' );
    }

    if ( $self->{ 'dst-backup' } ) {
        if ( $self->{ 'dst-backup' } =~ m{\A(gzip|bzip2|lzma)=} ) {

            # Placeholder has to be %s - with the previous '[%]' the offending value never got into the message.
            $self->log->fatal( 'dst-backup cannot be compressed! [%s]', $self->{ 'dst-backup' } );
        }

        unless ( $self->{ 'dst-backup' } =~ m{\A/} ) {
            $self->log->fatal( 'dst-backup has to be absolute path, and it is not: %s', $self->{ 'dst-backup' } );
        }

        # Backup destination is used only if it exists at the moment of the check.
        if ( -e $self->{ 'dst-backup' } ) {
            push @{ $self->{ 'destination' }->{ 'local' } },
                {
                'compression' => 'none',
                'path'        => $self->{ 'dst-backup' },
                };
        }
    }

    my $dst_count = scalar( @{ $self->{ 'destination' }->{ 'local' } } ) + scalar( @{ $self->{ 'destination' }->{ 'remote' } } );
    $self->log->fatal( "No --dst-* has been provided!" ) if 0 == $dst_count;

    # State directory is required only when there is more than one destination.
    if ( 1 < $dst_count ) {
        $self->log->fatal( "More than 1 --dst-* has been provided, but no --state-dir!" ) if !$self->{ 'state-dir' };
        $self->log->fatal( "Given --state-dir (%s) does not exist",     $self->{ 'state-dir' } ) unless -e $self->{ 'state-dir' };
        $self->log->fatal( "Given --state-dir (%s) is not a directory", $self->{ 'state-dir' } ) unless -d $self->{ 'state-dir' };
        $self->log->fatal( "Given --state-dir (%s) is not writable",    $self->{ 'state-dir' } ) unless -w $self->{ 'state-dir' };
    }

    # Accepted names: 24 hex digits, optionally followed by .<8 hex digits>.backup, or <8 hex digits>.history
    $self->log->fatal( 'Given segment name is not valid (%s)', $self->{ 'segment' } )
        unless basename( $self->{ 'segment' } ) =~ m{\A(?:[a-fA-F0-9]{24}(?:\.[a-fA-F0-9]{8}\.backup)?|[a-fA-F0-9]{8}\.history)\z};

    # Segment names that are not absolute are taken as relative to data-dir.
    my $segment_file_name = $self->{ 'segment' };
    $segment_file_name = File::Spec->catfile( $self->{ 'data-dir' }, $self->{ 'segment' } ) unless $self->{ 'segment' } =~ m{^/};

    $self->log->fatal( 'Given segment (%s) does not exist.',  $segment_file_name ) unless -e $segment_file_name;
    $self->log->fatal( 'Given segment (%s) is not a file.',   $segment_file_name ) unless -f $segment_file_name;
    $self->log->fatal( 'Given segment (%s) is not readable.', $segment_file_name ) unless -r $segment_file_name;

    # NOTE(review): this matches the segment exactly as given, so the size check is skipped whenever
    # segment was passed with a directory part (e.g. pg_xlog/...). Confirm whether that is intended.
    if ( $self->{ 'segment' } =~ m{\A[a-fA-F0-9]{24}\z} ) {
        my $expected_size = 256**3;
        my $file_size     = ( -s $segment_file_name );
        $self->log->fatal( 'Given segment (%s) has incorrect size (%u vs %u).', $segment_file_name, $file_size, $expected_size ) unless $expected_size == $file_size;
    }

    $self->{ 'segment' } = $segment_file_name;

    return;
}
1;